Browse Source

span metrics & app metrics

liubing 3 weeks ago
parent
commit
37569cd452
28 changed files with 962 additions and 358 deletions
  1. 1 0
      conf/oas.yaml
  2. 42 0
      sql/metric_tables_1202.sql
  3. 17 4
      sql/metrics_table.sql
  4. 104 8
      sql/no-replica.sql
  5. 67 0
      src/main/java/com/cecf/observe/agentstream/ExecuteEnvBuilder.java
  6. 53 0
      src/main/java/com/cecf/observe/agentstream/I6000Stream.java
  7. 237 309
      src/main/java/com/cecf/observe/agentstream/Main.java
  8. 48 1
      src/main/java/com/cecf/observe/agentstream/OasConfig.java
  9. 79 0
      src/main/java/com/cecf/observe/agentstream/PojoSourceBuilder.java
  10. 2 2
      src/main/java/com/cecf/observe/agentstream/PojoSpanSpanCSVConverter.java
  11. 57 0
      src/main/java/com/cecf/observe/agentstream/TraceURLStream.java
  12. 9 0
      src/main/java/com/cecf/observe/agentstream/common/StringUtils.java
  13. 1 1
      src/main/java/com/cecf/observe/agentstream/i6000/TopoAggregator.java
  14. 17 0
      src/main/java/com/cecf/observe/agentstream/metrics/AppNameMetricMeta.java
  15. 2 0
      src/main/java/com/cecf/observe/agentstream/metrics/EndpointMeta.java
  16. 31 0
      src/main/java/com/cecf/observe/agentstream/metrics/EndpointMetrics.java
  17. 2 1
      src/main/java/com/cecf/observe/agentstream/metrics/ServiceInstanceMeta.java
  18. 7 2
      src/main/java/com/cecf/observe/agentstream/metrics/ServiceMetric.java
  19. 14 2
      src/main/java/com/cecf/observe/agentstream/metrics/ServiceMetricAggregator.java
  20. 8 5
      src/main/java/com/cecf/observe/agentstream/metrics/ServiceMetricMeta.java
  21. 1 1
      src/main/java/com/cecf/observe/agentstream/metrics/ServiceMetricTimestampAdder.java
  22. 25 0
      src/main/java/com/cecf/observe/agentstream/metrics/SpanTagsMetrics.java
  23. 97 0
      src/main/java/com/cecf/observe/agentstream/metrics/SpanTagsProcess.java
  24. 1 1
      src/main/java/com/cecf/observe/agentstream/otel/HttpPathFilter.java
  25. 37 18
      src/main/java/com/cecf/observe/agentstream/otel/PojoSpan.java
  26. 1 1
      src/main/java/com/cecf/observe/agentstream/otel/ServiceMetricAggregator.java
  27. 1 1
      src/main/java/com/cecf/observe/agentstream/traceurl/PojoSpan2TraceURLMapper.java
  28. 1 1
      version

+ 1 - 0
conf/oas.yaml

@@ -38,6 +38,7 @@ jobConfig:
   i6000MetricKafkaSinkParallelism: 1
   i6000Topo2BytesParallelism: 1
   i6000TopoKafkaSinkParallelism: 1
+  endpointSpanFilterParallelism: 1
   watermarkOrderlessSeconds: 10
   watermarkIdlenessSeconds: 60
   filterHttpDBParallelism: 3

+ 42 - 0
sql/metric_tables_1202.sql

@@ -0,0 +1,42 @@
+create table if not exists ob_metrics.public.app_metrics_15s
+(
+    timestamp               timestamp,
+    app_name                varchar(256),
+    apdex                   float,
+    http_code_300_count     int,
+    http_code_400_count     int,
+    http_code_500_count     int,
+    span_status_error_count int,
+    exception_count         int,
+    max_latency             bigint,
+    min_latency             bigint,
+    avg_latency             bigint,
+    request_count           bigint,
+    rpm                     bigint,
+    count_le_500            bigint,
+    count_le_1000           bigint,
+    count_le_5000           bigint,
+    quantile_50             bigint,
+    quantile_90             float,
+    quantile_95             float,
+    quantile_99             float
+);
+
+SELECT create_hypertable('app_metrics_15s', by_range('timestamp'));
+SELECT add_retention_policy('app_metrics_15s', INTERVAL '10 days');
+
+drop table if exists ob_metrics.public.span_metrics_1m;
+create table if not exists ob_metrics.public.span_metrics_1m
+(
+    timestamp        timestamp,
+    app_name         varchar(256),
+    service_name     varchar(512),
+    span_count       int,
+    http_count       int,
+    http_error_count int,
+    rpc_count        int,
+    database_count   int
+);
+
+SELECT create_hypertable('span_metrics_1m', 'timestamp');
+SELECT add_retention_policy('span_metrics_1m', INTERVAL '10 days');

+ 17 - 4
sql/metrics_table.sql

@@ -1,5 +1,5 @@
-create database ob_metrics;
-create table ob_metrics.public.service_metrics_15s
+create database if not exists ob_metrics;
+create table if not exists ob_metrics.public.service_metrics_15s
 (
     timestamp timestamp,
     app_name varchar(256),
@@ -26,7 +26,7 @@ create table ob_metrics.public.service_metrics_15s
 
 SELECT create_hypertable('service_metrics_15s', by_range('timestamp'));
 
-create table ob_metrics.public.service_instance_metrics_15s
+create table if not exists ob_metrics.public.service_instance_metrics_15s
 (
     timestamp timestamp,
     app_name varchar(256),
@@ -54,7 +54,7 @@ create table ob_metrics.public.service_instance_metrics_15s
 
 SELECT create_hypertable('service_instance_metrics_15s', by_range('timestamp'));
 
-create table ob_metrics.public.endpoint_metrics_15s
+create table if not exists ob_metrics.public.endpoint_metrics_15s
 (
     timestamp timestamp,
     app_name varchar(256),
@@ -340,3 +340,16 @@ SELECT create_hypertable('endpoint_metrics_1h', by_range('timestamp'));
 
 
 
+
+SELECT add_retention_policy('service_metrics_15s', INTERVAL '10 days');
+SELECT add_retention_policy('service_metrics_5m', INTERVAL '10 days');
+SELECT add_retention_policy('service_metrics_20m', INTERVAL '10 days');
+SELECT add_retention_policy('service_metrics_1h', INTERVAL '10 days');
+SELECT add_retention_policy('service_instance_metrics_15s', INTERVAL '10 days');
+SELECT add_retention_policy('service_instance_metrics_5m', INTERVAL '10 days');
+SELECT add_retention_policy('service_instance_metrics_20m', INTERVAL '10 days');
+SELECT add_retention_policy('service_instance_metrics_1h', INTERVAL '10 days');
+SELECT add_retention_policy('endpoint_metrics_15s', INTERVAL '10 days');
+SELECT add_retention_policy('endpoint_metrics_5m', INTERVAL '10 days');
+SELECT add_retention_policy('endpoint_metrics_20m', INTERVAL '10 days');
+SELECT add_retention_policy('endpoint_metrics_1h', INTERVAL '10 days');

+ 104 - 8
sql/no-replica.sql

@@ -69,7 +69,6 @@ WHERE
 
 
 
-drop table if exists otel.otel_traces on cluster default;
 CREATE TABLE if not exists otel.otel_traces on cluster default
 (
     `Timestamp`             DateTime64(9) CODEC (Delta(8), ZSTD(1)),
@@ -88,7 +87,7 @@ CREATE TABLE if not exists otel.otel_traces on cluster default
     `SpanAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
     `Duration`              Int64 CODEC (ZSTD(1)),
     `StatusCodeNumber`            Int32 CODEC (ZSTD(1)),
-    `StatusCode`            Int32 CODEC (ZSTD(1)),
+    `StatusCode`            LowCardinality(String) CODEC(ZSTD(1)),
     `StatusMessage`         String CODEC (ZSTD(1)),
     `Events.Timestamp`      Array(DateTime64(9)) CODEC (ZSTD(1)),
     `Events.Name`           Array(LowCardinality(String)) CODEC (ZSTD(1)),
@@ -184,6 +183,7 @@ CREATE TABLE if not exists otel.otel_traces_url on cluster default
                       );
 
 
+drop table if exists otel.otel_traces_url_local_mv on cluster default;
 CREATE MATERIALIZED VIEW if not exists otel.otel_traces_url_local_mv
             on cluster default TO otel.otel_traces_url_local (
                                                               `Timestamp` DateTime64(9),
@@ -279,8 +279,8 @@ SELECT Timestamp,
        Duration,
        ServiceName,
        AppAlias
-FROM otel.otel_traces
-WHERE (SpanKind = 2)
+FROM otel.otel_traces_local
+WHERE (SpanKindNumber = 2)
   AND (Path != '')
   AND (
         ((SpanAttributes['http.method']) != '')
@@ -565,7 +565,7 @@ CREATE TABLE if not exists otel.otel_http_route_local on cluster default
     `LastPath`      String COMMENT '最近的一个url path',
     `LastTimestamp` DateTime64(3) COMMENT '最近一个来的Route的时间',
     `IsDeleted`     UInt8
-) ENGINE = MergeTree() PARTITION BY AppAlias
+) ENGINE = ReplacingMergeTree(LastTimestamp, IsDeleted) PARTITION BY AppAlias
       ORDER BY
           (AppAlias, ServiceName, Method, Route) SETTINGS index_granularity = 8192,
         ttl_only_drop_parts = 1 COMMENT '记录uri, 每个(AppAlias+ServiceName+Method)下的uri最终唯一';
@@ -768,7 +768,6 @@ CREATE TABLE if not exists otel.otel_traces_aggbyapp_local_utc on cluster defaul
       ORDER BY
           (AppAlias, StartTime) SETTINGS index_granularity = 8192;
 
-drop table if exists otel.otel_traces_aggbyapp_local_mv on cluster default;
 CREATE MATERIALIZED VIEW if not exists otel.otel_traces_aggbyapp_local_mv
             on cluster default TO otel.otel_traces_aggbyapp_local (
                                                                    `AppAlias` String,
@@ -790,7 +789,7 @@ SELECT AppAlias,
        uniqState(ServiceName)                       AS ServiceNum,
        groupUniqArrayState(ServiceName)             AS ServiceList,
        uniqState(TraceId)                           AS TraceNum,
-       uniqIfState(TraceId, StatusCode = 2)         AS TraceErrorNum,
+       uniqIfState(TraceId, StatusCodeNumber = 2)         AS TraceErrorNum,
        minIfState(Duration, ParentSpanId = '')      AS TraceDurationMin,
        maxIfState(Duration, ParentSpanId = '')      AS TraceDurationMax,
        avgIfState(Duration, ParentSpanId = '')      AS TraceDurationAvg,
@@ -886,4 +885,101 @@ SELECT AppAlias,
 FROM otel.otel_traces_local
 GROUP BY AppAlias,
          ServiceName,
-         StartTime;
+         StartTime;
+
+
+
+
+CREATE VIEW if not exists otel.otel_traces_flat_spring_boot on cluster default (
+    `Timestamp` Int64,
+    `DateTime` DateTime64(9),
+    `TraceId` String,
+    `SpanId` String,
+    `ParentSpanId` String,
+    `TraceState` String,
+    `SpanName` LowCardinality(String),
+    `SpanKind` Int8,
+    `ScopeName` String,
+    `ServiceName` LowCardinality(String),
+    `ServiceIP` String,
+    `Duration` Int64,
+    `srcIP` String,
+    `srcPort` String,
+    `targetIP` String,
+    `targetPort` String,
+    `RPCType` String,
+    `RPCName` String,
+    `RPCRequest` Map(LowCardinality(String), String),
+    `RPCResult` Int8,
+    `FuncNameSpace` String,
+    `FuncName` String,
+    `FuncLineNO` Int32,
+    `FuncResult` Int32,
+    `Status` Int32,
+    `StatusMessage` String,
+    `Exceptions` String
+) AS
+SELECT
+    toUnixTimestamp64Nano(Timestamp) AS Timestamp,
+    ot1.Timestamp AS DateTime,
+    TraceId,
+    SpanId,
+    ParentSpanId,
+    TraceState,
+    SpanNameOrigin,
+    SpanKindNumber,
+    ScopeName,
+    ServiceName,
+    srcIP AS ServiceIP,
+    Duration,
+    srcIP,
+    CAST(srcPort, 'String') AS srcPort,
+    targetIP,
+    CAST(targetPort, 'String') AS targetPort,
+    RPCType,
+    RPCName,
+    RPCRequest,
+    RPCResult,
+    FuncNameSpace,
+    FuncName,
+    FuncLineNO,
+    FuncResult,
+    StatusCodeNumber AS Status,
+    StatusMessage,
+    arrayStringConcat(Exceptions.message, ';') AS Exceptions
+FROM
+    otel.otel_traces AS ot1;
+
+
+
+CREATE MATERIALIZED VIEW if not exists otel.otel_traces_error_local_mv on cluster default TO otel.otel_traces_error_local (
+    `Timestamp` DateTime64(9),
+    `TraceId` String,
+    `SpanId` String,
+    `ParentSpanId` String,
+    `SpanName` LowCardinality(String),
+    `SpanKind` String,
+    `SpanAttributes` Map(LowCardinality(String), String),
+    `ServiceName` LowCardinality(String),
+    `Duration` Int64,
+    `StatusCode` String,
+    `StatusMessage` String,
+    `AppAlias` String
+) AS
+SELECT
+    Timestamp,
+    TraceId,
+    SpanId,
+    ParentSpanId,
+    SpanName,
+    SpanKind,
+    SpanAttributes,
+    ServiceName,
+    Duration,
+    StatusCode,
+    StatusMessage,
+    AppAlias
+FROM
+    otel.otel_traces_local
+WHERE
+    StatusCode = 'STATUS_CODE_ERROR';

+ 67 - 0
src/main/java/com/cecf/observe/agentstream/ExecuteEnvBuilder.java

@@ -0,0 +1,67 @@
+package com.cecf.observe.agentstream;
+
+import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.time.Duration;
+
+public class ExecuteEnvBuilder {
+
+    public static StreamExecutionEnvironment buildExecuteEnv(OasConfig oasConfig) {
+
+        ParameterTool parameters = ParameterTool.fromMap(oasConfig.getExecuteEnvironmentGlobalConfig());
+        OasConfig.JobConfig jobConfig = oasConfig.getJobConfig();
+        Configuration envConfig = new Configuration();
+        envConfig.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+        envConfig.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE); // number of restart attempts
+        envConfig.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10)); // delay
+
+//        envConfig.set(TaskManagerOptions.CPU_CORES, 1.0);
+
+//        envConfig.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+//
+//        envConfig.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+//        envConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///checkpoints-data/");
+
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(envConfig);
+
+        env.getConfig().setGlobalJobParameters(parameters);
+        if (jobConfig.isDisableOperatorChaining()) {
+            env.disableOperatorChaining();
+        }
+
+//        CheckpointConfig checkPointConfig = env.getCheckpointConfig();
+//        checkPointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+//        SlotSharingGroup clickhouseSinkSSG = SlotSharingGroup.newBuilder("clickhouseSSG")
+//                .setCpuCores(2)
+//                .setTaskOffHeapMemoryMB(400)
+//                .setTaskHeapMemoryMB(200)
+//                .setManagedMemoryMB(200)
+//                .build();
+//        SlotSharingGroup metricWindowSSG = SlotSharingGroup.newBuilder("clickhouseSSG")
+//                .setCpuCores(2)
+//                .setTaskOffHeapMemoryMB(400)
+//                .setManagedMemoryMB(200)
+//                .setTaskHeapMemoryMB(200)
+//                .build();
+//        SlotSharingGroup commonSSG = SlotSharingGroup.newBuilder("clickhouseSSG")
+//                .setCpuCores(2)
+//                .setTaskOffHeapMemoryMB(400)
+//                .setManagedMemoryMB(200)
+//                .setTaskHeapMemoryMB(200)
+//                .build();
+//        env.registerSlotSharingGroup(clickhouseSinkSSG)
+//                .registerSlotSharingGroup(metricWindowSSG)
+//                .registerSlotSharingGroup(commonSSG);
+
+
+        return env;
+    }
+
+}

+ 53 - 0
src/main/java/com/cecf/observe/agentstream/I6000Stream.java

@@ -0,0 +1,53 @@
+package com.cecf.observe.agentstream;
+
+public class I6000Stream {
+//    private static void handleI6000Stream(OasConfig oasConfig, SingleOutputStreamOperator<PojoSpan> baseStream) {
+//        Properties i6000KafkaProducerSettings = new Properties();
+//        i6000KafkaProducerSettings.put("batch.size", "0");
+//        i6000KafkaProducerSettings.put("linger.ms", "0");
+//        OasConfig.I6000Config i6000Config = oasConfig.getI6000Config();
+//        OasConfig.JobConfig jobConfig = oasConfig.getJobConfig();
+//        final OutputTag<PojoSpan> httpMetricOutput = new OutputTag<>("http-metric-stream") {
+//        };
+//        Set<String> i6000IncludeAppNameSet = new HashSet<>(i6000Config.getIncludeAppNames());
+//        SingleOutputStreamOperator<PojoSpan> outputStreamOperator = baseStream
+//                .process(new ProcessFunction<PojoSpan, PojoSpan>() {
+//                    @Override
+//                    public void processElement(PojoSpan value, ProcessFunction<PojoSpan, PojoSpan>.Context ctx, Collector<PojoSpan> out) {
+//                        out.collect(value);
+//                        if (!i6000IncludeAppNameSet.contains(value.AppName)) {
+//                            return;
+//                        }
+//                        if (value.isHttpServerScopeType()) {
+//                            ctx.output(httpMetricOutput, value);
+//                        }
+//                    }
+//                })
+//                .name("旁路输出httpMetric/topo&Filter")
+//                .setParallelism(jobConfig.getProcessOutputTagParallelism());
+//
+//        OasConfig.HttpMetricConfig httpMetricConfig = oasConfig.getHttpMetricConfig();
+//        Integer i6000Metric2BytesParallelism = oasConfig.getJobConfig().getI6000Metric2BytesParallelism();
+//
+//
+//        String kafkaBootstrapServers = i6000Config.getKafkaBootstrapServer();
+//        KafkaSink<byte[]> i6000MetricSink = KafkaSink.<byte[]>builder()
+//                .setBootstrapServers(kafkaBootstrapServers)
+//                .setKafkaProducerConfig(i6000KafkaProducerSettings)
+//                .setRecordSerializer(
+//                        KafkaRecordSerializationSchema.builder()
+//                                .setTopic(i6000Config.getKafkaTopicMetric())
+//                                .setValueSerializationSchema(new I6000MessageSerializer()).build()
+//                )
+//                .build();
+//        outputStreamOperator.getSideOutput(httpMetricOutput)
+//                .windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(httpMetricConfig.getWindowIntervalSeconds())))
+//                .aggregate(new ServiceMetricAggregator(), new TimeWindowAllAdder<>())
+//                .name("aggEndPointMetrics")
+//                .flatMap(new Metric2BytesMapper(i6000Config.getMetricIntervalMinutes(), i6000Config.getAppName2CmdbID(),
+//                        i6000Config.getIncludeAppNames()))
+//                .setParallelism(i6000Metric2BytesParallelism).name("i6000Metrics2Bytes")
+//                .sinkTo(i6000MetricSink).setParallelism(jobConfig.getI6000MetricKafkaSinkParallelism()).name("i6000MetricKafka");
+//        // todo 拆解i6000与metric生成
+//    }
+}

+ 237 - 309
src/main/java/com/cecf/observe/agentstream/Main.java

@@ -1,54 +1,31 @@
 package com.cecf.observe.agentstream;
 
-import com.cecf.observe.agentstream.common.Config;
-import com.cecf.observe.agentstream.metrics.ServiceInstanceMeta;
-import com.cecf.observe.agentstream.metrics.ServiceInstanceMetric;
-import com.cecf.observe.agentstream.metrics.ServiceMetricAggregator;
+import com.cecf.observe.agentstream.common.StringUtils;
 import com.cecf.observe.agentstream.metrics.*;
-import com.cecf.observe.agentstream.otel.*;
-import com.cecf.observe.agentstream.traceurl.PojoSpan2TraceURLMapper;
-import com.cecf.observe.agentstream.traceurl.TraceURL;
-import com.cecf.observe.agentstream.traceurl.TraceURLCHConverter;
-import com.cecf.observe.agentstream.traceurl.URLAggregator;
-import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
+import com.cecf.observe.agentstream.otel.PojoSpan;
+import com.cecf.observe.agentstream.otel.ServiceNameFilter;
 import io.opentelemetry.proto.trace.v1.Status;
-import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.JdbcSink;
-import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
 import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
-import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
-import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;
 
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
 import java.sql.Timestamp;
 import java.time.Duration;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -57,128 +34,9 @@ public class Main {
 
     public static final Logger logger = LoggerFactory.getLogger(Main.class);
 
-    public static DataStream<PojoSpan> buildPojoSourceStream(StreamExecutionEnvironment env, OasConfig oasConfig) {
-        SingleOutputStreamOperator<PojoSpan> pbOp = null;
-        SingleOutputStreamOperator<PojoSpan> jsonOp = null;
-        boolean enablePbSource = oasConfig.getPbSource().isEnabled();
-        boolean enableJsonSource = oasConfig.getJsonSource().isEnabled();
-        if (!enableJsonSource && !enablePbSource) {
-            throw new RuntimeException("至少配置一个Source");
-        }
-        OasConfig.JobConfig jobConfig = oasConfig.getJobConfig();
-        WatermarkStrategy<PojoSpan> watermarkStrategy = WatermarkStrategy
-                .<PojoSpan>forBoundedOutOfOrderness(Duration.ofSeconds(jobConfig.getWatermarkOrderlessSeconds()))
-                .withTimestampAssigner(
-                        (SerializableTimestampAssigner<PojoSpan>) (element, recordTimestamp) -> element.Timestamp.getTime()
-                ).withIdleness(
-                        Duration.ofSeconds(jobConfig.getWatermarkIdlenessSeconds())
-                );
-        if (enablePbSource) {
-            OasConfig.KafkaSourceConfig pbKafkaSourceConfig = oasConfig.getPbSource();
-            DeserializationSchema<ExportTraceServiceRequest> pbDeserializationSchema = new ExportTraceServiceRequestSchema();
-            KafkaSource<ExportTraceServiceRequest> pbSource =
-                    (new KafkaSourceBuilder<ExportTraceServiceRequest>()).setBrokers(pbKafkaSourceConfig.getBrokers())
-                            .setTopics(pbKafkaSourceConfig.getTopics())
-                            .setGroupID(pbKafkaSourceConfig.getConsumerGroupID())
-                            .setDeserializationSchema(pbDeserializationSchema)
-                            .build();
-            pbOp = env.fromSource(pbSource, WatermarkStrategy.noWatermarks(), "pbSource")
-                    .setParallelism(oasConfig.getJobConfig().getKafkaSourceSinkParallelism())
-                    .flatMap(new TraceServiceRequest2PojoSpanFlatMap())
-                    .setParallelism(jobConfig.getPojoParseParallelism())
-                    .name("TraceServiceRequestFlatMap2PojoSpan")
-                    .assignTimestampsAndWatermarks(watermarkStrategy)
-                    .name("assignTimestampAndWatermark")
-                    .setParallelism(jobConfig.getPojoParseParallelism());
-        }
-
-        if (enableJsonSource) {
-            OasConfig.KafkaSourceConfig jsonKafkaSourceConfig = oasConfig.getJsonSource();
-            KafkaSource<String> jsonSource =
-                    (new KafkaSourceBuilder<String>()).setBrokers(jsonKafkaSourceConfig.getBrokers())
-                            .setTopics(jsonKafkaSourceConfig.getTopics())
-                            .setGroupID(jsonKafkaSourceConfig.getConsumerGroupID())
-                            .setDeserializationSchema(new SimpleStringSchema())
-                            .build();
-            jsonOp = env.fromSource(jsonSource, WatermarkStrategy.noWatermarks(), "jsonSource")
-                    .setParallelism(oasConfig.getJobConfig().getKafkaSourceSinkParallelism())
-                    .flatMap(new JSONString2PojoSpanFlatMapFunc())
-                    .setParallelism(jobConfig.getPojoParseParallelism())
-                    .name("JsonFlatMap2PojoSpan")
-                    .assignTimestampsAndWatermarks(watermarkStrategy)
-                    .name("assignTimestampAndWatermark")
-                    .setParallelism(jobConfig.getPojoParseParallelism());
-        }
-        if (pbOp == null) {
-            return jsonOp;
-        }
-        if (jsonOp == null) {
-            return pbOp;
-        }
-        return pbOp.union(jsonOp);
-    }
-
-    private static Map<String, String> getClickhouseGlobalConfig(OasConfig.ClickhouseStreamConfig clickhouseStreamConfig) {
-        Map<String, String> globalParameters = new HashMap<>();
-        String chHosts = clickhouseStreamConfig.getChHost();
-        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, chHosts);
-        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, clickhouseStreamConfig.getChUser());
-        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, clickhouseStreamConfig.getChPwd());
-
-        globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, clickhouseStreamConfig.getChTimeoutSeconds().toString());
-        globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "./failed_record");
-        globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, clickhouseStreamConfig.getChWriterNum().toString());
-        globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, clickhouseStreamConfig.getChWriteRetry().toString());
-        globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, clickhouseStreamConfig.getChWriteQueueSize().toString());
-        globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "true");
-        return globalParameters;
-    }
-
-    private static StreamExecutionEnvironment buildExecuteEnv(ParameterTool parameters, OasConfig oasConfig) {
-        OasConfig.JobConfig jobConfig = oasConfig.getJobConfig();
-        Configuration envConfig = new Configuration();
-        envConfig.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
-        envConfig.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE); // number of restart attempts
-        envConfig.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10)); // delay
-
-//        envConfig.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
-//
-//        envConfig.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
-//        envConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///checkpoints-data/");
-
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(envConfig);
-        env.getConfig().setGlobalJobParameters(parameters);
-        if (jobConfig.isDisableOperatorChaining()) {
-            env.disableOperatorChaining();
-        }
-
-//        CheckpointConfig checkPointConfig = env.getCheckpointConfig();
-//        checkPointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-
-        return env;
-    }
-
-    public static OasConfig parseConfig() {
-        String oasConfigPath = Config.getString("ob.config.path", "/conf/oas.yaml");
-        Yaml yaml = new Yaml();
-        try {
-            InputStream inputStream = new FileInputStream(oasConfigPath);
-            OasConfig oasConfig = yaml.loadAs(inputStream, OasConfig.class);
-            logger.info("[配置]: {}", oasConfig.toString());
-            oasConfig.fillDefaults();
-            return oasConfig;
-        } catch (FileNotFoundException e) {
-            throw new RuntimeException(String.format("读取配置[%s]错误:%s", oasConfigPath, e));
-        }
-    }
-
     public static void main(String[] args) throws Exception {
-        OasConfig oasConfig = parseConfig();
-
-        ParameterTool parameters = ParameterTool.fromMap(getClickhouseGlobalConfig(oasConfig.getClickhouseStreamConfig()));
-        StreamExecutionEnvironment env = buildExecuteEnv(parameters, oasConfig);
-
+        OasConfig oasConfig = OasConfig.fromPath();
+        StreamExecutionEnvironment env = ExecuteEnvBuilder.buildExecuteEnv(oasConfig);
         FillAppNameMapFunc fillAppNameMapFunc = FillAppNameMapFunc.buildFromConfig(oasConfig.getMySqlConfig());
 
         // build base stream
@@ -189,7 +47,7 @@ public class Main {
                 .setWriteBufSize(oasConfig.getClickhouseStreamConfig().getChWriteBufSize().toString())
                 .setConverter(new PojoSpanSpanCSVConverter())
                 .build();
-        DataStream<PojoSpan> baseStream = buildPojoSourceStream(env, oasConfig)
+        DataStream<PojoSpan> baseStream = PojoSourceBuilder.buildPojoSourceStream(env, oasConfig)
                 .map(fillAppNameMapFunc)
                 .name("添加缺失的AppName")
                 .setParallelism(jobConfig.getAttachAppNameParallelism())
@@ -197,29 +55,166 @@ public class Main {
                 .name("过滤ServiceName")
                 .setParallelism(jobConfig.getFilterByServiceNameParallelism());
         // clickhouse traces stream
+//        SlotSharingGroup clickhouseSinkSSG = SlotSharingGroup.newBuilder("clickhouseSSG")
+//                .setCpuCores(1)
+//                .setManagedMemoryMB(300)
+//                .setTaskOffHeapMemoryMB(500)
+//                .setTaskHeapMemoryMB(2000)
+//                .build();
+//        env.registerSlotSharingGroup(clickhouseSinkSSG);
         baseStream
                 .addSink(tracesClickhouseSink)
                 .setParallelism(jobConfig.getClickhouseSinkParallelism())
+//                .slotSharingGroup(clickhouseSinkSSG)
                 .name("ClickhouseSink");
 
-        DataStream<PojoSpan> endpointStream = baseStream.filter(new EndpointPojoFilter());
+        OasConfig.SpanMetricConfig configSpanMetric = oasConfig.getSpanMetric();
+        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                .withUrl(configSpanMetric.getJdbcUrl())
+                .withDriverName("org.postgresql.Driver")
+                .withUsername(configSpanMetric.getPgUser())
+                .withPassword(configSpanMetric.getPgPwd())
+                .build();
+        (new SpanTagsProcess.SpanTagsProcessBuilder())
+                .setWindowDuration(Duration.ofMinutes(1))
+                .setJdbcConnectionOptions(jdbcConnectionOptions)
+                .buildStream(baseStream);
+
+        DataStream<PojoSpan> endpointStream = baseStream
+                .filter(new EndpointPojoFilter())
+                .setParallelism(jobConfig.getEndpointSpanFilterParallelism())
+                .name("filterEndpointSpans");
         DataStream<ServiceMetric> serviceMetricStreamLevel1 = endpointStream
                 .keyBy(PojoSpan::getServiceName)
                 .window(TumblingEventTimeWindows.of(Duration.ofSeconds(15)))
                 .aggregate(new ServiceMetricAggregator(), new ServiceMetricTimestampAdder());
-        sinkServiceMetric(serviceMetricStreamLevel1, oasConfig.getSpanMetric(), Duration.ofSeconds(15));
 
-        DataStream<ServiceMetric> serviceMetricDataStream5m = nextLevelServiceMetric(serviceMetricStreamLevel1,
-                jobConfig.getWatermarkIdlenessSeconds(), 5);
-        sinkServiceMetric(serviceMetricDataStream5m, oasConfig.getSpanMetric(), Duration.ofMinutes(5));
+        Duration du15s = Duration.ofSeconds(15);
+        DataStream<Tuple2<ServiceMetricMeta, EndpointMetrics>> serviceOnlyStream = serviceMetricStreamLevel1
+                .map((MapFunction<ServiceMetric, Tuple2<ServiceMetricMeta, EndpointMetrics>>) (v) -> new Tuple2<>(
+                        new ServiceMetricMeta(v.getAppName(), v.getTimestamp(), v.getServiceName()), v.getServerMetrics(du15s)))
+                .name("serviceMetric2ServiceMeta+EndpointMetrics")
+                .returns(
+                        TypeExtractor.getForObject(new Tuple2<>(new ServiceMetricMeta(), new EndpointMetrics()))
+                );
+        sinkServiceOnlyMetric(serviceOnlyStream, jdbcConnectionOptions, du15s);
+
+        DataStream<Tuple2<AppNameMetricMeta, EndpointMetrics>> appNameMetricsStream = serviceOnlyStream
+                .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)))
+                .keyBy(t -> t.f0.getAppName())
+                .window(TumblingEventTimeWindows.of(du15s))
+                .process(new ProcessWindowFunction<>() {
+                    @Override
+                    public void process(String s, ProcessWindowFunction<Tuple2<ServiceMetricMeta, EndpointMetrics>,
+                            Tuple2<AppNameMetricMeta, EndpointMetrics>, String, TimeWindow>.Context context,
+                                        Iterable<Tuple2<ServiceMetricMeta, EndpointMetrics>> elements,
+                                        Collector<Tuple2<AppNameMetricMeta, EndpointMetrics>> out) throws Exception {
+                        List<EndpointMetrics> endpointMetrics = new ArrayList<>();
+                        for (Tuple2<ServiceMetricMeta, EndpointMetrics> element : elements) {
+                            endpointMetrics.add(element.f1);
+                        }
+                        EndpointMetrics agg = new EndpointMetrics(endpointMetrics);
+                        out.collect(new Tuple2<>(new AppNameMetricMeta(s, context.window().getEnd()), agg));
+                    }
+                });
+        sinkAppNameMetric(appNameMetricsStream, jdbcConnectionOptions, du15s);
 
-        DataStream<ServiceMetric> serviceMetricDataStream20m = nextLevelServiceMetric(serviceMetricDataStream5m,
-                jobConfig.getWatermarkIdlenessSeconds(), 20);
-        sinkServiceMetric(serviceMetricDataStream20m, oasConfig.getSpanMetric(), Duration.ofMinutes(20));
 
-        DataStream<ServiceMetric> serviceMetricDataStream1h = nextLevelServiceMetric(serviceMetricDataStream20m,
-                jobConfig.getWatermarkIdlenessSeconds(), 60);
-        sinkServiceMetric(serviceMetricDataStream1h, oasConfig.getSpanMetric(), Duration.ofHours(1));
+        DataStream<Tuple2<ServiceInstanceMeta, EndpointMetrics>> serviceInstanceOnlyStream = serviceMetricStreamLevel1
+                .flatMap((FlatMapFunction<ServiceMetric, Tuple2<ServiceInstanceMeta, EndpointMetrics>>) (value, out) -> {
+                    for (ServiceInstanceMetric instanceMetric : value.getServiceInstanceMetrics()) {
+                        out.collect(new Tuple2<>(new ServiceInstanceMeta(value.getAppName(), value.getTimestamp(), value.getServiceName(),
+                                instanceMetric.getInstanceID()), instanceMetric.getInstanceMetrics(du15s)));
+                    }
+                })
+                .name("serviceMetric2InstanceMeta+EndpointMetrics")
+                .returns(
+                        TypeExtractor.getForObject(new Tuple2<>(new ServiceInstanceMeta(), new EndpointMetrics()))
+                );
+        sinkServiceInstanceOnlyMetric(serviceInstanceOnlyStream, jdbcConnectionOptions, du15s);
+        DataStream<Tuple2<EndpointMeta, EndpointMetrics>> endpointOnlyMetricStream = serviceMetricStreamLevel1
+                .flatMap((FlatMapFunction<ServiceMetric, Tuple2<EndpointMeta, EndpointMetrics>>) (value, out) -> {
+                    for (ServiceInstanceMetric serviceInstanceMetric : value.getServiceInstanceMetrics()) {
+                        for (EndpointMetric endpointMetric : serviceInstanceMetric.getEndpointMetrics()) {
+                            out.collect(new Tuple2<>(
+                                    new EndpointMeta(value.getAppName(), value.getTimestamp(), value.getServiceName(), serviceInstanceMetric.getInstanceID(),
+                                            endpointMetric.getEndpoint()), endpointMetric.getMetrics(du15s)));
+                        }
+                    }
+                })
+                .name("serviceMetric2EndpointMeta+EndpointMetrics")
+                .returns(
+                        TypeExtractor.getForObject(new Tuple2<>(new EndpointMeta(), new EndpointMetrics()))
+                );
+        sinkEndpointOnlyMetric(endpointOnlyMetricStream, jdbcConnectionOptions, du15s);
+
+        List<Duration> granularity = List.of(Duration.ofMinutes(5), Duration.ofMinutes(20), Duration.ofMinutes(60));
+        DataStream<Tuple2<ServiceMetricMeta, EndpointMetrics>> preLevelServiceOnlyStream = serviceOnlyStream;
+        DataStream<Tuple2<ServiceInstanceMeta, EndpointMetrics>> preLevelServiceInstanceOnlyStream = serviceInstanceOnlyStream;
+        DataStream<Tuple2<EndpointMeta, EndpointMetrics>> preLevelEndpointOnlyMetricStream = endpointOnlyMetricStream;
+        for (Duration du : granularity) {
+            String duStr = StringUtils.getDurationString(du);
+            DataStream<Tuple2<ServiceMetricMeta, EndpointMetrics>> serviceMetricDataStreamDu = preLevelServiceOnlyStream
+                    .keyBy((t) -> t.f0)
+                    .window(TumblingEventTimeWindows.of(du))
+                    .process(new ProcessWindowFunction<Tuple2<ServiceMetricMeta, EndpointMetrics>, Tuple2<ServiceMetricMeta, EndpointMetrics>, ServiceMetricMeta, TimeWindow>() {
+                        @Override
+                        public void process(ServiceMetricMeta serviceMetricMeta,
+                                            ProcessWindowFunction<Tuple2<ServiceMetricMeta, EndpointMetrics>, Tuple2<ServiceMetricMeta, EndpointMetrics>, ServiceMetricMeta, TimeWindow>.Context context,
+                                            Iterable<Tuple2<ServiceMetricMeta, EndpointMetrics>> elements, Collector<Tuple2<ServiceMetricMeta, EndpointMetrics>> out) throws Exception {
+                            List<EndpointMetrics> endpointMetrics = new ArrayList<>();
+                            for (Tuple2<ServiceMetricMeta, EndpointMetrics> element : elements) {
+                                endpointMetrics.add(element.f1);
+                            }
+                            EndpointMetrics agg = new EndpointMetrics(endpointMetrics);
+                            serviceMetricMeta.setTimestamp(context.window().getEnd());
+                            out.collect(new Tuple2<>(serviceMetricMeta, agg));
+                        }
+                    })
+                    .name(String.format("serviceMetric%s", duStr));
+            sinkServiceOnlyMetric(serviceMetricDataStreamDu, jdbcConnectionOptions, du);
+            preLevelServiceOnlyStream = serviceMetricDataStreamDu;
+            DataStream<Tuple2<ServiceInstanceMeta, EndpointMetrics>> serviceInstanceStreamDu = preLevelServiceInstanceOnlyStream
+                    .keyBy((t) -> t.f0)
+                    .window(TumblingEventTimeWindows.of(du))
+                    .process(new ProcessWindowFunction<Tuple2<ServiceInstanceMeta, EndpointMetrics>, Tuple2<ServiceInstanceMeta, EndpointMetrics>, ServiceInstanceMeta, TimeWindow>() {
+                        @Override
+                        public void process(ServiceInstanceMeta serviceMetricMeta,
+                                            ProcessWindowFunction<Tuple2<ServiceInstanceMeta, EndpointMetrics>, Tuple2<ServiceInstanceMeta, EndpointMetrics>, ServiceInstanceMeta, TimeWindow>.Context context,
+                                            Iterable<Tuple2<ServiceInstanceMeta, EndpointMetrics>> elements, Collector<Tuple2<ServiceInstanceMeta, EndpointMetrics>> out) throws Exception {
+                            List<EndpointMetrics> endpointMetrics = new ArrayList<>();
+                            for (Tuple2<ServiceInstanceMeta, EndpointMetrics> element : elements) {
+                                endpointMetrics.add(element.f1);
+                            }
+                            EndpointMetrics agg = new EndpointMetrics(endpointMetrics);
+                            serviceMetricMeta.setTimestamp(context.window().getEnd());
+                            out.collect(new Tuple2<>(serviceMetricMeta, agg));
+                        }
+                    })
+                    .name(String.format("instanceMetric%s", duStr));
+            sinkServiceInstanceOnlyMetric(serviceInstanceStreamDu, jdbcConnectionOptions, du);
+            preLevelServiceInstanceOnlyStream = serviceInstanceStreamDu;
+            DataStream<Tuple2<EndpointMeta, EndpointMetrics>> endpointStreamDu = preLevelEndpointOnlyMetricStream
+                    .keyBy((t) -> t.f0)
+                    .window(TumblingEventTimeWindows.of(du))
+                    .process(new ProcessWindowFunction<Tuple2<EndpointMeta, EndpointMetrics>, Tuple2<EndpointMeta, EndpointMetrics>, EndpointMeta, TimeWindow>() {
+                        @Override
+                        public void process(EndpointMeta serviceMetricMeta,
+                                            ProcessWindowFunction<Tuple2<EndpointMeta, EndpointMetrics>, Tuple2<EndpointMeta, EndpointMetrics>, EndpointMeta, TimeWindow>.Context context,
+                                            Iterable<Tuple2<EndpointMeta, EndpointMetrics>> elements, Collector<Tuple2<EndpointMeta, EndpointMetrics>> out) throws Exception {
+                            List<EndpointMetrics> endpointMetrics = new ArrayList<>();
+                            for (Tuple2<EndpointMeta, EndpointMetrics> element : elements) {
+                                endpointMetrics.add(element.f1);
+                            }
+                            EndpointMetrics agg = new EndpointMetrics(endpointMetrics);
+                            serviceMetricMeta.setTimestamp(context.window().getEnd());
+                            out.collect(new Tuple2<>(serviceMetricMeta, agg));
+                        }
+                    })
+                    .name(String.format("endpointMetric%s", duStr));
+            sinkEndpointOnlyMetric(endpointStreamDu, jdbcConnectionOptions, du);
+            preLevelEndpointOnlyMetricStream = endpointStreamDu;
+        }
 
 //        // traceURL stream
 //        handelTraceURLStream(oasConfig.getTraceURLConfig(), baseStream);
@@ -230,9 +225,66 @@ public class Main {
         env.execute(oasConfig.getJobConfig().getJobName());
     }
 
-    private static void sinkServiceMetric(DataStream<ServiceMetric> serviceMetricDataStream, OasConfig.SpanMetricConfig pgConfig, Duration du) {
-        String tableSuffix = du.toHours() >= 1 ? String.format("%dh", du.toHours()) :
-                (du.toMinutes() >= 1 ? String.format("%dm", du.toMinutes()) : String.format("%ds", du.getSeconds()));
+    private static void sinkAppNameMetric(DataStream<Tuple2<AppNameMetricMeta, EndpointMetrics>> serviceMetricDataStream,
+                                          JdbcConnectionOptions jdbcConnectionOptions, Duration du) {
+        String tableSuffix = StringUtils.getDurationString(du);
+        String serviceSql = String.format("insert into app_metrics_%s " +
+                                          "(timestamp, app_name, apdex, http_code_300_count, " +
+                                          "http_code_400_count, http_code_500_count, span_status_error_count, " +
+                                          "exception_count, max_latency, min_latency, avg_latency, request_count," +
+                                          " rpm, count_le_500, count_le_1000, count_le_5000, quantile_50, " +
+                                          "quantile_90, quantile_95, quantile_99) " +
+                                          "values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", tableSuffix);
+        serviceMetricDataStream
+                .addSink(JdbcSink.sink(
+                        serviceSql,
+                        (ps, t) -> {
+                            AppNameMetricMeta meta = t.f0;
+                            EndpointMetrics em = t.f1;
+                            int httpCode300Count = 0;
+                            int httpCode400Count = 0;
+                            int httpCode500Count = 0;
+                            for (Map.Entry<Long, Integer> entry : em.getHttpCode2Count().entrySet()) {
+                                Long code = entry.getKey();
+                                if (code >= 300 && code < 400) {
+                                    httpCode300Count += entry.getValue();
+                                } else if (code >= 400 && code < 500) {
+                                    httpCode400Count += entry.getValue();
+                                } else if (code >= 500 && code < 600) {
+                                    httpCode500Count += entry.getValue();
+                                }
+                            }
+                            int exceptionCount = 0;
+                            for (Integer value : em.getExceptionType2Count().values()) {
+                                exceptionCount += value;
+                            }
+                            int spanErrorCount = em.getSpanStatusCode2Count().getOrDefault(Status.StatusCode.STATUS_CODE_ERROR.getNumber(), 0);
+                            ps.setTimestamp(1, new Timestamp(meta.getTimestamp()));
+                            ps.setString(2, meta.getAppName());
+                            ps.setDouble(3, em.getApdex());
+                            ps.setInt(4, httpCode300Count);
+                            ps.setInt(5, httpCode400Count);
+                            ps.setInt(6, httpCode500Count);
+                            ps.setInt(7, spanErrorCount);
+                            ps.setInt(8, exceptionCount);
+                            ps.setLong(9, em.getMaxLatencyMS());
+                            ps.setLong(10, em.getMinLatencyMS());
+                            ps.setLong(11, em.getAvgLatencyMS());
+                            ps.setLong(12, em.getRequestCount());
+                            ps.setLong(13, em.getRPM());
+                            ps.setLong(14, em.getLatencyLe500());
+                            ps.setLong(15, em.getLatencyLe1000());
+                            ps.setLong(16, em.getLatencyLe5000());
+                            ps.setDouble(17, em.getQuantileLatencyMS50());
+                            ps.setDouble(18, em.getQuantileLatencyMS90());
+                            ps.setDouble(19, em.getQuantileLatencyMS95());
+                            ps.setDouble(20, em.getQuantileLatencyMS99());
+                        },
+                        jdbcConnectionOptions));
+    }
+
+    private static void sinkServiceOnlyMetric(DataStream<Tuple2<ServiceMetricMeta, EndpointMetrics>> serviceMetricDataStream, JdbcConnectionOptions jdbcConnectionOptions, Duration du) {
+        String tableSuffix = StringUtils.getDurationString(du);
         String serviceSql = String.format("insert into service_metrics_%s " +
                                           "(timestamp, app_name, service_name, apdex, http_code_300_count, " +
                                           "http_code_400_count, http_code_500_count, span_status_error_count, " +
@@ -240,18 +292,7 @@ public class Main {
                                           " rpm, count_le_500, count_le_1000, count_le_5000, quantile_50, " +
                                           "quantile_90, quantile_95, quantile_99) " +
                                           "values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", tableSuffix);
-        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
-                .withUrl(pgConfig.getJdbcUrl())
-                .withDriverName("org.postgresql.Driver")
-                .withUsername(pgConfig.getPgUser())
-                .withPassword(pgConfig.getPgPwd())
-                .build();
         serviceMetricDataStream
-                .map((MapFunction<ServiceMetric, Tuple2<ServiceMetricMeta, EndpointMetrics>>) (v) -> new Tuple2<>(
-                        new ServiceMetricMeta(v.getAppName(), v.getTimestamp(), v.getServiceName()), v.getServerMetrics(du)))
-                .returns(
-                        TypeExtractor.getForObject(new Tuple2<>(new ServiceMetricMeta(), new EndpointMetrics()))
-                )
                 .addSink(JdbcSink.sink(
                         serviceSql,
                         (ps, t) -> {
@@ -298,26 +339,22 @@ public class Main {
                             ps.setDouble(21, em.getQuantileLatencyMS99());
                         },
                         jdbcConnectionOptions));
-        String instanceSql = String.format("insert into service_instance_metrics_%s " +
+    }
+
+    private static void sinkEndpointOnlyMetric(DataStream<Tuple2<EndpointMeta, EndpointMetrics>> endpointOnlyMetricStream, JdbcConnectionOptions jdbcConnectionOptions, Duration du15s) {
+        String tableSuffix = StringUtils.getDurationString(du15s);
+        String endpointSql = String.format("insert into endpoint_metrics_%s " +
                                            "(timestamp, app_name, service_name, apdex, http_code_300_count, " +
                                            "http_code_400_count, http_code_500_count, span_status_error_count, " +
                                            "exception_count, max_latency, min_latency, avg_latency, request_count," +
                                            " rpm, count_le_500, count_le_1000, count_le_5000, quantile_50, " +
-                                           "quantile_90, quantile_95, quantile_99, instanceID) " +
-                                           "values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", tableSuffix);
-        serviceMetricDataStream.flatMap((FlatMapFunction<ServiceMetric, Tuple2<ServiceInstanceMeta, EndpointMetrics>>) (value, out) -> {
-                    for (ServiceInstanceMetric instanceMetric : value.getServiceInstanceMetrics()) {
-                        out.collect(new Tuple2<>(new ServiceInstanceMeta(value.getAppName(), value.getTimestamp(), value.getServiceName(),
-                                instanceMetric.getInstanceID()), instanceMetric.getInstanceMetrics(du)));
-                    }
-                })
-                .returns(
-                        TypeExtractor.getForObject(new Tuple2<>(new ServiceInstanceMeta(), new EndpointMetrics()))
-                )
+                                           "quantile_90, quantile_95, quantile_99, instanceID, endpoint) " +
+                                           "values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", tableSuffix);
+        endpointOnlyMetricStream
                 .addSink(JdbcSink.sink(
-                        instanceSql,
+                        endpointSql,
                         (ps, t) -> {
-                            ServiceInstanceMeta meta = t.f0;
+                            EndpointMeta meta = t.f0;
                             EndpointMetrics em = t.f1;
                             int httpCode300Count = 0;
                             int httpCode400Count = 0;
@@ -360,32 +397,27 @@ public class Main {
                             ps.setDouble(20, em.getQuantileLatencyMS95());
                             ps.setDouble(21, em.getQuantileLatencyMS99());
                             ps.setString(22, meta.getInstanceId());
+                            String endpoint = meta.getEndpoint();
+                            ps.setString(23, endpoint.length() > 1024 ? endpoint.substring(0, 1024) : endpoint);
                         },
                         jdbcConnectionOptions
                 ));
-        String endpointSql = String.format("insert into endpoint_metrics_%s " +
+    }
+
+    private static void sinkServiceInstanceOnlyMetric(DataStream<Tuple2<ServiceInstanceMeta, EndpointMetrics>> serviceMetricDataStream, JdbcConnectionOptions jdbcConnectionOptions, Duration du) {
+        String tableSuffix = StringUtils.getDurationString(du);
+        String instanceSql = String.format("insert into service_instance_metrics_%s " +
                                            "(timestamp, app_name, service_name, apdex, http_code_300_count, " +
                                            "http_code_400_count, http_code_500_count, span_status_error_count, " +
                                            "exception_count, max_latency, min_latency, avg_latency, request_count," +
                                            " rpm, count_le_500, count_le_1000, count_le_5000, quantile_50, " +
-                                           "quantile_90, quantile_95, quantile_99, instanceID, endpoint) " +
-                                           "values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", tableSuffix);
-        serviceMetricDataStream.flatMap((FlatMapFunction<ServiceMetric, Tuple2<EndpointMeta, EndpointMetrics>>) (value, out) -> {
-                    for (ServiceInstanceMetric serviceInstanceMetric : value.getServiceInstanceMetrics()) {
-                        for (EndpointMetric endpointMetric : serviceInstanceMetric.getEndpointMetrics()) {
-                            out.collect(new Tuple2<>(
-                                    new EndpointMeta(value.getAppName(), value.getTimestamp(), value.getServiceName(), serviceInstanceMetric.getInstanceID(),
-                                            endpointMetric.getEndpoint()), endpointMetric.getMetrics(du)));
-                        }
-                    }
-                })
-                .returns(
-                        TypeExtractor.getForObject(new Tuple2<>(new EndpointMeta(), new EndpointMetrics()))
-                )
+                                           "quantile_90, quantile_95, quantile_99, instanceID) " +
+                                           "values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", tableSuffix);
+        serviceMetricDataStream
                 .addSink(JdbcSink.sink(
-                        endpointSql,
+                        instanceSql,
                         (ps, t) -> {
-                            EndpointMeta meta = t.f0;
+                            ServiceInstanceMeta meta = t.f0;
                             EndpointMetrics em = t.f1;
                             int httpCode300Count = 0;
                             int httpCode400Count = 0;
@@ -428,28 +460,11 @@ public class Main {
                             ps.setDouble(20, em.getQuantileLatencyMS95());
                             ps.setDouble(21, em.getQuantileLatencyMS99());
                             ps.setString(22, meta.getInstanceId());
-                            ps.setString(23, meta.getEndpoint());
                         },
                         jdbcConnectionOptions
                 ));
     }
 
-    private static DataStream<ServiceMetric> nextLevelServiceMetric(DataStream<ServiceMetric> base, int idleness, int windowMinutes) {
-        WatermarkStrategy<ServiceMetric> watermarkStrategyLevel2 = WatermarkStrategy
-                .<ServiceMetric>forBoundedOutOfOrderness(Duration.ofSeconds(0))
-                .withTimestampAssigner(
-                        (SerializableTimestampAssigner<ServiceMetric>) (element, recordTimestamp) -> element.getTimestamp()
-                ).withIdleness(
-                        Duration.ofSeconds(idleness)
-                );
-
-        return base
-                .assignTimestampsAndWatermarks(watermarkStrategyLevel2)
-                .keyBy(ServiceMetric::getServiceName)
-                .window(TumblingEventTimeWindows.of(Duration.ofMinutes(windowMinutes)))
-                .aggregate(new ServiceMetricGranularityAggregator(), new ServiceMetricTimestampAdder());
-
-    }
 
     private static void handleSpanMetrics(OasConfig.SpanMetricConfig spanMetric, SingleOutputStreamOperator<PojoSpan> mainStreamOperator) {
         if (!spanMetric.isEnabled()) {
@@ -477,92 +492,5 @@ public class Main {
 //
     }
 
-    private static void handelTraceURLStream(OasConfig.TraceURLConfig traceURL, SingleOutputStreamOperator<PojoSpan> mainStreamOperator) {
-        if (!traceURL.isEnabled()) {
-            return;
-        }
-        final OutputTag<PojoSpan> traceURLOutputTag = new OutputTag<>("traceURL") {
-        };
-        mainStreamOperator.process(new PojoSpanOutputTagProcessFunc(Collections.singletonList(traceURLOutputTag)));
-        String traceURLTableName = traceURL.getSinkTableName();
-        Integer traceURLWriteBufSize = traceURL.getWriteBufSize();
-        ClickHouseSink<TraceURL> traceURLClickHouseSink = (new ClickhouseSinkBuilder<TraceURL>()).
-                setTableName(traceURLTableName).
-                setWriteBufSize(traceURLWriteBufSize.toString())
-                .setConverter(new TraceURLCHConverter())
-                .build();
-
-        final OutputTag<TraceURL> otURLMappingOutput = new OutputTag<>("otURLMapping") {
-        };
-        SingleOutputStreamOperator<TraceURL> traceURLSingleOutputStreamOperator = mainStreamOperator
-                .getSideOutput(traceURLOutputTag)
-                .filter(new SpanKindFilter(OtelConst.SpanKindServer))
-                .name("filterSpanKindServer")
-                .filter(new ScopeTypeFilter(SpanScopeType.SPAN_SCOPE_TYPE_HTTP_SERVER))
-                .name("filterHTTPScope")
-                .map(new PojoSpan2TraceURLMapper())
-                .name("pojoSpan2TraceURL")
-                .process(new ProcessFunction<>() {
-                    @Override
-                    public void processElement(TraceURL traceURL, ProcessFunction<TraceURL, TraceURL>.Context context, Collector<TraceURL> collector) {
-                        collector.collect(traceURL);
-                        context.output(otURLMappingOutput, traceURL);
-                    }
-                });
-        traceURLSingleOutputStreamOperator.windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(30)))
-                .aggregate(new URLAggregator())
-                .name("aggregateByASMR")
-                .print();
-        traceURLSingleOutputStreamOperator.addSink(traceURLClickHouseSink).name("Sink2Clickhouse");
-    }
 
-//    private static void handleI6000Stream(OasConfig oasConfig, SingleOutputStreamOperator<PojoSpan> baseStream) {
-//        Properties i6000KafkaProducerSettings = new Properties();
-//        i6000KafkaProducerSettings.put("batch.size", "0");
-//        i6000KafkaProducerSettings.put("linger.ms", "0");
-//        OasConfig.I6000Config i6000Config = oasConfig.getI6000Config();
-//        OasConfig.JobConfig jobConfig = oasConfig.getJobConfig();
-//        final OutputTag<PojoSpan> httpMetricOutput = new OutputTag<>("http-metric-stream") {
-//        };
-//        Set<String> i6000IncludeAppNameSet = new HashSet<>(i6000Config.getIncludeAppNames());
-//        SingleOutputStreamOperator<PojoSpan> outputStreamOperator = baseStream
-//                .process(new ProcessFunction<PojoSpan, PojoSpan>() {
-//                    @Override
-//                    public void processElement(PojoSpan value, ProcessFunction<PojoSpan, PojoSpan>.Context ctx, Collector<PojoSpan> out) {
-//                        out.collect(value);
-//                        if (!i6000IncludeAppNameSet.contains(value.AppName)) {
-//                            return;
-//                        }
-//                        if (value.isHttpServerScopeType()) {
-//                            ctx.output(httpMetricOutput, value);
-//                        }
-//                    }
-//                })
-//                .name("旁路输出httpMetric/topo&Filter")
-//                .setParallelism(jobConfig.getProcessOutputTagParallelism());
-//
-//        OasConfig.HttpMetricConfig httpMetricConfig = oasConfig.getHttpMetricConfig();
-//        Integer i6000Metric2BytesParallelism = oasConfig.getJobConfig().getI6000Metric2BytesParallelism();
-//
-//
-//        String kafkaBootstrapServers = i6000Config.getKafkaBootstrapServer();
-//        KafkaSink<byte[]> i6000MetricSink = KafkaSink.<byte[]>builder()
-//                .setBootstrapServers(kafkaBootstrapServers)
-//                .setKafkaProducerConfig(i6000KafkaProducerSettings)
-//                .setRecordSerializer(
-//                        KafkaRecordSerializationSchema.builder()
-//                                .setTopic(i6000Config.getKafkaTopicMetric())
-//                                .setValueSerializationSchema(new I6000MessageSerializer()).build()
-//                )
-//                .build();
-//        outputStreamOperator.getSideOutput(httpMetricOutput)
-//                .windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(httpMetricConfig.getWindowIntervalSeconds())))
-//                .aggregate(new ServiceMetricAggregator(), new TimeWindowAllAdder<>())
-//                .name("aggEndPointMetrics")
-//                .flatMap(new Metric2BytesMapper(i6000Config.getMetricIntervalMinutes(), i6000Config.getAppName2CmdbID(),
-//                        i6000Config.getIncludeAppNames()))
-//                .setParallelism(i6000Metric2BytesParallelism).name("i6000Metrics2Bytes")
-//                .sinkTo(i6000MetricSink).setParallelism(jobConfig.getI6000MetricKafkaSinkParallelism()).name("i6000MetricKafka");
-//        // todo 拆解i6000与metric生成
-//    }
 }

+ 48 - 1
src/main/java/com/cecf/observe/agentstream/OasConfig.java

@@ -1,20 +1,64 @@
 package com.cecf.observe.agentstream;
 
+import com.cecf.observe.agentstream.common.Config;
+import com.typesafe.sslconfig.akka.util.AkkaLoggerBridge;
 import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
+import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.logging.LogManager;
 
 
 @ToString()
 @Setter
 @Getter
 public class OasConfig {
+
+    private static final Logger logger = LoggerFactory.getLogger(OasConfig.class);
+
+    public static OasConfig fromPath() {
+        String oasConfigPath = Config.getString("ob.config.path", "/conf/oas.yaml");
+        Yaml yaml = new Yaml();
+        try {
+            InputStream inputStream = new FileInputStream(oasConfigPath);
+            OasConfig oasConfig = yaml.loadAs(inputStream, OasConfig.class);
+            logger.info("[配置]: {}", oasConfig.toString());
+            oasConfig.fillDefaults();
+            return oasConfig;
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(String.format("读取配置[%s]错误:%s", oasConfigPath, e));
+        }
+    }
+
+    public Map<String, String> getExecuteEnvironmentGlobalConfig() {
+        Map<String, String> globalParameters = new HashMap<>();
+        String chHosts = clickhouseStreamConfig.getChHost();
+        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, chHosts);
+        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, clickhouseStreamConfig.getChUser());
+        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, clickhouseStreamConfig.getChPwd());
+
+        globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, clickhouseStreamConfig.getChTimeoutSeconds().toString());
+        globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "./failed_record");
+        globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, clickhouseStreamConfig.getChWriterNum().toString());
+        globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, clickhouseStreamConfig.getChWriteRetry().toString());
+        globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, clickhouseStreamConfig.getChWriteQueueSize().toString());
+        globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "true");
+        return globalParameters;
+    }
+
     @ToString
     @Getter
     @Setter
@@ -49,8 +93,10 @@ public class OasConfig {
 
         private Integer watermarkOrderlessSeconds;
         private Integer watermarkIdlenessSeconds;
+        private Integer endpointSpanFilterParallelism;
 
         private boolean disableOperatorChaining = false;
+
     }
 
     @ToString
@@ -142,6 +188,7 @@ public class OasConfig {
         private int pgMaxRetry = 3;
         private int pgMaxRetryDelay = 1000;
         private int pgMaxRetryDelayMultiplier = 2;
+
         public String getJdbcUrl() {
             return String.format("jdbc:postgresql://%s:%d/%s", pgHost, pgPort, pgDatabase);
         }

+ 79 - 0
src/main/java/com/cecf/observe/agentstream/PojoSourceBuilder.java

@@ -0,0 +1,79 @@
+package com.cecf.observe.agentstream;
+
+import com.cecf.observe.agentstream.otel.PojoSpan;
+import com.cecf.observe.agentstream.otel.TraceServiceRequest2PojoSpanFlatMap;
+import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.time.Duration;
+
+public class PojoSourceBuilder {
+    public static DataStream<PojoSpan> buildPojoSourceStream(StreamExecutionEnvironment env, OasConfig oasConfig) {
+        SingleOutputStreamOperator<PojoSpan> pbOp = null;
+        SingleOutputStreamOperator<PojoSpan> jsonOp = null;
+        boolean enablePbSource = oasConfig.getPbSource().isEnabled();
+        boolean enableJsonSource = oasConfig.getJsonSource().isEnabled();
+        if (!enableJsonSource && !enablePbSource) {
+            throw new RuntimeException("至少配置一个Source");
+        }
+        OasConfig.JobConfig jobConfig = oasConfig.getJobConfig();
+        WatermarkStrategy<PojoSpan> watermarkStrategy = WatermarkStrategy
+                .<PojoSpan>forBoundedOutOfOrderness(Duration.ofSeconds(jobConfig.getWatermarkOrderlessSeconds()))
+                .withTimestampAssigner(
+                        (SerializableTimestampAssigner<PojoSpan>) (element, recordTimestamp) -> element.Timestamp.getTime()
+                ).withIdleness(
+                        Duration.ofSeconds(jobConfig.getWatermarkIdlenessSeconds())
+                );
+        if (enablePbSource) {
+            OasConfig.KafkaSourceConfig pbKafkaSourceConfig = oasConfig.getPbSource();
+            DeserializationSchema<ExportTraceServiceRequest> pbDeserializationSchema = new ExportTraceServiceRequestSchema();
+            KafkaSource<ExportTraceServiceRequest> pbSource =
+                    (new KafkaSourceBuilder<ExportTraceServiceRequest>()).setBrokers(pbKafkaSourceConfig.getBrokers())
+                            .setTopics(pbKafkaSourceConfig.getTopics())
+                            .setGroupID(pbKafkaSourceConfig.getConsumerGroupID())
+                            .setDeserializationSchema(pbDeserializationSchema)
+                            .build();
+            pbOp = env.fromSource(pbSource, WatermarkStrategy.noWatermarks(), "pbSource")
+                    .setParallelism(oasConfig.getJobConfig().getKafkaSourceSinkParallelism())
+                    .flatMap(new TraceServiceRequest2PojoSpanFlatMap())
+                    .setParallelism(jobConfig.getPojoParseParallelism())
+                    .name("TraceServiceRequestFlatMap2PojoSpan")
+                    .assignTimestampsAndWatermarks(watermarkStrategy)
+                    .name("assignTimestampAndWatermark")
+                    .setParallelism(jobConfig.getPojoParseParallelism());
+        }
+
+        if (enableJsonSource) {
+            OasConfig.KafkaSourceConfig jsonKafkaSourceConfig = oasConfig.getJsonSource();
+            KafkaSource<String> jsonSource =
+                    (new KafkaSourceBuilder<String>()).setBrokers(jsonKafkaSourceConfig.getBrokers())
+                            .setTopics(jsonKafkaSourceConfig.getTopics())
+                            .setGroupID(jsonKafkaSourceConfig.getConsumerGroupID())
+                            .setDeserializationSchema(new SimpleStringSchema())
+                            .build();
+            jsonOp = env.fromSource(jsonSource, WatermarkStrategy.noWatermarks(), "jsonSource")
+                    .setParallelism(oasConfig.getJobConfig().getKafkaSourceSinkParallelism())
+                    .flatMap(new JSONString2PojoSpanFlatMapFunc())
+                    .setParallelism(jobConfig.getPojoParseParallelism())
+                    .name("JsonFlatMap2PojoSpan")
+                    .assignTimestampsAndWatermarks(watermarkStrategy)
+                    .name("assignTimestampAndWatermark")
+                    .setParallelism(jobConfig.getPojoParseParallelism());
+        }
+        if (pbOp == null) {
+            return jsonOp;
+        }
+        if (jsonOp == null) {
+            return pbOp;
+        }
+        return pbOp.union(jsonOp);
+    }
+
+}

+ 2 - 2
src/main/java/com/cecf/observe/agentstream/PojoSpanSpanCSVConverter.java

@@ -9,6 +9,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class PojoSpanSpanCSVConverter extends CHConverter implements ClickHouseSinkConverter<PojoSpan> {
+    private final static org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoSpanSpanCSVConverter.class);
 
     @Override
     public String convert(PojoSpan ss) {
@@ -51,7 +52,7 @@ public class PojoSpanSpanCSVConverter extends CHConverter implements ClickHouseS
 
         addInt(ss.HttpCode, builder);
         addString(ss.HttpMethod, builder);
-        addString(ss.getHttpPath(), builder);
+        addString(ss.getNormalizedHttpPath(), builder);
 
         addString(ss.ContainerID, builder);
 
@@ -85,7 +86,6 @@ public class PojoSpanSpanCSVConverter extends CHConverter implements ClickHouseS
         addString(ss.AppName, builder, true);
 
         builder.append(")");
-//        logger.info("SQL语句values:{}", builder.toString());
         return builder.toString();
     }
 }

+ 57 - 0
src/main/java/com/cecf/observe/agentstream/TraceURLStream.java

@@ -0,0 +1,57 @@
+package com.cecf.observe.agentstream;
+
+import com.cecf.observe.agentstream.otel.*;
+import com.cecf.observe.agentstream.traceurl.PojoSpan2TraceURLMapper;
+import com.cecf.observe.agentstream.traceurl.TraceURL;
+import com.cecf.observe.agentstream.traceurl.TraceURLCHConverter;
+import com.cecf.observe.agentstream.traceurl.URLAggregator;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
+
+import java.time.Duration;
+import java.util.Collections;
+
+public class TraceURLStream {
+    private static void handelTraceURLStream(OasConfig.TraceURLConfig traceURL, SingleOutputStreamOperator<PojoSpan> mainStreamOperator) {
+        if (!traceURL.isEnabled()) {
+            return;
+        }
+        final OutputTag<PojoSpan> traceURLOutputTag = new OutputTag<>("traceURL") {
+        };
+        mainStreamOperator.process(new PojoSpanOutputTagProcessFunc(Collections.singletonList(traceURLOutputTag)));
+        String traceURLTableName = traceURL.getSinkTableName();
+        Integer traceURLWriteBufSize = traceURL.getWriteBufSize();
+        ClickHouseSink<TraceURL> traceURLClickHouseSink = (new ClickhouseSinkBuilder<TraceURL>()).
+                setTableName(traceURLTableName).
+                setWriteBufSize(traceURLWriteBufSize.toString())
+                .setConverter(new TraceURLCHConverter())
+                .build();
+
+        final OutputTag<TraceURL> otURLMappingOutput = new OutputTag<>("otURLMapping") {
+        };
+        SingleOutputStreamOperator<TraceURL> traceURLSingleOutputStreamOperator = mainStreamOperator
+                .getSideOutput(traceURLOutputTag)
+                .filter(new SpanKindFilter(OtelConst.SpanKindServer))
+                .name("filterSpanKindServer")
+                .filter(new ScopeTypeFilter(SpanScopeType.SPAN_SCOPE_TYPE_HTTP_SERVER))
+                .name("filterHTTPScope")
+                .map(new PojoSpan2TraceURLMapper())
+                .name("pojoSpan2TraceURL")
+                .process(new ProcessFunction<>() {
+                    @Override
+                    public void processElement(TraceURL traceURL, ProcessFunction<TraceURL, TraceURL>.Context context, Collector<TraceURL> collector) {
+                        collector.collect(traceURL);
+                        context.output(otURLMappingOutput, traceURL);
+                    }
+                });
+        traceURLSingleOutputStreamOperator.windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(30)))
+                .aggregate(new URLAggregator())
+                .name("aggregateByASMR")
+                .print();
+        traceURLSingleOutputStreamOperator.addSink(traceURLClickHouseSink).name("Sink2Clickhouse");
+    }
+}

+ 9 - 0
src/main/java/com/cecf/observe/agentstream/common/StringUtils.java

@@ -1,5 +1,9 @@
 package com.cecf.observe.agentstream.common;
 
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Duration;
+
 public class StringUtils {
     public static String capitalize(String input) {
         if (input == null || input.isEmpty()) {
@@ -7,4 +11,9 @@ public class StringUtils {
         }
         return Character.toUpperCase(input.charAt(0)) + input.substring(1);
     }
+
+    public static @NotNull String getDurationString(Duration du) {
+        return du.toHours() >= 1 ? String.format("%dh", du.toHours()) :
+                (du.toMinutes() >= 1 ? String.format("%dm", du.toMinutes()) : String.format("%ds", du.getSeconds()));
+    }
 }

+ 1 - 1
src/main/java/com/cecf/observe/agentstream/i6000/TopoAggregator.java

@@ -33,7 +33,7 @@ public class TopoAggregator implements AggregateFunction<PojoSpan, TopoAggregato
                     value.Duration, code < 200 || code > 299, value.TraceID));
             return accumulator;
         }
-        accumulator.addServer(new ReceivedSpanMeta(value.AppName, value.ServiceName, value.getHttpPath(),
+        accumulator.addServer(new ReceivedSpanMeta(value.AppName, value.ServiceName, value.getOriginHttpPath(),
                 value.ParentSpanID, value.TraceID, value.SpanID));
         return accumulator;
     }

+ 17 - 0
src/main/java/com/cecf/observe/agentstream/metrics/AppNameMetricMeta.java

@@ -0,0 +1,17 @@
+package com.cecf.observe.agentstream.metrics;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+@EqualsAndHashCode
+public class AppNameMetricMeta {
+    private String appName;
+
+    @EqualsAndHashCode.Exclude
+    private long timestamp;
+}

+ 2 - 0
src/main/java/com/cecf/observe/agentstream/metrics/EndpointMeta.java

@@ -1,10 +1,12 @@
 package com.cecf.observe.agentstream.metrics;
 
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 
 @NoArgsConstructor
 @Getter
+@EqualsAndHashCode(callSuper = true)
 public class EndpointMeta extends ServiceInstanceMeta {
     private String endpoint;
 

+ 31 - 0
src/main/java/com/cecf/observe/agentstream/metrics/EndpointMetrics.java

@@ -33,6 +33,37 @@ public class EndpointMetrics {
     private double quantileLatencyMS95;
     private double quantileLatencyMS99;
 
+
+    public EndpointMetrics(List<EndpointMetrics> els) {
+        this.apdex = els.stream().mapToDouble(EndpointMetrics::getApdex).average().orElse(0.0);
+        this.maxLatencyMS = els.stream().mapToLong(EndpointMetrics::getMaxLatencyMS).max().orElse(0);
+        this.minLatencyMS = els.stream().mapToLong(EndpointMetrics::getMinLatencyMS).min().orElse(0);
+        this.avgLatencyMS = els.stream().mapToLong(EndpointMetrics::getAvgLatencyMS).sum() / els.size();
+        this.requestCount = els.stream().mapToLong(EndpointMetrics::getRequestCount).sum();
+        this.RPM = els.stream().mapToLong(EndpointMetrics::getRPM).sum() / els.size();
+        this.latencyLe500 = els.stream().mapToLong(EndpointMetrics::getLatencyLe500).sum() / (els.size());
+        this.latencyLe1000 = els.stream().mapToLong(EndpointMetrics::getLatencyLe1000).sum() / (els.size());
+        this.latencyLe5000 = els.stream().mapToLong(EndpointMetrics::getLatencyLe5000).sum() / (els.size());
+        this.quantileLatencyMS50 = els.stream().mapToDouble(EndpointMetrics::getQuantileLatencyMS50).sum() / els.size();
+        this.quantileLatencyMS90 = els.stream().mapToDouble(EndpointMetrics::getQuantileLatencyMS90).sum() / els.size();
+        this.quantileLatencyMS95 = els.stream().mapToDouble(EndpointMetrics::getQuantileLatencyMS95).sum() / els.size();
+        this.quantileLatencyMS99 = els.stream().mapToDouble(EndpointMetrics::getQuantileLatencyMS99).sum() / els.size();
+        this.httpCode2Count = new HashMap<>();
+        this.spanStatusCode2Count = new HashMap<>();
+        this.exceptionType2Count = new HashMap<>();
+        for (EndpointMetrics el : els) {
+            for (Map.Entry<Long, Integer> entry : el.httpCode2Count.entrySet()) {
+                this.httpCode2Count.put(entry.getKey(), this.httpCode2Count.getOrDefault(entry.getKey(), 0) + entry.getValue());
+            }
+            for (Map.Entry<Integer, Integer> entry : el.spanStatusCode2Count.entrySet()) {
+                this.spanStatusCode2Count.put(entry.getKey(), this.spanStatusCode2Count.getOrDefault(entry.getKey(), 0) + entry.getValue());
+            }
+            for (Map.Entry<String, Integer> entry : el.exceptionType2Count.entrySet()) {
+                this.exceptionType2Count.put(entry.getKey(), this.exceptionType2Count.getOrDefault(entry.getKey(), 0) + entry.getValue());
+            }
+        }
+    }
+
     public static class EndpointMetricsBuilder {
         private final List<ServerMetricMeasure> serverMetricMeasures;
         private final Histogram sumHistogram;

+ 2 - 1
src/main/java/com/cecf/observe/agentstream/metrics/ServiceInstanceMeta.java

@@ -1,6 +1,6 @@
 package com.cecf.observe.agentstream.metrics;
 
-import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
@@ -8,6 +8,7 @@ import lombok.Setter;
 @NoArgsConstructor
 @Getter
 @Setter
+@EqualsAndHashCode(callSuper = true)
 public class ServiceInstanceMeta extends ServiceMetricMeta {
     private String instanceId;
 

+ 7 - 2
src/main/java/com/cecf/observe/agentstream/metrics/ServiceMetric.java

@@ -15,6 +15,7 @@ public class ServiceMetric {
     public ServiceMetric() {
         serviceInstanceMetrics = new HashMap<>();
     }
+
     private long timestamp;
     private String serviceName;
     private String appName;
@@ -32,7 +33,7 @@ public class ServiceMetric {
         return result;
     }
 
-    public void addEndpointRequest(String appName, long timestamp, String serviceName, String instanceID,
+    public void addEndpointRequest(String appName, String serviceName, String instanceID,
                                    String endpoint, int spanStatusCode, long latencyMS,
                                    long httpCode, Map<String, Integer> ex2c) {
         if (this.serviceName == null) {
@@ -63,7 +64,7 @@ public class ServiceMetric {
 
     public void merge(ServiceMetric b) {
         for (ServiceInstanceMetric bi : b.getServiceInstanceMetrics()) {
-            ServiceInstanceMetric   ai =  serviceInstanceMetrics.get(bi.getInstanceID());
+            ServiceInstanceMetric ai = serviceInstanceMetrics.get(bi.getInstanceID());
             if (ai == null) {
                 serviceInstanceMetrics.put(bi.getInstanceID(), bi);
                 continue;
@@ -71,4 +72,8 @@ public class ServiceMetric {
             ai.merge(bi);
         }
     }
+
+    public boolean isEmpty() {
+        return serviceName == null || appName == null;
+    }
 }

+ 14 - 2
src/main/java/com/cecf/observe/agentstream/metrics/ServiceMetricAggregator.java

@@ -22,6 +22,10 @@ public class ServiceMetricAggregator implements AggregateFunction<PojoSpan, Serv
         }
         String serviceName = value.getServiceName();
         String instanceID = value.getServiceInstanceID();
+        if (instanceID == null) {
+            logger.warn("no instanceID found, skip span");
+            return accumulator;
+        }
         Map<String, Integer> ex2c = null;
         if (value.hasExceptionEvents()) {
             ex2c = new HashMap<>();
@@ -29,14 +33,22 @@ public class ServiceMetricAggregator implements AggregateFunction<PojoSpan, Serv
                 ex2c.put(exception.Typ, ex2c.getOrDefault(exception.Typ, 0) + 1);
             }
         }
-        String endpoint = value.getHttpPath();
-        accumulator.addEndpointRequest(value.AppName, value.Timestamp.getTime(),  serviceName, instanceID, endpoint, value.StatusCode,
+        String endpoint = value.getNormalizedHttpPath();
+        if (endpoint == null) {
+            logger.warn("no endpoint found, skip span");
+            return accumulator;
+        }
+        accumulator.addEndpointRequest(value.AppName, serviceName, instanceID, endpoint, value.StatusCode,
                 value.getLatencyMS(), value.getHttpStatusCode(), ex2c);
         return accumulator;
     }
 
     @Override
     public ServiceMetric getResult(ServiceMetric accumulator) {
+        logger.info("获取ServiceMetric, AppName:{}", accumulator.getAppName());
+        if (accumulator.isEmpty()) {
+            return null;
+        }
         return accumulator;
     }
 

+ 8 - 5
src/main/java/com/cecf/observe/agentstream/metrics/ServiceMetricMeta.java

@@ -1,15 +1,18 @@
 package com.cecf.observe.agentstream.metrics;
 
-import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 
 
 @NoArgsConstructor
-@AllArgsConstructor
 @Data
-public class ServiceMetricMeta {
-    private String appName;
-    private long timestamp;
+@EqualsAndHashCode(callSuper = true)
+public class ServiceMetricMeta extends AppNameMetricMeta {
     private String serviceName;
+
+    public ServiceMetricMeta(String appName, long timestamp, String serviceName) {
+        super(appName, timestamp);
+        this.serviceName = serviceName;
+    }
 }

+ 1 - 1
src/main/java/com/cecf/observe/agentstream/metrics/ServiceMetricTimestampAdder.java

@@ -11,7 +11,7 @@ public class ServiceMetricTimestampAdder extends ProcessWindowFunction<ServiceMe
             String, TimeWindow>.Context context, Iterable<ServiceMetric> elements, Collector<ServiceMetric> out) throws Exception {
         TimeWindow timeWindow = context.window();
         ServiceMetric value = elements.iterator().next();
-        value.setTimestamp(timeWindow.getStart());
+        value.setTimestamp(timeWindow.getEnd());
         out.collect(value);
     }
 }

+ 25 - 0
src/main/java/com/cecf/observe/agentstream/metrics/SpanTagsMetrics.java

@@ -0,0 +1,25 @@
+package com.cecf.observe.agentstream.metrics;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class SpanTagsMetrics {
+    private String appName;
+    private String serviceName;
+    private long spanCount;
+    private long httpCount;
+    private long httpErrorCount;
+    private long rpcCount;
+    private long databaseCount;
+
+    public SpanTagsMetrics(SpanTagsMetrics a, SpanTagsMetrics b) {
+    }
+
+    public boolean isEmpty() {
+        return appName == null || serviceName == null;
+    }
+}

+ 97 - 0
src/main/java/com/cecf/observe/agentstream/metrics/SpanTagsProcess.java

@@ -0,0 +1,97 @@
+package com.cecf.observe.agentstream.metrics;
+
+import com.cecf.observe.agentstream.common.StringUtils;
+import com.cecf.observe.agentstream.common.TimeWindowAdder;
+import com.cecf.observe.agentstream.otel.PojoSpan;
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcSink;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+
+public class SpanTagsProcess implements AggregateFunction<PojoSpan, SpanTagsMetrics, SpanTagsMetrics> {
+
+    @Override
+    public SpanTagsMetrics createAccumulator() {
+        return new SpanTagsMetrics();
+    }
+
+    @Override
+    public SpanTagsMetrics add(PojoSpan value, SpanTagsMetrics accumulator) {
+        if (accumulator.getServiceName() == null) {
+            accumulator.setServiceName(value.getServiceName());
+        }
+        if (accumulator.getAppName() == null) {
+            accumulator.setAppName(value.AppName);
+        }
+        accumulator.setSpanCount(1 + accumulator.getSpanCount());
+        if (value.isHttpScopeType()) {
+            accumulator.setHttpCount(1 + accumulator.getHttpCount());
+        }
+
+        if (value.isDBScopeType()) {
+            accumulator.setDatabaseCount(1 + accumulator.getDatabaseCount());
+        }
+        if (value.isRPCScopeType()) {
+            accumulator.setRpcCount(1 + accumulator.getRpcCount());
+        }
+        long code = value.getHttpStatusCode();
+        if (code >= 400) {
+            accumulator.setHttpErrorCount(1 + accumulator.getHttpErrorCount());
+        }
+        return accumulator;
+    }
+
+    @Override
+    public SpanTagsMetrics getResult(SpanTagsMetrics accumulator) {
+        if (accumulator.isEmpty()) {
+            return null;
+        }
+        return accumulator;
+    }
+
+    @Override
+    public SpanTagsMetrics merge(SpanTagsMetrics a, SpanTagsMetrics b) {
+        return new SpanTagsMetrics(a, b);
+    }
+
+    @Data
+    @Accessors(chain = true)
+    public static class SpanTagsProcessBuilder {
+        private Duration windowDuration;
+        private JdbcConnectionOptions jdbcConnectionOptions;
+
+        public void buildStream(DataStream<PojoSpan> base) {
+            String tableSuffix = StringUtils.getDurationString(windowDuration);
+            String endpointSql = String.format("insert into span_metrics_%s " +
+                                               "(timestamp, app_name, service_name, span_count, http_count, http_error_count,\n" +
+                                               "rpc_count, database_count) values (?,?,?,?,?,?,?,?)", tableSuffix);
+            base
+                    .keyBy(PojoSpan::getServiceName)
+                    .window(TumblingEventTimeWindows.of(windowDuration))
+                    .aggregate(new SpanTagsProcess(), new TimeWindowAdder<SpanTagsMetrics, String>())
+                    .addSink(JdbcSink.sink(
+                            endpointSql,
+                            (ps, t) -> {
+                                Long timestamp = t.f0;
+                                SpanTagsMetrics meta = t.f1;
+                                ps.setTimestamp(1, new Timestamp(timestamp));
+                                ps.setString(2, meta.getAppName());
+                                ps.setString(3, meta.getServiceName());
+                                ps.setLong(4, meta.getSpanCount());
+                                ps.setLong(5, meta.getHttpCount());
+                                ps.setLong(6, meta.getHttpErrorCount());
+                                ps.setLong(7, meta.getRpcCount());
+                                ps.setLong(8, meta.getDatabaseCount());
+                            },
+                            jdbcConnectionOptions
+                    ));
+        }
+    }
+
+}

+ 1 - 1
src/main/java/com/cecf/observe/agentstream/otel/HttpPathFilter.java

@@ -19,6 +19,6 @@ public class HttpPathFilter extends FilterFunc<PojoSpan> {
 
     @Override
     protected boolean filterWhenEnabled(PojoSpan pojoSpan) throws Exception {
-        return httpPathsIncluded.contains(pojoSpan.getHttpPath());
+        return httpPathsIncluded.contains(pojoSpan.getOriginHttpPath());
     }
 }

+ 37 - 18
src/main/java/com/cecf/observe/agentstream/otel/PojoSpan.java

@@ -51,7 +51,9 @@ public class PojoSpan {
                 }
             }
         }
-        // end appName
+        if (AppName == null || AppName.isEmpty()) {
+            AppName = "UNSET";
+        }
     }
 
     public SpanScopeType getSpanScopeType() {
@@ -142,7 +144,7 @@ public class PojoSpan {
     public String getAttribute(String de, String... keys) {
         for (String key : keys) {
             String value = getAttribute(key);
-            if (value != null) {
+            if (value != null && !value.isEmpty()) {
                 return value;
             }
         }
@@ -229,7 +231,8 @@ public class PojoSpan {
     }
 
     public String getServiceInstanceID() {
-        return getResourceAttribute(null, "k8s.pod.uid", "k8s.pod.ip", "host.id", "host.ip", "host.mac");
+        return getResourceAttribute(null, "k8s.pod.uid", "k8s.pod.ip", "container.id", "service.instance.id",
+                "host.id", "host.ip", "host.mac");
     }
 
     public String getHostIP() {
@@ -268,23 +271,39 @@ public class PojoSpan {
         return Events != null && Events.length > 0;
     }
 
-    public String getHttpPath() {
-        String route = getAttribute("", "url.path", "http.route", "http.target");
-        if (route != null && !route.trim().isEmpty()) {
-            String[] routes = route.split("\\?");
-            if (routes.length > 1) {
-                return routes[0];
-            }
-            return route;
+
+    private static String replaceRegexpOne(String input, String regex) {
+        Pattern pattern = Pattern.compile(regex);
+        Matcher matcher = pattern.matcher(input);
+        if (matcher.find()) {
+            return matcher.replaceFirst("{:var}");
         }
-        String rawUrl = this.getHttpUrl();
-        try {
-            URL url = (new URI(rawUrl)).toURL();
-            return url.getPath();
-        } catch (URISyntaxException | MalformedURLException | IllegalArgumentException e) {
-//            logger.warn("解析http route失败: ss: {}", (Object) SpanAttributes);
-            return "";
+        return input;
+    }
+
+    public String getOriginHttpPath() {
+        String httpRoute = getAttribute("", "http.route", "url.path", "http.target", "http.url", "url.full").trim();
+        if (!httpRoute.isEmpty()) {
+            return removeQueryString(httpRoute);
         }
+        return null;
+    }
+
+    public static String removeQueryString(String url) {
+        int questionMarkIndex = url.indexOf('?');
+        if (questionMarkIndex != -1) {
+            return url.substring(0, questionMarkIndex);
+        }
+        return url;
+    }
+
+    public String getNormalizedHttpPath() {
+        String pathValue = getOriginHttpPath();
+        if (pathValue != null) {
+            String varPattern = "\\b[a-fA-F0-9]{32,}\\b|\\b[a-fA-F0-9\\-]{36}\\b|\\b\\[1-9]\\d*\\b";
+            return replaceRegexpOne(pathValue, varPattern);
+        }
+        return null;
     }
 
     public String getHttpUrl() {

+ 1 - 1
src/main/java/com/cecf/observe/agentstream/otel/ServiceMetricAggregator.java

@@ -40,7 +40,7 @@ public class ServiceMetricAggregator implements AggregateFunction<PojoSpan,
             Integer spanDurationNs = (int) value.Duration;
 
 
-            ServiceEndpointMeta serviceEndpointMeta = new ServiceEndpointMeta(serviceInstanceMeta, value.getHttpServerPort(), value.getHttpPath());
+            ServiceEndpointMeta serviceEndpointMeta = new ServiceEndpointMeta(serviceInstanceMeta, value.getHttpServerPort(), value.getNormalizedHttpPath());
             ServiceEndpointMetric serviceEndpointMetric = accumulator.getOrDefault(serviceEndpointMeta, new ServiceEndpointMetric());
             serviceEndpointMetric.addRequest(isError, value.getExceptionSize(), spanDurationNs);
             accumulator.put(serviceEndpointMeta, serviceEndpointMetric);

+ 1 - 1
src/main/java/com/cecf/observe/agentstream/traceurl/PojoSpan2TraceURLMapper.java

@@ -6,7 +6,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 public class PojoSpan2TraceURLMapper implements MapFunction<PojoSpan, TraceURL> {
     @Override
     public TraceURL map(PojoSpan pojoSpan) throws Exception {
-        String httpPath = pojoSpan.getHttpPath();
+        String httpPath = pojoSpan.getNormalizedHttpPath();
         return (new TraceURL())
                 .setTimestamp(pojoSpan.Timestamp)
                 .setRoute(httpPath)

+ 1 - 1
version

@@ -1 +1 @@
-v2.5.14
+v2.7.1