123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- /*
- Copyright © 2023 NAME HERE <EMAIL ADDRESS>
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package traceurl
- import (
- "context"
- "fmt"
- "go-admin/app/observe/models"
- "go-admin/app/observe/models/query"
- "go-admin/common/database"
- "go-admin/common/olap"
- "go-admin/common/opentelemetry"
- "go-admin/common/storage"
- "go-admin/common/utils"
- extConfig "go-admin/config"
- "strings"
- "sync"
- "time"
- log "github.com/go-admin-team/go-admin-core/logger"
- "github.com/panjf2000/ants/v2"
- "github.com/segmentio/kafka-go"
- // _ "net/http/pprof"
- "gorm.io/gorm"
- "github.com/go-admin-team/go-admin-core/config/source/file"
- "github.com/go-admin-team/go-admin-core/sdk"
- "github.com/go-admin-team/go-admin-core/sdk/config"
- "github.com/spf13/cobra"
- "go.opentelemetry.io/collector/pdata/ptrace"
- )
- var configYml string
- var chdb *gorm.DB
- var mydb *gorm.DB
- var chInsertInterval int64 = 3 // 默认每隔3秒插入一次数据
- // consumer/traceurl/traceurlCmd represents the consumer/traceurl/traceurl command
- var TraceUrlCmd = &cobra.Command{
- Use: "traceurl",
- Short: "消费otel-collector发送至kafka的数据到otel_traces_url表",
- Long: `具体逻辑...`,
- PreRun: func(cmd *cobra.Command, args []string) {
- config.ExtendConfig = &extConfig.ExtConfig
- config.Setup(
- file.NewSource(file.WithPath(configYml)),
- database.SetupWithoutOtel,
- storage.Setup,
- // olap.Setup,
- // opentelemetry.Setup,
- )
- },
- Run: func(cmd *cobra.Command, args []string) {
- run()
- },
- }
- func init() {
- TraceUrlCmd.PersistentFlags().StringVarP(&configYml, "config", "c", "config/settings.yml", "启动命令时指定的配置文件, 默认为config/settings.yml")
- }
// run initialises the database handles, starts the background and consumer
// goroutines, and then blocks on a WaitGroup.
//
// It starts:
//   - one goroutine running tg.upUrlMappings;
//   - extConfig.ExtConfig.Kafka.Consumers Kafka consumer tasks on an ants pool.
//
// When a consumer task panics, the pool's panic handler resubmits a fresh task,
// which keeps the consumer count up. upUrlMappings loops on its ticker and never
// returns, so in practice run blocks until the process exits. If the pool cannot
// be created, the error is logged and run returns.
func run() {
	// To profile, import _ "net/http/pprof" and serve http on :6060.
	if config.ApplicationConfig.Mode == "dev" {
		chInsertInterval = 10
	}
	chdb = olap.GetClickhouseOrm()
	mydb = getMysqlDB()

	maxConsumerNum := extConfig.ExtConfig.Kafka.Consumers
	var wg sync.WaitGroup
	tg := taskGroup{trachUrlCh: make(chan models.TracesURLLocalBuf, 2000)}

	// upTraceUrls is not started: otel_traces_url rows come from a ClickHouse
	// materialized view rather than being inserted from here.
	wg.Add(1)
	go tg.upUrlMappings(&wg)

	var p *ants.Pool
	// onPanic runs after the panicking task's own deferred wg.Done has fired.
	// It adds one to wg for the replacement task and undoes that Add if the
	// resubmission fails, so the counter stays balanced.
	onPanic := func(i interface{}) {
		log.Errorf("消费者 goroutine task 出现异常: %s, 当前共有%d个goroutine在运行", i, p.Running())
		wg.Add(1)
		if err := p.Submit(tg.task(&wg)); err != nil {
			wg.Done()
			log.Errorf("重新提交任务失败: %s", err.Error())
			return
		}
		log.Infof("重新提交任务成功,当前共计有%d个goroutine在运行", p.Running())
	}

	var err error
	p, err = ants.NewPool(maxConsumerNum, ants.WithPanicHandler(onPanic))
	if err != nil {
		log.Errorf("创建消费者 goroutine 池失败: %s", err)
		return
	}
	defer p.Release()

	for i := 0; i < maxConsumerNum; i++ {
		wg.Add(1)
		if err := p.Submit(tg.task(&wg)); err != nil {
			// The task will never run, so its deferred wg.Done never fires.
			wg.Done()
			log.Errorf("提交消费者任务失败: %s", err)
		}
	}
	log.Infof("当前共计有%d个 goroutine task 在运行", p.Running())
	wg.Wait()
}
- type taskGroup struct {
- // appInfos *safeAppInfo
- trachUrlCh chan models.TracesURLLocalBuf
- }
// task returns the body of one Kafka consumer worker for the ants pool.
//
// Each worker opens its own kafka.Reader in the "observe-consumer-group"
// consumer group and decodes every message as OTLP protobuf traces. For each
// resource it resolves the service name and app alias (defaulting to "UNSET"),
// then filters the spans down to plain HTTP server spans. Building
// TracesURLLocalBuf rows is currently disabled because otel_traces_url is
// generated by a ClickHouse materialized view.
//
// A message that fails to decode is logged and skipped. When reading from
// Kafka fails, the worker stops reading and panics on purpose. The deferred
// reader.Close and wg.Done still run, and the pool's panic handler (see run)
// then submits a replacement worker.
func (tg *taskGroup) task(wg *sync.WaitGroup) func() {
	return func() {
		defer wg.Done()
		reader := kafka.NewReader(kafka.ReaderConfig{
			Brokers:     extConfig.ExtConfig.Kafka.Brokers,
			Topic:       extConfig.ExtConfig.Kafka.Topic,
			MaxBytes:    10e6, // 10MB
			StartOffset: kafka.LastOffset,
			GroupID:     "observe-consumer-group",
		})
		defer reader.Close()

		// Set of app aliases seen by this worker.
		// NOTE(review): it is only ever written, never read.
		mAlias := map[string]struct{}{}
		var unmarshaler ptrace.ProtoUnmarshaler
		for {
			m, err := reader.ReadMessage(context.Background())
			if err != nil {
				log.Errorf("消费kafka消息失败: %s", err)
				break
			}
			td, err := unmarshaler.UnmarshalTraces(m.Value)
			if err != nil {
				// The same bytes will not decode on a retry either, so skip the
				// message instead of tearing down the reader.
				log.Errorf("解析kafka消息失败: %s", err)
				continue
			}
			resourceSpans := td.ResourceSpans()
			for i := 0; i < resourceSpans.Len(); i++ {
				spans := resourceSpans.At(i)
				res := spans.Resource()
				resAttr := opentelemetry.NewResourceAttributes(&res)
				serviceName, err := resAttr.ServiceName()
				if err != nil {
					log.Debugf("未能获取到ServiceName: %s %s", spans.SchemaUrl(), err)
				}
				appAlias, err := resAttr.AppName()
				if err != nil {
					log.Debugf("ServiceName: %s获取appAlias失败, 使用默认值UNSET: %s", serviceName, err)
					appAlias = "UNSET"
				}
				mAlias[appAlias] = struct{}{}

				scopeSpans := spans.ScopeSpans()
				for j := 0; j < scopeSpans.Len(); j++ {
					rs := scopeSpans.At(j).Spans()
					for k := 0; k < rs.Len(); k++ {
						span := rs.At(k)
						if span.Kind() != ptrace.SpanKindServer {
							log.Debugf("span kind 为 %s, 跳过", span.Kind().String())
							continue
						}
						attrs := opentelemetry.NewSpanAttributesWithRaw(span.Attributes())
						if attrs.HttpMethod() == "" {
							log.Debugf("非http请求 %s %s, 跳过", span.SpanID(), attrs.RpcSystem())
							continue
						}
						// Skip RPC frameworks carried over HTTP (interfaces defined via proto).
						if rpc := attrs.RpcSystem(); rpc != "" && rpc != "http" {
							log.Debugf("基于http的rpc请求 %s %s, 跳过", span.SpanID(), rpc)
							continue
						}
						// otel_traces_url is generated by a materialized view, so rows are
						// no longer built here. The previous mapping was:
						// t := models.TracesURLLocalBuf{
						// 	Timestamp:   span.EndTimestamp().AsTime(),
						// 	// AppID / AppName: TODO load from MySQL and refresh periodically
						// 	Route:       attrs.HttpRoute(),
						// 	Target:      attrs.HttpTarget(),
						// 	URL:         attrs.HttpUrl(),
						// 	Flavor:      attrs.HttpFlavor(),
						// 	Host:        attrs.HttpHost(),
						// 	Method:      attrs.HttpMethod(),
						// 	StatusCode:  attrs.StatusCode(),
						// 	Message:     span.Status().Message(),
						// 	TraceID:     span.TraceID().String(),
						// 	SpanID:      span.SpanID().String(),
						// 	ServiceName: serviceName,
						// 	Duration:    int64(span.EndTimestamp().AsTime().Sub(span.StartTimestamp().AsTime())),
						// 	AppAlias:    appAlias,
						// }
						// tg.trachUrlCh <- t
					}
				}
			}
		}
		panic("kafka消费出现错误")
	}
}
- func (tg *taskGroup) upTraceUrls(wg *sync.WaitGroup) {
- defer wg.Done()
- ticker := time.NewTicker(1 * time.Second)
- traceUrls := make([]models.TracesURLLocalBuf, 0)
- insert := func() {
- result := chdb.Model(&models.TracesURLLocalBuf{}).Create(&traceUrls)
- if result.Error != nil {
- log.Error("批量插入数据到clickhouse失败: %s", result.Error.Error())
- return
- }
- log.Infof("批量写入数据到clickhouse成功, count: %d", len(traceUrls))
- traceUrls = traceUrls[:0]
- }
- for {
- select {
- case traceUrl := <-tg.trachUrlCh:
- traceUrls = append(traceUrls, traceUrl)
- if len(traceUrls) >= 1000 {
- insert()
- }
- case <-ticker.C:
- if time.Now().Unix()%chInsertInterval == 0 && len(traceUrls) > 0 {
- insert()
- }
- }
- }
- }
// upUrlMappings periodically syncs the url mapping table in MySQL with the
// HTTP routes recently seen in ClickHouse (models.TracesURL).
//
// Every 30s it loads the distinct (Method, Route, ServiceName, AppAlias)
// tuples whose Timestamp lies within the last 300s. A tuple with no
// perfect-match row yet is queued as a perfect match (IsPerfectMatch=1). Each
// ancestor path of its route ("/a/b/c" -> "/a/b", "/a", "/") that does not
// exist yet is queued as a parent entry (IsPerfectMatch=0). The queue is then
// inserted in one batch.
//
// Caveat: Timestamp is the span time, not the ingestion time. Spans that reach
// ClickHouse long after they were produced can therefore be missed; the 10x
// look-back window only narrows that gap. The loop never exits.
func (tg *taskGroup) upUrlMappings(wg *sync.WaitGroup) {
	defer wg.Done()
	seconds := 30
	ticker := time.NewTicker(time.Duration(seconds) * time.Second)
	defer ticker.Stop()

	// tinyUrlMapping holds only strings, so it is comparable and doubles as a
	// collision-free set key. Joining the fields with "-" was ambiguous because
	// aliases, service names and routes may themselves contain "-".
	type tinyUrlMapping struct {
		Method      string
		Route       string
		ServiceName string
		AppAlias    string
	}
	for t := range ticker.C {
		log.Infof("触发 url mapping 更新: %s", t)
		urlMappings := []tinyUrlMapping{}
		fields := "Method, Route, ServiceName, AppAlias"
		if err := chdb.Model(&models.TracesURL{}).Distinct(fields).
			// Timestamp is not the ingestion time, so look back over a longer window.
			Where(fmt.Sprintf("Timestamp>=now()-INTERVAL %d SECOND", seconds*10)).Find(&urlMappings).Error; err != nil {
			log.Errorf("获取route失败: %s", err)
			continue
		}
		log.Infof("获取 traces url 基础数据成功,共计%d条数据", len(urlMappings))
		if len(urlMappings) == 0 {
			continue
		}

		// Distinct service names, used to limit the MySQL lookup below.
		svcNameSet := make(map[string]struct{}, len(urlMappings))
		serviceNames := make([]string, 0, len(urlMappings))
		for _, um := range urlMappings {
			if _, seen := svcNameSet[um.ServiceName]; !seen {
				svcNameSet[um.ServiceName] = struct{}{}
				serviceNames = append(serviceNames, um.ServiceName)
			}
		}

		existsUrlMappings := []tinyUrlMapping{}
		err := mydb.Model(&models.UrlMapping{}).Where("service_name in ? and is_perfect_match=1", serviceNames).
			Distinct("app_alias, service_name, method, url as route").Scan(&existsUrlMappings).Error
		if err != nil {
			// Without the existing set every route would look new and be
			// inserted again, so skip this round and retry on the next tick.
			log.Errorf("获取已经存在的url mapping失败: %s", err)
			continue
		}
		existing := make(map[tinyUrlMapping]struct{}, len(existsUrlMappings))
		for _, um := range existsUrlMappings {
			existing[um] = struct{}{}
		}

		dev := config.ApplicationConfig.Mode == "dev"
		newUrlMappings := map[string]models.UrlMapping{}
		for _, um := range urlMappings {
			if _, ok := existing[um]; ok {
				continue
			}
			log.Infof("AppAlias: %s, ServiceName: %s, Method: %s, Route: %s 在url mapping中不存在", um.AppAlias, um.ServiceName, um.Method, um.Route)
			appId, err := query.NewApp().Alias2ID(um.AppAlias)
			if err != nil {
				log.Errorf("未找到%s的app id", um.AppAlias)
				continue
			}

			arr := strings.Split(um.Route, "/")
			name := strings.ReplaceAll(strings.Title(strings.Join(arr, " ")), " ", "")
			if um.Route == "/" {
				name = "ROOT"
			}
			// The route itself is a perfect match. It always replaces a parent
			// entry queued earlier in this round for the same path.
			newUrlMappings[utils.SimpleHash(um.AppAlias, um.ServiceName, um.Method, um.Route)] = models.UrlMapping{
				AppId:          appId,
				AppAlias:       um.AppAlias,
				Name:           name,
				Url:            um.Route,
				Method:         um.Method,
				ServiceName:    um.ServiceName,
				Level:          int32(len(arr) - 1),
				IsPerfectMatch: 1,
			}

			// Walk up the path, queueing every missing ancestor as a parent entry.
			for len(arr) > 1 {
				arr = arr[:len(arr)-1]
				subroute := strings.Join(arr, "/")
				parentName := strings.ReplaceAll(strings.Title(strings.Join(arr, " ")), " ", "")
				if subroute == "" {
					subroute = "/"
					parentName = "ROOT"
				}
				key := utils.SimpleHash(um.AppAlias, um.ServiceName, um.Method, subroute)
				// Never overwrite an entry already queued this round: it may be the
				// perfect match of another route (or of this very route when it is "/").
				if _, queued := newUrlMappings[key]; queued {
					continue
				}
				var exists bool
				if dev {
					exists, _ = query.NewUrlMapping().Exists(um.AppAlias, um.ServiceName, um.Method, subroute)
				} else {
					exists, _ = query.NewUrlMapping().ExistsCache(um.AppAlias, um.ServiceName, um.Method, subroute)
				}
				if exists {
					continue
				}
				newUrlMappings[key] = models.UrlMapping{
					AppId:          appId,
					AppAlias:       um.AppAlias,
					Name:           parentName,
					Url:            subroute,
					Method:         um.Method,
					ServiceName:    um.ServiceName,
					Level:          int32(len(arr) - 1),
					IsPerfectMatch: 0,
				}
			}
		}

		if len(newUrlMappings) == 0 {
			continue
		}
		data := make([]models.UrlMapping, 0, len(newUrlMappings))
		for _, um := range newUrlMappings {
			data = append(data, um)
		}
		result := mydb.Create(&data)
		if err := result.Error; err != nil {
			log.Errorf("批量插入url mapping失败: %s", err)
			continue
		}
		log.Infof("批量插入url mapping成功, 共%d条", result.RowsAffected)
	}
}
- func getMysqlDB() *gorm.DB {
- host := "*"
- mysql := sdk.Runtime.GetDbByKey(host)
- if config.DatabasesConfig[host].Driver == "mysql" {
- mysql.Set("gorm:table_options", "ENGINE=InnoDB CHARSET=utf8mb4")
- }
- return mysql
- }
|