|
@@ -8,9 +8,11 @@ package io.opentelemetry.instrumentation.rocketmqclient.v4_8;
|
|
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
|
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
|
|
|
import static java.util.Collections.singletonList;
|
|
|
+import static org.assertj.core.api.Assertions.assertThat;
|
|
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
|
|
|
|
import io.opentelemetry.api.common.AttributeKey;
|
|
|
+import io.opentelemetry.api.trace.SpanContext;
|
|
|
import io.opentelemetry.api.trace.SpanKind;
|
|
|
import io.opentelemetry.instrumentation.rocketmqclient.v4_8.base.BaseConf;
|
|
|
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
|
@@ -23,6 +25,8 @@ import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
+import java.util.function.Consumer;
|
|
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
|
import org.apache.rocketmq.client.exception.MQClientException;
|
|
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
|
@@ -253,106 +257,115 @@ abstract class AbstractRocketMqClientTest {
|
|
|
int maxAttempts = 5;
|
|
|
for (int i = 0; i < maxAttempts; i++) {
|
|
|
tracingMessageListener.reset();
|
|
|
+
|
|
|
testing().runWithSpan("parent", () -> producer.send(msgs));
|
|
|
+
|
|
|
tracingMessageListener.waitForMessages();
|
|
|
if (tracingMessageListener.getLastBatchSize() == 2) {
|
|
|
break;
|
|
|
} else if (i < maxAttempts) {
|
|
|
// if messages weren't received as a batch we get 1 trace instead of 2
|
|
|
testing().waitForTraces(1);
|
|
|
+ Thread.sleep(2_000);
|
|
|
testing().clearData();
|
|
|
logger.error("Messages weren't received as batch, retrying");
|
|
|
}
|
|
|
- testing()
|
|
|
- .waitAndAssertTraces(
|
|
|
- trace ->
|
|
|
- trace.hasSpansSatisfyingExactly(
|
|
|
- span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
|
|
|
- span ->
|
|
|
- span.hasName(sharedTopic + " publish")
|
|
|
- .hasKind(SpanKind.PRODUCER)
|
|
|
- .hasParent(trace.getSpan(0))
|
|
|
- .hasAttributesSatisfyingExactly(
|
|
|
- equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
|
|
|
- equalTo(
|
|
|
- SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
|
|
|
- satisfies(
|
|
|
- SemanticAttributes.MESSAGING_MESSAGE_ID,
|
|
|
- val -> val.isInstanceOf(String.class)),
|
|
|
- satisfies(
|
|
|
- AttributeKey.stringKey("messaging.rocketmq.broker_address"),
|
|
|
- val -> val.isInstanceOf(String.class)),
|
|
|
- equalTo(
|
|
|
- AttributeKey.stringKey("messaging.rocketmq.send_result"),
|
|
|
- "SEND_OK"))),
|
|
|
- trace ->
|
|
|
- trace.hasSpansSatisfyingExactly(
|
|
|
- span ->
|
|
|
- span.hasName("multiple_sources receive")
|
|
|
- .hasKind(SpanKind.CONSUMER)
|
|
|
- .hasAttributesSatisfyingExactly(
|
|
|
- equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")),
|
|
|
- span ->
|
|
|
- span.hasName(sharedTopic + " process")
|
|
|
- .hasKind(SpanKind.CONSUMER)
|
|
|
- .hasParent(trace.getSpan(0))
|
|
|
- .hasLinks(LinkData.create(trace.getSpan(1).getSpanContext()))
|
|
|
- .hasAttributesSatisfyingExactly(
|
|
|
- equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
|
|
|
- equalTo(
|
|
|
- SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
|
|
|
- satisfies(
|
|
|
- SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
|
|
|
- val -> val.isInstanceOf(Long.class)),
|
|
|
- satisfies(
|
|
|
- SemanticAttributes.MESSAGING_MESSAGE_ID,
|
|
|
- val -> val.isInstanceOf(String.class)),
|
|
|
- equalTo(
|
|
|
- SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
|
|
|
- satisfies(
|
|
|
- AttributeKey.stringKey("messaging.rocketmq.broker_address"),
|
|
|
- val -> val.isInstanceOf(Long.class)),
|
|
|
- satisfies(
|
|
|
- AttributeKey.stringKey("messaging.rocketmq.queue_id"),
|
|
|
- val -> val.isInstanceOf(Long.class)),
|
|
|
- satisfies(
|
|
|
- AttributeKey.stringKey("messaging.rocketmq.queue_offset"),
|
|
|
- val -> val.isInstanceOf(Long.class))),
|
|
|
- span ->
|
|
|
- span.hasName(sharedTopic + " process")
|
|
|
- .hasKind(SpanKind.CONSUMER)
|
|
|
- .hasParent(trace.getSpan(0))
|
|
|
- .hasLinks(LinkData.create(trace.getSpan(1).getSpanContext()))
|
|
|
- .hasAttributesSatisfyingExactly(
|
|
|
- equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
|
|
|
- equalTo(
|
|
|
- SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
|
|
|
- satisfies(
|
|
|
- SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
|
|
|
- val -> val.isInstanceOf(Long.class)),
|
|
|
- satisfies(
|
|
|
- SemanticAttributes.MESSAGING_MESSAGE_ID,
|
|
|
- val -> val.isInstanceOf(String.class)),
|
|
|
- equalTo(
|
|
|
- SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
|
|
|
- satisfies(
|
|
|
- AttributeKey.stringKey("messaging.rocketmq.broker_address"),
|
|
|
- val -> val.isInstanceOf(String.class)),
|
|
|
- satisfies(
|
|
|
- AttributeKey.stringKey("messaging.rocketmq.queue_id"),
|
|
|
- val -> val.isInstanceOf(Long.class)),
|
|
|
- satisfies(
|
|
|
- AttributeKey.stringKey("messaging.rocketmq.queue_offset"),
|
|
|
- val -> val.isInstanceOf(Long.class))),
|
|
|
- span ->
|
|
|
- span.hasName("messageListener")
|
|
|
- .hasParent(trace.getSpan(0))
|
|
|
- .hasKind(SpanKind.INTERNAL)));
|
|
|
}
|
|
|
+
|
|
|
+ AtomicReference<SpanContext> producerSpanContext = new AtomicReference<>();
|
|
|
+ testing()
|
|
|
+ .waitAndAssertTraces(
|
|
|
+ trace -> {
|
|
|
+ trace.hasSpansSatisfyingExactly(
|
|
|
+ span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
|
|
|
+ span ->
|
|
|
+ span.hasName(sharedTopic + " publish")
|
|
|
+ .hasKind(SpanKind.PRODUCER)
|
|
|
+ .hasParent(trace.getSpan(0))
|
|
|
+ .hasAttributesSatisfyingExactly(
|
|
|
+ equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
|
|
|
+ equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
|
|
|
+ equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
|
|
|
+ satisfies(
|
|
|
+ SemanticAttributes.MESSAGING_MESSAGE_ID,
|
|
|
+ val -> val.isInstanceOf(String.class)),
|
|
|
+ satisfies(
|
|
|
+ AttributeKey.stringKey("messaging.rocketmq.broker_address"),
|
|
|
+ val -> val.isInstanceOf(String.class)),
|
|
|
+ equalTo(
|
|
|
+ AttributeKey.stringKey("messaging.rocketmq.send_result"),
|
|
|
+ "SEND_OK")));
|
|
|
+
|
|
|
+ SpanContext spanContext = trace.getSpan(1).getSpanContext();
|
|
|
+ producerSpanContext.set(
|
|
|
+ SpanContext.createFromRemoteParent(
|
|
|
+ spanContext.getTraceId(),
|
|
|
+ spanContext.getSpanId(),
|
|
|
+ spanContext.getTraceFlags(),
|
|
|
+ spanContext.getTraceState()));
|
|
|
+ },
|
|
|
+ trace ->
|
|
|
+ trace.hasSpansSatisfyingExactly(
|
|
|
+ span ->
|
|
|
+ span.hasName("multiple_sources receive")
|
|
|
+ .hasKind(SpanKind.CONSUMER)
|
|
|
+ .hasAttributesSatisfyingExactly(
|
|
|
+ equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
|
|
|
+ equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")),
|
|
|
+ span ->
|
|
|
+ span.hasName(sharedTopic + " process")
|
|
|
+ .hasKind(SpanKind.CONSUMER)
|
|
|
+ .hasParent(trace.getSpan(0))
|
|
|
+ .hasLinksSatisfying(links(producerSpanContext.get()))
|
|
|
+ .hasAttributesSatisfyingExactly(
|
|
|
+ equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
|
|
|
+ equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
|
|
|
+ equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
|
|
|
+ satisfies(
|
|
|
+ SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
|
|
|
+ val -> val.isInstanceOf(Long.class)),
|
|
|
+ satisfies(
|
|
|
+ SemanticAttributes.MESSAGING_MESSAGE_ID,
|
|
|
+ val -> val.isInstanceOf(String.class)),
|
|
|
+ equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
|
|
|
+ satisfies(
|
|
|
+ AttributeKey.stringKey("messaging.rocketmq.broker_address"),
|
|
|
+ val -> val.isNotEmpty()),
|
|
|
+ satisfies(
|
|
|
+ AttributeKey.longKey("messaging.rocketmq.queue_id"),
|
|
|
+ val -> val.isNotNull()),
|
|
|
+ satisfies(
|
|
|
+ AttributeKey.longKey("messaging.rocketmq.queue_offset"),
|
|
|
+ val -> val.isNotNull())),
|
|
|
+ span ->
|
|
|
+ span.hasName(sharedTopic + " process")
|
|
|
+ .hasKind(SpanKind.CONSUMER)
|
|
|
+ .hasParent(trace.getSpan(0))
|
|
|
+ .hasLinksSatisfying(links(producerSpanContext.get()))
|
|
|
+ .hasAttributesSatisfyingExactly(
|
|
|
+ equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
|
|
|
+ equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
|
|
|
+ equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
|
|
|
+ satisfies(
|
|
|
+ SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
|
|
|
+ val -> val.isInstanceOf(Long.class)),
|
|
|
+ satisfies(
|
|
|
+ SemanticAttributes.MESSAGING_MESSAGE_ID,
|
|
|
+ val -> val.isInstanceOf(String.class)),
|
|
|
+ equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagB"),
|
|
|
+ satisfies(
|
|
|
+ AttributeKey.stringKey("messaging.rocketmq.broker_address"),
|
|
|
+ val -> val.isNotEmpty()),
|
|
|
+ satisfies(
|
|
|
+ AttributeKey.longKey("messaging.rocketmq.queue_id"),
|
|
|
+ val -> val.isNotNull()),
|
|
|
+ satisfies(
|
|
|
+ AttributeKey.longKey("messaging.rocketmq.queue_offset"),
|
|
|
+ val -> val.isNotNull())),
|
|
|
+ span ->
|
|
|
+ span.hasName("messageListener")
|
|
|
+ .hasParent(trace.getSpan(0))
|
|
|
+ .hasKind(SpanKind.INTERNAL)));
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -433,4 +446,23 @@ abstract class AbstractRocketMqClientTest {
|
|
|
.hasParent(trace.getSpan(2))
|
|
|
.hasKind(SpanKind.INTERNAL)));
|
|
|
}
|
|
|
+
|
|
|
+ private static Consumer<List<? extends LinkData>> links(SpanContext... spanContexts) {
|
|
|
+ return links -> {
|
|
|
+ assertThat(links).hasSize(spanContexts.length);
|
|
|
+ for (SpanContext spanContext : spanContexts) {
|
|
|
+ assertThat(links)
|
|
|
+ .anySatisfy(
|
|
|
+ link -> {
|
|
|
+ assertThat(link.getSpanContext().getTraceId())
|
|
|
+ .isEqualTo(spanContext.getTraceId());
|
|
|
+ assertThat(link.getSpanContext().getSpanId()).isEqualTo(spanContext.getSpanId());
|
|
|
+ assertThat(link.getSpanContext().getTraceFlags())
|
|
|
+ .isEqualTo(spanContext.getTraceFlags());
|
|
|
+ assertThat(link.getSpanContext().getTraceState())
|
|
|
+ .isEqualTo(spanContext.getTraceState());
|
|
|
+ });
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
}
|