sink.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package metrics
  2. import (
  3. "fmt"
  4. "net/url"
  5. )
  6. // The MetricSink interface is used to transmit metrics information
  7. // to an external system
  8. type MetricSink interface {
  9. // A Gauge should retain the last value it is set to
  10. SetGauge(key []string, val float32)
  11. SetGaugeWithLabels(key []string, val float32, labels []Label)
  12. // Should emit a Key/Value pair for each call
  13. EmitKey(key []string, val float32)
  14. // Counters should accumulate values
  15. IncrCounter(key []string, val float32)
  16. IncrCounterWithLabels(key []string, val float32, labels []Label)
  17. // Samples are for timing information, where quantiles are used
  18. AddSample(key []string, val float32)
  19. AddSampleWithLabels(key []string, val float32, labels []Label)
  20. }
  21. // BlackholeSink is used to just blackhole messages
  22. type BlackholeSink struct{}
  23. func (*BlackholeSink) SetGauge(key []string, val float32) {}
  24. func (*BlackholeSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {}
  25. func (*BlackholeSink) EmitKey(key []string, val float32) {}
  26. func (*BlackholeSink) IncrCounter(key []string, val float32) {}
  27. func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {}
  28. func (*BlackholeSink) AddSample(key []string, val float32) {}
  29. func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {}
  30. // FanoutSink is used to sink to fanout values to multiple sinks
  31. type FanoutSink []MetricSink
  32. func (fh FanoutSink) SetGauge(key []string, val float32) {
  33. fh.SetGaugeWithLabels(key, val, nil)
  34. }
  35. func (fh FanoutSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
  36. for _, s := range fh {
  37. s.SetGaugeWithLabels(key, val, labels)
  38. }
  39. }
  40. func (fh FanoutSink) EmitKey(key []string, val float32) {
  41. for _, s := range fh {
  42. s.EmitKey(key, val)
  43. }
  44. }
  45. func (fh FanoutSink) IncrCounter(key []string, val float32) {
  46. fh.IncrCounterWithLabels(key, val, nil)
  47. }
  48. func (fh FanoutSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
  49. for _, s := range fh {
  50. s.IncrCounterWithLabels(key, val, labels)
  51. }
  52. }
  53. func (fh FanoutSink) AddSample(key []string, val float32) {
  54. fh.AddSampleWithLabels(key, val, nil)
  55. }
  56. func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
  57. for _, s := range fh {
  58. s.AddSampleWithLabels(key, val, labels)
  59. }
  60. }
  61. // sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided
  62. // by each sink type
  63. type sinkURLFactoryFunc func(*url.URL) (MetricSink, error)
  64. // sinkRegistry supports the generic NewMetricSink function by mapping URL
  65. // schemes to metric sink factory functions
  66. var sinkRegistry = map[string]sinkURLFactoryFunc{
  67. "statsd": NewStatsdSinkFromURL,
  68. "statsite": NewStatsiteSinkFromURL,
  69. "inmem": NewInmemSinkFromURL,
  70. }
  71. // NewMetricSinkFromURL allows a generic URL input to configure any of the
  72. // supported sinks. The scheme of the URL identifies the type of the sink, the
  73. // and query parameters are used to set options.
  74. //
  75. // "statsd://" - Initializes a StatsdSink. The host and port are passed through
  76. // as the "addr" of the sink
  77. //
  78. // "statsite://" - Initializes a StatsiteSink. The host and port become the
  79. // "addr" of the sink
  80. //
  81. // "inmem://" - Initializes an InmemSink. The host and port are ignored. The
  82. // "interval" and "duration" query parameters must be specified with valid
  83. // durations, see NewInmemSink for details.
  84. func NewMetricSinkFromURL(urlStr string) (MetricSink, error) {
  85. u, err := url.Parse(urlStr)
  86. if err != nil {
  87. return nil, err
  88. }
  89. sinkURLFactoryFunc := sinkRegistry[u.Scheme]
  90. if sinkURLFactoryFunc == nil {
  91. return nil, fmt.Errorf(
  92. "cannot create metric sink, unrecognized sink name: %q", u.Scheme)
  93. }
  94. return sinkURLFactoryFunc(u)
  95. }