package service

import (
	"context"
	"time"

	"github.com/pkg/errors"
	"go-admin/app/observe/models"
	"go-admin/app/observe/service/dto"
	"go-admin/utils"
)

// AppScore computes application- and service-level scores (Apdex, RED
// figures, service-graph nodes and edges) from trace data. It embeds
// utils.OtService; the OlapConn, Orm and Log members used below are
// presumably supplied by that embedded type.
type AppScore struct {
	utils.OtService
}

// GetAllAppsScore runs models.APDEXANDREDSQL for every application listed in
// params.ReqAppList and appends one models.AppScore per returned row to
// asList.Scores.
//
// The Apdex thresholds are read from params.PolicyT under the keys "apdex_a"
// and "apdex_b", so the policy must have been loaded first (see
// GetApdexPolicy). A missing or non-int threshold is returned as an error
// instead of panicking on the type assertion.
func (e *AppScore) GetAllAppsScore(asList *models.AppScoreList, params *models.ScoreParams) error {
	/*
		Reference query with the thresholds inlined (presumably the shape of
		models.APDEXANDREDSQL; verify against the models package):

		SELECT
			AppAlias,
			countIf(Duration <= 1.5 * 1000 * 1000 *1000 ) AS satisfied,
			countIf(Duration > 1.5 * 1000 * 1000 * 1000 AND Duration <= 2.5 * 1000 * 1000 * 1000 ) AS tolerable,
			countIf(Duration > 2.5 * 1000 * 1000 * 1000 ) AS frustrated,
			(satisfied + tolerable / 2.0 ) / (satisfied + tolerable + frustrated) AS apdex,
			count(*) as TotalRequests,
			(TotalRequests / 5) as Rate,
			countIf(HttpCode >= 400) / TotalRequests as ErrorRate,
			(sum(Duration) / TotalRequests / (1000 * 1000)) as Latency
		FROM otel.otel_traces_duration_ts
		WHERE Timestamp >= now() - INTERVAL 5 MINUTE
			AND AppAlias IN ('UNSET', 'picc')
		GROUP BY AppAlias
		;
	*/
	appAliases := make([]string, 0, len(params.ReqAppList))
	for _, app := range params.ReqAppList {
		appAliases = append(appAliases, app.AppAlias)
	}

	apdexA, ok := params.PolicyT["apdex_a"].(int)
	if !ok {
		return errors.Errorf("apdex policy %q is missing or not an int", "apdex_a")
	}
	apdexB, ok := params.PolicyT["apdex_b"].(int)
	if !ok {
		return errors.Errorf("apdex policy %q is missing or not an int", "apdex_b")
	}

	// Each threshold and the interval appear twice in the statement, hence
	// the repeated positional arguments.
	rows, err := e.OlapConn.Query(context.Background(), models.APDEXANDREDSQL,
		apdexA, apdexA,
		apdexB, apdexB,
		params.Interval, params.Interval,
		appAliases)
	if err != nil {
		e.Log.Errorf("olap error query: %s", err)
		return err
	}
	// Deferred so the result set is released on the early-return paths too.
	defer rows.Close()

	for rows.Next() {
		var score models.AppScore
		if err := rows.ScanStruct(&score); err != nil {
			e.Log.Errorf("olap error rows: %s", err)
			return err
		}
		asList.Scores = append(asList.Scores, score)
	}
	// Next returning false can also mean the stream broke mid-iteration.
	if err := rows.Err(); err != nil {
		e.Log.Errorf("olap error rows: %s", err)
		return err
	}
	return nil
}

// GetServicesDetails aggregates the spans of params.AppAlias between
// params.StartTime and params.EndTime per service and appends one
// models.GraphNodeScope per service to *gs.
//
// Node titles come from the models.ServiceNode records of the same
// application; a service without a matching record gets an empty title.
// A row that cannot be scanned aborts the call with that error instead of
// being appended as an empty node.
func (e *AppScore) GetServicesDetails(ctx context.Context, gs *[]models.GraphNodeScope, params *models.ServiceScoreParams) error {
	const query = `
SELECT ServiceName,
	countIf(Duration <= 1.0 * 1000 * 1000 *1000 ) AS satisfied,
	countIf(Duration > 1.0 * 1000 * 1000 * 1000 AND Duration <= 2.0 * 1000 * 1000 * 1000 ) AS tolerable,
	countIf(Duration > 2.0 * 1000 * 1000 * 1000 ) AS frustrated,
	ROUND((satisfied + tolerable / 2.0 ) / (satisfied + tolerable + frustrated), 4) AS Apdex,
	COUNT(DISTINCT TraceId) AS TraceNum,
	COUNT() AS SpanNum,
	SUM(IF(SpanKind IN ('SPAN_KIND_CLIENT', 'SPAN_KIND_PRODUCER'), 1, 0)) AS SentNum,
	SUM(IF(SpanKind IN ('SPAN_KIND_SERVER', 'SPAN_KIND_CONSUMER'), 1, 0)) AS ReceivedNum,
	SUM(IF(SpanKind='SPAN_KIND_INTERNAL', 1, 0)) AS InternalNum,
	SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum,
	any(ResourceAttributes['telemetry.sdk.language']) AS SdkLang
FROM otel.otel_traces ot
WHERE Timestamp > ? AND Timestamp < ? AND AppAlias = ?
GROUP BY ServiceName`

	rows, err := e.OlapConn.Query(ctx, query, params.StartTime, params.EndTime, params.AppAlias)
	if err != nil {
		return err
	}
	// Deferred so the result set is released even when the ORM lookup or a
	// row scan fails below.
	defer rows.Close()

	svcNodes := []struct {
		ServiceName string
		Name        string
	}{}
	if err := e.Orm.Model(&models.ServiceNode{}).Where("app_alias", params.AppAlias).Find(&svcNodes).Error; err != nil {
		return errors.Wrap(err, "获取服务列表失败")
	}
	titles := make(map[string]string, len(svcNodes))
	for _, node := range svcNodes {
		titles[node.ServiceName] = node.Name
	}

	for rows.Next() {
		var raw dto.SerivceGraphNodeRaw
		if err := rows.ScanStruct(&raw); err != nil {
			e.Log.Errorf("扫描行到结构体SerivceGraphNodeRaw失败: %s", err)
			return err
		}

		var node models.GraphNodeScope
		node.ID = raw.ServiceName
		node.Title = titles[raw.ServiceName]
		node.Send = int64(raw.SentNum)
		node.Receive = int64(raw.ReceivedNum)
		// Guard the division: 0/0 would yield NaN. With no spans both
		// ratios stay at their zero value.
		if raw.SpanNum > 0 {
			node.ArcSuccess = float64(raw.SpanNum-raw.ErrorNum) / float64(raw.SpanNum)
			node.ArcFaild = float64(raw.ErrorNum) / float64(raw.SpanNum)
		}
		node.Icon = raw.SdkLang
		node.Apdex = raw.Apdex
		node.SpanNum = int64(raw.SpanNum)
		*gs = append(*gs, node)
	}
	return rows.Err()
}

// GetAppDetails aggregates the spans of the single service
// params.SourceService of params.AppAlias between params.StartTime and
// params.EndTime and writes the result into gs.
//
// The query has no GROUP BY, so it yields one aggregate row even when no
// span matches; in that case both arc ratios are set to 0 rather than NaN.
func (e *AppScore) GetAppDetails(ctx context.Context, gs *models.GraphNodeScope, params *models.ServiceScoreParams) error {
	// NOTE(review): this statement reads from the unqualified "otel_traces"
	// while the other queries in this file use "otel.otel_traces"; confirm
	// whether the difference is intentional.
	const query = `SELECT
	countIf(Duration <= 1.0 * 1000 * 1000 *1000 ) AS satisfied,
	countIf(Duration > 1.0 * 1000 * 1000 * 1000 AND Duration <= 2.0 * 1000 * 1000 * 1000 ) AS tolerable,
	countIf(Duration > 2.0 * 1000 * 1000 * 1000 ) AS frustrated,
	ROUND((satisfied + tolerable / 2.0 ) / (satisfied + tolerable + frustrated), 4) AS Apdex,
	COUNT(DISTINCT TraceId) AS TraceNum,
	COUNT() AS SpanNum,
	SUM(IF(SpanKind IN ('SPAN_KIND_CLIENT', 'SPAN_KIND_PRODUCER'), 1, 0)) AS SentNum,
	SUM(IF(SpanKind IN ('SPAN_KIND_SERVER', 'SPAN_KIND_CONSUMER'), 1, 0)) AS ReceivedNum,
	SUM(IF(SpanKind='SPAN_KIND_INTERNAL', 1, 0)) AS InternalNum,
	SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum,
	any(ResourceAttributes['telemetry.sdk.language']) AS SdkLang
FROM otel_traces ot
WHERE Timestamp > ? AND Timestamp < ? AND AppAlias = ? AND ServiceName = ?`

	row := e.OlapConn.QueryRow(ctx, query, params.StartTime, params.EndTime, params.AppAlias, params.SourceService)
	if err := row.Err(); err != nil {
		// Errorf, not Error: the message carries a format verb.
		e.Log.Errorf("执行sql错误: %s", err)
		return err
	}

	sr := new(dto.SerivceGraphNodeRaw)
	if err := row.ScanStruct(sr); err != nil {
		e.Log.Error("扫描行到结构体失败: ", err)
		return err
	}

	// ServiceName is not among the selected columns, so the scanned value
	// is empty; fall back to the service the query was filtered on.
	name := sr.ServiceName
	if name == "" {
		name = params.SourceService
	}
	gs.ID = name
	gs.Title = name
	gs.Send = int64(sr.SentNum)
	gs.Receive = int64(sr.ReceivedNum)
	if sr.SpanNum > 0 {
		gs.ArcSuccess = float64(sr.SpanNum-sr.ErrorNum) / float64(sr.SpanNum)
		gs.ArcFaild = float64(sr.ErrorNum) / float64(sr.SpanNum)
	} else {
		// No span matched: avoid 0/0 (NaN) and overwrite whatever the
		// caller's struct held before.
		gs.ArcSuccess = 0
		gs.ArcFaild = 0
	}
	gs.Icon = sr.SdkLang
	gs.Apdex = sr.Apdex
	return nil
}

// GetEdgeDetails computes call statistics (count, QPS, Apdex, error rate and
// latency figures) for spans of params.TargetService whose parent span
// belongs to params.SourceService, within params.AppAlias and the
// [params.StartTime, params.EndTime] window given in Unix seconds. One
// models.GraphServiceEdge per (source, target) pair is appended to *gs.
//
// A row that cannot be scanned aborts the call with that error instead of
// being appended as an empty edge.
func (e *AppScore) GetEdgeDetails(ctx context.Context, gs *[]models.GraphServiceEdge, params *models.ServiceScoreParams) error {
	const query = `WITH ? AS StartTime, ? AS EndTime, ? AS seconds, ? AS appAlias
SELECT ot2.ServiceName AS SourceService, ot1.ServiceName AS TargetService,
	COUNT(*) AS TotalNum,
	round(COUNT()/seconds, 2) AS Qps,
	countIf(Duration <= 1.0 * 1000 * 1000 *1000 ) AS Satisfied,
	countIf(Duration > 1.0 * 1000 * 1000 * 1000 AND Duration <= 2.0 * 1000 * 1000 * 1000 ) AS Tolerable,
	countIf(Duration > 2.0 * 1000 * 1000 * 1000 ) AS Frustrated,
	ROUND((Satisfied + Tolerable / 2.0 ) / (Satisfied + Tolerable + Frustrated), 4) AS Apdex,
	SUM(IF(StatusCode == 'STATUS_CODE_ERROR', 1, 0)) AS ErrorNum,
	SUM(IF(StatusCode == 'STATUS_CODE_ERROR', 1, 0))/COUNT() AS ErrorRate,
	round(AVG(Duration)/1e6, 2) AS DurationAverage,
	round(quantile(0.5)(Duration)/1e6, 2) AS DurationMedian,
	round(quantile(0.9)(Duration)/1e6, 2) AS DurationP90,
	round(quantile(0.99)(Duration)/1e6, 2) AS DurationP99
FROM (
	SELECT TraceId, SpanId, ParentSpanId, ServiceName, Duration, StatusCode, AppAlias
	FROM otel.otel_traces
	WHERE AppAlias=appAlias AND Timestamp > StartTime AND Timestamp < EndTime AND ServiceName = ?
) AS ot1
INNER JOIN (
	SELECT SpanId, ServiceName
	FROM otel.otel_traces
	WHERE AppAlias=appAlias and Timestamp > StartTime AND Timestamp < EndTime AND ServiceName = ?
) AS ot2
ON ot2.SpanId = ot1.ParentSpanId
GROUP BY SourceService, TargetService`

	// Window length in seconds, used as the QPS divisor. Computed directly
	// from the two instants: formatting to local wall-clock text and
	// parsing it back adds nothing and can be skewed by a DST change.
	window := time.Unix(params.EndTime, 0).Sub(time.Unix(params.StartTime, 0))
	seconds := window.Seconds()

	// The first service placeholder belongs to ot1 (the callee, i.e. the
	// target) and the second to ot2 (the parent span's service, i.e. the
	// source).
	rows, err := e.OlapConn.Query(ctx, query,
		params.StartTime, params.EndTime, seconds, params.AppAlias,
		params.TargetService, params.SourceService)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		raw := new(dto.ServiceEdgeRaw)
		if err := rows.ScanStruct(raw); err != nil {
			e.Log.Errorf("扫描行到结构体失败: %s", err)
			return err
		}
		edge := new(models.GraphServiceEdge)
		utils.CopyStructMembers(edge, raw)
		*gs = append(*gs, *edge)
	}
	return rows.Err()
}

// GetApdexPolicy stores the Apdex policy to use in params.PolicyT and returns
// the receiver so calls can be chained. Only the default policy is applied
// for now.
func (e *AppScore) GetApdexPolicy(params *models.ScoreParams) *AppScore {
	// TODO: load the application's own configuration and take the Apdex
	// thresholds from it.
	params.PolicyT = e.defatultAppApdexPolicy()
	return e
}

// defatultAppApdexPolicy returns the default Apdex rule set: "apdex_a" = 1500
// and "apdex_b" = 2500, both stored as int.
func (e *AppScore) defatultAppApdexPolicy() map[string]interface{} {
	return map[string]interface{}{
		"apdex_a": 1500,
		"apdex_b": 2500,
	}
}

// CalApdex is a placeholder: it ignores its arguments and always returns 0.
func (e *AppScore) CalApdex(rl models.ReqApp, now int32) (apdex float32) {
	/*
		Query sketch kept for a future implementation:

		SELECT
			AppAlias ,
			countIf(Duration <= 1.5 * 1000 * 1000 *1000 ) AS satisfied,
			countIf(Duration > 1.5 * 1000 * 1000 * 1000 AND Duration <= 2.5 * 1000 * 1000 * 1000 ) AS tolerable,
			countIf(Duration > 2.5 * 1000 * 1000 * 1000 ) AS frustrated,
			(satisfied + tolerable / 2.0 ) / (satisfied + tolerable + frustrated) AS apdex
		FROM otel_traces_duration_ts
		WHERE `Timestamp` >= now() - INTERVAL 5 MINUTE
		GROUP BY AppAlias;
	*/
	return 0
}

// CalRED is a placeholder: it ignores its arguments and always returns zero
// for rate, error rate and duration.
func (e *AppScore) CalRED(rl models.ReqApp, now int32) (
	rate float32, err float32, duration float32) {
	/*
		Query sketch kept for a future implementation:

		SELECT
			AppAlias,
			count(*) as TotalRequests,
			# request rate: average requests per minute over the 5-minute window
			(TotalRequests / 5) as Rate,
			# error rate
			countIf(HttpCode >= 400) / TotalRequests as ErrorRate,
			# average latency / average response time
			(sum(Duration) / TotalRequests) as Latency
		FROM otel.otel_traces_duration_ts
		WHERE Timestamp >= now() - INTERVAL 5 MINUTE
		GROUP BY AppAlias
	*/
	return 0, 0, 0
}