### backup

```java
/**
 * Backup copy of the job wiring that forwards the "i6000-stream" side output to a Kafka sink.
 *
 * <p>NOTE(review): {@code mainStreamOperator} and {@code i6000Config} are not declared in this
 * snippet; they are presumably supplied by the surrounding job setup -- confirm before reuse.
 */
class Main {

    /**
     * Registers a side-output tag on the main stream, builds a Kafka sink from
     * {@code i6000Config}, and pipes the side output through a filter/map chain into that sink.
     *
     * @param args command-line arguments; not read by this method
     */
    public static void main(String[] args) {
        // The trailing "{ }" makes this an anonymous subclass of OutputTag.
        // NOTE(review): the declaration is a raw type, so the element type of the side output is
        // not stated anywhere in this snippet -- TODO declare it explicitly once confirmed.
        final OutputTag i6000OutputTag = new OutputTag<>("i6000-stream") {
        };

        // The process function receives the tag list; the reassigned operator is what
        // getSideOutput(...) is called on further down.
        mainStreamOperator = mainStreamOperator.process(new PojoSpanOutputTagProcessFunc(Collections.singletonList(i6000OutputTag)))
                .name("i6000SideOutput");

        // Kafka connection settings (bootstrap servers, topic) both come from i6000Config.
        String i6000KafkaBootstrapServer = i6000Config.getKafkaBootstrapServer();
        // NOTE(review): raw KafkaSink -- the record type is whatever PojoSpan2TracesMessage emits
        // and I6000MessageSerializer accepts; neither is visible here, so it is left untyped.
        KafkaSink i6000KafkaSink = KafkaSink.builder()
                .setBootstrapServers(i6000KafkaBootstrapServer)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic(i6000Config.getKafkaTopicTrace())
                                .setValueSerializationSchema(new I6000MessageSerializer()).build()
                )
                .build();

        // Side-output chain: four filters, one map, then the Kafka sink. Each step is named
        // individually; the name strings are kept exactly as they were.
        mainStreamOperator.getSideOutput(i6000OutputTag)
                .filter(new AppNameFilter(i6000Config.getIncludeAppNames()))
                .name("filterI6000ByAppName")
                .filter(new ServiceNameFilter(i6000Config.getExcludeServiceNames()))
                .name("FilterI6000ServiceName")
                .filter(new ScopeTypeFilter(SpanScopeType.SPAN_SCOPE_TYPE_HTTP))
                .name("FilterI6000ScopeType")
                .filter(new HttpPathFilter(i6000Config.getHttpPathIncluded()))
                .name("FilterI6000ByHttpPath")
                .map(new PojoSpan2TracesMessage())
                .name("MapI6000PojoSpan2Bytes")
                .sinkTo(i6000KafkaSink)
                .name("SinkI6000-kafka");
    }
}
```