histogram.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. package prometheus
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/pkg/errors"
  12. "github.com/prometheus/common/model"
  13. )
  14. type Histogram struct {
  15. wg *sync.WaitGroup
  16. name string // 对于直方图, name不包括metric name后的_count _sum _bucket等
  17. labels map[string]string
  18. rangeSelector string
  19. baseTime time.Time
  20. minutes int64
  21. }
  22. func NewHistogram(wg *sync.WaitGroup, name string, labels map[string]string, ts time.Time, mins int64) Histogram {
  23. rangeSelector := fmt.Sprintf("%dm", mins)
  24. return Histogram{
  25. wg: wg,
  26. name: name,
  27. labels: labels,
  28. rangeSelector: rangeSelector,
  29. baseTime: ts,
  30. minutes: mins,
  31. }
  32. }
  33. func (h Histogram) makeMetricNameLabels(name string) string {
  34. labelList := make([]string, 0, len(h.labels))
  35. for key, val := range h.labels {
  36. labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
  37. }
  38. return fmt.Sprintf("%s{%s}", name, strings.Join(labelList, ","))
  39. }
  40. // 计算分位数
  41. func (h Histogram) Quantile(q float64, res *float64) error {
  42. defer h.wg.Done()
  43. bucket := fmt.Sprintf("%s_bucket", h.name)
  44. nameLabels := h.makeMetricNameLabels(bucket)
  45. query := fmt.Sprintf(`histogram_quantile(%.2f, sum by (le) (rate(%s[%s])))`, q, nameLabels, h.rangeSelector)
  46. result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
  47. if err != nil {
  48. return err
  49. }
  50. for _, sample := range result.(model.Vector) {
  51. *res = float64(sample.Value)
  52. if math.IsNaN(*res) {
  53. *res = 0
  54. }
  55. }
  56. return nil
  57. }
  58. // 分位数计算,同QuantileMinutes类似,只是灵活指定单位,不再固定分钟
  59. func (h Histogram) QuantileTimeUnit(timeUnit string, q float64, times *[]string, qlist *[]float64) error {
  60. defer h.wg.Done()
  61. bucket := fmt.Sprintf("%s_bucket", h.name)
  62. labelList := make([]string, 0, len(h.labels))
  63. for key, val := range h.labels {
  64. labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
  65. }
  66. bucketLabels := fmt.Sprintf("%s{%s}", bucket, strings.Join(labelList, ","))
  67. query := fmt.Sprintf(`histogram_quantile(%.2f, sum by (le) (rate(%s[2%s])))[%s:1%s]`, q, bucketLabels, timeUnit, h.rangeSelector, timeUnit)
  68. api := PromeApi()
  69. result, _, err := api.Query(context.Background(), query, h.baseTime)
  70. if err != nil {
  71. return err
  72. }
  73. *times = make([]string, 0)
  74. layout := ""
  75. diff := 0
  76. if timeUnit == "m" {
  77. layout = "2006-01-02 15:04"
  78. diff = 60
  79. } else if timeUnit == "h" {
  80. layout = "2006-01-02 15"
  81. diff = 3600
  82. } else if timeUnit == "d" {
  83. layout = "2006-01-02"
  84. diff = 86400
  85. } else {
  86. return errors.New("时间参数非法")
  87. }
  88. for _, val := range result.(model.Matrix) {
  89. for _, pair := range val.Values {
  90. *times = append(*times, pair.Timestamp.Time().Format(layout))
  91. qitem := float64(0)
  92. if !math.IsNaN(float64(pair.Value)) {
  93. qitem = float64(pair.Value)
  94. if math.IsInf(float64(pair.Value), 1) {
  95. qitem = 10000 // 暂时这么固定
  96. }
  97. }
  98. qitem = math.Round(qitem*100) / 100
  99. *qlist = append(*qlist, qitem)
  100. }
  101. }
  102. if len(*times) == 0 { // 如果prometheus中没有数据,初始化返回值
  103. endTime := h.baseTime.Unix()
  104. startTime := h.baseTime.Add(-time.Minute * time.Duration(h.minutes)).Unix()
  105. for startTime < endTime {
  106. *times = append(*times, time.Unix(startTime, 0).Format(timeUnit))
  107. startTime += int64(diff)
  108. }
  109. (*qlist) = make([]float64, len(*times))
  110. }
  111. return nil
  112. }
  113. // 计算某一指标每分钟的分位数
  114. // ts 时间,指从哪一时间开始计算分位数
  115. // q 分位数,发0.5 0.9 0.99等
  116. // times 区间内的每分钟, 格式固定为2006-01-02 15:04
  117. // qlist 区间内每分钟的分位数,与times一一对应
  118. func (h Histogram) QuantileMinutes(q float64, times *[]string, qlist *[]float64) error {
  119. defer h.wg.Done()
  120. bucket := fmt.Sprintf("%s_bucket", h.name)
  121. labelList := make([]string, 0, len(h.labels))
  122. for key, val := range h.labels {
  123. labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
  124. }
  125. bucketLabels := fmt.Sprintf("%s{%s}", bucket, strings.Join(labelList, ","))
  126. query := fmt.Sprintf(`histogram_quantile(%.2f, sum by (le) (rate(%s[2m])))[%s:1m]`, q, bucketLabels, h.rangeSelector)
  127. api := PromeApi()
  128. result, _, err := api.Query(context.Background(), query, h.baseTime)
  129. if err != nil {
  130. return err
  131. }
  132. *times = make([]string, 0)
  133. for _, val := range result.(model.Matrix) {
  134. for _, pair := range val.Values {
  135. *times = append(*times, pair.Timestamp.Time().Format("2006-01-02 15:04"))
  136. qitem := float64(0)
  137. if !math.IsNaN(float64(pair.Value)) {
  138. qitem = float64(pair.Value)
  139. if math.IsInf(float64(pair.Value), 1) {
  140. qitem = 10000 // 暂时这么固定
  141. }
  142. }
  143. qitem = math.Round(qitem*100) / 100
  144. *qlist = append(*qlist, qitem)
  145. }
  146. }
  147. if len(*times) == 0 {
  148. h.DefaultQuantileMinutes(times)
  149. *qlist = make([]float64, len(*times))
  150. }
  151. return nil
  152. }
  153. func (h Histogram) DefaultQuantileMinutes(times *[]string) error {
  154. endTime := h.baseTime.Unix()
  155. startTime := h.baseTime.Add(-time.Minute * time.Duration(h.minutes)).Unix()
  156. for startTime < endTime {
  157. *times = append(*times, time.Unix(startTime, 0).Format("2006-01-02 15:04"))
  158. startTime += 60
  159. }
  160. return nil
  161. }
  162. // 计算错误率
  163. func (h Histogram) ErrorRate(errorRate *float64) error {
  164. defer h.wg.Done()
  165. counter := fmt.Sprintf("%s_count", h.name)
  166. labelList := make([]string, 0, len(h.labels))
  167. for key, val := range h.labels {
  168. labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
  169. }
  170. totalLabels := fmt.Sprintf("%s{%s}", counter, strings.Join(labelList, ","))
  171. labelList = append(labelList, "error='true'")
  172. errorLabels := fmt.Sprintf("%s{%s}", counter, strings.Join(labelList, ","))
  173. query := fmt.Sprintf("sum(rate(%s[%s]))/sum(rate(%s[%s]))", errorLabels, h.rangeSelector, totalLabels, h.rangeSelector)
  174. result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
  175. if err != nil {
  176. return err
  177. }
  178. for _, sample := range result.(model.Vector) {
  179. *errorRate = float64(sample.Value)
  180. if math.IsNaN(*errorRate) {
  181. *errorRate = 0
  182. }
  183. }
  184. return nil
  185. }
  186. func (h Histogram) Total(total *int64) error {
  187. defer h.wg.Done()
  188. counter := fmt.Sprintf("%s_count", h.name)
  189. labelList := make([]string, 0, len(h.labels))
  190. for key, val := range h.labels {
  191. labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
  192. }
  193. totalLabels := fmt.Sprintf("%s{%s}", counter, strings.Join(labelList, ","))
  194. query := fmt.Sprintf("sum(increase(%s[%s]))", totalLabels, h.rangeSelector)
  195. result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
  196. if err != nil {
  197. return err
  198. }
  199. for _, sample := range result.(model.Vector) {
  200. *total = int64(sample.Value)
  201. }
  202. return nil
  203. }
  204. // 计算平均时延
  205. func (h Histogram) Avg(avg *float64) error {
  206. defer h.wg.Done()
  207. counter := fmt.Sprintf("%s_count", h.name)
  208. sumer := fmt.Sprintf("%s_sum", h.name)
  209. labelList := make([]string, 0, len(h.labels))
  210. for key, val := range h.labels {
  211. labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
  212. }
  213. totalLabels := fmt.Sprintf("%s{%s}", counter, strings.Join(labelList, ","))
  214. sumLabels := fmt.Sprintf("%s{%s}", sumer, strings.Join(labelList, ","))
  215. query := fmt.Sprintf("sum(rate(%s[%s]))/sum(rate(%s[%s]))", sumLabels, h.rangeSelector, totalLabels, h.rangeSelector)
  216. result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
  217. if err != nil {
  218. return err
  219. }
  220. for _, sample := range result.(model.Vector) {
  221. if !math.IsNaN(float64(sample.Value)) {
  222. *avg = float64(sample.Value)
  223. }
  224. }
  225. return nil
  226. }
  227. func (h Histogram) TotalUnits(unit string, times *[]time.Time, totals *[]float64) error {
  228. if h.wg != nil {
  229. defer h.wg.Done()
  230. }
  231. counter := fmt.Sprintf("%s_count", h.name)
  232. nameLabels := h.makeMetricNameLabels(counter)
  233. rangeSelector := h.rangeSelector
  234. if unit == "" {
  235. return errors.New("Range Selector参数非法")
  236. }
  237. if unit == "d" {
  238. rangeSelector = strconv.Itoa(int(h.minutes/60/24)) + unit
  239. } else if unit == "h" {
  240. rangeSelector = strconv.Itoa(int(h.minutes/60)) + unit
  241. }
  242. query := fmt.Sprintf("sum(increase(%s[1%s]))[%s:1%s]", nameLabels, unit, rangeSelector, unit)
  243. result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
  244. if err != nil {
  245. return errors.Wrap(err, "查询total units失败")
  246. }
  247. for _, metrix := range result.(model.Matrix) {
  248. for _, val := range metrix.Values {
  249. *times = append(*times, val.Timestamp.Time())
  250. *totals = append(*totals, float64(val.Value))
  251. }
  252. }
  253. return nil
  254. }
  255. // 获取每个桶中count值
  256. func (h Histogram) BucketCounts(buckets *[]float64, counts *[]float64) error {
  257. defer h.wg.Done()
  258. bucket := fmt.Sprintf("%s_bucket", h.name)
  259. nameLables := h.makeMetricNameLabels(bucket)
  260. query := fmt.Sprintf("sum by(le) (increase(%s[%s]))", nameLables, h.rangeSelector)
  261. result, _, err := PromeApi().Query(context.Background(), query, h.baseTime)
  262. if err != nil {
  263. return err
  264. }
  265. ans := [][2]float64{}
  266. for _, sample := range result.(model.Vector) {
  267. // sample.Metric
  268. for key, val := range sample.Metric {
  269. if key == "le" {
  270. border := math.Inf(1)
  271. if val != "+Inf" {
  272. border, _ = strconv.ParseFloat(string(val), 64)
  273. }
  274. ans = append(ans, [2]float64{border, float64(sample.Value)})
  275. }
  276. }
  277. }
  278. sort.Slice(ans, func(i, j int) bool {
  279. return ans[i][0] < ans[j][0]
  280. })
  281. *buckets, *counts = make([]float64, len(ans)), make([]float64, len(ans))
  282. for i, arr := range ans {
  283. (*buckets)[i] = arr[0]
  284. (*counts)[i] = arr[1]
  285. }
  286. return nil
  287. }
  288. // 计算时间窗口内每分钟的QPS
  289. func (h Histogram) QPS(timeUnit string, group string, times *map[string][]string, qlist *map[string][]float64) error {
  290. defer h.wg.Done()
  291. bucket := fmt.Sprintf("%s_count", h.name)
  292. labelList := make([]string, 0, len(h.labels))
  293. for key, val := range h.labels {
  294. labelList = append(labelList, fmt.Sprintf("%s='%s'", key, val))
  295. }
  296. bucketLabels := fmt.Sprintf("%s{%s}", bucket, strings.Join(labelList, ","))
  297. query := fmt.Sprintf(`sum(rate(%s[2%s]))[%s:1%s]`, bucketLabels, timeUnit, h.rangeSelector, timeUnit)
  298. if group != "" {
  299. query = fmt.Sprintf(`sum by (%s) (rate(%s[2%s]))[%s:1%s]`, group, bucketLabels, timeUnit, h.rangeSelector, timeUnit)
  300. }
  301. fmt.Println("###### ", query)
  302. api := PromeApi()
  303. result, _, err := api.Query(context.Background(), query, h.baseTime)
  304. if err != nil {
  305. return err
  306. }
  307. *times = make(map[string][]string, 0)
  308. *qlist = make(map[string][]float64, 0)
  309. layout := ""
  310. diff := 0
  311. if timeUnit == "m" {
  312. layout = "2006-01-02 15:04"
  313. diff = 60
  314. } else if timeUnit == "h" {
  315. layout = "2006-01-02 15"
  316. diff = 3600
  317. } else if timeUnit == "d" {
  318. layout = "2006-01-02"
  319. diff = 86400
  320. } else {
  321. return errors.New("时间参数非法")
  322. }
  323. for _, val := range result.(model.Matrix) {
  324. fmt.Println(val.Metric[model.LabelName(group)])
  325. key := string(val.Metric[model.LabelName(group)])
  326. (*times)[key] = []string{}
  327. (*qlist)[key] = []float64{}
  328. for _, pair := range val.Values {
  329. (*times)[key] = append((*times)[key], pair.Timestamp.Time().Format(layout))
  330. qitem := float64(0)
  331. if !math.IsNaN(float64(pair.Value)) {
  332. qitem = float64(pair.Value)
  333. }
  334. qitem = math.Round(qitem*100) / 100
  335. (*qlist)[key] = append((*qlist)[key], qitem)
  336. }
  337. }
  338. if len(*times) == 0 { // 如果prometheus中没有数据,初始化返回值
  339. endTime := h.baseTime.Unix()
  340. startTime := h.baseTime.Add(-time.Minute * time.Duration(h.minutes)).Unix()
  341. key := ""
  342. (*times)[key] = []string{}
  343. for startTime < endTime {
  344. (*times)[key] = append((*times)[key], time.Unix(startTime, 0).Format(timeUnit))
  345. startTime += int64(diff)
  346. }
  347. (*qlist)[key] = make([]float64, len((*times)[key]))
  348. }
  349. return nil
  350. }