traceurl.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. /*
  2. Copyright © 2023 NAME HERE <EMAIL ADDRESS>
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package traceurl
  14. import (
  15. "context"
  16. "fmt"
  17. "go-admin/app/observe/models"
  18. "go-admin/app/observe/models/query"
  19. "go-admin/common/database"
  20. "go-admin/common/olap"
  21. "go-admin/common/opentelemetry"
  22. "go-admin/common/storage"
  23. "go-admin/common/utils"
  24. extConfig "go-admin/config"
  25. "strings"
  26. "sync"
  27. "time"
  28. log "github.com/go-admin-team/go-admin-core/logger"
  29. "github.com/panjf2000/ants/v2"
  30. "github.com/segmentio/kafka-go"
  31. // _ "net/http/pprof"
  32. "gorm.io/gorm"
  33. "github.com/go-admin-team/go-admin-core/config/source/file"
  34. "github.com/go-admin-team/go-admin-core/sdk"
  35. "github.com/go-admin-team/go-admin-core/sdk/config"
  36. "github.com/spf13/cobra"
  37. "go.opentelemetry.io/collector/pdata/ptrace"
  38. )
  39. var configYml string
  40. var chdb *gorm.DB
  41. var mydb *gorm.DB
  42. var chInsertInterval int64 = 3 // 默认每隔3秒插入一次数据
  43. // consumer/traceurl/traceurlCmd represents the consumer/traceurl/traceurl command
  44. var TraceUrlCmd = &cobra.Command{
  45. Use: "traceurl",
  46. Short: "消费otel-collector发送至kafka的数据到otel_traces_url表",
  47. Long: `具体逻辑...`,
  48. PreRun: func(cmd *cobra.Command, args []string) {
  49. config.ExtendConfig = &extConfig.ExtConfig
  50. config.Setup(
  51. file.NewSource(file.WithPath(configYml)),
  52. database.SetupWithoutOtel,
  53. storage.Setup,
  54. // olap.Setup,
  55. // opentelemetry.Setup,
  56. )
  57. },
  58. Run: func(cmd *cobra.Command, args []string) {
  59. run()
  60. },
  61. }
  62. func init() {
  63. TraceUrlCmd.PersistentFlags().StringVarP(&configYml, "config", "c", "config/settings.yml", "启动命令时指定的配置文件, 默认为config/settings.yml")
  64. }
  65. func run() {
  66. // runtime.GOMAXPROCS(1) // 限制 CPU 使用数,避免过载
  67. // runtime.SetMutexProfileFraction(1) // 开启对锁调用的跟踪
  68. // runtime.SetBlockProfileRate(1) // 开启对阻塞操作的跟踪
  69. // go func() {
  70. // // 启动一个 http server,注意 pprof 相关的 handler 已经自动注册过了
  71. // if err := http.ListenAndServe(":6060", nil); err != nil {
  72. // log.Fatal(err)
  73. // }
  74. // os.Exit(0)
  75. // }()
  76. if config.ApplicationConfig.Mode == "dev" {
  77. chInsertInterval = 10
  78. }
  79. chdb = olap.GetClickhouseOrm()
  80. mydb = getMysqlDB()
  81. var maxConsumerNum = extConfig.ExtConfig.Kafka.Consumers
  82. var wg sync.WaitGroup
  83. var p *ants.Pool
  84. var tg = taskGroup{}
  85. // tg.traceUrls = new(safeTraceUrls)
  86. // tg.traceUrls.slice = make([]models.TracesURLLocalBuf, 0, 2000)
  87. tg.trachUrlCh = make(chan models.TracesURLLocalBuf, 2000)
  88. // tg.appInfos = new(safeAppInfo)
  89. // tg.appInfos.mp = make(map[string]appInfo)
  90. wg.Add(1)
  91. // go tg.upTraceUrls(&wg)
  92. go tg.upUrlMappings(&wg)
  93. p, _ = ants.NewPool(maxConsumerNum, ants.WithPanicHandler(func(i interface{}) {
  94. log.Errorf("消费者 goroutine task 出现异常: %s, 当前共有%d个goroutine在运行", i, p.Running())
  95. wg.Add(1)
  96. if err := p.Submit(tg.task(&wg)); err == nil {
  97. log.Infof("重新提交任务成功,当前共计有%d个goroutine在运行", p.Running())
  98. } else {
  99. log.Errorf("重新提交任务失败: %s", err.Error())
  100. }
  101. }))
  102. defer p.Release()
  103. wg.Add(maxConsumerNum)
  104. for i := 0; i < maxConsumerNum; i++ {
  105. p.Submit(tg.task(&wg))
  106. }
  107. log.Infof("当前共计有%d个 goroutine task 在运行", p.Running())
  108. wg.Wait()
  109. }
  // taskGroup carries the state shared by the goroutines that run starts:
// the Kafka consumer tasks (task) and the background loops (upTraceUrls,
// upUrlMappings).
type taskGroup struct {
	// appInfos *safeAppInfo
	// trachUrlCh is created in run with a buffer of 2000 and drained by
	// upTraceUrls. The send in task is currently commented out.
	// NOTE(review): "trach" is presumably a typo for "trace"; renaming would
	// require touching every user of the field.
	trachUrlCh chan models.TracesURLLocalBuf
}
  114. func (tg *taskGroup) task(wg *sync.WaitGroup) func() {
  115. return func() {
  116. defer wg.Done()
  117. r := kafka.NewReader(kafka.ReaderConfig{
  118. Brokers: extConfig.ExtConfig.Kafka.Brokers,
  119. Topic: extConfig.ExtConfig.Kafka.Topic,
  120. MaxBytes: 10e6, // 10MB
  121. StartOffset: kafka.LastOffset,
  122. GroupID: "observe-consumer-group",
  123. })
  124. defer r.Close()
  125. var mAlias map[string]struct{} = map[string]struct{}{}
  126. var messageUnmarshaler ptrace.ProtoUnmarshaler
  127. for {
  128. // ruleModels := map[string]models.UrlMapping{}
  129. m, err := r.ReadMessage(context.Background())
  130. if err != nil {
  131. log.Errorf("消费kafka消息失败: %s", err)
  132. break
  133. }
  134. var td = ptrace.NewTraces()
  135. td, err = messageUnmarshaler.UnmarshalTraces(m.Value)
  136. if err != nil {
  137. log.Errorf("解析kafka消息失败: %s", err)
  138. break
  139. }
  140. for i := 0; i < td.ResourceSpans().Len(); i++ {
  141. spans := td.ResourceSpans().At(i)
  142. res := spans.Resource()
  143. resAttr := opentelemetry.NewResourceAttributes(&res)
  144. // var serviceName, appAlias string
  145. serviceName, err := resAttr.ServiceName()
  146. if err != nil {
  147. log.Debugf("未能获取到ServiceName: %s %s", spans.SchemaUrl(), err)
  148. }
  149. appAlias, err := resAttr.AppName()
  150. if err != nil {
  151. log.Debugf("ServiceName: %s获取appAlias失败, 使用默认值UNSET: %s", serviceName, err)
  152. appAlias = "UNSET"
  153. }
  154. mAlias[appAlias] = struct{}{}
  155. for j := 0; j < spans.ScopeSpans().Len(); j++ {
  156. rs := spans.ScopeSpans().At(j).Spans()
  157. for k := 0; k < rs.Len(); k++ {
  158. r := rs.At(k)
  159. if r.Kind() != ptrace.SpanKindServer {
  160. log.Debugf("span kind 为 %s, 跳过", r.Kind().String())
  161. continue
  162. }
  163. attrs2 := opentelemetry.NewSpanAttributesWithRaw(r.Attributes())
  164. if attrs2.HttpMethod() == "" {
  165. log.Debugf("非http请求 %s %s, 跳过", r.SpanID(), attrs2.RpcSystem())
  166. continue
  167. }
  168. if attrs2.RpcSystem() != "" && attrs2.RpcSystem() != "http" { // 兼容使用proto定义接口的情况
  169. log.Debugf("基于http的rpc请求 %s %s, 跳过", r.SpanID(), attrs2.RpcSystem())
  170. continue
  171. }
  172. // 由于 otel_traces_url使用物化视图生成,此处隐藏
  173. // route := attrs2.HttpRoute()
  174. // t := models.TracesURLLocalBuf{
  175. // Timestamp: r.EndTimestamp().AsTime(),
  176. // // AppID: int64(appInfo.Id), // todo 从 mysql库中获取,每隔一段时间更新一次
  177. // // AppName: appInfo.Name,
  178. // // Kind: kind,
  179. // // Name: kInfo.name,
  180. // Route: route,
  181. // Target: attrs2.HttpTarget(),
  182. // URL: attrs2.HttpUrl(),
  183. // Flavor: attrs2.HttpFlavor(),
  184. // Host: attrs2.HttpHost(),
  185. // Method: attrs2.HttpMethod(),
  186. // StatusCode: attrs2.StatusCode(),
  187. // Message: r.Status().Message(),
  188. // TraceID: r.TraceID().String(),
  189. // SpanID: r.SpanID().String(),
  190. // ServiceName: serviceName,
  191. // Duration: int64(r.EndTimestamp().AsTime().Sub(r.StartTimestamp().AsTime())),
  192. // AppAlias: appAlias,
  193. // }
  194. // tg.trachUrlCh <- t
  195. }
  196. }
  197. }
  198. }
  199. panic("kafka消费出现错误")
  200. }
  201. }
  202. func (tg *taskGroup) upTraceUrls(wg *sync.WaitGroup) {
  203. defer wg.Done()
  204. ticker := time.NewTicker(1 * time.Second)
  205. traceUrls := make([]models.TracesURLLocalBuf, 0)
  206. insert := func() {
  207. result := chdb.Model(&models.TracesURLLocalBuf{}).Create(&traceUrls)
  208. if result.Error != nil {
  209. log.Error("批量插入数据到clickhouse失败: %s", result.Error.Error())
  210. return
  211. }
  212. log.Infof("批量写入数据到clickhouse成功, count: %d", len(traceUrls))
  213. traceUrls = traceUrls[:0]
  214. }
  215. for {
  216. select {
  217. case traceUrl := <-tg.trachUrlCh:
  218. traceUrls = append(traceUrls, traceUrl)
  219. if len(traceUrls) >= 1000 {
  220. insert()
  221. }
  222. case <-ticker.C:
  223. if time.Now().Unix()%chInsertInterval == 0 && len(traceUrls) > 0 {
  224. insert()
  225. }
  226. }
  227. }
  228. }
  229. // 生成url mapping, 这种方式有个问题,就是对于入库时间与生成链路的时间相差较多时,可能会统计不到
  230. func (tg *taskGroup) upUrlMappings(wg *sync.WaitGroup) {
  231. defer wg.Done()
  232. seconds := 30
  233. ticker := time.NewTicker(time.Duration(seconds) * time.Second)
  234. type tinyUrlMapping struct {
  235. Method string
  236. Route string
  237. ServiceName string
  238. AppAlias string
  239. }
  240. for t := range ticker.C {
  241. log.Infof("触发 url mapping 更新: %s", t)
  242. urlMappings := []tinyUrlMapping{}
  243. fields := "Method, Route, ServiceName, AppAlias"
  244. if err := chdb.Model(&models.TracesURL{}).Distinct(fields).
  245. // 由于Timestamp不是入库时间,这里需要统计时间长一些
  246. Where(fmt.Sprintf("Timestamp>=now()-INTERVAL %d SECOND", seconds*10)).Find(&urlMappings).Error; err != nil {
  247. log.Errorf("获取route失败: %s", err)
  248. continue
  249. }
  250. log.Infof("获取 traces url 基础数据成功,共计%d条数据", len(urlMappings))
  251. svcNameMap := map[string]struct{}{}
  252. for _, um := range urlMappings {
  253. svcNameMap[um.ServiceName] = struct{}{}
  254. }
  255. serviceNames := make([]string, 0, len(svcNameMap))
  256. for svcName := range svcNameMap {
  257. serviceNames = append(serviceNames, svcName)
  258. }
  259. existsUrlMappings := []tinyUrlMapping{}
  260. err := mydb.Model(&models.UrlMapping{}).Where("service_name in ? and is_perfect_match=1", serviceNames).
  261. Distinct("app_alias, service_name, method, url as route").Scan(&existsUrlMappings).Error
  262. if err != nil {
  263. log.Errorf("获取已经存在的url mapping失败")
  264. }
  265. existsUrlMappingsMap := map[string]struct{}{}
  266. for _, um := range existsUrlMappings {
  267. existsUrlMappingsMap[fmt.Sprintf("%s-%s-%s-%s", um.AppAlias, um.ServiceName, um.Method, um.Route)] = struct{}{}
  268. }
  269. newUrlMappings := map[string]models.UrlMapping{}
  270. for _, um := range urlMappings {
  271. // 检测当前route是否存在
  272. _, exists := existsUrlMappingsMap[fmt.Sprintf("%s-%s-%s-%s", um.AppAlias, um.ServiceName, um.Method, um.Route)]
  273. if exists {
  274. continue
  275. }
  276. // if config.ApplicationConfig.Mode == "dev" {
  277. // exists, _ = query.NewUrlMapping().Exists(um.AppAlias, um.ServiceName, um.Method, um.Route)
  278. // } else {
  279. // exists, _ = query.NewUrlMapping().ExistsCache(um.AppAlias, um.ServiceName, um.Method, um.Route)
  280. // }
  281. // if exists {
  282. // mydb.Model(&models.UrlMapping{}).
  283. // Where("app_alias=? and service_name=? and method=? and url=?", um.AppAlias, um.ServiceName, um.Method, um.Route).
  284. // Update("is_perfect_match", 1)
  285. // continue
  286. // }
  287. log.Infof("AppAlias: %s, ServiceName: %s, Method: %s, Route: %s 在url mapping中不存在", um.AppAlias, um.ServiceName, um.Method, um.Route)
  288. // 如果route不存在
  289. arr := strings.Split(um.Route, "/")
  290. key := utils.SimpleHash(um.AppAlias, um.ServiceName, um.Method, um.Route)
  291. appId, err := query.NewApp().Alias2ID(um.AppAlias)
  292. if err != nil {
  293. log.Errorf("未找到%s的app id", um.AppAlias)
  294. continue
  295. }
  296. name := strings.ReplaceAll(strings.Title(strings.Join(arr, " ")), " ", "")
  297. if um.Route == "/" {
  298. name = "ROOT"
  299. }
  300. newUrlMappings[key] = models.UrlMapping{
  301. AppId: appId,
  302. AppAlias: um.AppAlias,
  303. Name: name,
  304. Url: um.Route,
  305. Method: um.Method,
  306. ServiceName: um.ServiceName,
  307. Level: int32(len(arr) - 1),
  308. IsPerfectMatch: 1,
  309. }
  310. for len(arr) > 1 {
  311. arr = arr[:len(arr)-1]
  312. name := strings.ReplaceAll(strings.Title(strings.Join(arr, " ")), " ", "")
  313. subroute := strings.Join(arr, "/")
  314. if subroute == "" {
  315. subroute = "/"
  316. name = "ROOT"
  317. }
  318. // 检测subroute是否存在, 不存在则insert
  319. exists := false
  320. if config.ApplicationConfig.Mode == "dev" {
  321. exists, _ = query.NewUrlMapping().Exists(um.AppAlias, um.ServiceName, um.Method, subroute)
  322. } else {
  323. exists, _ = query.NewUrlMapping().ExistsCache(um.AppAlias, um.ServiceName, um.Method, subroute)
  324. }
  325. if exists {
  326. continue
  327. }
  328. level := len(arr) - 1
  329. key := utils.SimpleHash(um.AppAlias, um.ServiceName, um.Method, subroute)
  330. newUrlMappings[key] = models.UrlMapping{
  331. AppId: appId,
  332. AppAlias: um.AppAlias,
  333. Name: name,
  334. Url: subroute,
  335. Method: um.Method,
  336. ServiceName: um.ServiceName,
  337. Level: int32(level),
  338. IsPerfectMatch: 0,
  339. }
  340. }
  341. }
  342. if len(newUrlMappings) > 0 {
  343. data := make([]models.UrlMapping, 0, len(newUrlMappings))
  344. for _, um := range newUrlMappings {
  345. data = append(data, um)
  346. }
  347. result := mydb.Create(data)
  348. if err := result.Error; err != nil {
  349. log.Errorf("批量插入url mapping失败: %s", err)
  350. } else {
  351. log.Infof("批量插入url mapping成功, 共%d条", result.RowsAffected)
  352. }
  353. }
  354. }
  355. }
  356. func getMysqlDB() *gorm.DB {
  357. host := "*"
  358. mysql := sdk.Runtime.GetDbByKey(host)
  359. if config.DatabasesConfig[host].Driver == "mysql" {
  360. mysql.Set("gorm:table_options", "ENGINE=InnoDB CHARSET=utf8mb4")
  361. }
  362. return mysql
  363. }