|
@@ -36,17 +36,36 @@ import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
|
|
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpRequest;
|
|
|
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse;
|
|
|
import io.opentelemetry.testing.internal.armeria.common.HttpData;
|
|
|
+import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames;
|
|
|
import io.opentelemetry.testing.internal.armeria.common.HttpMethod;
|
|
|
import io.opentelemetry.testing.internal.armeria.common.HttpRequest;
|
|
|
import io.opentelemetry.testing.internal.armeria.common.HttpRequestBuilder;
|
|
|
import io.opentelemetry.testing.internal.armeria.common.MediaType;
|
|
|
import io.opentelemetry.testing.internal.armeria.common.QueryParams;
|
|
|
import io.opentelemetry.testing.internal.armeria.common.RequestHeaders;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.bootstrap.Bootstrap;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.buffer.Unpooled;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.channel.Channel;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.channel.ChannelHandlerContext;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.channel.ChannelInitializer;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.channel.ChannelOption;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.channel.ChannelPipeline;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.channel.EventLoopGroup;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.channel.SimpleChannelInboundHandler;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.channel.nio.NioEventLoopGroup;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.channel.socket.SocketChannel;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.channel.socket.nio.NioSocketChannel;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.handler.codec.http.DefaultFullHttpRequest;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.handler.codec.http.DefaultHttpResponse;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.handler.codec.http.HttpClientCodec;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.handler.codec.http.HttpObject;
|
|
|
+import io.opentelemetry.testing.internal.io.netty.handler.codec.http.HttpVersion;
|
|
|
import java.net.URI;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.function.Supplier;
|
|
|
import java.util.stream.Stream;
|
|
@@ -350,6 +369,88 @@ public abstract class AbstractHttpServerTest<SERVER> extends AbstractHttpServerU
|
|
|
assertHighConcurrency(count);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ void httpPipelining() throws InterruptedException {
|
|
|
+ assumeTrue(options.testHttpPipelining);
|
|
|
+
|
|
|
+ int count = 10;
|
|
|
+ CountDownLatch countDownLatch = new CountDownLatch(count);
|
|
|
+ ServerEndpoint endpoint = INDEXED_CHILD;
|
|
|
+ TextMapPropagator propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
|
|
|
+ TextMapSetter<DefaultFullHttpRequest> setter =
|
|
|
+ (request, key, value) -> request.headers().set(key, value);
|
|
|
+
|
|
|
+ EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
|
|
|
+ try {
|
|
|
+ Bootstrap bootstrap = buildBootstrap(eventLoopGroup);
|
|
|
+ Channel channel = bootstrap.connect(address.getHost(), port).sync().channel();
|
|
|
+ channel
|
|
|
+ .pipeline()
|
|
|
+ .addLast(
|
|
|
+ new SimpleChannelInboundHandler<HttpObject>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void channelRead0(
|
|
|
+ ChannelHandlerContext channelHandlerContext, HttpObject httpObject) {
|
|
|
+ if (httpObject instanceof DefaultHttpResponse) {
|
|
|
+ countDownLatch.countDown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ for (int i = 0; i < count; i++) {
|
|
|
+ int index = i;
|
|
|
+ String target =
|
|
|
+ endpoint.resolvePath(address).getPath().toString()
|
|
|
+ + "?"
|
|
|
+ + ServerEndpoint.ID_PARAMETER_NAME
|
|
|
+ + "="
|
|
|
+ + index;
|
|
|
+
|
|
|
+ testing.runWithSpan(
|
|
|
+ "client " + index,
|
|
|
+ () -> {
|
|
|
+ Span.current().setAttribute(ServerEndpoint.ID_ATTRIBUTE_NAME, index);
|
|
|
+ DefaultFullHttpRequest request =
|
|
|
+ new DefaultFullHttpRequest(
|
|
|
+ HttpVersion.HTTP_1_1,
|
|
|
+ io.opentelemetry.testing.internal.io.netty.handler.codec.http.HttpMethod
|
|
|
+ .valueOf("GET"),
|
|
|
+ target,
|
|
|
+ Unpooled.EMPTY_BUFFER);
|
|
|
+ request.headers().set(HttpHeaderNames.HOST, address.getHost() + ":" + port);
|
|
|
+ request.headers().set(HttpHeaderNames.USER_AGENT, TEST_USER_AGENT);
|
|
|
+ request.headers().set(HttpHeaderNames.X_FORWARDED_FOR, TEST_CLIENT_IP);
|
|
|
+
|
|
|
+ propagator.inject(Context.current(), request, setter);
|
|
|
+ channel.writeAndFlush(request);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ countDownLatch.await(30, TimeUnit.SECONDS);
|
|
|
+ assertHighConcurrency(count);
|
|
|
+ } finally {
|
|
|
+ eventLoopGroup.shutdownGracefully().await(10, TimeUnit.SECONDS);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Bootstrap buildBootstrap(EventLoopGroup eventLoopGroup) {
|
|
|
+ Bootstrap bootstrap = new Bootstrap();
|
|
|
+ bootstrap
|
|
|
+ .group(eventLoopGroup)
|
|
|
+ .channel(NioSocketChannel.class)
|
|
|
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) TimeUnit.SECONDS.toMillis(10))
|
|
|
+ .handler(
|
|
|
+ new ChannelInitializer<SocketChannel>() {
|
|
|
+ @Override
|
|
|
+ protected void initChannel(SocketChannel socketChannel) {
|
|
|
+ ChannelPipeline pipeline = socketChannel.pipeline();
|
|
|
+ pipeline.addLast(new HttpClientCodec());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return bootstrap;
|
|
|
+ }
|
|
|
+
|
|
|
protected void assertHighConcurrency(int count) {
|
|
|
ServerEndpoint endpoint = INDEXED_CHILD;
|
|
|
List<Consumer<TraceAssert>> assertions = new ArrayList<>();
|