Ver Fonte

Spring Kafka library instrumentation (#6283)

* Spring Kafka library instrumentation

* Merge and fix prior merge

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
Mateusz Rzeszutek há 2 anos atrás
pai
commit
5bc7abf178
18 ficheiros alterados com 638 adições e 327 exclusões
  1. 1 0
      instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts
  2. 15 18
      instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java
  3. 15 31
      instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java
  4. 0 26
      instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java
  5. 18 0
      instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaTest.java
  6. 15 195
      instrumentation/spring/spring-kafka-2.7/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaNoReceiveTelemetryTest.java
  7. 21 0
      instrumentation/spring/spring-kafka-2.7/library/build.gradle.kts
  8. 16 15
      instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java
  9. 16 15
      instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java
  10. 1 1
      instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaErrorCauseExtractor.java
  11. 84 0
      instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetry.java
  12. 57 0
      instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java
  13. 26 0
      instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/State.java
  14. 57 0
      instrumentation/spring/spring-kafka-2.7/library/src/test/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaNoReceiveTelemetryTest.java
  15. 220 0
      instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java
  16. 60 24
      instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaTest.java
  17. 15 2
      instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java
  18. 1 0
      settings.gradle.kts

+ 1 - 0
instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts

@@ -17,6 +17,7 @@ dependencies {
 
   bootstrap(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:bootstrap"))
   implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))
+  implementation(project(":instrumentation:spring:spring-kafka-2.7:library"))
 
   library("org.springframework.kafka:spring-kafka:2.7.0")
 

+ 15 - 18
instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java

@@ -5,20 +5,17 @@
 
 package io.opentelemetry.javaagent.instrumentation.spring.kafka;
 
+import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.telemetry;
 import static net.bytebuddy.matcher.ElementMatchers.isProtected;
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.returns;
 import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 
-import io.opentelemetry.context.Context;
-import io.opentelemetry.instrumentation.api.util.VirtualField;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.springframework.kafka.listener.BatchInterceptor;
 import org.springframework.kafka.listener.RecordInterceptor;
 
@@ -56,13 +53,13 @@ public class AbstractMessageListenerContainerInstrumentation implements TypeInst
     public static <K, V> void onExit(
         @Advice.Return(readOnly = false) BatchInterceptor<K, V> interceptor) {
 
-      if (!(interceptor instanceof InstrumentedBatchInterceptor)) {
-        VirtualField<ConsumerRecords<K, V>, Context> receiveContextField =
-            VirtualField.find(ConsumerRecords.class, Context.class);
-        VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField =
-            VirtualField.find(ConsumerRecords.class, State.class);
-        interceptor =
-            new InstrumentedBatchInterceptor<>(receiveContextField, stateField, interceptor);
+      if (interceptor == null
+          || !interceptor
+              .getClass()
+              .getName()
+              .equals(
+                  "io.opentelemetry.instrumentation.spring.kafka.v2_7.InstrumentedBatchInterceptor")) {
+        interceptor = telemetry().createBatchInterceptor(interceptor);
       }
     }
   }
@@ -74,13 +71,13 @@ public class AbstractMessageListenerContainerInstrumentation implements TypeInst
     public static <K, V> void onExit(
         @Advice.Return(readOnly = false) RecordInterceptor<K, V> interceptor) {
 
-      if (!(interceptor instanceof InstrumentedRecordInterceptor)) {
-        VirtualField<ConsumerRecord<K, V>, Context> receiveContextField =
-            VirtualField.find(ConsumerRecord.class, Context.class);
-        VirtualField<ConsumerRecord<K, V>, State<ConsumerRecord<K, V>>> stateField =
-            VirtualField.find(ConsumerRecord.class, State.class);
-        interceptor =
-            new InstrumentedRecordInterceptor<>(receiveContextField, stateField, interceptor);
+      if (interceptor == null
+          || !interceptor
+              .getClass()
+              .getName()
+              .equals(
+                  "io.opentelemetry.instrumentation.spring.kafka.v2_7.InstrumentedRecordInterceptor")) {
+        interceptor = telemetry().createRecordInterceptor(interceptor);
       }
     }
   }

+ 15 - 31
instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java

@@ -6,42 +6,26 @@
 package io.opentelemetry.javaagent.instrumentation.spring.kafka;
 
 import io.opentelemetry.api.GlobalOpenTelemetry;
-import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
-import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
+import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
 import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
 import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 
 public final class SpringKafkaSingletons {
 
-  private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7";
-
-  private static final Instrumenter<ConsumerRecords<?, ?>, Void> BATCH_PROCESS_INSTRUMENTER;
-  private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER;
-
-  static {
-    KafkaInstrumenterFactory factory =
-        new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
-            .setCaptureExperimentalSpanAttributes(
-                InstrumentationConfig.get()
-                    .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
-            .setPropagationEnabled(
-                InstrumentationConfig.get()
-                    .getBoolean("otel.instrumentation.kafka.client-propagation.enabled", true))
-            .setMessagingReceiveInstrumentationEnabled(
-                ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
-            .setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE);
-    BATCH_PROCESS_INSTRUMENTER = factory.createBatchProcessInstrumenter();
-    PROCESS_INSTRUMENTER = factory.createConsumerProcessInstrumenter();
-  }
-
-  public static Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter() {
-    return BATCH_PROCESS_INSTRUMENTER;
-  }
-
-  public static Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter() {
-    return PROCESS_INSTRUMENTER;
+  private static final SpringKafkaTelemetry TELEMETRY =
+      SpringKafkaTelemetry.builder(GlobalOpenTelemetry.get())
+          .setCaptureExperimentalSpanAttributes(
+              InstrumentationConfig.get()
+                  .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
+          .setPropagationEnabled(
+              InstrumentationConfig.get()
+                  .getBoolean("otel.instrumentation.kafka.client-propagation.enabled", true))
+          .setMessagingReceiveInstrumentationEnabled(
+              ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
+          .build();
+
+  public static SpringKafkaTelemetry telemetry() {
+    return TELEMETRY;
   }
 
   private SpringKafkaSingletons() {}

+ 0 - 26
instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java

@@ -1,26 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package io.opentelemetry.javaagent.instrumentation.spring.kafka;
-
-import com.google.auto.value.AutoValue;
-import io.opentelemetry.context.Context;
-import io.opentelemetry.context.Scope;
-
-@AutoValue
-public abstract class State<REQUEST> {
-
-  public static <REQUEST> State<REQUEST> create(REQUEST request, Context context, Scope scope) {
-    return new AutoValue_State<>(request, context, scope);
-  }
-
-  public abstract REQUEST request();
-
-  public abstract Context context();
-
-  public abstract Scope scope();
-
-  State() {}
-}

+ 18 - 0
instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaTest.java

@@ -9,21 +9,39 @@ import static io.opentelemetry.api.common.AttributeKey.longKey;
 import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
 import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
 import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+import static java.util.Collections.emptyList;
 
 import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
 import io.opentelemetry.sdk.trace.data.LinkData;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import io.opentelemetry.sdk.trace.data.StatusData;
 import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
 import io.opentelemetry.testing.AbstractSpringKafkaTest;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import org.assertj.core.api.AbstractLongAssert;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 class SpringKafkaTest extends AbstractSpringKafkaTest {
 
+  @RegisterExtension
+  protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
+
+  @Override
+  protected InstrumentationExtension testing() {
+    return testing;
+  }
+
+  @Override
+  protected List<Class<?>> additionalSpringConfigs() {
+    return emptyList();
+  }
+
   @Test
   void shouldCreateSpansForSingleRecordProcess() {
     testing.runWithSpan(

+ 15 - 195
instrumentation/spring/spring-kafka-2.7/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaNoReceiveTelemetryTest.java

@@ -5,206 +5,26 @@
 
 package io.opentelemetry.javaagent.instrumentation.spring.kafka;
 
-import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
-import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
-import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+import static java.util.Collections.emptyList;
 
-import io.opentelemetry.api.trace.SpanKind;
-import io.opentelemetry.sdk.trace.data.LinkData;
-import io.opentelemetry.sdk.trace.data.SpanData;
-import io.opentelemetry.sdk.trace.data.StatusData;
-import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
-import io.opentelemetry.testing.AbstractSpringKafkaTest;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-import org.assertj.core.api.AbstractLongAssert;
-import org.junit.jupiter.api.Test;
+import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.testing.AbstractSpringKafkaNoReceiveTelemetryTest;
+import java.util.List;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
-class SpringKafkaNoReceiveTelemetryTest extends AbstractSpringKafkaTest {
+class SpringKafkaNoReceiveTelemetryTest extends AbstractSpringKafkaNoReceiveTelemetryTest {
 
-  @Test
-  void shouldCreateSpansForSingleRecordProcess() {
-    testing.runWithSpan(
-        "producer",
-        () -> {
-          kafkaTemplate.executeInTransaction(
-              ops -> {
-                ops.send("testSingleTopic", "10", "testSpan");
-                return 0;
-              });
-        });
+  @RegisterExtension
+  protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
 
-    testing.waitAndAssertTraces(
-        trace ->
-            trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("producer"),
-                span ->
-                    span.hasName("testSingleTopic send")
-                        .hasKind(SpanKind.PRODUCER)
-                        .hasParent(trace.getSpan(0))
-                        .hasAttributesSatisfyingExactly(
-                            equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
-                            equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"),
-                            equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")),
-                span ->
-                    span.hasName("testSingleTopic process")
-                        .hasKind(SpanKind.CONSUMER)
-                        .hasParent(trace.getSpan(1))
-                        .hasAttributesSatisfyingExactly(
-                            equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
-                            equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"),
-                            equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
-                            equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
-                            satisfies(
-                                SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
-                                AbstractLongAssert::isNotNegative),
-                            satisfies(
-                                SemanticAttributes.MESSAGING_KAFKA_PARTITION,
-                                AbstractLongAssert::isNotNegative)),
-                span -> span.hasName("consumer").hasParent(trace.getSpan(2))));
+  @Override
+  protected InstrumentationExtension testing() {
+    return testing;
   }
 
-  @Test
-  void shouldHandleFailureInKafkaListener() {
-    testing.runWithSpan(
-        "producer",
-        () -> {
-          kafkaTemplate.executeInTransaction(
-              ops -> {
-                ops.send("testSingleTopic", "10", "error");
-                return 0;
-              });
-        });
-
-    testing.waitAndAssertTraces(
-        trace ->
-            trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("producer"),
-                span ->
-                    span.hasName("testSingleTopic send")
-                        .hasKind(SpanKind.PRODUCER)
-                        .hasParent(trace.getSpan(0))
-                        .hasAttributesSatisfyingExactly(
-                            equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
-                            equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"),
-                            equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")),
-                span ->
-                    span.hasName("testSingleTopic process")
-                        .hasKind(SpanKind.CONSUMER)
-                        .hasParent(trace.getSpan(1))
-                        .hasStatus(StatusData.error())
-                        .hasException(new IllegalArgumentException("boom"))
-                        .hasAttributesSatisfyingExactly(
-                            equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
-                            equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"),
-                            equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
-                            equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
-                            satisfies(
-                                SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
-                                AbstractLongAssert::isNotNegative),
-                            satisfies(
-                                SemanticAttributes.MESSAGING_KAFKA_PARTITION,
-                                AbstractLongAssert::isNotNegative)),
-                span -> span.hasName("consumer").hasParent(trace.getSpan(2))));
-  }
-
-  @Test
-  void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException {
-    Map<String, String> batchMessages = new HashMap<>();
-    batchMessages.put("10", "testSpan1");
-    batchMessages.put("20", "testSpan2");
-    sendBatchMessages(batchMessages);
-
-    AtomicReference<SpanData> producer1 = new AtomicReference<>();
-    AtomicReference<SpanData> producer2 = new AtomicReference<>();
-
-    testing.waitAndAssertSortedTraces(
-        orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
-        trace -> {
-          trace.hasSpansSatisfyingExactly(
-              span -> span.hasName("producer"),
-              span ->
-                  span.hasName("testBatchTopic send")
-                      .hasKind(SpanKind.PRODUCER)
-                      .hasParent(trace.getSpan(0))
-                      .hasAttributesSatisfyingExactly(
-                          equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
-                          equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
-                          equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")),
-              span ->
-                  span.hasName("testBatchTopic send")
-                      .hasKind(SpanKind.PRODUCER)
-                      .hasParent(trace.getSpan(0))
-                      .hasAttributesSatisfyingExactly(
-                          equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
-                          equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
-                          equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")));
-
-          producer1.set(trace.getSpan(1));
-          producer2.set(trace.getSpan(2));
-        },
-        trace ->
-            trace.hasSpansSatisfyingExactly(
-                span ->
-                    span.hasName("testBatchTopic process")
-                        .hasKind(SpanKind.CONSUMER)
-                        .hasNoParent()
-                        .hasLinks(
-                            LinkData.create(producer1.get().getSpanContext()),
-                            LinkData.create(producer2.get().getSpanContext()))
-                        .hasAttributesSatisfyingExactly(
-                            equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
-                            equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
-                            equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
-                            equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")),
-                span -> span.hasName("consumer").hasParent(trace.getSpan(0))));
-  }
-
-  @Test
-  void shouldHandleFailureInKafkaBatchListener() {
-    testing.runWithSpan(
-        "producer",
-        () -> {
-          kafkaTemplate.executeInTransaction(
-              ops -> {
-                ops.send("testBatchTopic", "10", "error");
-                return 0;
-              });
-        });
-
-    AtomicReference<SpanData> producer = new AtomicReference<>();
-
-    testing.waitAndAssertSortedTraces(
-        orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
-        trace -> {
-          trace.hasSpansSatisfyingExactly(
-              span -> span.hasName("producer"),
-              span ->
-                  span.hasName("testBatchTopic send")
-                      .hasKind(SpanKind.PRODUCER)
-                      .hasParent(trace.getSpan(0))
-                      .hasAttributesSatisfyingExactly(
-                          equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
-                          equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
-                          equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")));
-
-          producer.set(trace.getSpan(1));
-        },
-        trace ->
-            trace.hasSpansSatisfyingExactly(
-                span ->
-                    span.hasName("testBatchTopic process")
-                        .hasKind(SpanKind.CONSUMER)
-                        .hasNoParent()
-                        .hasLinks(LinkData.create(producer.get().getSpanContext()))
-                        .hasStatus(StatusData.error())
-                        .hasException(new IllegalArgumentException("boom"))
-                        .hasAttributesSatisfyingExactly(
-                            equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
-                            equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
-                            equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
-                            equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")),
-                span -> span.hasName("consumer").hasParent(trace.getSpan(0))));
+  @Override
+  protected List<Class<?>> additionalSpringConfigs() {
+    return emptyList();
   }
 }

+ 21 - 0
instrumentation/spring/spring-kafka-2.7/library/build.gradle.kts

@@ -0,0 +1,21 @@
+plugins {
+  id("otel.library-instrumentation")
+}
+
+dependencies {
+  compileOnly("com.google.auto.value:auto-value-annotations")
+  annotationProcessor("com.google.auto.value:auto-value")
+
+  implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))
+
+  compileOnly("org.springframework.kafka:spring-kafka:2.7.0")
+
+  testImplementation(project(":instrumentation:spring:spring-kafka-2.7:testing"))
+  testImplementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-2.6:library"))
+
+  // 2.7.0 has a bug that makes decorating a Kafka Producer impossible
+  testImplementation("org.springframework.kafka:spring-kafka:2.7.1")
+
+  testLibrary("org.springframework.boot:spring-boot-starter-test:2.5.3")
+  testLibrary("org.springframework.boot:spring-boot-starter:2.5.3")
+}

+ 16 - 15
instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java → instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java

@@ -3,30 +3,31 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package io.opentelemetry.javaagent.instrumentation.spring.kafka;
-
-import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.batchProcessInstrumenter;
+package io.opentelemetry.instrumentation.spring.kafka.v2_7;
 
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
+import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.api.util.VirtualField;
 import javax.annotation.Nullable;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.springframework.kafka.listener.BatchInterceptor;
 
-public final class InstrumentedBatchInterceptor<K, V> implements BatchInterceptor<K, V> {
+final class InstrumentedBatchInterceptor<K, V> implements BatchInterceptor<K, V> {
+
+  private static final VirtualField<ConsumerRecords<?, ?>, Context> receiveContextField =
+      VirtualField.find(ConsumerRecords.class, Context.class);
+  private static final VirtualField<ConsumerRecords<?, ?>, State<ConsumerRecords<?, ?>>>
+      stateField = VirtualField.find(ConsumerRecords.class, State.class);
 
-  private final VirtualField<ConsumerRecords<K, V>, Context> receiveContextField;
-  private final VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField;
+  private final Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter;
   @Nullable private final BatchInterceptor<K, V> decorated;
 
-  public InstrumentedBatchInterceptor(
-      VirtualField<ConsumerRecords<K, V>, Context> receiveContextField,
-      VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField,
+  InstrumentedBatchInterceptor(
+      Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter,
       @Nullable BatchInterceptor<K, V> decorated) {
-    this.receiveContextField = receiveContextField;
-    this.stateField = stateField;
+    this.batchProcessInstrumenter = batchProcessInstrumenter;
     this.decorated = decorated;
   }
 
@@ -34,8 +35,8 @@ public final class InstrumentedBatchInterceptor<K, V> implements BatchIntercepto
   public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
     Context parentContext = getParentContext(records);
 
-    if (batchProcessInstrumenter().shouldStart(parentContext, records)) {
-      Context context = batchProcessInstrumenter().start(parentContext, records);
+    if (batchProcessInstrumenter.shouldStart(parentContext, records)) {
+      Context context = batchProcessInstrumenter.start(parentContext, records);
       Scope scope = context.makeCurrent();
       stateField.set(records, State.create(records, context, scope));
     }
@@ -67,11 +68,11 @@ public final class InstrumentedBatchInterceptor<K, V> implements BatchIntercepto
   }
 
   private void end(ConsumerRecords<K, V> records, @Nullable Throwable error) {
-    State<ConsumerRecords<K, V>> state = stateField.get(records);
+    State<ConsumerRecords<?, ?>> state = stateField.get(records);
     stateField.set(records, null);
     if (state != null) {
       state.scope().close();
-      batchProcessInstrumenter().end(state.context(), state.request(), null, error);
+      batchProcessInstrumenter.end(state.context(), state.request(), null, error);
     }
   }
 }

+ 16 - 15
instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedRecordInterceptor.java → instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java

@@ -3,30 +3,31 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package io.opentelemetry.javaagent.instrumentation.spring.kafka;
-
-import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.processInstrumenter;
+package io.opentelemetry.instrumentation.spring.kafka.v2_7;
 
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
+import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.api.util.VirtualField;
 import javax.annotation.Nullable;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.kafka.listener.RecordInterceptor;
 
-public final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K, V> {
+final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K, V> {
+
+  private static final VirtualField<ConsumerRecord<?, ?>, Context> receiveContextField =
+      VirtualField.find(ConsumerRecord.class, Context.class);
+  private static final VirtualField<ConsumerRecord<?, ?>, State<ConsumerRecord<?, ?>>> stateField =
+      VirtualField.find(ConsumerRecord.class, State.class);
 
-  private final VirtualField<ConsumerRecord<K, V>, Context> receiveContextField;
-  private final VirtualField<ConsumerRecord<K, V>, State<ConsumerRecord<K, V>>> stateField;
+  private final Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter;
   @Nullable private final RecordInterceptor<K, V> decorated;
 
-  public InstrumentedRecordInterceptor(
-      VirtualField<ConsumerRecord<K, V>, Context> receiveContextField,
-      VirtualField<ConsumerRecord<K, V>, State<ConsumerRecord<K, V>>> stateField,
+  InstrumentedRecordInterceptor(
+      Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter,
       @Nullable RecordInterceptor<K, V> decorated) {
-    this.receiveContextField = receiveContextField;
-    this.stateField = stateField;
+    this.processInstrumenter = processInstrumenter;
     this.decorated = decorated;
   }
 
@@ -46,8 +47,8 @@ public final class InstrumentedRecordInterceptor<K, V> implements RecordIntercep
   private void start(ConsumerRecord<K, V> record) {
     Context parentContext = getParentContext(record);
 
-    if (processInstrumenter().shouldStart(parentContext, record)) {
-      Context context = processInstrumenter().start(parentContext, record);
+    if (processInstrumenter.shouldStart(parentContext, record)) {
+      Context context = processInstrumenter.start(parentContext, record);
       Scope scope = context.makeCurrent();
       stateField.set(record, State.create(record, context, scope));
     }
@@ -77,11 +78,11 @@ public final class InstrumentedRecordInterceptor<K, V> implements RecordIntercep
   }
 
   private void end(ConsumerRecord<K, V> record, @Nullable Throwable error) {
-    State<ConsumerRecord<K, V>> state = stateField.get(record);
+    State<ConsumerRecord<?, ?>> state = stateField.get(record);
     stateField.set(record, null);
     if (state != null) {
       state.scope().close();
-      processInstrumenter().end(state.context(), state.request(), null, error);
+      processInstrumenter.end(state.context(), state.request(), null, error);
     }
   }
 }

+ 1 - 1
instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaErrorCauseExtractor.java → instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaErrorCauseExtractor.java

@@ -3,7 +3,7 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package io.opentelemetry.javaagent.instrumentation.spring.kafka;
+package io.opentelemetry.instrumentation.spring.kafka.v2_7;
 
 import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor;
 import org.springframework.kafka.listener.ListenerExecutionFailedException;

+ 84 - 0
instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetry.java

@@ -0,0 +1,84 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.spring.kafka.v2_7;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.kafka.listener.BatchInterceptor;
+import org.springframework.kafka.listener.RecordInterceptor;
+
+/** Entrypoint for instrumenting Spring Kafka listeners. */
+public final class SpringKafkaTelemetry {
+
+  /** Returns a new {@link SpringKafkaTelemetry} configured with the given {@link OpenTelemetry}. */
+  public static SpringKafkaTelemetry create(OpenTelemetry openTelemetry) {
+    return builder(openTelemetry).build();
+  }
+
+  /**
+   * Returns a new {@link SpringKafkaTelemetryBuilder} configured with the given {@link
+   * OpenTelemetry}.
+   */
+  public static SpringKafkaTelemetryBuilder builder(OpenTelemetry openTelemetry) {
+    return new SpringKafkaTelemetryBuilder(openTelemetry);
+  }
+
+  private final Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter;
+  private final Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter;
+
+  SpringKafkaTelemetry(
+      Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter,
+      Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter) {
+    this.processInstrumenter = processInstrumenter;
+    this.batchProcessInstrumenter = batchProcessInstrumenter;
+  }
+
+  /**
+   * Returns a new {@link RecordInterceptor} that decorates a message listener with a {@link
+   * SpanKind#CONSUMER CONSUMER} span. Can be set on a {@link AbstractMessageListenerContainer}
+   * using the {@link AbstractMessageListenerContainer#setRecordInterceptor(RecordInterceptor)}
+   * method.
+   */
+  public <K, V> RecordInterceptor<K, V> createRecordInterceptor() {
+    return createRecordInterceptor(null);
+  }
+
+  /**
+   * Returns a new {@link RecordInterceptor} that decorates a message listener with a {@link
+   * SpanKind#CONSUMER CONSUMER} span, and then delegates to a provided {@code
+   * decoratedInterceptor}. Can be set on a {@link AbstractMessageListenerContainer} using the
+   * {@link AbstractMessageListenerContainer#setRecordInterceptor(RecordInterceptor)} method.
+   */
+  public <K, V> RecordInterceptor<K, V> createRecordInterceptor(
+      RecordInterceptor<K, V> decoratedInterceptor) {
+    return new InstrumentedRecordInterceptor<>(processInstrumenter, decoratedInterceptor);
+  }
+
+  /**
+   * Returns a new {@link BatchInterceptor} that decorates a message listener with a {@link
+   * SpanKind#CONSUMER CONSUMER} span. Can be set on a {@link AbstractMessageListenerContainer}
+   * using the {@link AbstractMessageListenerContainer#setBatchInterceptor(BatchInterceptor)}
+   * method.
+   */
+  public <K, V> BatchInterceptor<K, V> createBatchInterceptor() {
+    return createBatchInterceptor(null);
+  }
+
+  /**
+   * Returns a new {@link BatchInterceptor} that decorates a message listener with a {@link
+   * SpanKind#CONSUMER CONSUMER} span, and then delegates to a provided {@code
+   * decoratedInterceptor}. Can be set on a {@link AbstractMessageListenerContainer} using the
+   * {@link AbstractMessageListenerContainer#setBatchInterceptor(BatchInterceptor)} method.
+   */
+  public <K, V> BatchInterceptor<K, V> createBatchInterceptor(
+      BatchInterceptor<K, V> decoratedInterceptor) {
+    return new InstrumentedBatchInterceptor<>(batchProcessInstrumenter, decoratedInterceptor);
+  }
+}

+ 57 - 0
instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java

@@ -0,0 +1,57 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.spring.kafka.v2_7;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
+
+/** A builder of {@link SpringKafkaTelemetry}. */
+public final class SpringKafkaTelemetryBuilder {
+
+  private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7";
+
+  private final OpenTelemetry openTelemetry;
+  private boolean captureExperimentalSpanAttributes = false;
+  private boolean propagationEnabled = true;
+  private boolean messagingReceiveInstrumentationEnabled = false;
+
+  SpringKafkaTelemetryBuilder(OpenTelemetry openTelemetry) {
+    this.openTelemetry = openTelemetry;
+  }
+
+  public SpringKafkaTelemetryBuilder setCaptureExperimentalSpanAttributes(
+      boolean captureExperimentalSpanAttributes) {
+    this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
+    return this;
+  }
+
+  public SpringKafkaTelemetryBuilder setPropagationEnabled(boolean propagationEnabled) {
+    this.propagationEnabled = propagationEnabled;
+    return this;
+  }
+
+  public SpringKafkaTelemetryBuilder setMessagingReceiveInstrumentationEnabled(
+      boolean messagingReceiveInstrumentationEnabled) {
+    this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled;
+    return this;
+  }
+
+  /**
+   * Returns a new {@link SpringKafkaTelemetry} with the settings of this {@link
+   * SpringKafkaTelemetryBuilder}.
+   */
+  public SpringKafkaTelemetry build() {
+    KafkaInstrumenterFactory factory =
+        new KafkaInstrumenterFactory(openTelemetry, INSTRUMENTATION_NAME)
+            .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
+            .setPropagationEnabled(propagationEnabled)
+            .setMessagingReceiveInstrumentationEnabled(messagingReceiveInstrumentationEnabled)
+            .setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE);
+
+    return new SpringKafkaTelemetry(
+        factory.createConsumerProcessInstrumenter(), factory.createBatchProcessInstrumenter());
+  }
+}

+ 26 - 0
instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/State.java

@@ -0,0 +1,26 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.spring.kafka.v2_7;
+
+import com.google.auto.value.AutoValue;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+
+@AutoValue
+abstract class State<REQUEST> {
+
+  static <REQUEST> State<REQUEST> create(REQUEST request, Context context, Scope scope) {
+    return new AutoValue_State<>(request, context, scope);
+  }
+
+  abstract REQUEST request();
+
+  abstract Context context();
+
+  abstract Scope scope();
+
+  State() {}
+}

+ 57 - 0
instrumentation/spring/spring-kafka-2.7/library/src/test/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaNoReceiveTelemetryTest.java

@@ -0,0 +1,57 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.spring.kafka.v2_7;
+
+import static java.util.Collections.singletonList;
+
+import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
+import io.opentelemetry.testing.AbstractSpringKafkaNoReceiveTelemetryTest;
+import java.util.List;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ContainerCustomizer;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+
+class SpringKafkaNoReceiveTelemetryTest extends AbstractSpringKafkaNoReceiveTelemetryTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
+
+  @Override
+  protected InstrumentationExtension testing() {
+    return testing;
+  }
+
+  @Override
+  protected List<Class<?>> additionalSpringConfigs() {
+    return singletonList(KafkaInstrumentationConfig.class);
+  }
+
+  @Configuration
+  public static class KafkaInstrumentationConfig {
+
+    @Bean
+    public DefaultKafkaProducerFactoryCustomizer producerInstrumentation() {
+      KafkaTelemetry kafkaTelemetry = KafkaTelemetry.create(testing.getOpenTelemetry());
+      return producerFactory -> producerFactory.addPostProcessor(kafkaTelemetry::wrap);
+    }
+
+    @Bean
+    public ContainerCustomizer<String, String, ConcurrentMessageListenerContainer<String, String>>
+        listenerCustomizer() {
+      SpringKafkaTelemetry springKafkaTelemetry =
+          SpringKafkaTelemetry.create(testing.getOpenTelemetry());
+      return container -> {
+        container.setRecordInterceptor(springKafkaTelemetry.createRecordInterceptor());
+        container.setBatchInterceptor(springKafkaTelemetry.createBatchInterceptor());
+      };
+    }
+  }
+}

+ 220 - 0
instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java

@@ -0,0 +1,220 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.testing;
+
+import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.data.StatusData;
+import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import org.assertj.core.api.AbstractLongAssert;
+import org.junit.jupiter.api.Test;
+
+public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends AbstractSpringKafkaTest {
+
+  @Test
+  void shouldCreateSpansForSingleRecordProcess() {
+    testing()
+        .runWithSpan(
+            "producer",
+            () -> {
+              kafkaTemplate.executeInTransaction(
+                  ops -> {
+                    ops.send("testSingleTopic", "10", "testSpan");
+                    return 0;
+                  });
+            });
+
+    testing()
+        .waitAndAssertTraces(
+            trace ->
+                trace.hasSpansSatisfyingExactly(
+                    span -> span.hasName("producer"),
+                    span ->
+                        span.hasName("testSingleTopic send")
+                            .hasKind(SpanKind.PRODUCER)
+                            .hasParent(trace.getSpan(0))
+                            .hasAttributesSatisfyingExactly(
+                                equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                                equalTo(
+                                    SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"),
+                                equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")),
+                    span ->
+                        span.hasName("testSingleTopic process")
+                            .hasKind(SpanKind.CONSUMER)
+                            .hasParent(trace.getSpan(1))
+                            .hasAttributesSatisfyingExactly(
+                                equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                                equalTo(
+                                    SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"),
+                                equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
+                                equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
+                                satisfies(
+                                    SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
+                                    AbstractLongAssert::isNotNegative),
+                                satisfies(
+                                    SemanticAttributes.MESSAGING_KAFKA_PARTITION,
+                                    AbstractLongAssert::isNotNegative)),
+                    span -> span.hasName("consumer").hasParent(trace.getSpan(2))));
+  }
+
+  @Test
+  void shouldHandleFailureInKafkaListener() {
+    testing()
+        .runWithSpan(
+            "producer",
+            () -> {
+              kafkaTemplate.executeInTransaction(
+                  ops -> {
+                    ops.send("testSingleTopic", "10", "error");
+                    return 0;
+                  });
+            });
+
+    testing()
+        .waitAndAssertTraces(
+            trace ->
+                trace.hasSpansSatisfyingExactly(
+                    span -> span.hasName("producer"),
+                    span ->
+                        span.hasName("testSingleTopic send")
+                            .hasKind(SpanKind.PRODUCER)
+                            .hasParent(trace.getSpan(0))
+                            .hasAttributesSatisfyingExactly(
+                                equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                                equalTo(
+                                    SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"),
+                                equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")),
+                    span ->
+                        span.hasName("testSingleTopic process")
+                            .hasKind(SpanKind.CONSUMER)
+                            .hasParent(trace.getSpan(1))
+                            .hasStatus(StatusData.error())
+                            .hasException(new IllegalArgumentException("boom"))
+                            .hasAttributesSatisfyingExactly(
+                                equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                                equalTo(
+                                    SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"),
+                                equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
+                                equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
+                                satisfies(
+                                    SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
+                                    AbstractLongAssert::isNotNegative),
+                                satisfies(
+                                    SemanticAttributes.MESSAGING_KAFKA_PARTITION,
+                                    AbstractLongAssert::isNotNegative)),
+                    span -> span.hasName("consumer").hasParent(trace.getSpan(2))));
+  }
+
+  @Test
+  void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException {
+    Map<String, String> batchMessages = new HashMap<>();
+    batchMessages.put("10", "testSpan1");
+    batchMessages.put("20", "testSpan2");
+    sendBatchMessages(batchMessages);
+
+    AtomicReference<SpanData> producer1 = new AtomicReference<>();
+    AtomicReference<SpanData> producer2 = new AtomicReference<>();
+
+    testing()
+        .waitAndAssertSortedTraces(
+            orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
+            trace -> {
+              trace.hasSpansSatisfyingExactly(
+                  span -> span.hasName("producer"),
+                  span ->
+                      span.hasName("testBatchTopic send")
+                          .hasKind(SpanKind.PRODUCER)
+                          .hasParent(trace.getSpan(0))
+                          .hasAttributesSatisfyingExactly(
+                              equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                              equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
+                              equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")),
+                  span ->
+                      span.hasName("testBatchTopic send")
+                          .hasKind(SpanKind.PRODUCER)
+                          .hasParent(trace.getSpan(0))
+                          .hasAttributesSatisfyingExactly(
+                              equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                              equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
+                              equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")));
+
+              producer1.set(trace.getSpan(1));
+              producer2.set(trace.getSpan(2));
+            },
+            trace ->
+                trace.hasSpansSatisfyingExactly(
+                    span ->
+                        span.hasName("testBatchTopic process")
+                            .hasKind(SpanKind.CONSUMER)
+                            .hasNoParent()
+                            .hasLinksSatisfying(
+                                links(
+                                    producer1.get().getSpanContext(),
+                                    producer2.get().getSpanContext()))
+                            .hasAttributesSatisfyingExactly(
+                                equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                                equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
+                                equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
+                                equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")),
+                    span -> span.hasName("consumer").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void shouldHandleFailureInKafkaBatchListener() {
+    testing()
+        .runWithSpan(
+            "producer",
+            () -> {
+              kafkaTemplate.executeInTransaction(
+                  ops -> {
+                    ops.send("testBatchTopic", "10", "error");
+                    return 0;
+                  });
+            });
+
+    AtomicReference<SpanData> producer = new AtomicReference<>();
+
+    testing()
+        .waitAndAssertSortedTraces(
+            orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
+            trace -> {
+              trace.hasSpansSatisfyingExactly(
+                  span -> span.hasName("producer"),
+                  span ->
+                      span.hasName("testBatchTopic send")
+                          .hasKind(SpanKind.PRODUCER)
+                          .hasParent(trace.getSpan(0))
+                          .hasAttributesSatisfyingExactly(
+                              equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                              equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
+                              equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")));
+
+              producer.set(trace.getSpan(1));
+            },
+            trace ->
+                trace.hasSpansSatisfyingExactly(
+                    span ->
+                        span.hasName("testBatchTopic process")
+                            .hasKind(SpanKind.CONSUMER)
+                            .hasNoParent()
+                            .hasLinksSatisfying(links(producer.get().getSpanContext()))
+                            .hasStatus(StatusData.error())
+                            .hasException(new IllegalArgumentException("boom"))
+                            .hasAttributesSatisfyingExactly(
+                                equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
+                                equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
+                                equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
+                                equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")),
+                    span -> span.hasName("consumer").hasParent(trace.getSpan(0))));
+  }
+}

+ 60 - 24
instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaTest.java

@@ -5,14 +5,20 @@
 
 package io.opentelemetry.testing;
 
-import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.opentelemetry.api.trace.SpanContext;
 import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.sdk.trace.data.LinkData;
 import java.time.Duration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.SpringApplication;
@@ -26,22 +32,32 @@ public abstract class AbstractSpringKafkaTest {
 
   private static final Logger logger = LoggerFactory.getLogger(AbstractSpringKafkaTest.class);
 
-  @RegisterExtension
-  protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
-
   static KafkaContainer kafka;
-  static ConfigurableApplicationContext applicationContext;
-  protected static KafkaTemplate<String, String> kafkaTemplate;
 
-  @SuppressWarnings("unchecked")
+  ConfigurableApplicationContext applicationContext;
+  protected KafkaTemplate<String, String> kafkaTemplate;
+
   @BeforeAll
-  static void setUp() {
+  static void setUpKafka() {
     kafka =
         new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
             .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
             .withStartupTimeout(Duration.ofMinutes(1));
     kafka.start();
+  }
+
+  @AfterAll
+  static void tearDownKafka() {
+    kafka.stop();
+  }
+
+  protected abstract InstrumentationExtension testing();
 
+  protected abstract List<Class<?>> additionalSpringConfigs();
+
+  @SuppressWarnings("unchecked")
+  @BeforeEach
+  void setUpApp() {
     Map<String, Object> props = new HashMap<>();
     props.put("spring.jmx.enabled", false);
     props.put("spring.main.web-application-type", "none");
@@ -53,16 +69,16 @@ public abstract class AbstractSpringKafkaTest {
     props.put("spring.kafka.producer.transaction-id-prefix", "test-");
 
     SpringApplication app = new SpringApplication(ConsumerConfig.class);
+    app.addPrimarySources(additionalSpringConfigs());
     app.setDefaultProperties(props);
     applicationContext = app.run();
     kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate.class);
   }
 
-  @AfterAll
-  static void tearDown() {
-    kafka.stop();
+  @AfterEach
+  void tearDownApp() {
     if (applicationContext != null) {
-      applicationContext.stop();
+      applicationContext.close();
     }
   }
 
@@ -75,25 +91,45 @@ public abstract class AbstractSpringKafkaTest {
     for (int i = 1; i <= maxAttempts; i++) {
       BatchRecordListener.reset();
 
-      testing.runWithSpan(
-          "producer",
-          () -> {
-            kafkaTemplate.executeInTransaction(
-                ops -> {
-                  keyToData.forEach((key, data) -> ops.send("testBatchTopic", key, data));
-                  return 0;
-                });
-          });
+      testing()
+          .runWithSpan(
+              "producer",
+              () -> {
+                kafkaTemplate.executeInTransaction(
+                    ops -> {
+                      keyToData.forEach((key, data) -> ops.send("testBatchTopic", key, data));
+                      return 0;
+                    });
+              });
 
       BatchRecordListener.waitForMessages();
       if (BatchRecordListener.getLastBatchSize() == 2) {
         break;
       } else if (i < maxAttempts) {
-        testing.waitForTraces(2);
+        testing().waitForTraces(2);
         Thread.sleep(1_000); // sleep a bit to give time for all the spans to arrive
-        testing.clearData();
+        testing().clearData();
         logger.info("Messages weren't received as batch, retrying");
       }
     }
   }
+
+  protected static Consumer<List<? extends LinkData>> links(SpanContext... spanContexts) {
+    return links -> {
+      assertThat(links).hasSize(spanContexts.length);
+      for (SpanContext spanContext : spanContexts) {
+        assertThat(links)
+            .anySatisfy(
+                link -> {
+                  assertThat(link.getSpanContext().getTraceId())
+                      .isEqualTo(spanContext.getTraceId());
+                  assertThat(link.getSpanContext().getSpanId()).isEqualTo(spanContext.getSpanId());
+                  assertThat(link.getSpanContext().getTraceFlags())
+                      .isEqualTo(spanContext.getTraceFlags());
+                  assertThat(link.getSpanContext().getTraceState())
+                      .isEqualTo(spanContext.getTraceState());
+                });
+      }
+    };
+  }
 }

+ 15 - 2
instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java

@@ -6,12 +6,15 @@
 package io.opentelemetry.testing;
 
 import org.apache.kafka.clients.admin.NewTopic;
+import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.boot.SpringBootConfiguration;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.ContainerCustomizer;
 import org.springframework.kafka.config.TopicBuilder;
 import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 
 @SpringBootConfiguration
 @EnableAutoConfiguration
@@ -39,7 +42,11 @@ public class ConsumerConfig {
 
   @Bean
   public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
-      ConsumerFactory<String, String> consumerFactory) {
+      ConsumerFactory<String, String> consumerFactory,
+      ObjectProvider<
+              ContainerCustomizer<
+                  String, String, ConcurrentMessageListenerContainer<String, String>>>
+          customizerProvider) {
     ConcurrentKafkaListenerContainerFactory<String, String> factory =
         new ConcurrentKafkaListenerContainerFactory<>();
     // do not retry failed records
@@ -47,12 +54,17 @@ public class ConsumerConfig {
     factory.setConsumerFactory(consumerFactory);
     factory.setBatchListener(true);
     factory.setAutoStartup(true);
+    customizerProvider.ifAvailable(factory::setContainerCustomizer);
     return factory;
   }
 
   @Bean
   public ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(
-      ConsumerFactory<String, String> consumerFactory) {
+      ConsumerFactory<String, String> consumerFactory,
+      ObjectProvider<
+              ContainerCustomizer<
+                  String, String, ConcurrentMessageListenerContainer<String, String>>>
+          customizerProvider) {
     ConcurrentKafkaListenerContainerFactory<String, String> factory =
         new ConcurrentKafkaListenerContainerFactory<>();
     // do not retry failed records
@@ -60,6 +72,7 @@ public class ConsumerConfig {
     factory.setConsumerFactory(consumerFactory);
     factory.setBatchListener(false);
     factory.setAutoStartup(true);
+    customizerProvider.ifAvailable(factory::setContainerCustomizer);
     return factory;
   }
 }

+ 1 - 0
settings.gradle.kts

@@ -430,6 +430,7 @@ include(":instrumentation:spring:spring-integration-4.1:library")
 include(":instrumentation:spring:spring-integration-4.1:testing")
 include(":instrumentation:spring:spring-jms-2.0:javaagent")
 include(":instrumentation:spring:spring-kafka-2.7:javaagent")
+include(":instrumentation:spring:spring-kafka-2.7:library")
 include(":instrumentation:spring:spring-kafka-2.7:testing")
 include(":instrumentation:spring:spring-rabbit-1.0:javaagent")
 include(":instrumentation:spring:spring-rmi-4.0:javaagent")