|
@@ -6,11 +6,8 @@
|
|
|
package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11;
|
|
|
|
|
|
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
|
|
|
-import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
|
|
-import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
|
|
|
import static org.assertj.core.api.Assertions.assertThat;
|
|
|
|
|
|
-import io.opentelemetry.api.common.AttributeKey;
|
|
|
import io.opentelemetry.api.trace.SpanKind;
|
|
|
import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest;
|
|
|
import io.opentelemetry.instrumentation.kafka.internal.KafkaClientPropagationBaseTest;
|
|
@@ -18,10 +15,8 @@ import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtens
|
|
|
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
|
|
import io.opentelemetry.sdk.trace.data.LinkData;
|
|
|
import io.opentelemetry.sdk.trace.data.SpanData;
|
|
|
-import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
import java.time.Duration;
|
|
|
-import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -30,7 +25,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
|
-import org.assertj.core.api.AbstractLongAssert;
|
|
|
import org.junit.jupiter.api.DisplayName;
|
|
|
import org.junit.jupiter.api.Test;
|
|
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
|
@@ -51,7 +45,7 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
|
|
|
"parent",
|
|
|
() -> {
|
|
|
ProducerRecord<Integer, String> producerRecord =
|
|
|
- new ProducerRecord<>(SHARED_TOPIC, greeting);
|
|
|
+ new ProducerRecord<>(SHARED_TOPIC, 10, greeting);
|
|
|
if (testHeaders) {
|
|
|
producerRecord
|
|
|
.headers()
|
|
@@ -80,8 +74,8 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
|
|
|
testing.runWithSpan(
|
|
|
"processing",
|
|
|
() -> {
|
|
|
+ assertThat(record.key()).isEqualTo(10);
|
|
|
assertThat(record.value()).isEqualTo(greeting);
|
|
|
- assertThat(record.key()).isNull();
|
|
|
});
|
|
|
}
|
|
|
AtomicReference<SpanData> producerSpan = new AtomicReference<>();
|
|
@@ -89,85 +83,32 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
|
|
|
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
|
|
|
trace -> {
|
|
|
trace.hasSpansSatisfyingExactly(
|
|
|
- span -> {
|
|
|
- span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent();
|
|
|
- },
|
|
|
- span -> {
|
|
|
- span.hasName(SHARED_TOPIC + " send")
|
|
|
- .hasKind(SpanKind.PRODUCER)
|
|
|
- .hasParent(trace.getSpan(0))
|
|
|
- .hasAttributesSatisfying(
|
|
|
- equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
|
|
|
- satisfies(
|
|
|
- SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
|
|
|
- AbstractLongAssert::isNotNegative),
|
|
|
- satisfies(
|
|
|
- AttributeKey.longKey("messaging.kafka.message.offset"),
|
|
|
- AbstractLongAssert::isNotNegative));
|
|
|
- if (testHeaders) {
|
|
|
- span.hasAttributesSatisfying(
|
|
|
- equalTo(
|
|
|
- AttributeKey.stringArrayKey("messaging.header.test_message_header"),
|
|
|
- Collections.singletonList("test")));
|
|
|
- }
|
|
|
- },
|
|
|
- span -> {
|
|
|
- span.hasName("producer callback")
|
|
|
- .hasKind(SpanKind.INTERNAL)
|
|
|
- .hasParent(trace.getSpan(0));
|
|
|
- });
|
|
|
+ span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
|
|
+ span ->
|
|
|
+ span.hasName(SHARED_TOPIC + " send")
|
|
|
+ .hasKind(SpanKind.PRODUCER)
|
|
|
+ .hasParent(trace.getSpan(0))
|
|
|
+ .hasAttributesSatisfyingExactly(sendAttributes("10", greeting, testHeaders)),
|
|
|
+ span ->
|
|
|
+ span.hasName("producer callback")
|
|
|
+ .hasKind(SpanKind.INTERNAL)
|
|
|
+ .hasParent(trace.getSpan(0)));
|
|
|
producerSpan.set(trace.getSpan(1));
|
|
|
},
|
|
|
trace ->
|
|
|
trace.hasSpansSatisfyingExactly(
|
|
|
- span -> {
|
|
|
- span.hasName(SHARED_TOPIC + " receive")
|
|
|
- .hasKind(SpanKind.CONSUMER)
|
|
|
- .hasNoParent()
|
|
|
- .hasAttributesSatisfying(
|
|
|
- equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"));
|
|
|
- if (testHeaders) {
|
|
|
- span.hasAttributesSatisfying(
|
|
|
- equalTo(
|
|
|
- AttributeKey.stringArrayKey("messaging.header.test_message_header"),
|
|
|
- Collections.singletonList("test")));
|
|
|
- }
|
|
|
- },
|
|
|
- span -> {
|
|
|
- span.hasName(SHARED_TOPIC + " process")
|
|
|
- .hasKind(SpanKind.CONSUMER)
|
|
|
- .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
|
|
|
- .hasParent(trace.getSpan(0))
|
|
|
- .hasAttributesSatisfying(
|
|
|
- equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
|
|
|
- equalTo(
|
|
|
- SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
|
|
|
- greeting.getBytes(StandardCharsets.UTF_8).length),
|
|
|
- satisfies(
|
|
|
- SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION,
|
|
|
- AbstractLongAssert::isNotNegative),
|
|
|
- satisfies(
|
|
|
- AttributeKey.longKey("messaging.kafka.message.offset"),
|
|
|
- AbstractLongAssert::isNotNegative),
|
|
|
- satisfies(
|
|
|
- AttributeKey.longKey("kafka.record.queue_time_ms"),
|
|
|
- AbstractLongAssert::isNotNegative));
|
|
|
-
|
|
|
- if (testHeaders) {
|
|
|
- span.hasAttributesSatisfying(
|
|
|
- equalTo(
|
|
|
- AttributeKey.stringArrayKey("messaging.header.test_message_header"),
|
|
|
- Collections.singletonList("test")));
|
|
|
- }
|
|
|
- },
|
|
|
+ span ->
|
|
|
+ span.hasName(SHARED_TOPIC + " receive")
|
|
|
+ .hasKind(SpanKind.CONSUMER)
|
|
|
+ .hasNoParent()
|
|
|
+ .hasAttributesSatisfyingExactly(receiveAttributes(testHeaders)),
|
|
|
+ span ->
|
|
|
+ span.hasName(SHARED_TOPIC + " process")
|
|
|
+ .hasKind(SpanKind.CONSUMER)
|
|
|
+ .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
|
|
|
+ .hasParent(trace.getSpan(0))
|
|
|
+ .hasAttributesSatisfyingExactly(
|
|
|
+ processAttributes("10", greeting, testHeaders)),
|
|
|
span -> span.hasName("processing").hasParent(trace.getSpan(1))));
|
|
|
}
|
|
|
|
|
@@ -192,59 +133,26 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
|
|
|
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
|
|
|
trace -> {
|
|
|
trace.hasSpansSatisfyingExactly(
|
|
|
- span -> {
|
|
|
- span.hasName(SHARED_TOPIC + " send")
|
|
|
- .hasKind(SpanKind.PRODUCER)
|
|
|
- .hasNoParent()
|
|
|
- .hasAttributesSatisfying(
|
|
|
- equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_TOMBSTONE, true),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
|
|
|
- satisfies(
|
|
|
- SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
|
|
|
- AbstractLongAssert::isNotNegative),
|
|
|
- satisfies(
|
|
|
- AttributeKey.longKey("messaging.kafka.message.offset"),
|
|
|
- AbstractLongAssert::isNotNegative));
|
|
|
- });
|
|
|
+ span ->
|
|
|
+ span.hasName(SHARED_TOPIC + " send")
|
|
|
+ .hasKind(SpanKind.PRODUCER)
|
|
|
+ .hasNoParent()
|
|
|
+ .hasAttributesSatisfyingExactly(sendAttributes(null, null, false)));
|
|
|
producerSpan.set(trace.getSpan(0));
|
|
|
},
|
|
|
- trace -> {
|
|
|
- trace.hasSpansSatisfyingExactly(
|
|
|
- span -> {
|
|
|
- span.hasName(SHARED_TOPIC + " receive")
|
|
|
- .hasKind(SpanKind.CONSUMER)
|
|
|
- .hasNoParent()
|
|
|
- .hasAttributesSatisfying(
|
|
|
- equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"));
|
|
|
- },
|
|
|
- span -> {
|
|
|
- span.hasName(SHARED_TOPIC + " process")
|
|
|
- .hasKind(SpanKind.CONSUMER)
|
|
|
- .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
|
|
|
- .hasParent(trace.getSpan(0))
|
|
|
- .hasAttributesSatisfying(
|
|
|
- equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_TOMBSTONE, true),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, -1L),
|
|
|
- satisfies(
|
|
|
- SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION,
|
|
|
- AbstractLongAssert::isNotNegative),
|
|
|
- satisfies(
|
|
|
- AttributeKey.longKey("messaging.kafka.message.offset"),
|
|
|
- AbstractLongAssert::isNotNegative),
|
|
|
- satisfies(
|
|
|
- AttributeKey.longKey("kafka.record.queue_time_ms"),
|
|
|
- AbstractLongAssert::isNotNegative));
|
|
|
- });
|
|
|
- });
|
|
|
+ trace ->
|
|
|
+ trace.hasSpansSatisfyingExactly(
|
|
|
+ span ->
|
|
|
+ span.hasName(SHARED_TOPIC + " receive")
|
|
|
+ .hasKind(SpanKind.CONSUMER)
|
|
|
+ .hasNoParent()
|
|
|
+ .hasAttributesSatisfyingExactly(receiveAttributes(false)),
|
|
|
+ span ->
|
|
|
+ span.hasName(SHARED_TOPIC + " process")
|
|
|
+ .hasKind(SpanKind.CONSUMER)
|
|
|
+ .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
|
|
|
+ .hasParent(trace.getSpan(0))
|
|
|
+ .hasAttributesSatisfyingExactly(processAttributes(null, null, false))));
|
|
|
}
|
|
|
|
|
|
@DisplayName("test records(TopicPartition) kafka consume")
|
|
@@ -276,55 +184,25 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
|
|
|
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
|
|
|
trace -> {
|
|
|
trace.hasSpansSatisfyingExactly(
|
|
|
- span -> {
|
|
|
- span.hasName(SHARED_TOPIC + " send")
|
|
|
- .hasKind(SpanKind.PRODUCER)
|
|
|
- .hasNoParent()
|
|
|
- .hasAttributesSatisfying(
|
|
|
- equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
|
|
|
- equalTo(
|
|
|
- SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, partition),
|
|
|
- satisfies(
|
|
|
- AttributeKey.longKey("messaging.kafka.message.offset"),
|
|
|
- AbstractLongAssert::isNotNegative));
|
|
|
- });
|
|
|
+ span ->
|
|
|
+ span.hasName(SHARED_TOPIC + " send")
|
|
|
+ .hasKind(SpanKind.PRODUCER)
|
|
|
+ .hasNoParent()
|
|
|
+ .hasAttributesSatisfyingExactly(sendAttributes(null, greeting, false)));
|
|
|
producerSpan.set(trace.getSpan(0));
|
|
|
},
|
|
|
- trace -> {
|
|
|
- trace.hasSpansSatisfyingExactly(
|
|
|
- span -> {
|
|
|
- span.hasName(SHARED_TOPIC + " receive")
|
|
|
- .hasKind(SpanKind.CONSUMER)
|
|
|
- .hasNoParent()
|
|
|
- .hasAttributesSatisfying(
|
|
|
- equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"));
|
|
|
- },
|
|
|
- span -> {
|
|
|
- span.hasName(SHARED_TOPIC + " process")
|
|
|
- .hasKind(SpanKind.CONSUMER)
|
|
|
- .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
|
|
|
- .hasParent(trace.getSpan(0))
|
|
|
- .hasAttributesSatisfying(
|
|
|
- equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
|
|
|
- equalTo(
|
|
|
- SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
|
|
|
- greeting.getBytes(StandardCharsets.UTF_8).length),
|
|
|
- equalTo(SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION, partition),
|
|
|
- satisfies(
|
|
|
- AttributeKey.longKey("messaging.kafka.message.offset"),
|
|
|
- AbstractLongAssert::isNotNegative),
|
|
|
- satisfies(
|
|
|
- AttributeKey.longKey("kafka.record.queue_time_ms"),
|
|
|
- AbstractLongAssert::isNotNegative));
|
|
|
- });
|
|
|
- });
|
|
|
+ trace ->
|
|
|
+ trace.hasSpansSatisfyingExactly(
|
|
|
+ span ->
|
|
|
+ span.hasName(SHARED_TOPIC + " receive")
|
|
|
+ .hasKind(SpanKind.CONSUMER)
|
|
|
+ .hasNoParent()
|
|
|
+ .hasAttributesSatisfyingExactly(receiveAttributes(false)),
|
|
|
+ span ->
|
|
|
+ span.hasName(SHARED_TOPIC + " process")
|
|
|
+ .hasKind(SpanKind.CONSUMER)
|
|
|
+ .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
|
|
|
+ .hasParent(trace.getSpan(0))
|
|
|
+ .hasAttributesSatisfyingExactly(processAttributes(null, greeting, false))));
|
|
|
}
|
|
|
}
|