aggregator.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/ClickHouse/clickhouse-go/v2"
  6. "github.com/pkg/errors"
  7. "github.com/prometheus/client_golang/prometheus"
  8. "github.com/sirupsen/logrus"
  9. "time"
  10. )
  11. type Aggregator struct {
  12. chCli clickhouse.Conn
  13. logger *logrus.Entry
  14. cfg AggregateConfig
  15. relation *ServiceNameAppAliasCmdbIDRelation
  16. sqlQueryErrorCounter prometheus.Counter
  17. endpointNotFoundCounter *prometheus.CounterVec
  18. endpointToLoadCounter *prometheus.CounterVec
  19. apdexGoodMS int64
  20. apdexBadMS int64
  21. }
  22. func NewAggregator(chCli clickhouse.Conn, logger *logrus.Entry, cfg AggregateConfig, relation *ServiceNameAppAliasCmdbIDRelation, reg *prometheus.Registry) *Aggregator {
  23. endPointToLoadCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
  24. Name: "i6000pusherAggregatorEndpointToLoadCounter",
  25. Help: "需要从clickhouse读取的endpoint数量",
  26. }, []string{"service", "endpoint"})
  27. endPointNotFoundCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
  28. Name: "i6000pusherAggregatorEndpointNotFoundCounter",
  29. Help: "需要从clickhouse读取的endpoint但没有找到的数量",
  30. }, []string{"service", "endpoint"})
  31. reg.MustRegister(endPointToLoadCounter)
  32. reg.MustRegister(endPointNotFoundCounter)
  33. return &Aggregator{chCli: chCli, logger: logger, cfg: cfg, relation: relation,
  34. sqlQueryErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
  35. Name: "i6000pusherAggregatorSqlQueryErrorCounter",
  36. Help: "从clickhouse读取endpoint信息sql出现错误的counter",
  37. }),
  38. endpointToLoadCounter: endPointToLoadCounter,
  39. endpointNotFoundCounter: endPointNotFoundCounter,
  40. apdexBadMS: 8000,
  41. apdexGoodMS: 2000,
  42. }
  43. }
  44. func (a *Aggregator) LoadMetricData(start time.Time, end time.Time, appAlias string) ([]ServiceMetricData, error) {
  45. cmdbID, findCmdbID := a.relation.GetCmdbIDFromAppAlias(appAlias)
  46. if !findCmdbID {
  47. return nil, errors.Errorf("cmdbID not found, appAlias: %s ", appAlias)
  48. }
  49. ctx, cancel := context.WithTimeout(context.Background(), a.cfg.LoadTimeout())
  50. defer cancel()
  51. /***
  52. 1. 读这段时间内的 endpoint匹配的 sever span list
  53. 2. 计算每个endpoint的metricMeters
  54. 3. 聚合出所有的instance, 根据instance分组
  55. */
  56. sql := `
  57. WITH '\\b[a-fA-F0-9]{32,}\\b|\\b[a-fA-F0-9\\-]{36}\\b|\\b\\[1-9]\\d*\\b' AS varPattern, SpanAttributes['http.route'] AS route, multiIf(
  58. (SpanAttributes['url.path']) != '', SpanAttributes['url.path'], (SpanAttributes['http.target']) != '',
  59. path(SpanAttributes['http.target']), (SpanAttributes['http.url']) != '', path(SpanAttributes['http.url']),
  60. (SpanAttributes['u\nrl.full']) != '', path(SpanAttributes['url.full']),
  61. SpanAttributes['http.route']) AS path
  62. select
  63. ServiceName,
  64. concat(address, ':', port) as instanceAddress,
  65. if(route != '', route, replaceRegexpOne(path, varPattern, '{:var}')) AS endpoint,
  66. countIf(StatusCodeNumber == 2) as failedCount,
  67. count() as requestCount,
  68. sum(Duration) as latencySum,
  69. sum(length(Exceptions.type)) as exceptionCount,
  70. countIf(Duration < ?) as apdexGood,
  71. countIf(Duration < ? and Duration > ?) as apdexFair
  72. from otel.otel_traces
  73. where
  74. Timestamp > ?
  75. and Timestamp < ?
  76. and SpanKindNumber = 2
  77. and AppAlias = ?
  78. group by ServiceName, SpanAttributes['net.host.name'] as address, SpanAttributes['net.host.port'] as port, endpoint
  79. `
  80. apdexGoodNS := a.apdexGoodMS * 1e6
  81. apdexFairNS := a.apdexBadMS * 1e6
  82. spanRows, errQuery := a.chCli.Query(ctx, sql, apdexGoodNS, apdexFairNS, apdexGoodNS, start.UTC(), end.UTC(), appAlias)
  83. if errQuery != nil {
  84. a.sqlQueryErrorCounter.Inc()
  85. return nil, errors.Wrapf(errQuery, "failed to query metrics: %s, sql:%s", appAlias, sql)
  86. }
  87. serviceName2MetricData := make(map[string]*ServiceMetricData, 100)
  88. for spanRows.Next() {
  89. var serviceName string
  90. var instance string
  91. var em ServiceInstanceEndpointMetrics
  92. em.Metrics.aggDuration = end.Sub(start)
  93. var failedCount uint64
  94. var requestCount uint64
  95. var exceptionCount uint64
  96. var apdexGoodRequestCount uint64
  97. var apdexFairRequestCount uint64
  98. if errScan := spanRows.Scan(&serviceName, &instance, &em.Endpoint, &failedCount, &requestCount,
  99. &em.Metrics.responseTimeSumNS, &exceptionCount, &apdexGoodRequestCount,
  100. &apdexFairRequestCount); errScan != nil {
  101. a.logger.WithError(errScan).Error("failed to scan metrics")
  102. a.sqlQueryErrorCounter.Inc()
  103. continue
  104. }
  105. em.Metrics.requestCount = int64(requestCount)
  106. em.Metrics.failedCount = int64(failedCount)
  107. em.Metrics.exceptionCount = int64(exceptionCount)
  108. em.Metrics.apdexFairRequestCount = int64(apdexFairRequestCount)
  109. em.Metrics.apdexGoodRequestCount = int64(apdexGoodRequestCount)
  110. a.logger.WithFields(logrus.Fields{
  111. "endpointMetric": em,
  112. "service": serviceName,
  113. "instance": instance,
  114. "startTime": start,
  115. "endTime": end,
  116. }).Debugf("Query metric from clickhouse")
  117. a.endpointToLoadCounter.With(
  118. prometheus.Labels{
  119. "service": serviceName,
  120. "endpoint": em.Endpoint,
  121. }).Inc()
  122. serviceMetricData, find := serviceName2MetricData[serviceName]
  123. if !find {
  124. serviceName2MetricData[serviceName] = &ServiceMetricData{
  125. AppAlias: appAlias,
  126. CmdbID: cmdbID,
  127. ServiceName: serviceName,
  128. InstanceMetrics: []ServiceInstanceMetric{
  129. {
  130. InstanceID: instance,
  131. InstanceIP: instance,
  132. EndpointMetrics: []ServiceInstanceEndpointMetrics{em},
  133. },
  134. },
  135. }
  136. continue
  137. }
  138. serviceMetricData.AddMetric(instance, em)
  139. }
  140. sds := make([]ServiceMetricData, 0, len(serviceName2MetricData))
  141. for _, data := range serviceName2MetricData {
  142. sds = append(sds, *data)
  143. }
  144. return sds, nil
  145. }
  146. func (a *Aggregator) LoadTraceData(start, end time.Time, appAlias string) ([]TopoData, error) {
  147. cmdbID, findCmdbID := a.relation.GetCmdbIDFromAppAlias(appAlias)
  148. if !findCmdbID {
  149. return nil, errors.Errorf("cmdbID not found, appAlias: %s ", appAlias)
  150. }
  151. ctx, cancel := context.WithTimeout(context.Background(), a.cfg.LoadTimeout())
  152. defer cancel()
  153. /***
  154. 1. 读这段时间内, appAlias的 拥有parent client span 的 server span
  155. 2. 分段计算
  156. 3. 通过traceID读出这段数据内某个整条链路
  157. 4. serverName必须不一样
  158. 5. 沿着parent向上找,一直找不到或appAlias是其他的, 找到的span就是顶点
  159. 6. 顶点就是tx, client span是src, server span是dest
  160. */
  161. traceIdSql := `
  162. select distinct TraceId from otel.otel_traces_url where
  163. Timestamp > ? and Timestamp < ? and
  164. AppAlias = ? and Route is not null and
  165. Route != ''
  166. `
  167. traceRows, errQueryTraceId := a.chCli.Query(ctx, traceIdSql, start, end, appAlias,
  168. start, end, appAlias)
  169. if errQueryTraceId != nil {
  170. a.sqlQueryErrorCounter.Inc()
  171. return nil, errors.Wrapf(errQueryTraceId, "failed to query metrics: %s, sql:%s", appAlias, traceIdSql)
  172. }
  173. traceIds := make([]string, 0)
  174. for traceRows.Next() {
  175. var traceId string
  176. if errScan := traceRows.Scan(&traceId); errScan != nil {
  177. a.logger.WithError(errScan).Error("failed to scan traceId")
  178. continue
  179. }
  180. traceIds = append(traceIds, traceId)
  181. }
  182. a.logger.Infof("LoadTraceData from clickhouse, traceIds length %d, sql:%s, params:(%v, %v, %s)",
  183. len(traceIds), traceIdSql, start, end, appAlias)
  184. if len(traceIds) == 0 {
  185. return []TopoData{}, nil
  186. }
  187. topoData := make([]TopoData, 0, len(traceIds)*10)
  188. traceBatchSize := 50
  189. timePadding := 10 * time.Minute
  190. for i := 0; i < len(traceIds); i += traceBatchSize {
  191. upperBound := i + traceBatchSize
  192. if upperBound > len(traceIds) {
  193. upperBound = len(traceIds)
  194. }
  195. tidList := traceIds[i:upperBound]
  196. traceId2Spans, errLoadTraces := a.loadTraceList(start.Add(-timePadding), end.Add(timePadding), tidList)
  197. if errLoadTraces != nil {
  198. a.logger.WithError(errLoadTraces).Error("failed to load traces")
  199. continue
  200. }
  201. a.logger.Infof("LoadTrace from clickhouse by trace id, traceIds length %d, %d found", len(tidList),
  202. len(traceId2Spans))
  203. if len(traceId2Spans) == 0 {
  204. continue
  205. }
  206. for _, spans := range traceId2Spans {
  207. endpointTrees, errCalEndPointTree := a.calEndPointFromSpans(spans, appAlias)
  208. if errCalEndPointTree != nil {
  209. a.logger.WithError(errCalEndPointTree).Errorf("failed to cal end point, spans:%v", spans)
  210. continue
  211. }
  212. for _, endpointTree := range endpointTrees {
  213. topoDataInEndpointTree := serviceTopoDataWithEndpoint(cmdbID, endpointTree)
  214. topoData = append(topoData, topoDataInEndpointTree...)
  215. }
  216. }
  217. }
  218. return aggregateTopoData(topoData), nil
  219. }
  220. func aggregateTopoData(topoList []TopoData) []TopoData {
  221. topoID2TopoList := make(map[string][]TopoData, len(topoList))
  222. for _, t := range topoList {
  223. topoID := t.TopoID()
  224. if _, ok := topoID2TopoList[topoID]; !ok {
  225. topoID2TopoList[topoID] = make([]TopoData, 0, 200)
  226. }
  227. topoID2TopoList[topoID] = append(topoID2TopoList[topoID], t)
  228. }
  229. ret := make([]TopoData, 0, len(topoID2TopoList))
  230. for _, ds := range topoID2TopoList {
  231. if len(ds) == 0 {
  232. continue
  233. }
  234. topoData := fromTopoDataList2AggregatedTopo(ds)
  235. ret = append(ret, topoData)
  236. }
  237. return ret
  238. }
  239. func fromTopoDataList2AggregatedTopo(ds []TopoData) TopoData {
  240. ret := ds[0]
  241. ret.RequestCount = int64(len(ds))
  242. ret.FailedCount = 0
  243. su := int64(0)
  244. for _, d := range ds {
  245. su += d.AvgResponseTime
  246. }
  247. ret.AvgResponseTime = (su / int64(len(ds))) / 1e6 //ms
  248. return ret
  249. }
  250. type Span2Cal struct {
  251. TraceId string
  252. ServiceName string
  253. AppAlias string
  254. SpanId string
  255. ParentSpanId string
  256. Duration int64
  257. Route string
  258. StatusCode int32 `ch:"StatusCodeNumber"`
  259. children []*Span2Cal
  260. }
  261. func (s *Span2Cal) HasError() bool {
  262. return s.StatusCode < 200 || s.StatusCode > 299
  263. }
  264. type ServerEndpointNode struct {
  265. ServerName string
  266. Endpoint string
  267. HasError bool
  268. Duration int64
  269. children []*ServerEndpointNode
  270. }
  271. func (s ServerEndpointNode) String() string {
  272. str := fmt.Sprintf("%s:%s", s.ServerName, s.Endpoint)
  273. str += "children:\n"
  274. for _, child := range s.children {
  275. str += child.ServerName + ":" + child.Endpoint + "\n"
  276. }
  277. return str
  278. }
  279. func subTreesWithEndpointRoot(snn *ServerEndpointNode) []*ServerEndpointNode {
  280. if snn == nil {
  281. return nil
  282. }
  283. if snn.Endpoint != "" {
  284. return []*ServerEndpointNode{snn}
  285. }
  286. ret := make([]*ServerEndpointNode, 0)
  287. for _, child := range snn.children {
  288. ens := subTreesWithEndpointRoot(child)
  289. if ens != nil && len(ens) > 0 {
  290. ret = append(ret, ens...)
  291. }
  292. }
  293. return ret
  294. }
  295. func serviceTopoDataWithEndpoint(cmdbID string, snn *ServerEndpointNode) []TopoData {
  296. if snn == nil {
  297. return make([]TopoData, 0)
  298. }
  299. subTrees := subTreesWithEndpointRoot(snn)
  300. if subTrees == nil || len(subTrees) == 0 {
  301. return make([]TopoData, 0)
  302. }
  303. ret := make([]TopoData, 0)
  304. for _, tree := range subTrees {
  305. pairs := findTreeEndpointPair(tree)
  306. if pairs == nil || len(pairs) == 0 {
  307. continue
  308. }
  309. for _, pair := range pairs {
  310. failedCount := 0
  311. if pair.hasError {
  312. failedCount = 1
  313. }
  314. ret = append(ret, TopoData{
  315. TxServiceName: tree.ServerName,
  316. TxEndpoint: tree.Endpoint,
  317. TxCmdbID: cmdbID,
  318. SrcServiceName: pair.srcServiceName,
  319. SrcEndpoint: pair.srcEndpoint,
  320. SrcCmdbID: cmdbID,
  321. DestServiceName: pair.destService,
  322. DestEndpoint: pair.destEndpoint,
  323. DestCmdbID: cmdbID,
  324. RequestCount: 1,
  325. FailedCount: int64(failedCount),
  326. AvgResponseTime: pair.duration,
  327. })
  328. }
  329. }
  330. return ret
  331. }
  332. type serviceEndpointPair struct {
  333. srcServiceName string
  334. srcEndpoint string
  335. destService string
  336. destEndpoint string
  337. hasError bool
  338. duration int64
  339. }
  340. func findTreeEndpointPair(tree *ServerEndpointNode) []serviceEndpointPair {
  341. if tree == nil {
  342. return nil
  343. }
  344. ret := make([]serviceEndpointPair, 0)
  345. if tree.Endpoint != "" {
  346. for _, child := range tree.children {
  347. if child.Endpoint != "" {
  348. ret = append(ret, serviceEndpointPair{
  349. srcServiceName: tree.ServerName,
  350. srcEndpoint: tree.Endpoint,
  351. destService: child.ServerName,
  352. destEndpoint: child.Endpoint,
  353. hasError: child.HasError,
  354. duration: child.Duration,
  355. })
  356. }
  357. }
  358. }
  359. for _, child := range tree.children {
  360. childPairs := findTreeEndpointPair(child)
  361. if len(childPairs) > 0 {
  362. ret = append(ret, childPairs...)
  363. }
  364. }
  365. return ret
  366. }
  367. func (a *Aggregator) loadTraceList(start, end time.Time, tidList []string) (map[string][]Span2Cal, error) {
  368. sql := `
  369. WITH '\\b[a-fA-F0-9]{32,}\\b|\\b[a-fA-F0-9\\-]{36}\\b|\\b\\[1-9]\\d*\\b' AS varPattern,
  370. SpanAttributes['http.route'] AS route,
  371. multiIf( (SpanAttributes['url.path']) != '', SpanAttributes['url.path'], (SpanAttributes['http.target']) != '',
  372. path(SpanAttributes['http.target']), (SpanAttributes['http.url']) != '', path(SpanAttributes['http.url']),
  373. (SpanAttributes['u\nrl.full']) != '', path(SpanAttributes['url.full']),
  374. SpanAttributes['http.route']) AS path
  375. SELECT TraceId,
  376. ServiceName,
  377. SpanId,
  378. ParentSpanId,
  379. Duration,
  380. StatusCodeNumber,
  381. if(route != '', route, replaceRegexpOne(path, varPattern, '{:var}')) AS Route,
  382. AppAlias
  383. from otel.otel_traces
  384. where Timestamp > ? and Timestamp < ? and TraceId in ?
  385. `
  386. ctx, cancel := context.WithTimeout(context.Background(), a.cfg.LoadTimeout())
  387. defer cancel()
  388. spanRows, errQueryTraceId := a.chCli.Query(ctx, sql, start, end,
  389. tidList)
  390. if errQueryTraceId != nil {
  391. a.sqlQueryErrorCounter.Inc()
  392. return nil, errors.Wrapf(errQueryTraceId, "failed to query Traces: %s, sql:%s", tidList, sql)
  393. }
  394. ret := make(map[string][]Span2Cal)
  395. for spanRows.Next() {
  396. var span2Cal Span2Cal
  397. if err := spanRows.ScanStruct(&span2Cal); err != nil {
  398. a.logger.WithError(err).Error("failed to scan span2")
  399. continue
  400. }
  401. _, ok := ret[span2Cal.TraceId]
  402. if !ok {
  403. ret[span2Cal.TraceId] = []Span2Cal{span2Cal}
  404. continue
  405. }
  406. ret[span2Cal.TraceId] = append(ret[span2Cal.TraceId], span2Cal)
  407. }
  408. return ret, nil
  409. }
  410. func (a *Aggregator) calEndPointFromSpans(spans []Span2Cal, targetAppAlias string) ([]*ServerEndpointNode, error) {
  411. nodeMap := make(map[string]*Span2Cal)
  412. for i := range spans {
  413. nodeMap[spans[i].SpanId] = &spans[i]
  414. }
  415. var tree []*Span2Cal
  416. for _, node := range spans {
  417. if node.ParentSpanId == "" {
  418. tree = append(tree, nodeMap[node.SpanId])
  419. continue
  420. }
  421. parent, findParent := nodeMap[node.ParentSpanId]
  422. if !findParent {
  423. // 丢数据了,让这个tree独立出来,最大限度去找
  424. tree = append(tree, nodeMap[node.SpanId])
  425. continue
  426. }
  427. parent.children = append(parent.children, nodeMap[node.SpanId])
  428. }
  429. if len(tree) == 0 {
  430. a.logger.Errorf("no tree found in :%+v", spans)
  431. return nil, fmt.Errorf("no tree found in :%+v", spans)
  432. }
  433. ret := make([]*ServerEndpointNode, 0)
  434. for _, rootSpan := range tree {
  435. sen := EndpointTreeFromSpanTree(rootSpan, nil, targetAppAlias)
  436. if sen != nil {
  437. ret = append(ret, sen...)
  438. }
  439. }
  440. return ret, nil
  441. }
  442. func EndpointTreeFromSpanTree(span *Span2Cal, parentEndpointNode *ServerEndpointNode, targetAppAlias string) []*ServerEndpointNode {
  443. if span == nil {
  444. return nil
  445. }
  446. currentService := span.ServiceName
  447. if (span.AppAlias != targetAppAlias) ||
  448. (parentEndpointNode != nil && parentEndpointNode.ServerName == currentService) {
  449. // 略过本节点,返回子节点结果
  450. ret := make([]*ServerEndpointNode, 0)
  451. for _, child := range span.children {
  452. childNodes := EndpointTreeFromSpanTree(child, parentEndpointNode, targetAppAlias)
  453. if childNodes != nil && len(childNodes) > 0 {
  454. ret = append(ret, childNodes...)
  455. }
  456. }
  457. return ret
  458. }
  459. serverEndpointNode := ServerEndpointNode{
  460. ServerName: currentService,
  461. Endpoint: span.Route,
  462. HasError: span.HasError(),
  463. Duration: span.Duration,
  464. children: nil,
  465. }
  466. for _, childSpan := range span.children {
  467. childNodes := EndpointTreeFromSpanTree(childSpan, &serverEndpointNode, targetAppAlias)
  468. if childNodes != nil && len(childNodes) > 0 {
  469. serverEndpointNode.children = append(serverEndpointNode.children, childNodes...)
  470. }
  471. }
  472. return []*ServerEndpointNode{&serverEndpointNode}
  473. }