123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491 |
- package main
- import (
- "context"
- "fmt"
- "github.com/ClickHouse/clickhouse-go/v2"
- "github.com/pkg/errors"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/sirupsen/logrus"
- "time"
- )
- type Aggregator struct {
- chCli clickhouse.Conn
- logger *logrus.Entry
- cfg AggregateConfig
- relation *ServiceNameAppAliasCmdbIDRelation
- sqlQueryErrorCounter prometheus.Counter
- endpointNotFoundCounter *prometheus.CounterVec
- endpointToLoadCounter *prometheus.CounterVec
- apdexGoodMS int64
- apdexBadMS int64
- }
- func NewAggregator(chCli clickhouse.Conn, logger *logrus.Entry, cfg AggregateConfig, relation *ServiceNameAppAliasCmdbIDRelation, reg *prometheus.Registry) *Aggregator {
- endPointToLoadCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
- Name: "i6000pusherAggregatorEndpointToLoadCounter",
- Help: "需要从clickhouse读取的endpoint数量",
- }, []string{"service", "endpoint"})
- endPointNotFoundCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
- Name: "i6000pusherAggregatorEndpointNotFoundCounter",
- Help: "需要从clickhouse读取的endpoint但没有找到的数量",
- }, []string{"service", "endpoint"})
- reg.MustRegister(endPointToLoadCounter)
- reg.MustRegister(endPointNotFoundCounter)
- return &Aggregator{chCli: chCli, logger: logger, cfg: cfg, relation: relation,
- sqlQueryErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
- Name: "i6000pusherAggregatorSqlQueryErrorCounter",
- Help: "从clickhouse读取endpoint信息sql出现错误的counter",
- }),
- endpointToLoadCounter: endPointToLoadCounter,
- endpointNotFoundCounter: endPointNotFoundCounter,
- apdexBadMS: 8000,
- apdexGoodMS: 2000,
- }
- }
- func (a *Aggregator) LoadMetricData(start time.Time, end time.Time, appAlias string) ([]ServiceMetricData, error) {
- cmdbID, findCmdbID := a.relation.GetCmdbIDFromAppAlias(appAlias)
- if !findCmdbID {
- return nil, errors.Errorf("cmdbID not found, appAlias: %s ", appAlias)
- }
- ctx, cancel := context.WithTimeout(context.Background(), a.cfg.LoadTimeout())
- defer cancel()
- /***
- 1. 读这段时间内的 endpoint匹配的 sever span list
- 2. 计算每个endpoint的metricMeters
- 3. 聚合出所有的instance, 根据instance分组
- */
- sql := `
- WITH '\\b[a-fA-F0-9]{32,}\\b|\\b[a-fA-F0-9\\-]{36}\\b|\\b\\[1-9]\\d*\\b' AS varPattern, SpanAttributes['http.route'] AS route, multiIf(
- (SpanAttributes['url.path']) != '', SpanAttributes['url.path'], (SpanAttributes['http.target']) != '',
- path(SpanAttributes['http.target']), (SpanAttributes['http.url']) != '', path(SpanAttributes['http.url']),
- (SpanAttributes['u\nrl.full']) != '', path(SpanAttributes['url.full']),
- SpanAttributes['http.route']) AS path
- select
- ServiceName,
- concat(address, ':', port) as instanceAddress,
- if(route != '', route, replaceRegexpOne(path, varPattern, '{:var}')) AS endpoint,
- countIf(StatusCodeNumber == 2) as failedCount,
- count() as requestCount,
- sum(Duration) as latencySum,
- sum(length(Exceptions.type)) as exceptionCount,
- countIf(Duration < ?) as apdexGood,
- countIf(Duration < ? and Duration > ?) as apdexFair
- from otel.otel_traces
- where
- Timestamp > ?
- and Timestamp < ?
- and SpanKindNumber = 2
- and AppAlias = ?
- group by ServiceName, SpanAttributes['net.host.name'] as address, SpanAttributes['net.host.port'] as port, endpoint
- `
- apdexGoodNS := a.apdexGoodMS * 1e6
- apdexFairNS := a.apdexBadMS * 1e6
- spanRows, errQuery := a.chCli.Query(ctx, sql, apdexGoodNS, apdexFairNS, apdexGoodNS, start.UTC(), end.UTC(), appAlias)
- if errQuery != nil {
- a.sqlQueryErrorCounter.Inc()
- return nil, errors.Wrapf(errQuery, "failed to query metrics: %s, sql:%s", appAlias, sql)
- }
- serviceName2MetricData := make(map[string]*ServiceMetricData, 100)
- for spanRows.Next() {
- var serviceName string
- var instance string
- var em ServiceInstanceEndpointMetrics
- em.Metrics.aggDuration = end.Sub(start)
- var failedCount uint64
- var requestCount uint64
- var exceptionCount uint64
- var apdexGoodRequestCount uint64
- var apdexFairRequestCount uint64
- if errScan := spanRows.Scan(&serviceName, &instance, &em.Endpoint, &failedCount, &requestCount,
- &em.Metrics.responseTimeSumNS, &exceptionCount, &apdexGoodRequestCount,
- &apdexFairRequestCount); errScan != nil {
- a.logger.WithError(errScan).Error("failed to scan metrics")
- a.sqlQueryErrorCounter.Inc()
- continue
- }
- em.Metrics.requestCount = int64(requestCount)
- em.Metrics.failedCount = int64(failedCount)
- em.Metrics.exceptionCount = int64(exceptionCount)
- em.Metrics.apdexFairRequestCount = int64(apdexFairRequestCount)
- em.Metrics.apdexGoodRequestCount = int64(apdexGoodRequestCount)
- a.logger.WithFields(logrus.Fields{
- "endpointMetric": em,
- "service": serviceName,
- "instance": instance,
- "startTime": start,
- "endTime": end,
- }).Debugf("Query metric from clickhouse")
- a.endpointToLoadCounter.With(
- prometheus.Labels{
- "service": serviceName,
- "endpoint": em.Endpoint,
- }).Inc()
- serviceMetricData, find := serviceName2MetricData[serviceName]
- if !find {
- serviceName2MetricData[serviceName] = &ServiceMetricData{
- AppAlias: appAlias,
- CmdbID: cmdbID,
- ServiceName: serviceName,
- InstanceMetrics: []ServiceInstanceMetric{
- {
- InstanceID: instance,
- InstanceIP: instance,
- EndpointMetrics: []ServiceInstanceEndpointMetrics{em},
- },
- },
- }
- continue
- }
- serviceMetricData.AddMetric(instance, em)
- }
- sds := make([]ServiceMetricData, 0, len(serviceName2MetricData))
- for _, data := range serviceName2MetricData {
- sds = append(sds, *data)
- }
- return sds, nil
- }
- func (a *Aggregator) LoadTraceData(start, end time.Time, appAlias string) ([]TopoData, error) {
- cmdbID, findCmdbID := a.relation.GetCmdbIDFromAppAlias(appAlias)
- if !findCmdbID {
- return nil, errors.Errorf("cmdbID not found, appAlias: %s ", appAlias)
- }
- ctx, cancel := context.WithTimeout(context.Background(), a.cfg.LoadTimeout())
- defer cancel()
- /***
- 1. 读这段时间内, appAlias的 拥有parent client span 的 server span
- 2. 分段计算
- 3. 通过traceID读出这段数据内某个整条链路
- 4. serverName必须不一样
- 5. 沿着parent向上找,一直找不到或appAlias是其他的, 找到的span就是顶点
- 6. 顶点就是tx, client span是src, server span是dest
- */
- traceIdSql := `
- select distinct TraceId from otel.otel_traces_url where
- Timestamp > ? and Timestamp < ? and
- AppAlias = ? and Route is not null and
- Route != ''
- `
- traceRows, errQueryTraceId := a.chCli.Query(ctx, traceIdSql, start, end, appAlias,
- start, end, appAlias)
- if errQueryTraceId != nil {
- a.sqlQueryErrorCounter.Inc()
- return nil, errors.Wrapf(errQueryTraceId, "failed to query metrics: %s, sql:%s", appAlias, traceIdSql)
- }
- traceIds := make([]string, 0)
- for traceRows.Next() {
- var traceId string
- if errScan := traceRows.Scan(&traceId); errScan != nil {
- a.logger.WithError(errScan).Error("failed to scan traceId")
- continue
- }
- traceIds = append(traceIds, traceId)
- }
- a.logger.Infof("LoadTraceData from clickhouse, traceIds length %d, sql:%s, params:(%v, %v, %s)",
- len(traceIds), traceIdSql, start, end, appAlias)
- if len(traceIds) == 0 {
- return []TopoData{}, nil
- }
- topoData := make([]TopoData, 0, len(traceIds)*10)
- traceBatchSize := 50
- timePadding := 10 * time.Minute
- for i := 0; i < len(traceIds); i += traceBatchSize {
- upperBound := i + traceBatchSize
- if upperBound > len(traceIds) {
- upperBound = len(traceIds)
- }
- tidList := traceIds[i:upperBound]
- traceId2Spans, errLoadTraces := a.loadTraceList(start.Add(-timePadding), end.Add(timePadding), tidList)
- if errLoadTraces != nil {
- a.logger.WithError(errLoadTraces).Error("failed to load traces")
- continue
- }
- a.logger.Infof("LoadTrace from clickhouse by trace id, traceIds length %d, %d found", len(tidList),
- len(traceId2Spans))
- if len(traceId2Spans) == 0 {
- continue
- }
- for _, spans := range traceId2Spans {
- endpointTrees, errCalEndPointTree := a.calEndPointFromSpans(spans, appAlias)
- if errCalEndPointTree != nil {
- a.logger.WithError(errCalEndPointTree).Errorf("failed to cal end point, spans:%v", spans)
- continue
- }
- for _, endpointTree := range endpointTrees {
- topoDataInEndpointTree := serviceTopoDataWithEndpoint(cmdbID, endpointTree)
- topoData = append(topoData, topoDataInEndpointTree...)
- }
- }
- }
- return aggregateTopoData(topoData), nil
- }
- func aggregateTopoData(topoList []TopoData) []TopoData {
- topoID2TopoList := make(map[string][]TopoData, len(topoList))
- for _, t := range topoList {
- topoID := t.TopoID()
- if _, ok := topoID2TopoList[topoID]; !ok {
- topoID2TopoList[topoID] = make([]TopoData, 0, 200)
- }
- topoID2TopoList[topoID] = append(topoID2TopoList[topoID], t)
- }
- ret := make([]TopoData, 0, len(topoID2TopoList))
- for _, ds := range topoID2TopoList {
- if len(ds) == 0 {
- continue
- }
- topoData := fromTopoDataList2AggregatedTopo(ds)
- ret = append(ret, topoData)
- }
- return ret
- }
- func fromTopoDataList2AggregatedTopo(ds []TopoData) TopoData {
- ret := ds[0]
- ret.RequestCount = int64(len(ds))
- ret.FailedCount = 0
- su := int64(0)
- for _, d := range ds {
- su += d.AvgResponseTime
- }
- ret.AvgResponseTime = (su / int64(len(ds))) / 1e6 //ms
- return ret
- }
// Span2Cal is one span row as loaded for topology calculation. Its exported
// fields are filled from the trace query's columns; children is populated
// afterwards when the spans of a trace are linked into a tree.
type Span2Cal struct {
	TraceId      string
	ServiceName  string
	AppAlias     string
	SpanId       string
	ParentSpanId string
	Duration     int64
	Route        string
	StatusCode   int32 `ch:"StatusCodeNumber"`
	children     []*Span2Cal
}

// HasError reports whether the span finished with an error status.
//
// StatusCode is read from the StatusCodeNumber column, which the metric query
// in this file counts as failed when it equals 2. The previous check treated
// the value as an HTTP status (outside 200-299), which is true for every
// possible StatusCodeNumber and therefore flagged every span as failed.
func (s *Span2Cal) HasError() bool {
	const statusCodeError = 2
	return s.StatusCode == statusCodeError
}
// ServerEndpointNode is a node of the endpoint tree derived from a span tree:
// one service endpoint together with the endpoints it called.
type ServerEndpointNode struct {
	ServerName string // service name of the originating span
	Endpoint   string // route of the originating span; may be empty
	HasError   bool   // error flag of the originating span
	Duration   int64  // duration of the originating span
	children   []*ServerEndpointNode
}

// String renders the node as "<server>:<endpoint>children:\n" followed by one
// "<server>:<endpoint>\n" line per direct child. Only the first level of
// children is printed.
func (s ServerEndpointNode) String() string {
	out := []byte(fmt.Sprintf("%s:%s", s.ServerName, s.Endpoint))
	out = append(out, "children:\n"...)
	for i := range s.children {
		kid := s.children[i]
		out = append(out, kid.ServerName...)
		out = append(out, ':')
		out = append(out, kid.Endpoint...)
		out = append(out, '\n')
	}
	return string(out)
}
- func subTreesWithEndpointRoot(snn *ServerEndpointNode) []*ServerEndpointNode {
- if snn == nil {
- return nil
- }
- if snn.Endpoint != "" {
- return []*ServerEndpointNode{snn}
- }
- ret := make([]*ServerEndpointNode, 0)
- for _, child := range snn.children {
- ens := subTreesWithEndpointRoot(child)
- if ens != nil && len(ens) > 0 {
- ret = append(ret, ens...)
- }
- }
- return ret
- }
- func serviceTopoDataWithEndpoint(cmdbID string, snn *ServerEndpointNode) []TopoData {
- if snn == nil {
- return make([]TopoData, 0)
- }
- subTrees := subTreesWithEndpointRoot(snn)
- if subTrees == nil || len(subTrees) == 0 {
- return make([]TopoData, 0)
- }
- ret := make([]TopoData, 0)
- for _, tree := range subTrees {
- pairs := findTreeEndpointPair(tree)
- if pairs == nil || len(pairs) == 0 {
- continue
- }
- for _, pair := range pairs {
- failedCount := 0
- if pair.hasError {
- failedCount = 1
- }
- ret = append(ret, TopoData{
- TxServiceName: tree.ServerName,
- TxEndpoint: tree.Endpoint,
- TxCmdbID: cmdbID,
- SrcServiceName: pair.srcServiceName,
- SrcEndpoint: pair.srcEndpoint,
- SrcCmdbID: cmdbID,
- DestServiceName: pair.destService,
- DestEndpoint: pair.destEndpoint,
- DestCmdbID: cmdbID,
- RequestCount: 1,
- FailedCount: int64(failedCount),
- AvgResponseTime: pair.duration,
- })
- }
- }
- return ret
- }
// serviceEndpointPair describes one call edge between two endpoint nodes:
// the calling (src) endpoint, the called (dest) endpoint, and the callee's
// error flag and duration.
type serviceEndpointPair struct {
	// Caller side.
	srcServiceName string
	srcEndpoint    string

	// Callee side.
	destService  string
	destEndpoint string

	// Outcome, copied from the callee node.
	hasError bool
	duration int64
}
- func findTreeEndpointPair(tree *ServerEndpointNode) []serviceEndpointPair {
- if tree == nil {
- return nil
- }
- ret := make([]serviceEndpointPair, 0)
- if tree.Endpoint != "" {
- for _, child := range tree.children {
- if child.Endpoint != "" {
- ret = append(ret, serviceEndpointPair{
- srcServiceName: tree.ServerName,
- srcEndpoint: tree.Endpoint,
- destService: child.ServerName,
- destEndpoint: child.Endpoint,
- hasError: child.HasError,
- duration: child.Duration,
- })
- }
- }
- }
- for _, child := range tree.children {
- childPairs := findTreeEndpointPair(child)
- if len(childPairs) > 0 {
- ret = append(ret, childPairs...)
- }
- }
- return ret
- }
- func (a *Aggregator) loadTraceList(start, end time.Time, tidList []string) (map[string][]Span2Cal, error) {
- sql := `
- WITH '\\b[a-fA-F0-9]{32,}\\b|\\b[a-fA-F0-9\\-]{36}\\b|\\b\\[1-9]\\d*\\b' AS varPattern,
- SpanAttributes['http.route'] AS route,
- multiIf( (SpanAttributes['url.path']) != '', SpanAttributes['url.path'], (SpanAttributes['http.target']) != '',
- path(SpanAttributes['http.target']), (SpanAttributes['http.url']) != '', path(SpanAttributes['http.url']),
- (SpanAttributes['u\nrl.full']) != '', path(SpanAttributes['url.full']),
- SpanAttributes['http.route']) AS path
- SELECT TraceId,
- ServiceName,
- SpanId,
- ParentSpanId,
- Duration,
- StatusCodeNumber,
- if(route != '', route, replaceRegexpOne(path, varPattern, '{:var}')) AS Route,
- AppAlias
- from otel.otel_traces
- where Timestamp > ? and Timestamp < ? and TraceId in ?
- `
- ctx, cancel := context.WithTimeout(context.Background(), a.cfg.LoadTimeout())
- defer cancel()
- spanRows, errQueryTraceId := a.chCli.Query(ctx, sql, start, end,
- tidList)
- if errQueryTraceId != nil {
- a.sqlQueryErrorCounter.Inc()
- return nil, errors.Wrapf(errQueryTraceId, "failed to query Traces: %s, sql:%s", tidList, sql)
- }
- ret := make(map[string][]Span2Cal)
- for spanRows.Next() {
- var span2Cal Span2Cal
- if err := spanRows.ScanStruct(&span2Cal); err != nil {
- a.logger.WithError(err).Error("failed to scan span2")
- continue
- }
- _, ok := ret[span2Cal.TraceId]
- if !ok {
- ret[span2Cal.TraceId] = []Span2Cal{span2Cal}
- continue
- }
- ret[span2Cal.TraceId] = append(ret[span2Cal.TraceId], span2Cal)
- }
- return ret, nil
- }
- func (a *Aggregator) calEndPointFromSpans(spans []Span2Cal, targetAppAlias string) ([]*ServerEndpointNode, error) {
- nodeMap := make(map[string]*Span2Cal)
- for i := range spans {
- nodeMap[spans[i].SpanId] = &spans[i]
- }
- var tree []*Span2Cal
- for _, node := range spans {
- if node.ParentSpanId == "" {
- tree = append(tree, nodeMap[node.SpanId])
- continue
- }
- parent, findParent := nodeMap[node.ParentSpanId]
- if !findParent {
- // 丢数据了,让这个tree独立出来,最大限度去找
- tree = append(tree, nodeMap[node.SpanId])
- continue
- }
- parent.children = append(parent.children, nodeMap[node.SpanId])
- }
- if len(tree) == 0 {
- a.logger.Errorf("no tree found in :%+v", spans)
- return nil, fmt.Errorf("no tree found in :%+v", spans)
- }
- ret := make([]*ServerEndpointNode, 0)
- for _, rootSpan := range tree {
- sen := EndpointTreeFromSpanTree(rootSpan, nil, targetAppAlias)
- if sen != nil {
- ret = append(ret, sen...)
- }
- }
- return ret, nil
- }
- func EndpointTreeFromSpanTree(span *Span2Cal, parentEndpointNode *ServerEndpointNode, targetAppAlias string) []*ServerEndpointNode {
- if span == nil {
- return nil
- }
- currentService := span.ServiceName
- if (span.AppAlias != targetAppAlias) ||
- (parentEndpointNode != nil && parentEndpointNode.ServerName == currentService) {
- // 略过本节点,返回子节点结果
- ret := make([]*ServerEndpointNode, 0)
- for _, child := range span.children {
- childNodes := EndpointTreeFromSpanTree(child, parentEndpointNode, targetAppAlias)
- if childNodes != nil && len(childNodes) > 0 {
- ret = append(ret, childNodes...)
- }
- }
- return ret
- }
- serverEndpointNode := ServerEndpointNode{
- ServerName: currentService,
- Endpoint: span.Route,
- HasError: span.HasError(),
- Duration: span.Duration,
- children: nil,
- }
- for _, childSpan := range span.children {
- childNodes := EndpointTreeFromSpanTree(childSpan, &serverEndpointNode, targetAppAlias)
- if childNodes != nil && len(childNodes) > 0 {
- serverEndpointNode.children = append(serverEndpointNode.children, childNodes...)
- }
- }
- return []*ServerEndpointNode{&serverEndpointNode}
- }
|