Răsfoiți Sursa

Fix flaky http pipelining test on akka http (#8437)

Lauri Tulmin 1 an în urmă
părinte
comite
8e016a7bf4

+ 0 - 6
instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorCellInstrumentation.java

@@ -12,7 +12,6 @@ import akka.dispatch.Envelope;
 import akka.dispatch.sysmsg.SystemMessage;
 import io.opentelemetry.context.Scope;
 import io.opentelemetry.instrumentation.api.util.VirtualField;
-import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
 import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
 import io.opentelemetry.javaagent.bootstrap.executors.TaskAdviceHelper;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
@@ -53,11 +52,6 @@ public class AkkaActorCellInstrumentation implements TypeInstrumentation {
       if (scope != null) {
         scope.close();
       }
-      // akka-http instrumentation can leak scopes
-      // reset the context to clear the leaked scopes
-      if (Java8BytecodeBridge.currentContext() != Java8BytecodeBridge.rootContext()) {
-        Java8BytecodeBridge.rootContext().makeCurrent();
-      }
     }
   }
 

+ 1 - 1
instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaDispatcherInstrumentation.java

@@ -42,7 +42,7 @@ public class AkkaDispatcherInstrumentation implements TypeInstrumentation {
     @Advice.OnMethodEnter(suppress = Throwable.class)
     public static PropagatedContext enterDispatch(@Advice.Argument(1) Envelope envelope) {
       Context context = Java8BytecodeBridge.currentContext();
-      if (ExecutorAdviceHelper.shouldPropagateContext(context, envelope)) {
+      if (ExecutorAdviceHelper.shouldPropagateContext(context, envelope.message())) {
         VirtualField<Envelope, PropagatedContext> virtualField =
             VirtualField.find(Envelope.class, PropagatedContext.class);
         return ExecutorAdviceHelper.attachContextToTask(context, virtualField, envelope);

+ 27 - 14
instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaFlowWrapper.java

@@ -21,8 +21,8 @@ import akka.stream.stage.AbstractInHandler;
 import akka.stream.stage.AbstractOutHandler;
 import akka.stream.stage.GraphStage;
 import akka.stream.stage.GraphStageLogic;
+import akka.stream.stage.OutHandler;
 import io.opentelemetry.context.Context;
-import io.opentelemetry.context.Scope;
 import io.opentelemetry.javaagent.bootstrap.http.HttpServerResponseCustomizerHolder;
 import java.util.ArrayDeque;
 import java.util.Deque;
@@ -43,6 +43,22 @@ public class AkkaFlowWrapper
     return handler.join(new AkkaFlowWrapper());
   }
 
+  public static Context getContext(OutHandler outHandler) {
+    if (outHandler instanceof TracingLogic.ApplicationOutHandler) {
+      // We have multiple requests here only when requests are pipelined on the same connection.
+      // It appears that these requests are processed one by one so processing next request won't
+      // be started before the first one has returned a response, because of this the first request
+      // in the queue is always the one that is currently being processed.
+      TracingRequest request =
+          ((TracingLogic.ApplicationOutHandler) outHandler).getRequests().peek();
+      if (request != null) {
+        return request.context;
+      }
+    }
+
+    return null;
+  }
+
   @Override
   public BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape() {
     return shape;
@@ -77,7 +93,7 @@ public class AkkaFlowWrapper
       // user code pulls request, pass request from server to user code
       setHandler(
           requestOut,
-          new AbstractOutHandler() {
+          new ApplicationOutHandler() {
             @Override
             public void onPull() {
               pull(requestIn);
@@ -102,9 +118,7 @@ public class AkkaFlowWrapper
               Context parentContext = currentContext();
               if (instrumenter().shouldStart(parentContext, request)) {
                 Context context = instrumenter().start(parentContext, request);
-                // scope opened here may leak, actor instrumentation will close it
-                Scope scope = context.makeCurrent();
-                tracingRequest = new TracingRequest(context, scope, request);
+                tracingRequest = new TracingRequest(context, request);
               }
               // event if span wasn't started we need to push TracingRequest to match response
               // with request
@@ -134,10 +148,6 @@ public class AkkaFlowWrapper
 
               TracingRequest tracingRequest = requests.poll();
               if (tracingRequest != null && tracingRequest != TracingRequest.EMPTY) {
-                // this may happen on a different thread from the one that opened the scope
-                // actor instrumentation will take care of the leaked scopes
-                tracingRequest.scope.close();
-
                 // akka response is immutable so the customizer just captures the added headers
                 AkkaHttpResponseMutator responseMutator = new AkkaHttpResponseMutator();
                 HttpServerResponseCustomizerHolder.getCustomizer()
@@ -160,7 +170,6 @@ public class AkkaFlowWrapper
                 if (tracingRequest == TracingRequest.EMPTY) {
                   continue;
                 }
-                tracingRequest.scope.close();
                 instrumenter()
                     .end(
                         tracingRequest.context, tracingRequest.request, errorResponse(), exception);
@@ -175,17 +184,21 @@ public class AkkaFlowWrapper
             }
           });
     }
+
+    abstract class ApplicationOutHandler extends AbstractOutHandler {
+      Deque<TracingRequest> getRequests() {
+        return requests;
+      }
+    }
   }
 
   private static class TracingRequest {
-    static final TracingRequest EMPTY = new TracingRequest(null, null, null);
+    static final TracingRequest EMPTY = new TracingRequest(null, null);
     final Context context;
-    final Scope scope;
     final HttpRequest request;
 
-    TracingRequest(Context context, Scope scope, HttpRequest request) {
+    TracingRequest(Context context, HttpRequest request) {
       this.context = context;
-      this.scope = scope;
       this.request = request;
     }
   }

+ 11 - 2
instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaHttpServerInstrumentationModule.java

@@ -5,12 +5,14 @@
 
 package io.opentelemetry.javaagent.instrumentation.akkahttp.server;
 
-import static java.util.Collections.singletonList;
+import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
+import static java.util.Arrays.asList;
 
 import com.google.auto.service.AutoService;
 import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import java.util.List;
+import net.bytebuddy.matcher.ElementMatcher;
 
 @AutoService(InstrumentationModule.class)
 public class AkkaHttpServerInstrumentationModule extends InstrumentationModule {
@@ -18,8 +20,15 @@ public class AkkaHttpServerInstrumentationModule extends InstrumentationModule {
     super("akka-http", "akka-http-10.0", "akka-http-server");
   }
 
+  @Override
+  public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
+    // in GraphInterpreterInstrumentation we instrument a class that belongs to akka-streams, make
+    // sure this runs only when akka-http is present to avoid muzzle failures
+    return hasClassesNamed("akka.http.scaladsl.HttpExt");
+  }
+
   @Override
   public List<TypeInstrumentation> typeInstrumentations() {
-    return singletonList(new HttpExtServerInstrumentation());
+    return asList(new HttpExtServerInstrumentation(), new GraphInterpreterInstrumentation());
   }
 }

+ 22 - 0
instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaServerIgnoredTypesConfigurer.java

@@ -0,0 +1,22 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.akkahttp.server;
+
+import com.google.auto.service.AutoService;
+import io.opentelemetry.javaagent.extension.ignore.IgnoredTypesBuilder;
+import io.opentelemetry.javaagent.extension.ignore.IgnoredTypesConfigurer;
+import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
+
+@AutoService(IgnoredTypesConfigurer.class)
+public class AkkaServerIgnoredTypesConfigurer implements IgnoredTypesConfigurer {
+
+  @Override
+  public void configure(IgnoredTypesBuilder builder, ConfigProperties config) {
+    // in AkkaHttpServerInstrumentationTestAsync http pipeline test sending this message trigger
+    // processing next request, we don't want to propagate context there
+    builder.ignoreTaskClass("akka.stream.impl.fusing.ActorGraphInterpreter$AsyncInput");
+  }
+}

+ 52 - 0
instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/GraphInterpreterInstrumentation.java

@@ -0,0 +1,52 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.akkahttp.server;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+import akka.stream.impl.fusing.GraphInterpreter;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+
+public class GraphInterpreterInstrumentation implements TypeInstrumentation {
+  @Override
+  public ElementMatcher<TypeDescription> typeMatcher() {
+    return named("akka.stream.impl.fusing.GraphInterpreter");
+  }
+
+  @Override
+  public void transform(TypeTransformer transformer) {
+    transformer.applyAdviceToMethod(
+        named("processPush"), GraphInterpreterInstrumentation.class.getName() + "$PushAdvice");
+  }
+
+  @SuppressWarnings("unused")
+  public static class PushAdvice {
+
+    @Advice.OnMethodEnter(suppress = Throwable.class)
+    public static Scope onEnter(@Advice.Argument(0) GraphInterpreter.Connection connection) {
+      // processPush is called when execution passes to application or server. Here we propagate the
+      // context to the application code.
+      Context context = AkkaFlowWrapper.getContext(connection.outHandler());
+      if (context != null) {
+        return context.makeCurrent();
+      }
+      return null;
+    }
+
+    @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
+    public static void exit(@Advice.Enter Scope scope) {
+      if (scope != null) {
+        scope.close();
+      }
+    }
+  }
+}

+ 0 - 4
instrumentation/play/play-mvc/play-mvc-2.6/javaagent/build.gradle.kts

@@ -74,10 +74,6 @@ tasks {
       dependsOn(testing.suites)
     }
   }
-
-  withType<Test>().configureEach {
-    jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false")
-  }
 }
 
 // play-test depends on websocket-client