Selaa lähdekoodia

Fix InstrumentedRecordInterceptor closing the trace too early (#11471)

Ben 9 kuukautta sitten
vanhempi
säilyke
f602e193e5

+ 12 - 6
instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java

@@ -71,17 +71,23 @@ final class InstrumentedBatchInterceptor<K, V> implements BatchInterceptor<K, V>
 
   @Override
   public void success(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
-    end(records, null);
-    if (decorated != null) {
-      decorated.success(records, consumer);
+    try {
+      if (decorated != null) {
+        decorated.success(records, consumer);
+      }
+    } finally {
+      end(records, null);
     }
   }
 
   @Override
   public void failure(ConsumerRecords<K, V> records, Exception exception, Consumer<K, V> consumer) {
-    end(records, exception);
-    if (decorated != null) {
-      decorated.failure(records, exception, consumer);
+    try {
+      if (decorated != null) {
+        decorated.failure(records, exception, consumer);
+      }
+    } finally {
+      end(records, exception);
     }
   }
 

+ 12 - 6
instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java

@@ -69,17 +69,23 @@ final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K,
 
   @Override
   public void success(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
-    end(record, null);
-    if (decorated != null) {
-      decorated.success(record, consumer);
+    try {
+      if (decorated != null) {
+        decorated.success(record, consumer);
+      }
+    } finally {
+      end(record, null);
     }
   }
 
   @Override
   public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {
-    end(record, exception);
-    if (decorated != null) {
-      decorated.failure(record, exception, consumer);
+    try {
+      if (decorated != null) {
+        decorated.failure(record, exception, consumer);
+      }
+    } finally {
+      end(record, exception);
     }
   }