// backup
class Main() {
public static void main(String[] args) {
final OutputTag<PojoSpan> i6000OutputTag = new OutputTag<>("i6000-stream") {
};
mainStreamOperator = mainStreamOperator.process(new PojoSpanOutputTagProcessFunc(Collections.singletonList(i6000OutputTag)))
.name("i6000SideOutput");
String i6000KafkaBootstrapServer = i6000Config.getKafkaBootstrapServer();
KafkaSink<byte[]> i6000KafkaSink = KafkaSink.<byte[]>builder()
.setBootstrapServers(i6000KafkaBootstrapServer)
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic(i6000Config.getKafkaTopicTrace())
.setValueSerializationSchema(new I6000MessageSerializer()).build()
)
.build();
mainStreamOperator.getSideOutput(i6000OutputTag)
.filter(new AppNameFilter(i6000Config.getIncludeAppNames()))
.name("filterI6000ByAppName")
.filter(new ServiceNameFilter(i6000Config.getExcludeServiceNames()))
.name("FilterI6000ServiceName")
.filter(new ScopeTypeFilter(SpanScopeType.SPAN_SCOPE_TYPE_HTTP))
.name("FilterI6000ScopeType")
.filter(new HttpPathFilter(i6000Config.getHttpPathIncluded()))
.name("FilterI6000ByHttpPath")
.map(new PojoSpan2TracesMessage())
.name("MapI6000PojoSpan2Bytes")
.sinkTo(i6000KafkaSink)
.name("SinkI6000-kafka");
}
}