Ver Fonte

Make kafka library and javaagent instrumentations more similar (#9738)

Lauri Tulmin há 1 ano atrás
pai
commit
5a2f52978f
26 ficheiros alterados com 933 adições e 298 exclusões
  1. 6 0
      instrumentation/kafka/kafka-clients/kafka-clients-0.11/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/kafka/KafkaClientsConsumerProcessTracing.java
  2. 14 3
      instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java
  3. 0 47
      instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterable.java
  4. 20 0
      instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts
  5. 65 14
      instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java
  6. 23 6
      instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java
  7. 20 3
      instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java
  8. 76 0
      instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java
  9. 82 0
      instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractWrapperTest.java
  10. 66 0
      instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java
  11. 74 99
      instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java
  12. 110 0
      instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java
  13. 69 82
      instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java
  14. 7 4
      instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContext.java
  15. 54 12
      instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContextUtil.java
  16. 9 4
      instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java
  17. 3 1
      instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProcessRequest.java
  18. 3 1
      instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveRequest.java
  19. 62 0
      instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/TracingIterable.java
  20. 25 13
      instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/TracingIterator.java
  21. 19 8
      instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/TracingList.java
  22. 1 1
      javaagent-tooling/src/main/java/io/opentelemetry/javaagent/tooling/field/RuntimeFieldBasedImplementationSupplier.java
  23. 36 0
      testing-common/integration-tests/src/main/java/field/VirtualFieldTestHelper.java
  24. 59 0
      testing-common/integration-tests/src/main/java/field/VirtualFieldTestInstrumentationModule.java
  25. 22 0
      testing-common/integration-tests/src/test/java/field/VirtualFieldTest.java
  26. 8 0
      testing-common/library-for-integration-tests/src/main/java/library/VirtualFieldTestClass.java

+ 6 - 0
instrumentation/kafka/kafka-clients/kafka-clients-0.11/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/kafka/KafkaClientsConsumerProcessTracing.java

@@ -5,6 +5,8 @@
 
 package io.opentelemetry.javaagent.bootstrap.kafka;
 
+import java.util.function.BooleanSupplier;
+
 // Classes used by multiple instrumentations should be in a bootstrap module to ensure that all
 // instrumentations see the same class. Helper classes are injected into each class loader that
 // contains an instrumentation that uses them, so instrumentations in different class loaders will
@@ -23,4 +25,8 @@ public final class KafkaClientsConsumerProcessTracing {
   public static boolean wrappingEnabled() {
     return wrappingEnabled.get();
   }
+
+  public static BooleanSupplier wrappingEnabledSupplier() {
+    return KafkaClientsConsumerProcessTracing::wrappingEnabled;
+  }
 }

+ 14 - 3
instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java

@@ -5,6 +5,8 @@
 
 package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11;
 
+import static io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing.wrappingEnabledSupplier;
+import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.consumerProcessInstrumenter;
 import static net.bytebuddy.matcher.ElementMatchers.isMethod;
 import static net.bytebuddy.matcher.ElementMatchers.isPublic;
 import static net.bytebuddy.matcher.ElementMatchers.named;
@@ -14,6 +16,9 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 
 import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil;
+import io.opentelemetry.instrumentation.kafka.internal.TracingIterable;
+import io.opentelemetry.instrumentation.kafka.internal.TracingIterator;
+import io.opentelemetry.instrumentation.kafka.internal.TracingList;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
 import java.util.Iterator;
@@ -70,7 +75,9 @@ public class ConsumerRecordsInstrumentation implements TypeInstrumentation {
       // case it's important to overwrite the leaked span instead of suppressing the correct span
       // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
       KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
-      iterable = TracingIterable.wrap(iterable, consumerContext);
+      iterable =
+          TracingIterable.wrap(
+              iterable, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
     }
   }
 
@@ -88,7 +95,9 @@ public class ConsumerRecordsInstrumentation implements TypeInstrumentation {
       // case it's important to overwrite the leaked span instead of suppressing the correct span
       // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
       KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
-      list = TracingList.wrap(list, consumerContext);
+      list =
+          TracingList.wrap(
+              list, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
     }
   }
 
@@ -106,7 +115,9 @@ public class ConsumerRecordsInstrumentation implements TypeInstrumentation {
       // case it's important to overwrite the leaked span instead of suppressing the correct span
       // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
       KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
-      iterator = TracingIterator.wrap(iterator, consumerContext);
+      iterator =
+          TracingIterator.wrap(
+              iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
     }
   }
 }

+ 0 - 47
instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterable.java

@@ -1,47 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11;
-
-import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext;
-import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
-import java.util.Iterator;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-public class TracingIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
-  private final Iterable<ConsumerRecord<K, V>> delegate;
-  private final KafkaConsumerContext consumerContext;
-  private boolean firstIterator = true;
-
-  protected TracingIterable(
-      Iterable<ConsumerRecord<K, V>> delegate, KafkaConsumerContext consumerContext) {
-    this.delegate = delegate;
-    this.consumerContext = consumerContext;
-  }
-
-  public static <K, V> Iterable<ConsumerRecord<K, V>> wrap(
-      Iterable<ConsumerRecord<K, V>> delegate, KafkaConsumerContext consumerContext) {
-    if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
-      return new TracingIterable<>(delegate, consumerContext);
-    }
-    return delegate;
-  }
-
-  @Override
-  public Iterator<ConsumerRecord<K, V>> iterator() {
-    Iterator<ConsumerRecord<K, V>> it;
-    // We should only return one iterator with tracing.
-    // However, this is not thread-safe, but usually the first (hopefully only) traversal of
-    // ConsumerRecords is performed in the same thread that called poll()
-    if (firstIterator) {
-      it = TracingIterator.wrap(delegate.iterator(), consumerContext);
-      firstIterator = false;
-    } else {
-      it = delegate.iterator();
-    }
-
-    return it;
-  }
-}

+ 20 - 0
instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts

@@ -20,4 +20,24 @@ tasks {
   withType<Test>().configureEach {
     usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
   }
+
+  val testReceiveSpansDisabled by registering(Test::class) {
+    filter {
+      includeTestsMatching("InterceptorsSuppressReceiveSpansTest")
+      includeTestsMatching("WrapperSuppressReceiveSpansTest")
+    }
+    include("**/InterceptorsSuppressReceiveSpansTest.*", "**/WrapperSuppressReceiveSpansTest.*")
+  }
+
+  test {
+    filter {
+      excludeTestsMatching("InterceptorsSuppressReceiveSpansTest")
+      excludeTestsMatching("WrapperSuppressReceiveSpansTest")
+    }
+    jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
+  }
+
+  check {
+    dependsOn(testReceiveSpansDisabled)
+  }
 }

+ 65 - 14
instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java

@@ -13,16 +13,24 @@ import io.opentelemetry.context.Scope;
 import io.opentelemetry.context.propagation.TextMapPropagator;
 import io.opentelemetry.context.propagation.TextMapSetter;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
+import io.opentelemetry.instrumentation.api.internal.Timer;
+import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext;
+import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest;
+import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaUtil;
 import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
 import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
+import io.opentelemetry.instrumentation.kafka.internal.TracingList;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Proxy;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.function.BiFunction;
@@ -37,6 +45,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.MetricsReporter;
 
@@ -47,16 +56,19 @@ public final class KafkaTelemetry {
 
   private final OpenTelemetry openTelemetry;
   private final Instrumenter<KafkaProducerRequest, RecordMetadata> producerInstrumenter;
+  private final Instrumenter<KafkaReceiveRequest, Void> consumerReceiveInstrumenter;
   private final Instrumenter<KafkaProcessRequest, Void> consumerProcessInstrumenter;
   private final boolean producerPropagationEnabled;
 
   KafkaTelemetry(
       OpenTelemetry openTelemetry,
       Instrumenter<KafkaProducerRequest, RecordMetadata> producerInstrumenter,
+      Instrumenter<KafkaReceiveRequest, Void> consumerReceiveInstrumenter,
       Instrumenter<KafkaProcessRequest, Void> consumerProcessInstrumenter,
       boolean producerPropagationEnabled) {
     this.openTelemetry = openTelemetry;
     this.producerInstrumenter = producerInstrumenter;
+    this.consumerReceiveInstrumenter = consumerReceiveInstrumenter;
     this.consumerProcessInstrumenter = consumerProcessInstrumenter;
     this.producerPropagationEnabled = producerPropagationEnabled;
   }
@@ -115,6 +127,7 @@ public final class KafkaTelemetry {
             new Class<?>[] {Consumer.class},
             (proxy, method, args) -> {
               Object result;
+              Timer timer = "poll".equals(method.getName()) ? Timer.start() : null;
               try {
                 result = method.invoke(consumer, args);
               } catch (InvocationTargetException exception) {
@@ -123,12 +136,36 @@ public final class KafkaTelemetry {
               // ConsumerRecords<K, V> poll(long timeout)
               // ConsumerRecords<K, V> poll(Duration duration)
               if ("poll".equals(method.getName()) && result instanceof ConsumerRecords) {
-                buildAndFinishSpan((ConsumerRecords) result, consumer);
+                ConsumerRecords<K, V> consumerRecords = (ConsumerRecords<K, V>) result;
+                Context receiveContext = buildAndFinishSpan(consumerRecords, consumer, timer);
+                if (receiveContext == null) {
+                  receiveContext = Context.current();
+                }
+                KafkaConsumerContext consumerContext =
+                    KafkaConsumerContextUtil.create(receiveContext, consumer);
+                result = addTracing(consumerRecords, consumerContext);
               }
               return result;
             });
   }
 
+  <K, V> ConsumerRecords<K, V> addTracing(
+      ConsumerRecords<K, V> consumerRecords, KafkaConsumerContext consumerContext) {
+    if (consumerRecords.isEmpty()) {
+      return consumerRecords;
+    }
+
+    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
+    for (TopicPartition partition : consumerRecords.partitions()) {
+      List<ConsumerRecord<K, V>> list = consumerRecords.records(partition);
+      if (list != null && !list.isEmpty()) {
+        list = TracingList.wrap(list, consumerProcessInstrumenter, () -> true, consumerContext);
+      }
+      records.put(partition, list);
+    }
+    return new ConsumerRecords<>(records);
+  }
+
   /**
    * Produces a set of kafka client config properties (consumer or producer) to register a {@link
    * MetricsReporter} that records metrics to an {@code openTelemetry} instance. Add these resulting
@@ -221,23 +258,37 @@ public final class KafkaTelemetry {
     }
   }
 
-  private <K, V> void buildAndFinishSpan(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
-    buildAndFinishSpan(
-        records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer));
+  private <K, V> Context buildAndFinishSpan(
+      ConsumerRecords<K, V> records, Consumer<K, V> consumer, Timer timer) {
+    return buildAndFinishSpan(
+        records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer), timer);
   }
 
-  <K, V> void buildAndFinishSpan(
-      ConsumerRecords<K, V> records, String consumerGroup, String clientId) {
+  <K, V> Context buildAndFinishSpan(
+      ConsumerRecords<K, V> records, String consumerGroup, String clientId, Timer timer) {
+    if (records.isEmpty()) {
+      return null;
+    }
     Context parentContext = Context.current();
-    for (ConsumerRecord<K, V> record : records) {
-      KafkaProcessRequest request = KafkaProcessRequest.create(record, consumerGroup, clientId);
-      if (!consumerProcessInstrumenter.shouldStart(parentContext, request)) {
-        continue;
-      }
-
-      Context context = consumerProcessInstrumenter.start(parentContext, request);
-      consumerProcessInstrumenter.end(context, request, null, null);
+    KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumerGroup, clientId);
+    Context context = null;
+    if (consumerReceiveInstrumenter.shouldStart(parentContext, request)) {
+      context =
+          InstrumenterUtil.startAndEnd(
+              consumerReceiveInstrumenter,
+              parentContext,
+              request,
+              null,
+              null,
+              timer.startTime(),
+              timer.now());
     }
+
+    // we're returning the context of the receive span so that process spans can use it as
+    // parent context even though the span has ended
+    // this is the suggested behavior according to the spec batch receive scenario:
+    // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#batch-receiving
+    return context;
   }
 
   private class ProducerCallback implements Callback {

+ 23 - 6
instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java

@@ -10,10 +10,10 @@ import static java.util.Collections.emptyList;
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
-import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest;
+import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -25,8 +25,10 @@ public final class KafkaTelemetryBuilder {
   private final OpenTelemetry openTelemetry;
   private final List<AttributesExtractor<KafkaProducerRequest, RecordMetadata>>
       producerAttributesExtractors = new ArrayList<>();
-  private final List<AttributesExtractor<KafkaProcessRequest, Void>> consumerAttributesExtractors =
-      new ArrayList<>();
+  private final List<AttributesExtractor<KafkaProcessRequest, Void>>
+      consumerProcessAttributesExtractors = new ArrayList<>();
+  private final List<AttributesExtractor<KafkaReceiveRequest, Void>>
+      consumerReceiveAttributesExtractors = new ArrayList<>();
   private List<String> capturedHeaders = emptyList();
   private boolean captureExperimentalSpanAttributes = false;
   private boolean propagationEnabled = true;
@@ -43,10 +45,25 @@ public final class KafkaTelemetryBuilder {
     return this;
   }
 
+  /** Use {@link #addConsumerProcessAttributesExtractors(AttributesExtractor)} instead. */
+  @Deprecated
   @CanIgnoreReturnValue
   public KafkaTelemetryBuilder addConsumerAttributesExtractors(
       AttributesExtractor<KafkaProcessRequest, Void> extractor) {
-    consumerAttributesExtractors.add(extractor);
+    return addConsumerProcessAttributesExtractors(extractor);
+  }
+
+  @CanIgnoreReturnValue
+  public KafkaTelemetryBuilder addConsumerProcessAttributesExtractors(
+      AttributesExtractor<KafkaProcessRequest, Void> extractor) {
+    consumerProcessAttributesExtractors.add(extractor);
+    return this;
+  }
+
+  @CanIgnoreReturnValue
+  public KafkaTelemetryBuilder addConsumerReceiveAttributesExtractors(
+      AttributesExtractor<KafkaReceiveRequest, Void> extractor) {
+    consumerReceiveAttributesExtractors.add(extractor);
     return this;
   }
 
@@ -109,8 +126,8 @@ public final class KafkaTelemetryBuilder {
     return new KafkaTelemetry(
         openTelemetry,
         instrumenterFactory.createProducerInstrumenter(producerAttributesExtractors),
-        instrumenterFactory.createConsumerOperationInstrumenter(
-            MessageOperation.RECEIVE, consumerAttributesExtractors),
+        instrumenterFactory.createConsumerReceiveInstrumenter(consumerReceiveAttributesExtractors),
+        instrumenterFactory.createConsumerProcessInstrumenter(consumerProcessAttributesExtractors),
         propagationEnabled);
   }
 }

+ 20 - 3
instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java

@@ -7,6 +7,11 @@ package io.opentelemetry.instrumentation.kafkaclients.v2_6;
 
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
+import io.opentelemetry.instrumentation.api.internal.Timer;
+import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext;
+import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -22,7 +27,12 @@ import org.apache.kafka.common.TopicPartition;
  */
 public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
 
-  private static final KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
+  private static final KafkaTelemetry telemetry =
+      KafkaTelemetry.builder(GlobalOpenTelemetry.get())
+          .setMessagingReceiveInstrumentationEnabled(
+              ConfigPropertiesUtil.getBoolean(
+                  "otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
+          .build();
 
   private String consumerGroup;
   private String clientId;
@@ -30,8 +40,15 @@ public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K,
   @Override
   @CanIgnoreReturnValue
   public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
-    telemetry.buildAndFinishSpan(records, consumerGroup, clientId);
-    return records;
+    // timer should be started before fetching ConsumerRecords, but there is no callback for that
+    Timer timer = Timer.start();
+    Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer);
+    if (receiveContext == null) {
+      receiveContext = Context.current();
+    }
+    KafkaConsumerContext consumerContext =
+        KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId);
+    return telemetry.addTracing(records, consumerContext);
   }
 
   @Override

+ 76 - 0
instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java

@@ -0,0 +1,76 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafkaclients.v2_6;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
+import java.time.Duration;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+abstract class AbstractInterceptorsTest extends KafkaClientBaseTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
+
+  static final String greeting = "Hello Kafka!";
+
+  @Override
+  public Map<String, Object> producerProps() {
+    Map<String, Object> props = super.producerProps();
+    props.put(
+        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
+    return props;
+  }
+
+  @Override
+  public Map<String, Object> consumerProps() {
+    Map<String, Object> props = super.consumerProps();
+    props.put(
+        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
+    return props;
+  }
+
+  @Test
+  void testInterceptors() throws InterruptedException {
+    testing.runWithSpan(
+        "parent",
+        () -> {
+          producer.send(
+              new ProducerRecord<>(SHARED_TOPIC, greeting),
+              (meta, ex) -> {
+                if (ex == null) {
+                  testing.runWithSpan("producer callback", () -> {});
+                } else {
+                  testing.runWithSpan("producer exception: " + ex, () -> {});
+                }
+              });
+        });
+
+    awaitUntilConsumerIsReady();
+    // check that the message was received
+    ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5));
+    assertThat(records.count()).isEqualTo(1);
+    for (ConsumerRecord<?, ?> record : records) {
+      assertThat(record.value()).isEqualTo(greeting);
+      assertThat(record.key()).isNull();
+      testing.runWithSpan("process child", () -> {});
+    }
+
+    assertTraces();
+  }
+
+  abstract void assertTraces();
+}

+ 82 - 0
instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractWrapperTest.java

@@ -0,0 +1,82 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafkaclients.v2_6;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+abstract class AbstractWrapperTest extends KafkaClientBaseTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
+
+  static final String greeting = "Hello Kafka!";
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testWrappers(boolean testHeaders) throws InterruptedException {
+    KafkaTelemetryBuilder telemetryBuilder =
+        KafkaTelemetry.builder(testing.getOpenTelemetry())
+            .setCapturedHeaders(singletonList("test-message-header"))
+            // TODO run tests both with and without experimental span attributes
+            .setCaptureExperimentalSpanAttributes(true);
+    configure(telemetryBuilder);
+    KafkaTelemetry telemetry = telemetryBuilder.build();
+
+    Producer<Integer, String> wrappedProducer = telemetry.wrap(producer);
+
+    testing.runWithSpan(
+        "parent",
+        () -> {
+          ProducerRecord<Integer, String> producerRecord =
+              new ProducerRecord<>(SHARED_TOPIC, greeting);
+          if (testHeaders) {
+            producerRecord
+                .headers()
+                .add("test-message-header", "test".getBytes(StandardCharsets.UTF_8));
+          }
+          wrappedProducer.send(
+              producerRecord,
+              (meta, ex) -> {
+                if (ex == null) {
+                  testing.runWithSpan("producer callback", () -> {});
+                } else {
+                  testing.runWithSpan("producer exception: " + ex, () -> {});
+                }
+              });
+        });
+
+    awaitUntilConsumerIsReady();
+    Consumer<Integer, String> wrappedConsumer = telemetry.wrap(consumer);
+    ConsumerRecords<?, ?> records = wrappedConsumer.poll(Duration.ofSeconds(10));
+    assertThat(records.count()).isEqualTo(1);
+    for (ConsumerRecord<?, ?> record : records) {
+      assertThat(record.value()).isEqualTo(greeting);
+      assertThat(record.key()).isNull();
+      testing.runWithSpan("process child", () -> {});
+    }
+
+    assertTraces(testHeaders);
+  }
+
+  abstract void configure(KafkaTelemetryBuilder builder);
+
+  abstract void assertTraces(boolean testHeaders);
+}

+ 66 - 0
instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java

@@ -0,0 +1,66 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafkaclients.v2_6;
+
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.semconv.SemanticAttributes;
+import java.nio.charset.StandardCharsets;
+import org.assertj.core.api.AbstractLongAssert;
+
+class InterceptorsSuppressReceiveSpansTest extends AbstractInterceptorsTest {
+
+  @Override
+  void assertTraces() {
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName(SHARED_TOPIC + " publish")
+                        .hasKind(SpanKind.PRODUCER)
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributesSatisfyingExactly(
+                            equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                            equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
+                            satisfies(
+                                SemanticAttributes.MESSAGING_CLIENT_ID,
+                                stringAssert -> stringAssert.startsWith("producer"))),
+                span ->
+                    span.hasName(SHARED_TOPIC + " process")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(1))
+                        .hasAttributesSatisfyingExactly(
+                            equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                            equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
+                            equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
+                            equalTo(
+                                SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
+                                greeting.getBytes(StandardCharsets.UTF_8).length),
+                            satisfies(
+                                SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
+                                AbstractLongAssert::isNotNegative),
+                            satisfies(
+                                SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
+                                AbstractLongAssert::isNotNegative),
+                            equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
+                            satisfies(
+                                SemanticAttributes.MESSAGING_CLIENT_ID,
+                                stringAssert -> stringAssert.startsWith("consumer"))),
+                span ->
+                    span.hasName("process child")
+                        .hasKind(SpanKind.INTERNAL)
+                        .hasParent(trace.getSpan(2))),
+        // ideally we'd want producer callback to be part of the main trace, we just aren't able to
+        // instrument that
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span ->
+                    span.hasName("producer callback").hasKind(SpanKind.INTERNAL).hasNoParent()));
+  }
+}

+ 74 - 99
instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java

@@ -5,118 +5,93 @@
 
 package io.opentelemetry.instrumentation.kafkaclients.v2_6;
 
+import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName;
 import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
 import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import io.opentelemetry.api.trace.SpanContext;
 import io.opentelemetry.api.trace.SpanKind;
-import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest;
-import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
-import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
+import io.opentelemetry.sdk.trace.data.LinkData;
 import io.opentelemetry.semconv.SemanticAttributes;
 import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.Map;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
+import java.util.concurrent.atomic.AtomicReference;
 import org.assertj.core.api.AbstractLongAssert;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
 
-class InterceptorsTest extends KafkaClientBaseTest {
-  @RegisterExtension
-  static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
+class InterceptorsTest extends AbstractInterceptorsTest {
 
   @Override
-  public Map<String, Object> producerProps() {
-    Map<String, Object> props = super.producerProps();
-    props.put(
-        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
-    return props;
-  }
-
-  @Override
-  public Map<String, Object> consumerProps() {
-    Map<String, Object> props = super.consumerProps();
-    props.put(
-        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
-    return props;
-  }
-
-  @Test
-  void testInterceptors() throws InterruptedException {
-    String greeting = "Hello Kafka!";
-    testing.runWithSpan(
-        "parent",
-        () -> {
-          producer.send(
-              new ProducerRecord<>(SHARED_TOPIC, greeting),
-              (meta, ex) -> {
-                if (ex == null) {
-                  testing.runWithSpan("producer callback", () -> {});
-                } else {
-                  testing.runWithSpan("producer exception: " + ex, () -> {});
-                }
-              });
-        });
-
-    awaitUntilConsumerIsReady();
-    // check that the message was received
-    ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5));
-    assertThat(records.count()).isEqualTo(1);
-    for (ConsumerRecord<?, ?> record : records) {
-      assertThat(record.value()).isEqualTo(greeting);
-      assertThat(record.key()).isNull();
-    }
-
-    testing.waitAndAssertTraces(
+  void assertTraces() {
+    AtomicReference<SpanContext> producerSpanContext = new AtomicReference<>();
+    testing.waitAndAssertSortedTraces(
+        orderByRootSpanName("parent", SHARED_TOPIC + " receive", "producer callback"),
         trace -> {
           trace.hasSpansSatisfyingExactly(
-              span -> {
-                span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent();
-              },
-              span -> {
-                span.hasName(SHARED_TOPIC + " publish")
-                    .hasKind(SpanKind.PRODUCER)
-                    .hasParent(trace.getSpan(0))
-                    .hasAttributesSatisfyingExactly(
-                        equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
-                        equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
-                        satisfies(
-                            SemanticAttributes.MESSAGING_CLIENT_ID,
-                            stringAssert -> stringAssert.startsWith("producer")));
-              },
-              span -> {
-                span.hasName(SHARED_TOPIC + " receive")
-                    .hasKind(SpanKind.CONSUMER)
-                    .hasParent(trace.getSpan(1))
-                    .hasAttributesSatisfyingExactly(
-                        equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
-                        equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
-                        equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
-                        equalTo(
-                            SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
-                            greeting.getBytes(StandardCharsets.UTF_8).length),
-                        satisfies(
-                            SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
-                            AbstractLongAssert::isNotNegative),
-                        satisfies(
-                            SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
-                            AbstractLongAssert::isNotNegative),
-                        equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
-                        satisfies(
-                            SemanticAttributes.MESSAGING_CLIENT_ID,
-                            stringAssert -> stringAssert.startsWith("consumer")));
-              });
+              span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+              span ->
+                  span.hasName(SHARED_TOPIC + " publish")
+                      .hasKind(SpanKind.PRODUCER)
+                      .hasParent(trace.getSpan(0))
+                      .hasAttributesSatisfyingExactly(
+                          equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                          equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
+                          satisfies(
+                              SemanticAttributes.MESSAGING_CLIENT_ID,
+                              stringAssert -> stringAssert.startsWith("producer"))));
+          SpanContext spanContext = trace.getSpan(1).getSpanContext();
+          producerSpanContext.set(
+              SpanContext.createFromRemoteParent(
+                  spanContext.getTraceId(),
+                  spanContext.getSpanId(),
+                  spanContext.getTraceFlags(),
+                  spanContext.getTraceState()));
         },
-        trace -> {
-          trace.hasSpansSatisfyingExactly(
-              span -> {
-                span.hasName("producer callback").hasKind(SpanKind.INTERNAL).hasNoParent();
-              });
-        });
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span ->
+                    span.hasName(SHARED_TOPIC + " receive")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasNoParent()
+                        .hasLinksSatisfying(links -> assertThat(links).isEmpty())
+                        .hasAttributesSatisfyingExactly(
+                            equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                            equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
+                            equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
+                            equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
+                            satisfies(
+                                SemanticAttributes.MESSAGING_CLIENT_ID,
+                                stringAssert -> stringAssert.startsWith("consumer"))),
+                span ->
+                    span.hasName(SHARED_TOPIC + " process")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(0))
+                        .hasLinks(LinkData.create(producerSpanContext.get()))
+                        .hasAttributesSatisfyingExactly(
+                            equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                            equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
+                            equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
+                            equalTo(
+                                SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
+                                greeting.getBytes(StandardCharsets.UTF_8).length),
+                            satisfies(
+                                SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
+                                AbstractLongAssert::isNotNegative),
+                            satisfies(
+                                SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
+                                AbstractLongAssert::isNotNegative),
+                            equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
+                            satisfies(
+                                SemanticAttributes.MESSAGING_CLIENT_ID,
+                                stringAssert -> stringAssert.startsWith("consumer"))),
+                span ->
+                    span.hasName("process child")
+                        .hasKind(SpanKind.INTERNAL)
+                        .hasParent(trace.getSpan(1))),
+        // ideally we'd want producer callback to be part of the main trace, we just aren't able to
+        // instrument that
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span ->
+                    span.hasName("producer callback").hasKind(SpanKind.INTERNAL).hasNoParent()));
   }
 }

+ 110 - 0
instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java

@@ -0,0 +1,110 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafkaclients.v2_6;
+
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
+import io.opentelemetry.semconv.SemanticAttributes;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.assertj.core.api.AbstractLongAssert;
+
+class WrapperSuppressReceiveSpansTest extends AbstractWrapperTest {
+
+  @Override
+  void configure(KafkaTelemetryBuilder builder) {
+    builder.setMessagingReceiveInstrumentationEnabled(false);
+  }
+
+  @Override
+  void assertTraces(boolean testHeaders) {
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName(SHARED_TOPIC + " publish")
+                        .hasKind(SpanKind.PRODUCER)
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributesSatisfyingExactly(sendAttributes(testHeaders)),
+                span ->
+                    span.hasName(SHARED_TOPIC + " process")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(1))
+                        .hasAttributesSatisfyingExactly(processAttributes(greeting, testHeaders)),
+                span ->
+                    span.hasName("process child")
+                        .hasKind(SpanKind.INTERNAL)
+                        .hasParent(trace.getSpan(2)),
+                span ->
+                    span.hasName("producer callback")
+                        .hasKind(SpanKind.INTERNAL)
+                        .hasParent(trace.getSpan(0))));
+  }
+
+  protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
+    List<AttributeAssertion> assertions =
+        new ArrayList<>(
+            Arrays.asList(
+                equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
+                satisfies(
+                    SemanticAttributes.MESSAGING_CLIENT_ID,
+                    stringAssert -> stringAssert.startsWith("producer")),
+                satisfies(
+                    SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
+                    AbstractLongAssert::isNotNegative),
+                satisfies(
+                    SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
+                    AbstractLongAssert::isNotNegative)));
+    if (testHeaders) {
+      assertions.add(
+          equalTo(
+              AttributeKey.stringArrayKey("messaging.header.test_message_header"),
+              Collections.singletonList("test")));
+    }
+    return assertions;
+  }
+
+  private static List<AttributeAssertion> processAttributes(String greeting, boolean testHeaders) {
+    List<AttributeAssertion> assertions =
+        new ArrayList<>(
+            Arrays.asList(
+                equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
+                equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
+                equalTo(
+                    SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
+                    greeting.getBytes(StandardCharsets.UTF_8).length),
+                satisfies(
+                    SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
+                    AbstractLongAssert::isNotNegative),
+                satisfies(
+                    SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
+                    AbstractLongAssert::isNotNegative),
+                satisfies(
+                    AttributeKey.longKey("kafka.record.queue_time_ms"),
+                    AbstractLongAssert::isNotNegative),
+                equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
+                satisfies(
+                    SemanticAttributes.MESSAGING_CLIENT_ID,
+                    stringAssert -> stringAssert.startsWith("consumer"))));
+    if (testHeaders) {
+      assertions.add(
+          equalTo(
+              AttributeKey.stringArrayKey("messaging.header.test_message_header"),
+              Collections.singletonList("test")));
+    }
+    return assertions;
+  }
+}

+ 69 - 82
instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java

@@ -7,104 +7,71 @@ package io.opentelemetry.instrumentation.kafkaclients.v2_6;
 
 import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
 import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
-import static java.util.Collections.singletonList;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.SpanContext;
 import io.opentelemetry.api.trace.SpanKind;
-import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest;
-import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
-import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
 import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
+import io.opentelemetry.sdk.trace.data.LinkData;
 import io.opentelemetry.semconv.SemanticAttributes;
 import java.nio.charset.StandardCharsets;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
+import java.util.concurrent.atomic.AtomicReference;
 import org.assertj.core.api.AbstractLongAssert;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
-class WrapperTest extends KafkaClientBaseTest {
+class WrapperTest extends AbstractWrapperTest {
 
-  @RegisterExtension
-  static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
-
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  void testWrappers(boolean testHeaders) throws InterruptedException {
-    KafkaTelemetry telemetry =
-        KafkaTelemetry.builder(testing.getOpenTelemetry())
-            .setCapturedHeaders(singletonList("test-message-header"))
-            // TODO run tests both with and without experimental span attributes
-            .setCaptureExperimentalSpanAttributes(true)
-            .build();
-
-    String greeting = "Hello Kafka!";
-    Producer<Integer, String> wrappedProducer = telemetry.wrap(producer);
-
-    testing.runWithSpan(
-        "parent",
-        () -> {
-          ProducerRecord<Integer, String> producerRecord =
-              new ProducerRecord<>(SHARED_TOPIC, greeting);
-          if (testHeaders) {
-            producerRecord
-                .headers()
-                .add("test-message-header", "test".getBytes(StandardCharsets.UTF_8));
-          }
-          wrappedProducer.send(
-              producerRecord,
-              (meta, ex) -> {
-                if (ex == null) {
-                  testing.runWithSpan("producer callback", () -> {});
-                } else {
-                  testing.runWithSpan("producer exception: " + ex, () -> {});
-                }
-              });
-        });
-
-    awaitUntilConsumerIsReady();
-    Consumer<Integer, String> wrappedConsumer = telemetry.wrap(consumer);
-    ConsumerRecords<?, ?> records = wrappedConsumer.poll(Duration.ofSeconds(10));
-    assertThat(records.count()).isEqualTo(1);
-    for (ConsumerRecord<?, ?> record : records) {
-      assertThat(record.value()).isEqualTo(greeting);
-      assertThat(record.key()).isNull();
-    }
+  @Override
+  void configure(KafkaTelemetryBuilder builder) {
+    builder.setMessagingReceiveInstrumentationEnabled(true);
+  }
 
+  @Override
+  void assertTraces(boolean testHeaders) {
+    AtomicReference<SpanContext> producerSpanContext = new AtomicReference<>();
     testing.waitAndAssertTraces(
         trace -> {
           trace.hasSpansSatisfyingExactly(
-              span -> {
-                span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent();
-              },
-              span -> {
-                span.hasName(SHARED_TOPIC + " publish")
-                    .hasKind(SpanKind.PRODUCER)
-                    .hasParent(trace.getSpan(0))
-                    .hasAttributesSatisfyingExactly(sendAttributes(testHeaders));
-              },
-              span -> {
-                span.hasName(SHARED_TOPIC + " receive")
-                    .hasKind(SpanKind.CONSUMER)
-                    .hasParent(trace.getSpan(1))
-                    .hasAttributesSatisfyingExactly(receiveAttributes(greeting, testHeaders));
-              },
-              span -> {
-                span.hasName("producer callback")
-                    .hasKind(SpanKind.INTERNAL)
-                    .hasParent(trace.getSpan(0));
-              });
-        });
+              span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+              span ->
+                  span.hasName(SHARED_TOPIC + " publish")
+                      .hasKind(SpanKind.PRODUCER)
+                      .hasParent(trace.getSpan(0))
+                      .hasAttributesSatisfyingExactly(sendAttributes(testHeaders)),
+              span ->
+                  span.hasName("producer callback")
+                      .hasKind(SpanKind.INTERNAL)
+                      .hasParent(trace.getSpan(0)));
+          SpanContext spanContext = trace.getSpan(1).getSpanContext();
+          producerSpanContext.set(
+              SpanContext.createFromRemoteParent(
+                  spanContext.getTraceId(),
+                  spanContext.getSpanId(),
+                  spanContext.getTraceFlags(),
+                  spanContext.getTraceState()));
+        },
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span ->
+                    span.hasName(SHARED_TOPIC + " receive")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasNoParent()
+                        .hasLinksSatisfying(links -> assertThat(links).isEmpty())
+                        .hasAttributesSatisfyingExactly(receiveAttributes(testHeaders)),
+                span ->
+                    span.hasName(SHARED_TOPIC + " process")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(0))
+                        .hasLinks(LinkData.create(producerSpanContext.get()))
+                        .hasAttributesSatisfyingExactly(processAttributes(greeting, testHeaders)),
+                span ->
+                    span.hasName("process child")
+                        .hasKind(SpanKind.INTERNAL)
+                        .hasParent(trace.getSpan(1))));
   }
 
   protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
@@ -131,13 +98,13 @@ class WrapperTest extends KafkaClientBaseTest {
     return assertions;
   }
 
-  private static List<AttributeAssertion> receiveAttributes(String greeting, boolean testHeaders) {
+  private static List<AttributeAssertion> processAttributes(String greeting, boolean testHeaders) {
     List<AttributeAssertion> assertions =
         new ArrayList<>(
             Arrays.asList(
                 equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
                 equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
-                equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
+                equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
                 equalTo(
                     SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
                     greeting.getBytes(StandardCharsets.UTF_8).length),
@@ -162,4 +129,24 @@ class WrapperTest extends KafkaClientBaseTest {
     }
     return assertions;
   }
+
+  protected static List<AttributeAssertion> receiveAttributes(boolean testHeaders) {
+    List<AttributeAssertion> assertions =
+        new ArrayList<>(
+            Arrays.asList(
+                equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
+                equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
+                equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
+                satisfies(
+                    SemanticAttributes.MESSAGING_CLIENT_ID,
+                    stringAssert -> stringAssert.startsWith("consumer"))));
+    if (testHeaders) {
+      assertions.add(
+          equalTo(
+              AttributeKey.stringArrayKey("messaging.header.test_message_header"),
+              Collections.singletonList("test")));
+    }
+    return assertions;
+  }
 }

+ 7 - 4
instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContext.java

@@ -8,7 +8,6 @@ package io.opentelemetry.instrumentation.kafka.internal;
 import com.google.auto.value.AutoValue;
 import io.opentelemetry.context.Context;
 import javax.annotation.Nullable;
-import org.apache.kafka.clients.consumer.Consumer;
 
 /**
  * This class is internal and is hence not for public use. Its APIs are unstable and can change at
@@ -17,13 +16,17 @@ import org.apache.kafka.clients.consumer.Consumer;
 @AutoValue
 public abstract class KafkaConsumerContext {
 
-  static KafkaConsumerContext create(@Nullable Context context, @Nullable Consumer<?, ?> consumer) {
-    return new AutoValue_KafkaConsumerContext(context, consumer);
+  static KafkaConsumerContext create(
+      @Nullable Context context, @Nullable String consumerGroup, @Nullable String clientId) {
+    return new AutoValue_KafkaConsumerContext(context, consumerGroup, clientId);
   }
 
   @Nullable
   public abstract Context getContext();
 
   @Nullable
-  abstract Consumer<?, ?> getConsumer();
+  abstract String getConsumerGroup();
+
+  @Nullable
+  abstract String getClientId();
 }

+ 54 - 12
instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContextUtil.java

@@ -16,44 +16,86 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
  * any time.
  */
 public final class KafkaConsumerContextUtil {
+  // these fields can be used for multiple instrumentations because of that we don't use a helper
+  // class as field type
   private static final VirtualField<ConsumerRecord<?, ?>, Context> recordContextField =
       VirtualField.find(ConsumerRecord.class, Context.class);
-  private static final VirtualField<ConsumerRecord<?, ?>, Consumer<?, ?>> recordConsumerField =
-      VirtualField.find(ConsumerRecord.class, Consumer.class);
+  private static final VirtualField<ConsumerRecord<?, ?>, String[]> recordConsumerInfoField =
+      VirtualField.find(ConsumerRecord.class, String[].class);
   private static final VirtualField<ConsumerRecords<?, ?>, Context> recordsContextField =
       VirtualField.find(ConsumerRecords.class, Context.class);
-  private static final VirtualField<ConsumerRecords<?, ?>, Consumer<?, ?>> recordsConsumerField =
-      VirtualField.find(ConsumerRecords.class, Consumer.class);
+  private static final VirtualField<ConsumerRecords<?, ?>, String[]> recordsConsumerInfoField =
+      VirtualField.find(ConsumerRecords.class, String[].class);
 
   public static KafkaConsumerContext get(ConsumerRecord<?, ?> records) {
     Context receiveContext = recordContextField.get(records);
-    Consumer<?, ?> consumer = recordConsumerField.get(records);
-    return KafkaConsumerContext.create(receiveContext, consumer);
+    String consumerGroup = null;
+    String clientId = null;
+    String[] consumerInfo = recordConsumerInfoField.get(records);
+    if (consumerInfo != null) {
+      consumerGroup = consumerInfo[0];
+      clientId = consumerInfo[1];
+    }
+    return create(receiveContext, consumerGroup, clientId);
   }
 
   public static KafkaConsumerContext get(ConsumerRecords<?, ?> records) {
     Context receiveContext = recordsContextField.get(records);
-    Consumer<?, ?> consumer = recordsConsumerField.get(records);
-    return KafkaConsumerContext.create(receiveContext, consumer);
+    String consumerGroup = null;
+    String clientId = null;
+    String[] consumerInfo = recordsConsumerInfoField.get(records);
+    if (consumerInfo != null) {
+      consumerGroup = consumerInfo[0];
+      clientId = consumerInfo[1];
+    }
+    return create(receiveContext, consumerGroup, clientId);
+  }
+
+  public static KafkaConsumerContext create(Context context, Consumer<?, ?> consumer) {
+    return create(context, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer));
+  }
+
+  public static KafkaConsumerContext create(
+      Context context, String consumerGroup, String clientId) {
+    return KafkaConsumerContext.create(context, consumerGroup, clientId);
   }
 
   public static void set(ConsumerRecord<?, ?> record, Context context, Consumer<?, ?> consumer) {
     recordContextField.set(record, context);
-    recordConsumerField.set(record, consumer);
+    String consumerGroup = KafkaUtil.getConsumerGroup(consumer);
+    String clientId = KafkaUtil.getClientId(consumer);
+    set(record, context, consumerGroup, clientId);
   }
 
   public static void set(ConsumerRecord<?, ?> record, KafkaConsumerContext consumerContext) {
-    set(record, consumerContext.getContext(), consumerContext.getConsumer());
+    set(
+        record,
+        consumerContext.getContext(),
+        consumerContext.getConsumerGroup(),
+        consumerContext.getClientId());
+  }
+
+  public static void set(
+      ConsumerRecord<?, ?> record, Context context, String consumerGroup, String clientId) {
+    recordContextField.set(record, context);
+    recordConsumerInfoField.set(record, new String[] {consumerGroup, clientId});
   }
 
   public static void set(ConsumerRecords<?, ?> records, Context context, Consumer<?, ?> consumer) {
+    String consumerGroup = KafkaUtil.getConsumerGroup(consumer);
+    String clientId = KafkaUtil.getClientId(consumer);
+    set(records, context, consumerGroup, clientId);
+  }
+
+  public static void set(
+      ConsumerRecords<?, ?> records, Context context, String consumerGroup, String clientId) {
     recordsContextField.set(records, context);
-    recordsConsumerField.set(records, consumer);
+    recordsConsumerInfoField.set(records, new String[] {consumerGroup, clientId});
   }
 
   public static void copy(ConsumerRecord<?, ?> from, ConsumerRecord<?, ?> to) {
     recordContextField.set(to, recordContextField.get(from));
-    recordConsumerField.set(to, recordConsumerField.get(from));
+    recordConsumerInfoField.set(to, recordConsumerInfoField.get(from));
   }
 
   private KafkaConsumerContextUtil() {}

+ 9 - 4
instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java

@@ -101,6 +101,11 @@ public final class KafkaInstrumenterFactory {
   }
 
   public Instrumenter<KafkaReceiveRequest, Void> createConsumerReceiveInstrumenter() {
+    return createConsumerReceiveInstrumenter(Collections.emptyList());
+  }
+
+  public Instrumenter<KafkaReceiveRequest, Void> createConsumerReceiveInstrumenter(
+      Iterable<AttributesExtractor<KafkaReceiveRequest, Void>> extractors) {
     KafkaReceiveAttributesGetter getter = KafkaReceiveAttributesGetter.INSTANCE;
     MessageOperation operation = MessageOperation.RECEIVE;
 
@@ -111,20 +116,20 @@ public final class KafkaInstrumenterFactory {
         .addAttributesExtractor(
             buildMessagingAttributesExtractor(getter, operation, capturedHeaders))
         .addAttributesExtractor(KafkaReceiveAttributesExtractor.INSTANCE)
+        .addAttributesExtractors(extractors)
         .setErrorCauseExtractor(errorCauseExtractor)
         .setEnabled(messagingReceiveInstrumentationEnabled)
         .buildInstrumenter(SpanKindExtractor.alwaysConsumer());
   }
 
   public Instrumenter<KafkaProcessRequest, Void> createConsumerProcessInstrumenter() {
-    return createConsumerOperationInstrumenter(MessageOperation.PROCESS, Collections.emptyList());
+    return createConsumerProcessInstrumenter(Collections.emptyList());
   }
 
-  public Instrumenter<KafkaProcessRequest, Void> createConsumerOperationInstrumenter(
-      MessageOperation operation,
+  public Instrumenter<KafkaProcessRequest, Void> createConsumerProcessInstrumenter(
       Iterable<AttributesExtractor<KafkaProcessRequest, Void>> extractors) {
-
     KafkaConsumerAttributesGetter getter = KafkaConsumerAttributesGetter.INSTANCE;
+    MessageOperation operation = MessageOperation.PROCESS;
 
     InstrumenterBuilder<KafkaProcessRequest, Void> builder =
         Instrumenter.<KafkaProcessRequest, Void>builder(

+ 3 - 1
instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProcessRequest.java

@@ -22,7 +22,9 @@ public class KafkaProcessRequest extends AbstractKafkaConsumerRequest {
 
   public static KafkaProcessRequest create(
       KafkaConsumerContext consumerContext, ConsumerRecord<?, ?> record) {
-    return create(record, consumerContext != null ? consumerContext.getConsumer() : null);
+    String consumerGroup = consumerContext != null ? consumerContext.getConsumerGroup() : null;
+    String clientId = consumerContext != null ? consumerContext.getClientId() : null;
+    return create(record, consumerGroup, clientId);
   }
 
   public static KafkaProcessRequest create(

+ 3 - 1
instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveRequest.java

@@ -24,7 +24,9 @@ public class KafkaReceiveRequest extends AbstractKafkaConsumerRequest {
 
   public static KafkaReceiveRequest create(
       KafkaConsumerContext consumerContext, ConsumerRecords<?, ?> records) {
-    return create(records, consumerContext != null ? consumerContext.getConsumer() : null);
+    String consumerGroup = consumerContext != null ? consumerContext.getConsumerGroup() : null;
+    String clientId = consumerContext != null ? consumerContext.getClientId() : null;
+    return create(records, consumerGroup, clientId);
   }
 
   public static KafkaReceiveRequest create(

+ 62 - 0
instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/TracingIterable.java

@@ -0,0 +1,62 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafka.internal;
+
+import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import java.util.Iterator;
+import java.util.function.BooleanSupplier;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * This class is internal and is hence not for public use. Its APIs are unstable and can change at
+ * any time.
+ */
+public class TracingIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
+  private final Iterable<ConsumerRecord<K, V>> delegate;
+  private final Instrumenter<KafkaProcessRequest, Void> instrumenter;
+  private final BooleanSupplier wrappingEnabled;
+  private final KafkaConsumerContext consumerContext;
+  private boolean firstIterator = true;
+
+  protected TracingIterable(
+      Iterable<ConsumerRecord<K, V>> delegate,
+      Instrumenter<KafkaProcessRequest, Void> instrumenter,
+      BooleanSupplier wrappingEnabled,
+      KafkaConsumerContext consumerContext) {
+    this.delegate = delegate;
+    this.instrumenter = instrumenter;
+    this.wrappingEnabled = wrappingEnabled;
+    this.consumerContext = consumerContext;
+  }
+
+  public static <K, V> Iterable<ConsumerRecord<K, V>> wrap(
+      Iterable<ConsumerRecord<K, V>> delegate,
+      Instrumenter<KafkaProcessRequest, Void> instrumenter,
+      BooleanSupplier wrappingEnabled,
+      KafkaConsumerContext consumerContext) {
+    if (wrappingEnabled.getAsBoolean()) {
+      return new TracingIterable<>(delegate, instrumenter, wrappingEnabled, consumerContext);
+    }
+    return delegate;
+  }
+
+  @Override
+  public Iterator<ConsumerRecord<K, V>> iterator() {
+    Iterator<ConsumerRecord<K, V>> it;
+    // We should only return one iterator with tracing.
+    // However, this is not thread-safe, but usually the first (hopefully only) traversal of
+    // ConsumerRecords is performed in the same thread that called poll()
+    if (firstIterator) {
+      it =
+          TracingIterator.wrap(delegate.iterator(), instrumenter, wrappingEnabled, consumerContext);
+      firstIterator = false;
+    } else {
+      it = delegate.iterator();
+    }
+
+    return it;
+  }
+}

+ 25 - 13
instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterator.java → instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/TracingIterator.java

@@ -3,22 +3,25 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11;
-
-import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.consumerProcessInstrumenter;
+package io.opentelemetry.instrumentation.kafka.internal;
 
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
-import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext;
-import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest;
-import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
+import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import java.util.Iterator;
+import java.util.function.BooleanSupplier;
 import javax.annotation.Nullable;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+/**
+ * This class is internal and is hence not for public use. Its APIs are unstable and can change at
+ * any time.
+ */
 public class TracingIterator<K, V> implements Iterator<ConsumerRecord<K, V>> {
 
   private final Iterator<ConsumerRecord<K, V>> delegateIterator;
+  private final Instrumenter<KafkaProcessRequest, Void> instrumenter;
+  private final BooleanSupplier wrappingEnabled;
   private final Context parentContext;
   private final KafkaConsumerContext consumerContext;
 
@@ -31,8 +34,13 @@ public class TracingIterator<K, V> implements Iterator<ConsumerRecord<K, V>> {
   @Nullable private Scope currentScope;
 
   private TracingIterator(
-      Iterator<ConsumerRecord<K, V>> delegateIterator, KafkaConsumerContext consumerContext) {
+      Iterator<ConsumerRecord<K, V>> delegateIterator,
+      Instrumenter<KafkaProcessRequest, Void> instrumenter,
+      BooleanSupplier wrappingEnabled,
+      KafkaConsumerContext consumerContext) {
     this.delegateIterator = delegateIterator;
+    this.instrumenter = instrumenter;
+    this.wrappingEnabled = wrappingEnabled;
 
     Context receiveContext = consumerContext.getContext();
     // use the receive CONSUMER as parent if it's available
@@ -41,9 +49,13 @@ public class TracingIterator<K, V> implements Iterator<ConsumerRecord<K, V>> {
   }
 
   public static <K, V> Iterator<ConsumerRecord<K, V>> wrap(
-      Iterator<ConsumerRecord<K, V>> delegateIterator, KafkaConsumerContext consumerContext) {
-    if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
-      return new TracingIterator<>(delegateIterator, consumerContext);
+      Iterator<ConsumerRecord<K, V>> delegateIterator,
+      Instrumenter<KafkaProcessRequest, Void> instrumenter,
+      BooleanSupplier wrappingEnabled,
+      KafkaConsumerContext consumerContext) {
+    if (wrappingEnabled.getAsBoolean()) {
+      return new TracingIterator<>(
+          delegateIterator, instrumenter, wrappingEnabled, consumerContext);
     }
     return delegateIterator;
   }
@@ -65,9 +77,9 @@ public class TracingIterator<K, V> implements Iterator<ConsumerRecord<K, V>> {
     // suppressing the correct span
     // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
     ConsumerRecord<K, V> next = delegateIterator.next();
-    if (next != null && KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
+    if (next != null && wrappingEnabled.getAsBoolean()) {
       currentRequest = KafkaProcessRequest.create(consumerContext, next);
-      currentContext = consumerProcessInstrumenter().start(parentContext, currentRequest);
+      currentContext = instrumenter.start(parentContext, currentRequest);
       currentScope = currentContext.makeCurrent();
     }
     return next;
@@ -76,7 +88,7 @@ public class TracingIterator<K, V> implements Iterator<ConsumerRecord<K, V>> {
   private void closeScopeAndEndSpan() {
     if (currentScope != null) {
       currentScope.close();
-      consumerProcessInstrumenter().end(currentContext, currentRequest, null, null);
+      instrumenter.end(currentContext, currentRequest, null, null);
       currentScope = null;
       currentRequest = null;
       currentContext = null;

+ 19 - 8
instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingList.java → instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/TracingList.java

@@ -3,27 +3,38 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11;
+package io.opentelemetry.instrumentation.kafka.internal;
 
-import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext;
-import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
+import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import java.util.Collection;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.function.BooleanSupplier;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+/**
+ * This class is internal and is hence not for public use. Its APIs are unstable and can change at
+ * any time.
+ */
 public class TracingList<K, V> extends TracingIterable<K, V> implements List<ConsumerRecord<K, V>> {
   private final List<ConsumerRecord<K, V>> delegate;
 
-  private TracingList(List<ConsumerRecord<K, V>> delegate, KafkaConsumerContext consumerContext) {
-    super(delegate, consumerContext);
+  private TracingList(
+      List<ConsumerRecord<K, V>> delegate,
+      Instrumenter<KafkaProcessRequest, Void> instrumenter,
+      BooleanSupplier wrappingEnabled,
+      KafkaConsumerContext consumerContext) {
+    super(delegate, instrumenter, wrappingEnabled, consumerContext);
     this.delegate = delegate;
   }
 
   public static <K, V> List<ConsumerRecord<K, V>> wrap(
-      List<ConsumerRecord<K, V>> delegate, KafkaConsumerContext consumerContext) {
-    if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
-      return new TracingList<>(delegate, consumerContext);
+      List<ConsumerRecord<K, V>> delegate,
+      Instrumenter<KafkaProcessRequest, Void> instrumenter,
+      BooleanSupplier wrappingEnabled,
+      KafkaConsumerContext consumerContext) {
+    if (wrappingEnabled.getAsBoolean()) {
+      return new TracingList<>(delegate, instrumenter, wrappingEnabled, consumerContext);
     }
     return delegate;
   }

+ 1 - 1
javaagent-tooling/src/main/java/io/opentelemetry/javaagent/tooling/field/RuntimeFieldBasedImplementationSupplier.java

@@ -20,7 +20,7 @@ final class RuntimeFieldBasedImplementationSupplier
       Class<T> type, Class<F> fieldType) {
     try {
       String virtualFieldImplClassName =
-          getVirtualFieldImplementationClassName(type.getName(), fieldType.getName());
+          getVirtualFieldImplementationClassName(type.getTypeName(), fieldType.getTypeName());
       Class<?> contextStoreClass = Class.forName(virtualFieldImplClassName, false, null);
       Method method = contextStoreClass.getMethod("getVirtualField", Class.class, Class.class);
       @SuppressWarnings("unchecked")

+ 36 - 0
testing-common/integration-tests/src/main/java/field/VirtualFieldTestHelper.java

@@ -0,0 +1,36 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package field;
+
+import io.opentelemetry.instrumentation.api.util.VirtualField;
+import library.VirtualFieldTestClass;
+
+public final class VirtualFieldTestHelper {
+
+  private VirtualFieldTestHelper() {}
+
+  public static void test() {
+    VirtualFieldTestClass instance = new VirtualFieldTestClass();
+    {
+      VirtualField<VirtualFieldTestClass, String> field =
+          VirtualField.find(VirtualFieldTestClass.class, String.class);
+      field.set(instance, "test");
+      field.get(instance);
+    }
+    {
+      VirtualField<VirtualFieldTestClass, String[]> field =
+          VirtualField.find(VirtualFieldTestClass.class, String[].class);
+      field.set(instance, new String[] {"test"});
+      field.get(instance);
+    }
+    {
+      VirtualField<VirtualFieldTestClass, String[][]> field =
+          VirtualField.find(VirtualFieldTestClass.class, String[][].class);
+      field.set(instance, new String[][] {new String[] {"test"}});
+      field.get(instance);
+    }
+  }
+}

+ 59 - 0
testing-common/integration-tests/src/main/java/field/VirtualFieldTestInstrumentationModule.java

@@ -0,0 +1,59 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package field;
+
+import static java.util.Collections.singletonList;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+import com.google.auto.service.AutoService;
+import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import java.util.List;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+
+@AutoService(InstrumentationModule.class)
+public class VirtualFieldTestInstrumentationModule extends InstrumentationModule {
+  public VirtualFieldTestInstrumentationModule() {
+    super("virtual-field-test");
+  }
+
+  @Override
+  public List<TypeInstrumentation> typeInstrumentations() {
+    return singletonList(new TestInstrumentation());
+  }
+
+  @Override
+  public boolean isHelperClass(String className) {
+    return className.startsWith("field.VirtualFieldTestHelper");
+  }
+
+  private static class TestInstrumentation implements TypeInstrumentation {
+
+    @Override
+    public ElementMatcher<TypeDescription> typeMatcher() {
+      return named("field.VirtualFieldTest");
+    }
+
+    @Override
+    public void transform(TypeTransformer transformer) {
+      transformer.applyAdviceToMethod(
+          named("virtualFieldTestMethod"),
+          VirtualFieldTestInstrumentationModule.class.getName() + "$TestAdvice");
+    }
+  }
+
+  @SuppressWarnings("unused")
+  public static class TestAdvice {
+    @Advice.OnMethodExit
+    public static void onExit(@Advice.Return(readOnly = false) boolean result) {
+      VirtualFieldTestHelper.test();
+      result = true;
+    }
+  }
+}

+ 22 - 0
testing-common/integration-tests/src/test/java/field/VirtualFieldTest.java

@@ -0,0 +1,22 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package field;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class VirtualFieldTest {
+
+  @Test
+  void testVirtualFields() {
+    Assertions.assertTrue(virtualFieldTestMethod());
+  }
+
+  // instrumented by VirtualFieldTestInstrumentationModule
+  private static boolean virtualFieldTestMethod() {
+    return false;
+  }
+}

+ 8 - 0
testing-common/library-for-integration-tests/src/main/java/library/VirtualFieldTestClass.java

@@ -0,0 +1,8 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package library;
+
+public class VirtualFieldTestClass {}