yawyd 1 week ago
parent
commit
084c8651f6
4 changed files with 130 additions and 47 deletions
  1. 1 1
      Dockerfile
  2. 39 44
      cmd/i6000pusher/aggregator.go
  3. 89 1
      cmd/i6000pusher/aggregator_test.go
  4. 1 1
      version

+ 1 - 1
Dockerfile

@@ -1,4 +1,4 @@
-FROM golang:1.21.10
+FROM golang:1.23.4
 
 WORKDIR /app
 USER root

+ 39 - 44
cmd/i6000pusher/aggregator.go

@@ -158,19 +158,11 @@ func (a *Aggregator) LoadTraceData(start, end time.Time, appAlias string) ([]Top
 	5. 沿着parent向上找,一直找不到或appAlias是其他的, 找到的span就是顶点
 	6. 顶点就是tx, client span是src, server span是dest
 	*/
-
 	traceIdSql := `
-select distinct TraceId from ( 
-	select TraceId, ServiceName, SpanId from otel.otel_traces where 
-		Timestamp > ? and Timestamp < ? and SpanKindNumber = 3 and AppAlias = ? 
-	)
-as client inner join (
-    select TraceId, ServiceName, SpanId, ParentSpanId, SpanAttributes['http.route'] as route
-		from otel.otel_traces where Timestamp > ? and Timestamp < ?
-		and SpanKindNumber = 2 and ParentSpanId is not null and AppAlias = ? 
-		and route is not null and route != '' 
-	) as server
-on client.TraceId = server.TraceId and client.SpanId = server.ParentSpanId
+    select distinct TraceId from otel.otel_traces_url where 
+		Timestamp > ? and Timestamp < ? and 
+		AppAlias = ? and Route is not null and 
+		Route != ''
 		`
 	traceRows, errQueryTraceId := a.chCli.Query(ctx, traceIdSql, start, end, appAlias,
 		start, end, appAlias)
@@ -212,7 +204,7 @@ on client.TraceId = server.TraceId and client.SpanId = server.ParentSpanId
 			continue
 		}
 		for _, spans := range traceId2Spans {
-			endpointTrees, errCalEndPointTree := a.calEndPointFromSpans(spans)
+			endpointTrees, errCalEndPointTree := a.calEndPointFromSpans(spans, appAlias)
 			if errCalEndPointTree != nil {
 				a.logger.WithError(errCalEndPointTree).Errorf("failed to cal end point, spans:%v", spans)
 				continue
@@ -227,15 +219,15 @@ on client.TraceId = server.TraceId and client.SpanId = server.ParentSpanId
 }
 
 type Span2Cal struct {
-	TraceId          string
-	ServiceName      string
-	SpanId           string
-	ParentSpanId     string
-	Duration         int64
-	SpanKindNumber   int8
-	Route            string
-	StatusCodeNumber int32
-	children         []*Span2Cal
+	TraceId      string
+	ServiceName  string
+	AppAlias     string
+	SpanId       string
+	ParentSpanId string
+	Duration     int64
+	Route        string
+	StatusCode   int64
+	children     []*Span2Cal
 }
 
 type ServerEndpointNode struct {
@@ -338,8 +330,8 @@ func findTreeEndpointPair(tree *ServerEndpointNode) []serviceEndpointPair {
 }
 
 func (a *Aggregator) loadTraceList(start, end time.Time, tidList []string) (map[string][]Span2Cal, error) {
-	sql := ` select TraceId, ServiceName, SpanId, ParentSpanId, Duration, SpanKindNumber,
-       SpanAttributes['http.route'] as Route, StatusCodeNumber from otel.otel_traces
+	sql := ` select TraceId, ServiceName, SpanId, ParentSpanId, Duration, 
+       Route, StatusCode, AppAlias from otel.otel_traces_url
 		where Timestamp > ? and Timestamp < ? and TraceId in ?
 `
 	ctx, cancel := context.WithTimeout(context.Background(), a.cfg.LoadTimeout())
@@ -367,7 +359,7 @@ func (a *Aggregator) loadTraceList(start, end time.Time, tidList []string) (map[
 	return ret, nil
 }
 
-func (a *Aggregator) calEndPointFromSpans(spans []Span2Cal) ([]*ServerEndpointNode, error) {
+func (a *Aggregator) calEndPointFromSpans(spans []Span2Cal, targetAppAlias string) ([]*ServerEndpointNode, error) {
 	nodeMap := make(map[string]*Span2Cal)
 	for i := range spans {
 		nodeMap[spans[i].SpanId] = &spans[i]
@@ -378,8 +370,9 @@ func (a *Aggregator) calEndPointFromSpans(spans []Span2Cal) ([]*ServerEndpointNo
 			tree = append(tree, nodeMap[node.SpanId])
 			continue
 		}
-		parent := nodeMap[node.ParentSpanId]
-		if parent == nil {
+		parent, findParent := nodeMap[node.ParentSpanId]
+		if !findParent {
+			// 丢数据了,让这个tree独立出来,最大限度去找
 			tree = append(tree, nodeMap[node.SpanId])
 			continue
 		}
@@ -391,7 +384,7 @@ func (a *Aggregator) calEndPointFromSpans(spans []Span2Cal) ([]*ServerEndpointNo
 	}
 	ret := make([]*ServerEndpointNode, 0)
 	for _, rootSpan := range tree {
-		sen := EndpointTreeFromSpanTree(rootSpan, nil)
+		sen := EndpointTreeFromSpanTree(rootSpan, nil, targetAppAlias)
 		if sen != nil {
 			ret = append(ret, sen...)
 		}
@@ -399,31 +392,33 @@ func (a *Aggregator) calEndPointFromSpans(spans []Span2Cal) ([]*ServerEndpointNo
 	return ret, nil
 }
 
-func EndpointTreeFromSpanTree(span *Span2Cal, parentEndpointNode *ServerEndpointNode) []*ServerEndpointNode {
+func EndpointTreeFromSpanTree(span *Span2Cal, parentEndpointNode *ServerEndpointNode, targetAppAlias string) []*ServerEndpointNode {
 	if span == nil {
 		return nil
 	}
 	currentService := span.ServiceName
-	if parentEndpointNode == nil || parentEndpointNode.ServerName != currentService {
-		serverEndpointNode := ServerEndpointNode{
-			ServerName: currentService,
-			Endpoint:   span.Route,
-			children:   nil,
-		}
-		for _, childSpan := range span.children {
-			childNodes := EndpointTreeFromSpanTree(childSpan, &serverEndpointNode)
+	if (span.AppAlias != targetAppAlias) ||
+		(parentEndpointNode != nil && parentEndpointNode.ServerName == currentService) {
+		// 略过本节点,返回子节点结果
+		ret := make([]*ServerEndpointNode, 0)
+		for _, child := range span.children {
+			childNodes := EndpointTreeFromSpanTree(child, parentEndpointNode, targetAppAlias)
 			if childNodes != nil && len(childNodes) > 0 {
-				serverEndpointNode.children = append(serverEndpointNode.children, childNodes...)
+				ret = append(ret, childNodes...)
 			}
 		}
-		return []*ServerEndpointNode{&serverEndpointNode}
+		return ret
 	}
-	ret := make([]*ServerEndpointNode, 0)
-	for _, child := range span.children {
-		childNodes := EndpointTreeFromSpanTree(child, parentEndpointNode)
+	serverEndpointNode := ServerEndpointNode{
+		ServerName: currentService,
+		Endpoint:   span.Route,
+		children:   nil,
+	}
+	for _, childSpan := range span.children {
+		childNodes := EndpointTreeFromSpanTree(childSpan, &serverEndpointNode, targetAppAlias)
 		if childNodes != nil && len(childNodes) > 0 {
-			ret = append(ret, childNodes...)
+			serverEndpointNode.children = append(serverEndpointNode.children, childNodes...)
 		}
 	}
-	return ret
+	return []*ServerEndpointNode{&serverEndpointNode}
 }

+ 89 - 1
cmd/i6000pusher/aggregator_test.go

@@ -2,40 +2,126 @@ package main
 
 import "testing"
 
+func TestEndpointTreeFromSpanTreeMultiAppAlias(t *testing.T) {
+	aa := "aAlias"
+	ba := "bAlias"
+	rootSpan := Span2Cal{
+		ServiceName: "A",
+		AppAlias:    ba,
+		children: []*Span2Cal{
+			{
+				ServiceName: "B",
+				AppAlias:    aa,
+				children: []*Span2Cal{
+					{
+						ServiceName: "C",
+						AppAlias:    aa,
+						children:    nil,
+					},
+				},
+			},
+			{
+				ServiceName: "A",
+				AppAlias:    aa,
+				children: []*Span2Cal{
+					{
+						ServiceName: "E",
+						AppAlias:    aa,
+						children:    nil,
+					},
+					{
+						ServiceName: "G",
+						AppAlias:    aa,
+						children:    nil,
+					},
+				},
+			},
+			{
+				ServiceName: "A",
+				AppAlias:    aa,
+				children: []*Span2Cal{
+					{
+						ServiceName: "A",
+						AppAlias:    aa,
+						children: []*Span2Cal{
+							{
+								ServiceName: "F",
+								AppAlias:    aa,
+								children:    nil,
+							},
+						},
+					},
+				},
+			},
+			{
+				ServiceName: "A",
+				AppAlias:    aa,
+				children: []*Span2Cal{
+					{
+						ServiceName: "H",
+						AppAlias:    ba,
+						children: []*Span2Cal{
+							{
+								ServiceName: "I",
+								AppAlias:    ba,
+								children:    nil,
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	eps := EndpointTreeFromSpanTree(&rootSpan, nil, aa)
+	if len(eps) != 4 {
+		t.Fatalf("len(eps) = %d, want %d, eps:%+v", len(eps), 4, eps)
+	}
+	t.Logf("endpoint tree:%+v", eps[0])
+}
 func TestEndpointTreeFromSpanTree(t *testing.T) {
+	appAlias := "ABC"
 	rootSpan := Span2Cal{
 		ServiceName: "A",
+		AppAlias:    appAlias,
 		children: []*Span2Cal{
 			{
 				ServiceName: "B",
+				AppAlias:    appAlias,
 				children: []*Span2Cal{
 					{
 						ServiceName: "C",
+						AppAlias:    appAlias,
 						children:    nil,
 					},
 				},
 			},
 			{
 				ServiceName: "A",
+				AppAlias:    appAlias,
 				children: []*Span2Cal{
 					{
 						ServiceName: "E",
+						AppAlias:    appAlias,
 						children:    nil,
 					},
 					{
 						ServiceName: "G",
+						AppAlias:    appAlias,
 						children:    nil,
 					},
 				},
 			},
 			{
 				ServiceName: "A",
+				AppAlias:    appAlias,
 				children: []*Span2Cal{
 					{
 						ServiceName: "A",
+						AppAlias:    appAlias,
 						children: []*Span2Cal{
 							{
 								ServiceName: "F",
+								AppAlias:    appAlias,
 								children:    nil,
 							},
 						},
@@ -47,9 +133,11 @@ func TestEndpointTreeFromSpanTree(t *testing.T) {
 				children: []*Span2Cal{
 					{
 						ServiceName: "H",
+						AppAlias:    appAlias,
 						children: []*Span2Cal{
 							{
 								ServiceName: "I",
+								AppAlias:    appAlias,
 								children:    nil,
 							},
 						},
@@ -58,7 +146,7 @@ func TestEndpointTreeFromSpanTree(t *testing.T) {
 			},
 		},
 	}
-	eps := EndpointTreeFromSpanTree(&rootSpan, nil)
+	eps := EndpointTreeFromSpanTree(&rootSpan, nil, appAlias)
 	if len(eps) != 1 {
 		t.Fatalf("len(eps) = %d, want %d, eps:%+v", len(eps), 1, eps)
 	}

+ 1 - 1
version

@@ -1 +1 @@
-v1.3.0
+latest