inmem_signal.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package metrics
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "os"
  7. "os/signal"
  8. "strings"
  9. "sync"
  10. "syscall"
  11. )
  12. // InmemSignal is used to listen for a given signal, and when received,
  13. // to dump the current metrics from the InmemSink to an io.Writer
  14. type InmemSignal struct {
  15. signal syscall.Signal
  16. inm *InmemSink
  17. w io.Writer
  18. sigCh chan os.Signal
  19. stop bool
  20. stopCh chan struct{}
  21. stopLock sync.Mutex
  22. }
  23. // NewInmemSignal creates a new InmemSignal which listens for a given signal,
  24. // and dumps the current metrics out to a writer
  25. func NewInmemSignal(inmem *InmemSink, sig syscall.Signal, w io.Writer) *InmemSignal {
  26. i := &InmemSignal{
  27. signal: sig,
  28. inm: inmem,
  29. w: w,
  30. sigCh: make(chan os.Signal, 1),
  31. stopCh: make(chan struct{}),
  32. }
  33. signal.Notify(i.sigCh, sig)
  34. go i.run()
  35. return i
  36. }
  37. // DefaultInmemSignal returns a new InmemSignal that responds to SIGUSR1
  38. // and writes output to stderr. Windows uses SIGBREAK
  39. func DefaultInmemSignal(inmem *InmemSink) *InmemSignal {
  40. return NewInmemSignal(inmem, DefaultSignal, os.Stderr)
  41. }
  42. // Stop is used to stop the InmemSignal from listening
  43. func (i *InmemSignal) Stop() {
  44. i.stopLock.Lock()
  45. defer i.stopLock.Unlock()
  46. if i.stop {
  47. return
  48. }
  49. i.stop = true
  50. close(i.stopCh)
  51. signal.Stop(i.sigCh)
  52. }
  53. // run is a long running routine that handles signals
  54. func (i *InmemSignal) run() {
  55. for {
  56. select {
  57. case <-i.sigCh:
  58. i.dumpStats()
  59. case <-i.stopCh:
  60. return
  61. }
  62. }
  63. }
  64. // dumpStats is used to dump the data to output writer
  65. func (i *InmemSignal) dumpStats() {
  66. buf := bytes.NewBuffer(nil)
  67. data := i.inm.Data()
  68. // Skip the last period which is still being aggregated
  69. for j := 0; j < len(data)-1; j++ {
  70. intv := data[j]
  71. intv.RLock()
  72. for _, val := range intv.Gauges {
  73. name := i.flattenLabels(val.Name, val.Labels)
  74. fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value)
  75. }
  76. for name, vals := range intv.Points {
  77. for _, val := range vals {
  78. fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val)
  79. }
  80. }
  81. for _, agg := range intv.Counters {
  82. name := i.flattenLabels(agg.Name, agg.Labels)
  83. fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
  84. }
  85. for _, agg := range intv.Samples {
  86. name := i.flattenLabels(agg.Name, agg.Labels)
  87. fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
  88. }
  89. intv.RUnlock()
  90. }
  91. // Write out the bytes
  92. i.w.Write(buf.Bytes())
  93. }
  94. // Flattens the key for formatting along with its labels, removes spaces
  95. func (i *InmemSignal) flattenLabels(name string, labels []Label) string {
  96. buf := bytes.NewBufferString(name)
  97. replacer := strings.NewReplacer(" ", "_", ":", "_")
  98. for _, label := range labels {
  99. replacer.WriteString(buf, ".")
  100. replacer.WriteString(buf, label.Value)
  101. }
  102. return buf.String()
  103. }