readmd.md 1.6 KB

backup

class Main() {
    public static void main(String[] args) {
        final OutputTag<PojoSpan> i6000OutputTag = new OutputTag<>("i6000-stream") {
        };
        mainStreamOperator = mainStreamOperator.process(new PojoSpanOutputTagProcessFunc(Collections.singletonList(i6000OutputTag)))
                .name("i6000SideOutput");
        String i6000KafkaBootstrapServer = i6000Config.getKafkaBootstrapServer();
        KafkaSink<byte[]> i6000KafkaSink = KafkaSink.<byte[]>builder()
                .setBootstrapServers(i6000KafkaBootstrapServer)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic(i6000Config.getKafkaTopicTrace())
                                .setValueSerializationSchema(new I6000MessageSerializer()).build()
                )
                .build();
        mainStreamOperator.getSideOutput(i6000OutputTag)
                .filter(new AppNameFilter(i6000Config.getIncludeAppNames()))
                .name("filterI6000ByAppName")
                .filter(new ServiceNameFilter(i6000Config.getExcludeServiceNames()))
                .name("FilterI6000ServiceName")
                .filter(new ScopeTypeFilter(SpanScopeType.SPAN_SCOPE_TYPE_HTTP))
                .name("FilterI6000ScopeType")
                .filter(new HttpPathFilter(i6000Config.getHttpPathIncluded()))
                .name("FilterI6000ByHttpPath")
                .map(new PojoSpan2TracesMessage())
                .name("MapI6000PojoSpan2Bytes")
                .sinkTo(i6000KafkaSink)
                .name("SinkI6000-kafka");
    }
}