ot_app_analyst.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "go-admin/app/observe/models"
  6. "go-admin/utils"
  7. "time"
  8. "github.com/pkg/errors"
  9. )
  10. type AppAnalyst struct {
  11. utils.OtService
  12. }
  13. func (e *AppAnalyst) GetAppLatencyListByInterval(ctx context.Context, p *models.AppInterval, l *models.AppLatencyByInterval) error {
  14. // timestamp 秒级单位
  15. /*
  16. 计算90分位值
  17. WITH durations AS (
  18. SELECT quantile(0.98)(Duration) AS Duration_90th_percentile
  19. FROM otel_traces_duration_ts otdt
  20. )
  21. */
  22. sql := `WITH durations AS (
  23. SELECT quantile(?)(Duration) AS Duration_90th_percentile
  24. FROM otel_traces
  25. WHERE
  26. toTimeZone(Timestamp,'Asia/Hong_Kong') >= ? and toTimeZone(Timestamp,'Asia/Hong_Kong') < ? and AppAlias = ?
  27. )
  28. Select
  29. toTimeZone(Timestamp,'Asia/Hong_Kong') as ts,
  30. round(Duration/1e6, 2) AS Dms,
  31. if(SpanAttributes['http.status_code']>0, SpanAttributes['http.status_code'], if(SpanAttributes['http.response.status_code']>0, SpanAttributes['http.response.status_code'], 0)) as HttpCode
  32. FROM otel_traces
  33. WHERE
  34. toTimeZone(Timestamp,'Asia/Hong_Kong') >= ? and toTimeZone(Timestamp,'Asia/Hong_Kong') < ? and AppAlias = ? %s AND (ParentSpanId = '') and Duration >= (SELECT Duration_90th_percentile FROM durations)
  35. ORDER BY Timestamp ASC;`
  36. if p.ServiceName != "" {
  37. sql = fmt.Sprintf(sql, fmt.Sprintf("AND ServiceName = '%s'", p.ServiceName))
  38. } else {
  39. sql = fmt.Sprintf(sql, " ")
  40. }
  41. rows, err := e.OlapConn.Query(ctx,
  42. sql,
  43. p.Percentile,
  44. time.Unix(p.StartTime, 0).Format(time.DateTime),
  45. time.Unix(p.EndTime, 0).Format(time.DateTime),
  46. p.AppAlias,
  47. time.Unix(p.StartTime, 0).Format(time.DateTime),
  48. time.Unix(p.EndTime, 0).Format(time.DateTime),
  49. p.AppAlias)
  50. if err != nil {
  51. e.Log.Errorf("olap error query: rows, %s", err)
  52. return err
  53. }
  54. var t time.Time
  55. var hc int32
  56. var d float64
  57. for rows.Next() {
  58. if err := rows.Scan(&t, &d, &hc); err != nil {
  59. e.Log.Errorf("olap error query: successRows, %s", err)
  60. return err
  61. }
  62. if hc < 400 {
  63. l.Success = append(l.Success, []any{t, d})
  64. } else {
  65. l.Failed = append(l.Failed, []any{t, d})
  66. }
  67. }
  68. rows.Close()
  69. return nil
  70. }
  71. // for rows.Next() {
  72. // err = rows.Scan(&t, &d, &e, &r)
  73. // if err != nil {
  74. // s.Log.Errorf("扫描行到变量失败:%s", err)
  75. // break
  76. // }
  77. // estr := "success"
  78. // if e {
  79. // estr = "error"
  80. // }
  81. // if _, ok := (*result)[r]; !ok {
  82. // (*result)[r] = map[string][]models.CoordinatePoint{
  83. // "success": {},
  84. // "error": {},
  85. // }
  86. // }
  87. // (*result)[r][estr] = append((*result)[r][estr], [2]any{t, d})
  88. // }
  89. // return
  90. func (e *AppAnalyst) GetServiceLatencyListByInterval(ctx context.Context, p *models.AppServiceInterval, l *models.AppLatencyByInterval) error {
  91. // timestamp 秒级单位
  92. /*
  93. 计算90分位值
  94. WITH durations AS (
  95. SELECT quantile(0.98)(Duration) AS Duration_90th_percentile
  96. FROM otel_traces_duration_ts otdt
  97. )
  98. */
  99. sql := `WITH durations AS (
  100. SELECT quantile(?)(Duration) AS Duration_90th_percentile
  101. FROM otel_traces
  102. WHERE
  103. Timestamp >= ?
  104. and Timestamp < ?
  105. and AppAlias = ?
  106. and ServiceName = ?
  107. )
  108. Select
  109. distinct
  110. toStartOfInterval(Timestamp, interval ? second, 'Asia/Hong_Kong') as Ts,
  111. round(Duration/1e6) AS Dms,
  112. if(StatusCode='STATUS_CODE_ERROR', 1, 0) AS Err
  113. FROM otel_traces
  114. WHERE
  115. Timestamp >= ? and Timestamp < ?
  116. and AppAlias = ? AND ServiceName = ?
  117. and Duration >= (SELECT Duration_90th_percentile FROM durations)
  118. order by Err desc, Duration desc
  119. limit 1000` // 为了查询效率,不按时间排序,按错误 和 延迟排序,取前1000条
  120. interval := 60
  121. if p.EndTime-p.StartTime <= 20*60 {
  122. interval = 10
  123. } else if p.EndTime-p.StartTime <= 60*60 {
  124. interval = 30
  125. }
  126. list := []struct {
  127. Ts string
  128. Dms float64
  129. Err bool
  130. }{}
  131. err := e.ChOrm.Raw(sql, p.Percentile, p.StartTime, p.EndTime, p.AppAlias, p.SourceService, interval, p.StartTime, p.EndTime, p.AppAlias, p.SourceService).Scan(&list).Error
  132. if err != nil {
  133. return errors.Wrap(err, "查询失败")
  134. }
  135. for _, item := range list {
  136. if item.Err {
  137. l.Failed = append(l.Failed, []any{item.Ts, item.Dms})
  138. } else {
  139. l.Success = append(l.Success, []any{item.Ts, item.Dms})
  140. }
  141. }
  142. return nil
  143. // rows, err := e.OlapConn.Query(ctx,
  144. // sql,
  145. // p.Percentile,
  146. // time.Unix(p.StartTime, 0).Format(time.DateTime),
  147. // time.Unix(p.EndTime, 0).Format(time.DateTime),
  148. // p.AppAlias,
  149. // p.SourceService,
  150. // time.Unix(p.StartTime, 0).Format(time.DateTime),
  151. // time.Unix(p.EndTime, 0).Format(time.DateTime),
  152. // p.AppAlias,
  153. // p.SourceService)
  154. // if err != nil {
  155. // e.Log.Errorf("olap error query: rows, %s", err)
  156. // return err
  157. // }
  158. // var t time.Time
  159. // var hc string
  160. // var d float64
  161. // for rows.Next() {
  162. // if err := rows.Scan(&t, &d, &hc); err != nil {
  163. // e.Log.Errorf("olap error query: successRows, %s", err)
  164. // return err
  165. // }
  166. // // 以下d之所以乘以10, 是因为上面的sql 除以的1e7, *10后才是毫秒; 为了缩小数据量, sql按时间和延迟进行了分组
  167. // if hc != "STATUS_CODE_ERROR" {
  168. // l.Success = append(l.Success, [2]any{t, d * 10})
  169. // } else {
  170. // l.Failed = append(l.Failed, [2]any{t, d * 10})
  171. // }
  172. // }
  173. // rows.Close()
  174. // return nil
  175. }
  176. func (e *AppAnalyst) GetEdgeLatencyListByInterval(ctx context.Context, p *models.AppServiceInterval, l *models.AppLatencyByInterval) error {
  177. // timestamp 秒级单位
  178. /*
  179. 计算90分位值
  180. WITH durations AS (
  181. SELECT quantile(0.98)(Duration) AS Duration_90th_percentile
  182. FROM otel_traces_duration_ts otdt
  183. )
  184. */
  185. sql := `WITH ? AS StartTime, ? AS EndTime, ? AS appAlias,
  186. durations AS (
  187. SELECT quantile(?)(Duration) AS Duration_90th_percentile
  188. FROM
  189. (
  190. SELECT
  191. TraceId,
  192. SpanId,
  193. ParentSpanId,
  194. ServiceName,
  195. Duration,
  196. StatusCode,
  197. AppAlias,
  198. if(SpanAttributes['http.status_code']>0, SpanAttributes['http.status_code'], if(SpanAttributes['http.response.status_code']>0, SpanAttributes['http.response.status_code'], 0)) as HttpCode
  199. FROM otel.otel_traces
  200. WHERE AppAlias=appAlias AND Timestamp > StartTime AND Timestamp < EndTime AND ServiceName = ?
  201. ) AS ot1
  202. INNER JOIN
  203. (
  204. SELECT
  205. SpanId,
  206. ServiceName
  207. FROM otel.otel_traces
  208. WHERE AppAlias=appAlias and Timestamp > StartTime AND Timestamp < EndTime AND ServiceName = ?
  209. ) AS ot2 ON ot2.SpanId = ot1.ParentSpanId
  210. )
  211. Select
  212. toTimeZone(Timestamp,'Asia/Hong_Kong') as ts,
  213. round(Duration/1e6, 2) AS Dms,
  214. if(SpanAttributes['http.status_code']>0, SpanAttributes['http.status_code'], if(SpanAttributes['http.response.status_code']>0, SpanAttributes['http.response.status_code'], 0)) as HttpCode
  215. FROM
  216. (
  217. SELECT
  218. Timestamp,
  219. TraceId,
  220. SpanId,
  221. ParentSpanId,
  222. ServiceName,
  223. Duration,
  224. StatusCode,
  225. AppAlias,
  226. if(SpanAttributes['http.status_code']>0, SpanAttributes['http.status_code'], if(SpanAttributes['http.response.status_code']>0, SpanAttributes['http.response.status_code'], 0)) as HttpCode
  227. FROM otel.otel_traces
  228. WHERE AppAlias=appAlias AND Timestamp > StartTime AND Timestamp < EndTime AND ServiceName = ?
  229. ) AS ot1
  230. INNER JOIN
  231. (
  232. SELECT
  233. SpanId,
  234. ServiceName
  235. FROM otel.otel_traces
  236. WHERE AppAlias=appAlias and Timestamp > StartTime AND Timestamp < EndTime AND ServiceName = ?
  237. ) AS ot2 ON ot2.SpanId = ot1.ParentSpanId
  238. WHERE
  239. Duration >= (SELECT Duration_90th_percentile FROM durations)
  240. ORDER BY Timestamp ASC;`
  241. rows, err := e.OlapConn.Query(ctx,
  242. sql,
  243. p.StartTime,
  244. p.EndTime,
  245. p.AppAlias,
  246. p.Percentile,
  247. p.TargetService,
  248. p.SourceService,
  249. p.TargetService,
  250. p.SourceService,
  251. )
  252. if err != nil {
  253. e.Log.Errorf("olap error query: rows, %s", err)
  254. return err
  255. }
  256. var t time.Time
  257. var hc int32
  258. var d float64
  259. for rows.Next() {
  260. if err := rows.Scan(&t, &d, &hc); err != nil {
  261. e.Log.Errorf("olap error query: successRows, %s", err)
  262. return err
  263. }
  264. if hc < 400 {
  265. l.Success = append(l.Success, []any{t, d})
  266. } else {
  267. l.Failed = append(l.Failed, []any{t, d})
  268. }
  269. }
  270. rows.Close()
  271. return nil
  272. }