|
@@ -59,8 +59,10 @@ public class Main {
|
|
|
WatermarkStrategy<PojoSpan> watermarkStrategy = WatermarkStrategy
|
|
|
.<PojoSpan>forBoundedOutOfOrderness(Duration.ofSeconds(jobConfig.getWatermarkOrderlessSeconds()))
|
|
|
.withTimestampAssigner(
|
|
|
- (SerializableTimestampAssigner<PojoSpan>) (element, recordTimestamp) -> element.Timestamp.getTime()
|
|
|
- ).withIdleness(Duration.ofSeconds(jobConfig.getWatermarkIdlenessSeconds()));
|
|
|
+ (SerializableTimestampAssigner<PojoSpan>) (element, recordTimestamp) -> recordTimestamp
|
|
|
+ ).withIdleness(
|
|
|
+ Duration.ofSeconds(jobConfig.getWatermarkIdlenessSeconds())
|
|
|
+ );
|
|
|
if (enablePbSource) {
|
|
|
OasConfig.KafkaSourceConfig pbKafkaSourceConfig = oasConfig.getPbSource();
|
|
|
DeserializationSchema<ExportTraceServiceRequest> pbDeserializationSchema = new ExportTraceServiceRequestSchema();
|