Browse Source

add support for webflux server in spring starter (#11185)

Gregor Zeitlinger 10 months ago
parent
commit
8cf2217dca

+ 9 - 0
instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/instrumentation/webflux/SpringWebfluxInstrumentationAutoConfiguration.java

@@ -7,6 +7,7 @@ package io.opentelemetry.instrumentation.spring.autoconfigure.instrumentation.we
 
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.instrumentation.spring.autoconfigure.internal.SdkEnabled;
+import io.opentelemetry.instrumentation.spring.webflux.v5_3.SpringWebfluxTelemetry;
 import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@@ -15,6 +16,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Conditional;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.server.WebFilter;
 
 /**
  * Configures {@link WebClient} for tracing.
@@ -36,4 +38,11 @@ public class SpringWebfluxInstrumentationAutoConfiguration {
       ObjectProvider<OpenTelemetry> openTelemetryProvider) {
     return new WebClientBeanPostProcessor(openTelemetryProvider);
   }
+
+  @Bean
+  WebFilter telemetryFilter(OpenTelemetry openTelemetry) {
+    return SpringWebfluxTelemetry.builder(openTelemetry)
+        .build()
+        .createWebFilterAndRegisterReactorHook();
+  }
 }

+ 4 - 2
smoke-tests-otel-starter/build.gradle.kts

@@ -13,15 +13,17 @@ otelJava {
 dependencies {
   implementation("org.springframework.boot:spring-boot-starter-web")
   implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
-  implementation("com.h2database:h2")
+  runtimeOnly("com.h2database:h2")
   implementation("org.apache.commons:commons-dbcp2")
   implementation(project(":instrumentation:jdbc:library"))
   implementation("org.springframework.kafka:spring-kafka") // not tested here, just make sure there are no warnings when it's included
-  implementation("org.springframework.boot:spring-boot-starter-webflux") // not tested here, just make sure there are no warnings when it's included
   implementation("io.opentelemetry:opentelemetry-extension-trace-propagators")
   implementation(project(":instrumentation:spring:starters:spring-boot-starter"))
   implementation(platform(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES))
 
+  // webflux / reactive style
+  implementation("org.springframework.boot:spring-boot-starter-webflux")
+
   testImplementation("org.springframework.boot:spring-boot-starter-test")
   testImplementation(project(":testing-common"))
 }

+ 21 - 0
smoke-tests-otel-starter/src/main/java/io/opentelemetry/spring/smoketest/OtelSpringStarterWebfluxSmokeTestController.java

@@ -0,0 +1,21 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.spring.smoketest;
+
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
+
+@RestController
+public class OtelSpringStarterWebfluxSmokeTestController {
+
+  public static final String WEBFLUX = "/webflux";
+
+  @GetMapping(WEBFLUX)
+  public Mono<String> getStock() {
+    return Mono.just("pong");
+  }
+}

+ 34 - 3
smoke-tests-otel-starter/src/test/java/io/opentelemetry/smoketest/OtelSpringStarterSmokeTest.java

@@ -41,6 +41,7 @@ import io.opentelemetry.semconv.incubating.DbIncubatingAttributes;
 import io.opentelemetry.semconv.incubating.ServiceIncubatingAttributes;
 import io.opentelemetry.spring.smoketest.OtelSpringStarterSmokeTestApplication;
 import io.opentelemetry.spring.smoketest.OtelSpringStarterSmokeTestController;
+import io.opentelemetry.spring.smoketest.OtelSpringStarterWebfluxSmokeTestController;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
@@ -65,6 +66,13 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.core.annotation.Order;
 import org.springframework.core.env.Environment;
 
+/**
+ * This test class enforces the order of the tests to make sure that {@link #shouldSendTelemetry()},
+ * which asserts the telemetry data from the application startup, is executed first.
+ *
+ * <p>The exporters are not reset using {@link org.junit.jupiter.api.BeforeEach}, because it would
+ * prevent the telemetry data from the application startup to be asserted.
+ */
 @ExtendWith(OutputCaptureExtension.class)
 @SpringBootTest(
     classes = {
@@ -187,7 +195,6 @@ class OtelSpringStarterSmokeTest {
   }
 
   @Test
-  @org.junit.jupiter.api.Order(10)
   void propertyConversion() {
     ConfigProperties configProperties =
         SpringConfigProperties.create(
@@ -268,7 +275,6 @@ class OtelSpringStarterSmokeTest {
   }
 
   @Test
-  @org.junit.jupiter.api.Order(2)
   void restTemplate() {
     assertClient(OtelSpringStarterSmokeTestController.REST_TEMPLATE);
   }
@@ -279,7 +285,7 @@ class OtelSpringStarterSmokeTest {
   }
 
   private void assertClient(String url) {
-    resetExporters(); // ignore the telemetry from application startup
+    resetExporters();
 
     testRestTemplate.getForObject(url, String.class);
 
@@ -307,6 +313,31 @@ class OtelSpringStarterSmokeTest {
                             .hasAttribute(HttpAttributes.HTTP_ROUTE, "/ping")));
   }
 
+  @Test
+  void webflux() {
+    resetExporters();
+
+    testRestTemplate.getForObject(
+        OtelSpringStarterWebfluxSmokeTestController.WEBFLUX, String.class);
+
+    TracesAssert.assertThat(expectSpans(2))
+        .hasTracesSatisfyingExactly(
+            traceAssert ->
+                traceAssert.hasSpansSatisfyingExactly(
+                    clientSpan ->
+                        clientSpan
+                            .hasKind(SpanKind.CLIENT)
+                            .hasAttributesSatisfying(
+                                a ->
+                                    assertThat(a.get(UrlAttributes.URL_FULL)).endsWith("/webflux")),
+                    serverSpan ->
+                        serverSpan
+                            .hasKind(SpanKind.SERVER)
+                            .hasAttribute(HttpAttributes.HTTP_REQUEST_METHOD, "GET")
+                            .hasAttribute(HttpAttributes.HTTP_RESPONSE_STATUS_CODE, 200L)
+                            .hasAttribute(HttpAttributes.HTTP_ROUTE, "/webflux")));
+  }
+
   private static List<SpanData> expectSpans(int spans) {
     with()
         .conditionEvaluationListener(