Browse Source

Enable http pipelining test on spring webflux (#8501)

Lauri Tulmin 1 year ago
parent
commit
fa5d1744ed
10 changed files with 187 additions and 26 deletions
  1. 3 2
      instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/AbstractChannelHandlerContextInstrumentation.java
  2. 2 1
      instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/AttributeKeys.java
  3. 20 12
      instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerRequestTracingHandler.java
  4. 14 8
      instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerResponseTracingHandler.java
  5. 5 2
      instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/TracingHandler.java
  6. 1 0
      instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/build.gradle.kts
  7. 57 0
      instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/ContextHandlerInstrumentation.java
  8. 59 0
      instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/HttpTrafficHandlerInstrumentation.java
  9. 26 0
      instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/ReactorNettyInstrumentationModule.java
  10. 0 1
      instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/server/base/SpringWebFluxServerTest.java

+ 3 - 2
instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/AbstractChannelHandlerContextInstrumentation.java

@@ -19,6 +19,7 @@ import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
 import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import java.util.Deque;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
@@ -59,8 +60,8 @@ public class AbstractChannelHandlerContextInstrumentation implements TypeInstrum
         instrumenter().end(clientContext, request, null, throwable);
         return;
       }
-
-      Context serverContext = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).get();
+      Deque<Context> contexts = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).get();
+      Context serverContext = contexts != null ? contexts.peekFirst() : null;
       if (serverContext != null) {
         NettyErrorHolder.set(serverContext, throwable);
       }

+ 2 - 1
instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/AttributeKeys.java

@@ -7,6 +7,7 @@ package io.opentelemetry.instrumentation.netty.v4_1.internal;
 
 import io.netty.util.AttributeKey;
 import io.opentelemetry.context.Context;
+import java.util.Deque;
 
 /**
  * This class is internal and is hence not for public use. Its APIs are unstable and can change at
@@ -17,7 +18,7 @@ public final class AttributeKeys {
   // this is the context that has the server span
   //
   // note: this attribute key is also used by ratpack instrumentation
-  public static final AttributeKey<Context> SERVER_CONTEXT =
+  public static final AttributeKey<Deque<Context>> SERVER_CONTEXT =
       AttributeKey.valueOf(AttributeKeys.class, "server-context");
 
   public static final AttributeKey<Context> CLIENT_CONTEXT =

+ 20 - 12
instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerRequestTracingHandler.java

@@ -17,6 +17,8 @@ import io.opentelemetry.context.Scope;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
 import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
+import java.util.ArrayDeque;
+import java.util.Deque;
 
 /**
  * This class is internal and is hence not for public use. Its APIs are unstable and can change at
@@ -24,7 +26,7 @@ import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
  */
 public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapter {
 
-  static final AttributeKey<HttpRequestAndChannel> HTTP_SERVER_REQUEST =
+  static final AttributeKey<Deque<HttpRequestAndChannel>> HTTP_SERVER_REQUEST =
       AttributeKey.valueOf(HttpServerRequestTracingHandler.class, "http-server-request");
 
   private final Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter;
@@ -37,11 +39,10 @@ public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapte
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
     Channel channel = ctx.channel();
-    Attribute<Context> contextAttr = channel.attr(AttributeKeys.SERVER_CONTEXT);
-    Attribute<HttpRequestAndChannel> requestAttr = channel.attr(HTTP_SERVER_REQUEST);
+    Deque<Context> contexts = getOrCreate(channel, AttributeKeys.SERVER_CONTEXT);
 
     if (!(msg instanceof HttpRequest)) {
-      Context serverContext = contextAttr.get();
+      Context serverContext = contexts.peekLast();
       if (serverContext == null) {
         super.channelRead(ctx, msg);
       } else {
@@ -52,28 +53,35 @@ public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapte
       return;
     }
 
-    Context parentContext = contextAttr.get();
-    if (parentContext == null) {
-      parentContext = Context.current();
-    }
+    Context parentContext = Context.current();
     HttpRequestAndChannel request = HttpRequestAndChannel.create((HttpRequest) msg, channel);
-
     if (!instrumenter.shouldStart(parentContext, request)) {
       super.channelRead(ctx, msg);
       return;
     }
 
     Context context = instrumenter.start(parentContext, request);
-    contextAttr.set(context);
-    requestAttr.set(request);
+    contexts.addLast(context);
+    Deque<HttpRequestAndChannel> requests = getOrCreate(channel, HTTP_SERVER_REQUEST);
+    requests.addLast(request);
 
     try (Scope ignored = context.makeCurrent()) {
       super.channelRead(ctx, msg);
       // the span is ended normally in HttpServerResponseTracingHandler
     } catch (Throwable throwable) {
       // make sure to remove the server context on end() call
-      instrumenter.end(contextAttr.getAndSet(null), requestAttr.getAndSet(null), null, throwable);
+      instrumenter.end(contexts.removeLast(), requests.removeLast(), null, throwable);
       throw throwable;
     }
   }
+
+  private static <T> Deque<T> getOrCreate(Channel channel, AttributeKey<Deque<T>> key) {
+    Attribute<Deque<T>> attribute = channel.attr(key);
+    Deque<T> deque = attribute.get();
+    if (deque == null) {
+      deque = new ArrayDeque<>();
+      attribute.set(deque);
+    }
+    return deque;
+  }
 }

+ 14 - 8
instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerResponseTracingHandler.java

@@ -22,6 +22,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.netty.common.internal.NettyErrorHolder;
 import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
 import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
+import java.util.Deque;
 import javax.annotation.Nullable;
 
 /**
@@ -45,12 +46,17 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap
 
   @Override
   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) throws Exception {
-    Attribute<Context> contextAttr = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT);
-    Context context = contextAttr.get();
+    Attribute<Deque<Context>> contextAttr = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT);
+
+    Deque<Context> contexts = contextAttr.get();
+    Context context = contexts != null ? contexts.peekFirst() : null;
     if (context == null) {
       super.write(ctx, msg, prm);
       return;
     }
+    Attribute<Deque<HttpRequestAndChannel>> requestAttr = ctx.channel().attr(HTTP_SERVER_REQUEST);
+    Deque<HttpRequestAndChannel> requests = requestAttr.get();
+    HttpRequestAndChannel request = requests != null ? requests.peekFirst() : null;
 
     ChannelPromise writePromise;
 
@@ -68,16 +74,16 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap
       if (msg instanceof FullHttpResponse) {
         // Headers and body all sent together, we have the response information in the msg.
         beforeCommitHandler.handle(context, (HttpResponse) msg);
-        contextAttr.set(null);
-        HttpRequestAndChannel request = ctx.channel().attr(HTTP_SERVER_REQUEST).getAndSet(null);
+        contexts.removeFirst();
+        requests.removeFirst();
         writePromise.addListener(
             future -> end(context, request, (FullHttpResponse) msg, writePromise));
       } else {
         // Body sent after headers. We stored the response information in the context when
         // encountering HttpResponse (which was not FullHttpResponse since it's not
         // LastHttpContent).
-        contextAttr.set(null);
-        HttpRequestAndChannel request = ctx.channel().attr(HTTP_SERVER_REQUEST).getAndSet(null);
+        contexts.removeFirst();
+        requests.removeFirst();
         HttpResponse response = ctx.channel().attr(HTTP_SERVER_RESPONSE).getAndSet(null);
         writePromise.addListener(future -> end(context, request, response, writePromise));
       }
@@ -93,8 +99,8 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap
     try (Scope ignored = context.makeCurrent()) {
       super.write(ctx, msg, writePromise);
     } catch (Throwable throwable) {
-      contextAttr.set(null);
-      HttpRequestAndChannel request = ctx.channel().attr(HTTP_SERVER_REQUEST).getAndSet(null);
+      contexts.removeFirst();
+      requests.removeFirst();
       end(context, request, null, throwable);
       throw throwable;
     }

+ 5 - 2
instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/TracingHandler.java

@@ -13,6 +13,7 @@ import io.netty.util.Attribute;
 import io.opentelemetry.context.Scope;
 import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
 import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
+import java.util.Deque;
 import ratpack.handling.Context;
 import ratpack.handling.Handler;
 
@@ -24,9 +25,11 @@ public final class TracingHandler implements Handler {
 
   @Override
   public void handle(Context ctx) {
-    Attribute<io.opentelemetry.context.Context> spanAttribute =
+    Attribute<Deque<io.opentelemetry.context.Context>> serverContextAttribute =
         ctx.getDirectChannelAccess().getChannel().attr(AttributeKeys.SERVER_CONTEXT);
-    io.opentelemetry.context.Context serverSpanContext = spanAttribute.get();
+    Deque<io.opentelemetry.context.Context> contexts = serverContextAttribute.get();
+    io.opentelemetry.context.Context serverSpanContext =
+        contexts != null ? contexts.peekFirst() : null;
 
     // Must use context from channel, as executor instrumentation is not accurate - Ratpack
     // internally queues events and then drains them in batches, causing executor instrumentation to

+ 1 - 0
instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/build.gradle.kts

@@ -40,6 +40,7 @@ muzzle {
 
 dependencies {
   implementation(project(":instrumentation:spring:spring-webflux:spring-webflux-5.3:library"))
+  implementation(project(":instrumentation:netty:netty-4.1:library"))
 
   compileOnly("org.springframework:spring-webflux:5.0.0.RELEASE")
   compileOnly("io.projectreactor.ipc:reactor-netty:0.7.0.RELEASE")

+ 57 - 0
instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/ContextHandlerInstrumentation.java

@@ -0,0 +1,57 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.spring.webflux.v5_0.server.reactornetty;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+
+import io.netty.channel.Channel;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import java.util.Deque;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+
+// used before reactor-netty-0.8
+public class ContextHandlerInstrumentation implements TypeInstrumentation {
+  @Override
+  public ElementMatcher<TypeDescription> typeMatcher() {
+    return named("reactor.ipc.netty.channel.ContextHandler");
+  }
+
+  @Override
+  public void transform(TypeTransformer transformer) {
+    transformer.applyAdviceToMethod(
+        named("createOperations").and(takesArgument(0, named("io.netty.channel.Channel"))),
+        ContextHandlerInstrumentation.class.getName() + "$CreateOperationsAdvice");
+  }
+
+  @SuppressWarnings("unused")
+  public static class CreateOperationsAdvice {
+
+    @Advice.OnMethodEnter(suppress = Throwable.class)
+    public static Scope onEnter(@Advice.Argument(0) Channel channel) {
+      // set context to the first unprocessed request
+      Deque<Context> contexts = channel.attr(AttributeKeys.SERVER_CONTEXT).get();
+      Context context = contexts != null ? contexts.peekFirst() : null;
+      if (context != null) {
+        return context.makeCurrent();
+      }
+      return null;
+    }
+
+    @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
+    public static void onExit(@Advice.Enter Scope scope) {
+      if (scope != null) {
+        scope.close();
+      }
+    }
+  }
+}

+ 59 - 0
instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/HttpTrafficHandlerInstrumentation.java

@@ -0,0 +1,59 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.spring.webflux.v5_0.server.reactornetty;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import java.util.Deque;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+
+// used since reactor-netty-0.8
+public class HttpTrafficHandlerInstrumentation implements TypeInstrumentation {
+  @Override
+  public ElementMatcher<TypeDescription> typeMatcher() {
+    return named("reactor.netty.http.server.HttpTrafficHandler");
+  }
+
+  @Override
+  public void transform(TypeTransformer transformer) {
+    transformer.applyAdviceToMethod(
+        named("run").and(takesNoArguments()),
+        HttpTrafficHandlerInstrumentation.class.getName() + "$RunAdvice");
+  }
+
+  @SuppressWarnings("unused")
+  public static class RunAdvice {
+
+    @Advice.OnMethodEnter(suppress = Throwable.class)
+    public static Scope onEnter(
+        @Advice.FieldValue("ctx") ChannelHandlerContext channelHandlerContext) {
+      // set context to the first unprocessed request
+      Deque<Context> contexts =
+          channelHandlerContext.channel().attr(AttributeKeys.SERVER_CONTEXT).get();
+      Context context = contexts != null ? contexts.peekFirst() : null;
+      if (context != null) {
+        return context.makeCurrent();
+      }
+      return null;
+    }
+
+    @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
+    public static void onExit(@Advice.Enter Scope scope) {
+      if (scope != null) {
+        scope.close();
+      }
+    }
+  }
+}

+ 26 - 0
instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/ReactorNettyInstrumentationModule.java

@@ -0,0 +1,26 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.spring.webflux.v5_0.server.reactornetty;
+
+import com.google.auto.service.AutoService;
+import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import java.util.Arrays;
+import java.util.List;
+
+@AutoService(InstrumentationModule.class)
+public class ReactorNettyInstrumentationModule extends InstrumentationModule {
+
+  public ReactorNettyInstrumentationModule() {
+    super("reactor-netty", "reactor-netty-server");
+  }
+
+  @Override
+  public List<TypeInstrumentation> typeInstrumentations() {
+    return Arrays.asList(
+        new HttpTrafficHandlerInstrumentation(), new ContextHandlerInstrumentation());
+  }
+}

+ 0 - 1
instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/server/base/SpringWebFluxServerTest.java

@@ -65,6 +65,5 @@ public abstract class SpringWebFluxServerTest
     options.setTestPathParam(true);
     options.setExpectedException(new IllegalStateException(EXCEPTION.getBody()));
     options.setHasHandlerSpan(unused -> true);
-    options.setTestHttpPipelining(false);
   }
 }