handler.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873
  1. package neo
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "go-admin/common/olap"
  6. "log"
  7. "math"
  8. "math/rand"
  9. "reflect"
  10. "github.com/neo4j/neo4j-go-driver/v4/neo4j"
  11. )
  12. // func Run() {
  13. // traceId := []string{
  14. // "7714b36323bbcb06f6ac9544782f2ce2",
  15. // "7d7211606430994f41ee36b112c4cc28",
  16. // "94d8e8e33ff15d894f1aa7c86ed41e3c",
  17. // "bb3253d84b75ef81743d9f08f6dee793",
  18. // "435ee2993d35eee658fe863405ea44c0",
  19. // "922246f263a6b60c2f5215bd4a334a0d",
  20. // "b24c8575de4bee3968541208b2a31e09",
  21. // "67de55a30fe55ac14510045a2c44bccd",
  22. // // "c4eee3e423c9fd1f9e5c64f2fbfc0c8c",
  23. // // "0a77cdecf8307c24a0b2cca58cb0bda8",
  24. // // "355ee77b77cf759f331354e369f4885a",
  25. // // "7ab48fccd68e7b635a9a21eda24f385c",
  26. // // "8f5a09f43ee0c01b9edad05e5ee875fd",
  27. // // "1f9d04366115fea6095b76ef3eb89ead",
  28. // // "72e2a4892e00bf16cbc4872ed5ac7860",
  29. // // "90dbd7ea5d8fc72eff9b9465ad4d6c1e",
  30. // // "7003def92e3971f5f18f1d5f2b8f93e8",
  31. // // "a339701dda8c5b4593729fd548816802",
  32. // // "c9e4d6417c29ea567cc1d9922ff0525e",
  33. // // "d6e8810867a7d5a0d44b80cab7fc8208",
  34. // // "bf2177ad18ce0df68866d65feabce0f4",
  35. // // "e9efddf40b589370be369ab9519129c9",
  36. // // "69816e28ccff3be6f7cf82a5bec0470b",
  37. // // "bbf932b3b7d50e194704f8966af05934",
  38. // // "f0114e32f45e519e203ae24876757f44",
  39. // // "f7f44e0c17099410da48ec753e009de6",
  40. // // "afb3f5e1cb11580a1ce0d08fe2f43a6a",
  41. // // "eec6c58c7479f5e6466045b9ddaee1df",
  42. // // "8b155fef092839cb838ff40352a6cdb0",
  43. // // "c9320da7df5d241c8015072ce48fad77",
  44. // // "8304a07c64993c1d0bf1a9d8b7033bb0",
  45. // // "11f8e57b713b8f928284a7c42304b3a1",
  46. // // "7928231613294ddae1716516942e58cc",
  47. // // "76bbd69be7b411b050ab478348b971ff",
  48. // // "f0198194a3dd811585c027ac9c82a175",
  49. // // "4d715c5eb3f793ae84229cf52c347c37",
  50. // // "8398929eb60035c1dc572bd3cb9adf43",
  51. // // "a89519c9c75987956823620fcf8a32c1",
  52. // // "63dbe8597abe6db0037ed634903e1fd3",
  53. // // "d1d8a7b8b460be9ac06acb4b5d722052",
  54. // // "60f17a55113bcd4434ba468a36f7ac26",
  55. // // "c1d0af28c2c5a17796b0032bb34db9ce",
  56. // // "e2d2f43e847a3ba6c114dd4f570e5a24",
  57. // // "439c9fb844b45064b6681162015645e7",
  58. // }
  59. // for _, id := range traceId {
  60. // run(id)
  61. // }
  62. // }
  63. func Run() {
  64. chInstance := olap.GetClickhouseOrm()
  65. query := fmt.Sprintf(
  66. "SELECT Timestamp,TraceId,AppAlias, ServiceName, ResourceAttributes, SpanAttributes FROM otel_traces Where TraceId IN ('7714b36323bbcb06f6ac9544782f2ce2', '7d7211606430994f41ee36b112c4cc28', '94d8e8e33ff15d894f1aa7c86ed41e3c','bb3253d84b75ef81743d9f08f6dee793','435ee2993d35eee658fe863405ea44c0')")
  67. rows, err := chInstance.Table("otel_traces").Raw(query).Rows()
  68. if err != nil {
  69. panic(err)
  70. }
  71. defer rows.Close()
  72. // 连接到Neo4j数据库
  73. // neo4jURL := "bolt://182.92.239.90:30671"
  74. // neo4jURL := "neo4j+s://4da7a2b0.databases.neo4j.io"
  75. // // neo4jConn, err := neo4j.NewDriver(neo4jURL, neo4j.BasicAuth("neo4j", "insur132", ""))
  76. // neo4jConn, err := neo4j.NewDriver(neo4jURL, neo4j.BasicAuth("neo4j", "JcE5ri3NbhlF0lEMzGiHsbJRK3ZLCM8VDYQ3OqeSnn8", ""))
  77. // if err != nil {
  78. // panic(err)
  79. // }
  80. // defer neo4jConn.Close()
  81. // neo4jSession := neo4jConn.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite, DatabaseName: "neo4j"})
  82. // defer neo4jSession.Close()
  83. gi := new(GraphInsightModel)
  84. nodes := make(map[string]map[string]interface{})
  85. edges := make(map[string]map[string]interface{})
  86. // 解析并导入数据到Neo4j
  87. for rows.Next() {
  88. ti := new(TraceInfo)
  89. err := chInstance.ScanRows(rows, ti)
  90. if err != nil {
  91. panic(err)
  92. }
  93. // neo4j处理模式
  94. // ti.neo4jHandler(neo4jSession)
  95. // graphInsight处理模式
  96. ti.graphHandler(nodes, edges)
  97. }
  98. if err := rows.Err(); err != nil {
  99. panic(err)
  100. }
  101. for _, v := range nodes {
  102. gi.Nodes = append(gi.Nodes, v)
  103. }
  104. for _, v := range edges {
  105. gi.Edges = append(gi.Edges, v)
  106. }
  107. sj, err := json.Marshal(gi)
  108. if err != nil {
  109. log.Panicln(err)
  110. }
  111. fmt.Println("==========================================================")
  112. fmt.Println("")
  113. fmt.Println(string(sj))
  114. return
  115. }
  116. type GraphInsightModel struct {
  117. Nodes []map[string]interface{} `json:"nodes"`
  118. Edges []map[string]interface{} `json:"edges"`
  119. }
  120. func cypherWrite(session neo4j.Session, sql string, params map[string]interface{}) error {
  121. props, err := session.WriteTransaction(func(tx neo4j.Transaction) (interface{}, error) {
  122. result, err := tx.Run(sql, params)
  123. if err != nil {
  124. log.Println("wirte to DB with error:", err)
  125. return nil, err
  126. }
  127. return result.Consume()
  128. })
  129. log.Println("cypher: ", sql)
  130. log.Println("props: ", props)
  131. return err
  132. }
  133. func nodeQuery(driver neo4j.Driver, Cypher string, DB string) ([]neo4j.Node, error) {
  134. var list []neo4j.Node
  135. session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
  136. defer session.Close()
  137. _, err := session.ReadTransaction(func(tx neo4j.Transaction) (interface{}, error) {
  138. result, err := tx.Run(Cypher, nil)
  139. if err != nil {
  140. return nil, err
  141. }
  142. for result.Next() {
  143. record := result.Record()
  144. if value, ok := record.Get("p"); ok {
  145. node := value.(neo4j.Node)
  146. list = append(list, node)
  147. }
  148. }
  149. if err = result.Err(); err != nil {
  150. return nil, err
  151. }
  152. return list, result.Err()
  153. })
  154. if err != nil {
  155. log.Println("Read error:", err)
  156. }
  157. return list, err
  158. }
  159. func relationshipQuery(driver neo4j.Driver, Cypher string, DB string) ([]neo4j.Relationship, error) {
  160. var list []neo4j.Relationship
  161. session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
  162. defer session.Close()
  163. _, err := session.ReadTransaction(func(tx neo4j.Transaction) (interface{}, error) {
  164. result, err := tx.Run(Cypher, nil)
  165. if err != nil {
  166. log.Println("RelationshipQuery failed: ", err)
  167. return nil, err
  168. }
  169. for result.Next() {
  170. record := result.Record()
  171. if value, ok := record.Get("r"); ok {
  172. relationship := value.(neo4j.Relationship)
  173. list = append(list, relationship)
  174. }
  175. }
  176. if err = result.Err(); err != nil {
  177. return nil, err
  178. }
  179. return list, result.Err()
  180. })
  181. if err != nil {
  182. log.Println("Read error:", err)
  183. }
  184. return list, err
  185. }
  186. /*
  187. 设定分层
  188. 1.业务 app
  189. 2.接口 api
  190. 3.服务 service
  191. 4.进程/线程 process
  192. 5.容器 container
  193. 5.5 中间件 消息系统、数据库、缓存库
  194. 6.操作系统 os
  195. 7.云/设施
  196. */
  197. const (
  198. SPAN_KIND_INTERNAL = "SPAN_KIND_INTERNAL"
  199. SPAN_KIND_SERVER = "SPAN_KIND_SERVER"
  200. SPAN_KIND_CLIENT = "SPAN_KIND_CLIENT"
  201. )
  202. func mapToStruct(m map[string]interface{}, s interface{}) {
  203. structValue := reflect.ValueOf(s).Elem()
  204. structType := structValue.Type()
  205. for i := 0; i < structValue.NumField(); i++ {
  206. field := structValue.Field(i)
  207. fieldType := structType.Field(i)
  208. // 检查 map 中是否存在结构字段对应的键
  209. value, ok := m[fieldType.Name]
  210. if !ok {
  211. continue
  212. }
  213. // 将 map 中的值转换为结构字段的类型
  214. fieldValue := reflect.ValueOf(value)
  215. if fieldValue.Type().AssignableTo(field.Type()) {
  216. field.Set(fieldValue)
  217. } else {
  218. fmt.Printf("Type mismatch for field %s\n", fieldType.Name)
  219. }
  220. }
  221. }
  222. type TraceInfo struct {
  223. Timestamp string `json:"Timestamp"`
  224. TraceId string `json:"TraceId"`
  225. AppAlias string `json:"AppAlias"`
  226. SpanKind string `json:"SpanKind"`
  227. ServiceName string `json:"ServiceName"`
  228. ResourceAttributes map[string]string `json:"ResourceAttributes"`
  229. SpanAttributes map[string]string `json:"SpanAttributes"`
  230. }
  231. // func (ti *TraceInfo) appHandler(api map[string]interface{}) {
  232. // if value, ok := ti.SpanAttributes["rpc.method"]; ok {
  233. // api["rpc.method"] = value
  234. // }
  235. // api["app_alias"] = ti.AppAlias
  236. // api["service_name"] = ti.ServiceName
  237. // api["timestamp"] = ti.Timestamp
  238. // return
  239. // }
  240. func (ti *TraceInfo) graphHandler(n, e map[string]map[string]interface{}) {
  241. /*
  242. {
  243. "nodes": [
  244. {
  245. "id": "8b6a17f2-eb82-4340-abc2-fd24c57bdd7a",
  246. "_name": "Juan Hirthe",
  247. "_phone": "474-339-7595",
  248. "_email": "Selmer18@yahoo.com",
  249. "user_type": "Others",
  250. "total_pay_amt": 0,
  251. "total_receive_amt": 55000
  252. },
  253. {
  254. "id": "31da7f71-33f6-48f7-baff-ac88e8151a47",
  255. "_name": "Agnes Dach",
  256. "_phone": "1-587-272-2478 x88717",
  257. "_email": "Ilene.Kub@gmail.com",
  258. "user_type": "Others",
  259. "total_pay_amt": 17624,
  260. "total_receive_amt": 14045.37
  261. }],
  262. "edges": [
  263. {
  264. "id": "618ff2ba-4c70-4e18-9825-ae2c140c94c1",
  265. "source": "f936adc7-f660-4694-9ac9-1bff7d468d6f",
  266. "target": "b21900c3-8e3c-43e9-9065-16b8bb40428a",
  267. "total_amt": 27000,
  268. "total_times": 2,
  269. "trade_type": "未知"
  270. },
  271. {
  272. "id": "09bb8cf1-fc11-464c-a8e0-c0d670d3d423",
  273. "source": "b21900c3-8e3c-43e9-9065-16b8bb40428a",
  274. "target": "f936adc7-f660-4694-9ac9-1bff7d468d6f",
  275. "total_amt": 233543,
  276. "total_times": 7,
  277. "trade_type": "未知"
  278. }]
  279. */
  280. //处理节点
  281. appProps := make(map[string]interface{})
  282. ti.appGraphHandler(appProps)
  283. log.Println("app props: ", appProps)
  284. n[fmt.Sprintf("%v", (appProps["name"]))] = appProps
  285. apiProps := make(map[string]interface{})
  286. ti.apiGraphHandler(apiProps)
  287. log.Println("api props: ", apiProps)
  288. n[fmt.Sprintf("%v", (apiProps["name"]))] = apiProps
  289. serviceProps := make(map[string]interface{})
  290. ti.serviceGraphHandler(serviceProps)
  291. log.Println("service props: ", serviceProps)
  292. n[fmt.Sprintf("%v", (serviceProps["name"]))] = serviceProps
  293. processProps := make(map[string]interface{})
  294. ti.processGraphHandler(processProps)
  295. log.Println("process props: ", processProps)
  296. n[fmt.Sprintf("%v", (processProps["name"]))] = processProps
  297. containerProps := make(map[string]interface{})
  298. ti.containerGraphHandler(containerProps)
  299. log.Println("container props: ", containerProps)
  300. n[fmt.Sprintf("%v", (containerProps["name"]))] = containerProps
  301. osProps := make(map[string]interface{})
  302. ti.osGraphHandler(osProps)
  303. log.Println("os props: ", osProps)
  304. n[fmt.Sprintf("%v", (osProps["name"]))] = osProps
  305. cloudProps := make(map[string]interface{})
  306. ti.cloudGraphHandler(cloudProps)
  307. log.Println("cloud props: ", cloudProps)
  308. n[fmt.Sprintf("%v", (cloudProps["name"]))] = cloudProps
  309. // 处理边
  310. e[fmt.Sprintf("%v+%v", (appProps["name"]), apiProps["name"])] = map[string]interface{}{
  311. "id": fmt.Sprintf("%v+%v", (appProps["name"]), apiProps["name"]),
  312. "source": appProps["name"],
  313. "target": apiProps["name"],
  314. "total_times": 3451,
  315. "edge_type": "INCLUDE",
  316. }
  317. e[fmt.Sprintf("%v+%v", (apiProps["name"]), serviceProps["name"])] = map[string]interface{}{
  318. "id": fmt.Sprintf("%v+%v", (apiProps["name"]), serviceProps["name"]),
  319. "source": apiProps["name"],
  320. "target": serviceProps["name"],
  321. "total_times": 872,
  322. "edge_type": "BELONG",
  323. }
  324. e[fmt.Sprintf("%v+%v", (serviceProps["name"]), processProps["name"])] = map[string]interface{}{
  325. "id": fmt.Sprintf("%v+%v", (serviceProps["name"]), processProps["name"]),
  326. "source": serviceProps["name"],
  327. "target": processProps["name"],
  328. "total_times": 72,
  329. "edge_type": "INCLUDE",
  330. }
  331. e[fmt.Sprintf("%v+%v", (processProps["name"]), containerProps["name"])] = map[string]interface{}{
  332. "id": fmt.Sprintf("%v+%v", (processProps["name"]), containerProps["name"]),
  333. "source": processProps["name"],
  334. "target": containerProps["name"],
  335. "total_times": 462,
  336. "edge_type": "IN",
  337. }
  338. e[fmt.Sprintf("%v+%v", (containerProps["name"]), osProps["name"])] = map[string]interface{}{
  339. "id": fmt.Sprintf("%v+%v", (containerProps["name"]), osProps["name"]),
  340. "source": containerProps["name"],
  341. "target": osProps["name"],
  342. "total_times": 187,
  343. "edge_type": "IN",
  344. }
  345. e[fmt.Sprintf("%v+%v", (osProps["name"]), cloudProps["name"])] = map[string]interface{}{
  346. "id": fmt.Sprintf("%v+%v", (osProps["name"]), cloudProps["name"]),
  347. "source": osProps["name"],
  348. "target": cloudProps["name"],
  349. "total_times": 95,
  350. "edge_type": "IN",
  351. }
  352. return
  353. }
  354. func (ti *TraceInfo) neo4jHandler(neo4jSession neo4j.Session) {
  355. appProps := make(map[string]interface{})
  356. ti.appHandler(appProps)
  357. log.Println("app props: ", appProps)
  358. // appSql := "CREATE (n:App) SET n = $props RETURN n"
  359. appSql := fmt.Sprintf("MERGE (n:App {name: '%s'}) ON CREATE SET n = $props RETURN n", appProps["name"])
  360. if err := cypherWrite(neo4jSession, appSql, map[string]interface{}{"props": appProps}); err != nil {
  361. log.Println("app cypher: ", err)
  362. }
  363. apiProps := make(map[string]interface{})
  364. ti.apiHandler(apiProps)
  365. log.Println("api props: ", apiProps)
  366. apiSql := fmt.Sprintf("MERGE (n:Api {name: '%s'}) ON CREATE SET n = $props RETURN n", apiProps["name"])
  367. // apiSql := "CREATE (n:Api) SET n = $props RETURN n"
  368. if err := cypherWrite(neo4jSession, apiSql, map[string]interface{}{"props": apiProps}); err != nil {
  369. log.Println("api cypher: ", err)
  370. }
  371. serviceProps := make(map[string]interface{})
  372. ti.serviceHandler(serviceProps)
  373. log.Println("service props: ", serviceProps)
  374. // serviceSql := "CREATE (n:Service) SET n = $props RETURN n"
  375. serviceSql := fmt.Sprintf("MERGE (n:Service {name: '%s'}) ON CREATE SET n = $props RETURN n", serviceProps["name"])
  376. if err := cypherWrite(neo4jSession, serviceSql, map[string]interface{}{"props": serviceProps}); err != nil {
  377. log.Println("service cypher: ", err)
  378. }
  379. processProps := make(map[string]interface{})
  380. ti.processHandler(processProps)
  381. log.Println("process props: ", processProps)
  382. // processSql := "CREATE (n:Process) SET n = $props RETURN n"
  383. processSql := fmt.Sprintf("MERGE (n:Process {name: '%s'}) ON CREATE SET n = $props RETURN n", processProps["name"])
  384. if err := cypherWrite(neo4jSession, processSql, map[string]interface{}{"props": processProps}); err != nil {
  385. log.Println("process cypher: ", err)
  386. }
  387. containerProps := make(map[string]interface{})
  388. ti.containerHandler(containerProps)
  389. log.Println("container props: ", containerProps)
  390. // containerSql := "CREATE (n:Container) SET n = $props RETURN n"
  391. containerSql := fmt.Sprintf("MERGE (n:Container {name: '%s'}) ON CREATE SET n = $props RETURN n", containerProps["name"])
  392. if err := cypherWrite(neo4jSession, containerSql, map[string]interface{}{"props": containerProps}); err != nil {
  393. log.Println("Container cypher: ", err)
  394. }
  395. osProps := make(map[string]interface{})
  396. ti.osHandler(osProps)
  397. log.Println("os props: ", osProps)
  398. osSql := fmt.Sprintf("MERGE (n:Os {name: '%s'}) ON CREATE SET n = $props RETURN n", osProps["name"])
  399. // osSql := "CREATE (n:Os) SET n = $props RETURN n"
  400. if err := cypherWrite(neo4jSession, osSql, map[string]interface{}{"props": osProps}); err != nil {
  401. log.Println("os cypher: ", err)
  402. }
  403. cloudProps := make(map[string]interface{})
  404. ti.cloudHandler(cloudProps)
  405. log.Println("cloud props: ", cloudProps)
  406. cloudSql := fmt.Sprintf("MERGE (n:Cloud {name: '%s'}) ON CREATE SET n = $props RETURN n", cloudProps["name"])
  407. // cloudSql := "CREATE (n:Cloud) SET n = $props RETURN n"
  408. if err := cypherWrite(neo4jSession, cloudSql, map[string]interface{}{"props": cloudProps}); err != nil {
  409. log.Println("cloud cypher: ", err)
  410. }
  411. var err error
  412. // 创建链接关系
  413. _, err = neo4jSession.Run("MATCH (n1:App {name: $name1}),(n2:Api {name: $name2}) MERGE (n1)-[r:INCLUDE {count: 3451}}]->(n2) RETURN r", map[string]interface{}{
  414. "name1": appProps["name"],
  415. "name2": apiProps["name"],
  416. })
  417. // _, err = neo4jSession.Run("MATCH (n1:App {name: $name1}),(n2:Service {name: $name2}) MERGE (n1)-[r:INCLUDE]->(n2) RETURN r", map[string]interface{}{
  418. // "name1": appProps["name"],
  419. // "name2": serviceProps["name"],
  420. // })
  421. _, err = neo4jSession.Run("MATCH (n1:Api {name: $name1}),(n2:Service {name: $name2}) MERGE (n1)-[r:BELONG {count: 345}]->(n2) RETURN r", map[string]interface{}{
  422. "name1": apiProps["name"],
  423. "name2": serviceProps["name"],
  424. })
  425. _, err = neo4jSession.Run("MATCH (n1:Service {name: $name1}),(n2:Process {name: $name2}) MERGE (n1)-[r:INCLUDE{count: 876}]->(n2) RETURN r", map[string]interface{}{
  426. "name1": serviceProps["name"],
  427. "name2": processProps["name"],
  428. })
  429. _, err = neo4jSession.Run("MATCH (n1:Process {name: $name1}),(n2:Container {name: $name2}) MERGE (n1)-[r:IN{count: 426}]->(n2) RETURN r", map[string]interface{}{
  430. "name1": processProps["name"],
  431. "name2": containerProps["name"],
  432. })
  433. _, err = neo4jSession.Run("MATCH (n1:Container {name: $name1}),(n2:Os {name: $name2}) MERGE (n1)-[r:IN{count: 187}]->(n2) RETURN r", map[string]interface{}{
  434. "name1": containerProps["name"],
  435. "name2": osProps["name"],
  436. })
  437. _, err = neo4jSession.Run("MATCH (n1:Os {name: $name1}),(n2:Cloud {name: $name2}) MERGE (n1)-[r:IN{count: 39}]->(n2) RETURN r", map[string]interface{}{
  438. "name1": osProps["name"],
  439. "name2": cloudProps["name"],
  440. })
  441. if err != nil {
  442. panic(err)
  443. }
  444. }
  445. func (ti *TraceInfo) appHandler(app map[string]interface{}) {
  446. app["name"] = ti.AppAlias
  447. app["app_alias"] = ti.AppAlias
  448. app["timestamp"] = ti.Timestamp
  449. app["apdex"] = 0.97
  450. return
  451. }
  452. func (ti *TraceInfo) appGraphHandler(app map[string]interface{}) {
  453. app["name"] = ti.AppAlias
  454. app["app_alias"] = ti.AppAlias
  455. app["timestamp"] = ti.Timestamp
  456. app["apdex"] = 0.97
  457. app["id"] = ti.AppAlias
  458. app["node_type"] = "app"
  459. return
  460. }
  461. func (ti *TraceInfo) apiHandler(api map[string]interface{}) {
  462. if value, ok := ti.SpanAttributes["rpc.method"]; ok {
  463. api["rpc.method"] = value
  464. api["name"] = value
  465. }
  466. if value, ok := ti.SpanAttributes["code.function"]; ok {
  467. api["code.function"] = value
  468. api["name"] = value
  469. }
  470. if value, ok := ti.SpanAttributes["http.url"]; ok {
  471. api["http.url"] = value
  472. api["name"] = value
  473. }
  474. if value, ok := ti.SpanAttributes["http.route"]; ok {
  475. api["http.route"] = value
  476. api["name"] = value
  477. }
  478. if value, ok := ti.SpanAttributes["http.method"]; ok {
  479. api["http.method"] = value
  480. } else if value, ok := ti.SpanAttributes["http.request.method"]; ok {
  481. api["http.method"] = value
  482. }
  483. if value, ok := ti.SpanAttributes["http.client_ip"]; ok {
  484. api["http.client_ip"] = value
  485. }
  486. if value, ok := ti.SpanAttributes["http.target"]; ok {
  487. api["http.target"] = value
  488. }
  489. if value, ok := ti.SpanAttributes["net.host.name"]; ok {
  490. api["net.host.name"] = value
  491. }
  492. if value, ok := ti.SpanAttributes["net.peer.name"]; ok {
  493. api["net.peer.name"] = value
  494. }
  495. api["span_kind"] = ti.SpanKind
  496. api["app_alias"] = ti.AppAlias
  497. api["service_name"] = ti.ServiceName
  498. api["timestamp"] = ti.Timestamp
  499. api["apdex"] = generateRandomFloat(0.7, 1.0)
  500. api["err_rate"] = generateRandomFloat(0.8, 1.0)
  501. api["latency"] = generateRandomFloat(0.01, 12)
  502. return
  503. }
  504. func (ti *TraceInfo) apiGraphHandler(api map[string]interface{}) {
  505. api["name"] = "unknow-api"
  506. if value, ok := ti.SpanAttributes["rpc.method"]; ok {
  507. api["rpc.method"] = value
  508. api["name"] = value
  509. }
  510. if value, ok := ti.SpanAttributes["code.function"]; ok {
  511. api["code.function"] = value
  512. api["name"] = value
  513. }
  514. if value, ok := ti.SpanAttributes["http.url"]; ok {
  515. api["http.url"] = value
  516. api["name"] = value
  517. }
  518. if value, ok := ti.SpanAttributes["http.route"]; ok {
  519. api["http.route"] = value
  520. api["name"] = value
  521. }
  522. if value, ok := ti.SpanAttributes["http.method"]; ok {
  523. api["http.method"] = value
  524. }
  525. if value, ok := ti.SpanAttributes["http.request.method"]; ok {
  526. api["http.method"] = value
  527. }
  528. if value, ok := ti.SpanAttributes["http.client_ip"]; ok {
  529. api["http.client_ip"] = value
  530. }
  531. if value, ok := ti.SpanAttributes["http.target"]; ok {
  532. api["http.target"] = value
  533. }
  534. if value, ok := ti.SpanAttributes["net.host.name"]; ok {
  535. api["net.host.name"] = value
  536. }
  537. if value, ok := ti.SpanAttributes["net.peer.name"]; ok {
  538. api["net.peer.name"] = value
  539. }
  540. api["span_kind"] = ti.SpanKind
  541. api["app_alias"] = ti.AppAlias
  542. api["service_name"] = ti.ServiceName
  543. api["timestamp"] = ti.Timestamp
  544. api["apdex"] = generateRandomFloat(0.7, 1.0)
  545. api["err_rate"] = generateRandomFloat(0.8, 1.0)
  546. api["latency"] = generateRandomFloat(0.01, 12)
  547. api["id"] = api["name"]
  548. api["node_type"] = "api"
  549. return
  550. }
  551. func (ti *TraceInfo) serviceHandler(service map[string]interface{}) {
  552. service["service_name"] = ti.ServiceName
  553. service["name"] = ti.ServiceName
  554. if value, ok := ti.ResourceAttributes["process.runtime.name"]; ok {
  555. service["process_name"] = value
  556. }
  557. service["timestamp"] = ti.Timestamp
  558. service["apdex"] = generateRandomFloat(0.6, 1.0)
  559. return
  560. }
  561. func (ti *TraceInfo) serviceGraphHandler(service map[string]interface{}) {
  562. service["service_name"] = ti.ServiceName
  563. service["name"] = ti.ServiceName
  564. if value, ok := ti.ResourceAttributes["process.runtime.name"]; ok {
  565. service["process_name"] = value
  566. }
  567. service["timestamp"] = ti.Timestamp
  568. service["apdex"] = generateRandomFloat(0.6, 1.0)
  569. service["id"] = ti.ServiceName
  570. service["node_type"] = "service"
  571. return
  572. }
  573. func (ti *TraceInfo) processHandler(process map[string]interface{}) {
  574. if value, ok := ti.ResourceAttributes["process.runtime.name"]; ok {
  575. process["process_name"] = value
  576. }
  577. if value, ok := ti.ResourceAttributes["process.runtime.description"]; ok {
  578. process["process.runtime.description"] = value
  579. }
  580. if value, ok := ti.ResourceAttributes["process.executable.path"]; ok {
  581. process["process.executable.path"] = value
  582. }
  583. if value, ok := ti.ResourceAttributes["process.command_line"]; ok {
  584. process["process.command_line"] = value
  585. process["name"] = value
  586. }
  587. if value, ok := ti.ResourceAttributes["process.runtime.version"]; ok {
  588. process["process.runtime.version"] = value
  589. }
  590. if value, ok := ti.ResourceAttributes["process.command"]; ok {
  591. process["process.command"] = value
  592. }
  593. if value, ok := ti.ResourceAttributes["container.id"]; ok {
  594. process["container.id"] = value
  595. }
  596. if value, ok := ti.ResourceAttributes["k8s.pod.name"]; ok {
  597. process["pod.name"] = value
  598. }
  599. process["timestamp"] = ti.Timestamp
  600. return
  601. /*
  602. 原有的process运行在某个pod中,当发生重新调度或者重新部署后,pod中的容器会重建,
  603. 相当于断开了之前process与容器的关系,所以,当有该process信息解析时,需要判断是否是之前的container,
  604. 如果有对端的container长时间未更新,则有可能是不再调度。但是这个条件不足于支撑直接删除该container链接关系,
  605. 还需要有其他的条件做支撑,暂时不删除关联,等找到这个支撑条件再处理 TODO:
  606. */
  607. }
  608. func (ti *TraceInfo) processGraphHandler(process map[string]interface{}) {
  609. process["name"] = "unknow-process"
  610. if value, ok := ti.ResourceAttributes["process.runtime.name"]; ok {
  611. process["process_name"] = value
  612. }
  613. if value, ok := ti.ResourceAttributes["process.runtime.description"]; ok {
  614. process["process.runtime.description"] = value
  615. }
  616. if value, ok := ti.ResourceAttributes["process.executable.path"]; ok {
  617. process["process.executable.path"] = value
  618. }
  619. if value, ok := ti.ResourceAttributes["process.command"]; ok {
  620. process["process.command"] = value
  621. process["name"] = value
  622. }
  623. if value, ok := ti.ResourceAttributes["process.command_args"]; ok {
  624. process["process.command_args"] = value
  625. process["name"] = value
  626. }
  627. if value, ok := ti.ResourceAttributes["process.command_line"]; ok {
  628. process["process.command_line"] = value
  629. process["name"] = value
  630. }
  631. if value, ok := ti.ResourceAttributes["process.runtime.version"]; ok {
  632. process["process.runtime.version"] = value
  633. }
  634. if value, ok := ti.ResourceAttributes["container.id"]; ok {
  635. process["container.id"] = value
  636. }
  637. if value, ok := ti.ResourceAttributes["k8s.pod.name"]; ok {
  638. process["pod.name"] = value
  639. }
  640. process["timestamp"] = ti.Timestamp
  641. process["id"] = process["name"]
  642. process["node_type"] = "process"
  643. if process["name"] == "unknow-process" {
  644. process["trace_id"] = ti.TraceId
  645. process["id"] = process["container.id"]
  646. }
  647. return
  648. /*
  649. 原有的process运行在某个pod中,当发生重新调度或者重新部署后,pod中的容器会重建,
  650. 相当于断开了之前process与容器的关系,所以,当有该process信息解析时,需要判断是否是之前的container,
  651. 如果有对端的container长时间未更新,则有可能是不再调度。但是这个条件不足于支撑直接删除该container链接关系,
  652. 还需要有其他的条件做支撑,暂时不删除关联,等找到这个支撑条件再处理 TODO:
  653. */
  654. }
  655. func (ti *TraceInfo) containerHandler(container map[string]interface{}) {
  656. container["timestamp"] = ti.Timestamp
  657. if value, ok := ti.ResourceAttributes["container.id"]; ok {
  658. container["container.id"] = value
  659. }
  660. if value, ok := ti.ResourceAttributes["k8s.namespace.name"]; ok {
  661. container["k8s.namespace.name"] = value
  662. }
  663. if value, ok := ti.ResourceAttributes["k8s.deployment.name"]; ok {
  664. container["k8s.deployment.name"] = value
  665. }
  666. if value, ok := ti.ResourceAttributes["k8s.node.name"]; ok {
  667. container["k8s.node.name"] = value
  668. }
  669. if value, ok := ti.ResourceAttributes["k8s.pod.name"]; ok {
  670. container["pod.name"] = value
  671. container["name"] = value
  672. }
  673. if value, ok := ti.ResourceAttributes["k8s.pod.ip"]; ok {
  674. container["pod.ip"] = value
  675. }
  676. return
  677. }
  678. func (ti *TraceInfo) containerGraphHandler(container map[string]interface{}) {
  679. container["timestamp"] = ti.Timestamp
  680. if value, ok := ti.ResourceAttributes["container.id"]; ok {
  681. container["container.id"] = value
  682. }
  683. if value, ok := ti.ResourceAttributes["k8s.namespace.name"]; ok {
  684. container["k8s.namespace.name"] = value
  685. }
  686. if value, ok := ti.ResourceAttributes["k8s.deployment.name"]; ok {
  687. container["k8s.deployment.name"] = value
  688. }
  689. if value, ok := ti.ResourceAttributes["k8s.node.name"]; ok {
  690. container["k8s.node.name"] = value
  691. }
  692. if value, ok := ti.ResourceAttributes["k8s.pod.name"]; ok {
  693. container["pod.name"] = value
  694. container["name"] = value
  695. }
  696. if value, ok := ti.ResourceAttributes["k8s.pod.ip"]; ok {
  697. container["pod.ip"] = value
  698. }
  699. container["id"] = container["name"]
  700. container["node_type"] = "container"
  701. return
  702. }
  703. func (ti *TraceInfo) osHandler(os map[string]interface{}) {
  704. os["timestamp"] = ti.Timestamp
  705. if value, ok := ti.ResourceAttributes["k8s.node.name"]; ok {
  706. os["k8s.node.name"] = value
  707. os["name"] = value
  708. }
  709. if value, ok := ti.ResourceAttributes["host.arch"]; ok {
  710. os["host.arch"] = value
  711. }
  712. if value, ok := ti.ResourceAttributes["os.description"]; ok {
  713. os["os.description"] = value
  714. }
  715. if value, ok := ti.ResourceAttributes["os.type"]; ok {
  716. os["os.type"] = value
  717. }
  718. if value, ok := ti.ResourceAttributes["os.id"]; ok {
  719. os["os.id"] = value
  720. }
  721. if value, ok := ti.ResourceAttributes["os.version"]; ok {
  722. os["os.version"] = value
  723. }
  724. if value, ok := ti.ResourceAttributes["hots.type"]; ok {
  725. os["host.type"] = value
  726. }
  727. if value, ok := ti.ResourceAttributes["cloud.platform"]; ok {
  728. os["cloud.platform"] = value
  729. }
  730. os["memory"] = generateRandomFloat(0.1, 0.8)
  731. os["disk"] = generateRandomFloat(0.1, 0.8)
  732. os["load"] = generateRandomFloat(0.01, 4.0)
  733. return
  734. }
  735. func (ti *TraceInfo) osGraphHandler(os map[string]interface{}) {
  736. os["timestamp"] = ti.Timestamp
  737. if value, ok := ti.ResourceAttributes["k8s.node.name"]; ok {
  738. os["k8s.node.name"] = value
  739. os["name"] = value
  740. }
  741. if value, ok := ti.ResourceAttributes["host.arch"]; ok {
  742. os["host.arch"] = value
  743. }
  744. if value, ok := ti.ResourceAttributes["os.description"]; ok {
  745. os["os.description"] = value
  746. }
  747. if value, ok := ti.ResourceAttributes["os.type"]; ok {
  748. os["os.type"] = value
  749. }
  750. if value, ok := ti.ResourceAttributes["os.id"]; ok {
  751. os["os.id"] = value
  752. }
  753. if value, ok := ti.ResourceAttributes["os.version"]; ok {
  754. os["os.version"] = value
  755. }
  756. if value, ok := ti.ResourceAttributes["hots.type"]; ok {
  757. os["host.type"] = value
  758. }
  759. if value, ok := ti.ResourceAttributes["cloud.platform"]; ok {
  760. os["cloud.platform"] = value
  761. }
  762. os["memory"] = generateRandomFloat(0.1, 0.8)
  763. os["disk"] = generateRandomFloat(0.1, 0.8)
  764. os["load"] = generateRandomFloat(0.01, 4.0)
  765. os["id"] = os["name"]
  766. os["node_type"] = "os"
  767. return
  768. }
  769. func (ti *TraceInfo) cloudHandler(cloud map[string]interface{}) {
  770. cloud["timestamp"] = ti.Timestamp
  771. if value, ok := ti.ResourceAttributes["cloud.platform"]; ok {
  772. cloud["cloud.platform"] = value
  773. cloud["name"] = value
  774. } else {
  775. cloud["name"] = "unkonw"
  776. }
  777. if value, ok := ti.ResourceAttributes["cloud.region"]; ok {
  778. cloud["cloud.region"] = value
  779. }
  780. if value, ok := ti.ResourceAttributes["cloud.provider"]; ok {
  781. cloud["cloud.provider"] = value
  782. }
  783. return
  784. }
  785. func (ti *TraceInfo) cloudGraphHandler(cloud map[string]interface{}) {
  786. cloud["timestamp"] = ti.Timestamp
  787. if value, ok := ti.ResourceAttributes["cloud.platform"]; ok {
  788. cloud["cloud.platform"] = value
  789. cloud["name"] = value
  790. } else {
  791. cloud["name"] = "unkonw"
  792. }
  793. if value, ok := ti.ResourceAttributes["cloud.region"]; ok {
  794. cloud["cloud.region"] = value
  795. }
  796. if value, ok := ti.ResourceAttributes["cloud.provider"]; ok {
  797. cloud["cloud.provider"] = value
  798. }
  799. cloud["id"] = cloud["name"]
  800. cloud["node_type"] = "cloud"
  801. return
  802. }
  803. func generateRandomFloat(min, max float64) float64 {
  804. randomFloat := min + rand.Float64()*(max-min)
  805. return math.Round(randomFloat*100) / 100 // 保留两位小数
  806. }