liubing 1 mês atrás
pai
commit
875622a93a

+ 25 - 0
rbac.yaml

@@ -0,0 +1,25 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: flink-role-for-default-sa-in-observe
+  namespace: observe
+rules:
+  - apiGroups: [""]
+    resources: ["pods"]
+    verbs: ["get", "list", "watch", "create", "delete"]
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: flink-role-binding-for-default-sa-in-observe
+  namespace: observe
+subjects:
+  - kind: ServiceAccount
+    name: default
+    namespace: observe
+roleRef:
+  kind: Role
+  name: flink-role-for-default-sa-in-observe
+  apiGroup: rbac.authorization.k8s.io/v1

+ 342 - 0
sql/metrics_table.sql

@@ -0,0 +1,342 @@
+create database ob_metrics;
+create table ob_metrics.public.service_metrics_15s
+(
+    timestamp timestamp,
+    app_name varchar(256),
+    service_name varchar(512),
+    apdex float,
+    http_code_300_count int,
+    http_code_400_count int,
+    http_code_500_count int,
+    span_status_error_count int,
+    exception_count int,
+    max_latency bigint,
+    min_latency bigint,
+    avg_latency bigint,
+    request_count bigint,
+    rpm bigint,
+    count_le_500 bigint,
+    count_le_1000 bigint,
+    count_le_5000 bigint,
+    quantile_50 bigint,
+    quantile_90 float,
+    quantile_95 float,
+    quantile_99 float
+);
+
+SELECT create_hypertable('service_metrics_15s', by_range('timestamp'));
+
+create table ob_metrics.public.service_instance_metrics_15s
+(
+    timestamp timestamp,
+    app_name varchar(256),
+    service_name varchar(512),
+    apdex float,
+    http_code_300_count int,
+    http_code_400_count int,
+    http_code_500_count int,
+    span_status_error_count int,
+    exception_count int,
+    max_latency bigint,
+    min_latency bigint,
+    avg_latency bigint,
+    request_count bigint,
+    rpm bigint,
+    count_le_500 bigint,
+    count_le_1000 bigint,
+    count_le_5000 bigint,
+    quantile_50 bigint,
+    quantile_90 float,
+    quantile_95 float,
+    quantile_99 float,
+    instanceID varchar(512)
+);
+
+SELECT create_hypertable('service_instance_metrics_15s', by_range('timestamp'));
+
+create table ob_metrics.public.endpoint_metrics_15s
+(
+    timestamp timestamp,
+    app_name varchar(256),
+    service_name varchar(512),
+    apdex float,
+    http_code_300_count int,
+    http_code_400_count int,
+    http_code_500_count int,
+    span_status_error_count int,
+    exception_count int,
+    max_latency bigint,
+    min_latency bigint,
+    avg_latency bigint,
+    request_count bigint,
+    rpm bigint,
+    count_le_500 bigint,
+    count_le_1000 bigint,
+    count_le_5000 bigint,
+    quantile_50 bigint,
+    quantile_90 float,
+    quantile_95 float,
+    quantile_99 float,
+    instanceID varchar(512),
+        endpoint varchar(512)
+);
+
+SELECT create_hypertable('endpoint_metrics_15s', by_range('timestamp'));
+
+
+create table if not exists ob_metrics.public.service_metrics_5m
+(
+    timestamp timestamp,
+    app_name varchar(256),
+    service_name varchar(512),
+    apdex float,
+    http_code_300_count int,
+    http_code_400_count int,
+    http_code_500_count int,
+    span_status_error_count int,
+    exception_count int,
+    max_latency bigint,
+    min_latency bigint,
+    avg_latency bigint,
+    request_count bigint,
+    rpm bigint,
+    count_le_500 bigint,
+    count_le_1000 bigint,
+    count_le_5000 bigint,
+    quantile_50 bigint,
+    quantile_90 float,
+    quantile_95 float,
+    quantile_99 float
+);
+
+SELECT create_hypertable('service_metrics_5m', by_range('timestamp'));
+
+create table if not exists ob_metrics.public.service_instance_metrics_5m
+(
+    timestamp timestamp,
+    app_name varchar(256),
+    service_name varchar(512),
+    apdex float,
+    http_code_300_count int,
+    http_code_400_count int,
+    http_code_500_count int,
+    span_status_error_count int,
+    exception_count int,
+    max_latency bigint,
+    min_latency bigint,
+    avg_latency bigint,
+    request_count bigint,
+    rpm bigint,
+    count_le_500 bigint,
+    count_le_1000 bigint,
+    count_le_5000 bigint,
+    quantile_50 bigint,
+    quantile_90 float,
+    quantile_95 float,
+    quantile_99 float,
+    instanceID varchar(512)
+);
+
+SELECT create_hypertable('service_instance_metrics_5m', by_range('timestamp'));
+
+create table if not exists ob_metrics.public.endpoint_metrics_5m
+(
+    timestamp timestamp,
+    app_name varchar(256),
+    service_name varchar(512),
+    apdex float,
+    http_code_300_count int,
+    http_code_400_count int,
+    http_code_500_count int,
+    span_status_error_count int,
+    exception_count int,
+    max_latency bigint,
+    min_latency bigint,
+    avg_latency bigint,
+    request_count bigint,
+    rpm bigint,
+    count_le_500 bigint,
+    count_le_1000 bigint,
+    count_le_5000 bigint,
+    quantile_50 bigint,
+    quantile_90 float,
+    quantile_95 float,
+    quantile_99 float,
+    instanceID varchar(512),
+    endpoint varchar(512)
+);
+
+SELECT create_hypertable('endpoint_metrics_5m', by_range('timestamp'));
+
+
+
+create table if not exists ob_metrics.public.service_metrics_20m
+(
+    timestamp timestamp,
+    app_name varchar(256),
+    service_name varchar(512),
+    apdex float,
+    http_code_300_count int,
+    http_code_400_count int,
+    http_code_500_count int,
+    span_status_error_count int,
+    exception_count int,
+    max_latency bigint,
+    min_latency bigint,
+    avg_latency bigint,
+    request_count bigint,
+    rpm bigint,
+    count_le_500 bigint,
+    count_le_1000 bigint,
+    count_le_5000 bigint,
+    quantile_50 bigint,
+    quantile_90 float,
+    quantile_95 float,
+    quantile_99 float
+);
+
+SELECT create_hypertable('service_metrics_20m', by_range('timestamp'));
+
+create table if not exists ob_metrics.public.service_instance_metrics_20m
+(
+    timestamp timestamp,
+    app_name varchar(256),
+    service_name varchar(512),
+    apdex float,
+    http_code_300_count int,
+    http_code_400_count int,
+    http_code_500_count int,
+    span_status_error_count int,
+    exception_count int,
+    max_latency bigint,
+    min_latency bigint,
+    avg_latency bigint,
+    request_count bigint,
+    rpm bigint,
+    count_le_500 bigint,
+    count_le_1000 bigint,
+    count_le_5000 bigint,
+    quantile_50 bigint,
+    quantile_90 float,
+    quantile_95 float,
+    quantile_99 float,
+    instanceID varchar(512)
+);
+
+SELECT create_hypertable('service_instance_metrics_20m', by_range('timestamp'));
+
+create table if not exists ob_metrics.public.endpoint_metrics_20m
+(
+    timestamp timestamp,
+    app_name varchar(256),
+    service_name varchar(512),
+    apdex float,
+    http_code_300_count int,
+    http_code_400_count int,
+    http_code_500_count int,
+    span_status_error_count int,
+    exception_count int,
+    max_latency bigint,
+    min_latency bigint,
+    avg_latency bigint,
+    request_count bigint,
+    rpm bigint,
+    count_le_500 bigint,
+    count_le_1000 bigint,
+    count_le_5000 bigint,
+    quantile_50 bigint,
+    quantile_90 float,
+    quantile_95 float,
+    quantile_99 float,
+    instanceID varchar(512),
+    endpoint varchar(512)
+);
+
+SELECT create_hypertable('endpoint_metrics_20m', by_range('timestamp'));
+
+create table if not exists ob_metrics.public.service_metrics_1h
+(
+    timestamp timestamp,
+    app_name varchar(256),
+    service_name varchar(512),
+    apdex float,
+    http_code_300_count int,
+    http_code_400_count int,
+    http_code_500_count int,
+    span_status_error_count int,
+    exception_count int,
+    max_latency bigint,
+    min_latency bigint,
+    avg_latency bigint,
+    request_count bigint,
+    rpm bigint,
+    count_le_500 bigint,
+    count_le_1000 bigint,
+    count_le_5000 bigint,
+    quantile_50 bigint,
+    quantile_90 float,
+    quantile_95 float,
+    quantile_99 float
+);
+
+SELECT create_hypertable('service_metrics_1h', by_range('timestamp'));
+
+create table if not exists ob_metrics.public.service_instance_metrics_1h
+(
+    timestamp timestamp,
+    app_name varchar(256),
+    service_name varchar(512),
+    apdex float,
+    http_code_300_count int,
+    http_code_400_count int,
+    http_code_500_count int,
+    span_status_error_count int,
+    exception_count int,
+    max_latency bigint,
+    min_latency bigint,
+    avg_latency bigint,
+    request_count bigint,
+    rpm bigint,
+    count_le_500 bigint,
+    count_le_1000 bigint,
+    count_le_5000 bigint,
+    quantile_50 bigint,
+    quantile_90 float,
+    quantile_95 float,
+    quantile_99 float,
+    instanceID varchar(512)
+);
+
+SELECT create_hypertable('service_instance_metrics_1h', by_range('timestamp'));
+
+create table if not exists ob_metrics.public.endpoint_metrics_1h
+(
+    timestamp timestamp,
+    app_name varchar(256),
+    service_name varchar(512),
+    apdex float,
+    http_code_300_count int,
+    http_code_400_count int,
+    http_code_500_count int,
+    span_status_error_count int,
+    exception_count int,
+    max_latency bigint,
+    min_latency bigint,
+    avg_latency bigint,
+    request_count bigint,
+    rpm bigint,
+    count_le_500 bigint,
+    count_le_1000 bigint,
+    count_le_5000 bigint,
+    quantile_50 bigint,
+    quantile_90 float,
+    quantile_95 float,
+    quantile_99 float,
+    instanceID varchar(512),
+    endpoint varchar(512)
+);
+
+SELECT create_hypertable('endpoint_metrics_1h', by_range('timestamp'));
+
+
+

+ 889 - 0
sql/no-replica.sql

@@ -0,0 +1,889 @@
+CREATE DATABASE if not exists otel on cluster default;
+CREATE TABLE if not exists otel.otel_traces_local on cluster default
+(
+    `Timestamp`             DateTime64(9) CODEC (Delta(8), ZSTD(1)),
+    `TraceId`               String CODEC (ZSTD(1)),
+    `SpanId`                String CODEC (ZSTD(1)),
+    `ParentSpanId`          String CODEC (ZSTD(1)),
+    `TraceState`            String CODEC (ZSTD(1)),
+    `SpanName` LowCardinality(String) CODEC (ZSTD(1)) ,
+    `SpanNameOrigin` LowCardinality(String) CODEC (ZSTD(1)) ,
+    `SpanKindNumber`              Int8 CODEC (ZSTD(1)) ,
+    `SpanKind`              LowCardinality(String) CODEC(ZSTD(1)),
+    `ServiceName` LowCardinality(String) CODEC (ZSTD(1)),
+    `ResourceAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
+    `ScopeName`             String CODEC (ZSTD(1)),
+    `ScopeVersion`          String CODEC (ZSTD(1)),
+    `SpanAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
+    `Duration`              Int64 CODEC (ZSTD(1)),
+    `StatusCodeNumber`            Int32 CODEC (ZSTD(1)),
+    `StatusCode`            LowCardinality(String) CODEC(ZSTD(1)),
+    `StatusMessage`         String CODEC (ZSTD(1)),
+    `Events.Timestamp`      Array(DateTime64(9)) CODEC (ZSTD(1)),
+    `Events.Name`           Array(LowCardinality(String)) CODEC (ZSTD(1)),
+    `Events.Attributes`     Array(Map(LowCardinality(String), String)) CODEC (ZSTD(1)),
+    `Links.TraceId`         Array(String) CODEC (ZSTD(1)),
+    `Links.SpanId`          Array(String) CODEC (ZSTD(1)),
+    `Links.TraceState`      Array(String) CODEC (ZSTD(1)),
+    `Links.Attributes`      Array(Map(LowCardinality(String), String)) CODEC (ZSTD(1)),
+    `HttpCode`              Int32,
+    `HttpMethod`            String CODEC (ZSTD(1)),
+    `HttpURL`               String CODEC (ZSTD(1)),
+    `ContainerId`           String CODEC (ZSTD(1)),
+    `srcIP`                 String CODEC (ZSTD(1)),
+    `srcPort`               Int32 CODEC (ZSTD(1)),
+    `targetIP`              String CODEC (ZSTD(1)),
+    `targetPort`            Int32 CODEC (ZSTD(1)),
+    `RPCType`               String,
+    `RPCName`               String,
+    `RPCRequest` Map(LowCardinality(String), String),
+    `RPCResult`             Int8,
+    `FuncNameSpace`         String,
+    `FuncName`              String,
+    `FuncLineNO`            Int32,
+    `FuncResult`            Int32,
+    `dbStatement`           String CODEC (ZSTD(1)),
+    `dbConnectionString`    String CODEC (ZSTD(1)),
+    `Exceptions.type`       Array(String),
+    `Exceptions.message`    Array(String),
+    `Exceptions.stacktrace` Array(String),
+    `AppAlias`              String,
+    INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
+    INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+    INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+    INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+    INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+    INDEX idx_duration Duration TYPE minmax GRANULARITY 1
+) ENGINE = MergeTree() PARTITION BY (AppAlias, toDate(Timestamp))
+      ORDER BY
+          (
+           AppAlias,
+           ServiceName,
+           SpanName,
+           toDateTime(Timestamp)
+              ) TTL toDateTime(Timestamp) + toIntervalDay(10),
+    toDateTime(Timestamp) + toIntervalDay(2)
+WHERE
+    AppAlias = 'WLGLPT' SETTINGS index_granularity = 8192,
+    ttl_only_drop_parts = 1;
+
+
+
+drop table if exists otel.otel_traces on cluster default;
+CREATE TABLE if not exists otel.otel_traces on cluster default
+(
+    `Timestamp`             DateTime64(9) CODEC (Delta(8), ZSTD(1)),
+    `TraceId`               String CODEC (ZSTD(1)),
+    `SpanId`                String CODEC (ZSTD(1)),
+    `ParentSpanId`          String CODEC (ZSTD(1)),
+    `TraceState`            String CODEC (ZSTD(1)),
+    `SpanName` LowCardinality(String) CODEC (ZSTD(1)) ,
+    `SpanNameOrigin` LowCardinality(String) CODEC (ZSTD(1)) ,
+    `SpanKindNumber`              Int8 CODEC (ZSTD(1)) ,
+    `SpanKind`              LowCardinality(String) CODEC(ZSTD(1)),
+    `ServiceName` LowCardinality(String) CODEC (ZSTD(1)),
+    `ResourceAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
+    `ScopeName`             String CODEC (ZSTD(1)),
+    `ScopeVersion`          String CODEC (ZSTD(1)),
+    `SpanAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
+    `Duration`              Int64 CODEC (ZSTD(1)),
+    `StatusCodeNumber`            Int32 CODEC (ZSTD(1)),
+    `StatusCode`            Int32 CODEC (ZSTD(1)),
+    `StatusMessage`         String CODEC (ZSTD(1)),
+    `Events.Timestamp`      Array(DateTime64(9)) CODEC (ZSTD(1)),
+    `Events.Name`           Array(LowCardinality(String)) CODEC (ZSTD(1)),
+    `Events.Attributes`     Array(Map(LowCardinality(String), String)) CODEC (ZSTD(1)),
+    `Links.TraceId`         Array(String) CODEC (ZSTD(1)),
+    `Links.SpanId`          Array(String) CODEC (ZSTD(1)),
+    `Links.TraceState`      Array(String) CODEC (ZSTD(1)),
+    `Links.Attributes`      Array(Map(LowCardinality(String), String)) CODEC (ZSTD(1)),
+    `HttpCode`              Int32,
+    `HttpMethod`            String CODEC (ZSTD(1)),
+    `HttpURL`               String CODEC (ZSTD(1)),
+    `ContainerId`           String CODEC (ZSTD(1)),
+    `srcIP`                 String CODEC (ZSTD(1)),
+    `srcPort`               Int32 CODEC (ZSTD(1)),
+    `targetIP`              String CODEC (ZSTD(1)),
+    `targetPort`            Int32 CODEC (ZSTD(1)),
+    `RPCType`               String,
+    `RPCName`               String,
+    `RPCRequest` Map(LowCardinality(String), String),
+    `RPCResult`             Int8,
+    `FuncNameSpace`         String,
+    `FuncName`              String,
+    `FuncLineNO`            Int32,
+    `FuncResult`            Int32,
+    `dbStatement`           String CODEC (ZSTD(1)),
+    `dbConnectionString`    String CODEC (ZSTD(1)),
+    `Exceptions.type`       Array(String),
+    `Exceptions.message`    Array(String),
+    `Exceptions.stacktrace` Array(String),
+    `AppAlias`              String
+) ENGINE = Distributed(
+           'default',
+           'otel',
+           'otel_traces_local',
+           rand()
+                      );
+
+CREATE TABLE if not exists otel.otel_traces_url_local on cluster default
+(
+    `Timestamp`  DateTime64(3) COMMENT '时间',
+    `TraceId`    String COMMENT 'TraceId',
+    `SpanId`     String COMMENT 'SpanId',
+    `Route` LowCardinality(String) COMMENT '路由',
+    `RouteRaw` LowCardinality(String),
+    `Path`       String COMMENT 'URI Path',
+    `Query`      String COMMENT 'URI Query',
+    `Target`     String COMMENT '请求目标,正常情况下为path?query',
+    `ProtocolName` LowCardinality(String) COMMENT '协议名称',
+    `ProtocolVersion` LowCardinality(String) COMMENT '协议版本号',
+    `Method` LowCardinality(String)   DEFAULT 'GET' COMMENT 'http方法,如GET、POST',
+    `StatusCode` Int64                DEFAULT 200 COMMENT 'http 状态码',
+    `Message`    String COMMENT '信息,通常为异常信息',
+    `UserAgent`  String COMMENT '用户代理',
+    `Duration`   Int64 COMMENT '耗时',
+    `ServiceName` LowCardinality(String) COMMENT '服务名',
+    `AppAlias` LowCardinality(String) DEFAULT '' COMMENT '应用别名'
+) ENGINE = MergeTree() PARTITION BY toDate(Timestamp)
+      ORDER BY
+          (
+           AppAlias,
+           ServiceName,
+           Route,
+           Target,
+           toDateTime(Timestamp)
+              ) TTL toDateTime(Timestamp) + toIntervalDay(10) SETTINGS index_granularity = 8192,
+        ttl_only_drop_parts = 1 COMMENT '记录url相关的span';
+
+
+CREATE TABLE if not exists otel.otel_traces_url on cluster default
+(
+    `Timestamp`  DateTime64(3) COMMENT '时间',
+    `TraceId`    String COMMENT 'TraceId',
+    `SpanId`     String COMMENT 'SpanId',
+    `Route` LowCardinality(String) COMMENT '路由',
+    `RouteRaw` LowCardinality(String),
+    `Path`       String COMMENT 'URI Path',
+    `Query`      String COMMENT 'URI Query',
+    `Target`     String COMMENT '请求目标,正常情况下为path?query',
+    `ProtocolName` LowCardinality(String) COMMENT '协议名称',
+    `ProtocolVersion` LowCardinality(String) COMMENT '协议版本号',
+    `Method` LowCardinality(String) DEFAULT 'GET' COMMENT 'http方法, 如GET、POST',
+    `StatusCode` Int64              DEFAULT 200 COMMENT 'http 状态码',
+    `Message`    String COMMENT '信息,通常为异常信息',
+    `UserAgent`  String COMMENT '用户代理',
+    `Duration`   Int64 COMMENT '耗时',
+    `ServiceName` LowCardinality(String) COMMENT '服务名',
+    `AppAlias`   String             DEFAULT '' COMMENT '应用别名'
+) ENGINE = Distributed(
+           'default',
+           'otel',
+           'otel_traces_url_local',
+           cityHash64(TraceId)
+                      );
+
+
+CREATE MATERIALIZED VIEW if not exists otel.otel_traces_url_local_mv
+            on cluster default TO otel.otel_traces_url_local (
+                                                              `Timestamp` DateTime64(9),
+                                                              `TraceId` String,
+                                                              `SpanId` String,
+                                                              `Route` String,
+                                                              `RouteRaw` String,
+                                                              `Path` String,
+                                                              `Query` String,
+                                                              `Target` String,
+                                                              `ProtocolName` String,
+                                                              `ProtocolVersion` String,
+                                                              `Method` String,
+                                                              `StatusCode` Int64,
+                                                              `Message` String,
+                                                              `UserAgent` String,
+                                                              `Duration` Int64,
+                                                              `ServiceName` LowCardinality(String),
+                                                              `AppAlias` String
+        )
+AS
+WITH '\\b[a-fA-F0-9]{32,}\\b|\\b[a-fA-F0-9\\-]{36}\\b|\\b\\[1-9]\\d*\\b' AS varPattern, SpanAttributes['http.route'] AS route, lower(
+        splitByString('.', splitByString('/', Path)[-1])[-1]
+                                                                                                                               ) AS fileExt, multiIf(
+            (SpanAttributes['url.path']) != '',
+            SpanAttributes['url.path'],
+            (SpanAttributes['http.target']) != '',
+            path(SpanAttributes['http.target']),
+            (SpanAttributes['http.url']) != '',
+            path(SpanAttributes['http.url']),
+            (SpanAttributes['url.full']) != '',
+            path(SpanAttributes['url.full']),
+            SpanAttributes['http.route']
+                                                                                                                                             ) AS path
+SELECT Timestamp,
+       TraceId,
+       SpanId,
+       if(
+                   route != '',
+                   route,
+                   replaceRegexpOne(Path, varPattern, '{:var}')
+       )     AS Route,
+       route AS RouteRaw,
+       path  AS Path,
+       if(
+                   (SpanAttributes['url.query']) != '',
+                   SpanAttributes['url.query'],
+                   queryString(SpanAttributes['http.target'])
+       )     AS Query,
+       if(
+                   (SpanAttributes['http.target']) != '',
+                   SpanAttributes['http.target'],
+                   if(
+                               (SpanAttributes['url.path']) != '',
+                               SpanAttributes['url.path'],
+                               SpanAttributes['http.route']
+                   )
+       )     AS Target,
+       multiIf(
+                   (SpanAttributes['http.scheme']) != '',
+                   SpanAttributes['http.scheme'],
+                   (SpanAttributes['url.scheme']) != '',
+                   SpanAttributes['url.scheme'],
+                   SpanAttributes['network.protocol.name']
+       )     AS ProtocolName,
+       if(
+                   (SpanAttributes['http.flavor']) != '',
+                   SpanAttributes['http.flavor'],
+                   SpanAttributes['network.protocol.version']
+       )     AS ProtocolVersion,
+       if(
+                   (SpanAttributes['http.method']) != '',
+                   SpanAttributes['http.method'],
+                   SpanAttributes['http.request.method']
+       )     AS Method,
+       toInt64OrZero(
+               if(
+                           (SpanAttributes['http.status_code']) != '',
+                           SpanAttributes['http.status_code'],
+                           SpanAttributes['http.response.status_code']
+               )
+       )     AS StatusCode,
+       if(
+                   StatusMessage != '',
+                   StatusMessage,
+                   SpanAttributes['error.type']
+       )     AS Message,
+       if(
+                   (SpanAttributes['http.user_agent']) != '',
+                   SpanAttributes['http.user_agent'],
+                   SpanAttributes['user_agent.original']
+       )     AS UserAgent,
+       Duration,
+       ServiceName,
+       AppAlias
+FROM otel.otel_traces
+WHERE (SpanKind = 2)
+  AND (Path != '')
+  AND (
+        ((SpanAttributes['http.method']) != '')
+        OR ((SpanAttributes['http.request.method']) != '')
+    )
+  AND (fileExt != Path)
+  AND (
+        fileExt NOT IN (
+                        'css',
+                        'gif',
+                        'ico',
+                        'jpg',
+                        'jpeg',
+                        'js',
+                        'json',
+                        'mp3',
+                        'mp4',
+                        'png',
+                        'svg',
+                        'ttf',
+                        'txt',
+                        'wasm',
+                        'woff'
+        )
+    );
+
+CREATE TABLE if not exists otel.otel_url_path_local on cluster default
+(
+    `AppAlias` LowCardinality(String) DEFAULT '' COMMENT '应用别名',
+    `ServiceName` LowCardinality(String) COMMENT '服务名',
+    `Method` LowCardinality(String)   DEFAULT 'GET' COMMENT 'http方法, 如GET, POST',
+    `Route` String COMMENT '路由',
+    `Path`  String COMMENT 'URL Path'
+) ENGINE = MergeTree()
+      PARTITION BY AppAlias
+      ORDER BY
+          (AppAlias, ServiceName, Method, Route, Path) SETTINGS index_granularity = 8192,
+        ttl_only_drop_parts = 1 COMMENT '记录uri, 每个(AppAlias+ServiceName+Method)下的uri最终唯一';
+
+CREATE TABLE if not exists otel.otel_url_path on cluster default
+(
+    `AppAlias` LowCardinality(String) DEFAULT '' COMMENT '应用别名',
+    `ServiceName` LowCardinality(String) COMMENT '服务名',
+    `Method` LowCardinality(String)   DEFAULT 'GET' COMMENT 'http方法, 如GET, POST',
+    `Route` String COMMENT '路由',
+    `Path`  String COMMENT 'URL Path'
+) ENGINE = Distributed(
+           'default',
+           'otel',
+           'otel_url_path_local',
+           cityHash64(Path)
+                      ) COMMENT '记录uri, 每个(AppAlias+ServiceName+Method)下的uri最终唯一';
+
+
+CREATE MATERIALIZED VIEW if not exists otel.otel_url_path_local_mv
+            on cluster default TO otel.otel_url_path_local (
+                                                            `AppAlias` LowCardinality(String) DEFAULT '' COMMENT '应用别名',
+                                                            `ServiceName` LowCardinality(String) COMMENT '服务名',
+                                                            `Method` LowCardinality(String) DEFAULT 'GET' COMMENT 'http方法, 如GET, POST',
+                                                            `Route` String COMMENT '路由',
+                                                            `Path` String COMMENT 'URL Path'
+        )
+AS
+SELECT DISTINCT ServiceName,
+                AppAlias,
+                Method,
+                Route,
+                Path
+FROM otel.otel_traces_url_local
+WHERE (Path != '')
+  AND (StatusCode != 404)
+  AND (
+        lower(
+                splitByString('.', splitByString('/', Path)[-1])[2]
+        ) NOT IN (
+                  'css',
+                  'gif',
+                  'ico',
+                  'jpg',
+                  'jpeg',
+                  'js',
+                  'json',
+                  'mp3',
+                  'mp4',
+                  'png',
+                  'svg',
+                  'ttf',
+                  'txt',
+                  'wasm',
+                  'woff'
+            )
+    );
+
+
+CREATE TABLE if not exists otel.otel_traces_trace_id_ts on cluster default
+(
+    `TraceId` String CODEC (ZSTD(1)),
+    `Start`   DateTime64(9) CODEC (Delta(8), ZSTD(1)),
+    `End`     DateTime64(9) CODEC (Delta(8), ZSTD(1)),
+    `Latency` Int64,
+    INDEX idx_trace_id TraceId TYPE bloom_filter(0.01) GRANULARITY 1
+) ENGINE = MergeTree
+      ORDER BY
+          (TraceId, Latency) TTL toDateTime(Start) + toIntervalDay(10) SETTINGS index_granularity = 8192;
+
+
+CREATE MATERIALIZED VIEW if not exists otel.otel_traces_trace_id_ts_mv
+            on cluster default TO otel.otel_traces_trace_id_ts (
+                                                                `Start` DateTime64(9),
+                                                                `End` DateTime64(9),
+                                                                `Latency` Int64
+        )
+AS
+SELECT min(Timestamp)                   AS Start,
+       max(
+               fromUnixTimestamp64Nano(toUnixTimestamp64Nano(Timestamp) + Duration)
+       )                                AS
+                                           End,
+       toUnixTimestamp64Nano(End
+       ) - toUnixTimestamp64Nano(Start) AS Latency
+FROM otel.otel_traces
+WHERE TraceId != ''
+GROUP BY TraceId;
+
+
+CREATE TABLE if not exists otel.otel_traces_summary_local on cluster default
+(
+    `TraceId`    String COMMENT 'TraceId',
+    `StartTime`  DateTime64(3) COMMENT '开始时间',
+    `EndTime`    DateTime64(3) COMMENT '结束时间',
+    `Duration`   Int64 COMMENT '耗时',
+    `SpanNum`    Int64 COMMENT 'Span数量',
+    `ServiceNum` Int64 COMMENT '服务数量',
+    `ErrorNum`   Int64 COMMENT '错误数量',
+    `EventNum`   Int64 COMMENT '事件数量'
+) ENGINE = MergeTree() PARTITION BY toDate(StartTime)
+      ORDER BY
+          (StartTime, Duration) TTL toDateTime(EndTime) + toIntervalDay(10) SETTINGS ttl_only_drop_parts = 1,
+        index_granularity = 8192;
+
+CREATE TABLE if not exists otel.otel_traces_summary on cluster default
+(
+    `TraceId`    String COMMENT 'TraceId',
+    `StartTime`  DateTime64(3) COMMENT '开始时间',
+    `EndTime`    DateTime64(3) COMMENT '结束时间',
+    `Duration`   Int64 COMMENT '耗时',
+    `SpanNum`    Int64 COMMENT 'Span数量',
+    `ServiceNum` Int64 COMMENT '服务数量',
+    `ErrorNum`   Int64 COMMENT '错误数量',
+    `EventNum`   Int64 COMMENT '事件数量'
+) ENGINE = Distributed(
+           'default',
+           'otel',
+           'otel_traces_summary_local',
+           toSecond(StartTime)
+                      );
+
+CREATE MATERIALIZED VIEW if not exists otel.otel_traces_summary_mv
+            on cluster default TO otel.otel_traces_summary (
+                                                            `TraceId` String,
+                                                            `SpanNum` UInt64,
+                                                            `ServiceNum` UInt64,
+                                                            `StartTime` DateTime64(9),
+                                                            `EndTime` DateTime64(9),
+                                                            `Duration` Int64,
+                                                            `ErrorNum` UInt64,
+                                                            `EventNum` UInt64
+        )
+AS
+SELECT TraceId,
+       count()                                         AS SpanNum,
+       COUNTDistinct(ServiceName)                      AS ServiceNum,
+       min(Timestamp)                                  AS StartTime,
+       max(Timestamp)                                  AS EndTime,
+       max(Duration)                                   AS Duration,
+       sum(if(StatusCode = 'STATUS_CODE_ERROR', 1, 0)) AS ErrorNum,
+       sum(length(`Events.Name`))                      AS EventNum
+FROM otel.otel_traces AS ot
+GROUP BY TraceId;
+
+CREATE TABLE if not exists otel.otel_traces_error_local on cluster default
+(
+    `Timestamp`     DateTime64(3) COMMENT '时间',
+    `TraceId`       String COMMENT 'TraceId',
+    `SpanId`        String COMMENT 'SpanId',
+    `ParentSpanId`  String CODEC (ZSTD(1)),
+    `SpanName` LowCardinality(String) CODEC (ZSTD(1)),
+    `SpanKind` LowCardinality(String) CODEC (ZSTD(1)),
+    `SpanAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
+    `ServiceName` LowCardinality(String) CODEC (ZSTD(1)),
+    `Duration`      UInt64 CODEC (ZSTD(1)),
+    `StatusCode` LowCardinality(String) CODEC (ZSTD(1)),
+    `StatusMessage` String CODEC (ZSTD(1)),
+    `AppAlias` LowCardinality(String)
+) ENGINE = MergeTree() PARTITION BY toDate(Timestamp)
+      ORDER BY
+          (
+           AppAlias,
+           ServiceName,
+           StatusMessage,
+           toDateTime(Timestamp)
+              ) TTL toDateTime(Timestamp) + toIntervalDay(10) SETTINGS index_granularity = 8192,
+        ttl_only_drop_parts = 1 COMMENT '记录错误相关的span';
+
+CREATE TABLE if not exists otel.otel_traces_error on cluster default
+(
+    `Timestamp`     DateTime64(3) COMMENT '时间',
+    `TraceId`       String COMMENT 'TraceId',
+    `SpanId`        String COMMENT 'SpanId',
+    `ParentSpanId`  String CODEC (ZSTD(1)),
+    `SpanName` LowCardinality(String) CODEC (ZSTD(1)),
+    `SpanKind` LowCardinality(String) CODEC (ZSTD(1)),
+    `SpanAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
+    `ServiceName` LowCardinality(String) CODEC (ZSTD(1)),
+    `Duration`      UInt64 CODEC (ZSTD(1)),
+    `StatusCode` LowCardinality(String) CODEC (ZSTD(1)),
+    `StatusMessage` String CODEC (ZSTD(1)),
+    `AppAlias` LowCardinality(String)
+) ENGINE = Distributed(
+           'default',
+           'otel',
+           'otel_traces_error_local',
+           cityHash64(TraceId)
+                      );
+
+
+CREATE TABLE if not exists otel.otel_logs_local on cluster default
+(
+    `Timestamp`         DateTime64(9) CODEC (Delta(8), ZSTD(1)),
+    `TraceId`           String CODEC (ZSTD(1)),
+    `SpanId`            String CODEC (ZSTD(1)),
+    `TraceFlags`        UInt32 CODEC (ZSTD(1)),
+    `SeverityText` LowCardinality(String) CODEC (ZSTD(1)),
+    `SeverityNumber`    Int32 CODEC (ZSTD(1)),
+    `ServiceName` LowCardinality(String) CODEC (ZSTD(1)),
+    `Body`              String CODEC (ZSTD(1)),
+    `ResourceSchemaUrl` String CODEC (ZSTD(1)),
+    `ResourceAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
+    `ScopeSchemaUrl`    String CODEC (ZSTD(1)),
+    `ScopeName`         String CODEC (ZSTD(1)),
+    `ScopeVersion`      String CODEC (ZSTD(1)),
+    `ScopeAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
+    `LogAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
+    INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
+    INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+    INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+    INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+    INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+    INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1
+)
+    ENGINE = MergeTree() PARTITION BY toDate(Timestamp) ORDER BY (ServiceName, SeverityText, toUnixTimestamp(Timestamp), TraceId) TTL toDateTime(Timestamp) + toIntervalDay(10)
+        SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;
+
+
+CREATE TABLE if not exists otel.otel_logs on cluster default
+(
+    `Timestamp`         DateTime64(9),
+    `TraceId`           String,
+    `SpanId`            String,
+    `TraceFlags`        UInt32,
+    `SeverityText` LowCardinality(String),
+    `SeverityNumber`    Int32,
+    `ServiceName` LowCardinality(String),
+    `Body`              String,
+    `ResourceSchemaUrl` String CODEC (ZSTD(1)),
+    `ResourceAttributes` Map(LowCardinality(String), String),
+    `ScopeSchemaUrl`    String CODEC (ZSTD(1)),
+    `ScopeName`         String CODEC (ZSTD(1)),
+    `ScopeVersion`      String CODEC (ZSTD(1)),
+    `ScopeAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
+    `LogAttributes` Map(LowCardinality(String), String)
+) ENGINE = Distributed('default', 'otel', 'otel_logs_local', rand());
+
+
+CREATE TABLE if not exists otel.otel_http_route_local on cluster default
+(
+    `AppAlias` LowCardinality(String) DEFAULT '' COMMENT '应用别名',
+    `ServiceName` LowCardinality(String) COMMENT '服务名',
+    `Method` LowCardinality(String)   DEFAULT 'GET' COMMENT 'http方法, 如GET, POST',
+    `Route`         String COMMENT '路由(可能是原始的,如果原始路由不存在,则为自动匹配的路由)',
+    `RouteRaw`      String COMMENT '路由(原始)',
+    `LastPath`      String COMMENT '最近的一个url path',
+    `LastTimestamp` DateTime64(3) COMMENT '最近一个来的Route的时间',
+    `IsDeleted`     UInt8
+) ENGINE = MergeTree() PARTITION BY AppAlias
+      ORDER BY
+          (AppAlias, ServiceName, Method, Route) SETTINGS index_granularity = 8192,
+        ttl_only_drop_parts = 1 COMMENT '记录uri, 每个(AppAlias+ServiceName+Method)下的uri最终唯一';
+
+
+CREATE TABLE if not exists otel.otel_http_route on cluster default
+(
+    `AppAlias` LowCardinality(String) DEFAULT '' COMMENT '应用别名',
+    `ServiceName` LowCardinality(String) COMMENT '服务名',
+    `Method` LowCardinality(String)   DEFAULT 'GET' COMMENT 'http方法, 如GET, POST',
+    `Route`         String COMMENT '路由',
+    `RouteRaw`      String COMMENT '路由(原始)',
+    `LastPath`      String COMMENT '最近的一个url path',
+    `LastTimestamp` DateTime64(3) COMMENT '最近一个来的Route的时间',
+    `IsDeleted`     UInt8
+) ENGINE = Distributed(
+           'default',
+           'otel',
+           'otel_http_route_local',
+           cityHash64(Route)
+                      ) COMMENT 'http路由表, 每个路由在(AppAlias+ServiceName+Method)下最终唯一';
+
+
+CREATE MATERIALIZED VIEW if not exists otel.otel_http_route_local_mv
+            on cluster default TO otel.otel_http_route_local (
+                                                              `AppAlias` LowCardinality(String),
+                                                              `ServiceName` LowCardinality(String),
+                                                              `Method` LowCardinality(String),
+                                                              `Route` LowCardinality(String),
+                                                              `RouteRaw` LowCardinality(String),
+                                                              `LastPath` String,
+                                                              `LastTimestamp` DateTime64(3)
+        )
+AS
+SELECT DISTINCT AppAlias,
+                ServiceName,
+                Method,
+                Route,
+                RouteRaw,
+                Path      AS LastPath,
+                Timestamp AS LastTimestamp
+FROM otel.otel_traces_url_local
+WHERE Path != '';
+
+
+
+CREATE TABLE if not exists otel.otel_events_local on cluster default
+(
+    `Timestamp`   DateTime64(9) CODEC (Delta(8), ZSTD(1)),
+    `AppendTime`  DateTime64(9) COMMENT '发生时间',
+    `UID` LowCardinality(String) COMMENT '规则执行唯一标识',
+    `AppId`       Int64 COMMENT '应用ID',
+    `RuleId`      Int64 COMMENT '规则ID',
+    `AppName` LowCardinality(String) COMMENT '应用名称',
+    `AppAlias` LowCardinality(String) COMMENT '应用别名',
+    `ExceptionName` LowCardinality(String) COMMENT '异常名称',
+    `RuleInfo` Map(LowCardinality(String), String) COMMENT '规则信息' CODEC(ZSTD(1)),
+    `CompareV`    String COMMENT '实际比对值',
+    `RowResult`   String COMMENT 'row类型值',
+    `RowsResult`  Array(Map(LowCardinality(String), String)) COMMENT 'rows类型值' CODEC(ZSTD(1)),
+    `AlertStatus` Int64 COMMENT '告警状态'
+) ENGINE = MergeTree() PARTITION BY AppId
+      ORDER BY
+          AppendTime TTL toDateTime(Timestamp) + toIntervalDay(30) SETTINGS index_granularity = 8192 COMMENT '自定义的otel事件';
+
+CREATE TABLE if not exists otel.otel_events on cluster default
+(
+    `Timestamp`   DateTime64(9) CODEC (Delta(8), ZSTD(1)),
+    `AppendTime`  DateTime64(9) COMMENT '发生时间',
+    `UID` LowCardinality(String) COMMENT '规则执行唯一标识',
+    `AppId`       Int64 COMMENT '应用ID',
+    `RuleId`      Int64 COMMENT '规则ID',
+    `AppName` LowCardinality(String) COMMENT '应用名称',
+    `AppAlias` LowCardinality(String) COMMENT '应用别名',
+    `ExceptionName` LowCardinality(String) COMMENT '异常名称',
+    `RuleInfo` Map(LowCardinality(String), String) COMMENT '规则信息' CODEC(ZSTD(1)),
+    `CompareV`    String COMMENT '实际比对值',
+    `RowResult`   String COMMENT 'row类型值',
+    `RowsResult`  Array(Map(LowCardinality(String), String)) COMMENT 'rows类型值' CODEC(ZSTD(1)),
+    `AlertStatus` Int64 COMMENT '告警状态'
+) ENGINE = Distributed('default', 'otel', 'otel_events_local', rand());
+
+
+CREATE TABLE if not exists otel.ot_url_mapping on cluster default
+(
+    `app_alias`        String,
+    `name`             String,
+    `url`              String,
+    `method`           String,
+    `service_name`     String,
+    `route`            String,
+    `route_raw`        String,
+    `favor`            Int8,
+    `level`            Int8,
+    `is_perfect_match` Int8,
+    `updated_at`       DateTime64(3)
+) ENGINE = MySQL(
+           'mysql-primary.observe.svc.cluster.local:3306',
+           'observe',
+           'ot_url_mapping',
+           'root',
+           'pg3mfWRtYonekZWB'
+                );
+
+
+CREATE MATERIALIZED VIEW if not exists otel.ot_url_mapping_mv
+            on cluster default TO otel.ot_url_mapping (
+                                                       `app_alias` LowCardinality(String),
+                                                       `service_name` LowCardinality(String),
+                                                       `method` LowCardinality(String),
+                                                       `route` String,
+                                                       `route_raw` String,
+                                                       `level` Int64,
+                                                       `is_perfect_match` UInt8,
+                                                       `updated_at` DateTime64(3)
+        )
+AS
+SELECT DISTINCT AppAlias      AS app_alias,
+                ServiceName   AS service_name,
+                Method        AS method,
+                Route         AS route,
+                RouteRaw      AS route_raw,
+                if(
+                            Route = '/',
+                            0,
+                            length(splitByString('/', Route)) - 1
+                )             AS level,
+                1             AS is_perfect_match,
+                LastTimestamp AS updated_at
+FROM otel.otel_http_route
+WHERE IsDeleted = 0;
+
+CREATE TABLE if not exists otel.ot_service_nodes on cluster default
+(
+    `id`           Int64,
+    `app_alias`    String,
+    `service_name` String,
+    `name`         String,
+    `type`         Int32,
+    `Kind`         String
+) ENGINE = MySQL(
+           'mysql-primary.observe.svc.cluster.local:3306',
+           'observe',
+           'ot_service_nodes',
+           'root',
+           'pg3mfWRtYonekZWB'
+                );
+
+
+CREATE TABLE if not exists otel.otel_traces_aggbyapp_local on cluster default
+(
+    `AppAlias` LowCardinality(String),
+    `StartTime` DateTime,
+    `ServiceNum` AggregateFunction(uniq, LowCardinality(String)),
+    `ServiceList` AggregateFunction(groupUniqArray, LowCardinality(String)),
+    `TraceNum` AggregateFunction(uniq, String),
+    `TraceErrorNum` AggregateFunction(uniqIf, String, UInt8),
+    `TraceDurationMin` AggregateFunction(minIf, Int64, UInt8),
+    `TraceDurationMax` AggregateFunction(maxIf, Int64, UInt8),
+    `TraceDurationAvg` AggregateFunction(avgIf, Int64, UInt8),
+    `TraceDurationQuantile` AggregateFunction(quantileIf, Int64, UInt8),
+    `SpanNum` AggregateFunction(count),
+    `SpanErrorNum` AggregateFunction(countIf, UInt8)
+) ENGINE = AggregatingMergeTree() PARTITION BY toDate(StartTime, 'Asia/Shanghai')
+      ORDER BY
+          (AppAlias, StartTime) TTL StartTime + toIntervalDay(1) SETTINGS index_granularity = 8192;
+
+CREATE TABLE if not exists otel.otel_traces_aggbyapp on cluster default
+(
+    `AppAlias` LowCardinality(String),
+    `StartTime` DateTime,
+    `ServiceNum` AggregateFunction(uniq, LowCardinality(String)),
+    `ServiceList` AggregateFunction(groupUniqArray, LowCardinality(String)),
+    `TraceNum` AggregateFunction(uniq, String),
+    `TraceErrorNum` AggregateFunction(uniqIf, String, UInt8),
+    `TraceDurationMin` AggregateFunction(minIf, Int64, UInt8),
+    `TraceDurationMax` AggregateFunction(maxIf, Int64, UInt8),
+    `TraceDurationAvg` AggregateFunction(avgIf, Int64, UInt8),
+    `TraceDurationQuantile` AggregateFunction(quantileIf, Int64, UInt8),
+    `SpanNum` AggregateFunction(count),
+    `SpanErrorNum` AggregateFunction(countIf, UInt8)
+) ENGINE = Distributed('default', 'otel', 'otel_traces_aggbyapp_local');
+
+
+CREATE TABLE if not exists otel.otel_traces_aggbyapp_local_utc on cluster default
+(
+    `AppAlias` LowCardinality(String),
+    `StartTime` DateTime,
+    `ServiceNum` AggregateFunction(uniq, LowCardinality(String)),
+    `ServiceList` AggregateFunction(groupUniqArray, LowCardinality(String)),
+    `TraceNum` AggregateFunction(uniq, String),
+    `TraceErrorNum` AggregateFunction(uniqIf, String, UInt8),
+    `TraceDurationMin` AggregateFunction(minIf, Int64, UInt8),
+    `TraceDurationMax` AggregateFunction(maxIf, Int64, UInt8),
+    `TraceDurationAvg` AggregateFunction(avgIf, Int64, UInt8),
+    `TraceDurationQuantile` AggregateFunction(quantileIf, Int64, UInt8),
+    `SpanNum` AggregateFunction(count),
+    `SpanErrorNum` AggregateFunction(countIf, UInt8)
+) ENGINE = AggregatingMergeTree() PARTITION BY toDate(StartTime)
+      ORDER BY
+          (AppAlias, StartTime) SETTINGS index_granularity = 8192;
+
+drop table if exists otel.otel_traces_aggbyapp_local_mv on cluster default;
+CREATE MATERIALIZED VIEW if not exists otel.otel_traces_aggbyapp_local_mv
+            on cluster default TO otel.otel_traces_aggbyapp_local (
+                                                                   `AppAlias` String,
+                                                                   `StartTime` DateTime,
+                                                                   `ServiceNum` AggregateFunction(uniq, String),
+                                                                   `ServiceList` AggregateFunction(groupUniqArray, String),
+                                                                   `TraceNum` AggregateFunction(uniq, String),
+                                                                   `TraceErrorNum` AggregateFunction(uniqIf, String, UInt8),
+                                                                   `TraceDurationMin` AggregateFunction(minIf, Int64, UInt8),
+                                                                   `TraceDurationMax` AggregateFunction(maxIf, Int64, UInt8),
+                                                                   `TraceDurationAvg` AggregateFunction(avgIf, Int64, UInt8),
+                                                                   `TraceDurationQuantile` AggregateFunction(quantileIf, Int64, UInt8),
+                                                                   `SpanNum` AggregateFunction(count),
+                                                                   `SpanErrorNum` AggregateFunction(countIf, UInt8)
+        )
+AS
+SELECT AppAlias,
+       toStartOfFiveMinutes(Timestamp)              AS StartTime,
+       uniqState(ServiceName)                       AS ServiceNum,
+       groupUniqArrayState(ServiceName)             AS ServiceList,
+       uniqState(TraceId)                           AS TraceNum,
+       uniqIfState(TraceId, StatusCode = 2)         AS TraceErrorNum,
+       minIfState(Duration, ParentSpanId = '')      AS TraceDurationMin,
+       maxIfState(Duration, ParentSpanId = '')      AS TraceDurationMax,
+       avgIfState(Duration, ParentSpanId = '')      AS TraceDurationAvg,
+       quantileIfState(Duration, ParentSpanId = '') AS TraceDurationQuantile,
+       countState()                                 AS SpanNum,
+       countIfState(StatusCodeNumber = 2)                 AS SpanErrorNum
+FROM otel.otel_traces_local
+GROUP BY AppAlias,
+         StartTime;
+
+CREATE TABLE if not exists otel.otel_traces_aggbysvc_local on cluster default
+(
+    `AppAlias` LowCardinality(String),
+    `ServiceName` LowCardinality(String),
+    `StartTime` DateTime,
+    `TraceNum` AggregateFunction(count, String),
+    `TraceErrorNum` AggregateFunction(countIf, UInt8),
+    `DurationMin` SimpleAggregateFunction(min, Int64),
+    `DurationMax` SimpleAggregateFunction(max, Int64),
+    `DurationAvg` AggregateFunction(avg, Int64),
+    `DurationQuantile` AggregateFunction(quantile, Int64)
+) ENGINE = AggregatingMergeTree() PARTITION BY toDate(StartTime, 'Asia/Shanghai')
+      ORDER BY
+          (AppAlias, ServiceName, StartTime) TTL toDateTime(StartTime) + toIntervalDay(10) SETTINGS index_granularity = 8192,
+        ttl_only_drop_parts = 1;
+
+CREATE TABLE if not exists otel.otel_traces_aggbysvc on cluster default
+(
+    `AppAlias` LowCardinality(String),
+    `ServiceName` LowCardinality(String),
+    `StartTime` DateTime,
+    `TraceNum` AggregateFunction(count),
+    `TraceErrorNum` AggregateFunction(countIf, UInt8),
+    `DurationMin` SimpleAggregateFunction(min, Int64),
+    `DurationMax` SimpleAggregateFunction(max, Int64),
+    `DurationAvg` AggregateFunction(avg, Int64),
+    `DurationQuantile` AggregateFunction(quantile, Int64)
+) ENGINE = Distributed('default', 'otel', 'otel_traces_aggbysvc_local');
+
+CREATE VIEW if not exists otel.otel_traces_aggbysvc_merge
+            on cluster default (
+                                `AppAlias` LowCardinality(String),
+                                `ServiceName` LowCardinality(String),
+                                `StartTime` DateTime,
+                                `TraceNum` UInt64,
+                                `TraceErrorNum` UInt64,
+                                `DurationMin` SimpleAggregateFunction(min, Int64),
+                                `DurationMax` SimpleAggregateFunction(max, Int64),
+                                `DurationAvg` Float64,
+                                `DurationP50` Float64,
+                                `DurationP90` Float64,
+                                `DurationP99` Float64
+        )
+AS
+SELECT AppAlias,
+       ServiceName,
+       StartTime,
+       countMerge(TraceNum)                  AS TraceNum,
+       countIfMerge(TraceErrorNum)           AS TraceErrorNum,
+       min(DurationMin)                      AS DurationMin,
+       max(DurationMax)                      AS DurationMax,
+       avgMerge(DurationAvg)                 AS DurationAvg,
+       quantileMerge(0.5)(DurationQuantile)  AS DurationP50,
+       quantileMerge(0.9)(DurationQuantile)  AS DurationP90,
+       quantileMerge(0.99)(DurationQuantile) AS DurationP99
+FROM otel.otel_traces_aggbysvc
+GROUP BY AppAlias,
+         ServiceName,
+         StartTime;
+
+CREATE MATERIALIZED VIEW if not exists otel.otel_traces_aggbysvc_local_mv
+            on cluster default TO otel.otel_traces_aggbysvc_local (
+                                                                   `AppAlias` String,
+                                                                   `ServiceName` LowCardinality(String),
+                                                                   `StartTime` DateTime,
+                                                                   `TraceNum` AggregateFunction(count),
+                                                                   `TraceErrorNum` AggregateFunction(countIf, UInt8),
+                                                                   `DurationMin` Int64,
+                                                                   `DurationMax` Int64,
+                                                                   `DurationAvg` AggregateFunction(avg, Int64),
+                                                                   `DurationQuantile` AggregateFunction(quantile, Int64)
+        )
+AS
+SELECT AppAlias,
+       ServiceName,
+       toStartOfFiveMinutes(Timestamp) AS StartTime,
+       countState()                    AS TraceNum,
+       countIfState(StatusCodeNumber = 2)    AS TraceErrorNum,
+       min(Duration)                   AS DurationMin,
+       max(Duration)                   AS DurationMax,
+       avgState(Duration)              AS DurationAvg,
+       quantileState(Duration)         AS DurationQuantile
+FROM otel.otel_traces_local
+GROUP BY AppAlias,
+         ServiceName,
+         StartTime;

+ 68 - 62
src/main/java/com/cecf/observe/agentstream/Main.java

@@ -30,7 +30,9 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
@@ -204,16 +206,20 @@ public class Main {
         DataStream<ServiceMetric> serviceMetricStreamLevel1 = endpointStream
                 .keyBy(PojoSpan::getServiceName)
                 .window(TumblingEventTimeWindows.of(Duration.ofSeconds(15)))
-                .aggregate(new ServiceMetricAggregator());
+                .aggregate(new ServiceMetricAggregator(), new ServiceMetricTimestampAdder());
         sinkServiceMetric(serviceMetricStreamLevel1, oasConfig.getSpanMetric(), Duration.ofSeconds(15));
 
-//        DataStream<ServiceMetric> serviceMetricDataStreamLevel2 = nextLevelServiceMetric(serviceMetricStreamLevel1,
-//                jobConfig.getWatermarkIdlenessSeconds(), 5);
-//        sinkServiceMetric(serviceMetricDataStreamLevel2, oasConfig.getPgConfig(), Duration.ofMinutes(5));
-//
-//        DataStream<ServiceMetric> serviceMetricDataStreamLevel3 = nextLevelServiceMetric(serviceMetricStreamLevel1,
-//                jobConfig.getWatermarkIdlenessSeconds(), 60);
-//        sinkServiceMetric(serviceMetricDataStreamLevel3, oasConfig.getPgConfig(), Duration.ofHours(1));
+        DataStream<ServiceMetric> serviceMetricDataStream5m = nextLevelServiceMetric(serviceMetricStreamLevel1,
+                jobConfig.getWatermarkIdlenessSeconds(), 5);
+        sinkServiceMetric(serviceMetricDataStream5m, oasConfig.getSpanMetric(), Duration.ofMinutes(5));
+
+        DataStream<ServiceMetric> serviceMetricDataStream20m = nextLevelServiceMetric(serviceMetricDataStream5m,
+                jobConfig.getWatermarkIdlenessSeconds(), 20);
+        sinkServiceMetric(serviceMetricDataStream20m, oasConfig.getSpanMetric(), Duration.ofMinutes(20));
+
+        DataStream<ServiceMetric> serviceMetricDataStream1h = nextLevelServiceMetric(serviceMetricDataStream20m,
+                jobConfig.getWatermarkIdlenessSeconds(), 60);
+        sinkServiceMetric(serviceMetricDataStream1h, oasConfig.getSpanMetric(), Duration.ofHours(1));
 
 //        // traceURL stream
 //        handelTraceURLStream(oasConfig.getTraceURLConfig(), baseStream);
@@ -300,63 +306,63 @@ public class Main {
                                            "quantile_90, quantile_95, quantile_99, instanceID) " +
                                            "values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", tableSuffix);
         serviceMetricDataStream.flatMap((FlatMapFunction<ServiceMetric, Tuple2<ServiceInstanceMeta, EndpointMetrics>>) (value, out) -> {
-            for (ServiceInstanceMetric instanceMetric : value.getServiceInstanceMetrics()) {
-                out.collect(new Tuple2<>(new ServiceInstanceMeta(value.getAppName(), value.getTimestamp(), value.getServiceName(),
-                        instanceMetric.getInstanceID()), instanceMetric.getInstanceMetrics(du)));
-            }
-        })
+                    for (ServiceInstanceMetric instanceMetric : value.getServiceInstanceMetrics()) {
+                        out.collect(new Tuple2<>(new ServiceInstanceMeta(value.getAppName(), value.getTimestamp(), value.getServiceName(),
+                                instanceMetric.getInstanceID()), instanceMetric.getInstanceMetrics(du)));
+                    }
+                })
                 .returns(
                         TypeExtractor.getForObject(new Tuple2<>(new ServiceInstanceMeta(), new EndpointMetrics()))
                 )
                 .addSink(JdbcSink.sink(
-                instanceSql,
-                (ps, t) -> {
-                    ServiceInstanceMeta meta = t.f0;
-                    EndpointMetrics em = t.f1;
-                    int httpCode300Count = 0;
-                    int httpCode400Count = 0;
-                    int httpCode500Count = 0;
-                    for (Map.Entry<Long, Integer> entry : em.getHttpCode2Count().entrySet()) {
-                        Long code = entry.getKey();
-                        if (code >= 300 && code < 400) {
-                            httpCode300Count += entry.getValue();
-                        } else if (code >= 400 && code < 500) {
-                            httpCode400Count += entry.getValue();
-                        } else if (code >= 500 && code < 600) {
-                            httpCode500Count += entry.getValue();
-                        }
-                    }
-                    int exceptionCount = 0;
-                    for (Integer value : em.getExceptionType2Count().values()) {
-                        exceptionCount += value;
-                    }
-                    int spanErrorCount = em.getSpanStatusCode2Count().getOrDefault(
-                            Status.StatusCode.STATUS_CODE_ERROR.getNumber(), 0);
-                    ps.setTimestamp(1, new Timestamp(meta.getTimestamp()));
-                    ps.setString(2, meta.getAppName());
-                    ps.setString(3, meta.getServiceName());
-                    ps.setDouble(4, em.getApdex());
-                    ps.setInt(5, httpCode300Count);
-                    ps.setInt(6, httpCode400Count);
-                    ps.setInt(7, httpCode500Count);
-                    ps.setInt(8, spanErrorCount);
-                    ps.setInt(9, exceptionCount);
-                    ps.setLong(10, em.getMaxLatencyMS());
-                    ps.setLong(11, em.getMinLatencyMS());
-                    ps.setLong(12, em.getAvgLatencyMS());
-                    ps.setLong(13, em.getRequestCount());
-                    ps.setLong(14, em.getRPM());
-                    ps.setLong(15, em.getLatencyLe500());
-                    ps.setLong(16, em.getLatencyLe1000());
-                    ps.setLong(17, em.getLatencyLe5000());
-                    ps.setDouble(18, em.getQuantileLatencyMS50());
-                    ps.setDouble(19, em.getQuantileLatencyMS90());
-                    ps.setDouble(20, em.getQuantileLatencyMS95());
-                    ps.setDouble(21, em.getQuantileLatencyMS99());
-                    ps.setString(22, meta.getInstanceId());
-                },
-                jdbcConnectionOptions
-        ));
+                        instanceSql,
+                        (ps, t) -> {
+                            ServiceInstanceMeta meta = t.f0;
+                            EndpointMetrics em = t.f1;
+                            int httpCode300Count = 0;
+                            int httpCode400Count = 0;
+                            int httpCode500Count = 0;
+                            for (Map.Entry<Long, Integer> entry : em.getHttpCode2Count().entrySet()) {
+                                Long code = entry.getKey();
+                                if (code >= 300 && code < 400) {
+                                    httpCode300Count += entry.getValue();
+                                } else if (code >= 400 && code < 500) {
+                                    httpCode400Count += entry.getValue();
+                                } else if (code >= 500 && code < 600) {
+                                    httpCode500Count += entry.getValue();
+                                }
+                            }
+                            int exceptionCount = 0;
+                            for (Integer value : em.getExceptionType2Count().values()) {
+                                exceptionCount += value;
+                            }
+                            int spanErrorCount = em.getSpanStatusCode2Count().getOrDefault(
+                                    Status.StatusCode.STATUS_CODE_ERROR.getNumber(), 0);
+                            ps.setTimestamp(1, new Timestamp(meta.getTimestamp()));
+                            ps.setString(2, meta.getAppName());
+                            ps.setString(3, meta.getServiceName());
+                            ps.setDouble(4, em.getApdex());
+                            ps.setInt(5, httpCode300Count);
+                            ps.setInt(6, httpCode400Count);
+                            ps.setInt(7, httpCode500Count);
+                            ps.setInt(8, spanErrorCount);
+                            ps.setInt(9, exceptionCount);
+                            ps.setLong(10, em.getMaxLatencyMS());
+                            ps.setLong(11, em.getMinLatencyMS());
+                            ps.setLong(12, em.getAvgLatencyMS());
+                            ps.setLong(13, em.getRequestCount());
+                            ps.setLong(14, em.getRPM());
+                            ps.setLong(15, em.getLatencyLe500());
+                            ps.setLong(16, em.getLatencyLe1000());
+                            ps.setLong(17, em.getLatencyLe5000());
+                            ps.setDouble(18, em.getQuantileLatencyMS50());
+                            ps.setDouble(19, em.getQuantileLatencyMS90());
+                            ps.setDouble(20, em.getQuantileLatencyMS95());
+                            ps.setDouble(21, em.getQuantileLatencyMS99());
+                            ps.setString(22, meta.getInstanceId());
+                        },
+                        jdbcConnectionOptions
+                ));
         String endpointSql = String.format("insert into endpoint_metrics_%s " +
                                            "(timestamp, app_name, service_name, apdex, http_code_300_count, " +
                                            "http_code_400_count, http_code_500_count, span_status_error_count, " +
@@ -441,7 +447,7 @@ public class Main {
                 .assignTimestampsAndWatermarks(watermarkStrategyLevel2)
                 .keyBy(ServiceMetric::getServiceName)
                 .window(TumblingEventTimeWindows.of(Duration.ofMinutes(windowMinutes)))
-                .aggregate(new EndpointMetricGranularityAggregator());
+                .aggregate(new ServiceMetricGranularityAggregator(), new ServiceMetricTimestampAdder());
 
     }
 

+ 3 - 0
src/main/java/com/cecf/observe/agentstream/PojoSpanSpanCSVConverter.java

@@ -21,8 +21,10 @@ public class PojoSpanSpanCSVConverter extends CHConverter implements ClickHouseS
         addString(ss.SpanID, builder);
         addString(ss.ParentSpanID, builder);
         addString(ss.TraceState, builder);
+        addString(ss.getSpanNameProcessed(), builder);
         addString(ss.SpanName, builder);
         addInt(ss.SpanKind, builder);
+        addString(ss.getSpanKindText(), builder);
         addString(ss.ServiceName, builder);
         addMap(ss.ResourceAttributes, builder);
         addString(ss.ScopeName, builder);
@@ -30,6 +32,7 @@ public class PojoSpanSpanCSVConverter extends CHConverter implements ClickHouseS
         addMap(ss.SpanAttributes, builder);
         addLong(ss.Duration, builder);
         addInt(ss.StatusCode, builder);
+        addString(ss.getStatusCodeText(), builder);
         addString(ss.StatusMessage, builder);
 
         builder.append(list2ArrayStr(ss.Events == null ? null : Stream.of(ss.Events).map(s -> formatter.format(s.getZonedDateTime())).collect(Collectors.toList())));

+ 0 - 5
src/main/java/com/cecf/observe/agentstream/metrics/ServiceMetric.java

@@ -1,9 +1,7 @@
 package com.cecf.observe.agentstream.metrics;
 
-import io.opentelemetry.proto.trace.v1.Status;
 import lombok.Getter;
 import lombok.Setter;
-import org.testcontainers.shaded.org.bouncycastle.crypto.prng.drbg.DualECPoints;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -43,9 +41,6 @@ public class ServiceMetric {
         if (this.appName == null) {
             this.appName = appName;
         }
-        if (this.timestamp<=0) {
-            this.timestamp = timestamp;
-        }
         ServiceInstanceMetric serviceInstanceMetric = serviceInstanceMetrics.get(instanceID);
         if (serviceInstanceMetric == null) {
             ServiceInstanceMetric newInstance = new ServiceInstanceMetric(instanceID);

+ 2 - 3
src/main/java/com/cecf/observe/agentstream/metrics/EndpointMetricGranularityAggregator.java → src/main/java/com/cecf/observe/agentstream/metrics/ServiceMetricGranularityAggregator.java

@@ -1,12 +1,11 @@
 package com.cecf.observe.agentstream.metrics;
 
-import com.cecf.observe.agentstream.otel.PojoSpan;
 import org.apache.flink.api.common.functions.AggregateFunction;
 
-public class EndpointMetricGranularityAggregator implements AggregateFunction<ServiceMetric, ServiceMetric, ServiceMetric> {
+public class ServiceMetricGranularityAggregator implements AggregateFunction<ServiceMetric, ServiceMetric, ServiceMetric> {
     @Override
     public ServiceMetric createAccumulator() {
-        return null;
+        return new ServiceMetric();
     }
 
     @Override

+ 17 - 0
src/main/java/com/cecf/observe/agentstream/metrics/ServiceMetricTimestampAdder.java

@@ -0,0 +1,17 @@
+package com.cecf.observe.agentstream.metrics;
+
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+public class ServiceMetricTimestampAdder extends ProcessWindowFunction<ServiceMetric, ServiceMetric, String, TimeWindow> {
+
+    @Override
+    public void process(String s, ProcessWindowFunction<ServiceMetric, ServiceMetric,
+            String, TimeWindow>.Context context, Iterable<ServiceMetric> elements, Collector<ServiceMetric> out) throws Exception {
+        TimeWindow timeWindow = context.window();
+        ServiceMetric value = elements.iterator().next();
+        value.setTimestamp(timeWindow.getStart());
+        out.collect(value);
+    }
+}

+ 60 - 0
src/main/java/com/cecf/observe/agentstream/otel/PojoSpan.java

@@ -404,6 +404,66 @@ public class PojoSpan {
     public long getLatencyMS() {
         return (long) (Duration / 1e6);
     }
+
+    public String getSpanNameProcessed() {
+        if (SpanKind == Span.SpanKind.SPAN_KIND_CLIENT_VALUE &&
+            hasAttribute("http.url") &&
+            !hasAttribute("rpc.system")) {
+            return String.format("%s->%s", SpanName, removeQueryParamsFromURL(getAttribute("http.url")));
+        }
+        return SpanName;
+    }
+
+
+    public static String removeQueryParamsFromURL(String url) {
+        if (url == null) {
+            return null;
+        }
+        int questionMarkIndex = url.indexOf('?');
+        if (questionMarkIndex == -1) {
+            return url;
+        }
+        return url.substring(0, questionMarkIndex);
+    }
+
+    public String getSpanKindText() {
+        switch (SpanKind) {
+            case 1: {
+                return "SPAN_KIND_INTERNAL";
+            }
+            case 2: {
+                return "SPAN_KIND_SERVER";
+            }
+            case 3: {
+                return "SPAN_KIND_CLIENT";
+            }
+            case 4: {
+                return "SPAN_KIND_PRODUCER";
+            }
+            case 5: {
+                return "SPAN_KIND_CONSUMER";
+            }
+            default:
+                return "SPAN_KIND_UNSPECIFIED";
+        }
+    }
+
+    public String getStatusCodeText() {
+        switch (StatusCode) {
+            case 0: {
+                return "STATUS_CODE_UNSET";
+            }
+            case 1: {
+                return "STATUS_CODE_OK";
+            }
+            case 2: {
+
+                return "STATUS_CODE_ERROR";
+            }
+            default:
+                return "UNKNOWN";
+        }
+    }
 }
 
 

+ 1 - 1
version

@@ -1 +1 @@
-v2.5.13
+v2.5.14