statsite.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package metrics
  2. import (
  3. "bufio"
  4. "fmt"
  5. "log"
  6. "net"
  7. "net/url"
  8. "strings"
  9. "time"
  10. )
  11. const (
  12. // We force flush the statsite metrics after this period of
  13. // inactivity. Prevents stats from getting stuck in a buffer
  14. // forever.
  15. flushInterval = 100 * time.Millisecond
  16. )
  17. // NewStatsiteSinkFromURL creates an StatsiteSink from a URL. It is used
  18. // (and tested) from NewMetricSinkFromURL.
  19. func NewStatsiteSinkFromURL(u *url.URL) (MetricSink, error) {
  20. return NewStatsiteSink(u.Host)
  21. }
  22. // StatsiteSink provides a MetricSink that can be used with a
  23. // statsite metrics server
  24. type StatsiteSink struct {
  25. addr string
  26. metricQueue chan string
  27. }
  28. // NewStatsiteSink is used to create a new StatsiteSink
  29. func NewStatsiteSink(addr string) (*StatsiteSink, error) {
  30. s := &StatsiteSink{
  31. addr: addr,
  32. metricQueue: make(chan string, 4096),
  33. }
  34. go s.flushMetrics()
  35. return s, nil
  36. }
  37. // Close is used to stop flushing to statsite
  38. func (s *StatsiteSink) Shutdown() {
  39. close(s.metricQueue)
  40. }
  41. func (s *StatsiteSink) SetGauge(key []string, val float32) {
  42. flatKey := s.flattenKey(key)
  43. s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
  44. }
  45. func (s *StatsiteSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
  46. flatKey := s.flattenKeyLabels(key, labels)
  47. s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
  48. }
  49. func (s *StatsiteSink) EmitKey(key []string, val float32) {
  50. flatKey := s.flattenKey(key)
  51. s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
  52. }
  53. func (s *StatsiteSink) IncrCounter(key []string, val float32) {
  54. flatKey := s.flattenKey(key)
  55. s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
  56. }
  57. func (s *StatsiteSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
  58. flatKey := s.flattenKeyLabels(key, labels)
  59. s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
  60. }
  61. func (s *StatsiteSink) AddSample(key []string, val float32) {
  62. flatKey := s.flattenKey(key)
  63. s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
  64. }
  65. func (s *StatsiteSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
  66. flatKey := s.flattenKeyLabels(key, labels)
  67. s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
  68. }
  69. // Flattens the key for formatting, removes spaces
  70. func (s *StatsiteSink) flattenKey(parts []string) string {
  71. joined := strings.Join(parts, ".")
  72. return strings.Map(func(r rune) rune {
  73. switch r {
  74. case ':':
  75. fallthrough
  76. case ' ':
  77. return '_'
  78. default:
  79. return r
  80. }
  81. }, joined)
  82. }
  83. // Flattens the key along with labels for formatting, removes spaces
  84. func (s *StatsiteSink) flattenKeyLabels(parts []string, labels []Label) string {
  85. for _, label := range labels {
  86. parts = append(parts, label.Value)
  87. }
  88. return s.flattenKey(parts)
  89. }
  90. // Does a non-blocking push to the metrics queue
  91. func (s *StatsiteSink) pushMetric(m string) {
  92. select {
  93. case s.metricQueue <- m:
  94. default:
  95. }
  96. }
  97. // Flushes metrics
  98. func (s *StatsiteSink) flushMetrics() {
  99. var sock net.Conn
  100. var err error
  101. var wait <-chan time.Time
  102. var buffered *bufio.Writer
  103. ticker := time.NewTicker(flushInterval)
  104. defer ticker.Stop()
  105. CONNECT:
  106. // Attempt to connect
  107. sock, err = net.Dial("tcp", s.addr)
  108. if err != nil {
  109. log.Printf("[ERR] Error connecting to statsite! Err: %s", err)
  110. goto WAIT
  111. }
  112. // Create a buffered writer
  113. buffered = bufio.NewWriter(sock)
  114. for {
  115. select {
  116. case metric, ok := <-s.metricQueue:
  117. // Get a metric from the queue
  118. if !ok {
  119. goto QUIT
  120. }
  121. // Try to send to statsite
  122. _, err := buffered.Write([]byte(metric))
  123. if err != nil {
  124. log.Printf("[ERR] Error writing to statsite! Err: %s", err)
  125. goto WAIT
  126. }
  127. case <-ticker.C:
  128. if err := buffered.Flush(); err != nil {
  129. log.Printf("[ERR] Error flushing to statsite! Err: %s", err)
  130. goto WAIT
  131. }
  132. }
  133. }
  134. WAIT:
  135. // Wait for a while
  136. wait = time.After(time.Duration(5) * time.Second)
  137. for {
  138. select {
  139. // Dequeue the messages to avoid backlog
  140. case _, ok := <-s.metricQueue:
  141. if !ok {
  142. goto QUIT
  143. }
  144. case <-wait:
  145. goto CONNECT
  146. }
  147. }
  148. QUIT:
  149. s.metricQueue = nil
  150. }