|
- package prometheus
- import (
- "context"
- "fmt"
- "math"
- "sort"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/pkg/errors"
- "github.com/prometheus/common/model"
- )
// Histogram describes one Prometheus histogram metric together with the time
// window that the query methods in this file evaluate it over.
type Histogram struct {
	// wg is the WaitGroup the query methods mark Done when they return.
	wg *sync.WaitGroup
	// name is the base metric name; for a histogram it excludes the _count,
	// _sum and _bucket suffixes that follow the metric name.
	name string
	// labels holds the label name/value pairs rendered into every selector.
	labels map[string]string
	// rangeSelector is the window as a PromQL duration string.
	rangeSelector string
	// baseTime is the evaluation timestamp passed to each query.
	baseTime time.Time
	// minutes is the window length in minutes.
	minutes int64
}
- func NewHistogram(wg *sync.WaitGroup, name string, labels map[string]string, ts time.Time, mins int64) Histogram {
- rangeSelector := fmt.Sprintf("%dm", mins)
- return Histogram{
- wg: wg,
- name: name,
- labels: labels,
- rangeSelector: rangeSelector,
- baseTime: ts,
- minutes: mins,
- }
- }
- func (h Histogram) makeMetricNameLabels(name string) string {
- labelList := make([]string, 0, len(h.labels))
- for key, val := range h.labels {
- labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
- }
- return fmt.Sprintf("%s{%s}", name, strings.Join(labelList, ","))
- }
- // 计算分位数
- func (h Histogram) Quantile(q float64, res *float64) error {
- defer h.wg.Done()
- bucket := fmt.Sprintf("%s_bucket", h.name)
- nameLabels := h.makeMetricNameLabels(bucket)
- query := fmt.Sprintf(`histogram_quantile(%.2f, sum by (le) (rate(%s[%s])))`, q, nameLabels, h.rangeSelector)
- result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
- if err != nil {
- return err
- }
- for _, sample := range result.(model.Vector) {
- *res = float64(sample.Value)
- if math.IsNaN(*res) {
- *res = 0
- }
- }
- return nil
- }
- // 分位数计算,同QuantileMinutes类似,只是灵活指定单位,不再固定分钟
- func (h Histogram) QuantileTimeUnit(timeUnit string, q float64, times *[]string, qlist *[]float64) error {
- defer h.wg.Done()
- bucket := fmt.Sprintf("%s_bucket", h.name)
- labelList := make([]string, 0, len(h.labels))
- for key, val := range h.labels {
- labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
- }
- bucketLabels := fmt.Sprintf("%s{%s}", bucket, strings.Join(labelList, ","))
- query := fmt.Sprintf(`histogram_quantile(%.2f, sum by (le) (rate(%s[2%s])))[%s:1%s]`, q, bucketLabels, timeUnit, h.rangeSelector, timeUnit)
- api := PromeApi()
- result, _, err := api.Query(context.Background(), query, h.baseTime)
- if err != nil {
- return err
- }
- *times = make([]string, 0)
- layout := ""
- diff := 0
- if timeUnit == "m" {
- layout = "2006-01-02 15:04"
- diff = 60
- } else if timeUnit == "h" {
- layout = "2006-01-02 15"
- diff = 3600
- } else if timeUnit == "d" {
- layout = "2006-01-02"
- diff = 86400
- } else {
- return errors.New("时间参数非法")
- }
- for _, val := range result.(model.Matrix) {
- for _, pair := range val.Values {
- *times = append(*times, pair.Timestamp.Time().Format(layout))
- qitem := float64(0)
- if !math.IsNaN(float64(pair.Value)) {
- qitem = float64(pair.Value)
- if math.IsInf(float64(pair.Value), 1) {
- qitem = 10000 // 暂时这么固定
- }
- }
- qitem = math.Round(qitem*100) / 100
- *qlist = append(*qlist, qitem)
- }
- }
- if len(*times) == 0 { // 如果prometheus中没有数据,初始化返回值
- endTime := h.baseTime.Unix()
- startTime := h.baseTime.Add(-time.Minute * time.Duration(h.minutes)).Unix()
- for startTime < endTime {
- *times = append(*times, time.Unix(startTime, 0).Format(timeUnit))
- startTime += int64(diff)
- }
- (*qlist) = make([]float64, len(*times))
- }
- return nil
- }
- // 计算某一指标每分钟的分位数
- // ts 时间,指从哪一时间开始计算分位数
- // q 分位数,发0.5 0.9 0.99等
- // times 区间内的每分钟, 格式固定为2006-01-02 15:04
- // qlist 区间内每分钟的分位数,与times一一对应
- func (h Histogram) QuantileMinutes(q float64, times *[]string, qlist *[]float64) error {
- defer h.wg.Done()
- bucket := fmt.Sprintf("%s_bucket", h.name)
- labelList := make([]string, 0, len(h.labels))
- for key, val := range h.labels {
- labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
- }
- bucketLabels := fmt.Sprintf("%s{%s}", bucket, strings.Join(labelList, ","))
- query := fmt.Sprintf(`histogram_quantile(%.2f, sum by (le) (rate(%s[2m])))[%s:1m]`, q, bucketLabels, h.rangeSelector)
- api := PromeApi()
- result, _, err := api.Query(context.Background(), query, h.baseTime)
- if err != nil {
- return err
- }
- *times = make([]string, 0)
- for _, val := range result.(model.Matrix) {
- for _, pair := range val.Values {
- *times = append(*times, pair.Timestamp.Time().Format("2006-01-02 15:04"))
- qitem := float64(0)
- if !math.IsNaN(float64(pair.Value)) {
- qitem = float64(pair.Value)
- if math.IsInf(float64(pair.Value), 1) {
- qitem = 10000 // 暂时这么固定
- }
- }
- qitem = math.Round(qitem*100) / 100
- *qlist = append(*qlist, qitem)
- }
- }
- if len(*times) == 0 {
- h.DefaultQuantileMinutes(times)
- *qlist = make([]float64, len(*times))
- }
- return nil
- }
- func (h Histogram) DefaultQuantileMinutes(times *[]string) error {
- endTime := h.baseTime.Unix()
- startTime := h.baseTime.Add(-time.Minute * time.Duration(h.minutes)).Unix()
- for startTime < endTime {
- *times = append(*times, time.Unix(startTime, 0).Format("2006-01-02 15:04"))
- startTime += 60
- }
- return nil
- }
- // 计算错误率
- func (h Histogram) ErrorRate(errorRate *float64) error {
- defer h.wg.Done()
- counter := fmt.Sprintf("%s_count", h.name)
- labelList := make([]string, 0, len(h.labels))
- for key, val := range h.labels {
- labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
- }
- totalLabels := fmt.Sprintf("%s{%s}", counter, strings.Join(labelList, ","))
- labelList = append(labelList, "error='true'")
- errorLabels := fmt.Sprintf("%s{%s}", counter, strings.Join(labelList, ","))
- query := fmt.Sprintf("sum(rate(%s[%s]))/sum(rate(%s[%s]))", errorLabels, h.rangeSelector, totalLabels, h.rangeSelector)
- result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
- if err != nil {
- return err
- }
- for _, sample := range result.(model.Vector) {
- *errorRate = float64(sample.Value)
- if math.IsNaN(*errorRate) {
- *errorRate = 0
- }
- }
- return nil
- }
- func (h Histogram) Total(total *int64) error {
- defer h.wg.Done()
- counter := fmt.Sprintf("%s_count", h.name)
- labelList := make([]string, 0, len(h.labels))
- for key, val := range h.labels {
- labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
- }
- totalLabels := fmt.Sprintf("%s{%s}", counter, strings.Join(labelList, ","))
- query := fmt.Sprintf("sum(increase(%s[%s]))", totalLabels, h.rangeSelector)
- result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
- if err != nil {
- return err
- }
- for _, sample := range result.(model.Vector) {
- *total = int64(sample.Value)
- }
- return nil
- }
- // 计算平均时延
- func (h Histogram) Avg(avg *float64) error {
- defer h.wg.Done()
- counter := fmt.Sprintf("%s_count", h.name)
- sumer := fmt.Sprintf("%s_sum", h.name)
- labelList := make([]string, 0, len(h.labels))
- for key, val := range h.labels {
- labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
- }
- totalLabels := fmt.Sprintf("%s{%s}", counter, strings.Join(labelList, ","))
- sumLabels := fmt.Sprintf("%s{%s}", sumer, strings.Join(labelList, ","))
- query := fmt.Sprintf("sum(rate(%s[%s]))/sum(rate(%s[%s]))", sumLabels, h.rangeSelector, totalLabels, h.rangeSelector)
- result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
- if err != nil {
- return err
- }
- for _, sample := range result.(model.Vector) {
- if !math.IsNaN(float64(sample.Value)) {
- *avg = float64(sample.Value)
- }
- }
- return nil
- }
- func (h Histogram) TotalUnits(unit string, times *[]time.Time, totals *[]float64) error {
- if h.wg != nil {
- defer h.wg.Done()
- }
- counter := fmt.Sprintf("%s_count", h.name)
- nameLabels := h.makeMetricNameLabels(counter)
- rangeSelector := h.rangeSelector
- if unit == "" {
- return errors.New("Range Selector参数非法")
- }
- if unit == "d" {
- rangeSelector = strconv.Itoa(int(h.minutes/60/24)) + unit
- } else if unit == "h" {
- rangeSelector = strconv.Itoa(int(h.minutes/60)) + unit
- }
- query := fmt.Sprintf("sum(increase(%s[1%s]))[%s:1%s]", nameLabels, unit, rangeSelector, unit)
- result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
- if err != nil {
- return errors.Wrap(err, "查询total units失败")
- }
- for _, metrix := range result.(model.Matrix) {
- for _, val := range metrix.Values {
- *times = append(*times, val.Timestamp.Time())
- *totals = append(*totals, float64(val.Value))
- }
- }
- return nil
- }
- // 获取每个桶中count值
- func (h Histogram) BucketCounts(buckets *[]float64, counts *[]float64) error {
- defer h.wg.Done()
- bucket := fmt.Sprintf("%s_bucket", h.name)
- nameLables := h.makeMetricNameLabels(bucket)
- query := fmt.Sprintf("sum by(le) (increase(%s[%s]))", nameLables, h.rangeSelector)
- result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
- if err != nil {
- return err
- }
- ans := [][2]float64{}
- for _, sample := range result.(model.Vector) {
- // sample.Metric
- for key, val := range sample.Metric {
- if key == "le" {
- border := math.Inf(1)
- if val != "+Inf" {
- border, _ = strconv.ParseFloat(string(val), 64)
- }
- ans = append(ans, [2]float64{border, float64(sample.Value)})
- }
- }
- }
- sort.Slice(ans, func(i, j int) bool {
- return ans[i][0] < ans[j][0]
- })
- *buckets, *counts = make([]float64, len(ans)), make([]float64, len(ans))
- for i, arr := range ans {
- (*buckets)[i] = arr[0]
- (*counts)[i] = arr[1]
- }
- return nil
- }
- // 计算时间窗口内每分钟的QPS
- func (h Histogram) QPS(timeUnit string, group string, times *map[string][]string, qlist *map[string][]float64) error {
- defer h.wg.Done()
- bucket := fmt.Sprintf("%s_count", h.name)
- labelList := make([]string, 0, len(h.labels))
- for key, val := range h.labels {
- labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
- }
- bucketLabels := fmt.Sprintf("%s{%s}", bucket, strings.Join(labelList, ","))
- query := fmt.Sprintf(`sum(rate(%s[2%s]))[%s:1%s]`, bucketLabels, timeUnit, h.rangeSelector, timeUnit)
- if group != "" {
- query = fmt.Sprintf(`sum by (%s) (rate(%s[2%s]))[%s:1%s]`, group, bucketLabels, timeUnit, h.rangeSelector, timeUnit)
- }
- fmt.Println("###### ", query)
- api := PromeApi()
- result, _, err := api.Query(context.Background(), query, h.baseTime)
- if err != nil {
- return err
- }
- *times = make(map[string][]string, 0)
- *qlist = make(map[string][]float64, 0)
- layout := ""
- diff := 0
- if timeUnit == "m" {
- layout = "2006-01-02 15:04"
- diff = 60
- } else if timeUnit == "h" {
- layout = "2006-01-02 15"
- diff = 3600
- } else if timeUnit == "d" {
- layout = "2006-01-02"
- diff = 86400
- } else {
- return errors.New("时间参数非法")
- }
- for _, val := range result.(model.Matrix) {
- fmt.Println(val.Metric[model.LabelName(group)])
- key := string(val.Metric[model.LabelName(group)])
- (*times)[key] = []string{}
- (*qlist)[key] = []float64{}
- for _, pair := range val.Values {
- (*times)[key] = append((*times)[key], pair.Timestamp.Time().Format(layout))
- qitem := float64(0)
- if !math.IsNaN(float64(pair.Value)) {
- qitem = float64(pair.Value)
- }
- qitem = math.Round(qitem*100) / 100
- (*qlist)[key] = append((*qlist)[key], qitem)
- }
- }
- if len(*times) == 0 { // 如果prometheus中没有数据,初始化返回值
- endTime := h.baseTime.Unix()
- startTime := h.baseTime.Add(-time.Minute * time.Duration(h.minutes)).Unix()
- key := ""
- (*times)[key] = []string{}
- for startTime < endTime {
- (*times)[key] = append((*times)[key], time.Unix(startTime, 0).Format(timeUnit))
- startTime += int64(diff)
- }
- (*qlist)[key] = make([]float64, len((*times)[key]))
- }
- return nil
- }
|