package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)

// otelStatusCodeError is the numeric OpenTelemetry span status for "Error"
// (Unset=0, Ok=1, Error=2) as stored in the StatusCodeNumber column.
const otelStatusCodeError = 2

// endpointWithClause is the ClickHouse WITH clause shared by the span queries.
// It defines:
//   - varPattern: a regex matching variable-looking path segments (runs of 32+
//     hex chars, 36-char hex/dash tokens such as UUIDs, or integers that do not
//     start with 0);
//   - route: the span's http.route attribute;
//   - path: the first non-empty of url.path, path(http.target), path(http.url),
//     path(url.full), falling back to http.route.
//
// Backslashes are doubled because ClickHouse unescapes string literals once
// before the pattern reaches the regex engine.
const endpointWithClause = `
WITH
    '\\b[a-fA-F0-9]{32,}\\b|\\b[a-fA-F0-9\\-]{36}\\b|\\b[1-9]\\d*\\b' AS varPattern,
    SpanAttributes['http.route'] AS route,
    multiIf(
        SpanAttributes['url.path'] != '', SpanAttributes['url.path'],
        SpanAttributes['http.target'] != '', path(SpanAttributes['http.target']),
        SpanAttributes['http.url'] != '', path(SpanAttributes['http.url']),
        SpanAttributes['url.full'] != '', path(SpanAttributes['url.full']),
        SpanAttributes['http.route']
    ) AS path
`

// endpointExpr yields a span's endpoint: http.route when it is set, otherwise
// path with its first variable-looking segment replaced by "{:var}".
// It relies on the names defined by endpointWithClause.
const endpointExpr = `if(route != '', route, replaceRegexpOne(path, varPattern, '{:var}'))`

// Aggregator reads spans from ClickHouse and turns them into per-endpoint
// service metrics (LoadMetricData) and endpoint-level call topology
// (LoadTraceData).
type Aggregator struct {
	chCli    clickhouse.Conn
	logger   *logrus.Entry
	cfg      AggregateConfig
	relation *ServiceNameAppAliasCmdbIDRelation

	// sqlQueryErrorCounter counts failed queries, failed row scans and
	// failures while iterating result sets.
	sqlQueryErrorCounter    prometheus.Counter
	endpointNotFoundCounter *prometheus.CounterVec
	// endpointToLoadCounter counts endpoint rows read by LoadMetricData.
	endpointToLoadCounter *prometheus.CounterVec

	// Apdex thresholds in milliseconds: a request faster than apdexGoodMS is
	// "good"; one in [apdexGoodMS, apdexBadMS) is "fair".
	apdexGoodMS int64
	apdexBadMS  int64
}

// NewAggregator creates an Aggregator and registers its Prometheus metrics on
// reg. It panics (via MustRegister) if any metric is already registered there.
// Apdex thresholds are fixed at 2000 ms (good) and 8000 ms (bad).
func NewAggregator(chCli clickhouse.Conn, logger *logrus.Entry, cfg AggregateConfig,
	relation *ServiceNameAppAliasCmdbIDRelation, reg *prometheus.Registry) *Aggregator {
	endpointToLoadCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "i6000pusherAggregatorEndpointToLoadCounter",
		Help: "需要从clickhouse读取的endpoint数量",
	}, []string{"service", "endpoint"})
	endpointNotFoundCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "i6000pusherAggregatorEndpointNotFoundCounter",
		Help: "需要从clickhouse读取的endpoint但没有找到的数量",
	}, []string{"service", "endpoint"})
	sqlQueryErrorCounter := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "i6000pusherAggregatorSqlQueryErrorCounter",
		Help: "从clickhouse读取endpoint信息sql出现错误的counter",
	})
	// All three collectors are registered; previously the SQL error counter
	// was created but never registered, so it was never exported.
	reg.MustRegister(endpointToLoadCounter, endpointNotFoundCounter, sqlQueryErrorCounter)

	return &Aggregator{
		chCli:                   chCli,
		logger:                  logger,
		cfg:                     cfg,
		relation:                relation,
		sqlQueryErrorCounter:    sqlQueryErrorCounter,
		endpointToLoadCounter:   endpointToLoadCounter,
		endpointNotFoundCounter: endpointNotFoundCounter,
		apdexBadMS:              8000,
		apdexGoodMS:             2000,
	}
}

// LoadMetricData aggregates the server spans (SpanKindNumber = 2) of appAlias
// whose Timestamp lies in [start, end) into metrics grouped by service,
// instance (net.host.name:net.host.port) and endpoint.
//
// It returns an error when appAlias has no cmdbID, when the query fails, or
// when reading the result set fails part-way. Individual rows that fail to
// scan are logged, counted in sqlQueryErrorCounter and skipped.
func (a *Aggregator) LoadMetricData(start time.Time, end time.Time, appAlias string) ([]ServiceMetricData, error) {
	cmdbID, findCmdbID := a.relation.GetCmdbIDFromAppAlias(appAlias)
	if !findCmdbID {
		return nil, errors.Errorf("cmdbID not found, appAlias: %s ", appAlias)
	}
	ctx, cancel := context.WithTimeout(context.Background(), a.cfg.LoadTimeout())
	defer cancel()

	// StatusCodeNumber = 2 is the OpenTelemetry Error status. The apdex
	// buckets are good (< good) and fair ([good, bad)); together with the
	// implicit frustrated bucket (>= bad) they cover every request.
	sql := endpointWithClause + `
SELECT
    ServiceName,
    concat(address, ':', port) AS instanceAddress,
    ` + endpointExpr + ` AS endpoint,
    countIf(StatusCodeNumber == 2) AS failedCount,
    count() AS requestCount,
    sum(Duration) AS latencySum,
    sum(length(Exceptions.type)) AS exceptionCount,
    countIf(Duration < ?) AS apdexGood,
    countIf(Duration >= ? AND Duration < ?) AS apdexFair
FROM otel.otel_traces
WHERE Timestamp >= ? AND Timestamp < ? AND SpanKindNumber = 2 AND AppAlias = ?
GROUP BY ServiceName,
    SpanAttributes['net.host.name'] AS address,
    SpanAttributes['net.host.port'] AS port,
    endpoint
`
	// Duration is compared in nanoseconds.
	apdexGoodNS := a.apdexGoodMS * int64(time.Millisecond)
	apdexBadNS := a.apdexBadMS * int64(time.Millisecond)
	spanRows, errQuery := a.chCli.Query(ctx, sql,
		apdexGoodNS,             // apdexGood upper bound
		apdexGoodNS, apdexBadNS, // apdexFair range
		start.UTC(), end.UTC(), appAlias)
	if errQuery != nil {
		a.sqlQueryErrorCounter.Inc()
		return nil, errors.Wrapf(errQuery, "failed to query metrics: %s, sql:%s", appAlias, sql)
	}
	defer spanRows.Close()

	aggDuration := end.Sub(start)
	debugEnabled := a.logger.Logger.IsLevelEnabled(logrus.DebugLevel)
	serviceName2MetricData := make(map[string]*ServiceMetricData, 100)
	for spanRows.Next() {
		var (
			serviceName           string
			instance              string
			em                    ServiceInstanceEndpointMetrics
			failedCount           uint64
			requestCount          uint64
			exceptionCount        uint64
			apdexGoodRequestCount uint64
			apdexFairRequestCount uint64
		)
		em.Metrics.aggDuration = aggDuration
		if errScan := spanRows.Scan(&serviceName, &instance, &em.Endpoint, &failedCount, &requestCount,
			&em.Metrics.responseTimeSumNS, &exceptionCount, &apdexGoodRequestCount, &apdexFairRequestCount); errScan != nil {
			a.logger.WithError(errScan).Error("failed to scan metrics")
			a.sqlQueryErrorCounter.Inc()
			continue
		}
		em.Metrics.requestCount = int64(requestCount)
		em.Metrics.failedCount = int64(failedCount)
		em.Metrics.exceptionCount = int64(exceptionCount)
		em.Metrics.apdexFairRequestCount = int64(apdexFairRequestCount)
		em.Metrics.apdexGoodRequestCount = int64(apdexGoodRequestCount)

		// Only build the fields map when it will actually be logged.
		if debugEnabled {
			a.logger.WithFields(logrus.Fields{
				"endpointMetric": em,
				"service":        serviceName,
				"instance":       instance,
				"startTime":      start,
				"endTime":        end,
			}).Debugf("Query metric from clickhouse")
		}
		a.endpointToLoadCounter.With(prometheus.Labels{
			"service":  serviceName,
			"endpoint": em.Endpoint,
		}).Inc()

		serviceMetricData, found := serviceName2MetricData[serviceName]
		if !found {
			serviceName2MetricData[serviceName] = &ServiceMetricData{
				AppAlias:    appAlias,
				CmdbID:      cmdbID,
				ServiceName: serviceName,
				InstanceMetrics: []ServiceInstanceMetric{
					{
						InstanceID:      instance,
						InstanceIP:      instance,
						EndpointMetrics: []ServiceInstanceEndpointMetrics{em},
					},
				},
			}
			continue
		}
		serviceMetricData.AddMetric(instance, em)
	}
	// A failure while streaming rows (e.g. the context timing out) must not be
	// reported as a complete, successful result.
	if errRows := spanRows.Err(); errRows != nil {
		a.sqlQueryErrorCounter.Inc()
		return nil, errors.Wrapf(errRows, "failed to read metrics: %s", appAlias)
	}

	sds := make([]ServiceMetricData, 0, len(serviceName2MetricData))
	for _, data := range serviceName2MetricData {
		sds = append(sds, *data)
	}
	return sds, nil
}

// LoadTraceData builds endpoint-to-endpoint call topology for appAlias.
//
// It selects the trace IDs in otel.otel_traces_url that have a non-empty Route
// and a Timestamp in [start, end). It then loads those traces from
// otel.otel_traces in batches of 50, widening the time window by 10 minutes on
// each side. Each trace is converted into endpoint trees restricted to
// appAlias, and the resulting (tx, src -> dest) endpoint pairs are merged by
// TopoID. The tx is the root of each endpoint subtree, src is the calling
// endpoint and dest is the called endpoint.
//
// Batches that fail to load and traces that cannot be assembled are logged and
// skipped; only a missing cmdbID or a failed trace-id query is returned as an
// error.
func (a *Aggregator) LoadTraceData(start, end time.Time, appAlias string) ([]TopoData, error) {
	cmdbID, findCmdbID := a.relation.GetCmdbIDFromAppAlias(appAlias)
	if !findCmdbID {
		return nil, errors.Errorf("cmdbID not found, appAlias: %s ", appAlias)
	}
	ctx, cancel := context.WithTimeout(context.Background(), a.cfg.LoadTimeout())
	defer cancel()

	traceIDs, errTraceIDs := a.queryTraceIDs(ctx, start, end, appAlias)
	if errTraceIDs != nil {
		return nil, errTraceIDs
	}
	if len(traceIDs) == 0 {
		return []TopoData{}, nil
	}

	const traceBatchSize = 50
	// Spans of a selected trace may start before start or finish after end.
	const timePadding = 10 * time.Minute

	topoData := make([]TopoData, 0, len(traceIDs)*10)
	for i := 0; i < len(traceIDs); i += traceBatchSize {
		upperBound := i + traceBatchSize
		if upperBound > len(traceIDs) {
			upperBound = len(traceIDs)
		}
		tidList := traceIDs[i:upperBound]
		traceID2Spans, errLoadTraces := a.loadTraceList(start.Add(-timePadding), end.Add(timePadding), tidList)
		if errLoadTraces != nil {
			a.logger.WithError(errLoadTraces).Error("failed to load traces")
			continue
		}
		a.logger.Infof("LoadTrace from clickhouse by trace id, traceIds length %d, %d found", len(tidList), len(traceID2Spans))

		for _, spans := range traceID2Spans {
			endpointTrees, errCalEndPointTree := a.calEndPointFromSpans(spans, appAlias)
			if errCalEndPointTree != nil {
				a.logger.WithError(errCalEndPointTree).Errorf("failed to cal end point, spans:%v", spans)
				continue
			}
			for _, endpointTree := range endpointTrees {
				topoData = append(topoData, serviceTopoDataWithEndpoint(cmdbID, endpointTree)...)
			}
		}
	}
	return aggregateTopoData(topoData), nil
}

// queryTraceIDs returns the distinct TraceIds in otel.otel_traces_url for
// appAlias whose Timestamp lies in [start, end) and whose Route is non-empty.
// Row scan failures are logged and skipped; a failed query or a failure while
// iterating the result set is returned as an error.
func (a *Aggregator) queryTraceIDs(ctx context.Context, start, end time.Time, appAlias string) ([]string, error) {
	const traceIDSQL = `
SELECT DISTINCT TraceId
FROM otel.otel_traces_url
WHERE Timestamp >= ? AND Timestamp < ? AND AppAlias = ?
  AND Route IS NOT NULL AND Route != ''
`
	traceRows, errQuery := a.chCli.Query(ctx, traceIDSQL, start, end, appAlias)
	if errQuery != nil {
		a.sqlQueryErrorCounter.Inc()
		return nil, errors.Wrapf(errQuery, "failed to query trace ids: %s, sql:%s", appAlias, traceIDSQL)
	}
	defer traceRows.Close()

	traceIDs := make([]string, 0)
	for traceRows.Next() {
		var traceID string
		if errScan := traceRows.Scan(&traceID); errScan != nil {
			a.logger.WithError(errScan).Error("failed to scan traceId")
			a.sqlQueryErrorCounter.Inc()
			continue
		}
		traceIDs = append(traceIDs, traceID)
	}
	if errRows := traceRows.Err(); errRows != nil {
		a.sqlQueryErrorCounter.Inc()
		return nil, errors.Wrapf(errRows, "failed to read trace ids: %s", appAlias)
	}
	a.logger.Infof("LoadTraceData from clickhouse, traceIds length %d, sql:%s, params:(%v, %v, %s)",
		len(traceIDs), traceIDSQL, start, end, appAlias)
	return traceIDs, nil
}

// aggregateTopoData merges topology records that share a TopoID into a single
// record each (see fromTopoDataList2AggregatedTopo). The output order follows
// the first occurrence of each TopoID in topoList, so it is deterministic.
func aggregateTopoData(topoList []TopoData) []TopoData {
	groups := make(map[string][]TopoData)
	order := make([]string, 0)
	for _, t := range topoList {
		topoID := t.TopoID()
		if _, seen := groups[topoID]; !seen {
			order = append(order, topoID)
		}
		groups[topoID] = append(groups[topoID], t)
	}

	ret := make([]TopoData, 0, len(order))
	for _, topoID := range order {
		ret = append(ret, fromTopoDataList2AggregatedTopo(groups[topoID]))
	}
	return ret
}

// fromTopoDataList2AggregatedTopo collapses records that share a TopoID into
// one. The identity fields come from ds[0]. RequestCount is the number of
// records and FailedCount is the sum of the records' FailedCount.
// AvgResponseTime is the mean of the records' AvgResponseTime, converted from
// nanoseconds to milliseconds (inputs come from span Durations). An empty ds
// yields the zero TopoData.
func fromTopoDataList2AggregatedTopo(ds []TopoData) TopoData {
	if len(ds) == 0 {
		return TopoData{}
	}
	ret := ds[0]
	var failed, durationSumNS int64
	for _, d := range ds {
		failed += d.FailedCount
		durationSumNS += d.AvgResponseTime
	}
	ret.RequestCount = int64(len(ds))
	ret.FailedCount = failed
	ret.AvgResponseTime = durationSumNS / int64(len(ds)) / 1e6 // ns -> ms
	return ret
}

// Span2Cal is a span row loaded by loadTraceList (via ScanStruct), plus the
// child links that calEndPointFromSpans builds.
type Span2Cal struct {
	TraceId      string
	ServiceName  string
	AppAlias     string
	SpanId       string
	ParentSpanId string
	Duration     int64
	// Route is the normalized endpoint computed by endpointExpr.
	Route string
	// StatusCode holds the numeric OpenTelemetry status (StatusCodeNumber
	// column), not an HTTP status code.
	StatusCode int32 `ch:"StatusCodeNumber"`
	children   []*Span2Cal
}

// HasError reports whether the span's OpenTelemetry status is Error. This is
// the same status LoadMetricData counts as failed (StatusCodeNumber == 2).
func (s *Span2Cal) HasError() bool {
	return s.StatusCode == otelStatusCodeError
}

// ServerEndpointNode is a span kept in an endpoint tree: one node per service
// hop within the target AppAlias.
type ServerEndpointNode struct {
	ServerName string
	Endpoint   string
	HasError   bool
	Duration   int64
	children   []*ServerEndpointNode
}

// String renders "service:endpoint", then a "children:" line, then one
// "service:endpoint" line per direct child. It is intended for debugging.
func (s ServerEndpointNode) String() string {
	var b strings.Builder
	b.WriteString(s.ServerName)
	b.WriteByte(':')
	b.WriteString(s.Endpoint)
	b.WriteString("\nchildren:\n")
	for _, child := range s.children {
		b.WriteString(child.ServerName)
		b.WriteByte(':')
		b.WriteString(child.Endpoint)
		b.WriteByte('\n')
	}
	return b.String()
}

// subTreesWithEndpointRoot returns the highest nodes under (and including) snn
// that have a non-empty Endpoint. Nodes without an endpoint are looked
// through, and descendants of a returned node are not searched further.
// It returns nil for a nil snn.
func subTreesWithEndpointRoot(snn *ServerEndpointNode) []*ServerEndpointNode {
	if snn == nil {
		return nil
	}
	if snn.Endpoint != "" {
		return []*ServerEndpointNode{snn}
	}
	ret := make([]*ServerEndpointNode, 0)
	for _, child := range snn.children {
		ret = append(ret, subTreesWithEndpointRoot(child)...)
	}
	return ret
}

// serviceTopoDataWithEndpoint turns an endpoint tree into one TopoData per
// parent->child endpoint pair. Each endpoint-rooted subtree acts as the
// transaction (Tx). Every record counts as one request and is marked failed
// when the called (dest) node has an error. All cmdb IDs are set to cmdbID.
func serviceTopoDataWithEndpoint(cmdbID string, snn *ServerEndpointNode) []TopoData {
	ret := make([]TopoData, 0)
	for _, tree := range subTreesWithEndpointRoot(snn) {
		for _, pair := range findTreeEndpointPair(tree) {
			var failedCount int64
			if pair.hasError {
				failedCount = 1
			}
			ret = append(ret, TopoData{
				TxServiceName:   tree.ServerName,
				TxEndpoint:      tree.Endpoint,
				TxCmdbID:        cmdbID,
				SrcServiceName:  pair.srcServiceName,
				SrcEndpoint:     pair.srcEndpoint,
				SrcCmdbID:       cmdbID,
				DestServiceName: pair.destService,
				DestEndpoint:    pair.destEndpoint,
				DestCmdbID:      cmdbID,
				RequestCount:    1,
				FailedCount:     failedCount,
				AvgResponseTime: pair.duration,
			})
		}
	}
	return ret
}

// serviceEndpointPair is a direct call from one endpoint node (src) to a child
// endpoint node (dest). hasError and duration describe the dest node.
type serviceEndpointPair struct {
	srcServiceName string
	srcEndpoint    string
	destService    string
	destEndpoint   string
	hasError       bool
	duration       int64
}

// findTreeEndpointPair collects, over the whole tree, every parent->child edge
// where both nodes have a non-empty Endpoint. It returns nil for a nil tree.
func findTreeEndpointPair(tree *ServerEndpointNode) []serviceEndpointPair {
	if tree == nil {
		return nil
	}
	ret := make([]serviceEndpointPair, 0)
	if tree.Endpoint != "" {
		for _, child := range tree.children {
			if child.Endpoint == "" {
				continue
			}
			ret = append(ret, serviceEndpointPair{
				srcServiceName: tree.ServerName,
				srcEndpoint:    tree.Endpoint,
				destService:    child.ServerName,
				destEndpoint:   child.Endpoint,
				hasError:       child.HasError,
				duration:       child.Duration,
			})
		}
	}
	for _, child := range tree.children {
		ret = append(ret, findTreeEndpointPair(child)...)
	}
	return ret
}

// loadTraceList loads every span in otel.otel_traces belonging to the traces
// in tidList with start < Timestamp < end, grouped by TraceId. Rows that fail
// to scan are logged and skipped. A failed query or an error while iterating
// the result set is returned as an error.
func (a *Aggregator) loadTraceList(start, end time.Time, tidList []string) (map[string][]Span2Cal, error) {
	sql := endpointWithClause + `
SELECT
    TraceId,
    ServiceName,
    SpanId,
    ParentSpanId,
    Duration,
    StatusCodeNumber,
    ` + endpointExpr + ` AS Route,
    AppAlias
FROM otel.otel_traces
WHERE Timestamp > ? AND Timestamp < ? AND TraceId IN ?
`
	ctx, cancel := context.WithTimeout(context.Background(), a.cfg.LoadTimeout())
	defer cancel()

	spanRows, errQuery := a.chCli.Query(ctx, sql, start, end, tidList)
	if errQuery != nil {
		a.sqlQueryErrorCounter.Inc()
		return nil, errors.Wrapf(errQuery, "failed to query Traces: %s, sql:%s", tidList, sql)
	}
	defer spanRows.Close()

	ret := make(map[string][]Span2Cal)
	for spanRows.Next() {
		var span2Cal Span2Cal
		if err := spanRows.ScanStruct(&span2Cal); err != nil {
			a.logger.WithError(err).Error("failed to scan span2")
			continue
		}
		ret[span2Cal.TraceId] = append(ret[span2Cal.TraceId], span2Cal)
	}
	// Partial traces would produce wrong topology, so a streaming failure
	// fails the whole batch.
	if errRows := spanRows.Err(); errRows != nil {
		a.sqlQueryErrorCounter.Inc()
		return nil, errors.Wrapf(errRows, "failed to read Traces: %s", tidList)
	}
	return ret, nil
}

// calEndPointFromSpans links the spans of one trace into span trees and
// converts each tree into endpoint nodes for targetAppAlias (see
// EndpointTreeFromSpanTree). It mutates the children of the elements of spans.
//
// A span becomes a root when its ParentSpanId is empty, its parent is not among
// spans (lost data), or it names itself as parent. Duplicate rows sharing a
// SpanId are ignored after the first so they are neither linked twice nor able
// to form a self-referencing cycle (which would recurse forever).
func (a *Aggregator) calEndPointFromSpans(spans []Span2Cal, targetAppAlias string) ([]*ServerEndpointNode, error) {
	nodeMap := make(map[string]*Span2Cal, len(spans))
	distinct := make([]*Span2Cal, 0, len(spans))
	for i := range spans {
		if _, dup := nodeMap[spans[i].SpanId]; dup {
			continue
		}
		nodeMap[spans[i].SpanId] = &spans[i]
		distinct = append(distinct, &spans[i])
	}

	var roots []*Span2Cal
	for _, node := range distinct {
		if node.ParentSpanId == "" {
			roots = append(roots, node)
			continue
		}
		parent, found := nodeMap[node.ParentSpanId]
		if !found || parent == node {
			// Missing or self-referencing parent: keep this subtree as an
			// independent root to recover as much as possible.
			roots = append(roots, node)
			continue
		}
		parent.children = append(parent.children, node)
	}
	if len(roots) == 0 {
		// The caller logs this error together with the spans.
		return nil, fmt.Errorf("no root span found among %d spans", len(spans))
	}

	ret := make([]*ServerEndpointNode, 0)
	for _, rootSpan := range roots {
		ret = append(ret, EndpointTreeFromSpanTree(rootSpan, nil, targetAppAlias)...)
	}
	return ret, nil
}

// EndpointTreeFromSpanTree converts the span tree rooted at span into endpoint
// nodes for targetAppAlias.
//
// A span is skipped when it belongs to another AppAlias or to the same service
// as parentEndpointNode. Its converted children are then returned in its place,
// still attached to parentEndpointNode. Otherwise the span becomes a
// ServerEndpointNode whose children are converted recursively. It returns nil
// for a nil span.
func EndpointTreeFromSpanTree(span *Span2Cal, parentEndpointNode *ServerEndpointNode, targetAppAlias string) []*ServerEndpointNode {
	if span == nil {
		return nil
	}
	sameServiceAsParent := parentEndpointNode != nil && parentEndpointNode.ServerName == span.ServiceName
	if span.AppAlias != targetAppAlias || sameServiceAsParent {
		ret := make([]*ServerEndpointNode, 0)
		for _, child := range span.children {
			ret = append(ret, EndpointTreeFromSpanTree(child, parentEndpointNode, targetAppAlias)...)
		}
		return ret
	}

	node := &ServerEndpointNode{
		ServerName: span.ServiceName,
		Endpoint:   span.Route,
		HasError:   span.HasError(),
		Duration:   span.Duration,
	}
	for _, childSpan := range span.children {
		node.children = append(node.children, EndpointTreeFromSpanTree(childSpan, node, targetAppAlias)...)
	}
	return []*ServerEndpointNode{node}
}