Browse Source

Rocketmq 5: set context for async callback (#7238)

Run callbacks added to the `CompletableFuture` returned from `sendAsync`
with the context that was used when `sendAsync` was called.
Add test for capturing message headers.
Lauri Tulmin 2 years ago
parent
commit
910d177e6c

+ 32 - 0
instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/CompletableFutureWrapper.java

@@ -0,0 +1,32 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
+
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import java.util.concurrent.CompletableFuture;
+
+public final class CompletableFutureWrapper {
+
+  private CompletableFutureWrapper() {}
+
+  public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
+    CompletableFuture<T> result = new CompletableFuture<>();
+    Context context = Context.current();
+    future.whenComplete(
+        (T value, Throwable throwable) -> {
+          try (Scope ignored = context.makeCurrent()) {
+            if (throwable != null) {
+              result.completeExceptionally(throwable);
+            } else {
+              result.complete(value);
+            }
+          }
+        });
+
+    return result;
+  }
+}

+ 23 - 2
instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java

@@ -5,6 +5,7 @@
 
 package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
 
+import static io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0.RocketMqSingletons.producerInstrumenter;
 import static net.bytebuddy.matcher.ElementMatchers.isMethod;
 import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
 import static net.bytebuddy.matcher.ElementMatchers.named;
@@ -17,9 +18,11 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
 import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
 import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
@@ -52,6 +55,13 @@ final class ProducerImplInstrumentation implements TypeInstrumentation {
             .and(takesArgument(4, List.class))
             .and(takesArgument(5, int.class)),
         ProducerImplInstrumentation.class.getName() + "$SendAdvice");
+
+    transformer.applyAdviceToMethod(
+        isMethod()
+            .and(named("sendAsync"))
+            .and(takesArguments(1))
+            .and(takesArgument(0, named("org.apache.rocketmq.client.apis.message.Message"))),
+        ProducerImplInstrumentation.class.getName() + "$SendAsyncAdvice");
   }
 
   @SuppressWarnings("unused")
@@ -60,8 +70,7 @@ final class ProducerImplInstrumentation implements TypeInstrumentation {
     public static void onEnter(
         @Advice.Argument(0) SettableFuture<List<SendReceiptImpl>> future0,
         @Advice.Argument(4) List<PublishingMessageImpl> messages) {
-      Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter =
-          RocketMqSingletons.producerInstrumenter();
+      Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter = producerInstrumenter();
       int count = messages.size();
       List<SettableFuture<SendReceiptImpl>> futures = FutureConverter.convert(future0, count);
       for (int i = 0; i < count; i++) {
@@ -90,4 +99,16 @@ final class ProducerImplInstrumentation implements TypeInstrumentation {
       }
     }
   }
+
+  @SuppressWarnings("unused")
+  public static class SendAsyncAdvice {
+    @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
+    public static void onExit(
+        @Advice.Return(readOnly = false) CompletableFuture<SendReceipt> future,
+        @Advice.Thrown Throwable throwable) {
+      if (throwable == null) {
+        future = CompletableFutureWrapper.wrap(future);
+      }
+    }
+  }
 }

+ 292 - 105
instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java

@@ -5,6 +5,7 @@
 
 package io.opentelemetry.instrumentation.rocketmqclient.v5_0;
 
+import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
 import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
 import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION;
 import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND;
@@ -18,18 +19,25 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSA
 import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM;
 import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL;
 
+import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.trace.SpanKind;
 import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
 import io.opentelemetry.instrumentation.testing.util.ThrowingSupplier;
+import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
+import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
 import io.opentelemetry.sdk.trace.data.LinkData;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import io.opentelemetry.sdk.trace.data.StatusData;
 import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
@@ -38,37 +46,37 @@ import org.apache.rocketmq.client.apis.consumer.PushConsumer;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.Producer;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.java.impl.ClientImpl;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
 
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public abstract class AbstractRocketMqClientTest {
 
+  // Inner topic of the container.
+  private static final String topic = "normal-topic-0";
+  private static final String tag = "tagA";
+  private static final String consumerGroup = "group-normal-topic-0";
+
   private static final RocketMqProxyContainer container = new RocketMqProxyContainer();
+  private final ClientServiceProvider provider = ClientServiceProvider.loadService();
+  private PushConsumer consumer;
+  private Producer producer;
 
   protected abstract InstrumentationExtension testing();
 
   @BeforeAll
-  static void setUp() {
+  void setUp() throws ClientException {
     container.start();
-  }
-
-  @AfterAll
-  static void tearDown() {
-    container.close();
-  }
-
-  @Test
-  void testSendAndConsumeMessage() throws Throwable {
     ClientConfiguration clientConfiguration =
         ClientConfiguration.newBuilder().setEndpoints(container.endpoints).build();
     // Inner topic of the container.
-    String topic = "normal-topic-0";
     ClientServiceProvider provider = ClientServiceProvider.loadService();
     String consumerGroup = "group-normal-topic-0";
-    String tag = "tagA";
     FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
-    try (PushConsumer ignored =
+    consumer =
         provider
             .newPushConsumerBuilder()
             .setClientConfiguration(clientConfiguration)
@@ -76,100 +84,279 @@ public abstract class AbstractRocketMqClientTest {
             .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
             .setMessageListener(
                 messageView -> {
-                  testing().runWithSpan("child", () -> {});
+                  testing().runWithSpan("messageListener", () -> {});
                   return ConsumeResult.SUCCESS;
                 })
-            .build()) {
-      try (Producer producer =
-          provider
-              .newProducerBuilder()
-              .setClientConfiguration(clientConfiguration)
-              .setTopics(topic)
-              .build()) {
-
-        String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"};
-        byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
-        Message message =
-            provider
-                .newMessageBuilder()
-                .setTopic(topic)
-                .setTag(tag)
-                .setKeys(keys)
-                .setBody(body)
-                .build();
-
-        SendReceipt sendReceipt =
-            testing()
-                .runWithSpan(
-                    "parent",
-                    (ThrowingSupplier<SendReceipt, Throwable>) () -> producer.send(message));
-        AtomicReference<SpanData> sendSpanData = new AtomicReference<>();
-        testing()
-            .waitAndAssertTraces(
-                trace -> {
-                  trace.hasSpansSatisfyingExactly(
-                      span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                      span ->
-                          span.hasKind(SpanKind.PRODUCER)
-                              .hasName(topic + " send")
-                              .hasStatus(StatusData.unset())
-                              .hasParent(trace.getSpan(0))
-                              .hasAttributesSatisfyingExactly(
-                                  equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag),
-                                  equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)),
-                                  equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL),
-                                  equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
-                                  equalTo(MESSAGING_SYSTEM, "rocketmq"),
-                                  equalTo(
-                                      MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()),
-                                  equalTo(
-                                      MESSAGING_DESTINATION_KIND,
-                                      SemanticAttributes.MessagingDestinationKindValues.TOPIC),
-                                  equalTo(MESSAGING_DESTINATION, topic)));
-                  sendSpanData.set(trace.getSpan(1));
-                },
-                trace ->
-                    trace.hasSpansSatisfyingExactly(
-                        span ->
-                            span.hasKind(SpanKind.CONSUMER)
-                                .hasName(topic + " receive")
-                                .hasStatus(StatusData.unset())
-                                .hasAttributesSatisfyingExactly(
-                                    equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup),
-                                    equalTo(MESSAGING_SYSTEM, "rocketmq"),
-                                    equalTo(
-                                        MESSAGING_DESTINATION_KIND,
-                                        SemanticAttributes.MessagingDestinationKindValues.TOPIC),
-                                    equalTo(MESSAGING_DESTINATION, topic),
-                                    equalTo(MESSAGING_OPERATION, "receive")),
-                        span ->
-                            span.hasKind(SpanKind.CONSUMER)
-                                .hasName(topic + " process")
-                                .hasStatus(StatusData.unset())
-                                // Link to send span.
-                                .hasLinks(LinkData.create(sendSpanData.get().getSpanContext()))
-                                // As the child of receive span.
-                                .hasParent(trace.getSpan(0))
-                                .hasAttributesSatisfyingExactly(
-                                    equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup),
-                                    equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag),
-                                    equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)),
-                                    equalTo(
-                                        MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
-                                    equalTo(MESSAGING_SYSTEM, "rocketmq"),
-                                    equalTo(
-                                        MESSAGING_MESSAGE_ID,
-                                        sendReceipt.getMessageId().toString()),
-                                    equalTo(
-                                        MESSAGING_DESTINATION_KIND,
-                                        SemanticAttributes.MessagingDestinationKindValues.TOPIC),
-                                    equalTo(MESSAGING_DESTINATION, topic),
-                                    equalTo(MESSAGING_OPERATION, "process")),
-                        span ->
-                            span.hasName("child")
-                                .hasKind(SpanKind.INTERNAL)
-                                .hasParent(trace.getSpan(1))));
-      }
+            .build();
+    producer =
+        provider
+            .newProducerBuilder()
+            .setClientConfiguration(clientConfiguration)
+            .setTopics(topic)
+            .build();
+  }
+
+  @AfterAll
+  void tearDown() throws IOException {
+    if (producer != null) {
+      producer.close();
+    }
+    if (consumer != null) {
+      // Not calling consumer.close(); because it takes a lot of time to complete
+      ((ClientImpl) consumer).stopAsync();
     }
+    container.close();
+  }
+
+  @Test
+  void testSendAndConsumeMessage() throws Throwable {
+    String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"};
+    byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
+    Message message =
+        provider
+            .newMessageBuilder()
+            .setTopic(topic)
+            .setTag(tag)
+            .setKeys(keys)
+            .setBody(body)
+            .build();
+
+    SendReceipt sendReceipt =
+        testing()
+            .runWithSpan(
+                "parent", (ThrowingSupplier<SendReceipt, Throwable>) () -> producer.send(message));
+    AtomicReference<SpanData> sendSpanData = new AtomicReference<>();
+    testing()
+        .waitAndAssertSortedTraces(
+            orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
+            trace -> {
+              trace.hasSpansSatisfyingExactly(
+                  span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                  span ->
+                      assertProducerSpan(span, topic, tag, keys, body, sendReceipt)
+                          .hasParent(trace.getSpan(0)));
+              sendSpanData.set(trace.getSpan(1));
+            },
+            trace ->
+                trace.hasSpansSatisfyingExactly(
+                    span -> assertReceiveSpan(span, topic, consumerGroup),
+                    span ->
+                        assertProcessSpan(
+                                span,
+                                sendSpanData.get(),
+                                topic,
+                                consumerGroup,
+                                tag,
+                                keys,
+                                body,
+                                sendReceipt)
+                            // As the child of receive span.
+                            .hasParent(trace.getSpan(0)),
+                    span ->
+                        span.hasName("messageListener")
+                            .hasKind(SpanKind.INTERNAL)
+                            .hasParent(trace.getSpan(1))));
+  }
+
+  @Test
+  public void testSendAsyncMessage() throws Exception {
+    String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"};
+    byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
+    Message message =
+        provider
+            .newMessageBuilder()
+            .setTopic(topic)
+            .setTag(tag)
+            .setKeys(keys)
+            .setBody(body)
+            .build();
+
+    SendReceipt sendReceipt =
+        testing()
+            .runWithSpan(
+                "parent",
+                () ->
+                    producer
+                        .sendAsync(message)
+                        .whenComplete(
+                            (result, throwable) -> {
+                              testing().runWithSpan("child", () -> {});
+                            })
+                        .get());
+    AtomicReference<SpanData> sendSpanData = new AtomicReference<>();
+    testing()
+        .waitAndAssertSortedTraces(
+            orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
+            trace -> {
+              trace.hasSpansSatisfyingExactly(
+                  span -> span.hasName("parent"),
+                  span ->
+                      assertProducerSpan(span, topic, tag, keys, body, sendReceipt)
+                          .hasParent(trace.getSpan(0)),
+                  span -> span.hasName("child"));
+              sendSpanData.set(trace.getSpan(1));
+            },
+            trace ->
+                trace.hasSpansSatisfyingExactly(
+                    span -> assertReceiveSpan(span, topic, consumerGroup),
+                    span ->
+                        assertProcessSpan(
+                                span,
+                                sendSpanData.get(),
+                                topic,
+                                consumerGroup,
+                                tag,
+                                keys,
+                                body,
+                                sendReceipt)
+                            // As the child of receive span.
+                            .hasParent(trace.getSpan(0)),
+                    span ->
+                        span.hasName("messageListener")
+                            .hasKind(SpanKind.INTERNAL)
+                            .hasParent(trace.getSpan(1))));
+  }
+
+  @Test
+  public void testCapturedMessageHeaders() throws Throwable {
+    String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"};
+    byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
+    Message message =
+        provider
+            .newMessageBuilder()
+            .setTopic(topic)
+            .setTag(tag)
+            .setKeys(keys)
+            .setBody(body)
+            .addProperty("test-message-header", "test")
+            .build();
+
+    SendReceipt sendReceipt =
+        testing()
+            .runWithSpan(
+                "parent", (ThrowingSupplier<SendReceipt, Throwable>) () -> producer.send(message));
+    AtomicReference<SpanData> sendSpanData = new AtomicReference<>();
+    testing()
+        .waitAndAssertSortedTraces(
+            orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
+            trace -> {
+              trace.hasSpansSatisfyingExactly(
+                  span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                  span ->
+                      assertProducerSpan(
+                              span,
+                              topic,
+                              tag,
+                              keys,
+                              body,
+                              sendReceipt,
+                              equalTo(
+                                  AttributeKey.stringArrayKey(
+                                      "messaging.header.test_message_header"),
+                                  Arrays.asList(new String[] {"test"})))
+                          .hasParent(trace.getSpan(0)));
+              sendSpanData.set(trace.getSpan(1));
+            },
+            trace ->
+                trace.hasSpansSatisfyingExactly(
+                    span -> assertReceiveSpan(span, topic, consumerGroup),
+                    span ->
+                        assertProcessSpan(
+                                span,
+                                sendSpanData.get(),
+                                topic,
+                                consumerGroup,
+                                tag,
+                                keys,
+                                body,
+                                sendReceipt,
+                                equalTo(
+                                    AttributeKey.stringArrayKey(
+                                        "messaging.header.test_message_header"),
+                                    Arrays.asList(new String[] {"test"})))
+                            // As the child of receive span.
+                            .hasParent(trace.getSpan(0)),
+                    span ->
+                        span.hasName("messageListener")
+                            .hasKind(SpanKind.INTERNAL)
+                            .hasParent(trace.getSpan(1))));
+  }
+
+  private static SpanDataAssert assertProducerSpan(
+      SpanDataAssert span,
+      String topic,
+      String tag,
+      String[] keys,
+      byte[] body,
+      SendReceipt sendReceipt,
+      AttributeAssertion... extraAttributes) {
+    List<AttributeAssertion> attributeAssertions =
+        new ArrayList<>(
+            Arrays.asList(
+                equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag),
+                equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)),
+                equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL),
+                equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
+                equalTo(MESSAGING_SYSTEM, "rocketmq"),
+                equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()),
+                equalTo(
+                    MESSAGING_DESTINATION_KIND,
+                    SemanticAttributes.MessagingDestinationKindValues.TOPIC),
+                equalTo(MESSAGING_DESTINATION, topic)));
+    attributeAssertions.addAll(Arrays.asList(extraAttributes));
+
+    return span.hasKind(SpanKind.PRODUCER)
+        .hasName(topic + " send")
+        .hasStatus(StatusData.unset())
+        .hasAttributesSatisfyingExactly(attributeAssertions);
+  }
+
+  private static SpanDataAssert assertReceiveSpan(
+      SpanDataAssert span, String topic, String consumerGroup) {
+    return span.hasKind(SpanKind.CONSUMER)
+        .hasName(topic + " receive")
+        .hasStatus(StatusData.unset())
+        .hasAttributesSatisfyingExactly(
+            equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup),
+            equalTo(MESSAGING_SYSTEM, "rocketmq"),
+            equalTo(
+                MESSAGING_DESTINATION_KIND,
+                SemanticAttributes.MessagingDestinationKindValues.TOPIC),
+            equalTo(MESSAGING_DESTINATION, topic),
+            equalTo(MESSAGING_OPERATION, "receive"));
+  }
+
+  private static SpanDataAssert assertProcessSpan(
+      SpanDataAssert span,
+      SpanData linkedSpan,
+      String topic,
+      String consumerGroup,
+      String tag,
+      String[] keys,
+      byte[] body,
+      SendReceipt sendReceipt,
+      AttributeAssertion... extraAttributes) {
+    List<AttributeAssertion> attributeAssertions =
+        new ArrayList<>(
+            Arrays.asList(
+                equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup),
+                equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag),
+                equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)),
+                equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
+                equalTo(MESSAGING_SYSTEM, "rocketmq"),
+                equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()),
+                equalTo(
+                    MESSAGING_DESTINATION_KIND,
+                    SemanticAttributes.MessagingDestinationKindValues.TOPIC),
+                equalTo(MESSAGING_DESTINATION, topic),
+                equalTo(MESSAGING_OPERATION, "process")));
+    attributeAssertions.addAll(Arrays.asList(extraAttributes));
+
+    return span.hasKind(SpanKind.CONSUMER)
+        .hasName(topic + " process")
+        .hasStatus(StatusData.unset())
+        // Link to send span.
+        .hasLinks(LinkData.create(linkedSpan.getSpanContext()))
+        .hasAttributesSatisfyingExactly(attributeAssertions);
   }
 }