Browse Source

Pulsar: use span links when receive telemetry is enabled (#10650)

Lauri Tulmin 11 months ago
parent
commit
3b7c2255b6

+ 20 - 1
instrumentation/pulsar/pulsar-2.8/javaagent/build.gradle.kts

@@ -19,9 +19,28 @@ dependencies {
   testImplementation("org.apache.pulsar:pulsar-client-admin:2.8.0")
 }
 
+tasks {
+  val testReceiveSpanDisabled by registering(Test::class) {
+    filter {
+      includeTestsMatching("PulsarClientSuppressReceiveSpansTest")
+    }
+    include("**/PulsarClientSuppressReceiveSpansTest.*")
+  }
+
+  test {
+    filter {
+      excludeTestsMatching("PulsarClientSuppressReceiveSpansTest")
+    }
+    jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
+  }
+
+  check {
+    dependsOn(testReceiveSpanDisabled)
+  }
+}
+
 tasks.withType<Test>().configureEach {
   // TODO run tests both with and without experimental span attributes
   jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true")
-  jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
   usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
 }

+ 48 - 0
instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerBaseInstrumentation.java

@@ -0,0 +1,48 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.MessageListenerContext;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+
+public class ConsumerBaseInstrumentation implements TypeInstrumentation {
+
+  @Override
+  public ElementMatcher<TypeDescription> typeMatcher() {
+    return named("org.apache.pulsar.client.impl.ConsumerBase")
+        .or(named("org.apache.pulsar.client.impl.MultiTopicsConsumerImpl"));
+  }
+
+  @Override
+  public void transform(TypeTransformer transformer) {
+    // these methods receive a message and pass it on to a message listener
+    // we instrument them so that the span for the receive operation could be suppressed
+    transformer.applyAdviceToMethod(
+        named("triggerListener").and(takesArguments(0)).or(named("receiveMessageFromConsumer")),
+        this.getClass().getName() + "$TriggerListenerAdvice");
+  }
+
+  @SuppressWarnings("unused")
+  public static class TriggerListenerAdvice {
+
+    @Advice.OnMethodEnter(suppress = Throwable.class)
+    public static void onEnter() {
+      MessageListenerContext.startProcessing();
+    }
+
+    @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
+    public static void onExit() {
+      MessageListenerContext.endProcessing();
+    }
+  }
+}

+ 1 - 0
instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java

@@ -20,6 +20,7 @@ public class PulsarInstrumentationModule extends InstrumentationModule {
   @Override
   public List<TypeInstrumentation> typeInstrumentations() {
     return Arrays.asList(
+        new ConsumerBaseInstrumentation(),
         new ConsumerImplInstrumentation(),
         new ProducerImplInstrumentation(),
         new MessageInstrumentation(),

+ 31 - 0
instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageListenerContext.java

@@ -0,0 +1,31 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry;
+
+/**
+ * Helper class used to determine whether message is going to be processed by a listener. If we know
+ * that message is going to be passed to a message listener, that would produce a span for the
+ * "process" operation, we are going to suppress the span from the message "receive" operation.
+ */
+public final class MessageListenerContext {
+  private static final ThreadLocal<Boolean> processing = new ThreadLocal<>();
+
+  private MessageListenerContext() {}
+
+  /** Call on entry to a method that will pass the received message to a message listener. */
+  public static void startProcessing() {
+    processing.set(Boolean.TRUE);
+  }
+
+  public static void endProcessing() {
+    processing.remove();
+  }
+
+  /** Returns true if we expect a received message to be passed to a listener. */
+  public static boolean isProcessing() {
+    return processing.get() != null;
+  }
+}

+ 53 - 25
instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java

@@ -18,7 +18,9 @@ import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
 import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
+import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
 import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
+import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
 import io.opentelemetry.instrumentation.api.internal.Timer;
 import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesExtractor;
 import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
@@ -38,6 +40,8 @@ public final class PulsarSingletons {
       TELEMETRY.getPropagators().getTextMapPropagator();
   private static final List<String> capturedHeaders =
       ExperimentalConfig.get().getMessagingHeaders();
+  private static final boolean receiveInstrumentationEnabled =
+      ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();
 
   private static final Instrumenter<PulsarRequest, Void> CONSUMER_PROCESS_INSTRUMENTER =
       createConsumerProcessInstrumenter();
@@ -64,15 +68,23 @@ public final class PulsarSingletons {
     MessagingAttributesGetter<PulsarRequest, Void> getter =
         PulsarMessagingAttributesGetter.INSTANCE;
 
-    return Instrumenter.<PulsarRequest, Void>builder(
-            TELEMETRY,
-            INSTRUMENTATION_NAME,
-            MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE))
-        .addAttributesExtractor(
-            createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
-        .addAttributesExtractor(
-            ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
-        .buildConsumerInstrumenter(MessageTextMapGetter.INSTANCE);
+    InstrumenterBuilder<PulsarRequest, Void> instrumenterBuilder =
+        Instrumenter.<PulsarRequest, Void>builder(
+                TELEMETRY,
+                INSTRUMENTATION_NAME,
+                MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE))
+            .addAttributesExtractor(
+                createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
+            .addAttributesExtractor(
+                ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()));
+
+    if (receiveInstrumentationEnabled) {
+      return instrumenterBuilder
+          .addSpanLinksExtractor(
+              new PropagatorBasedSpanLinksExtractor<>(PROPAGATOR, MessageTextMapGetter.INSTANCE))
+          .buildInstrumenter(SpanKindExtractor.alwaysConsumer());
+    }
+    return instrumenterBuilder.buildConsumerInstrumenter(MessageTextMapGetter.INSTANCE);
   }
 
   private static Instrumenter<PulsarBatchRequest, Void> createConsumerBatchReceiveInstrumenter() {
@@ -87,10 +99,7 @@ public final class PulsarSingletons {
             createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
         .addAttributesExtractor(
             ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
-        .setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
-        .addSpanLinksExtractor(
-            new PulsarBatchRequestSpanLinksExtractor(
-                GlobalOpenTelemetry.getPropagators().getTextMapPropagator()))
+        .addSpanLinksExtractor(new PulsarBatchRequestSpanLinksExtractor(PROPAGATOR))
         .buildInstrumenter(SpanKindExtractor.alwaysConsumer());
   }
 
@@ -98,13 +107,21 @@ public final class PulsarSingletons {
     MessagingAttributesGetter<PulsarRequest, Void> getter =
         PulsarMessagingAttributesGetter.INSTANCE;
 
-    return Instrumenter.<PulsarRequest, Void>builder(
-            TELEMETRY,
-            INSTRUMENTATION_NAME,
-            MessagingSpanNameExtractor.create(getter, MessageOperation.PROCESS))
-        .addAttributesExtractor(
-            createMessagingAttributesExtractor(getter, MessageOperation.PROCESS))
-        .buildInstrumenter();
+    InstrumenterBuilder<PulsarRequest, Void> instrumenterBuilder =
+        Instrumenter.<PulsarRequest, Void>builder(
+                TELEMETRY,
+                INSTRUMENTATION_NAME,
+                MessagingSpanNameExtractor.create(getter, MessageOperation.PROCESS))
+            .addAttributesExtractor(
+                createMessagingAttributesExtractor(getter, MessageOperation.PROCESS));
+
+    if (receiveInstrumentationEnabled) {
+      SpanLinksExtractor<PulsarRequest> spanLinksExtractor =
+          new PropagatorBasedSpanLinksExtractor<>(PROPAGATOR, MessageTextMapGetter.INSTANCE);
+      instrumenterBuilder.addSpanLinksExtractor(spanLinksExtractor);
+      return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
+    }
+    return instrumenterBuilder.buildConsumerInstrumenter(MessageTextMapGetter.INSTANCE);
   }
 
   private static Instrumenter<PulsarRequest, Void> createProducerInstrumenter() {
@@ -146,12 +163,17 @@ public final class PulsarSingletons {
     if (!CONSUMER_RECEIVE_INSTRUMENTER.shouldStart(parent, request)) {
       return null;
     }
-    // startAndEnd not supports extract trace context from carrier
-    // start not supports custom startTime
-    // extract trace context by using TEXT_MAP_PROPAGATOR here.
+    if (!receiveInstrumentationEnabled) {
+      // suppress receive span when receive telemetry is not enabled and message is going to be
+      // processed by a listener
+      if (MessageListenerContext.isProcessing()) {
+        return null;
+      }
+      parent = PROPAGATOR.extract(parent, request, MessageTextMapGetter.INSTANCE);
+    }
     return InstrumenterUtil.startAndEnd(
         CONSUMER_RECEIVE_INSTRUMENTER,
-        PROPAGATOR.extract(parent, request, MessageTextMapGetter.INSTANCE),
+        parent,
         request,
         null,
         throwable,
@@ -185,11 +207,17 @@ public final class PulsarSingletons {
 
   public static CompletableFuture<Message<?>> wrap(
       CompletableFuture<Message<?>> future, Timer timer, Consumer<?> consumer) {
+    boolean listenerContextActive = MessageListenerContext.isProcessing();
     Context parent = Context.current();
     CompletableFuture<Message<?>> result = new CompletableFuture<>();
     future.whenComplete(
         (message, throwable) -> {
-          Context context = startAndEndConsumerReceive(parent, message, timer, consumer, throwable);
+          // we create a "receive" span when receive telemetry is enabled or when we know that
+          // this message will not be passed to a listener that would create the "process" span
+          Context context =
+              receiveInstrumentationEnabled || !listenerContextActive
+                  ? startAndEndConsumerReceive(parent, message, timer, consumer, throwable)
+                  : parent;
           runWithContext(
               context,
               () -> {

+ 322 - 0
instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java

@@ -0,0 +1,322 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;
+
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
+import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
+import io.opentelemetry.sdk.trace.data.LinkData;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.assertj.core.api.AbstractLongAssert;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+abstract class AbstractPulsarClientTest {
+
+  private static final Logger logger = LoggerFactory.getLogger(AbstractPulsarClientTest.class);
+
+  private static final DockerImageName DEFAULT_IMAGE_NAME =
+      DockerImageName.parse("apachepulsar/pulsar:2.8.0");
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
+
+  private static PulsarContainer pulsar;
+  static PulsarClient client;
+  static PulsarAdmin admin;
+  static Producer<String> producer;
+  static Consumer<String> consumer;
+  static Producer<String> producer2;
+
+  private static String brokerHost;
+  private static int brokerPort;
+
+  private static final AttributeKey<String> MESSAGE_TYPE =
+      AttributeKey.stringKey("messaging.pulsar.message.type");
+
+  @BeforeAll
+  static void beforeAll() throws PulsarClientException {
+    pulsar =
+        new PulsarContainer(DEFAULT_IMAGE_NAME)
+            .withEnv("PULSAR_MEM", "-Xmx128m")
+            .withLogConsumer(new Slf4jLogConsumer(logger))
+            .withStartupTimeout(Duration.ofMinutes(2));
+    pulsar.start();
+
+    brokerHost = pulsar.getHost();
+    brokerPort = pulsar.getMappedPort(6650);
+    client = PulsarClient.builder().serviceUrl(pulsar.getPulsarBrokerUrl()).build();
+    admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();
+  }
+
+  @AfterEach
+  void afterEach() throws PulsarClientException {
+    if (producer != null) {
+      producer.close();
+    }
+    if (consumer != null) {
+      consumer.close();
+    }
+    if (producer2 != null) {
+      producer2.close();
+    }
+  }
+
+  @AfterAll
+  static void afterAll() throws PulsarClientException {
+    if (client != null) {
+      client.close();
+    }
+    if (admin != null) {
+      admin.close();
+    }
+    pulsar.close();
+  }
+
+  @Test
+  void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception {
+    String topic = "persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceive";
+    admin.topics().createNonPartitionedTopic(topic);
+    consumer =
+        client
+            .newConsumer(Schema.STRING)
+            .subscriptionName("test_sub")
+            .topic(topic)
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+
+    producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+
+    String msg = "test";
+    MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
+
+    testing.runWithSpan(
+        "receive-parent",
+        () -> {
+          Messages<String> receivedMsg = consumer.batchReceive();
+          consumer.acknowledge(receivedMsg);
+        });
+    AtomicReference<SpanData> producerSpan = new AtomicReference<>();
+
+    testing.waitAndAssertTraces(
+        trace -> {
+          trace.hasSpansSatisfyingExactly(
+              span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+              span ->
+                  span.hasName(topic + " publish")
+                      .hasKind(SpanKind.PRODUCER)
+                      .hasParent(trace.getSpan(0))
+                      .hasAttributesSatisfyingExactly(
+                          sendAttributes(topic, msgId.toString(), false)));
+          producerSpan.set(trace.getSpan(1));
+        },
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("receive-parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName(topic + " receive")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributesSatisfyingExactly(
+                            batchReceiveAttributes(topic, null, false))));
+  }
+
+  @Test
+  void testConsumeNonPartitionedTopicUsingBatchReceiveAsync() throws Exception {
+    String topic =
+        "persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceiveAsync";
+    admin.topics().createNonPartitionedTopic(topic);
+    consumer =
+        client
+            .newConsumer(Schema.STRING)
+            .subscriptionName("test_sub")
+            .topic(topic)
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+
+    producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+
+    String msg = "test";
+    MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
+
+    CompletableFuture<Messages<String>> result =
+        testing.runWithSpan(
+            "receive-parent",
+            () ->
+                consumer
+                    .batchReceiveAsync()
+                    .whenComplete(
+                        (messages, throwable) -> {
+                          if (messages != null) {
+                            testing.runWithSpan(
+                                "callback", () -> acknowledgeMessages(consumer, messages));
+                          }
+                        }));
+
+    assertThat(result.get(1, TimeUnit.MINUTES).size()).isEqualTo(1);
+
+    AtomicReference<SpanData> producerSpan = new AtomicReference<>();
+    testing.waitAndAssertTraces(
+        trace -> {
+          trace.hasSpansSatisfyingExactly(
+              span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+              span ->
+                  span.hasName(topic + " publish")
+                      .hasKind(SpanKind.PRODUCER)
+                      .hasParent(trace.getSpan(0))
+                      .hasAttributesSatisfyingExactly(
+                          sendAttributes(topic, msgId.toString(), false)));
+
+          producerSpan.set(trace.getSpan(1));
+        },
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("receive-parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName(topic + " receive")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(0))
+                        .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
+                        .hasAttributesSatisfyingExactly(batchReceiveAttributes(topic, null, false)),
+                span ->
+                    span.hasName("callback")
+                        .hasKind(SpanKind.INTERNAL)
+                        .hasParent(trace.getSpan(1))));
+  }
+
+  static List<AttributeAssertion> sendAttributes(
+      String destination, String messageId, boolean testHeaders) {
+    List<AttributeAssertion> assertions =
+        new ArrayList<>(
+            Arrays.asList(
+                equalTo(MESSAGING_SYSTEM, "pulsar"),
+                equalTo(SERVER_ADDRESS, brokerHost),
+                equalTo(SERVER_PORT, brokerPort),
+                equalTo(MESSAGING_DESTINATION_NAME, destination),
+                equalTo(MESSAGING_OPERATION, "publish"),
+                equalTo(MESSAGING_MESSAGE_ID, messageId),
+                satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative),
+                equalTo(MESSAGE_TYPE, "normal")));
+
+    if (testHeaders) {
+      assertions.add(
+          equalTo(
+              AttributeKey.stringArrayKey("messaging.header.test_message_header"),
+              Collections.singletonList("test")));
+    }
+    return assertions;
+  }
+
+  static List<AttributeAssertion> batchReceiveAttributes(
+      String destination, String messageId, boolean testHeaders) {
+    return receiveAttributes(destination, messageId, testHeaders, true);
+  }
+
+  static List<AttributeAssertion> receiveAttributes(
+      String destination, String messageId, boolean testHeaders) {
+    return receiveAttributes(destination, messageId, testHeaders, false);
+  }
+
+  static List<AttributeAssertion> receiveAttributes(
+      String destination, String messageId, boolean testHeaders, boolean isBatch) {
+    List<AttributeAssertion> assertions =
+        new ArrayList<>(
+            Arrays.asList(
+                equalTo(MESSAGING_SYSTEM, "pulsar"),
+                equalTo(SERVER_ADDRESS, brokerHost),
+                equalTo(SERVER_PORT, brokerPort),
+                equalTo(MESSAGING_DESTINATION_NAME, destination),
+                equalTo(MESSAGING_OPERATION, "receive"),
+                equalTo(MESSAGING_MESSAGE_ID, messageId),
+                satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative)));
+    if (testHeaders) {
+      assertions.add(
+          equalTo(
+              AttributeKey.stringArrayKey("messaging.header.test_message_header"),
+              Collections.singletonList("test")));
+    }
+    if (isBatch) {
+      assertions.add(satisfies(MESSAGING_BATCH_MESSAGE_COUNT, AbstractLongAssert::isPositive));
+    }
+    return assertions;
+  }
+
+  static List<AttributeAssertion> processAttributes(
+      String destination, String messageId, boolean testHeaders) {
+    List<AttributeAssertion> assertions =
+        new ArrayList<>(
+            Arrays.asList(
+                equalTo(MESSAGING_SYSTEM, "pulsar"),
+                equalTo(MESSAGING_DESTINATION_NAME, destination),
+                equalTo(MESSAGING_OPERATION, "process"),
+                equalTo(MESSAGING_MESSAGE_ID, messageId),
+                satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative)));
+    if (testHeaders) {
+      assertions.add(
+          equalTo(
+              AttributeKey.stringArrayKey("messaging.header.test_message_header"),
+              Collections.singletonList("test")));
+    }
+    return assertions;
+  }
+
+  static void acknowledgeMessage(Consumer<String> consumer, Message<String> message) {
+    try {
+      consumer.acknowledge(message);
+    } catch (PulsarClientException exception) {
+      throw new RuntimeException(exception);
+    }
+  }
+
+  static void acknowledgeMessages(Consumer<String> consumer, Messages<String> messages) {
+    try {
+      consumer.acknowledge(messages);
+    } catch (PulsarClientException exception) {
+      throw new RuntimeException(exception);
+    }
+  }
+}

+ 346 - 0
instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientSuppressReceiveSpansTest.java

@@ -0,0 +1,346 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;
+
+import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName;
+
+import io.opentelemetry.api.trace.SpanKind;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.junit.jupiter.api.Test;
+
+class PulsarClientSuppressReceiveSpansTest extends AbstractPulsarClientTest {
+
+  @Test
+  void testConsumeNonPartitionedTopic() throws Exception {
+    String topic = "persistent://public/default/testConsumeNonPartitionedTopic";
+    CountDownLatch latch = new CountDownLatch(1);
+    admin.topics().createNonPartitionedTopic(topic);
+    consumer =
+        client
+            .newConsumer(Schema.STRING)
+            .subscriptionName("test_sub")
+            .topic(topic)
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .messageListener(
+                (MessageListener<String>)
+                    (consumer, msg) -> {
+                      acknowledgeMessage(consumer, msg);
+                      latch.countDown();
+                    })
+            .subscribe();
+
+    producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+
+    String msg = "test";
+    MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
+
+    latch.await(1, TimeUnit.MINUTES);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName(topic + " publish")
+                        .hasKind(SpanKind.PRODUCER)
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributesSatisfyingExactly(
+                            sendAttributes(topic, msgId.toString(), false)),
+                span ->
+                    span.hasName(topic + " process")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(1))
+                        .hasAttributesSatisfyingExactly(
+                            processAttributes(topic, msgId.toString(), false))));
+  }
+
+  @Test
+  void testConsumeNonPartitionedTopicUsingReceive() throws Exception {
+    String topic = "persistent://public/default/testConsumeNonPartitionedTopicCallReceive";
+    admin.topics().createNonPartitionedTopic(topic);
+    consumer =
+        client
+            .newConsumer(Schema.STRING)
+            .subscriptionName("test_sub")
+            .topic(topic)
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+    producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+
+    String msg = "test";
+    MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
+
+    Message<String> receivedMsg = consumer.receive();
+    consumer.acknowledge(receivedMsg);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName(topic + " publish")
+                        .hasKind(SpanKind.PRODUCER)
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributesSatisfyingExactly(
+                            sendAttributes(topic, msgId.toString(), false)),
+                span ->
+                    span.hasName(topic + " receive")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(1))
+                        .hasAttributesSatisfyingExactly(
+                            receiveAttributes(topic, msgId.toString(), false))));
+  }
+
+  @Test
+  void testConsumeNonPartitionedTopicUsingReceiveAsync() throws Exception {
+    String topic = "persistent://public/default/testConsumeNonPartitionedTopicCallReceiveAsync";
+    admin.topics().createNonPartitionedTopic(topic);
+    consumer =
+        client
+            .newConsumer(Schema.STRING)
+            .subscriptionName("test_sub")
+            .topic(topic)
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+
+    producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+
+    CompletableFuture<Message<String>> result =
+        consumer
+            .receiveAsync()
+            .whenComplete(
+                (message, throwable) -> {
+                  if (message != null) {
+                    testing.runWithSpan("callback", () -> acknowledgeMessage(consumer, message));
+                  }
+                });
+
+    String msg = "test";
+    MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
+
+    result.get(1, TimeUnit.MINUTES);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName(topic + " publish")
+                        .hasKind(SpanKind.PRODUCER)
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributesSatisfyingExactly(
+                            sendAttributes(topic, msgId.toString(), false)),
+                span ->
+                    span.hasName(topic + " receive")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(1))
+                        .hasAttributesSatisfyingExactly(
+                            receiveAttributes(topic, msgId.toString(), false)),
+                span ->
+                    span.hasName("callback")
+                        .hasKind(SpanKind.INTERNAL)
+                        .hasParent(trace.getSpan(2))));
+  }
+
+  @Test
+  void testConsumeNonPartitionedTopicUsingReceiveWithTimeout() throws Exception {
+    String topic =
+        "persistent://public/default/testConsumeNonPartitionedTopicCallReceiveWithTimeout";
+    admin.topics().createNonPartitionedTopic(topic);
+    consumer =
+        client
+            .newConsumer(Schema.STRING)
+            .subscriptionName("test_sub")
+            .topic(topic)
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+    producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+
+    String msg = "test";
+    MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
+
+    Message<String> receivedMsg = consumer.receive(1, TimeUnit.MINUTES);
+    consumer.acknowledge(receivedMsg);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName(topic + " publish")
+                        .hasKind(SpanKind.PRODUCER)
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributesSatisfyingExactly(
+                            sendAttributes(topic, msgId.toString(), false)),
+                span ->
+                    span.hasName(topic + " receive")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(1))
+                        .hasAttributesSatisfyingExactly(
+                            receiveAttributes(topic, msgId.toString(), false))));
+  }
+
+  @Test
+  void captureMessageHeaderAsSpanAttribute() throws Exception {
+    String topic = "persistent://public/default/testCaptureMessageHeaderTopic";
+    CountDownLatch latch = new CountDownLatch(1);
+    admin.topics().createNonPartitionedTopic(topic);
+    consumer =
+        client
+            .newConsumer(Schema.STRING)
+            .subscriptionName("test_sub")
+            .topic(topic)
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .messageListener(
+                (MessageListener<String>)
+                    (consumer, msg) -> {
+                      acknowledgeMessage(consumer, msg);
+                      latch.countDown();
+                    })
+            .subscribe();
+
+    producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+
+    String msg = "test";
+    MessageId msgId =
+        testing.runWithSpan(
+            "parent",
+            () -> producer.newMessage().value(msg).property("test-message-header", "test").send());
+
+    latch.await(1, TimeUnit.MINUTES);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName(topic + " publish")
+                        .hasKind(SpanKind.PRODUCER)
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributesSatisfyingExactly(
+                            sendAttributes(topic, msgId.toString(), true)),
+                span ->
+                    span.hasName(topic + " process")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(1))
+                        .hasAttributesSatisfyingExactly(
+                            processAttributes(topic, msgId.toString(), true))));
+  }
+
+  @Test
+  void testConsumePartitionedTopic() throws Exception {
+    String topic = "persistent://public/default/testConsumePartitionedTopic";
+    admin.topics().createPartitionedTopic(topic, 1);
+    CountDownLatch latch = new CountDownLatch(1);
+
+    consumer =
+        client
+            .newConsumer(Schema.STRING)
+            .subscriptionName("test_sub")
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .topic(topic)
+            .messageListener(
+                (MessageListener<String>)
+                    (consumer, msg) -> {
+                      acknowledgeMessage(consumer, msg);
+                      latch.countDown();
+                    })
+            .subscribe();
+
+    producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+
+    String msg = "test";
+    MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
+
+    latch.await(1, TimeUnit.MINUTES);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName(topic + "-partition-0 publish")
+                        .hasKind(SpanKind.PRODUCER)
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributesSatisfyingExactly(
+                            sendAttributes(topic + "-partition-0", msgId.toString(), false)),
+                span ->
+                    span.hasName(topic + "-partition-0 process")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(1))
+                        .hasAttributesSatisfyingExactly(
+                            processAttributes(topic + "-partition-0", msgId.toString(), false))));
+  }
+
+  @Test
+  void testConsumeMultiTopics() throws Exception {
+    String topicNamePrefix = "persistent://public/default/testConsumeMulti_";
+    String topic1 = topicNamePrefix + "1";
+    String topic2 = topicNamePrefix + "2";
+    CountDownLatch latch = new CountDownLatch(2);
+    producer = client.newProducer(Schema.STRING).topic(topic1).enableBatching(false).create();
+    producer2 = client.newProducer(Schema.STRING).topic(topic2).enableBatching(false).create();
+
+    MessageId msgId1 = testing.runWithSpan("parent1", () -> producer.send("test1"));
+    MessageId msgId2 = testing.runWithSpan("parent2", () -> producer2.send("test2"));
+
+    consumer =
+        client
+            .newConsumer(Schema.STRING)
+            .topic(topic2, topic1)
+            .subscriptionName("test_sub")
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .messageListener(
+                (MessageListener<String>)
+                    (consumer, msg) -> {
+                      acknowledgeMessage(consumer, msg);
+                      latch.countDown();
+                    })
+            .subscribe();
+
+    latch.await(1, TimeUnit.MINUTES);
+
+    testing.waitAndAssertSortedTraces(
+        orderByRootSpanName("parent1", "parent2"),
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName(topic1 + " publish")
+                        .hasKind(SpanKind.PRODUCER)
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributesSatisfyingExactly(
+                            sendAttributes(topic1, msgId1.toString(), false)),
+                span ->
+                    span.hasName(topic1 + " process")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(1))
+                        .hasAttributesSatisfyingExactly(
+                            processAttributes(topic1, msgId1.toString(), false))),
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent2").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName(topic2 + " publish")
+                        .hasKind(SpanKind.PRODUCER)
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributesSatisfyingExactly(
+                            sendAttributes(topic2, msgId2.toString(), false)),
+                span ->
+                    span.hasName(topic2 + " process")
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(1))
+                        .hasAttributesSatisfyingExactly(
+                            processAttributes(topic2, msgId2.toString(), false))));
+  }
+}

+ 139 - 409
instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java

@@ -5,129 +5,24 @@
 
 package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;
 
+import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
 import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName;
-import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
-import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
-import static org.assertj.core.api.Assertions.assertThat;
 
-import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.trace.SpanKind;
-import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
-import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
-import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
 import io.opentelemetry.sdk.trace.data.LinkData;
 import io.opentelemetry.sdk.trace.data.SpanData;
-import io.opentelemetry.semconv.ServerAttributes;
-import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.Messages;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.assertj.core.api.AbstractLongAssert;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.PulsarContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.utility.DockerImageName;
-
-class PulsarClientTest {
-
-  private static final Logger logger = LoggerFactory.getLogger(PulsarClientTest.class);
-
-  private static final DockerImageName DEFAULT_IMAGE_NAME =
-      DockerImageName.parse("apachepulsar/pulsar:2.8.0");
-
-  @RegisterExtension
-  static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
-
-  private static PulsarContainer pulsar;
-  private static PulsarClient client;
-  private static PulsarAdmin admin;
-  private static Producer<String> producer;
-  private static Consumer<String> consumer;
-  private static Producer<String> producer2;
-
-  private static String brokerHost;
-  private static int brokerPort;
-
-  private static final AttributeKey<String> MESSAGE_TYPE =
-      AttributeKey.stringKey("messaging.pulsar.message.type");
-
-  @BeforeAll
-  static void beforeAll() throws PulsarClientException {
-    pulsar =
-        new PulsarContainer(DEFAULT_IMAGE_NAME)
-            .withEnv("PULSAR_MEM", "-Xmx128m")
-            .withLogConsumer(new Slf4jLogConsumer(logger))
-            .withStartupTimeout(Duration.ofMinutes(2));
-    pulsar.start();
-
-    brokerHost = pulsar.getHost();
-    brokerPort = pulsar.getMappedPort(6650);
-    client = PulsarClient.builder().serviceUrl(pulsar.getPulsarBrokerUrl()).build();
-    admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();
-  }
-
-  @AfterAll
-  static void afterAll() throws PulsarClientException {
-    if (producer != null) {
-      producer.close();
-    }
-    if (consumer != null) {
-      consumer.close();
-    }
-    if (producer2 != null) {
-      producer2.close();
-    }
-    if (client != null) {
-      client.close();
-    }
-    if (admin != null) {
-      admin.close();
-    }
-    pulsar.close();
-  }
-
-  @Test
-  void testSendNonPartitionedTopic() throws Exception {
-    String topic = "persistent://public/default/testSendNonPartitionedTopic";
-    admin.topics().createNonPartitionedTopic(topic);
-    producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
-
-    String msg = "test";
-    MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
 
-    testing.waitAndAssertTraces(
-        trace ->
-            trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                span ->
-                    span.hasName(topic + " publish")
-                        .hasKind(SpanKind.PRODUCER)
-                        .hasParent(trace.getSpan(0))
-                        .hasAttributesSatisfyingExactly(
-                            sendAttributes(topic, msgId.toString(), false))));
-  }
+class PulsarClientTest extends AbstractPulsarClientTest {
 
   @Test
   void testConsumeNonPartitionedTopic() throws Exception {
@@ -155,26 +50,35 @@ class PulsarClientTest {
 
     latch.await(1, TimeUnit.MINUTES);
 
-    testing.waitAndAssertTraces(
+    AtomicReference<SpanData> producerSpan = new AtomicReference<>();
+    testing.waitAndAssertSortedTraces(
+        orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
+        trace -> {
+          trace.hasSpansSatisfyingExactly(
+              span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+              span ->
+                  span.hasName(topic + " publish")
+                      .hasKind(SpanKind.PRODUCER)
+                      .hasParent(trace.getSpan(0))
+                      .hasAttributesSatisfyingExactly(
+                          sendAttributes(topic, msgId.toString(), false)));
+
+          producerSpan.set(trace.getSpan(1));
+        },
         trace ->
             trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                span ->
-                    span.hasName(topic + " publish")
-                        .hasKind(SpanKind.PRODUCER)
-                        .hasParent(trace.getSpan(0))
-                        .hasAttributesSatisfyingExactly(
-                            sendAttributes(topic, msgId.toString(), false)),
                 span ->
                     span.hasName(topic + " receive")
                         .hasKind(SpanKind.CONSUMER)
-                        .hasParent(trace.getSpan(1))
+                        .hasNoParent()
+                        .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
                         .hasAttributesSatisfyingExactly(
                             receiveAttributes(topic, msgId.toString(), false)),
                 span ->
                     span.hasName(topic + " process")
-                        .hasKind(SpanKind.INTERNAL)
-                        .hasParent(trace.getSpan(2))
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(0))
+                        .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
                         .hasAttributesSatisfyingExactly(
                             processAttributes(topic, msgId.toString(), false))));
   }
@@ -198,20 +102,28 @@ class PulsarClientTest {
     Message<String> receivedMsg = consumer.receive();
     consumer.acknowledge(receivedMsg);
 
-    testing.waitAndAssertTraces(
+    AtomicReference<SpanData> producerSpan = new AtomicReference<>();
+    testing.waitAndAssertSortedTraces(
+        orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
+        trace -> {
+          trace.hasSpansSatisfyingExactly(
+              span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+              span ->
+                  span.hasName(topic + " publish")
+                      .hasKind(SpanKind.PRODUCER)
+                      .hasParent(trace.getSpan(0))
+                      .hasAttributesSatisfyingExactly(
+                          sendAttributes(topic, msgId.toString(), false)));
+
+          producerSpan.set(trace.getSpan(1));
+        },
         trace ->
             trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                span ->
-                    span.hasName(topic + " publish")
-                        .hasKind(SpanKind.PRODUCER)
-                        .hasParent(trace.getSpan(0))
-                        .hasAttributesSatisfyingExactly(
-                            sendAttributes(topic, msgId.toString(), false)),
                 span ->
                     span.hasName(topic + " receive")
                         .hasKind(SpanKind.CONSUMER)
-                        .hasParent(trace.getSpan(1))
+                        .hasNoParent()
+                        .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
                         .hasAttributesSatisfyingExactly(
                             receiveAttributes(topic, msgId.toString(), false))));
   }
@@ -245,26 +157,34 @@ class PulsarClientTest {
 
     result.get(1, TimeUnit.MINUTES);
 
-    testing.waitAndAssertTraces(
+    AtomicReference<SpanData> producerSpan = new AtomicReference<>();
+    testing.waitAndAssertSortedTraces(
+        orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
+        trace -> {
+          trace.hasSpansSatisfyingExactly(
+              span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+              span ->
+                  span.hasName(topic + " publish")
+                      .hasKind(SpanKind.PRODUCER)
+                      .hasParent(trace.getSpan(0))
+                      .hasAttributesSatisfyingExactly(
+                          sendAttributes(topic, msgId.toString(), false)));
+
+          producerSpan.set(trace.getSpan(1));
+        },
         trace ->
             trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                span ->
-                    span.hasName(topic + " publish")
-                        .hasKind(SpanKind.PRODUCER)
-                        .hasParent(trace.getSpan(0))
-                        .hasAttributesSatisfyingExactly(
-                            sendAttributes(topic, msgId.toString(), false)),
                 span ->
                     span.hasName(topic + " receive")
                         .hasKind(SpanKind.CONSUMER)
-                        .hasParent(trace.getSpan(1))
+                        .hasNoParent()
+                        .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
                         .hasAttributesSatisfyingExactly(
                             receiveAttributes(topic, msgId.toString(), false)),
                 span ->
                     span.hasName("callback")
                         .hasKind(SpanKind.INTERNAL)
-                        .hasParent(trace.getSpan(2))));
+                        .hasParent(trace.getSpan(0))));
   }
 
   @Test
@@ -287,50 +207,9 @@ class PulsarClientTest {
     Message<String> receivedMsg = consumer.receive(1, TimeUnit.MINUTES);
     consumer.acknowledge(receivedMsg);
 
-    testing.waitAndAssertTraces(
-        trace ->
-            trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                span ->
-                    span.hasName(topic + " publish")
-                        .hasKind(SpanKind.PRODUCER)
-                        .hasParent(trace.getSpan(0))
-                        .hasAttributesSatisfyingExactly(
-                            sendAttributes(topic, msgId.toString(), false)),
-                span ->
-                    span.hasName(topic + " receive")
-                        .hasKind(SpanKind.CONSUMER)
-                        .hasParent(trace.getSpan(1))
-                        .hasAttributesSatisfyingExactly(
-                            receiveAttributes(topic, msgId.toString(), false))));
-  }
-
-  @Test
-  void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception {
-    String topic = "persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceive";
-    admin.topics().createNonPartitionedTopic(topic);
-    consumer =
-        client
-            .newConsumer(Schema.STRING)
-            .subscriptionName("test_sub")
-            .topic(topic)
-            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-            .subscribe();
-
-    producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
-
-    String msg = "test";
-    MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
-
-    testing.runWithSpan(
-        "receive-parent",
-        () -> {
-          Messages<String> receivedMsg = consumer.batchReceive();
-          consumer.acknowledge(receivedMsg);
-        });
     AtomicReference<SpanData> producerSpan = new AtomicReference<>();
-
-    testing.waitAndAssertTraces(
+    testing.waitAndAssertSortedTraces(
+        orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
         trace -> {
           trace.hasSpansSatisfyingExactly(
               span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
@@ -340,80 +219,18 @@ class PulsarClientTest {
                       .hasParent(trace.getSpan(0))
                       .hasAttributesSatisfyingExactly(
                           sendAttributes(topic, msgId.toString(), false)));
+
           producerSpan.set(trace.getSpan(1));
         },
         trace ->
             trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("receive-parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
                 span ->
                     span.hasName(topic + " receive")
                         .hasKind(SpanKind.CONSUMER)
+                        .hasNoParent()
                         .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
-                        .hasParent(trace.getSpan(0))
                         .hasAttributesSatisfyingExactly(
-                            batchReceiveAttributes(topic, null, false))));
-  }
-
-  @Test
-  void testConsumeNonPartitionedTopicUsingBatchReceiveAsync() throws Exception {
-    String topic =
-        "persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceiveAsync";
-    admin.topics().createNonPartitionedTopic(topic);
-    consumer =
-        client
-            .newConsumer(Schema.STRING)
-            .subscriptionName("test_sub")
-            .topic(topic)
-            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-            .subscribe();
-
-    producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
-
-    String msg = "test";
-    MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
-
-    CompletableFuture<Messages<String>> result =
-        testing.runWithSpan(
-            "receive-parent",
-            () ->
-                consumer
-                    .batchReceiveAsync()
-                    .whenComplete(
-                        (messages, throwable) -> {
-                          if (messages != null) {
-                            testing.runWithSpan(
-                                "callback", () -> acknowledgeMessages(consumer, messages));
-                          }
-                        }));
-
-    assertThat(result.get(1, TimeUnit.MINUTES).size()).isEqualTo(1);
-
-    AtomicReference<SpanData> producerSpan = new AtomicReference<>();
-    testing.waitAndAssertTraces(
-        trace ->
-            trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                span -> {
-                  span.hasName(topic + " publish")
-                      .hasKind(SpanKind.PRODUCER)
-                      .hasParent(trace.getSpan(0))
-                      .hasAttributesSatisfyingExactly(
-                          sendAttributes(topic, msgId.toString(), false));
-                  producerSpan.set(trace.getSpan(1));
-                }),
-        trace ->
-            trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("receive-parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                span ->
-                    span.hasName(topic + " receive")
-                        .hasKind(SpanKind.CONSUMER)
-                        .hasParent(trace.getSpan(0))
-                        .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
-                        .hasAttributesSatisfyingExactly(batchReceiveAttributes(topic, null, false)),
-                span ->
-                    span.hasName("callback")
-                        .hasKind(SpanKind.INTERNAL)
-                        .hasParent(trace.getSpan(1))));
+                            receiveAttributes(topic, msgId.toString(), false))));
   }
 
   @Test
@@ -445,49 +262,37 @@ class PulsarClientTest {
 
     latch.await(1, TimeUnit.MINUTES);
 
-    testing.waitAndAssertTraces(
+    AtomicReference<SpanData> producerSpan = new AtomicReference<>();
+    testing.waitAndAssertSortedTraces(
+        orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
+        trace -> {
+          trace.hasSpansSatisfyingExactly(
+              span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+              span ->
+                  span.hasName(topic + " publish")
+                      .hasKind(SpanKind.PRODUCER)
+                      .hasParent(trace.getSpan(0))
+                      .hasAttributesSatisfyingExactly(
+                          sendAttributes(topic, msgId.toString(), true)));
+
+          producerSpan.set(trace.getSpan(1));
+        },
         trace ->
             trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                span ->
-                    span.hasName(topic + " publish")
-                        .hasKind(SpanKind.PRODUCER)
-                        .hasParent(trace.getSpan(0))
-                        .hasAttributesSatisfyingExactly(
-                            sendAttributes(topic, msgId.toString(), true)),
                 span ->
                     span.hasName(topic + " receive")
                         .hasKind(SpanKind.CONSUMER)
-                        .hasParent(trace.getSpan(1))
+                        .hasNoParent()
+                        .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
                         .hasAttributesSatisfyingExactly(
                             receiveAttributes(topic, msgId.toString(), true)),
                 span ->
                     span.hasName(topic + " process")
-                        .hasKind(SpanKind.INTERNAL)
-                        .hasParent(trace.getSpan(2))
-                        .hasAttributesSatisfyingExactly(
-                            processAttributes(topic, msgId.toString(), true))));
-  }
-
-  @Test
-  void testSendPartitionedTopic() throws Exception {
-    String topic = "persistent://public/default/testSendPartitionedTopic";
-    admin.topics().createPartitionedTopic(topic, 1);
-    producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
-
-    String msg = "test";
-    MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
-
-    testing.waitAndAssertTraces(
-        trace ->
-            trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                span ->
-                    span.hasName(topic + "-partition-0 publish")
-                        .hasKind(SpanKind.PRODUCER)
+                        .hasKind(SpanKind.CONSUMER)
                         .hasParent(trace.getSpan(0))
+                        .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
                         .hasAttributesSatisfyingExactly(
-                            sendAttributes(topic + "-partition-0", msgId.toString(), false))));
+                            processAttributes(topic, msgId.toString(), true))));
   }
 
   @Test
@@ -517,26 +322,35 @@ class PulsarClientTest {
 
     latch.await(1, TimeUnit.MINUTES);
 
-    testing.waitAndAssertTraces(
+    AtomicReference<SpanData> producerSpan = new AtomicReference<>();
+    testing.waitAndAssertSortedTraces(
+        orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
+        trace -> {
+          trace.hasSpansSatisfyingExactly(
+              span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+              span ->
+                  span.hasName(topic + "-partition-0 publish")
+                      .hasKind(SpanKind.PRODUCER)
+                      .hasParent(trace.getSpan(0))
+                      .hasAttributesSatisfyingExactly(
+                          sendAttributes(topic + "-partition-0", msgId.toString(), false)));
+
+          producerSpan.set(trace.getSpan(1));
+        },
         trace ->
             trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                span ->
-                    span.hasName(topic + "-partition-0 publish")
-                        .hasKind(SpanKind.PRODUCER)
-                        .hasParent(trace.getSpan(0))
-                        .hasAttributesSatisfyingExactly(
-                            sendAttributes(topic + "-partition-0", msgId.toString(), false)),
                 span ->
                     span.hasName(topic + "-partition-0 receive")
                         .hasKind(SpanKind.CONSUMER)
-                        .hasParent(trace.getSpan(1))
+                        .hasNoParent()
+                        .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
                         .hasAttributesSatisfyingExactly(
                             receiveAttributes(topic + "-partition-0", msgId.toString(), false)),
                 span ->
                     span.hasName(topic + "-partition-0 process")
-                        .hasKind(SpanKind.INTERNAL)
-                        .hasParent(trace.getSpan(2))
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(0))
+                        .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
                         .hasAttributesSatisfyingExactly(
                             processAttributes(topic + "-partition-0", msgId.toString(), false))));
   }
@@ -569,149 +383,65 @@ class PulsarClientTest {
 
     latch.await(1, TimeUnit.MINUTES);
 
+    AtomicReference<SpanData> producerSpan = new AtomicReference<>();
+    AtomicReference<SpanData> producerSpan2 = new AtomicReference<>();
     testing.waitAndAssertSortedTraces(
-        orderByRootSpanName("parent1", "parent2"),
+        orderByRootSpanName("parent1", topic1 + " receive", "parent2", topic2 + " receive"),
+        trace -> {
+          trace.hasSpansSatisfyingExactly(
+              span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(),
+              span ->
+                  span.hasName(topic1 + " publish")
+                      .hasKind(SpanKind.PRODUCER)
+                      .hasParent(trace.getSpan(0))
+                      .hasAttributesSatisfyingExactly(
+                          sendAttributes(topic1, msgId1.toString(), false)));
+
+          producerSpan.set(trace.getSpan(1));
+        },
         trace ->
             trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                span ->
-                    span.hasName(topic1 + " publish")
-                        .hasKind(SpanKind.PRODUCER)
-                        .hasParent(trace.getSpan(0))
-                        .hasAttributesSatisfyingExactly(
-                            sendAttributes(topic1, msgId1.toString(), false)),
                 span ->
                     span.hasName(topic1 + " receive")
                         .hasKind(SpanKind.CONSUMER)
-                        .hasParent(trace.getSpan(1))
+                        .hasNoParent()
+                        .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
                         .hasAttributesSatisfyingExactly(
                             receiveAttributes(topic1, msgId1.toString(), false)),
                 span ->
                     span.hasName(topic1 + " process")
-                        .hasKind(SpanKind.INTERNAL)
-                        .hasParent(trace.getSpan(2))
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(0))
+                        .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
                         .hasAttributesSatisfyingExactly(
                             processAttributes(topic1, msgId1.toString(), false))),
+        trace -> {
+          trace.hasSpansSatisfyingExactly(
+              span -> span.hasName("parent2").hasKind(SpanKind.INTERNAL).hasNoParent(),
+              span ->
+                  span.hasName(topic2 + " publish")
+                      .hasKind(SpanKind.PRODUCER)
+                      .hasParent(trace.getSpan(0))
+                      .hasAttributesSatisfyingExactly(
+                          sendAttributes(topic2, msgId2.toString(), false)));
+
+          producerSpan2.set(trace.getSpan(1));
+        },
         trace ->
             trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("parent2").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                span ->
-                    span.hasName(topic2 + " publish")
-                        .hasKind(SpanKind.PRODUCER)
-                        .hasParent(trace.getSpan(0))
-                        .hasAttributesSatisfyingExactly(
-                            sendAttributes(topic2, msgId2.toString(), false)),
                 span ->
                     span.hasName(topic2 + " receive")
                         .hasKind(SpanKind.CONSUMER)
-                        .hasParent(trace.getSpan(1))
+                        .hasNoParent()
+                        .hasLinks(LinkData.create(producerSpan2.get().getSpanContext()))
                         .hasAttributesSatisfyingExactly(
                             receiveAttributes(topic2, msgId2.toString(), false)),
                 span ->
                     span.hasName(topic2 + " process")
-                        .hasKind(SpanKind.INTERNAL)
-                        .hasParent(trace.getSpan(2))
+                        .hasKind(SpanKind.CONSUMER)
+                        .hasParent(trace.getSpan(0))
+                        .hasLinks(LinkData.create(producerSpan2.get().getSpanContext()))
                         .hasAttributesSatisfyingExactly(
                             processAttributes(topic2, msgId2.toString(), false))));
   }
-
-  private static List<AttributeAssertion> sendAttributes(
-      String destination, String messageId, boolean testHeaders) {
-    List<AttributeAssertion> assertions =
-        new ArrayList<>(
-            Arrays.asList(
-                equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "pulsar"),
-                equalTo(ServerAttributes.SERVER_ADDRESS, brokerHost),
-                equalTo(ServerAttributes.SERVER_PORT, brokerPort),
-                equalTo(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, destination),
-                equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
-                equalTo(MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID, messageId),
-                satisfies(
-                    MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
-                    AbstractLongAssert::isNotNegative),
-                equalTo(MESSAGE_TYPE, "normal")));
-    if (testHeaders) {
-      assertions.add(
-          equalTo(
-              AttributeKey.stringArrayKey("messaging.header.test_message_header"),
-              Collections.singletonList("test")));
-    }
-    return assertions;
-  }
-
-  private static List<AttributeAssertion> batchReceiveAttributes(
-      String destination, String messageId, boolean testHeaders) {
-    return receiveAttributes(destination, messageId, testHeaders, true);
-  }
-
-  private static List<AttributeAssertion> receiveAttributes(
-      String destination, String messageId, boolean testHeaders) {
-    return receiveAttributes(destination, messageId, testHeaders, false);
-  }
-
-  private static List<AttributeAssertion> receiveAttributes(
-      String destination, String messageId, boolean testHeaders, boolean isBatch) {
-    List<AttributeAssertion> assertions =
-        new ArrayList<>(
-            Arrays.asList(
-                equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "pulsar"),
-                equalTo(ServerAttributes.SERVER_ADDRESS, brokerHost),
-                equalTo(ServerAttributes.SERVER_PORT, brokerPort),
-                equalTo(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, destination),
-                equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "receive"),
-                equalTo(MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID, messageId),
-                satisfies(
-                    MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
-                    AbstractLongAssert::isNotNegative)));
-    if (testHeaders) {
-      assertions.add(
-          equalTo(
-              AttributeKey.stringArrayKey("messaging.header.test_message_header"),
-              Collections.singletonList("test")));
-    }
-    if (isBatch) {
-      assertions.add(
-          satisfies(
-              MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT,
-              AbstractLongAssert::isPositive));
-    }
-    return assertions;
-  }
-
-  private static List<AttributeAssertion> processAttributes(
-      String destination, String messageId, boolean testHeaders) {
-    List<AttributeAssertion> assertions =
-        new ArrayList<>(
-            Arrays.asList(
-                equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "pulsar"),
-                equalTo(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, destination),
-                equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"),
-                equalTo(MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID, messageId),
-                satisfies(
-                    MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
-                    AbstractLongAssert::isNotNegative)));
-    if (testHeaders) {
-      assertions.add(
-          equalTo(
-              AttributeKey.stringArrayKey("messaging.header.test_message_header"),
-              Collections.singletonList("test")));
-    }
-    return assertions;
-  }
-
-  private static void acknowledgeMessage(Consumer<String> consumer, Message<String> message) {
-    try {
-      consumer.acknowledge(message);
-    } catch (PulsarClientException exception) {
-      throw new RuntimeException(exception);
-    }
-  }
-
-  private static void acknowledgeMessages(Consumer<String> consumer, Messages<String> messages) {
-    try {
-      consumer.acknowledge(messages);
-    } catch (PulsarClientException exception) {
-      throw new RuntimeException(exception);
-    }
-  }
 }