package service import ( "context" "fmt" "go-admin/app/observe/models" "go-admin/utils" "time" "github.com/pkg/errors" ) type AppAnalyst struct { utils.OtService } func (e *AppAnalyst) GetAppLatencyListByInterval(ctx context.Context, p *models.AppInterval, l *models.AppLatencyByInterval) error { // timestamp 秒级单位 /* 计算90分位值 WITH durations AS ( SELECT quantile(0.98)(Duration) AS Duration_90th_percentile FROM otel_traces_duration_ts otdt ) */ sql := `WITH durations AS ( SELECT quantile(?)(Duration) AS Duration_90th_percentile FROM otel_traces WHERE toTimeZone(Timestamp,'Asia/Hong_Kong') >= ? and toTimeZone(Timestamp,'Asia/Hong_Kong') < ? and AppAlias = ? ) Select toTimeZone(Timestamp,'Asia/Hong_Kong') as ts, round(Duration/1e6, 2) AS Dms, if(SpanAttributes['http.status_code']>0, SpanAttributes['http.status_code'], if(SpanAttributes['http.response.status_code']>0, SpanAttributes['http.response.status_code'], 0)) as HttpCode FROM otel_traces WHERE toTimeZone(Timestamp,'Asia/Hong_Kong') >= ? and toTimeZone(Timestamp,'Asia/Hong_Kong') < ? and AppAlias = ? %s AND (ParentSpanId = '') and Duration >= (SELECT Duration_90th_percentile FROM durations) ORDER BY Timestamp ASC;` if p.ServiceName != "" { sql = fmt.Sprintf(sql, fmt.Sprintf("AND ServiceName = '%s'", p.ServiceName)) } else { sql = fmt.Sprintf(sql, " ") } rows, err := e.OlapConn.Query(ctx, sql, p.Percentile, time.Unix(p.StartTime, 0).Format(time.DateTime), time.Unix(p.EndTime, 0).Format(time.DateTime), p.AppAlias, time.Unix(p.StartTime, 0).Format(time.DateTime), time.Unix(p.EndTime, 0).Format(time.DateTime), p.AppAlias) if err != nil { e.Log.Errorf("olap error query: rows, %s", err) return err } var t time.Time var hc int32 var d float64 for rows.Next() { if err := rows.Scan(&t, &d, &hc); err != nil { e.Log.Errorf("olap error query: successRows, %s", err) return err } if hc < 400 { l.Success = append(l.Success, []any{t, d}) } else { l.Failed = append(l.Failed, []any{t, d}) } } rows.Close() return nil } // for rows.Next() { // err = rows.Scan(&t, &d, &e, &r) // if err != nil { // s.Log.Errorf("扫描行到变量失败:%s", err) // break // } // estr := "success" // if e { // estr = "error" // } // if _, ok := (*result)[r]; !ok { // (*result)[r] = map[string][]models.CoordinatePoint{ // "success": {}, // "error": {}, // } // } // (*result)[r][estr] = append((*result)[r][estr], [2]any{t, d}) // } // return func (e *AppAnalyst) GetServiceLatencyListByInterval(ctx context.Context, p *models.AppServiceInterval, l *models.AppLatencyByInterval) error { // timestamp 秒级单位 /* 计算90分位值 WITH durations AS ( SELECT quantile(0.98)(Duration) AS Duration_90th_percentile FROM otel_traces_duration_ts otdt ) */ sql := `WITH durations AS ( SELECT quantile(?)(Duration) AS Duration_90th_percentile FROM otel_traces WHERE Timestamp >= ? and Timestamp < ? and AppAlias = ? and ServiceName = ? ) Select distinct toStartOfInterval(Timestamp, interval ? second, 'Asia/Hong_Kong') as Ts, round(Duration/1e6) AS Dms, if(StatusCode='STATUS_CODE_ERROR', 1, 0) AS Err FROM otel_traces WHERE Timestamp >= ? and Timestamp < ? and AppAlias = ? AND ServiceName = ? and Duration >= (SELECT Duration_90th_percentile FROM durations) order by Err desc, Duration desc limit 1000` // 为了查询效率,不按时间排序,按错误 和 延迟排序,取前1000条 interval := 60 if p.EndTime-p.StartTime <= 20*60 { interval = 10 } else if p.EndTime-p.StartTime <= 60*60 { interval = 30 } list := []struct { Ts string Dms float64 Err bool }{} err := e.ChOrm.Raw(sql, p.Percentile, p.StartTime, p.EndTime, p.AppAlias, p.SourceService, interval, p.StartTime, p.EndTime, p.AppAlias, p.SourceService).Scan(&list).Error if err != nil { return errors.Wrap(err, "查询失败") } for _, item := range list { if item.Err { l.Failed = append(l.Failed, []any{item.Ts, item.Dms}) } else { l.Success = append(l.Success, []any{item.Ts, item.Dms}) } } return nil // rows, err := e.OlapConn.Query(ctx, // sql, // p.Percentile, // time.Unix(p.StartTime, 0).Format(time.DateTime), // time.Unix(p.EndTime, 0).Format(time.DateTime), // p.AppAlias, // p.SourceService, // time.Unix(p.StartTime, 0).Format(time.DateTime), // time.Unix(p.EndTime, 0).Format(time.DateTime), // p.AppAlias, // p.SourceService) // if err != nil { // e.Log.Errorf("olap error query: rows, %s", err) // return err // } // var t time.Time // var hc string // var d float64 // for rows.Next() { // if err := rows.Scan(&t, &d, &hc); err != nil { // e.Log.Errorf("olap error query: successRows, %s", err) // return err // } // // 以下d之所以乘以10, 是因为上面的sql 除以的1e7, *10后才是毫秒; 为了缩小数据量, sql按时间和延迟进行了分组 // if hc != "STATUS_CODE_ERROR" { // l.Success = append(l.Success, [2]any{t, d * 10}) // } else { // l.Failed = append(l.Failed, [2]any{t, d * 10}) // } // } // rows.Close() // return nil } func (e *AppAnalyst) GetEdgeLatencyListByInterval(ctx context.Context, p *models.AppServiceInterval, l *models.AppLatencyByInterval) error { // timestamp 秒级单位 /* 计算90分位值 WITH durations AS ( SELECT quantile(0.98)(Duration) AS Duration_90th_percentile FROM otel_traces_duration_ts otdt ) */ sql := `WITH ? AS StartTime, ? AS EndTime, ? AS appAlias, durations AS ( SELECT quantile(?)(Duration) AS Duration_90th_percentile FROM ( SELECT TraceId, SpanId, ParentSpanId, ServiceName, Duration, StatusCode, AppAlias, if(SpanAttributes['http.status_code']>0, SpanAttributes['http.status_code'], if(SpanAttributes['http.response.status_code']>0, SpanAttributes['http.response.status_code'], 0)) as HttpCode FROM otel.otel_traces WHERE AppAlias=appAlias AND Timestamp > StartTime AND Timestamp < EndTime AND ServiceName = ? ) AS ot1 INNER JOIN ( SELECT SpanId, ServiceName FROM otel.otel_traces WHERE AppAlias=appAlias and Timestamp > StartTime AND Timestamp < EndTime AND ServiceName = ? ) AS ot2 ON ot2.SpanId = ot1.ParentSpanId ) Select toTimeZone(Timestamp,'Asia/Hong_Kong') as ts, round(Duration/1e6, 2) AS Dms, if(SpanAttributes['http.status_code']>0, SpanAttributes['http.status_code'], if(SpanAttributes['http.response.status_code']>0, SpanAttributes['http.response.status_code'], 0)) as HttpCode FROM ( SELECT Timestamp, TraceId, SpanId, ParentSpanId, ServiceName, Duration, StatusCode, AppAlias, if(SpanAttributes['http.status_code']>0, SpanAttributes['http.status_code'], if(SpanAttributes['http.response.status_code']>0, SpanAttributes['http.response.status_code'], 0)) as HttpCode FROM otel.otel_traces WHERE AppAlias=appAlias AND Timestamp > StartTime AND Timestamp < EndTime AND ServiceName = ? ) AS ot1 INNER JOIN ( SELECT SpanId, ServiceName FROM otel.otel_traces WHERE AppAlias=appAlias and Timestamp > StartTime AND Timestamp < EndTime AND ServiceName = ? ) AS ot2 ON ot2.SpanId = ot1.ParentSpanId WHERE Duration >= (SELECT Duration_90th_percentile FROM durations) ORDER BY Timestamp ASC;` rows, err := e.OlapConn.Query(ctx, sql, p.StartTime, p.EndTime, p.AppAlias, p.Percentile, p.TargetService, p.SourceService, p.TargetService, p.SourceService, ) if err != nil { e.Log.Errorf("olap error query: rows, %s", err) return err } var t time.Time var hc int32 var d float64 for rows.Next() { if err := rows.Scan(&t, &d, &hc); err != nil { e.Log.Errorf("olap error query: successRows, %s", err) return err } if hc < 400 { l.Success = append(l.Success, []any{t, d}) } else { l.Failed = append(l.Failed, []any{t, d}) } } rows.Close() return nil }