Browse Source

Propagate original exception through kafka wrapper (#7452)

Resolves
https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/7450
Lauri Tulmin 2 years ago
parent
commit
3ca6b04a59

+ 12 - 2
instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java

@@ -18,6 +18,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter;
 import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Proxy;
 import java.util.Collections;
 import java.util.HashMap;
@@ -99,7 +100,11 @@ public final class KafkaTelemetry {
                         : null;
                 return buildAndInjectSpan(record, callback, producer::send);
               }
-              return method.invoke(producer, args);
+              try {
+                return method.invoke(producer, args);
+              } catch (InvocationTargetException exception) {
+                throw exception.getCause();
+              }
             });
   }
 
@@ -111,7 +116,12 @@ public final class KafkaTelemetry {
             KafkaTelemetry.class.getClassLoader(),
             new Class<?>[] {Consumer.class},
             (proxy, method, args) -> {
-              Object result = method.invoke(consumer, args);
+              Object result;
+              try {
+                result = method.invoke(consumer, args);
+              } catch (InvocationTargetException exception) {
+                throw exception.getCause();
+              }
               // ConsumerRecords<K, V> poll(long timeout)
               // ConsumerRecords<K, V> poll(Duration duration)
               if ("poll".equals(method.getName()) && result instanceof ConsumerRecords) {

+ 46 - 0
instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/ExceptionHandlingTest.groovy

@@ -0,0 +1,46 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafkaclients
+
+import io.opentelemetry.instrumentation.test.InstrumentationSpecification
+import io.opentelemetry.instrumentation.test.LibraryTestTrait
+import java.lang.reflect.Proxy
+import java.time.Duration
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.producer.Producer
+
+class ExceptionHandlingTest extends InstrumentationSpecification implements LibraryTestTrait {
+
+  def "test consumer exception propagates to caller"() throws Exception {
+    setup:
+    def consumer = Proxy.newProxyInstance(ExceptionHandlingTest.getClassLoader(), new Class[] { Consumer }) { proxy, method, args ->
+      throw new IllegalStateException("can't invoke")
+    } as Consumer
+    KafkaTelemetry telemetry = KafkaTelemetry.builder(getOpenTelemetry())
+      .build()
+    def wrappedConsumer = telemetry.wrap(consumer)
+
+    when:
+    wrappedConsumer.poll(Duration.ofMillis(1))
+    then:
+    thrown IllegalStateException
+  }
+
+  def "test producer exception propagates to caller"() throws Exception {
+    setup:
+    def producer = Proxy.newProxyInstance(ExceptionHandlingTest.getClassLoader(), new Class[] { Producer }) { proxy, method, args ->
+      throw new IllegalStateException("can't invoke")
+    } as Producer
+    KafkaTelemetry telemetry = KafkaTelemetry.builder(getOpenTelemetry())
+      .build()
+    def wrappedProducer = telemetry.wrap(producer)
+
+    when:
+    wrappedProducer.flush()
+    then:
+    thrown IllegalStateException
+  }
+}