|
- package metrics
- import (
- "bytes"
- "fmt"
- "math"
- "net/url"
- "strings"
- "sync"
- "time"
- )
- // InmemSink provides a MetricSink that does in-memory aggregation
- // without sending metrics over a network. It can be embedded within
- // an application to provide profiling information.
- type InmemSink struct {
- // How long is each aggregation interval
- interval time.Duration
- // Retain controls how many metrics interval we keep
- retain time.Duration
- // maxIntervals is the maximum length of intervals.
- // It is retain / interval.
- maxIntervals int
- // intervals is a slice of the retained intervals
- intervals []*IntervalMetrics
- intervalLock sync.RWMutex
- rateDenom float64
- }
- // IntervalMetrics stores the aggregated metrics
- // for a specific interval
- type IntervalMetrics struct {
- sync.RWMutex
- // The start time of the interval
- Interval time.Time
- // Gauges maps the key to the last set value
- Gauges map[string]GaugeValue
- // Points maps the string to the list of emitted values
- // from EmitKey
- Points map[string][]float32
- // Counters maps the string key to a sum of the counter
- // values
- Counters map[string]SampledValue
- // Samples maps the key to an AggregateSample,
- // which has the rolled up view of a sample
- Samples map[string]SampledValue
- }
- // NewIntervalMetrics creates a new IntervalMetrics for a given interval
- func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
- return &IntervalMetrics{
- Interval: intv,
- Gauges: make(map[string]GaugeValue),
- Points: make(map[string][]float32),
- Counters: make(map[string]SampledValue),
- Samples: make(map[string]SampledValue),
- }
- }
// AggregateSample is used to hold aggregate metrics about a sample.
// It keeps running totals only, so it can be updated one value at a
// time through Ingest without storing the individual values.
type AggregateSample struct {
	Count       int       // The count of emitted pairs
	Rate        float64   // The values rate per time unit (usually 1 second)
	Sum         float64   // The sum of values
	SumSq       float64   `json:"-"` // The sum of squared values
	Min         float64   // Minimum value
	Max         float64   // Maximum value
	LastUpdated time.Time `json:"-"` // When value was last updated
}

// Stddev computes the sample standard deviation of the ingested values
// from the running Count, Sum and SumSq. It returns 0 when fewer than
// two values have been ingested.
func (a *AggregateSample) Stddev() float64 {
	if a.Count < 2 {
		return 0
	}
	// Work in float64 throughout: Count*(Count-1) as an int can overflow
	// for very large counts.
	n := float64(a.Count)
	num := n*a.SumSq - a.Sum*a.Sum
	// For (nearly) identical values floating-point rounding can push the
	// numerator slightly below zero; the square root of that would be
	// NaN, so treat it as no deviation.
	if num <= 0 {
		return 0
	}
	return math.Sqrt(num / (n * (n - 1)))
}

// Mean computes the arithmetic mean of the ingested values, or 0 when
// nothing has been ingested yet.
func (a *AggregateSample) Mean() float64 {
	if a.Count == 0 {
		return 0
	}
	return a.Sum / float64(a.Count)
}

// Ingest is used to update a sample with the value v. It bumps Count,
// Sum and SumSq, widens Min/Max, recomputes Rate as Sum / rateDenom and
// stamps LastUpdated with the current time.
func (a *AggregateSample) Ingest(v float64, rateDenom float64) {
	a.Count++
	a.Sum += v
	a.SumSq += v * v
	// The first value seeds both bounds; the zero values of Min and Max
	// are not real observations.
	if a.Count == 1 || v < a.Min {
		a.Min = v
	}
	if a.Count == 1 || v > a.Max {
		a.Max = v
	}
	a.Rate = a.Sum / rateDenom
	a.LastUpdated = time.Now()
}

// String renders the sample for display. An empty sample prints only
// its count, a sample without spread prints count and sum, and anything
// else prints the full set of statistics.
func (a *AggregateSample) String() string {
	if a.Count == 0 {
		return "Count: 0"
	}
	stddev := a.Stddev()
	if stddev == 0 {
		return fmt.Sprintf("Count: %d Sum: %0.3f LastUpdated: %s", a.Count, a.Sum, a.LastUpdated)
	}
	return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f LastUpdated: %s",
		a.Count, a.Min, a.Mean(), a.Max, stddev, a.Sum, a.LastUpdated)
}
- // NewInmemSinkFromURL creates an InmemSink from a URL. It is used
- // (and tested) from NewMetricSinkFromURL.
- func NewInmemSinkFromURL(u *url.URL) (MetricSink, error) {
- params := u.Query()
- interval, err := time.ParseDuration(params.Get("interval"))
- if err != nil {
- return nil, fmt.Errorf("Bad 'interval' param: %s", err)
- }
- retain, err := time.ParseDuration(params.Get("retain"))
- if err != nil {
- return nil, fmt.Errorf("Bad 'retain' param: %s", err)
- }
- return NewInmemSink(interval, retain), nil
- }
- // NewInmemSink is used to construct a new in-memory sink.
- // Uses an aggregation interval and maximum retention period.
- func NewInmemSink(interval, retain time.Duration) *InmemSink {
- rateTimeUnit := time.Second
- i := &InmemSink{
- interval: interval,
- retain: retain,
- maxIntervals: int(retain / interval),
- rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()),
- }
- i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
- return i
- }
- func (i *InmemSink) SetGauge(key []string, val float32) {
- i.SetGaugeWithLabels(key, val, nil)
- }
- func (i *InmemSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
- k, name := i.flattenKeyLabels(key, labels)
- intv := i.getInterval()
- intv.Lock()
- defer intv.Unlock()
- intv.Gauges[k] = GaugeValue{Name: name, Value: val, Labels: labels}
- }
- func (i *InmemSink) EmitKey(key []string, val float32) {
- k := i.flattenKey(key)
- intv := i.getInterval()
- intv.Lock()
- defer intv.Unlock()
- vals := intv.Points[k]
- intv.Points[k] = append(vals, val)
- }
- func (i *InmemSink) IncrCounter(key []string, val float32) {
- i.IncrCounterWithLabels(key, val, nil)
- }
- func (i *InmemSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
- k, name := i.flattenKeyLabels(key, labels)
- intv := i.getInterval()
- intv.Lock()
- defer intv.Unlock()
- agg, ok := intv.Counters[k]
- if !ok {
- agg = SampledValue{
- Name: name,
- AggregateSample: &AggregateSample{},
- Labels: labels,
- }
- intv.Counters[k] = agg
- }
- agg.Ingest(float64(val), i.rateDenom)
- }
- func (i *InmemSink) AddSample(key []string, val float32) {
- i.AddSampleWithLabels(key, val, nil)
- }
- func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
- k, name := i.flattenKeyLabels(key, labels)
- intv := i.getInterval()
- intv.Lock()
- defer intv.Unlock()
- agg, ok := intv.Samples[k]
- if !ok {
- agg = SampledValue{
- Name: name,
- AggregateSample: &AggregateSample{},
- Labels: labels,
- }
- intv.Samples[k] = agg
- }
- agg.Ingest(float64(val), i.rateDenom)
- }
- // Data is used to retrieve all the aggregated metrics
- // Intervals may be in use, and a read lock should be acquired
- func (i *InmemSink) Data() []*IntervalMetrics {
- // Get the current interval, forces creation
- i.getInterval()
- i.intervalLock.RLock()
- defer i.intervalLock.RUnlock()
- n := len(i.intervals)
- intervals := make([]*IntervalMetrics, n)
- copy(intervals[:n-1], i.intervals[:n-1])
- current := i.intervals[n-1]
- // make its own copy for current interval
- intervals[n-1] = &IntervalMetrics{}
- copyCurrent := intervals[n-1]
- current.RLock()
- *copyCurrent = *current
- copyCurrent.Gauges = make(map[string]GaugeValue, len(current.Gauges))
- for k, v := range current.Gauges {
- copyCurrent.Gauges[k] = v
- }
- // saved values will be not change, just copy its link
- copyCurrent.Points = make(map[string][]float32, len(current.Points))
- for k, v := range current.Points {
- copyCurrent.Points[k] = v
- }
- copyCurrent.Counters = make(map[string]SampledValue, len(current.Counters))
- for k, v := range current.Counters {
- copyCurrent.Counters[k] = v.deepCopy()
- }
- copyCurrent.Samples = make(map[string]SampledValue, len(current.Samples))
- for k, v := range current.Samples {
- copyCurrent.Samples[k] = v.deepCopy()
- }
- current.RUnlock()
- return intervals
- }
- func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics {
- i.intervalLock.RLock()
- defer i.intervalLock.RUnlock()
- n := len(i.intervals)
- if n > 0 && i.intervals[n-1].Interval == intv {
- return i.intervals[n-1]
- }
- return nil
- }
- func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics {
- i.intervalLock.Lock()
- defer i.intervalLock.Unlock()
- // Check for an existing interval
- n := len(i.intervals)
- if n > 0 && i.intervals[n-1].Interval == intv {
- return i.intervals[n-1]
- }
- // Add the current interval
- current := NewIntervalMetrics(intv)
- i.intervals = append(i.intervals, current)
- n++
- // Truncate the intervals if they are too long
- if n >= i.maxIntervals {
- copy(i.intervals[0:], i.intervals[n-i.maxIntervals:])
- i.intervals = i.intervals[:i.maxIntervals]
- }
- return current
- }
- // getInterval returns the current interval to write to
- func (i *InmemSink) getInterval() *IntervalMetrics {
- intv := time.Now().Truncate(i.interval)
- if m := i.getExistingInterval(intv); m != nil {
- return m
- }
- return i.createInterval(intv)
- }
- // Flattens the key for formatting, removes spaces
- func (i *InmemSink) flattenKey(parts []string) string {
- buf := &bytes.Buffer{}
- replacer := strings.NewReplacer(" ", "_")
- if len(parts) > 0 {
- replacer.WriteString(buf, parts[0])
- }
- for _, part := range parts[1:] {
- replacer.WriteString(buf, ".")
- replacer.WriteString(buf, part)
- }
- return buf.String()
- }
- // Flattens the key for formatting along with its labels, removes spaces
- func (i *InmemSink) flattenKeyLabels(parts []string, labels []Label) (string, string) {
- buf := &bytes.Buffer{}
- replacer := strings.NewReplacer(" ", "_")
- if len(parts) > 0 {
- replacer.WriteString(buf, parts[0])
- }
- for _, part := range parts[1:] {
- replacer.WriteString(buf, ".")
- replacer.WriteString(buf, part)
- }
- key := buf.String()
- for _, label := range labels {
- replacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value))
- }
- return buf.String(), key
- }
|