123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- /*
- Copyright © 2023 NAME HERE <EMAIL ADDRESS>
- */
- package biz
- import (
- "fmt"
- "go-admin/app/observe/models"
- "go-admin/app/observe/models/query"
- "go-admin/common/database"
- "go-admin/common/olap"
- "go-admin/common/opentelemetry"
- "go-admin/common/storage"
- "go-admin/common/utils"
- extConfig "go-admin/config"
- "strings"
- "time"
- "github.com/go-admin-team/go-admin-core/config/source/file"
- log "github.com/go-admin-team/go-admin-core/logger"
- "github.com/go-admin-team/go-admin-core/sdk"
- "github.com/go-admin-team/go-admin-core/sdk/config"
- "github.com/go-redis/redis/v7"
- "github.com/spf13/cobra"
- "gorm.io/gorm"
- "gorm.io/gorm/clause"
- )
- var configYml string
- var rebuilt bool
- var chdb *gorm.DB
- var mydb *gorm.DB
- var rdb *redis.Client
- // consumer/biz/bizCmd represents the consumer/biz/biz command
- var BizCmd = &cobra.Command{
- Use: "biz",
- Short: "生成业务",
- PreRun: func(cmd *cobra.Command, args []string) {
- config.ExtendConfig = &extConfig.ExtConfig
- config.Setup(
- file.NewSource(file.WithPath(configYml)),
- database.SetupWithoutOtel,
- storage.Setup,
- // olap.Setup,
- // opentelemetry.Setup,
- )
- },
- Run: func(cmd *cobra.Command, args []string) {
- run()
- },
- }
- func init() {
- BizCmd.PersistentFlags().StringVarP(&configYml, "config", "c", "config/settings.yml", "启动命令时指定的配置文件, 默认为config/settings.yml")
- BizCmd.Flags().BoolVar(&rebuilt, "rebuilt", false, "如果指定了这个参数,先清空biz相关表与缓存,来重建biz相关表数据")
- }
- func ReBuilt() error {
- if !rebuilt {
- return nil
- }
- return mydb.Transaction(func(tx *gorm.DB) error {
- if err := tx.Exec("truncate ot_biz").Error; err != nil {
- return err
- }
- if err := tx.Exec("truncate ot_biz_node").Error; err != nil {
- return err
- }
- if err := tx.Exec("truncate ot_biz_edge").Error; err != nil {
- return err
- }
- // if err := tx.Exec("truncate ot_biz_stats").Error; err != nil {
- // return err
- // }
- keys, err := rdb.Keys("observe__biz*").Result()
- if err != nil {
- return err
- }
- pipe := rdb.Pipeline()
- for _, key := range keys {
- pipe.Del(key)
- }
- if _, err = pipe.Exec(); err != nil {
- return err
- }
- log.Info("重建biz初始化数据库成功")
- return nil
- })
- }
- func getMysqlDB() *gorm.DB {
- host := "*"
- mysql := sdk.Runtime.GetDbByKey(host)
- if config.DatabasesConfig[host].Driver == "mysql" {
- mysql.Set("gorm:table_options", "ENGINE=InnoDB CHARSET=utf8mb4")
- }
- return mysql
- }
- func run() {
- rdb = config.GetRedisClient()
- mydb = getMysqlDB()
- chdb = olap.GetClickhouseOrm()
- if err := ReBuilt(); err != nil {
- panic(fmt.Sprintf("重建biz, 初始化失败: %s", err))
- }
- GenBiz()
- }
// tinySpan is the subset of trace-table columns that GenBiz loads for every
// span of a trace. The gorm tags map each field to the column of the same name.
type tinySpan struct {
	// Identity and position of the span inside its trace.
	TraceId      string    `gorm:"column:TraceId;not null"`
	Timestamp    time.Time `gorm:"column:Timestamp"`
	SpanId       string    `gorm:"column:SpanId;not null"`
	SpanName     string    `gorm:"column:SpanName;not null"`
	ParentSpanId string    `gorm:"column:ParentSpanId;not null"` // empty for a root span

	// Classification inputs used by getSpanType and getRelation.
	SpanKind       string            `gorm:"column:SpanKind;not null"`
	SpanAttributes map[string]string `gorm:"column:SpanAttributes;type:Map(LowCardinality(String), String);not null"`

	StatusCode  string `gorm:"column:StatusCode;not null"`
	Duration    int64  `gorm:"column:Duration;not null"`
	ServiceName string `gorm:"column:ServiceName;not null"`
	AppAlias    string `gorm:"column:AppAlias;not null"`
}
- func getSpanType(span tinySpan) string {
- spanAttrs := opentelemetry.NewSpanAttributes(span.SpanAttributes)
- if span.SpanKind == "SPAN_KIND_INTERNAL" {
- return "internal"
- } else if span.SpanKind == "SPAN_KIND_SERVER" {
- if spanAttrs.RpcSystem() != "" {
- return fmt.Sprintf("%s server", spanAttrs.RpcSystem())
- } else if spanAttrs.StatusCode() >= 200 {
- return "http server"
- }
- return "server"
- } else if span.SpanKind == "SPAN_KIND_CLIENT" {
- if spanAttrs.IsDB() {
- if spanAttrs.DbSystem() != "" {
- return fmt.Sprintf("%s client", spanAttrs.DbSystem())
- } else {
- return "database client"
- }
- } else if spanAttrs.RpcSystem() != "" {
- return fmt.Sprintf("%s client", spanAttrs.RpcSystem())
- } else if spanAttrs.StatusCode() >= 200 {
- return "http client"
- }
- return "client"
- } else if span.SpanKind == "SPAN_KIND_CONSUMER" || span.SpanKind == "SPAN_KIND_PRODUCER" {
- arr := strings.Split(span.SpanKind, "_")
- tail := strings.ToLower(arr[len(arr)-1])
- head := "messaging"
- if spanAttrs.MessagingSystem() != "" {
- head = spanAttrs.MessagingSystem()
- }
- return fmt.Sprintf("%s %s", head, tail)
- }
- return "unknown"
- }
- // 生成业务相关数据
- func GenBiz() {
- // 1. 获取当前时间前5到前6分钟内所有的AppAlias
- // 2. 针对每个AppAlias,取前n条TraceId
- // 3. 针对每个TraceId, 取出所有span 并 分析出边关系
- for {
- now := time.Now().Unix()
- start, end := now-360, now-300
- earliest := start - 600
- appAliases := []string{}
- if err := chdb.Debug().Table(models.TableNameTrace).Distinct("AppAlias").
- Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", start, end).
- Where("ParentSpanId=''").Pluck("AppAlias", &appAliases).Error; err != nil {
- log.Errorf("查询AppAlias失败: %s", err)
- continue
- }
- for _, appAlias := range appAliases {
- appId, _ := query.NewApp().Alias2ID(appAlias) // 在未在后台设置的情况下 appId可能存在0的情况
- traceIds := []string{}
- if err := chdb.Table(models.TableNameTrace).Distinct("TraceId").
- Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", start, end).
- Where("AppAlias", appAlias).
- Where("ParentSpanId=''").
- Where("SpanAttributes['db.system']=''"). // 过滤root span为sql查询的
- Limit(10).Pluck("TraceId", &traceIds).Error; err != nil {
- log.Errorf("查询TraceId失败: %s", err)
- continue
- }
- for _, traceId := range traceIds {
- tinySpans := []tinySpan{}
- if err := chdb.Table(models.TableNameTrace).Select([]string{
- "TraceId", "Timestamp", "SpanId", "SpanName", "ParentSpanId", "SpanKind", "SpanAttributes", "StatusCode", "Duration", "ServiceName", "AppAlias",
- }).
- Where("Timestamp>=toDateTime(?)", earliest).
- Where("TraceId", traceId).
- Order("TraceId ASC, Timestamp ASC").Find(&tinySpans).Error; err != nil {
- log.Errorf("查询spans失败:%s %s", traceId, err)
- continue
- }
- // 过滤不必要的仅有一个span的业务
- if len(tinySpans) == 1 {
- span := tinySpans[0]
- if span.SpanKind == "SPAN_KIND_INTERNAL" { // 这种业务看起来没有意义
- log.Debugf("跳过仅有一个span,且span kind为internal的数据:%s %s", span.SpanName, span.TraceId)
- continue
- }
- }
- tinySpanMap := map[string]tinySpan{}
- var rootSpanId string
- for _, ts := range tinySpans {
- tinySpanMap[ts.SpanId] = ts
- if ts.ParentSpanId == "" {
- rootSpanId = ts.SpanId
- }
- }
- if rootSpanId == "" {
- // 由于前端是通过ParentSpanId为空查找到,所以不会走到这里
- log.Errorf("未获取到root span: %s", traceId)
- continue
- }
- rootSpan := tinySpanMap[rootSpanId]
- // 获取bizId
- bizId, bizHash, err := query.NewBiz().CheckAddBiz(appId, appAlias, rootSpan.ServiceName, rootSpan.SpanName, rootSpan.SpanKind, rootSpan.SpanAttributes)
- if err != nil {
- log.Errorf("创建业务失败:%s", err)
- continue
- }
- edges := []models.BizEdge{}
- for _, tinySpan := range tinySpans {
- targetNode := genBizNode(appId, appAlias, tinySpan)
- targetId, err := query.NewBizNode().CheckAddBizNode(&targetNode)
- if err != nil {
- log.Errorf("添加目标结点失败:%s", err)
- continue
- }
- if tinySpan.ParentSpanId == "" {
- // 如果是root, 其上游添加一个源结点
- // 主要用于当只一个结点时 找不到边数据(因为结点不与业务直接关联了, 而边与业务直接关联)问题
- edges = append(edges, models.BizEdge{
- Source: 0,
- Target: targetId,
- Type: "",
- BizId: bizId,
- BizHash: bizHash,
- AppId: appId,
- AppAlias: appAlias,
- })
- continue
- }
- tinySpan2, ok := tinySpanMap[tinySpan.ParentSpanId]
- if !ok {
- log.Infof("未找到源结点:parent span id = %s", tinySpan.ParentSpanId)
- continue
- }
- sourceNode := genBizNode(appId, appAlias, tinySpan2)
- sourceId, err := query.NewBizNode().CheckAddBizNode(&sourceNode)
- if err != nil {
- log.Errorf("添加源结点失败:%s", err)
- continue
- }
- if sourceId == targetId {
- log.Warnf("特殊情况,源结点与目标结点一致,暂时忽略:%s %s", tinySpan.SpanId, tinySpan2.SpanId)
- continue
- }
- edges = append(edges, models.BizEdge{
- Source: sourceId,
- Target: targetId,
- Type: getRelation(tinySpan),
- BizId: bizId,
- BizHash: bizHash,
- AppId: appId,
- AppAlias: appAlias,
- })
- }
- log.Infof("trace id: %s, edge len: %d", traceId, len(edges))
- edgeCreate(edges)
- }
- }
- diff := time.Now().Unix() - now
- log.Infof("耗时:%ds", diff)
- if diff < 60 {
- time.Sleep(time.Second * time.Duration(60-diff))
- }
- }
- }
- // 获取当前结点与下级结点间的关系
- func getRelation(span tinySpan) (relation string) {
- spanAttrs := opentelemetry.NewSpanAttributes(span.SpanAttributes)
- if span.SpanKind == "SPAN_KIND_INTERNAL" { // 当前span为internal, 则上级与当前的关系为internal
- relation = "internal"
- } else if span.SpanKind == "SPAN_KIND_CLIENT" { // 当前span为client, 上级可能为server或internal,与上级间的关系为internal
- relation = "internal"
- } else if span.SpanKind == "SPAN_KIND_SERVER" { // 当前span为server,上级必为client,由于当前span不是root,所以上级必定存在
- relation = "http"
- if span.SpanAttributes["rpc.system"] != "" {
- relation = span.SpanAttributes["rpc.system"]
- }
- } else if span.SpanKind == "SPAN_KIND_CONSUMER" { // 当前span为consumer,上级必为producer
- // relation = span.SpanAttributes["messaging.system"]
- relation = fmt.Sprintf("%s(%s)", spanAttrs.MessagingSystem(), spanAttrs.MessagingOperation())
- } else if span.SpanKind == "SPAN_KIND_PRODUCER" { // 当前span为producer, 上级可能为server或internal,与上级的关系肯定为内部调用
- // relation = "internal"
- // 为了方便后面清洗node与edge,将当前span与producer span之间的关系定义为 system(operation)
- relation = fmt.Sprintf("%s(%s)", spanAttrs.MessagingSystem(), spanAttrs.MessagingOperation())
- } else {
- relation = "unknown"
- log.Warnf("未识别的上下游关系:%s", spanAttrs)
- }
- return
- }
- func genBizNode(appId int64, appAlias string, tinySpan tinySpan) models.BizNode {
- externalId := 0
- if tinySpan.SpanKind == "SPAN_KIND_SERVER" {
- spanAttrs := opentelemetry.NewSpanAttributes(tinySpan.SpanAttributes)
- if spanAttrs.RpcSystem() == "" && spanAttrs.StatusCode() >= 200 {
- externalId = query.NewUrlMapping().UrlMappingID(appAlias, tinySpan.ServiceName, spanAttrs.HttpMethod(), spanAttrs.HttpRoute())
- }
- }
- return models.BizNode{
- Name: tinySpan.SpanName,
- Hash: utils.SimpleHash(tinySpan.AppAlias, tinySpan.ServiceName, tinySpan.SpanName, tinySpan.SpanKind),
- // Type: getNodeType(source.Span, spanMap), // 废弃
- // 废弃一下两个字段, node不与业务关连,edge再与业务关联,因为不同业务可能会有相同的结点
- // BizID: bzId,
- // BizHash: bzHash,
- AppID: appId,
- AppAlias: appAlias,
- ExternalID: int64(externalId),
- ServiceName: tinySpan.ServiceName,
- SpanName: tinySpan.SpanName,
- SpanKind: tinySpan.SpanKind,
- SpanType: getSpanType(tinySpan),
- IsVirtual: 0,
- }
- }
// EdgeRaw describes an edge by the identifying attributes of its two end
// spans instead of by node ids.
type EdgeRaw struct {
	// Source (upstream) span.
	SourceAppAlias    string
	SourceServiceName string
	SourceSpanName    string
	SourceSpanKind    string

	// Target (downstream) span.
	TargetAppAlias       string
	TargetServiceName    string
	TargetSpanName       string
	TargetSpanKind       string
	TargetSpanAttributes map[string]string
}

// EdgeType reports the type of the edge; it currently always returns the
// empty string.
func (e EdgeRaw) EdgeType() string {
	const none = ""
	return none
}
- const edgeCacheKey = "observe__biz_edges"
- func edgeCreate(edges []models.BizEdge) {
- if len(edges) == 0 {
- return
- }
- newEdges := []models.BizEdge{}
- uniq := map[string]struct{}{}
- for _, edge := range edges {
- if !edgeExists(edge.AppAlias, edge.BizId, edge.Source, edge.Target) {
- key := fmt.Sprintf("%d-%d-%d", edge.Source, edge.Target, edge.AppId)
- if _, ok := uniq[key]; !ok {
- newEdges = append(newEdges, edge)
- uniq[key] = struct{}{}
- }
- }
- }
- if len(newEdges) == 0 {
- return
- }
- result := mydb.Model(&models.BizEdge{}).Clauses(clause.OnConflict{DoNothing: true}).Create(&newEdges)
- if result.Error != nil {
- log.Errorf("批量插入边数据失败:%s", result.Error)
- for _, edge := range newEdges {
- fmt.Println(edge.BizId, edge.Source, edge.Target)
- }
- return
- }
- pipe := rdb.Pipeline()
- for _, edge := range newEdges {
- pipe.SAdd(edgeCacheKey, edgeMember(edge.AppAlias, edge.BizId, edge.Source, edge.Target))
- }
- // pipe.Expire(edgeCacheKey, time.Minute)
- pipe.Exec()
- dur, _ := rdb.TTL(edgeCacheKey).Result()
- if dur == -1 {
- rdb.Expire(edgeCacheKey, time.Minute)
- }
- }
- // 检测edge是否存在
- func edgeExists(appAlias string, bizId, sourceId, targetId int64) bool {
- exists, _ := rdb.SIsMember(edgeCacheKey, edgeMember(appAlias, bizId, sourceId, targetId)).Result()
- if !exists {
- cnt := int64(0)
- mydb.Model(&models.BizEdge{}).Where("app_alias", appAlias).Where("biz_id", bizId).
- Where("source", sourceId).Where("target", targetId).Count(&cnt)
- if cnt > 0 {
- rdb.SAdd(edgeCacheKey, edgeMember(appAlias, bizId, sourceId, targetId))
- }
- return cnt > 0
- }
- return exists
- }
// edgeMember builds the cache-set member that identifies an edge:
// "<appAlias>-<bizId>-<sourceId>-<targetId>", with the ids in decimal.
func edgeMember(appAlias string, bizId, sourceId, targetId int64) string {
	parts := []string{
		appAlias,
		fmt.Sprint(bizId),
		fmt.Sprint(sourceId),
		fmt.Sprint(targetId),
	}
	return strings.Join(parts, "-")
}
|