|
@@ -3,98 +3,47 @@
|
|
|
* SPDX-License-Identifier: Apache-2.0
|
|
|
*/
|
|
|
|
|
|
-package io.opentelemetry.instrumentation.spring.autoconfigure.instrumentation.kafka;
|
|
|
+package io.opentelemetry.spring.smoketest;
|
|
|
|
|
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
|
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
|
|
|
|
|
|
import io.opentelemetry.api.OpenTelemetry;
|
|
|
import io.opentelemetry.api.trace.SpanKind;
|
|
|
-import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
|
|
|
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
|
|
|
-import java.time.Duration;
|
|
|
import org.apache.kafka.clients.admin.NewTopic;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.assertj.core.api.AbstractLongAssert;
|
|
|
import org.assertj.core.api.AbstractStringAssert;
|
|
|
-import org.junit.jupiter.api.AfterAll;
|
|
|
-import org.junit.jupiter.api.BeforeAll;
|
|
|
-import org.junit.jupiter.api.BeforeEach;
|
|
|
import org.junit.jupiter.api.Test;
|
|
|
-import org.junit.jupiter.api.extension.RegisterExtension;
|
|
|
-import org.springframework.boot.autoconfigure.AutoConfigurations;
|
|
|
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
|
|
|
-import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
|
|
-import org.springframework.context.ConfigurableApplicationContext;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.context.annotation.Bean;
|
|
|
import org.springframework.context.annotation.Configuration;
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
import org.springframework.kafka.config.TopicBuilder;
|
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
|
-import org.testcontainers.containers.KafkaContainer;
|
|
|
-import org.testcontainers.containers.wait.strategy.Wait;
|
|
|
-import org.testcontainers.utility.DockerImageName;
|
|
|
|
|
|
-class KafkaIntegrationTest {
|
|
|
+abstract class AbstractKafkaSpringStarterSmokeTest extends AbstractSpringStarterSmokeTest {
|
|
|
|
|
|
- @RegisterExtension
|
|
|
- static final LibraryInstrumentationExtension testing = LibraryInstrumentationExtension.create();
|
|
|
-
|
|
|
- static KafkaContainer kafka;
|
|
|
-
|
|
|
- private ApplicationContextRunner contextRunner;
|
|
|
-
|
|
|
- @BeforeAll
|
|
|
- static void setUpKafka() {
|
|
|
- kafka =
|
|
|
- new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.10"))
|
|
|
- .withEnv("KAFKA_HEAP_OPTS", "-Xmx256m")
|
|
|
- .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
|
|
|
- .withStartupTimeout(Duration.ofMinutes(1));
|
|
|
- kafka.start();
|
|
|
- }
|
|
|
-
|
|
|
- @AfterAll
|
|
|
- static void tearDownKafka() {
|
|
|
- kafka.stop();
|
|
|
- }
|
|
|
-
|
|
|
- @BeforeEach
|
|
|
- void setUpContext() {
|
|
|
- contextRunner =
|
|
|
- new ApplicationContextRunner()
|
|
|
- .withConfiguration(
|
|
|
- AutoConfigurations.of(
|
|
|
- KafkaAutoConfiguration.class,
|
|
|
- KafkaInstrumentationAutoConfiguration.class,
|
|
|
- TestConfig.class))
|
|
|
- .withBean("openTelemetry", OpenTelemetry.class, testing::getOpenTelemetry)
|
|
|
- .withPropertyValues(
|
|
|
- "spring.kafka.bootstrap-servers=" + kafka.getBootstrapServers(),
|
|
|
- "spring.kafka.consumer.auto-offset-reset=earliest",
|
|
|
- "spring.kafka.consumer.linger-ms=10",
|
|
|
- "spring.kafka.listener.idle-between-polls=1000",
|
|
|
- "spring.kafka.producer.transaction-id-prefix=test-");
|
|
|
- }
|
|
|
+ @Autowired protected KafkaTemplate<String, String> kafkaTemplate;
|
|
|
|
|
|
@Test
|
|
|
void shouldInstrumentProducerAndConsumer() {
|
|
|
- contextRunner.run(KafkaIntegrationTest::runShouldInstrumentProducerAndConsumer);
|
|
|
- }
|
|
|
-
|
|
|
- // In kafka 2 ops.send is deprecated. We are using it to avoid reflection because kafka 3 also has
|
|
|
- // ops.send, although with different return type.
|
|
|
- @SuppressWarnings({"unchecked", "deprecation"})
|
|
|
- private static void runShouldInstrumentProducerAndConsumer(
|
|
|
- ConfigurableApplicationContext applicationContext) {
|
|
|
- KafkaTemplate<String, String> kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);
|
|
|
+ testing.clearAllExportedData(); // ignore data from application startup
|
|
|
|
|
|
testing.runWithSpan(
|
|
|
"producer",
|
|
|
() -> {
|
|
|
kafkaTemplate.executeInTransaction(
|
|
|
ops -> {
|
|
|
- ops.send("testTopic", "10", "testSpan");
|
|
|
+ // return type is incompatible between Spring Boot 2 and 3
|
|
|
+ try {
|
|
|
+ ops.getClass()
|
|
|
+ .getDeclaredMethod("send", String.class, Object.class, Object.class)
|
|
|
+ .invoke(ops, "testTopic", "10", "testSpan");
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new IllegalStateException(e);
|
|
|
+ }
|
|
|
return 0;
|
|
|
});
|
|
|
});
|
|
@@ -128,7 +77,7 @@ class KafkaIntegrationTest {
|
|
|
span.hasName("testTopic process")
|
|
|
.hasKind(SpanKind.CONSUMER)
|
|
|
.hasParent(trace.getSpan(1))
|
|
|
- .hasAttributesSatisfyingExactly(
|
|
|
+ .hasAttributesSatisfying(
|
|
|
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
|
|
|
equalTo(
|
|
|
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
|
|
@@ -155,7 +104,9 @@ class KafkaIntegrationTest {
|
|
|
}
|
|
|
|
|
|
@Configuration
|
|
|
- static class TestConfig {
|
|
|
+ public static class KafkaConfig {
|
|
|
+
|
|
|
+ @Autowired OpenTelemetry openTelemetry;
|
|
|
|
|
|
@Bean
|
|
|
public NewTopic testTopic() {
|
|
@@ -164,7 +115,12 @@ class KafkaIntegrationTest {
|
|
|
|
|
|
@KafkaListener(id = "testListener", topics = "testTopic")
|
|
|
public void listener(ConsumerRecord<String, String> record) {
|
|
|
- testing.runWithSpan("consumer", () -> {});
|
|
|
+ openTelemetry
|
|
|
+ .getTracer("consumer", "1.0")
|
|
|
+ .spanBuilder("consumer")
|
|
|
+ .setSpanKind(SpanKind.CONSUMER)
|
|
|
+ .startSpan()
|
|
|
+ .end();
|
|
|
}
|
|
|
}
|
|
|
}
|