ot_app_score.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package service
  2. import (
  3. "context"
  4. "go-admin/app/observe/models"
  5. "go-admin/app/observe/service/dto"
  6. "go-admin/utils"
  7. "time"
  8. "github.com/pkg/errors"
  9. )
  10. type AppScore struct {
  11. utils.OtService
  12. }
  13. func (e *AppScore) GetAllAppsScore(asList *models.AppScoreList, params *models.ScoreParams) error {
  14. /*
  15. SELECT
  16. AppAlias,
  17. countIf(Duration <= 1.5 * 1000 * 1000 *1000 ) AS satisfied,
  18. countIf(Duration > 1.5 * 1000 * 1000 * 1000 AND Duration <= 2.5 * 1000 * 1000 * 1000 ) AS tolerable,
  19. countIf(Duration > 2.5 * 1000 * 1000 * 1000 ) AS frustrated,
  20. (satisfied + tolerable / 2.0 ) / (satisfied + tolerable + frustrated) AS apdex,
  21. count(*) as TotalRequests,
  22. (TotalRequests / 5) as Rate,
  23. countIf(HttpCode >= 400) / TotalRequests as ErrorRate,
  24. (sum(Duration) / TotalRequests / (1000 * 1000)) as Latency
  25. FROM otel.otel_traces_duration_ts
  26. WHERE Timestamp >= now() - INTERVAL 5 MINUTE
  27. AND AppAlias IN ('UNSET', 'picc')
  28. GROUP BY AppAlias ;
  29. */
  30. // builder := new(strings.Builder)
  31. appAliases := make([]string, 0, len(params.ReqAppList))
  32. for _, a := range params.ReqAppList {
  33. // builder.WriteString(fmt.Sprintf("'%s',", a.AppAlias))
  34. appAliases = append(appAliases, a.AppAlias)
  35. }
  36. apdexA := params.PolicyT["apdex_a"].(int)
  37. apdexB := params.PolicyT["apdex_b"].(int)
  38. // ckapdexSQL := fmt.Sprintf(models.APDEXANDREDSQL,
  39. // apdexA, apdexA, apdexB, apdexB,
  40. // params.Interval, params.Interval,
  41. // builder.String())
  42. ckapdexSQL := models.APDEXANDREDSQL
  43. // log.Debug("ck appapdex sql: ", ckapdexSQL, "builder string: ", builder.String())
  44. rows, err := e.OlapConn.Query(context.Background(), ckapdexSQL, apdexA, apdexA, apdexB, apdexB,
  45. params.Interval, params.Interval,
  46. appAliases)
  47. if err != nil {
  48. e.Log.Errorf("olap error query: %s", err)
  49. return err
  50. }
  51. var resApp models.AppScore
  52. for rows.Next() {
  53. if err := rows.ScanStruct(&resApp); err != nil {
  54. e.Log.Errorf("olap error rows: %s", err)
  55. return err
  56. }
  57. asList.Scores = append(asList.Scores, resApp)
  58. }
  59. rows.Close()
  60. return nil
  61. }
  62. func (e *AppScore) GetServicesDetails(ctx context.Context, gs *[]models.GraphNodeScope, params *models.ServiceScoreParams) error {
  63. sql := `
  64. SELECT
  65. ServiceName,
  66. countIf(Duration <= 1.0 * 1000 * 1000 *1000 ) AS satisfied,
  67. countIf(Duration > 1.0 * 1000 * 1000 * 1000 AND Duration <= 2.0 * 1000 * 1000 * 1000 ) AS tolerable,
  68. countIf(Duration > 2.0 * 1000 * 1000 * 1000 ) AS frustrated,
  69. ROUND((satisfied + tolerable / 2.0 ) / (satisfied + tolerable + frustrated), 4) AS Apdex,
  70. COUNT(DISTINCT TraceId) AS TraceNum,
  71. COUNT() AS SpanNum,
  72. SUM(IF(SpanKind IN ('SPAN_KIND_CLIENT', 'SPAN_KIND_PRODUCER'), 1, 0)) AS SentNum,
  73. SUM(IF(SpanKind IN ('SPAN_KIND_SERVER', 'SPAN_KIND_CONSUMER'), 1, 0)) AS ReceivedNum,
  74. SUM(IF(SpanKind='SPAN_KIND_INTERNAL', 1, 0)) AS InternalNum,
  75. SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum,
  76. any(ResourceAttributes['telemetry.sdk.language']) AS SdkLang
  77. FROM
  78. otel.otel_traces ot
  79. WHERE
  80. Timestamp > ?
  81. AND Timestamp < ?
  82. AND AppAlias = ? GROUP BY ServiceName`
  83. rows, err := e.OlapConn.Query(ctx, sql,
  84. params.StartTime,
  85. params.EndTime,
  86. params.AppAlias)
  87. if err != nil {
  88. return err
  89. }
  90. svcNodes := []struct {
  91. ServiceName string
  92. Name string
  93. }{}
  94. if err := e.Orm.Model(&models.ServiceNode{}).Where("app_alias", params.AppAlias).Find(&svcNodes).Error; err != nil {
  95. return errors.Wrap(err, "获取服务列表失败")
  96. }
  97. svcMap := map[string]string{}
  98. for _, node := range svcNodes {
  99. svcMap[node.ServiceName] = node.Name
  100. }
  101. for rows.Next() {
  102. row := new(dto.SerivceGraphNodeRaw)
  103. if err := rows.ScanStruct(row); err != nil {
  104. e.Log.Errorf("扫描行到结构体SerivceGraphNodeRaw失败: %s", err)
  105. }
  106. g := new(models.GraphNodeScope)
  107. // utils.CopyStructMembers(g, row)
  108. g.ID = row.ServiceName
  109. g.Title = svcMap[row.ServiceName]
  110. g.Send = int64(row.SentNum)
  111. g.Receive = int64(row.ReceivedNum)
  112. g.ArcSuccess = float64(row.SpanNum-row.ErrorNum) / float64(row.SpanNum)
  113. g.ArcFaild = float64(row.ErrorNum) / float64(row.SpanNum)
  114. g.Icon = row.SdkLang
  115. g.Apdex = row.Apdex
  116. g.SpanNum = int64(row.SpanNum)
  117. *gs = append(*gs, *g)
  118. }
  119. return nil
  120. }
  121. func (e *AppScore) GetAppDetails(ctx context.Context, gs *models.GraphNodeScope, params *models.ServiceScoreParams) error {
  122. sql := `SELECT
  123. countIf(Duration <= 1.0 * 1000 * 1000 *1000 ) AS satisfied,
  124. countIf(Duration > 1.0 * 1000 * 1000 * 1000 AND Duration <= 2.0 * 1000 * 1000 * 1000 ) AS tolerable,
  125. countIf(Duration > 2.0 * 1000 * 1000 * 1000 ) AS frustrated,
  126. ROUND((satisfied + tolerable / 2.0 ) / (satisfied + tolerable + frustrated), 4) AS Apdex,
  127. COUNT(DISTINCT TraceId) AS TraceNum,
  128. COUNT() AS SpanNum,
  129. SUM(IF(SpanKind IN ('SPAN_KIND_CLIENT', 'SPAN_KIND_PRODUCER'), 1, 0)) AS SentNum,
  130. SUM(IF(SpanKind IN ('SPAN_KIND_SERVER', 'SPAN_KIND_CONSUMER'), 1, 0)) AS ReceivedNum,
  131. SUM(IF(SpanKind='SPAN_KIND_INTERNAL', 1, 0)) AS InternalNum,
  132. SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum,
  133. any(ResourceAttributes['telemetry.sdk.language']) AS SdkLang
  134. FROM
  135. otel_traces ot
  136. WHERE
  137. Timestamp > ?
  138. AND Timestamp < ?
  139. AND AppAlias = ?
  140. AND ServiceName = ?`
  141. row := e.OlapConn.QueryRow(ctx, sql,
  142. params.StartTime,
  143. params.EndTime,
  144. params.AppAlias,
  145. params.SourceService)
  146. if row.Err() != nil {
  147. e.Log.Error("执行sql错误: %s", row.Err())
  148. return row.Err()
  149. }
  150. sr := new(dto.SerivceGraphNodeRaw)
  151. if err := row.ScanStruct(sr); err != nil {
  152. e.Log.Error("扫描行到结构体失败: ", err)
  153. return err
  154. }
  155. gs.ID = sr.ServiceName
  156. gs.Title = sr.ServiceName
  157. gs.Send = int64(sr.SentNum)
  158. gs.Receive = int64(sr.ReceivedNum)
  159. gs.ArcSuccess = float64(sr.SpanNum-sr.ErrorNum) / float64(sr.SpanNum)
  160. gs.ArcFaild = float64(sr.ErrorNum) / float64(sr.SpanNum)
  161. gs.Icon = sr.SdkLang
  162. gs.Apdex = sr.Apdex
  163. // MainStat: fmt.Sprintf("sent: %d, received: %d", row.SentNum, row.ReceivedNum),
  164. return nil
  165. }
  166. func (e *AppScore) GetEdgeDetails(ctx context.Context, gs *[]models.GraphServiceEdge, params *models.ServiceScoreParams) error {
  167. sql := `WITH ? AS StartTime, ? AS EndTime, ? AS seconds, ? AS appAlias
  168. SELECT
  169. ot2.ServiceName AS SourceService,
  170. ot1.ServiceName AS TargetService,
  171. COUNT(*) AS TotalNum,
  172. round(COUNT()/seconds, 2) AS Qps,
  173. countIf(Duration <= 1.0 * 1000 * 1000 *1000 ) AS Satisfied,
  174. countIf(Duration > 1.0 * 1000 * 1000 * 1000 AND Duration <= 2.0 * 1000 * 1000 * 1000 ) AS Tolerable,
  175. countIf(Duration > 2.0 * 1000 * 1000 * 1000 ) AS Frustrated,
  176. ROUND((Satisfied + Tolerable / 2.0 ) / (Satisfied + Tolerable + Frustrated), 4) AS Apdex,
  177. SUM(IF(StatusCode == 'STATUS_CODE_ERROR', 1, 0)) AS ErrorNum,
  178. SUM(IF(StatusCode == 'STATUS_CODE_ERROR', 1, 0))/COUNT() AS ErrorRate,
  179. round(AVG(Duration)/1e6, 2) AS DurationAverage,
  180. round(quantile(0.5)(Duration)/1e6, 2) AS DurationMedian,
  181. round(quantile(0.9)(Duration)/1e6, 2) AS DurationP90,
  182. round(quantile(0.99)(Duration)/1e6, 2) AS DurationP99
  183. FROM
  184. (
  185. SELECT
  186. TraceId,
  187. SpanId,
  188. ParentSpanId,
  189. ServiceName,
  190. Duration,
  191. StatusCode,
  192. AppAlias
  193. FROM otel.otel_traces
  194. WHERE AppAlias=appAlias AND Timestamp > StartTime AND Timestamp < EndTime AND ServiceName = ?
  195. ) AS ot1
  196. INNER JOIN
  197. (
  198. SELECT
  199. SpanId,
  200. ServiceName
  201. FROM otel.otel_traces
  202. WHERE AppAlias=appAlias and Timestamp > StartTime AND Timestamp < EndTime AND ServiceName = ?
  203. ) AS ot2 ON ot2.SpanId = ot1.ParentSpanId
  204. GROUP BY SourceService, TargetService`
  205. // sql = fmt.Sprintf(sql, ChWithSql)
  206. begin, err := time.Parse(time.DateTime, time.Unix(params.StartTime, 0).Format(time.DateTime))
  207. if err != nil {
  208. return err
  209. }
  210. end, err := time.Parse(time.DateTime, time.Unix(params.EndTime, 0).Format(time.DateTime))
  211. if err != nil {
  212. return err
  213. }
  214. seconds := end.Sub(begin).Seconds()
  215. // s.Log.Debug(begin, end, seconds)
  216. rows, err := e.OlapConn.Query(ctx, sql,
  217. params.StartTime,
  218. params.EndTime,
  219. seconds,
  220. params.AppAlias,
  221. params.TargetService,
  222. params.SourceService)
  223. // s.Log.Info(s.OlapConn.Stats())
  224. if err != nil {
  225. return err
  226. }
  227. for rows.Next() {
  228. row := new(dto.ServiceEdgeRaw)
  229. if err := rows.ScanStruct(row); err != nil {
  230. e.Log.Errorf("扫描行到结构体失败: %s", err)
  231. }
  232. g := new(models.GraphServiceEdge)
  233. utils.CopyStructMembers(g, row)
  234. *gs = append(*gs, *g)
  235. }
  236. return nil
  237. }
  238. func (e *AppScore) GetApdexPolicy(params *models.ScoreParams) *AppScore {
  239. //TODO: 获取业务应用个性化配置,提取apdex
  240. params.PolicyT = e.defatultAppApdexPolicy()
  241. return e
  242. }
  243. func (e *AppScore) defatultAppApdexPolicy() map[string]interface{} {
  244. //获取默认apdex规则
  245. apdexPolicy := make(map[string]interface{})
  246. apdexPolicy["apdex_a"] = 1500
  247. apdexPolicy["apdex_b"] = 2500
  248. return apdexPolicy
  249. }
  250. func (e *AppScore) CalApdex(rl models.ReqApp, now int32) (apdex float32) {
  251. // e.OlapConn.QueryRow("")
  252. /*
  253. SELECT
  254. AppAlias ,
  255. countIf(Duration <= 1.5 * 1000 * 1000 *1000 ) AS satisfied,
  256. countIf(Duration > 1.5 * 1000 * 1000 * 1000 AND Duration <= 2.5 * 1000 * 1000 * 1000 ) AS tolerable,
  257. countIf(Duration > 2.5 * 1000 * 1000 * 1000 ) AS frustrated,
  258. (satisfied + tolerable / 2.0 ) / (satisfied + tolerable + frustrated) AS apdex
  259. FROM
  260. otel_traces_duration_ts
  261. WHERE
  262. `Timestamp` >= now() - INTERVAL 5 MINUTE
  263. GROUP BY
  264. AppAlias;
  265. */
  266. return 0
  267. }
  268. func (e *AppScore) CalRED(rl models.ReqApp, now int32) (
  269. rate float32, err float32, duration float32) {
  270. /*
  271. SELECT
  272. AppAlias,
  273. count(*) as TotalRequests,
  274. #请求速率 5min的每min平均请求量
  275. (TotalRequests / 5) as Rate,
  276. #错误率
  277. countIf(HttpCode >= 400) / TotalRequests as ErrorRate,
  278. #平均延迟率/平均响应速率
  279. (sum(Duration) / TotalRequests) as Latency
  280. FROM otel.otel_traces_duration_ts
  281. WHERE Timestamp >= now() - INTERVAL 5 MINUTE
  282. GROUP BY AppAlias
  283. */
  284. return 0, 0, 0
  285. }