liubing 2 月之前
父节点
当前提交
5acd0f5a2f

+ 0 - 29
src/main/java/com/cecf/observe/agentstream/Main.java

@@ -249,8 +249,6 @@ public class Main {
         i6000KafkaProducerSettings.put("linger.ms", "0");
         OasConfig.I6000Config i6000Config = oasConfig.getI6000Config();
         OasConfig.JobConfig jobConfig = oasConfig.getJobConfig();
-        final OutputTag<PojoSpan> topoOutput = new OutputTag<>("http-metric-topo") {
-        };
         final OutputTag<PojoSpan> httpMetricOutput = new OutputTag<>("http-metric-stream") {
         };
         Set<String> i6000IncludeAppNameSet = new HashSet<>(i6000Config.getIncludeAppNames());
@@ -262,9 +260,6 @@ public class Main {
                         if (!i6000IncludeAppNameSet.contains(value.AppName)) {
                             return;
                         }
-                        if (value.isHttpScopeType()) {
-                            ctx.output(topoOutput, value);
-                        }
                         if (value.isHttpServerScopeType()) {
                             ctx.output(httpMetricOutput, value);
                         }
@@ -295,30 +290,6 @@ public class Main {
                         i6000Config.getIncludeAppNames()))
                 .setParallelism(i6000Metric2BytesParallelism).name("i6000Metrics2Bytes")
                 .sinkTo(i6000MetricSink).setParallelism(jobConfig.getI6000MetricKafkaSinkParallelism()).name("i6000MetricKafka");
-        // i6000 stream
         // todo 拆解i6000与metric生成
-//        if (!i6000Config.isEnabled()) {
-//            return;
-//        }
-        KafkaSink<byte[]> i6000TopoSink = KafkaSink.<byte[]>builder()
-                .setBootstrapServers(kafkaBootstrapServers)
-                .setKafkaProducerConfig(i6000KafkaProducerSettings)
-                .setRecordSerializer(
-                        KafkaRecordSerializationSchema.builder()
-                                .setTopic(i6000Config.getKafkaTopicTopo())
-                                .setValueSerializationSchema(new I6000MessageSerializer()).build()
-                )
-                .build();
-
-        outputStreamOperator.getSideOutput(topoOutput)
-                .windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(i6000Config.getTopoWindowSeconds())))
-                .aggregate(new TopoAggregator(), new TimeWindowAllAdder<>())
-                .name("aggregateServiceTopo")
-                .flatMap(new Topo2BytesMapper(i6000Config.getAppName2CmdbID()))
-                .setParallelism(jobConfig.getI6000Topo2BytesParallelism())
-                .name("ServiceTopo2Bytes")
-                .sinkTo(i6000TopoSink)
-                .setParallelism(jobConfig.getI6000TopoKafkaSinkParallelism())
-                .name("i6000Topo2Kafka");
     }
 }

+ 1 - 1
src/main/java/com/cecf/observe/agentstream/PojoSpanSpanCSVConverter.java

@@ -48,7 +48,7 @@ public class PojoSpanSpanCSVConverter extends CHConverter implements ClickHouseS
 
         addInt(ss.HttpCode, builder);
         addString(ss.HttpMethod, builder);
-        addString(ss.HttpURL, builder);
+        addString(ss.getHttpPath(), builder);
 
         addString(ss.ContainerID, builder);