package prometheus

import (
	"context"
	"fmt"
	"math"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/prometheus/common/model"
)

// Histogram describes one Prometheus histogram metric, narrowed by a set of
// label matchers, over the window of `minutes` minutes that ends at baseTime.
//
// Every method documented as "signals the WaitGroup" calls Done on wg exactly
// once when it returns, provided wg is non-nil.
type Histogram struct {
	wg            *sync.WaitGroup
	name          string            // base metric name, without the _count/_sum/_bucket suffix
	labels        map[string]string // equality matchers applied to every query
	rangeSelector string            // the window rendered as a PromQL duration, e.g. "30m"
	baseTime      time.Time         // evaluation time passed to every query
	minutes       int64             // window length in minutes
}

// NewHistogram returns a Histogram for the metric `name` filtered by `labels`,
// covering the `mins` minutes that end at `ts`. wg may be nil.
func NewHistogram(wg *sync.WaitGroup, name string, labels map[string]string, ts time.Time, mins int64) Histogram {
	return Histogram{
		wg:            wg,
		name:          name,
		labels:        labels,
		rangeSelector: fmt.Sprintf("%dm", mins),
		baseTime:      ts,
		minutes:       mins,
	}
}

// done signals the WaitGroup, if one was supplied.
func (h Histogram) done() {
	if h.wg != nil {
		h.wg.Done()
	}
}

// labelMatchers renders h.labels as PromQL equality matchers (key='value').
// Keys are sorted so the generated query text is deterministic, and values are
// escaped so that a backslash, single quote or newline cannot terminate or
// corrupt the single-quoted string literal.
func (h Histogram) labelMatchers() []string {
	keys := make([]string, 0, len(h.labels))
	for key := range h.labels {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	escaper := strings.NewReplacer(`\`, `\\`, `'`, `\'`, "\n", `\n`)
	// One spare slot so callers can append an extra matcher without reallocating.
	matchers := make([]string, 0, len(keys)+1)
	for _, key := range keys {
		matchers = append(matchers, fmt.Sprintf("%s='%s'", key, escaper.Replace(h.labels[key])))
	}
	return matchers
}

// makeMetricNameLabels returns the instant-vector selector name{matchers...}.
func (h Histogram) makeMetricNameLabels(name string) string {
	return fmt.Sprintf("%s{%s}", name, strings.Join(h.labelMatchers(), ","))
}

// queryVector evaluates query at h.baseTime and returns the result as a vector.
// A result of any other type is reported as an error instead of a panic.
func (h Histogram) queryVector(query string) (model.Vector, error) {
	result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
	if err != nil {
		return nil, err
	}
	vector, ok := result.(model.Vector)
	if !ok {
		return nil, errors.Errorf("unexpected result type %T for query %q", result, query)
	}
	return vector, nil
}

// queryMatrix evaluates query at h.baseTime and returns the result as a matrix.
// A result of any other type is reported as an error instead of a panic.
func (h Histogram) queryMatrix(query string) (model.Matrix, error) {
	result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
	if err != nil {
		return nil, err
	}
	matrix, ok := result.(model.Matrix)
	if !ok {
		return nil, errors.Errorf("unexpected result type %T for query %q", result, query)
	}
	return matrix, nil
}

// unitLayout maps a time unit ("m", "h" or "d") to the timestamp layout used
// for output and to the step between points in seconds.
func (Histogram) unitLayout(timeUnit string) (layout string, step int64, err error) {
	switch timeUnit {
	case "m":
		return "2006-01-02 15:04", 60, nil
	case "h":
		return "2006-01-02 15", 3600, nil
	case "d":
		return "2006-01-02", 86400, nil
	}
	return "", 0, errors.New("时间参数非法")
}

// defaultTimes lists the timestamps from baseTime-minutes (inclusive) up to
// baseTime (exclusive), `step` seconds apart, formatted with layout. The
// returned slice is never nil.
func (h Histogram) defaultTimes(layout string, step int64) []string {
	end := h.baseTime.Unix()
	start := h.baseTime.Add(-time.Duration(h.minutes) * time.Minute).Unix()

	stamps := make([]string, 0)
	for ts := start; ts < end; ts += step {
		stamps = append(stamps, time.Unix(ts, 0).Format(layout))
	}
	return stamps
}

// formatQuantile renders q with the shortest exact decimal representation, so
// that values such as 0.999 are not rounded before they reach Prometheus.
func (Histogram) formatQuantile(q float64) string {
	return strconv.FormatFloat(q, 'f', -1, 64)
}

// cleanQuantile normalises a quantile sample for output: NaN becomes 0, +Inf
// becomes a fixed placeholder of 10000, and the result is rounded to two
// decimal places.
func (Histogram) cleanQuantile(v float64) float64 {
	switch {
	case math.IsNaN(v):
		v = 0
	case math.IsInf(v, 1):
		v = 10000 // fixed placeholder for now
	}
	return math.Round(v*100) / 100
}

// quantileSeries runs the quantile subquery with one point per timeUnit and
// stores the formatted timestamps in *times and the matching values in *qlist.
// Both outputs are replaced, never appended to, so they always pair up. When
// Prometheus returns no data, *times is filled with the expected timestamps
// for the window and *qlist with zeros. It does not touch the WaitGroup.
func (h Histogram) quantileSeries(timeUnit string, q float64, times *[]string, qlist *[]float64) error {
	// Validate the unit before spending a round trip on a query we would reject.
	layout, step, err := h.unitLayout(timeUnit)
	if err != nil {
		return err
	}

	selector := h.makeMetricNameLabels(h.name + "_bucket")
	query := fmt.Sprintf(`histogram_quantile(%s, sum by (le) (rate(%s[2%s])))[%s:1%s]`,
		h.formatQuantile(q), selector, timeUnit, h.rangeSelector, timeUnit)
	matrix, err := h.queryMatrix(query)
	if err != nil {
		return err
	}

	stamps := make([]string, 0)
	values := make([]float64, 0)
	for _, series := range matrix {
		for _, pair := range series.Values {
			stamps = append(stamps, pair.Timestamp.Time().Format(layout))
			values = append(values, h.cleanQuantile(float64(pair.Value)))
		}
	}
	if len(stamps) == 0 {
		// No data in Prometheus: return a zero-filled series for the window.
		stamps = h.defaultTimes(layout, step)
		values = make([]float64, len(stamps))
	}
	*times, *qlist = stamps, values
	return nil
}

// Quantile computes the q-quantile (for example 0.5, 0.9, 0.99) of the
// histogram over the whole window and stores it in *res. A NaN result is
// stored as 0; *res is left untouched when the query returns no sample.
// Signals the WaitGroup.
func (h Histogram) Quantile(q float64, res *float64) error {
	defer h.done()

	query := fmt.Sprintf(`histogram_quantile(%s, sum by (le) (rate(%s[%s])))`,
		h.formatQuantile(q), h.makeMetricNameLabels(h.name+"_bucket"), h.rangeSelector)
	vector, err := h.queryVector(query)
	if err != nil {
		return err
	}
	for _, sample := range vector {
		value := float64(sample.Value)
		if math.IsNaN(value) {
			value = 0
		}
		*res = value
	}
	return nil
}

// QuantileTimeUnit is like QuantileMinutes but with a caller-chosen
// granularity instead of fixed minutes.
//
// timeUnit must be "m", "h" or "d"; any other value returns an error before a
// query is issued. *times receives the timestamps formatted for that unit and
// *qlist the matching quantile values; both are replaced. Signals the
// WaitGroup.
func (h Histogram) QuantileTimeUnit(timeUnit string, q float64, times *[]string, qlist *[]float64) error {
	defer h.done()
	return h.quantileSeries(timeUnit, q, times, qlist)
}

// QuantileMinutes computes the q-quantile (for example 0.5, 0.9, 0.99) of the
// metric for every minute of the window.
//
// *times receives each minute of the window formatted as "2006-01-02 15:04";
// *qlist receives the quantile for each minute, in one-to-one correspondence
// with *times. Both are replaced. Signals the WaitGroup.
func (h Histogram) QuantileMinutes(q float64, times *[]string, qlist *[]float64) error {
	defer h.done()
	return h.quantileSeries("m", q, times, qlist)
}

// DefaultQuantileMinutes appends to *times every minute from baseTime-minutes
// (inclusive) up to baseTime (exclusive), formatted as "2006-01-02 15:04".
// It always returns nil and does not touch the WaitGroup.
func (h Histogram) DefaultQuantileMinutes(times *[]string) error {
	*times = append(*times, h.defaultTimes("2006-01-02 15:04", 60)...)
	return nil
}

// ErrorRate computes the share of observations carrying the label
// error='true' over the window and stores it in *errorRate. A NaN result is
// stored as 0; *errorRate is left untouched when the query returns no sample.
// Signals the WaitGroup.
func (h Histogram) ErrorRate(errorRate *float64) error {
	defer h.done()

	counter := h.name + "_count"
	matchers := h.labelMatchers()
	totalSelector := fmt.Sprintf("%s{%s}", counter, strings.Join(matchers, ","))
	errorSelector := fmt.Sprintf("%s{%s}", counter, strings.Join(append(matchers, "error='true'"), ","))

	query := fmt.Sprintf("sum(rate(%s[%s]))/sum(rate(%s[%s]))",
		errorSelector, h.rangeSelector, totalSelector, h.rangeSelector)
	vector, err := h.queryVector(query)
	if err != nil {
		return err
	}
	for _, sample := range vector {
		rate := float64(sample.Value)
		if math.IsNaN(rate) {
			rate = 0
		}
		*errorRate = rate
	}
	return nil
}

// Total stores in *total the increase of the metric's _count series over the
// window, truncated to an integer. NaN and infinite results are stored as 0
// because converting them to int64 is implementation-dependent. Signals the
// WaitGroup.
func (h Histogram) Total(total *int64) error {
	defer h.done()

	query := fmt.Sprintf("sum(increase(%s[%s]))", h.makeMetricNameLabels(h.name+"_count"), h.rangeSelector)
	vector, err := h.queryVector(query)
	if err != nil {
		return err
	}
	for _, sample := range vector {
		value := float64(sample.Value)
		if math.IsNaN(value) || math.IsInf(value, 0) {
			value = 0
		}
		*total = int64(value)
	}
	return nil
}

// Avg computes the average observed value (rate of _sum divided by rate of
// _count) over the window and stores it in *avg. NaN results are skipped, so
// *avg keeps its previous value in that case. Signals the WaitGroup.
func (h Histogram) Avg(avg *float64) error {
	defer h.done()

	countSelector := h.makeMetricNameLabels(h.name + "_count")
	sumSelector := h.makeMetricNameLabels(h.name + "_sum")
	query := fmt.Sprintf("sum(rate(%s[%s]))/sum(rate(%s[%s]))",
		sumSelector, h.rangeSelector, countSelector, h.rangeSelector)
	vector, err := h.queryVector(query)
	if err != nil {
		return err
	}
	for _, sample := range vector {
		if value := float64(sample.Value); !math.IsNaN(value) {
			*avg = value
		}
	}
	return nil
}

// TotalUnits appends to *times and *totals the per-unit increase of the
// metric's _count series across the window, one point per `unit`.
//
// unit must be non-empty. For "d" and "h" the window is converted from
// minutes to whole days/hours; any other unit uses the window in minutes.
// Signals the WaitGroup.
func (h Histogram) TotalUnits(unit string, times *[]time.Time, totals *[]float64) error {
	defer h.done()

	if unit == "" {
		return errors.New("Range Selector参数非法")
	}

	// NOTE(review): integer division yields a zero-length window when the
	// configured minutes are shorter than one unit — confirm callers never do that.
	window := h.rangeSelector
	switch unit {
	case "d":
		window = strconv.FormatInt(h.minutes/60/24, 10) + unit
	case "h":
		window = strconv.FormatInt(h.minutes/60, 10) + unit
	}

	query := fmt.Sprintf("sum(increase(%s[1%s]))[%s:1%s]",
		h.makeMetricNameLabels(h.name+"_count"), unit, window, unit)
	matrix, err := h.queryMatrix(query)
	if err != nil {
		return errors.Wrap(err, "查询total units失败")
	}
	for _, series := range matrix {
		for _, pair := range series.Values {
			*times = append(*times, pair.Timestamp.Time())
			*totals = append(*totals, float64(pair.Value))
		}
	}
	return nil
}

// BucketCounts fetches the increase of every histogram bucket over the window.
// *buckets receives the upper bounds ("le") in ascending order, with +Inf
// last, and *counts the matching increases. Samples without an "le" label are
// ignored; an unparsable bound is returned as an error. Signals the WaitGroup.
func (h Histogram) BucketCounts(buckets *[]float64, counts *[]float64) error {
	defer h.done()

	query := fmt.Sprintf("sum by(le) (increase(%s[%s]))", h.makeMetricNameLabels(h.name+"_bucket"), h.rangeSelector)
	vector, err := h.queryVector(query)
	if err != nil {
		return err
	}

	type bucket struct{ upper, count float64 }
	found := make([]bucket, 0, len(vector))
	for _, sample := range vector {
		le, ok := sample.Metric["le"]
		if !ok {
			continue
		}
		// ParseFloat understands "+Inf", so the overflow bucket needs no special case.
		upper, err := strconv.ParseFloat(string(le), 64)
		if err != nil {
			return errors.Wrapf(err, "parsing bucket boundary %q", string(le))
		}
		found = append(found, bucket{upper: upper, count: float64(sample.Value)})
	}
	sort.Slice(found, func(i, j int) bool { return found[i].upper < found[j].upper })

	uppers := make([]float64, len(found))
	increases := make([]float64, len(found))
	for i, b := range found {
		uppers[i] = b.upper
		increases[i] = b.count
	}
	*buckets, *counts = uppers, increases
	return nil
}

// QPS computes the request rate across the window with one point per timeUnit.
//
// timeUnit must be "m", "h" or "d"; any other value returns an error before a
// query is issued. When group is non-empty the rate is aggregated by that
// label and the output maps are keyed by its value; otherwise everything is
// stored under the key "". *times receives the formatted timestamps per key
// and *qlist the matching rates rounded to two decimals, with NaN mapped to 0.
// Both maps are replaced. When Prometheus returns no data, a zero-filled
// series for the window is stored under "". Signals the WaitGroup.
func (h Histogram) QPS(timeUnit string, group string, times *map[string][]string, qlist *map[string][]float64) error {
	defer h.done()

	// Validate the unit before spending a round trip on a query we would reject.
	layout, step, err := h.unitLayout(timeUnit)
	if err != nil {
		return err
	}

	selector := h.makeMetricNameLabels(h.name + "_count")
	query := fmt.Sprintf(`sum(rate(%s[2%s]))[%s:1%s]`, selector, timeUnit, h.rangeSelector, timeUnit)
	if group != "" {
		query = fmt.Sprintf(`sum by (%s) (rate(%s[2%s]))[%s:1%s]`, group, selector, timeUnit, h.rangeSelector, timeUnit)
	}
	matrix, err := h.queryMatrix(query)
	if err != nil {
		return err
	}

	stamps := make(map[string][]string, len(matrix))
	rates := make(map[string][]float64, len(matrix))
	for _, series := range matrix {
		key := string(series.Metric[model.LabelName(group)])
		seriesStamps := make([]string, 0, len(series.Values))
		seriesRates := make([]float64, 0, len(series.Values))
		for _, pair := range series.Values {
			seriesStamps = append(seriesStamps, pair.Timestamp.Time().Format(layout))
			rate := float64(pair.Value)
			if math.IsNaN(rate) {
				rate = 0
			}
			seriesRates = append(seriesRates, math.Round(rate*100)/100)
		}
		stamps[key] = seriesStamps
		rates[key] = seriesRates
	}
	if len(stamps) == 0 {
		// No data in Prometheus: return a zero-filled series for the window.
		stamps[""] = h.defaultTimes(layout, step)
		rates[""] = make([]float64, len(stamps[""]))
	}
	*times, *qlist = stamps, rates
	return nil
}