// Package neo turns OpenTelemetry trace rows stored in ClickHouse into a
// layered topology graph. The graph is either printed as GraphInsight-style
// JSON (nodes + edges) or written to Neo4j.
//
// Layers, from top to bottom:
//
//  1. business application (app)
//  2. interface (api)
//  3. service
//  4. process / thread (process)
//  5. container
//     5.5 middleware: messaging systems, databases, caches (not modelled yet)
//  6. operating system (os)
//  7. cloud / infrastructure (cloud)
package neo

import (
	"encoding/json"
	"fmt"
	"log"
	"math"
	"math/rand"
	"reflect"

	"go-admin/common/olap"

	"github.com/neo4j/neo4j-go-driver/v4/neo4j"
)

// Run loads a fixed sample of traces from the otel_traces table, folds every
// row into a de-duplicated node/edge set and prints the resulting graph as
// JSON on stdout. It panics on any query, scan or marshalling error.
//
// To write to Neo4j instead, open a write session and call
// ti.neo4jHandler(session) for each row; the connection URL and credentials
// must come from configuration, never from source code.
func Run() {
	// NOTE(review): SpanKind is not selected here, so TraceInfo.SpanKind (and
	// therefore the "span_kind" node property) presumably stays empty; add
	// the column if the table provides it.
	const query = "SELECT Timestamp,TraceId,AppAlias, ServiceName, ResourceAttributes, SpanAttributes FROM otel_traces Where TraceId IN ('7714b36323bbcb06f6ac9544782f2ce2', '7d7211606430994f41ee36b112c4cc28', '94d8e8e33ff15d894f1aa7c86ed41e3c','bb3253d84b75ef81743d9f08f6dee793','435ee2993d35eee658fe863405ea44c0')"

	chInstance := olap.GetClickhouseOrm()
	rows, err := chInstance.Table("otel_traces").Raw(query).Rows()
	if err != nil {
		panic(err)
	}
	defer rows.Close()

	// Both maps are keyed by element id so repeated rows collapse into one
	// node/edge.
	nodes := make(map[string]map[string]interface{})
	edges := make(map[string]map[string]interface{})

	for rows.Next() {
		ti := new(TraceInfo)
		if err := chInstance.ScanRows(rows, ti); err != nil {
			panic(err)
		}
		ti.graphHandler(nodes, edges)
	}
	if err := rows.Err(); err != nil {
		panic(err)
	}

	// Pre-sized, non-nil slices so an empty result encodes as [] rather than
	// null.
	gi := &GraphInsightModel{
		Nodes: make([]map[string]interface{}, 0, len(nodes)),
		Edges: make([]map[string]interface{}, 0, len(edges)),
	}
	for _, v := range nodes {
		gi.Nodes = append(gi.Nodes, v)
	}
	for _, v := range edges {
		gi.Edges = append(gi.Edges, v)
	}

	sj, err := json.Marshal(gi)
	if err != nil {
		log.Panicln(err)
	}
	fmt.Println("==========================================================")
	fmt.Println("")
	fmt.Println(string(sj))
}

// GraphInsightModel is the JSON document printed by Run: a flat list of node
// property maps and a flat list of edge property maps.
type GraphInsightModel struct {
	Nodes []map[string]interface{} `json:"nodes"`
	Edges []map[string]interface{} `json:"edges"`
}

// cypherWrite runs one Cypher statement with params inside a write
// transaction and consumes its result. The statement and the value returned
// by the transaction are logged whether or not the write succeeded. It
// returns the transaction error, if any.
func cypherWrite(session neo4j.Session, sql string, params map[string]interface{}) error {
	props, err := session.WriteTransaction(func(tx neo4j.Transaction) (interface{}, error) {
		result, err := tx.Run(sql, params)
		if err != nil {
			log.Println("wirte to DB with error:", err)
			return nil, err
		}
		return result.Consume()
	})
	log.Println("cypher: ", sql)
	log.Println("props: ", props)
	return err
}

// nodeQuery runs Cypher in a read transaction against database DB (the
// driver's default database when DB is empty) and returns every value of the
// result column "p" as a node. Records without a "p" column are skipped; a
// "p" value that is not a node is reported as an error. On error the returned
// slice is nil.
func nodeQuery(driver neo4j.Driver, Cypher string, DB string) ([]neo4j.Node, error) {
	session := driver.NewSession(neo4j.SessionConfig{
		AccessMode:   neo4j.AccessModeRead,
		DatabaseName: DB,
	})
	defer session.Close()

	out, err := session.ReadTransaction(func(tx neo4j.Transaction) (interface{}, error) {
		result, err := tx.Run(Cypher, nil)
		if err != nil {
			return nil, err
		}
		// Built inside the transaction function: the driver may retry this
		// function, and a slice captured from outside would then collect
		// duplicates.
		var list []neo4j.Node
		for result.Next() {
			value, ok := result.Record().Get("p")
			if !ok {
				continue
			}
			node, ok := value.(neo4j.Node)
			if !ok {
				return nil, fmt.Errorf("column p holds %T, not a node", value)
			}
			list = append(list, node)
		}
		if err := result.Err(); err != nil {
			return nil, err
		}
		return list, nil
	})
	if err != nil {
		log.Println("Read error:", err)
		return nil, err
	}
	list, _ := out.([]neo4j.Node)
	return list, nil
}

// relationshipQuery runs Cypher in a read transaction against database DB
// (the driver's default database when DB is empty) and returns every value of
// the result column "r" as a relationship. Records without an "r" column are
// skipped; an "r" value that is not a relationship is reported as an error.
// On error the returned slice is nil.
func relationshipQuery(driver neo4j.Driver, Cypher string, DB string) ([]neo4j.Relationship, error) {
	session := driver.NewSession(neo4j.SessionConfig{
		AccessMode:   neo4j.AccessModeRead,
		DatabaseName: DB,
	})
	defer session.Close()

	out, err := session.ReadTransaction(func(tx neo4j.Transaction) (interface{}, error) {
		result, err := tx.Run(Cypher, nil)
		if err != nil {
			log.Println("RelationshipQuery failed: ", err)
			return nil, err
		}
		// Built inside the transaction function so a driver retry starts
		// from an empty slice.
		var list []neo4j.Relationship
		for result.Next() {
			value, ok := result.Record().Get("r")
			if !ok {
				continue
			}
			relationship, ok := value.(neo4j.Relationship)
			if !ok {
				return nil, fmt.Errorf("column r holds %T, not a relationship", value)
			}
			list = append(list, relationship)
		}
		if err := result.Err(); err != nil {
			return nil, err
		}
		return list, nil
	})
	if err != nil {
		log.Println("Read error:", err)
		return nil, err
	}
	list, _ := out.([]neo4j.Relationship)
	return list, nil
}

// Span kind values as they appear in the trace data.
const (
	SPAN_KIND_INTERNAL = "SPAN_KIND_INTERNAL"
	SPAN_KIND_SERVER   = "SPAN_KIND_SERVER"
	SPAN_KIND_CLIENT   = "SPAN_KIND_CLIENT"
)

// mapToStruct copies entries of m into the exported fields of the struct that
// s points to, matching map keys against Go field names. A field is left
// untouched when the map has no entry for it. When the entry is nil, is not
// assignable to the field's type, or the field cannot be set, a
// "Type mismatch" line is printed and the field is skipped.
//
// s must be a non-nil pointer to a struct; anything else makes reflect panic.
func mapToStruct(m map[string]interface{}, s interface{}) {
	structValue := reflect.ValueOf(s).Elem()
	structType := structValue.Type()

	for i := 0; i < structValue.NumField(); i++ {
		field := structValue.Field(i)
		fieldType := structType.Field(i)

		// Skip struct fields that have no matching key in the map.
		value, ok := m[fieldType.Name]
		if !ok {
			continue
		}

		// reflect.ValueOf(nil) is the invalid Value, whose Type() panics, and
		// Set panics on unexported fields, so both are checked first.
		fieldValue := reflect.ValueOf(value)
		if fieldValue.IsValid() && field.CanSet() && fieldValue.Type().AssignableTo(field.Type()) {
			field.Set(fieldValue)
		} else {
			fmt.Printf("Type mismatch for field %s\n", fieldType.Name)
		}
	}
}

// TraceInfo is one row of the otel_traces query issued by Run.
type TraceInfo struct {
	Timestamp          string            `json:"Timestamp"`
	TraceId            string            `json:"TraceId"`
	AppAlias           string            `json:"AppAlias"`
	SpanKind           string            `json:"SpanKind"`
	ServiceName        string            `json:"ServiceName"`
	ResourceAttributes map[string]string `json:"ResourceAttributes"`
	SpanAttributes     map[string]string `json:"SpanAttributes"`
}

// copyAttrs copies every listed key that is present in src into dst under the
// same key. Keys missing from src are skipped.
func copyAttrs(dst map[string]interface{}, src map[string]string, keys ...string) {
	for _, key := range keys {
		if value, ok := src[key]; ok {
			dst[key] = value
		}
	}
}

// graphHandler derives the seven layer nodes of this trace row and the six
// edges chaining them (app -> api -> service -> process -> container -> os ->
// cloud), and stores them in n and e.
//
// n maps node id to node properties; e maps edge id ("<source id>+<target
// id>") to edge properties with the keys "id", "source", "target",
// "total_times" and "edge_type". Existing entries with the same id are
// overwritten, so repeated rows collapse into a single node/edge. Edge
// endpoints use the node "id" property so they always reference an emitted
// node. The total_times values are hard-coded placeholders.
func (ti *TraceInfo) graphHandler(n, e map[string]map[string]interface{}) {
	// Nodes.
	appProps := make(map[string]interface{})
	ti.appGraphHandler(appProps)
	log.Println("app props: ", appProps)

	apiProps := make(map[string]interface{})
	ti.apiGraphHandler(apiProps)
	log.Println("api props: ", apiProps)

	serviceProps := make(map[string]interface{})
	ti.serviceGraphHandler(serviceProps)
	log.Println("service props: ", serviceProps)

	processProps := make(map[string]interface{})
	ti.processGraphHandler(processProps)
	log.Println("process props: ", processProps)

	containerProps := make(map[string]interface{})
	ti.containerGraphHandler(containerProps)
	log.Println("container props: ", containerProps)

	osProps := make(map[string]interface{})
	ti.osGraphHandler(osProps)
	log.Println("os props: ", osProps)

	cloudProps := make(map[string]interface{})
	ti.cloudGraphHandler(cloudProps)
	log.Println("cloud props: ", cloudProps)

	layers := []map[string]interface{}{
		appProps, apiProps, serviceProps, processProps, containerProps, osProps, cloudProps,
	}
	for _, props := range layers {
		n[fmt.Sprintf("%v", props["id"])] = props
	}

	// Edges.
	links := []struct {
		from, to map[string]interface{}
		times    int
		kind     string
	}{
		{appProps, apiProps, 3451, "INCLUDE"},
		{apiProps, serviceProps, 872, "BELONG"},
		{serviceProps, processProps, 72, "INCLUDE"},
		{processProps, containerProps, 462, "IN"},
		{containerProps, osProps, 187, "IN"},
		{osProps, cloudProps, 95, "IN"},
	}
	for _, link := range links {
		id := fmt.Sprintf("%v+%v", link.from["id"], link.to["id"])
		e[id] = map[string]interface{}{
			"id":          id,
			"source":      link.from["id"],
			"target":      link.to["id"],
			"total_times": link.times,
			"edge_type":   link.kind,
		}
	}
}

// neo4jHandler writes the seven layer nodes of this trace row to Neo4j and
// links them with relationships (App -INCLUDE-> Api -BELONG-> Service
// -INCLUDE-> Process -IN-> Container -IN-> Os -IN-> Cloud).
//
// Nodes are merged on their "name" property, which is passed as a query
// parameter; a failed node write is logged and processing continues. Any
// failing relationship statement panics. The relationship count values are
// hard-coded placeholders.
func (ti *TraceInfo) neo4jHandler(neo4jSession neo4j.Session) {
	// merge upserts one node. The label is a constant from this function, so
	// formatting it into the statement is safe; the name, which comes from
	// trace data, is always sent as a parameter.
	merge := func(label, errTag string, props map[string]interface{}) {
		cypher := fmt.Sprintf("MERGE (n:%s {name: $name}) ON CREATE SET n = $props RETURN n", label)
		params := map[string]interface{}{"name": props["name"], "props": props}
		if err := cypherWrite(neo4jSession, cypher, params); err != nil {
			log.Println(errTag, err)
		}
	}

	appProps := make(map[string]interface{})
	ti.appHandler(appProps)
	log.Println("app props: ", appProps)
	merge("App", "app cypher: ", appProps)

	apiProps := make(map[string]interface{})
	ti.apiHandler(apiProps)
	log.Println("api props: ", apiProps)
	merge("Api", "api cypher: ", apiProps)

	serviceProps := make(map[string]interface{})
	ti.serviceHandler(serviceProps)
	log.Println("service props: ", serviceProps)
	merge("Service", "service cypher: ", serviceProps)

	processProps := make(map[string]interface{})
	ti.processHandler(processProps)
	log.Println("process props: ", processProps)
	merge("Process", "process cypher: ", processProps)

	containerProps := make(map[string]interface{})
	ti.containerHandler(containerProps)
	log.Println("container props: ", containerProps)
	merge("Container", "Container cypher: ", containerProps)

	osProps := make(map[string]interface{})
	ti.osHandler(osProps)
	log.Println("os props: ", osProps)
	merge("Os", "os cypher: ", osProps)

	cloudProps := make(map[string]interface{})
	ti.cloudHandler(cloudProps)
	log.Println("cloud props: ", cloudProps)
	merge("Cloud", "cloud cypher: ", cloudProps)

	// Relationships between adjacent layers.
	links := []struct {
		fromLabel, toLabel string
		relType            string
		count              int
		from, to           map[string]interface{}
	}{
		{"App", "Api", "INCLUDE", 3451, appProps, apiProps},
		{"Api", "Service", "BELONG", 345, apiProps, serviceProps},
		{"Service", "Process", "INCLUDE", 876, serviceProps, processProps},
		{"Process", "Container", "IN", 426, processProps, containerProps},
		{"Container", "Os", "IN", 187, containerProps, osProps},
		{"Os", "Cloud", "IN", 39, osProps, cloudProps},
	}
	for _, link := range links {
		cypher := fmt.Sprintf(
			"MATCH (n1:%s {name: $name1}),(n2:%s {name: $name2}) MERGE (n1)-[r:%s {count: %d}]->(n2) RETURN r",
			link.fromLabel, link.toLabel, link.relType, link.count,
		)
		result, err := neo4jSession.Run(cypher, map[string]interface{}{
			"name1": link.from["name"],
			"name2": link.to["name"],
		})
		if err != nil {
			panic(err)
		}
		// Drain the result so a server-side failure is reported for this
		// statement instead of being dropped.
		if _, err := result.Consume(); err != nil {
			panic(err)
		}
	}
}

// appHandler fills app with the application-layer properties: "name" and
// "app_alias" (both the app alias), "timestamp" and a fixed placeholder
// "apdex" of 0.97.
func (ti *TraceInfo) appHandler(app map[string]interface{}) {
	app["name"] = ti.AppAlias
	app["app_alias"] = ti.AppAlias
	app["timestamp"] = ti.Timestamp
	app["apdex"] = 0.97
}

// appGraphHandler fills app like appHandler and adds the graph properties
// "id" (the app alias) and "node_type" ("app").
func (ti *TraceInfo) appGraphHandler(app map[string]interface{}) {
	ti.appHandler(app)
	app["id"] = ti.AppAlias
	app["node_type"] = "app"
}

// apiHandler fills api with the interface-layer properties taken from the
// span attributes.
//
// "name" is the last one present of rpc.method, code.function, http.url and
// http.route, and is left unset when none is present. "http.method" prefers
// the http.method attribute and falls back to http.request.method. "apdex",
// "err_rate" and "latency" are random placeholder values.
func (ti *TraceInfo) apiHandler(api map[string]interface{}) {
	// Later keys win, so http.route has the highest naming priority.
	for _, key := range []string{"rpc.method", "code.function", "http.url", "http.route"} {
		if value, ok := ti.SpanAttributes[key]; ok {
			api[key] = value
			api["name"] = value
		}
	}

	if value, ok := ti.SpanAttributes["http.method"]; ok {
		api["http.method"] = value
	} else if value, ok := ti.SpanAttributes["http.request.method"]; ok {
		api["http.method"] = value
	}

	copyAttrs(api, ti.SpanAttributes,
		"http.client_ip", "http.target", "net.host.name", "net.peer.name")

	api["span_kind"] = ti.SpanKind
	api["app_alias"] = ti.AppAlias
	api["service_name"] = ti.ServiceName
	api["timestamp"] = ti.Timestamp
	api["apdex"] = generateRandomFloat(0.7, 1.0)
	api["err_rate"] = generateRandomFloat(0.8, 1.0)
	api["latency"] = generateRandomFloat(0.01, 12)
}

// apiGraphHandler fills api like apiHandler with three differences: "name"
// defaults to "unknow-api" when no naming attribute is present,
// http.request.method takes precedence over http.method, and the graph
// properties "id" (equal to "name") and "node_type" ("api") are added.
func (ti *TraceInfo) apiGraphHandler(api map[string]interface{}) {
	api["name"] = "unknow-api"
	ti.apiHandler(api)
	if value, ok := ti.SpanAttributes["http.request.method"]; ok {
		api["http.method"] = value
	}
	api["id"] = api["name"]
	api["node_type"] = "api"
}

// serviceHandler fills service with the service-layer properties: "name" and
// "service_name" (both the service name), "process_name" when the
// process.runtime.name resource attribute is present, "timestamp" and a
// random placeholder "apdex".
func (ti *TraceInfo) serviceHandler(service map[string]interface{}) {
	service["service_name"] = ti.ServiceName
	service["name"] = ti.ServiceName
	if value, ok := ti.ResourceAttributes["process.runtime.name"]; ok {
		service["process_name"] = value
	}
	service["timestamp"] = ti.Timestamp
	service["apdex"] = generateRandomFloat(0.6, 1.0)
}

// serviceGraphHandler fills service like serviceHandler and adds the graph
// properties "id" (the service name) and "node_type" ("service").
func (ti *TraceInfo) serviceGraphHandler(service map[string]interface{}) {
	ti.serviceHandler(service)
	service["id"] = ti.ServiceName
	service["node_type"] = "service"
}

// processHandler fills process with the process-layer properties taken from
// the resource attributes. "name" is the process.command_line attribute and
// is left unset when that attribute is absent.
//
// TODO: a process runs inside a pod; after a reschedule or redeployment the
// pod's containers are rebuilt, which effectively severs the earlier
// process-to-container relationship. When process information is parsed we
// therefore need to decide whether it still belongs to the previous
// container. A peer container that has not been updated for a long time may
// no longer be scheduled, but that alone does not justify deleting the
// relationship; until a further supporting condition is found the link is
// kept.
func (ti *TraceInfo) processHandler(process map[string]interface{}) {
	if value, ok := ti.ResourceAttributes["process.runtime.name"]; ok {
		process["process_name"] = value
	}
	copyAttrs(process, ti.ResourceAttributes,
		"process.runtime.description",
		"process.executable.path",
		"process.command_line",
		"process.runtime.version",
		"process.command",
		"container.id",
	)
	if value, ok := ti.ResourceAttributes["process.command_line"]; ok {
		process["name"] = value
	}
	if value, ok := ti.ResourceAttributes["k8s.pod.name"]; ok {
		process["pod.name"] = value
	}
	process["timestamp"] = ti.Timestamp
}

// processGraphHandler fills process like processHandler, additionally copies
// process.command_args, and adds the graph properties "id" and "node_type"
// ("process").
//
// "name" is the last one present of process.command, process.command_args and
// process.command_line, defaulting to "unknow-process". "id" equals "name",
// except for an unnamed process: that one also records "trace_id" and uses
// the container id as its id when one is available, so unnamed processes of
// different containers stay distinct.
//
// The container-relationship TODO described on processHandler applies here as
// well.
func (ti *TraceInfo) processGraphHandler(process map[string]interface{}) {
	process["name"] = "unknow-process"
	ti.processHandler(process)

	// Later keys win, so process.command_line has the highest naming
	// priority.
	for _, key := range []string{"process.command", "process.command_args", "process.command_line"} {
		if value, ok := ti.ResourceAttributes[key]; ok {
			process[key] = value
			process["name"] = value
		}
	}

	process["id"] = process["name"]
	process["node_type"] = "process"
	if process["name"] == "unknow-process" {
		process["trace_id"] = ti.TraceId
		// Only switch to the container id when there is one; otherwise the
		// node would end up with a nil id.
		if containerID, ok := process["container.id"]; ok {
			process["id"] = containerID
		}
	}
}

// containerHandler fills container with the container-layer properties taken
// from the resource attributes. "name" is the k8s.pod.name attribute and is
// left unset when that attribute is absent.
func (ti *TraceInfo) containerHandler(container map[string]interface{}) {
	container["timestamp"] = ti.Timestamp
	copyAttrs(container, ti.ResourceAttributes,
		"container.id", "k8s.namespace.name", "k8s.deployment.name", "k8s.node.name")
	if value, ok := ti.ResourceAttributes["k8s.pod.name"]; ok {
		container["pod.name"] = value
		container["name"] = value
	}
	if value, ok := ti.ResourceAttributes["k8s.pod.ip"]; ok {
		container["pod.ip"] = value
	}
}

// containerGraphHandler fills container like containerHandler, defaults
// "name" to "unknow-container" when the pod name is absent (so the node never
// has a nil id), and adds the graph properties "id" (equal to "name") and
// "node_type" ("container").
func (ti *TraceInfo) containerGraphHandler(container map[string]interface{}) {
	container["name"] = "unknow-container"
	ti.containerHandler(container)
	container["id"] = container["name"]
	container["node_type"] = "container"
}

// osHandler fills os with the operating-system-layer properties taken from
// the resource attributes. "name" is the k8s.node.name attribute and is left
// unset when that attribute is absent. "memory", "disk" and "load" are random
// placeholder values.
func (ti *TraceInfo) osHandler(os map[string]interface{}) {
	os["timestamp"] = ti.Timestamp
	if value, ok := ti.ResourceAttributes["k8s.node.name"]; ok {
		os["k8s.node.name"] = value
		os["name"] = value
	}
	copyAttrs(os, ti.ResourceAttributes,
		"host.arch",
		"os.description",
		"os.type",
		"os.id",
		"os.version",
		"host.type",
		"cloud.platform",
	)
	os["memory"] = generateRandomFloat(0.1, 0.8)
	os["disk"] = generateRandomFloat(0.1, 0.8)
	os["load"] = generateRandomFloat(0.01, 4.0)
}

// osGraphHandler fills os like osHandler, defaults "name" to "unknow-os" when
// the node name is absent (so the node never has a nil id), and adds the
// graph properties "id" (equal to "name") and "node_type" ("os").
func (ti *TraceInfo) osGraphHandler(os map[string]interface{}) {
	os["name"] = "unknow-os"
	ti.osHandler(os)
	os["id"] = os["name"]
	os["node_type"] = "os"
}

// cloudHandler fills cloud with the cloud-layer properties taken from the
// resource attributes. "name" is the cloud.platform attribute, or "unkonw"
// when that attribute is absent.
func (ti *TraceInfo) cloudHandler(cloud map[string]interface{}) {
	cloud["timestamp"] = ti.Timestamp
	if value, ok := ti.ResourceAttributes["cloud.platform"]; ok {
		cloud["cloud.platform"] = value
		cloud["name"] = value
	} else {
		cloud["name"] = "unkonw"
	}
	copyAttrs(cloud, ti.ResourceAttributes, "cloud.region", "cloud.provider")
}

// cloudGraphHandler fills cloud like cloudHandler and adds the graph
// properties "id" (equal to "name") and "node_type" ("cloud").
func (ti *TraceInfo) cloudGraphHandler(cloud map[string]interface{}) {
	ti.cloudHandler(cloud)
	cloud["id"] = cloud["name"]
	cloud["node_type"] = "cloud"
}

// generateRandomFloat returns a pseudo-random value drawn from [min, max) and
// rounded to two decimal places. It supplies the placeholder metrics used by
// the handlers above.
func generateRandomFloat(min, max float64) float64 {
	randomFloat := min + rand.Float64()*(max-min)
	return math.Round(randomFloat*100) / 100
}