123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- package metrics
- import (
- "bufio"
- "fmt"
- "log"
- "net"
- "net/url"
- "strings"
- "time"
- )
- const (
- // We force flush the statsite metrics after this period of
- // inactivity. Prevents stats from getting stuck in a buffer
- // forever.
- flushInterval = 100 * time.Millisecond
- )
- // NewStatsiteSinkFromURL creates an StatsiteSink from a URL. It is used
- // (and tested) from NewMetricSinkFromURL.
- func NewStatsiteSinkFromURL(u *url.URL) (MetricSink, error) {
- return NewStatsiteSink(u.Host)
- }
- // StatsiteSink provides a MetricSink that can be used with a
- // statsite metrics server
- type StatsiteSink struct {
- addr string
- metricQueue chan string
- }
- // NewStatsiteSink is used to create a new StatsiteSink
- func NewStatsiteSink(addr string) (*StatsiteSink, error) {
- s := &StatsiteSink{
- addr: addr,
- metricQueue: make(chan string, 4096),
- }
- go s.flushMetrics()
- return s, nil
- }
- // Close is used to stop flushing to statsite
- func (s *StatsiteSink) Shutdown() {
- close(s.metricQueue)
- }
- func (s *StatsiteSink) SetGauge(key []string, val float32) {
- flatKey := s.flattenKey(key)
- s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
- }
- func (s *StatsiteSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
- flatKey := s.flattenKeyLabels(key, labels)
- s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
- }
- func (s *StatsiteSink) EmitKey(key []string, val float32) {
- flatKey := s.flattenKey(key)
- s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
- }
- func (s *StatsiteSink) IncrCounter(key []string, val float32) {
- flatKey := s.flattenKey(key)
- s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
- }
- func (s *StatsiteSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
- flatKey := s.flattenKeyLabels(key, labels)
- s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
- }
- func (s *StatsiteSink) AddSample(key []string, val float32) {
- flatKey := s.flattenKey(key)
- s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
- }
- func (s *StatsiteSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
- flatKey := s.flattenKeyLabels(key, labels)
- s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
- }
- // Flattens the key for formatting, removes spaces
- func (s *StatsiteSink) flattenKey(parts []string) string {
- joined := strings.Join(parts, ".")
- return strings.Map(func(r rune) rune {
- switch r {
- case ':':
- fallthrough
- case ' ':
- return '_'
- default:
- return r
- }
- }, joined)
- }
- // Flattens the key along with labels for formatting, removes spaces
- func (s *StatsiteSink) flattenKeyLabels(parts []string, labels []Label) string {
- for _, label := range labels {
- parts = append(parts, label.Value)
- }
- return s.flattenKey(parts)
- }
- // Does a non-blocking push to the metrics queue
- func (s *StatsiteSink) pushMetric(m string) {
- select {
- case s.metricQueue <- m:
- default:
- }
- }
- // Flushes metrics
- func (s *StatsiteSink) flushMetrics() {
- var sock net.Conn
- var err error
- var wait <-chan time.Time
- var buffered *bufio.Writer
- ticker := time.NewTicker(flushInterval)
- defer ticker.Stop()
- CONNECT:
- // Attempt to connect
- sock, err = net.Dial("tcp", s.addr)
- if err != nil {
- log.Printf("[ERR] Error connecting to statsite! Err: %s", err)
- goto WAIT
- }
- // Create a buffered writer
- buffered = bufio.NewWriter(sock)
- for {
- select {
- case metric, ok := <-s.metricQueue:
- // Get a metric from the queue
- if !ok {
- goto QUIT
- }
- // Try to send to statsite
- _, err := buffered.Write([]byte(metric))
- if err != nil {
- log.Printf("[ERR] Error writing to statsite! Err: %s", err)
- goto WAIT
- }
- case <-ticker.C:
- if err := buffered.Flush(); err != nil {
- log.Printf("[ERR] Error flushing to statsite! Err: %s", err)
- goto WAIT
- }
- }
- }
- WAIT:
- // Wait for a while
- wait = time.After(time.Duration(5) * time.Second)
- for {
- select {
- // Dequeue the messages to avoid backlog
- case _, ok := <-s.metricQueue:
- if !ok {
- goto QUIT
- }
- case <-wait:
- goto CONNECT
- }
- }
- QUIT:
- s.metricQueue = nil
- }
|