Browse Source

Add HTTP client and server tests (#6836)

Resolves #6835.
jack-berg 2 years ago
parent
commit
fcd5876e1a
22 changed files with 767 additions and 382 deletions
  1. 1 0
      instrumentation/netty/netty-4.1/javaagent/build.gradle.kts
  2. 1 0
      instrumentation/netty/netty-4.1/javaagent/src/test/groovy/Netty41ClientSslTest.groovy
  3. 66 149
      instrumentation/netty/netty-4.1/javaagent/src/test/groovy/Netty41ClientTest.groovy
  4. 1 0
      instrumentation/netty/netty-4.1/javaagent/src/test/groovy/Netty41ConnectionSpanTest.groovy
  5. 0 57
      instrumentation/netty/netty-4.1/javaagent/src/test/groovy/Netty41NativeClientTest.groovy
  6. 0 147
      instrumentation/netty/netty-4.1/javaagent/src/test/groovy/Netty41ServerTest.groovy
  7. 36 0
      instrumentation/netty/netty-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/Netty41ClientTest.java
  8. 86 0
      instrumentation/netty/netty-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/Netty41NativeClientTest.java
  9. 21 0
      instrumentation/netty/netty-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/Netty41ServerTest.java
  10. 2 0
      instrumentation/netty/netty-4.1/library/build.gradle.kts
  11. 11 0
      instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/NettyClientTelemetry.java
  12. 68 0
      instrumentation/netty/netty-4.1/library/src/test/java/io/opentelemetry/instrumentation/netty/v4_1/Netty41ClientTest.java
  13. 34 0
      instrumentation/netty/netty-4.1/library/src/test/java/io/opentelemetry/instrumentation/netty/v4_1/Netty41ServerTest.java
  14. 9 0
      instrumentation/netty/netty-4.1/testing/build.gradle.kts
  15. 128 0
      instrumentation/netty/netty-4.1/testing/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/AbstractNetty41ClientTest.java
  16. 176 0
      instrumentation/netty/netty-4.1/testing/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/AbstractNetty41ServerTest.java
  17. 4 2
      instrumentation/netty/netty-4.1/testing/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/ClientHandler.java
  18. 110 0
      instrumentation/netty/netty-4.1/testing/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/Netty41ClientExtension.java
  19. 8 23
      instrumentation/netty/netty-4.1/testing/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/SingleNettyConnection.java
  20. 2 2
      instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/OnRequest.java
  21. 2 2
      instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java
  22. 1 0
      settings.gradle.kts

+ 1 - 0
instrumentation/netty/netty-4.1/javaagent/build.gradle.kts

@@ -30,6 +30,7 @@ dependencies {
   implementation(project(":instrumentation:netty:netty-4-common:library"))
   implementation(project(":instrumentation:netty:netty-common:library"))
 
+  testImplementation(project(":instrumentation:netty:netty-4.1:testing"))
   testInstrumentation(project(":instrumentation:netty:netty-3.8:javaagent"))
   testInstrumentation(project(":instrumentation:netty:netty-4.0:javaagent"))
 

+ 1 - 0
instrumentation/netty/netty-4.1/javaagent/src/test/groovy/Netty41ClientSslTest.groovy

@@ -20,6 +20,7 @@ import io.netty.handler.codec.http.HttpVersion
 import io.netty.handler.ssl.SslContext
 import io.netty.handler.ssl.SslContextBuilder
 import io.netty.handler.ssl.SslHandler
+import io.opentelemetry.instrumentation.netty.v4_1.ClientHandler
 import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
 import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer
 import io.opentelemetry.semconv.trace.attributes.SemanticAttributes

+ 66 - 149
instrumentation/netty/netty-4.1/javaagent/src/test/groovy/Netty41ClientTest.groovy

@@ -9,7 +9,6 @@ import io.netty.channel.Channel
 import io.netty.channel.ChannelHandler
 import io.netty.channel.ChannelHandlerContext
 import io.netty.channel.ChannelInitializer
-import io.netty.channel.ChannelOption
 import io.netty.channel.ChannelPipeline
 import io.netty.channel.EventLoopGroup
 import io.netty.channel.embedded.EmbeddedChannel
@@ -21,16 +20,10 @@ import io.netty.handler.codec.http.HttpClientCodec
 import io.netty.handler.codec.http.HttpHeaderNames
 import io.netty.handler.codec.http.HttpMethod
 import io.netty.handler.codec.http.HttpVersion
-import io.netty.handler.ssl.SslContext
-import io.netty.handler.ssl.SslContextBuilder
-import io.netty.handler.timeout.ReadTimeoutHandler
-import io.opentelemetry.api.common.AttributeKey
 import io.opentelemetry.api.trace.SpanKind
-import io.opentelemetry.instrumentation.test.AgentTestTrait
-import io.opentelemetry.instrumentation.test.base.HttpClientTest
-import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
-import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection
-import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
+import io.opentelemetry.instrumentation.netty.v4_1.ClientHandler
+import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
+import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer
 import spock.lang.Shared
 import spock.lang.Unroll
 
@@ -40,134 +33,18 @@ import java.util.concurrent.TimeUnit
 import static org.junit.jupiter.api.Assumptions.assumeTrue
 
 @Unroll
-class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implements AgentTestTrait {
+class Netty41ClientTest extends AgentInstrumentationSpecification {
 
   @Shared
-  private EventLoopGroup eventLoopGroup = buildEventLoopGroup()
+  private HttpClientTestServer server
 
-  @Shared
-  private Bootstrap bootstrap = buildBootstrap(false)
-
-  @Shared
-  private Bootstrap httpsBootstrap = buildBootstrap(true)
-
-  @Shared
-  private Bootstrap readTimeoutBootstrap = buildBootstrap(false, true)
-
-  def cleanupSpec() {
-    eventLoopGroup?.shutdownGracefully()
-  }
-
-  Bootstrap buildBootstrap(boolean https, boolean readTimeout = false) {
-    Bootstrap bootstrap = new Bootstrap()
-    bootstrap.group(eventLoopGroup)
-      .channel(getChannelClass())
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS)
-      .handler(new ChannelInitializer<SocketChannel>() {
-        @Override
-        protected void initChannel(SocketChannel socketChannel) throws Exception {
-          ChannelPipeline pipeline = socketChannel.pipeline()
-          if (https) {
-            SslContext sslContext = SslContextBuilder.forClient().build()
-            pipeline.addLast(sslContext.newHandler(socketChannel.alloc()))
-          }
-          if (readTimeout) {
-            pipeline.addLast(new ReadTimeoutHandler(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS))
-          }
-          pipeline.addLast(new HttpClientCodec())
-        }
-      })
-
-    return bootstrap
-  }
-
-  EventLoopGroup buildEventLoopGroup() {
-    return new NioEventLoopGroup()
-  }
-
-  Class<Channel> getChannelClass() {
-    return NioSocketChannel
-  }
-
-  Bootstrap getBootstrap(URI uri) {
-    if (uri.getScheme() == "https") {
-      return httpsBootstrap
-    } else if (uri.getPath() == "/read-timeout") {
-      return readTimeoutBootstrap
-    }
-    return bootstrap
-  }
-
-  @Override
-  DefaultFullHttpRequest buildRequest(String method, URI uri, Map<String, String> headers) {
-    def target = uri.path
-    if (uri.query != null) {
-      target += "?" + uri.query
-    }
-    def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), target, Unpooled.EMPTY_BUFFER)
-    request.headers().set(HttpHeaderNames.HOST, uri.host + ":" + uri.port)
-    headers.each { k, v -> request.headers().set(k, v) }
-    return request
-  }
-
-  @Override
-  int sendRequest(DefaultFullHttpRequest request, String method, URI uri, Map<String, String> headers) {
-    def channel = getBootstrap(uri).connect(uri.host, getPort(uri)).sync().channel()
-    def result = new CompletableFuture<Integer>()
-    channel.pipeline().addLast(new ClientHandler(result))
-    channel.writeAndFlush(request).get()
-    return result.get(20, TimeUnit.SECONDS)
+  def setupSpec() {
+    server = new HttpClientTestServer(openTelemetry)
+    server.start()
   }
 
-  @Override
-  void sendRequestWithCallback(DefaultFullHttpRequest request, String method, URI uri, Map<String, String> headers, AbstractHttpClientTest.RequestResult requestResult) {
-    Channel ch
-    try {
-      ch = getBootstrap(uri).connect(uri.host, getPort(uri)).sync().channel()
-    } catch (Exception exception) {
-      requestResult.complete(exception)
-      return
-    }
-    def result = new CompletableFuture<Integer>()
-    result.whenComplete { status, throwable ->
-      requestResult.complete({ status }, throwable)
-    }
-    ch.pipeline().addLast(new ClientHandler(result))
-    ch.writeAndFlush(request)
-  }
-
-  @Override
-  String expectedClientSpanName(URI uri, String method) {
-    switch (uri.toString()) {
-      case "http://localhost:61/": // unopened port
-      case "https://192.0.2.1/": // non routable address
-        return "CONNECT"
-      default:
-        return super.expectedClientSpanName(uri, method)
-    }
-  }
-
-  @Override
-  Set<AttributeKey<?>> httpAttributes(URI uri) {
-    switch (uri.toString()) {
-      case "http://localhost:61/": // unopened port
-      case "https://192.0.2.1/": // non routable address
-        return []
-    }
-    def attributes = super.httpAttributes(uri)
-    attributes.remove(SemanticAttributes.NET_PEER_NAME)
-    attributes.remove(SemanticAttributes.NET_PEER_PORT)
-    return attributes
-  }
-
-  @Override
-  boolean testRedirects() {
-    false
-  }
-
-  @Override
-  boolean testReadTimeout() {
-    true
+  def cleanupSpec() {
+    server.stop()
   }
 
   def "test connection reuse and second request with lazy execute"() {
@@ -204,8 +81,14 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
           kind SpanKind.INTERNAL
           hasNoParent()
         }
-        clientSpan(it, 1, span(0))
-        serverSpan(it, 2, span(1))
+        span(1) {
+          kind SpanKind.CLIENT
+          childOf span(0)
+        }
+        span(2) {
+          kind SpanKind.SERVER
+          childOf span(1)
+        }
       }
     }
 
@@ -223,8 +106,14 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
           kind SpanKind.INTERNAL
           hasNoParent()
         }
-        clientSpan(it, 1, span(0))
-        serverSpan(it, 2, span(1))
+        span(1) {
+          kind SpanKind.CLIENT
+          childOf span(0)
+        }
+        span(2) {
+          kind SpanKind.SERVER
+          childOf span(1)
+        }
       }
       trace(1, 3) {
         span(0) {
@@ -232,8 +121,14 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
           kind SpanKind.INTERNAL
           hasNoParent()
         }
-        clientSpan(it, 1, span(0))
-        serverSpan(it, 2, span(1))
+        span(1) {
+          kind SpanKind.CLIENT
+          childOf span(0)
+        }
+        span(2) {
+          kind SpanKind.SERVER
+          childOf span(1)
+        }
       }
     }
 
@@ -339,20 +234,47 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
           attributes {
           }
         }
-        clientSpan(it, 2, span(1), method)
-        serverSpan(it, 3, span(2))
+        span(2) {
+          childOf span(1)
+          kind SpanKind.CLIENT
+        }
+        span(3) {
+          childOf span(2)
+          kind SpanKind.SERVER
+        }
       }
     }
 
     where:
-    method << BODY_METHODS
+    method << ["POST", "PUT"]
   }
 
   class TracedClass {
+    private final Bootstrap bootstrap
+
+    private TracedClass() {
+      EventLoopGroup group = new NioEventLoopGroup()
+      bootstrap = new Bootstrap()
+      bootstrap.group(group)
+        .channel(NioSocketChannel)
+        .handler(new ChannelInitializer<SocketChannel>() {
+          @Override
+          protected void initChannel(SocketChannel socketChannel) throws Exception {
+            ChannelPipeline pipeline = socketChannel.pipeline()
+            pipeline.addLast(new HttpClientCodec())
+          }
+        })
+    }
+
     int tracedMethod(String method) {
-      def uri = resolveAddress("/success")
       runWithSpan("tracedMethod") {
-        doRequest(method, uri)
+        def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), "/success", Unpooled.EMPTY_BUFFER)
+        request.headers().set(HttpHeaderNames.HOST, "localhost:" + server.httpPort())
+        def ch = bootstrap.connect("localhost", server.httpPort()).sync().channel()
+        def result = new CompletableFuture<Integer>()
+        ch.pipeline().addLast(new ClientHandler(result))
+        ch.writeAndFlush(request).get()
+        return result.get(20, TimeUnit.SECONDS)
       }
     }
   }
@@ -392,9 +314,4 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
       ch.pipeline().addLast("added_in_initializer", new HttpClientCodec())
     }
   }
-
-  @Override
-  SingleConnection createSingleConnection(String host, int port) {
-    return new SingleNettyConnection(host, port)
-  }
 }

+ 1 - 0
instrumentation/netty/netty-4.1/javaagent/src/test/groovy/Netty41ConnectionSpanTest.groovy

@@ -16,6 +16,7 @@ import io.netty.handler.codec.http.HttpClientCodec
 import io.netty.handler.codec.http.HttpHeaderNames
 import io.netty.handler.codec.http.HttpMethod
 import io.netty.handler.codec.http.HttpVersion
+import io.opentelemetry.instrumentation.netty.v4_1.ClientHandler
 import io.opentelemetry.instrumentation.test.AgentTestTrait
 import io.opentelemetry.instrumentation.test.InstrumentationSpecification
 import io.opentelemetry.instrumentation.test.utils.PortUtils

+ 0 - 57
instrumentation/netty/netty-4.1/javaagent/src/test/groovy/Netty41NativeClientTest.groovy

@@ -1,57 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-import io.netty.channel.Channel
-import io.netty.channel.EventLoopGroup
-import io.netty.channel.epoll.Epoll
-import io.netty.channel.epoll.EpollEventLoopGroup
-import io.netty.channel.epoll.EpollSocketChannel
-import io.netty.channel.kqueue.KQueue
-import io.netty.channel.kqueue.KQueueEventLoopGroup
-import io.netty.channel.kqueue.KQueueSocketChannel
-import org.junit.jupiter.api.Assumptions
-
-// netty client test with epoll/kqueue native library
-class Netty41NativeClientTest extends Netty41ClientTest {
-
-  EventLoopGroup buildEventLoopGroup() {
-    // linux
-    if (Epoll.isAvailable()) {
-      return new EpollEventLoopGroup()
-    }
-    // mac
-    if (KQueueHelper.isAvailable()) {
-      return new KQueueEventLoopGroup()
-    }
-
-    // skip test when native library was not found
-    Assumptions.assumeTrue(false, "Native library was not found")
-    return super.buildEventLoopGroup()
-  }
-
-  @Override
-  Class<Channel> getChannelClass() {
-    if (Epoll.isAvailable()) {
-      return EpollSocketChannel
-    }
-    if (KQueueHelper.isAvailable()) {
-      return KQueueSocketChannel
-    }
-    return null
-  }
-
-  static class KQueueHelper {
-    static boolean isAvailable() {
-      try {
-        return KQueue.isAvailable()
-      } catch (NoClassDefFoundError error) {
-        // kqueue is available only in latest dep tests
-        // in regular tests we only have a compile time dependency because kqueue support was added
-        // after 4.1.0
-        return false
-      }
-    }
-  }
-}

+ 0 - 147
instrumentation/netty/netty-4.1/javaagent/src/test/groovy/Netty41ServerTest.groovy

@@ -1,147 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-import io.netty.bootstrap.ServerBootstrap
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.Unpooled
-import io.netty.channel.ChannelHandlerContext
-import io.netty.channel.ChannelInitializer
-import io.netty.channel.ChannelPipeline
-import io.netty.channel.EventLoopGroup
-import io.netty.channel.SimpleChannelInboundHandler
-import io.netty.channel.nio.NioEventLoopGroup
-import io.netty.channel.socket.nio.NioServerSocketChannel
-import io.netty.handler.codec.http.DefaultFullHttpResponse
-import io.netty.handler.codec.http.FullHttpResponse
-import io.netty.handler.codec.http.HttpHeaderNames
-import io.netty.handler.codec.http.HttpRequest
-import io.netty.handler.codec.http.HttpResponseStatus
-import io.netty.handler.codec.http.HttpServerCodec
-import io.netty.handler.codec.http.QueryStringDecoder
-import io.netty.handler.logging.LogLevel
-import io.netty.handler.logging.LoggingHandler
-import io.netty.util.CharsetUtil
-import io.opentelemetry.api.common.AttributeKey
-import io.opentelemetry.instrumentation.test.AgentTestTrait
-import io.opentelemetry.instrumentation.test.base.HttpServerTest
-import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint
-import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
-
-import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH
-import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
-import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1
-import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS
-import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR
-import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION
-import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD
-import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.NOT_FOUND
-import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM
-import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT
-import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS
-
-class Netty41ServerTest extends HttpServerTest<EventLoopGroup> implements AgentTestTrait {
-
-  static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(SERVER_LOGGER.name, LogLevel.DEBUG)
-
-  @Override
-  EventLoopGroup startServer(int port) {
-    def eventLoopGroup = new NioEventLoopGroup()
-
-    ServerBootstrap bootstrap = new ServerBootstrap()
-      .group(eventLoopGroup)
-      .handler(LOGGING_HANDLER)
-      .childHandler([
-        initChannel: { ch ->
-          ChannelPipeline pipeline = ch.pipeline()
-          pipeline.addFirst("logger", LOGGING_HANDLER)
-
-          def handlers = [new HttpServerCodec()]
-          handlers.each { pipeline.addLast(it) }
-          pipeline.addLast(new SimpleChannelInboundHandler() {
-
-            @Override
-            protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
-              if (msg instanceof HttpRequest) {
-                def request = msg as HttpRequest
-                def uri = URI.create(request.uri())
-                ServerEndpoint endpoint = ServerEndpoint.forPath(uri.path)
-                ctx.write controller(endpoint) {
-                  ByteBuf content = null
-                  FullHttpResponse response
-                  switch (endpoint) {
-                    case SUCCESS:
-                    case ERROR:
-                      content = Unpooled.copiedBuffer(endpoint.body, CharsetUtil.UTF_8)
-                      response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content)
-                      break
-                    case INDEXED_CHILD:
-                      content = Unpooled.EMPTY_BUFFER
-                      endpoint.collectSpanAttributes { new QueryStringDecoder(uri).parameters().get(it).find() }
-                      response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content)
-                      break
-                    case QUERY_PARAM:
-                      content = Unpooled.copiedBuffer(uri.query, CharsetUtil.UTF_8)
-                      response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content)
-                      break
-                    case REDIRECT:
-                      content = Unpooled.EMPTY_BUFFER
-                      response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content)
-                      response.headers().set(HttpHeaderNames.LOCATION, endpoint.body)
-                      break
-                    case CAPTURE_HEADERS:
-                      content = Unpooled.copiedBuffer(endpoint.body, CharsetUtil.UTF_8)
-                      response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content)
-                      response.headers().set("X-Test-Response", request.headers().get("X-Test-Request"))
-                      break
-                    case EXCEPTION:
-                      throw new Exception(endpoint.body)
-                    default:
-                      content = Unpooled.copiedBuffer(NOT_FOUND.body, CharsetUtil.UTF_8)
-                      response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(NOT_FOUND.status), content)
-                      break
-                  }
-                  response.headers().set(CONTENT_TYPE, "text/plain")
-                  if (content) {
-                    response.headers().set(CONTENT_LENGTH, content.readableBytes())
-                  }
-                  return response
-                }
-              }
-            }
-
-            @Override
-            void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-              ByteBuf content = Unpooled.copiedBuffer(cause.message, CharsetUtil.UTF_8)
-              FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR, content)
-              response.headers().set(CONTENT_TYPE, "text/plain")
-              response.headers().set(CONTENT_LENGTH, content.readableBytes())
-              ctx.write(response)
-            }
-
-            @Override
-            void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
-              ctx.flush()
-            }
-          })
-        }
-      ] as ChannelInitializer).channel(NioServerSocketChannel)
-    bootstrap.bind(port).sync()
-
-    return eventLoopGroup
-  }
-
-  @Override
-  void stopServer(EventLoopGroup server) {
-    server?.shutdownGracefully()
-  }
-
-  @Override
-  Set<AttributeKey<?>> httpAttributes(ServerEndpoint endpoint) {
-    def attributes = super.httpAttributes(endpoint)
-    attributes.remove(SemanticAttributes.HTTP_ROUTE)
-    attributes
-  }
-}

+ 36 - 0
instrumentation/netty/netty-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/Netty41ClientTest.java

@@ -0,0 +1,36 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.netty.v4_1;
+
+import io.netty.channel.Channel;
+import io.opentelemetry.instrumentation.netty.v4_1.AbstractNetty41ClientTest;
+import io.opentelemetry.instrumentation.netty.v4_1.Netty41ClientExtension;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class Netty41ClientTest extends AbstractNetty41ClientTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent();
+
+  @RegisterExtension
+  static final Netty41ClientExtension clientExtension =
+      new Netty41ClientExtension(channelPipeline -> {});
+
+  @Override
+  protected Netty41ClientExtension clientExtension() {
+    return clientExtension;
+  }
+
+  @Override
+  protected void configureChannel(Channel channel) {}
+
+  @Override
+  protected boolean testReadTimeout() {
+    return true;
+  }
+}

+ 86 - 0
instrumentation/netty/netty-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/Netty41NativeClientTest.java

@@ -0,0 +1,86 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.netty.v4_1;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.kqueue.KQueue;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.opentelemetry.instrumentation.netty.v4_1.AbstractNetty41ClientTest;
+import io.opentelemetry.instrumentation.netty.v4_1.Netty41ClientExtension;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/** Netty client test with epoll/kqueue native library. */
+public class Netty41NativeClientTest extends AbstractNetty41ClientTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent();
+
+  @RegisterExtension
+  static final Netty41ClientExtension clientExtension =
+      new Netty41ClientExtension(
+          channelPipeline -> {}, Netty41NativeClientTest::buildEventLoopGroup, getChannelClass());
+
+  private static EventLoopGroup buildEventLoopGroup() {
+    // linux
+    if (Epoll.isAvailable()) {
+      return new EpollEventLoopGroup();
+    }
+    // mac
+    if (KQueueHelper.isAvailable()) {
+      return new KQueueEventLoopGroup();
+    }
+    // skip test when native library was not found
+    Assumptions.assumeTrue(false, "Native library was not found");
+    return new NioEventLoopGroup();
+  }
+
+  @SuppressWarnings("AbbreviationAsWordInName")
+  private static class KQueueHelper {
+    static boolean isAvailable() {
+      try {
+        return KQueue.isAvailable();
+      } catch (NoClassDefFoundError error) {
+        // kqueue is available only in latest dep tests
+        // in regular tests we only have a compile time dependency because kqueue support was added
+        // after 4.1.0
+        return false;
+      }
+    }
+  }
+
+  private static Class<? extends Channel> getChannelClass() {
+    if (Epoll.isAvailable()) {
+      return EpollSocketChannel.class;
+    }
+    if (KQueueHelper.isAvailable()) {
+      return KQueueSocketChannel.class;
+    }
+    return NioSocketChannel.class;
+  }
+
+  @Override
+  protected Netty41ClientExtension clientExtension() {
+    return clientExtension;
+  }
+
+  @Override
+  protected void configureChannel(Channel channel) {}
+
+  @Override
+  protected boolean testReadTimeout() {
+    return true;
+  }
+}

+ 21 - 0
instrumentation/netty/netty-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/Netty41ServerTest.java

@@ -0,0 +1,21 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.netty.v4_1;
+
+import io.netty.channel.ChannelPipeline;
+import io.opentelemetry.instrumentation.netty.v4_1.AbstractNetty41ServerTest;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class Netty41ServerTest extends AbstractNetty41ServerTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = HttpServerInstrumentationExtension.forAgent();
+
+  @Override
+  protected void configurePipeline(ChannelPipeline channelPipeline) {}
+}

+ 2 - 0
instrumentation/netty/netty-4.1/library/build.gradle.kts

@@ -6,4 +6,6 @@ dependencies {
   library("io.netty:netty-codec-http:4.1.0.Final")
   implementation(project(":instrumentation:netty:netty-4-common:library"))
   implementation(project(":instrumentation:netty:netty-common:library"))
+
+  testImplementation(project(":instrumentation:netty:netty-4.1:testing"))
 }

+ 11 - 0
instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/NettyClientTelemetry.java

@@ -5,13 +5,16 @@
 
 package io.opentelemetry.instrumentation.netty.v4_1;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.CombinedChannelDuplexHandler;
 import io.netty.handler.codec.http.HttpResponse;
 import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.context.Context;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
+import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
 import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientRequestTracingHandler;
 import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientResponseTracingHandler;
 import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientTracingHandler;
@@ -63,4 +66,12 @@ public final class NettyClientTelemetry {
       createCombinedHandler() {
     return new HttpClientTracingHandler(instrumenter);
   }
+
+  /**
+   * Propagate the {@link Context} to the {@link Channel}. This MUST be called before each HTTP
+   * request executed on a {@link Channel}.
+   */
+  public static void setChannelContext(Channel channel, Context context) {
+    channel.attr(AttributeKeys.WRITE_CONTEXT).compareAndSet(null, context);
+  }
 }

+ 68 - 0
instrumentation/netty/netty-4.1/library/src/test/java/io/opentelemetry/instrumentation/netty/v4_1/Netty41ClientTest.java

@@ -0,0 +1,68 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.netty.v4_1;
+
+import io.netty.channel.Channel;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class Netty41ClientTest extends AbstractNetty41ClientTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forLibrary();
+
+  @RegisterExtension
+  static final Netty41ClientExtension clientExtension =
+      new Netty41ClientExtension(
+          channelPipeline ->
+              channelPipeline.addLast(
+                  NettyClientTelemetry.builder(testing.getOpenTelemetry())
+                      .build()
+                      .createCombinedHandler()));
+
+  @Override
+  protected Netty41ClientExtension clientExtension() {
+    return clientExtension;
+  }
+
+  @Override
+  protected void configureChannel(Channel channel) {
+    // Current context must be propagated to the channel
+    NettyClientTelemetry.setChannelContext(channel, Context.current());
+  }
+
+  @Override
+  protected boolean testReadTimeout() {
+    return false;
+  }
+
+  @Override
+  protected boolean testErrorWithCallback() {
+    return false;
+  }
+
+  @Override
+  protected boolean testCallbackWithParent() {
+    return false;
+  }
+
+  @Override
+  protected boolean testWithClientParent() {
+    return false;
+  }
+
+  @Override
+  protected boolean testConnectionFailure() {
+    return false;
+  }
+
+  @Override
+  protected boolean testRemoteConnection() {
+    return false;
+  }
+}

+ 34 - 0
instrumentation/netty/netty-4.1/library/src/test/java/io/opentelemetry/instrumentation/netty/v4_1/Netty41ServerTest.java

@@ -0,0 +1,34 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.netty.v4_1;
+
+import io.netty.channel.ChannelPipeline;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest;
+import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension;
+import io.opentelemetry.testing.internal.io.netty.handler.codec.http.HttpServerCodec;
+import java.util.Collections;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class Netty41ServerTest extends AbstractNetty41ServerTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = HttpServerInstrumentationExtension.forLibrary();
+
+  @Override
+  protected void configurePipeline(ChannelPipeline channelPipeline) {
+    channelPipeline.addAfter(
+        HttpServerCodec.class.getSimpleName() + "#0",
+        NettyServerTelemetry.class.getName(),
+        NettyServerTelemetry.builder(testing.getOpenTelemetry())
+            .setCapturedRequestHeaders(
+                Collections.singletonList(AbstractHttpServerTest.TEST_REQUEST_HEADER))
+            .setCapturedResponseHeaders(
+                Collections.singletonList(AbstractHttpServerTest.TEST_RESPONSE_HEADER))
+            .build()
+            .createCombinedHandler());
+  }
+}

+ 9 - 0
instrumentation/netty/netty-4.1/testing/build.gradle.kts

@@ -0,0 +1,9 @@
+plugins {
+  id("otel.java-conventions")
+}
+
+dependencies {
+  api(project(":testing-common"))
+
+  api("io.netty:netty-codec-http:4.1.0.Final")
+}

+ 128 - 0
instrumentation/netty/netty-4.1/testing/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/AbstractNetty41ClientTest.java

@@ -0,0 +1,128 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.netty.v4_1;
+
+import static io.opentelemetry.instrumentation.test.base.HttpClientTest.getPort;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpVersion;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
+import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection;
+import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public abstract class AbstractNetty41ClientTest
+    extends AbstractHttpClientTest<DefaultFullHttpRequest> {
+
+  protected abstract Netty41ClientExtension clientExtension();
+
+  protected abstract void configureChannel(Channel channel);
+
+  @Override
+  protected boolean testRedirects() {
+    return false;
+  }
+
+  @Override
+  protected DefaultFullHttpRequest buildRequest(
+      String method, URI uri, Map<String, String> headers) {
+    String target = uri.getPath();
+    if (uri.getQuery() != null) {
+      target += "?" + uri.getQuery();
+    }
+    DefaultFullHttpRequest request =
+        new DefaultFullHttpRequest(
+            HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), target, Unpooled.EMPTY_BUFFER);
+    request.headers().set(HttpHeaderNames.HOST, uri.getHost() + ":" + uri.getPort());
+    headers.forEach((k, v) -> request.headers().set(k, v));
+    return request;
+  }
+
+  @Override
+  protected int sendRequest(
+      DefaultFullHttpRequest defaultFullHttpRequest,
+      String method,
+      URI uri,
+      Map<String, String> headers)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    Channel channel =
+        clientExtension().getBootstrap(uri).connect(uri.getHost(), getPort(uri)).sync().channel();
+    configureChannel(channel);
+    CompletableFuture<Integer> result = new CompletableFuture<>();
+    channel.pipeline().addLast(new ClientHandler(result));
+    channel.writeAndFlush(defaultFullHttpRequest).get();
+    return result.get(20, TimeUnit.SECONDS);
+  }
+
+  @Override
+  @SuppressWarnings(
+      "CatchingUnchecked") // Checked exception is thrown when connecting to unopened port
+  protected void sendRequestWithCallback(
+      DefaultFullHttpRequest defaultFullHttpRequest,
+      String method,
+      URI uri,
+      Map<String, String> headers,
+      RequestResult requestResult) {
+    Channel ch;
+    try {
+      ch =
+          clientExtension().getBootstrap(uri).connect(uri.getHost(), getPort(uri)).sync().channel();
+    } catch (InterruptedException exception) {
+      Thread.currentThread().interrupt();
+      return;
+    } catch (Exception exception) {
+      requestResult.complete(exception);
+      return;
+    }
+    configureChannel(ch);
+    CompletableFuture<Integer> result = new CompletableFuture<>();
+    result.whenComplete((status, throwable) -> requestResult.complete(() -> status, throwable));
+    ch.pipeline().addLast(new ClientHandler(result));
+    ch.writeAndFlush(defaultFullHttpRequest);
+  }
+
+  @Override
+  protected String expectedClientSpanName(URI uri, String method) {
+    switch (uri.toString()) {
+      case "http://localhost:61/": // unopened port
+      case "https://192.0.2.1/": // non routable address
+        return "CONNECT";
+      default:
+        return super.expectedClientSpanName(uri, method);
+    }
+  }
+
+  @Override
+  protected Set<AttributeKey<?>> httpAttributes(URI uri) {
+    String uriString = uri.toString();
+    // http://localhost:61/ => unopened port, https://192.0.2.1/ => non routable address
+    if ("http://localhost:61/".equals(uriString) || "https://192.0.2.1/".equals(uriString)) {
+      return Collections.emptySet();
+    }
+    Set<AttributeKey<?>> attributes = super.httpAttributes(uri);
+    attributes.remove(SemanticAttributes.NET_PEER_NAME);
+    attributes.remove(SemanticAttributes.NET_PEER_PORT);
+    return attributes;
+  }
+
+  @Override
+  protected SingleConnection createSingleConnection(String host, int port) {
+    return new SingleNettyConnection(
+        clientExtension().buildBootstrap(false, false), host, port, this::configureChannel);
+  }
+}

+ 176 - 0
instrumentation/netty/netty-4.1/testing/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/AbstractNetty41ServerTest.java

@@ -0,0 +1,176 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.netty.v4_1;
+
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions.DEFAULT_HTTP_ATTRIBUTES;
+import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.NOT_FOUND;
+
+import com.google.common.collect.Sets;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.CharsetUtil;
+import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest;
+import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions;
+import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint;
+import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
+import java.net.URI;
+import java.util.Collections;
+
+public abstract class AbstractNetty41ServerTest extends AbstractHttpServerTest<EventLoopGroup> {
+
+  static final LoggingHandler LOGGING_HANDLER =
+      new LoggingHandler(AbstractNetty41ServerTest.class, LogLevel.DEBUG);
+
+  protected abstract void configurePipeline(ChannelPipeline channelPipeline);
+
+  @Override
+  protected void configure(HttpServerTestOptions options) {
+    options.setTestException(false);
+    options.setHttpAttributes(
+        unused ->
+            Sets.difference(
+                DEFAULT_HTTP_ATTRIBUTES, Collections.singleton(SemanticAttributes.HTTP_ROUTE)));
+  }
+
+  @Override
+  protected EventLoopGroup setupServer() {
+    NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+
+    ServerBootstrap bootstrap =
+        new ServerBootstrap()
+            .group(eventLoopGroup)
+            .handler(LOGGING_HANDLER)
+            .childHandler(
+                new ChannelInitializer<SocketChannel>() {
+                  @Override
+                  protected void initChannel(SocketChannel socketChannel) {
+                    ChannelPipeline pipeline = socketChannel.pipeline();
+                    pipeline.addFirst("logger", LOGGING_HANDLER);
+                    pipeline.addLast(new HttpServerCodec());
+                    pipeline.addLast(new HttpHandler());
+                    configurePipeline(pipeline);
+                  }
+                })
+            .channel(NioServerSocketChannel.class);
+    try {
+      bootstrap.bind(port).sync();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    return eventLoopGroup;
+  }
+
+  @Override
+  protected void stopServer(EventLoopGroup server) {
+    server.shutdownGracefully();
+  }
+
+  private static class HttpHandler extends SimpleChannelInboundHandler<Object> {
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+      if (msg instanceof HttpRequest) {
+        HttpRequest request = (HttpRequest) msg;
+        URI uri = URI.create(request.uri());
+        ServerEndpoint endpoint = ServerEndpoint.forPath(uri.getPath());
+        ctx.write(controller(endpoint, () -> handle(request, uri, endpoint)));
+      }
+    }
+
+    private static Object handle(HttpRequest request, URI uri, ServerEndpoint endpoint) {
+      ByteBuf content;
+      FullHttpResponse response;
+      switch (endpoint) {
+        case SUCCESS:
+        case ERROR:
+          content = Unpooled.copiedBuffer(endpoint.getBody(), CharsetUtil.UTF_8);
+          response =
+              new DefaultFullHttpResponse(
+                  HTTP_1_1, HttpResponseStatus.valueOf(endpoint.getStatus()), content);
+          break;
+        case INDEXED_CHILD:
+          content = Unpooled.EMPTY_BUFFER;
+          endpoint.collectSpanAttributes(
+              name ->
+                  new QueryStringDecoder(uri)
+                      .parameters().get(name).stream().findFirst().orElse(""));
+          response =
+              new DefaultFullHttpResponse(
+                  HTTP_1_1, HttpResponseStatus.valueOf(endpoint.getStatus()), content);
+          break;
+        case QUERY_PARAM:
+          content = Unpooled.copiedBuffer(uri.getQuery(), CharsetUtil.UTF_8);
+          response =
+              new DefaultFullHttpResponse(
+                  HTTP_1_1, HttpResponseStatus.valueOf(endpoint.getStatus()), content);
+          break;
+        case REDIRECT:
+          content = Unpooled.EMPTY_BUFFER;
+          response =
+              new DefaultFullHttpResponse(
+                  HTTP_1_1, HttpResponseStatus.valueOf(endpoint.getStatus()), content);
+          response.headers().set(HttpHeaderNames.LOCATION, endpoint.getBody());
+          break;
+        case CAPTURE_HEADERS:
+          content = Unpooled.copiedBuffer(endpoint.getBody(), CharsetUtil.UTF_8);
+          response =
+              new DefaultFullHttpResponse(
+                  HTTP_1_1, HttpResponseStatus.valueOf(endpoint.getStatus()), content);
+          response.headers().set("X-Test-Response", request.headers().get("X-Test-Request"));
+          break;
+        case EXCEPTION:
+          throw new IllegalStateException(endpoint.getBody());
+        default:
+          content = Unpooled.copiedBuffer(NOT_FOUND.getBody(), CharsetUtil.UTF_8);
+          response =
+              new DefaultFullHttpResponse(
+                  HTTP_1_1, HttpResponseStatus.valueOf(NOT_FOUND.getStatus()), content);
+          break;
+      }
+      response.headers().set(CONTENT_TYPE, "text/plain");
+      response.headers().set(CONTENT_LENGTH, content.readableBytes());
+      return response;
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+      ByteBuf content = Unpooled.copiedBuffer(cause.getMessage(), CharsetUtil.UTF_8);
+      FullHttpResponse response =
+          new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR, content);
+      response.headers().set(CONTENT_TYPE, "text/plain");
+      response.headers().set(CONTENT_LENGTH, content.readableBytes());
+      ctx.write(response);
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) {
+      ctx.flush();
+    }
+  }
+}

+ 4 - 2
instrumentation/netty/netty-4.1/javaagent/src/test/groovy/ClientHandler.java → instrumentation/netty/netty-4.1/testing/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/ClientHandler.java

@@ -3,6 +3,8 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
+package io.opentelemetry.instrumentation.netty.v4_1;
+
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.FullHttpResponse;
@@ -33,13 +35,13 @@ public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
     if (msg instanceof FullHttpResponse) {
       ctx.pipeline().remove(this);
       FullHttpResponse response = (FullHttpResponse) msg;
-      responseCode.complete(response.getStatus().code());
+      responseCode.complete(response.status().code());
     } else if (msg instanceof HttpResponse) {
       // Headers before body have been received, store them to use when finishing the span.
       ctx.channel().attr(HTTP_RESPONSE).set((HttpResponse) msg);
     } else if (msg instanceof LastHttpContent) {
       ctx.pipeline().remove(this);
-      responseCode.complete(ctx.channel().attr(HTTP_RESPONSE).get().getStatus().code());
+      responseCode.complete(ctx.channel().attr(HTTP_RESPONSE).get().status().code());
     }
   }
 

+ 110 - 0
instrumentation/netty/netty-4.1/testing/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/Netty41ClientExtension.java

@@ -0,0 +1,110 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.netty.v4_1;
+
+import static io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest.CONNECTION_TIMEOUT;
+import static io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest.READ_TIMEOUT;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class Netty41ClientExtension implements BeforeAllCallback, AfterAllCallback {
+
+  private final Consumer<ChannelPipeline> channelPipelineConfigurer;
+  private final Supplier<EventLoopGroup> eventLoopGroupSupplier;
+  private final Class<? extends Channel> channelClass;
+
+  private EventLoopGroup eventLoopGroup;
+  private Bootstrap httpBootstrap;
+  private Bootstrap httpsBootstrap;
+  private Bootstrap readTimeoutBootstrap;
+
+  public Netty41ClientExtension(Consumer<ChannelPipeline> channelPipelineConfigurer) {
+    this(channelPipelineConfigurer, NioEventLoopGroup::new, NioSocketChannel.class);
+  }
+
+  public Netty41ClientExtension(
+      Consumer<ChannelPipeline> channelPipelineConfigurer,
+      Supplier<EventLoopGroup> eventLoopGroupSupplier,
+      Class<? extends Channel> channelClass) {
+    this.channelPipelineConfigurer = channelPipelineConfigurer;
+    this.eventLoopGroupSupplier = eventLoopGroupSupplier;
+    this.channelClass = channelClass;
+  }
+
+  @Override
+  public void beforeAll(ExtensionContext context) {
+    eventLoopGroup = eventLoopGroupSupplier.get();
+    httpBootstrap = buildBootstrap(eventLoopGroup, false, false);
+    httpsBootstrap = buildBootstrap(eventLoopGroup, true, false);
+    readTimeoutBootstrap = buildBootstrap(eventLoopGroup, false, true);
+  }
+
+  public Bootstrap buildBootstrap(boolean https, boolean readTimeout) {
+    return buildBootstrap(eventLoopGroupSupplier.get(), https, readTimeout);
+  }
+
+  private Bootstrap buildBootstrap(
+      EventLoopGroup eventLoopGroup, boolean https, boolean readTimeout) {
+    Bootstrap bootstrap = new Bootstrap();
+    bootstrap
+        .group(eventLoopGroup)
+        .channel(channelClass)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) CONNECTION_TIMEOUT.toMillis())
+        .handler(
+            new ChannelInitializer<SocketChannel>() {
+              @Override
+              protected void initChannel(SocketChannel socketChannel) throws Exception {
+                ChannelPipeline pipeline = socketChannel.pipeline();
+                if (https) {
+                  SslContext sslContext = SslContextBuilder.forClient().build();
+                  pipeline.addLast(sslContext.newHandler(socketChannel.alloc()));
+                }
+                if (readTimeout) {
+                  pipeline.addLast(
+                      new ReadTimeoutHandler(READ_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS));
+                }
+                pipeline.addLast(new HttpClientCodec());
+                channelPipelineConfigurer.accept(pipeline);
+              }
+            });
+    return bootstrap;
+  }
+
+  @Override
+  public void afterAll(ExtensionContext context) throws InterruptedException {
+    if (eventLoopGroup != null) {
+      eventLoopGroup.shutdownGracefully().await(10, TimeUnit.SECONDS);
+    }
+  }
+
+  public Bootstrap getBootstrap(URI uri) {
+    if ("https".equals(uri.getScheme())) {
+      return httpsBootstrap;
+    } else if ("/read-timeout".equals(uri.getPath())) {
+      return readTimeoutBootstrap;
+    }
+    return httpBootstrap;
+  }
+}

+ 8 - 23
instrumentation/netty/netty-4.1/javaagent/src/test/groovy/SingleNettyConnection.java → instrumentation/netty/netty-4.1/testing/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/SingleNettyConnection.java

@@ -3,19 +3,13 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
+package io.opentelemetry.instrumentation.netty.v4_1;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
-import io.netty.handler.codec.http.HttpClientCodec;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpRequest;
@@ -26,6 +20,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 
 /*
 Netty does not actually support proper http pipelining and has no way to correlate incoming response
@@ -37,25 +32,14 @@ does not wreak havoc on Netty channel.
 public class SingleNettyConnection implements SingleConnection {
   private final String host;
   private final int port;
+  private final Consumer<Channel> channelConsumer;
   private final Channel channel;
 
-  public SingleNettyConnection(String host, int port) {
+  public SingleNettyConnection(
+      Bootstrap bootstrap, String host, int port, Consumer<Channel> channelConsumer) {
     this.host = host;
     this.port = port;
-    EventLoopGroup group = new NioEventLoopGroup();
-    Bootstrap bootstrap = new Bootstrap();
-    bootstrap
-        .group(group)
-        .channel(NioSocketChannel.class)
-        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
-        .handler(
-            new ChannelInitializer<SocketChannel>() {
-              @Override
-              protected void initChannel(SocketChannel socketChannel) {
-                ChannelPipeline pipeline = socketChannel.pipeline();
-                pipeline.addLast(new HttpClientCodec());
-              }
-            });
+    this.channelConsumer = channelConsumer;
 
     ChannelFuture channelFuture = bootstrap.connect(host, port);
     channelFuture.awaitUninterruptibly();
@@ -70,6 +54,7 @@ public class SingleNettyConnection implements SingleConnection {
   public synchronized int doRequest(String path, Map<String, String> headers)
       throws ExecutionException, InterruptedException, TimeoutException {
     CompletableFuture<Integer> result = new CompletableFuture<>();
+    channelConsumer.accept(channel);
 
     channel.pipeline().addLast(new ClientHandler(result));
 

+ 2 - 2
instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/OnRequest.java

@@ -6,7 +6,7 @@
 package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9;
 
 import io.opentelemetry.context.Context;
-import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
+import io.opentelemetry.instrumentation.netty.v4_1.NettyClientTelemetry;
 import java.util.function.BiConsumer;
 import reactor.netty.Connection;
 import reactor.netty.http.client.HttpClientRequest;
@@ -15,6 +15,6 @@ public class OnRequest implements BiConsumer<HttpClientRequest, Connection> {
   @Override
   public void accept(HttpClientRequest r, Connection c) {
     Context context = r.currentContext().get(MapConnect.CONTEXT_ATTRIBUTE);
-    c.channel().attr(AttributeKeys.WRITE_CONTEXT).set(context);
+    NettyClientTelemetry.setChannelContext(c.channel(), context);
   }
 }

+ 2 - 2
instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java

@@ -11,7 +11,7 @@ import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.React
 
 import io.opentelemetry.api.GlobalOpenTelemetry;
 import io.opentelemetry.context.Context;
-import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
+import io.opentelemetry.instrumentation.netty.v4_1.NettyClientTelemetry;
 import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
@@ -117,7 +117,7 @@ public final class HttpResponseReceiverInstrumenter {
       // if this span was suppressed and context is null, propagate parentContext - this will allow
       // netty spans to be suppressed too
       Context nettyParentContext = context == null ? contextHolder.parentContext : context;
-      connection.channel().attr(AttributeKeys.WRITE_CONTEXT).set(nettyParentContext);
+      NettyClientTelemetry.setChannelContext(connection.channel(), nettyParentContext);
     }
   }
 

+ 1 - 0
settings.gradle.kts

@@ -352,6 +352,7 @@ include(":instrumentation:netty:netty-3.8:javaagent")
 include(":instrumentation:netty:netty-4.0:javaagent")
 include(":instrumentation:netty:netty-4.1:javaagent")
 include(":instrumentation:netty:netty-4.1:library")
+include(":instrumentation:netty:netty-4.1:testing")
 include(":instrumentation:netty:netty-4-common:javaagent")
 include(":instrumentation:netty:netty-4-common:library")
 include(":instrumentation:netty:netty-common:library")