Browse Source

span kind count

liubing 4 weeks ago
parent
commit
e3af89fe67

+ 13 - 8
sql/metric_tables_1202.sql

@@ -28,14 +28,19 @@ SELECT add_retention_policy('app_metrics_15s', INTERVAL '10 days');
 drop table if exists ob_metrics.public.span_metrics_1m;
 create table if not exists ob_metrics.public.span_metrics_1m
 (
-    timestamp        timestamp,
-    app_name         varchar(256),
-    service_name     varchar(512),
-    span_count       int,
-    http_count       int,
-    http_error_count int,
-    rpc_count        int,
-    database_count   int
+    timestamp          timestamp,
+    app_name           varchar(256),
+    service_name       varchar(512),
+    span_count         bigint,
+    http_count         bigint,
+    http_error_count   bigint,
+    rpc_count          bigint,
+    database_count     bigint,
+    span_kind_internal bigint,
+    span_kind_server   bigint,
+    span_kind_client   bigint,
+    span_kind_producer bigint,
+    span_kind_consumer bigint
 );
 
 SELECT create_hypertable('span_metrics_1m', 'timestamp');

+ 0 - 8
src/main/java/com/cecf/observe/agentstream/Main.java

@@ -55,17 +55,9 @@ public class Main {
                 .name("过滤ServiceName")
                 .setParallelism(jobConfig.getFilterByServiceNameParallelism());
         // clickhouse traces stream
-//        SlotSharingGroup clickhouseSinkSSG = SlotSharingGroup.newBuilder("clickhouseSSG")
-//                .setCpuCores(1)
-//                .setManagedMemoryMB(300)
-//                .setTaskOffHeapMemoryMB(500)
-//                .setTaskHeapMemoryMB(2000)
-//                .build();
-//        env.registerSlotSharingGroup(clickhouseSinkSSG);
         baseStream
                 .addSink(tracesClickhouseSink)
                 .setParallelism(jobConfig.getClickhouseSinkParallelism())
-//                .slotSharingGroup(clickhouseSinkSSG)
                 .name("ClickhouseSink");
 
         OasConfig.SpanMetricConfig configSpanMetric = oasConfig.getSpanMetric();

+ 6 - 0
src/main/java/com/cecf/observe/agentstream/metrics/SpanTagsMetrics.java

@@ -16,6 +16,12 @@ public class SpanTagsMetrics {
     private long rpcCount;
     private long databaseCount;
 
+    private long spanKindInternalCount;
+    private long spanKindServerCount;
+    private long spanKindClientCount;
+    private long spanKindProducerCount;
+    private long spanKindConsumerCount;
+
     public SpanTagsMetrics(SpanTagsMetrics a, SpanTagsMetrics b) {
     }
 

+ 27 - 1
src/main/java/com/cecf/observe/agentstream/metrics/SpanTagsProcess.java

@@ -44,6 +44,24 @@ public class SpanTagsProcess implements AggregateFunction<PojoSpan, SpanTagsMetr
         if (code >= 400) {
             accumulator.setHttpErrorCount(1 + accumulator.getHttpErrorCount());
         }
+
+        switch (value.SpanKind) {
+            case 1:
+                accumulator.setSpanKindInternalCount(1 + accumulator.getSpanKindInternalCount());
+                break;
+            case 2:
+                accumulator.setSpanKindServerCount(1 + accumulator.getSpanKindServerCount());
+                break;
+            case 3:
+                accumulator.setSpanKindClientCount(1 + accumulator.getSpanKindClientCount());
+                break;
+            case 4:
+                accumulator.setSpanKindProducerCount(1 + accumulator.getSpanKindProducerCount());
+                break;
+            case 5:
+                accumulator.setSpanKindConsumerCount(1 + accumulator.getSpanKindConsumerCount());
+                break;
+        }
         return accumulator;
     }
 
@@ -70,7 +88,10 @@ public class SpanTagsProcess implements AggregateFunction<PojoSpan, SpanTagsMetr
             String tableSuffix = StringUtils.getDurationString(windowDuration);
             String endpointSql = String.format("insert into span_metrics_%s " +
                                                "(timestamp, app_name, service_name, span_count, http_count, http_error_count,\n" +
-                                               "rpc_count, database_count) values (?,?,?,?,?,?,?,?)", tableSuffix);
+                                               "rpc_count, database_count, span_kind_internal, span_kind_server, " +
+                                               "span_kind_client, span_kind_producer, span_kind_consumer) values " +
+                                               "(?,?,?,?,?,?,?,?, ?, ?, ?, ?, ?)",
+                    tableSuffix);
             base
                     .keyBy(PojoSpan::getServiceName)
                     .window(TumblingEventTimeWindows.of(windowDuration))
@@ -88,6 +109,11 @@ public class SpanTagsProcess implements AggregateFunction<PojoSpan, SpanTagsMetr
                                 ps.setLong(6, meta.getHttpErrorCount());
                                 ps.setLong(7, meta.getRpcCount());
                                 ps.setLong(8, meta.getDatabaseCount());
+                                ps.setLong(9, meta.getSpanKindInternalCount());
+                                ps.setLong(10, meta.getSpanKindServerCount());
+                                ps.setLong(11, meta.getSpanKindClientCount());
+                                ps.setLong(12, meta.getSpanKindProducerCount());
+                                ps.setLong(13, meta.getSpanKindConsumerCount());
                             },
                             jdbcConnectionOptions
                     ));