Browse Source

Add process spans to aws2 sqs instrumentation (#9778)

Lauri Tulmin 1 year ago
parent
commit
4a9b965b2f
31 changed files with 1593 additions and 233 deletions
  1. 19 2
      instrumentation/aws-sdk/aws-sdk-2.2/javaagent/build.gradle.kts
  2. 39 0
      instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v2_2/DefaultSqsAsyncClientBuilderInstrumentation.java
  3. 39 0
      instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v2_2/DefaultSqsClientBuilderInstrumentation.java
  4. 11 0
      instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v2_2/SqsInstrumentationModule.java
  5. 27 0
      instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/test/groovy/Aws2SqsSuppressReceiveSpansTest.groovy
  6. 12 0
      instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/test/groovy/Aws2SqsTracingTest.groovy
  7. 43 0
      instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/AwsSdkSingletons.java
  8. 1 21
      instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/TracingExecutionInterceptor.java
  9. 14 1
      instrumentation/aws-sdk/aws-sdk-2.2/library/README.md
  10. 1 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts
  11. 86 11
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkInstrumenterFactory.java
  12. 40 8
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetry.java
  13. 16 4
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java
  14. 5 2
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAccess.java
  15. 154 32
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsImpl.java
  16. 20 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessage.java
  17. 33 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageImpl.java
  18. 19 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsParentContext.java
  19. 30 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequest.java
  20. 63 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequestAttributesGetter.java
  21. 37 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsTracingContext.java
  22. 28 7
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java
  23. 99 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingIterator.java
  24. 62 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingList.java
  25. 109 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.groovy
  26. 66 9
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTest.groovy
  27. 0 33
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagator.groovy
  28. 0 28
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator.groovy
  29. 2 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/testCoreOnly/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2ClientDynamodbTest.groovy
  30. 394 0
      instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.groovy
  31. 124 75
      instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy

+ 19 - 2
instrumentation/aws-sdk/aws-sdk-2.2/javaagent/build.gradle.kts

@@ -108,13 +108,30 @@ testing {
 
 tasks {
   val testExperimentalSqs by registering(Test::class) {
-    group = "verification"
-
+    filter {
+      excludeTestsMatching("Aws2SqsSuppressReceiveSpansTest")
+    }
     systemProperty("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", "true")
+    systemProperty("otel.instrumentation.messaging.experimental.receive-telemetry.enabled", "true")
+  }
+
+  val testReceiveSpansDisabled by registering(Test::class) {
+    filter {
+      includeTestsMatching("Aws2SqsSuppressReceiveSpansTest")
+    }
+    include("**/Aws2SqsSuppressReceiveSpansTest.*")
+  }
+
+  test {
+    filter {
+      excludeTestsMatching("Aws2SqsSuppressReceiveSpansTest")
+    }
+    systemProperty("otel.instrumentation.messaging.experimental.receive-telemetry.enabled", "true")
   }
 
   check {
     dependsOn(testExperimentalSqs)
+    dependsOn(testReceiveSpansDisabled)
     dependsOn(testing.suites)
   }
 

+ 39 - 0
instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v2_2/DefaultSqsAsyncClientBuilderInstrumentation.java

@@ -0,0 +1,39 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+import io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure.AwsSdkSingletons;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+
+public class DefaultSqsAsyncClientBuilderInstrumentation implements TypeInstrumentation {
+
+  @Override
+  public ElementMatcher<TypeDescription> typeMatcher() {
+    return named("software.amazon.awssdk.services.sqs.DefaultSqsAsyncClientBuilder");
+  }
+
+  @Override
+  public void transform(TypeTransformer transformer) {
+    transformer.applyAdviceToMethod(
+        named("buildClient"), this.getClass().getName() + "$BuildClientAdvice");
+  }
+
+  @SuppressWarnings("unused")
+  public static class BuildClientAdvice {
+
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void methodExit(@Advice.Return(readOnly = false) SqsAsyncClient sqsClient) {
+      sqsClient = AwsSdkSingletons.telemetry().wrap(sqsClient);
+    }
+  }
+}

+ 39 - 0
instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v2_2/DefaultSqsClientBuilderInstrumentation.java

@@ -0,0 +1,39 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+import io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure.AwsSdkSingletons;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import software.amazon.awssdk.services.sqs.SqsClient;
+
+public class DefaultSqsClientBuilderInstrumentation implements TypeInstrumentation {
+
+  @Override
+  public ElementMatcher<TypeDescription> typeMatcher() {
+    return named("software.amazon.awssdk.services.sqs.DefaultSqsClientBuilder");
+  }
+
+  @Override
+  public void transform(TypeTransformer transformer) {
+    transformer.applyAdviceToMethod(
+        named("buildClient"), this.getClass().getName() + "$BuildClientAdvice");
+  }
+
+  @SuppressWarnings("unused")
+  public static class BuildClientAdvice {
+
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void methodExit(@Advice.Return(readOnly = false) SqsClient sqsClient) {
+      sqsClient = AwsSdkSingletons.telemetry().wrap(sqsClient);
+    }
+  }
+}

+ 11 - 0
instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v2_2/SqsInstrumentationModule.java

@@ -11,7 +11,10 @@ import static net.bytebuddy.matcher.ElementMatchers.none;
 import com.google.auto.service.AutoService;
 import io.opentelemetry.instrumentation.awssdk.v2_2.SqsAdviceBridge;
 import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import java.util.ArrayList;
+import java.util.List;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.matcher.ElementMatcher;
 
@@ -27,6 +30,14 @@ public class SqsInstrumentationModule extends AbstractAwsSdkInstrumentationModul
     return hasClassesNamed("software.amazon.awssdk.services.sqs.SqsClient");
   }
 
+  @Override
+  public List<TypeInstrumentation> typeInstrumentations() {
+    List<TypeInstrumentation> instrumentations = new ArrayList<>(super.typeInstrumentations());
+    instrumentations.add(new DefaultSqsClientBuilderInstrumentation());
+    instrumentations.add(new DefaultSqsAsyncClientBuilderInstrumentation());
+    return instrumentations;
+  }
+
   @Override
   public void doTransform(TypeTransformer transformer) {
     transformer.applyAdviceToMethod(

+ 27 - 0
instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/test/groovy/Aws2SqsSuppressReceiveSpansTest.groovy

@@ -0,0 +1,27 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import io.opentelemetry.instrumentation.awssdk.v2_2.AbstractAws2SqsSuppressReceiveSpansTest
+import io.opentelemetry.instrumentation.test.AgentTestTrait
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
+import software.amazon.awssdk.services.sqs.SqsAsyncClient
+import software.amazon.awssdk.services.sqs.SqsClient
+
+class Aws2SqsSuppressReceiveSpansTest extends AbstractAws2SqsSuppressReceiveSpansTest implements AgentTestTrait {
+  @Override
+  ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() {
+    return ClientOverrideConfiguration.builder()
+  }
+
+  @Override
+  SqsClient configureSqsClient(SqsClient sqsClient) {
+    return sqsClient
+  }
+
+  @Override
+  SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient) {
+    return sqsClient
+  }
+}

+ 12 - 0
instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/test/groovy/Aws2SqsTracingTest.groovy

@@ -6,10 +6,22 @@
 import io.opentelemetry.instrumentation.awssdk.v2_2.AbstractAws2SqsTracingTest
 import io.opentelemetry.instrumentation.test.AgentTestTrait
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
+import software.amazon.awssdk.services.sqs.SqsAsyncClient
+import software.amazon.awssdk.services.sqs.SqsClient
 
 class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements AgentTestTrait {
   @Override
   ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() {
     return ClientOverrideConfiguration.builder()
   }
+
+  @Override
+  SqsClient configureSqsClient(SqsClient sqsClient) {
+    return sqsClient
+  }
+
+  @Override
+  SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient) {
+    return sqsClient
+  }
 }

+ 43 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/AwsSdkSingletons.java

@@ -0,0 +1,43 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
+import io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkTelemetry;
+
+public final class AwsSdkSingletons {
+
+  private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES =
+      ConfigPropertiesUtil.getBoolean(
+          "otel.instrumentation.aws-sdk.experimental-span-attributes", false);
+
+  private static final boolean USE_MESSAGING_PROPAGATOR =
+      ConfigPropertiesUtil.getBoolean(
+          "otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false);
+
+  private static final boolean RECORD_INDIVIDUAL_HTTP_ERROR =
+      ConfigPropertiesUtil.getBoolean(
+          "otel.instrumentation.aws-sdk.experimental-record-individual-http-error", false);
+
+  private static final boolean RECEIVE_TELEMETRY_ENABLED =
+      ConfigPropertiesUtil.getBoolean(
+          "otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false);
+
+  private static final AwsSdkTelemetry TELEMETRY =
+      AwsSdkTelemetry.builder(GlobalOpenTelemetry.get())
+          .setCaptureExperimentalSpanAttributes(CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES)
+          .setMessagingReceiveInstrumentationEnabled(RECEIVE_TELEMETRY_ENABLED)
+          .setUseConfiguredPropagatorForMessaging(USE_MESSAGING_PROPAGATOR)
+          .setRecordIndividualHttpError(RECORD_INDIVIDUAL_HTTP_ERROR)
+          .build();
+
+  public static AwsSdkTelemetry telemetry() {
+    return TELEMETRY;
+  }
+
+  private AwsSdkSingletons() {}
+}

+ 1 - 21
instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/TracingExecutionInterceptor.java

@@ -5,9 +5,6 @@
 
 package io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure;
 
-import io.opentelemetry.api.GlobalOpenTelemetry;
-import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
-import io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkTelemetry;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Optional;
@@ -28,25 +25,8 @@ import software.amazon.awssdk.http.SdkHttpResponse;
  */
 public class TracingExecutionInterceptor implements ExecutionInterceptor {
 
-  private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES =
-      ConfigPropertiesUtil.getBoolean(
-          "otel.instrumentation.aws-sdk.experimental-span-attributes", false);
-
-  private static final boolean USE_MESSAGING_PROPAGATOR =
-      ConfigPropertiesUtil.getBoolean(
-          "otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false);
-
-  private static final boolean RECORD_INDIVIDUAL_HTTP_ERROR =
-      ConfigPropertiesUtil.getBoolean(
-          "otel.instrumentation.aws-sdk.experimental-record-individual-http-error", false);
-
   private final ExecutionInterceptor delegate =
-      AwsSdkTelemetry.builder(GlobalOpenTelemetry.get())
-          .setCaptureExperimentalSpanAttributes(CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES)
-          .setUseConfiguredPropagatorForMessaging(USE_MESSAGING_PROPAGATOR)
-          .setRecordIndividualHttpError(RECORD_INDIVIDUAL_HTTP_ERROR)
-          .build()
-          .newExecutionInterceptor();
+      AwsSdkSingletons.telemetry().newExecutionInterceptor();
 
   @Override
   public void beforeExecution(

+ 14 - 1
instrumentation/aws-sdk/aws-sdk-2.2/library/README.md

@@ -9,13 +9,26 @@ To instrument all AWS SDK clients include the `opentelemetry-aws-sdk-2.2-autocon
 To register instrumentation only on a specific SDK client, register the interceptor when creating it.
 
 ```java
+AwsSdkTelemetry telemetrty = AwsSdkTelemetry.create(openTelemetry).build();
 DynamoDbClient client = DynamoDbClient.builder()
   .overrideConfiguration(ClientOverrideConfiguration.builder()
-    .addExecutionInterceptor(AwsSdk.newInterceptor()))
+    .addExecutionInterceptor(telemetrty.newExecutionInterceptor()))
     .build())
   .build();
 ```
 
+For SQS an additional step is needed
+```java
+SqsClientBuilder sqsClientBuilder = SqsClient.builder();
+...
+SqsClient sqsClient = telemetry.wrap(sqsClientBuilder.build());
+```
+```java
+SqsAsyncClientBuilder sqsAsyncClientBuilder = SqsAsyncClient.builder();
+...
+SqsAsyncClient sqsAsyncClient = telemetry.wrap(sqsAsyncClientBuilder.build());
+```
+
 ## Trace propagation
 
 The AWS SDK instrumentation always injects the trace header into the request

+ 1 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts

@@ -36,6 +36,7 @@ testing {
       dependencies {
         implementation(project())
         implementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:testing"))
+        compileOnly("software.amazon.awssdk:sqs:2.2.0")
         if (findProperty("testLatestDeps") as Boolean) {
           implementation("software.amazon.awssdk:aws-core:+")
           implementation("software.amazon.awssdk:aws-json-protocol:+")

+ 86 - 11
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkInstrumenterFactory.java

@@ -6,8 +6,13 @@
 package io.opentelemetry.instrumentation.awssdk.v2_2;
 
 import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.propagation.TextMapPropagator;
 import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
 import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
 import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
 import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
@@ -18,6 +23,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributes
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import javax.annotation.Nullable;
 import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
 import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
 
@@ -66,17 +72,83 @@ final class AwsSdkInstrumenterFactory {
             ? extendedAttributesExtractors
             : defaultAttributesExtractors,
         AwsSdkInstrumenterFactory::spanName,
-        SpanKindExtractor.alwaysClient());
+        SpanKindExtractor.alwaysClient(),
+        true);
   }
 
-  static Instrumenter<ExecutionAttributes, Response> consumerInstrumenter(
-      OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
+  static Instrumenter<ExecutionAttributes, Response> consumerReceiveInstrumenter(
+      OpenTelemetry openTelemetry,
+      boolean captureExperimentalSpanAttributes,
+      boolean messagingReceiveInstrumentationEnabled) {
     return sqsInstrumenter(
         openTelemetry,
         MessageOperation.RECEIVE,
         captureExperimentalSpanAttributes
             ? extendedConsumerAttributesExtractors
-            : defaultConsumerAttributesExtractors);
+            : defaultConsumerAttributesExtractors,
+        messagingReceiveInstrumentationEnabled);
+  }
+
+  static Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter(
+      OpenTelemetry openTelemetry,
+      TextMapPropagator messagingPropagator,
+      boolean captureExperimentalSpanAttributes,
+      boolean messagingReceiveInstrumentationEnabled,
+      boolean shouldUseXrayPropagator) {
+    MessageOperation operation = MessageOperation.PROCESS;
+    SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE;
+
+    InstrumenterBuilder<SqsProcessRequest, Void> builder =
+        Instrumenter.<SqsProcessRequest, Void>builder(
+                openTelemetry,
+                INSTRUMENTATION_NAME,
+                MessagingSpanNameExtractor.create(getter, operation))
+            .addAttributesExtractors(
+                toProcessRequestExtractors(
+                    captureExperimentalSpanAttributes
+                        ? extendedConsumerAttributesExtractors
+                        : defaultConsumerAttributesExtractors))
+            .addAttributesExtractor(
+                MessagingAttributesExtractor.builder(getter, operation).build());
+
+    if (messagingReceiveInstrumentationEnabled) {
+      builder.addSpanLinksExtractor(
+          (spanLinks, parentContext, request) -> {
+            Context extracted =
+                SqsParentContext.ofMessage(
+                    request.getMessage(), messagingPropagator, shouldUseXrayPropagator);
+            spanLinks.addLink(Span.fromContext(extracted).getSpanContext());
+          });
+    }
+    return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
+  }
+
+  private static List<AttributesExtractor<SqsProcessRequest, Void>> toProcessRequestExtractors(
+      List<AttributesExtractor<ExecutionAttributes, Response>> extractors) {
+    List<AttributesExtractor<SqsProcessRequest, Void>> result = new ArrayList<>();
+    for (AttributesExtractor<ExecutionAttributes, Response> extractor : extractors) {
+      result.add(
+          new AttributesExtractor<SqsProcessRequest, Void>() {
+            @Override
+            public void onStart(
+                AttributesBuilder attributes,
+                Context parentContext,
+                SqsProcessRequest sqsProcessRequest) {
+              extractor.onStart(attributes, parentContext, sqsProcessRequest.getRequest());
+            }
+
+            @Override
+            public void onEnd(
+                AttributesBuilder attributes,
+                Context context,
+                SqsProcessRequest sqsProcessRequest,
+                @Nullable Void unused,
+                @Nullable Throwable error) {
+              extractor.onEnd(attributes, context, sqsProcessRequest.getRequest(), null, error);
+            }
+          });
+    }
+    return result;
   }
 
   static Instrumenter<ExecutionAttributes, Response> producerInstrumenter(
@@ -86,13 +158,15 @@ final class AwsSdkInstrumenterFactory {
         MessageOperation.PUBLISH,
         captureExperimentalSpanAttributes
             ? extendedAttributesExtractors
-            : defaultAttributesExtractors);
+            : defaultAttributesExtractors,
+        true);
   }
 
   private static Instrumenter<ExecutionAttributes, Response> sqsInstrumenter(
       OpenTelemetry openTelemetry,
       MessageOperation operation,
-      List<AttributesExtractor<ExecutionAttributes, Response>> extractors) {
+      List<AttributesExtractor<ExecutionAttributes, Response>> extractors,
+      boolean enabled) {
     SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE;
     AttributesExtractor<ExecutionAttributes, Response> messagingAttributeExtractor =
         MessagingAttributesExtractor.builder(getter, operation).build();
@@ -106,20 +180,21 @@ final class AwsSdkInstrumenterFactory {
         MessagingSpanNameExtractor.create(getter, operation),
         operation == MessageOperation.PUBLISH
             ? SpanKindExtractor.alwaysProducer()
-            : SpanKindExtractor.alwaysConsumer());
+            : SpanKindExtractor.alwaysConsumer(),
+        enabled);
   }
 
   private static Instrumenter<ExecutionAttributes, Response> createInstrumenter(
       OpenTelemetry openTelemetry,
       List<AttributesExtractor<ExecutionAttributes, Response>> extractors,
       SpanNameExtractor<ExecutionAttributes> spanNameExtractor,
-      SpanKindExtractor<ExecutionAttributes> spanKindExtractor) {
+      SpanKindExtractor<ExecutionAttributes> spanKindExtractor,
+      boolean enabled) {
 
     return Instrumenter.<ExecutionAttributes, Response>builder(
-            openTelemetry,
-            INSTRUMENTATION_NAME,
-            spanNameExtractor) // AwsSdkInstrumenterFactory::spanName
+            openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor)
         .addAttributesExtractors(extractors)
+        .setEnabled(enabled)
         .buildInstrumenter(spanKindExtractor);
   }
 

+ 40 - 8
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetry.java

@@ -8,10 +8,13 @@ package io.opentelemetry.instrumentation.awssdk.v2_2;
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.context.propagation.TextMapPropagator;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
 import javax.annotation.Nullable;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
 import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
 
 /**
  * Entrypoint to OpenTelemetry instrumentation of the AWS SDK. Register the {@link
@@ -41,7 +44,8 @@ public class AwsSdkTelemetry {
   }
 
   private final Instrumenter<ExecutionAttributes, Response> requestInstrumenter;
-  private final Instrumenter<ExecutionAttributes, Response> consumerInstrumenter;
+  private final Instrumenter<ExecutionAttributes, Response> consumerReceiveInstrumenter;
+  private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
   private final Instrumenter<ExecutionAttributes, Response> producerInstrumenter;
   private final boolean captureExperimentalSpanAttributes;
   @Nullable private final TextMapPropagator messagingPropagator;
@@ -53,20 +57,30 @@ public class AwsSdkTelemetry {
       boolean captureExperimentalSpanAttributes,
       boolean useMessagingPropagator,
       boolean useXrayPropagator,
-      boolean recordIndividualHttpError) {
+      boolean recordIndividualHttpError,
+      boolean messagingReceiveInstrumentationEnabled) {
     this.useXrayPropagator = useXrayPropagator;
+    this.messagingPropagator =
+        useMessagingPropagator ? openTelemetry.getPropagators().getTextMapPropagator() : null;
     this.requestInstrumenter =
         AwsSdkInstrumenterFactory.requestInstrumenter(
             openTelemetry, captureExperimentalSpanAttributes);
-    this.consumerInstrumenter =
-        AwsSdkInstrumenterFactory.consumerInstrumenter(
-            openTelemetry, captureExperimentalSpanAttributes);
+    this.consumerReceiveInstrumenter =
+        AwsSdkInstrumenterFactory.consumerReceiveInstrumenter(
+            openTelemetry,
+            captureExperimentalSpanAttributes,
+            messagingReceiveInstrumentationEnabled);
+    this.consumerProcessInstrumenter =
+        AwsSdkInstrumenterFactory.consumerProcessInstrumenter(
+            openTelemetry,
+            messagingPropagator,
+            captureExperimentalSpanAttributes,
+            messagingReceiveInstrumentationEnabled,
+            useXrayPropagator);
     this.producerInstrumenter =
         AwsSdkInstrumenterFactory.producerInstrumenter(
             openTelemetry, captureExperimentalSpanAttributes);
     this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
-    this.messagingPropagator =
-        useMessagingPropagator ? openTelemetry.getPropagators().getTextMapPropagator() : null;
     this.recordIndividualHttpError = recordIndividualHttpError;
   }
 
@@ -77,11 +91,29 @@ public class AwsSdkTelemetry {
   public ExecutionInterceptor newExecutionInterceptor() {
     return new TracingExecutionInterceptor(
         requestInstrumenter,
-        consumerInstrumenter,
+        consumerReceiveInstrumenter,
+        consumerProcessInstrumenter,
         producerInstrumenter,
         captureExperimentalSpanAttributes,
         messagingPropagator,
         useXrayPropagator,
         recordIndividualHttpError);
   }
+
+  /**
+   * Construct a new tracing-enable {@link SqsClient} using the provided {@link SqsClient} instance.
+   */
+  @NoMuzzle
+  public SqsClient wrap(SqsClient sqsClient) {
+    return SqsImpl.wrap(sqsClient);
+  }
+
+  /**
+   * Construct a new tracing-enable {@link SqsAsyncClient} using the provided {@link SqsAsyncClient}
+   * instance.
+   */
+  @NoMuzzle
+  public SqsAsyncClient wrap(SqsAsyncClient sqsClient) {
+    return SqsImpl.wrap(sqsClient);
+  }
 }

+ 16 - 4
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java

@@ -14,12 +14,10 @@ public final class AwsSdkTelemetryBuilder {
   private final OpenTelemetry openTelemetry;
 
   private boolean captureExperimentalSpanAttributes;
-
   private boolean useMessagingPropagator;
-
   private boolean recordIndividualHttpError;
-
   private boolean useXrayPropagator = true;
+  private boolean messagingReceiveInstrumentationEnabled;
 
   AwsSdkTelemetryBuilder(OpenTelemetry openTelemetry) {
     this.openTelemetry = openTelemetry;
@@ -87,6 +85,19 @@ public final class AwsSdkTelemetryBuilder {
     return this;
   }
 
+  /**
+   * Set whether to capture the consumer message receive telemetry in messaging instrumentation.
+   *
+   * <p>Note that this will cause the consumer side to start a new trace, with only a span link
+   * connecting it to the producer trace.
+   */
+  @CanIgnoreReturnValue
+  public AwsSdkTelemetryBuilder setMessagingReceiveInstrumentationEnabled(
+      boolean messagingReceiveInstrumentationEnabled) {
+    this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled;
+    return this;
+  }
+
   /**
    * Returns a new {@link AwsSdkTelemetry} with the settings of this {@link AwsSdkTelemetryBuilder}.
    */
@@ -96,6 +107,7 @@ public final class AwsSdkTelemetryBuilder {
         captureExperimentalSpanAttributes,
         useMessagingPropagator,
         useXrayPropagator,
-        recordIndividualHttpError);
+        recordIndividualHttpError,
+        messagingReceiveInstrumentationEnabled);
   }
 }

+ 5 - 2
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAccess.java

@@ -6,6 +6,7 @@
 package io.opentelemetry.instrumentation.awssdk.v2_2;
 
 import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.opentelemetry.instrumentation.api.internal.Timer;
 import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
 import javax.annotation.Nullable;
 import software.amazon.awssdk.core.SdkRequest;
@@ -24,8 +25,10 @@ final class SqsAccess {
   static boolean afterReceiveMessageExecution(
       Context.AfterExecution context,
       ExecutionAttributes executionAttributes,
-      TracingExecutionInterceptor config) {
-    return enabled && SqsImpl.afterReceiveMessageExecution(context, executionAttributes, config);
+      TracingExecutionInterceptor config,
+      Timer timer) {
+    return enabled
+        && SqsImpl.afterReceiveMessageExecution(context, executionAttributes, config, timer);
   }
 
   /**

+ 154 - 32
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsImpl.java

@@ -5,18 +5,30 @@
 
 package io.opentelemetry.instrumentation.awssdk.v2_2;
 
+import static io.opentelemetry.instrumentation.awssdk.v2_2.TracingExecutionInterceptor.SDK_HTTP_REQUEST_ATTRIBUTE;
+import static io.opentelemetry.instrumentation.awssdk.v2_2.TracingExecutionInterceptor.SDK_REQUEST_ATTRIBUTE;
+
+import io.opentelemetry.context.Scope;
 import io.opentelemetry.context.propagation.TextMapPropagator;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
+import io.opentelemetry.instrumentation.api.internal.Timer;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import javax.annotation.Nullable;
 import software.amazon.awssdk.core.SdkRequest;
 import software.amazon.awssdk.core.SdkResponse;
 import software.amazon.awssdk.core.interceptor.Context;
 import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
-import software.amazon.awssdk.http.SdkHttpResponse;
+import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
 import software.amazon.awssdk.services.sqs.SqsClient;
 import software.amazon.awssdk.services.sqs.model.Message;
 import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
@@ -41,7 +53,8 @@ final class SqsImpl {
   static boolean afterReceiveMessageExecution(
       Context.AfterExecution context,
       ExecutionAttributes executionAttributes,
-      TracingExecutionInterceptor config) {
+      TracingExecutionInterceptor config,
+      Timer timer) {
 
     SdkResponse rawResponse = context.response();
     if (!(rawResponse instanceof ReceiveMessageResponse)) {
@@ -49,44 +62,72 @@ final class SqsImpl {
     }
 
     ReceiveMessageResponse response = (ReceiveMessageResponse) rawResponse;
-    SdkHttpResponse httpResponse = context.httpResponse();
-    for (Message message : response.messages()) {
-      createConsumerSpan(message, httpResponse, executionAttributes, config);
+    if (response.messages().isEmpty()) {
+      return false;
     }
 
-    return true;
-  }
+    io.opentelemetry.context.Context parentContext =
+        TracingExecutionInterceptor.getParentContext(executionAttributes);
+    Instrumenter<ExecutionAttributes, Response> consumerReceiveInstrumenter =
+        config.getConsumerReceiveInstrumenter();
+    io.opentelemetry.context.Context receiveContext = null;
+    if (timer != null
+        && consumerReceiveInstrumenter.shouldStart(parentContext, executionAttributes)) {
+      receiveContext =
+          InstrumenterUtil.startAndEnd(
+              consumerReceiveInstrumenter,
+              parentContext,
+              executionAttributes,
+              new Response(context.httpResponse(), response),
+              null,
+              timer.startTime(),
+              timer.now());
+    }
+    // copy ExecutionAttributes as these will get cleared before the process spans are created
+    ExecutionAttributes copy = new ExecutionAttributes();
+    copy.putAttribute(
+        SDK_HTTP_REQUEST_ATTRIBUTE, executionAttributes.getAttribute(SDK_HTTP_REQUEST_ATTRIBUTE));
+    copy.putAttribute(
+        SDK_REQUEST_ATTRIBUTE, executionAttributes.getAttribute(SDK_REQUEST_ATTRIBUTE));
+    copy.putAttribute(
+        SdkExecutionAttribute.SERVICE_NAME,
+        executionAttributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME));
+    copy.putAttribute(
+        SdkExecutionAttribute.OPERATION_NAME,
+        executionAttributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME));
 
-  private static void createConsumerSpan(
-      Message message,
-      SdkHttpResponse httpResponse,
-      ExecutionAttributes executionAttributes,
-      TracingExecutionInterceptor config) {
+    TracingList tracingList =
+        TracingList.wrap(
+            response.messages(),
+            config.getConsumerProcessInstrumenter(),
+            copy,
+            config,
+            receiveContext);
 
-    io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.root();
+    // store tracing list in context so that our proxied SqsClient/SqsAsyncClient could pick it up
+    SqsTracingContext.set(parentContext, tracingList);
 
-    TextMapPropagator messagingPropagator = config.getMessagingPropagator();
-    if (messagingPropagator != null) {
-      parentContext =
-          SqsParentContext.ofMessageAttributes(message.messageAttributes(), messagingPropagator);
-    }
+    return true;
+  }
 
-    if (config.shouldUseXrayPropagator()
-        && parentContext == io.opentelemetry.context.Context.root()) {
-      parentContext = SqsParentContext.ofSystemAttributes(message.attributesAsStrings());
-    }
+  private static final Field messagesField = getMessagesField();
 
-    Instrumenter<ExecutionAttributes, Response> consumerInstrumenter =
-        config.getConsumerInstrumenter();
-    if (consumerInstrumenter.shouldStart(parentContext, executionAttributes)) {
-      io.opentelemetry.context.Context context =
-          consumerInstrumenter.start(parentContext, executionAttributes);
+  private static Field getMessagesField() {
+    try {
+      Field field = ReceiveMessageResponse.class.getDeclaredField("messages");
+      field.setAccessible(true);
+      return field;
+    } catch (Exception e) {
+      return null;
+    }
+  }
 
-      // TODO: Even if we keep HTTP attributes (see afterMarshalling), does it make sense here
-      //  per-message?
-      // TODO: Should we really create root spans if we can't extract anything, or should we attach
-      //  to the current context?
-      consumerInstrumenter.end(context, executionAttributes, new Response(httpResponse), null);
+  public static void setMessages(
+      ReceiveMessageResponse receiveMessageResponse, List<Message> messages) {
+    try {
+      messagesField.set(receiveMessageResponse, messages);
+    } catch (IllegalAccessException ignored) {
+      // should not happen, we call setAccessible on the field
     }
   }
 
@@ -223,4 +264,85 @@ final class SqsImpl {
     }
     return null;
   }
+
+  static SqsClient wrap(SqsClient sqsClient) {
+    // proxy SqsClient so we could replace the messages list in ReceiveMessageResponse returned from
+    // receiveMessage call
+    return (SqsClient)
+        Proxy.newProxyInstance(
+            sqsClient.getClass().getClassLoader(),
+            new Class<?>[] {SqsClient.class},
+            (proxy, method, args) -> {
+              if ("receiveMessage".equals(method.getName())) {
+                SqsTracingContext sqsTracingContext = new SqsTracingContext();
+                try (Scope ignored =
+                    io.opentelemetry.context.Context.current()
+                        .with(sqsTracingContext)
+                        .makeCurrent()) {
+                  Object result = invokeProxyMethod(method, sqsClient, args);
+                  TracingList tracingList = sqsTracingContext.get();
+                  if (tracingList != null) {
+                    ReceiveMessageResponse response = (ReceiveMessageResponse) result;
+                    SqsImpl.setMessages(response, tracingList);
+                    return response;
+                  }
+                  return result;
+                }
+              } else {
+                return invokeProxyMethod(method, sqsClient, args);
+              }
+            });
+  }
+
+  @SuppressWarnings("unchecked")
+  static SqsAsyncClient wrap(SqsAsyncClient sqsClient) {
+    // proxy SqsAsyncClient so we could replace the messages list in ReceiveMessageResponse returned
+    // from receiveMessage call
+    return (SqsAsyncClient)
+        Proxy.newProxyInstance(
+            sqsClient.getClass().getClassLoader(),
+            new Class<?>[] {SqsAsyncClient.class},
+            (proxy, method, args) -> {
+              if ("receiveMessage".equals(method.getName())) {
+                SqsTracingContext sqsTracingContext = new SqsTracingContext();
+                try (Scope ignored =
+                    io.opentelemetry.context.Context.current()
+                        .with(sqsTracingContext)
+                        .makeCurrent()) {
+                  Object result = invokeProxyMethod(method, sqsClient, args);
+                  CompletableFuture<ReceiveMessageResponse> originalFuture =
+                      (CompletableFuture<ReceiveMessageResponse>) result;
+                  CompletableFuture<ReceiveMessageResponse> resultFuture =
+                      new CompletableFuture<>();
+                  originalFuture.whenComplete(
+                      (response, throwable) -> {
+                        if (throwable != null) {
+                          resultFuture.completeExceptionally(throwable);
+                        } else {
+                          TracingList tracingList = sqsTracingContext.get();
+                          if (tracingList != null) {
+                            SqsImpl.setMessages(response, tracingList);
+                          }
+                          resultFuture.complete(response);
+                        }
+                      });
+
+                  return resultFuture;
+                } catch (InvocationTargetException exception) {
+                  throw exception.getCause();
+                }
+              } else {
+                return invokeProxyMethod(method, sqsClient, args);
+              }
+            });
+  }
+
+  private static Object invokeProxyMethod(Method method, Object target, Object[] args)
+      throws Throwable {
+    try {
+      return method.invoke(target, args);
+    } catch (InvocationTargetException exception) {
+      throw exception.getCause();
+    }
+  }
 }

+ 20 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessage.java

@@ -0,0 +1,20 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.awssdk.v2_2;
+
+import java.util.Map;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+
+/**
+ * A wrapper interface for {@link software.amazon.awssdk.services.sqs.model.Message}. Using this
+ * wrapper avoids muzzle failure when sqs classes are not present.
+ */
+interface SqsMessage {
+
+  Map<String, MessageAttributeValue> messageAttributes();
+
+  Map<String, String> attributesAsStrings();
+}

+ 33 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageImpl.java

@@ -0,0 +1,33 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.awssdk.v2_2;
+
+import java.util.Map;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+
+final class SqsMessageImpl implements SqsMessage {
+
+  private final Message message;
+
+  private SqsMessageImpl(Message message) {
+    this.message = message;
+  }
+
+  static SqsMessage wrap(Message message) {
+    return new SqsMessageImpl(message);
+  }
+
+  @Override
+  public Map<String, MessageAttributeValue> messageAttributes() {
+    return message.messageAttributes();
+  }
+
+  @Override
+  public Map<String, String> attributesAsStrings() {
+    return message.attributesAsStrings();
+  }
+}

+ 19 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsParentContext.java

@@ -69,5 +69,24 @@ final class SqsParentContext {
             StringMapGetter.INSTANCE);
   }
 
+  static Context ofMessage(SqsMessage message, TracingExecutionInterceptor config) {
+    return ofMessage(message, config.getMessagingPropagator(), config.shouldUseXrayPropagator());
+  }
+
+  static Context ofMessage(
+      SqsMessage message, TextMapPropagator messagingPropagator, boolean shouldUseXrayPropagator) {
+    io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.root();
+
+    if (messagingPropagator != null) {
+      parentContext = ofMessageAttributes(message.messageAttributes(), messagingPropagator);
+    }
+
+    if (shouldUseXrayPropagator && parentContext == io.opentelemetry.context.Context.root()) {
+      parentContext = ofSystemAttributes(message.attributesAsStrings());
+    }
+
+    return parentContext;
+  }
+
   private SqsParentContext() {}
 }

+ 30 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequest.java

@@ -0,0 +1,30 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.awssdk.v2_2;
+
+import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
+
+final class SqsProcessRequest {
+  private final ExecutionAttributes request;
+  private final SqsMessage message;
+
+  private SqsProcessRequest(ExecutionAttributes request, SqsMessage message) {
+    this.request = request;
+    this.message = message;
+  }
+
+  public static SqsProcessRequest create(ExecutionAttributes request, SqsMessage message) {
+    return new SqsProcessRequest(request, message);
+  }
+
+  public ExecutionAttributes getRequest() {
+    return request;
+  }
+
+  public SqsMessage getMessage() {
+    return message;
+  }
+}

+ 63 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequestAttributesGetter.java

@@ -0,0 +1,63 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.awssdk.v2_2;
+
+import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
+import javax.annotation.Nullable;
+import software.amazon.awssdk.core.SdkRequest;
+
+enum SqsProcessRequestAttributesGetter
+    implements MessagingAttributesGetter<SqsProcessRequest, Void> {
+  INSTANCE;
+
+  @Override
+  public String getSystem(SqsProcessRequest request) {
+    return "AmazonSQS";
+  }
+
+  @Override
+  public String getDestination(SqsProcessRequest request) {
+    SdkRequest sdkRequest =
+        request.getRequest().getAttribute(TracingExecutionInterceptor.SDK_REQUEST_ATTRIBUTE);
+    String queueUrl = SqsAccess.getQueueUrl(sdkRequest);
+    if (queueUrl != null) {
+      int i = queueUrl.lastIndexOf('/');
+      if (i > 0) {
+        return queueUrl.substring(i + 1);
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean isTemporaryDestination(SqsProcessRequest request) {
+    return false;
+  }
+
+  @Override
+  @Nullable
+  public String getConversationId(SqsProcessRequest request) {
+    return null;
+  }
+
+  @Override
+  @Nullable
+  public Long getMessagePayloadSize(SqsProcessRequest request) {
+    return null;
+  }
+
+  @Override
+  @Nullable
+  public Long getMessagePayloadCompressedSize(SqsProcessRequest request) {
+    return null;
+  }
+
+  @Override
+  @Nullable
+  public String getMessageId(SqsProcessRequest request, @Nullable Void response) {
+    return null;
+  }
+}

+ 37 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsTracingContext.java

@@ -0,0 +1,37 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.awssdk.v2_2;
+
+import static io.opentelemetry.context.ContextKey.named;
+
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.ContextKey;
+import io.opentelemetry.context.ImplicitContextKeyed;
+import javax.annotation.Nullable;
+
+final class SqsTracingContext implements ImplicitContextKeyed {
+
+  private static final ContextKey<SqsTracingContext> KEY = named("sqs-tracing-context");
+
+  private TracingList tracingList;
+
+  public static void set(Context context, TracingList tracingList) {
+    SqsTracingContext holder = context.get(KEY);
+    if (holder != null) {
+      holder.tracingList = tracingList;
+    }
+  }
+
+  @Nullable
+  public TracingList get() {
+    return tracingList;
+  }
+
+  @Override
+  public Context storeInContext(Context context) {
+    return context.with(KEY, this);
+  }
+}

+ 28 - 7
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java

@@ -16,6 +16,7 @@ import io.opentelemetry.context.propagation.TextMapPropagator;
 import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
+import io.opentelemetry.instrumentation.api.internal.Timer;
 import io.opentelemetry.semconv.SemanticAttributes;
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
@@ -46,6 +47,9 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
   // instrumentation, and won't conflict with usage outside javaagent instrumentation
   private static final ExecutionAttribute<io.opentelemetry.context.Context> CONTEXT_ATTRIBUTE =
       new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".Context");
+  private static final ExecutionAttribute<io.opentelemetry.context.Context>
+      PARENT_CONTEXT_ATTRIBUTE =
+          new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".ParentContext");
   private static final ExecutionAttribute<Scope> SCOPE_ATTRIBUTE =
       new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".Scope");
   private static final ExecutionAttribute<AwsSdkRequest> AWS_SDK_REQUEST_ATTRIBUTE =
@@ -56,9 +60,12 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
       new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".SdkRequest");
   private static final ExecutionAttribute<RequestSpanFinisher> REQUEST_FINISHER_ATTRIBUTE =
       new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".RequestFinisher");
+  static final ExecutionAttribute<TracingList> TRACING_MESSAGES_ATTRIBUTE =
+      new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".TracingMessages");
 
   private final Instrumenter<ExecutionAttributes, Response> requestInstrumenter;
-  private final Instrumenter<ExecutionAttributes, Response> consumerInstrumenter;
+  private final Instrumenter<ExecutionAttributes, Response> consumerReceiveInstrumenter;
+  private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
   private final Instrumenter<ExecutionAttributes, Response> producerInstrumenter;
   private final boolean captureExperimentalSpanAttributes;
 
@@ -66,8 +73,12 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
       AttributeKey.stringKey("aws.http.error_message");
   static final String HTTP_FAILURE_EVENT = "HTTP request failure";
 
-  Instrumenter<ExecutionAttributes, Response> getConsumerInstrumenter() {
-    return consumerInstrumenter;
+  Instrumenter<ExecutionAttributes, Response> getConsumerReceiveInstrumenter() {
+    return consumerReceiveInstrumenter;
+  }
+
+  Instrumenter<SqsProcessRequest, Void> getConsumerProcessInstrumenter() {
+    return consumerProcessInstrumenter;
   }
 
   @Nullable
@@ -86,14 +97,16 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
 
   TracingExecutionInterceptor(
       Instrumenter<ExecutionAttributes, Response> requestInstrumenter,
-      Instrumenter<ExecutionAttributes, Response> consumerInstrumenter,
+      Instrumenter<ExecutionAttributes, Response> consumerReceiveInstrumenter,
+      Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter,
       Instrumenter<ExecutionAttributes, Response> producerInstrumenter,
       boolean captureExperimentalSpanAttributes,
       TextMapPropagator messagingPropagator,
       boolean useXrayPropagator,
       boolean recordIndividualHttpError) {
     this.requestInstrumenter = requestInstrumenter;
-    this.consumerInstrumenter = consumerInstrumenter;
+    this.consumerReceiveInstrumenter = consumerReceiveInstrumenter;
+    this.consumerProcessInstrumenter = consumerProcessInstrumenter;
     this.producerInstrumenter = producerInstrumenter;
     this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
     this.messagingPropagator = messagingPropagator;
@@ -134,7 +147,7 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
     // suppress the span from the underlying http client. Request/http client span appears in a
     // separate trace from message producer/consumer spans if there is no parent span just having
     // a trace with only the request/http client span isn't useful.
-    if (parentOtelContext == io.opentelemetry.context.Context.root()
+    if (Span.fromContextOrNull(parentOtelContext) == null
         && "software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest"
             .equals(request.getClass().getName())) {
       otelContext =
@@ -159,6 +172,7 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
       requestFinisher = instrumenter::end;
     }
 
+    executionAttributes.putAttribute(PARENT_CONTEXT_ATTRIBUTE, parentOtelContext);
     executionAttributes.putAttribute(CONTEXT_ATTRIBUTE, otelContext);
     executionAttributes.putAttribute(REQUEST_FINISHER_ATTRIBUTE, requestFinisher);
     if (executionAttributes
@@ -312,7 +326,8 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
     if (executionAttributes.getAttribute(SDK_HTTP_REQUEST_ATTRIBUTE) != null) {
       // Other special handling could be shortcut-&&ed after this (false is returned if not
       // handled).
-      SqsAccess.afterReceiveMessageExecution(context, executionAttributes, this);
+      Timer timer = Timer.start();
+      SqsAccess.afterReceiveMessageExecution(context, executionAttributes, this, timer);
     }
 
     io.opentelemetry.context.Context otelContext = getContext(executionAttributes);
@@ -395,9 +410,11 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
       scope.close();
     }
     executionAttributes.putAttribute(CONTEXT_ATTRIBUTE, null);
+    executionAttributes.putAttribute(PARENT_CONTEXT_ATTRIBUTE, null);
     executionAttributes.putAttribute(AWS_SDK_REQUEST_ATTRIBUTE, null);
     executionAttributes.putAttribute(SDK_HTTP_REQUEST_ATTRIBUTE, null);
     executionAttributes.putAttribute(REQUEST_FINISHER_ATTRIBUTE, null);
+    executionAttributes.putAttribute(TRACING_MESSAGES_ATTRIBUTE, null);
   }
 
   /**
@@ -408,6 +425,10 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
     return attributes.getAttribute(CONTEXT_ATTRIBUTE);
   }
 
+  static io.opentelemetry.context.Context getParentContext(ExecutionAttributes attributes) {
+    return attributes.getAttribute(PARENT_CONTEXT_ATTRIBUTE);
+  }
+
   private Instrumenter<ExecutionAttributes, Response> getInstrumenter(SdkRequest request) {
     return SqsAccess.isSqsProducerRequest(request) ? producerInstrumenter : requestInstrumenter;
   }

+ 99 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingIterator.java

@@ -0,0 +1,99 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.awssdk.v2_2;
+
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import java.util.Iterator;
+import javax.annotation.Nullable;
+import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
+import software.amazon.awssdk.services.sqs.model.Message;
+
+class TracingIterator implements Iterator<Message> {
+
+  private final Iterator<Message> delegateIterator;
+  private final Instrumenter<SqsProcessRequest, Void> instrumenter;
+  private final ExecutionAttributes request;
+  private final TracingExecutionInterceptor config;
+  private final Context receiveContext;
+
+  /*
+   * Note: this may potentially create problems if this iterator is used from different threads. But
+   * at the moment we cannot do much about this.
+   */
+  @Nullable private SqsProcessRequest currentRequest;
+  @Nullable private Context currentContext;
+  @Nullable private Scope currentScope;
+
+  private TracingIterator(
+      Iterator<Message> delegateIterator,
+      Instrumenter<SqsProcessRequest, Void> instrumenter,
+      ExecutionAttributes request,
+      TracingExecutionInterceptor config,
+      Context receiveContext) {
+    this.delegateIterator = delegateIterator;
+    this.instrumenter = instrumenter;
+    this.request = request;
+    this.config = config;
+    this.receiveContext = receiveContext;
+  }
+
+  public static Iterator<Message> wrap(
+      Iterator<Message> delegateIterator,
+      Instrumenter<SqsProcessRequest, Void> instrumenter,
+      ExecutionAttributes request,
+      TracingExecutionInterceptor config,
+      Context receiveContext) {
+    return new TracingIterator(delegateIterator, instrumenter, request, config, receiveContext);
+  }
+
+  @Override
+  public boolean hasNext() {
+    closeScopeAndEndSpan();
+    return delegateIterator.hasNext();
+  }
+
+  @Override
+  public Message next() {
+    // in case they didn't call hasNext()...
+    closeScopeAndEndSpan();
+
+    // it's important not to suppress consumer span creation here using Instrumenter.shouldStart()
+    // because this instrumentation can leak the context and so there may be a leaked consumer span
+    // in the context, in which case it's important to overwrite the leaked span instead of
+    // suppressing the correct span
+    // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
+    Message next = delegateIterator.next();
+    if (next != null) {
+      SqsMessage sqsMessage = SqsMessageImpl.wrap(next);
+      Context parentContext = receiveContext;
+      if (parentContext == null) {
+        parentContext = SqsParentContext.ofMessage(sqsMessage, config);
+      }
+
+      currentRequest = SqsProcessRequest.create(request, sqsMessage);
+      currentContext = instrumenter.start(parentContext, currentRequest);
+      currentScope = currentContext.makeCurrent();
+    }
+    return next;
+  }
+
+  private void closeScopeAndEndSpan() {
+    if (currentScope != null) {
+      currentScope.close();
+      instrumenter.end(currentContext, currentRequest, null, null);
+      currentScope = null;
+      currentRequest = null;
+      currentContext = null;
+    }
+  }
+
+  @Override
+  public void remove() {
+    delegateIterator.remove();
+  }
+}

+ 62 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingList.java

@@ -0,0 +1,62 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.awssdk.v2_2;
+
+import io.opentelemetry.context.Context;
+import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
+import software.amazon.awssdk.services.sqs.model.Message;
+
+class TracingList extends ArrayList<Message> {
+  private static final long serialVersionUID = 1L;
+
+  private final Instrumenter<SqsProcessRequest, Void> instrumenter;
+  private final ExecutionAttributes request;
+  private final TracingExecutionInterceptor config;
+  private final Context receiveContext;
+  private boolean firstIterator = true;
+
+  private TracingList(
+      List<Message> list,
+      Instrumenter<SqsProcessRequest, Void> instrumenter,
+      ExecutionAttributes request,
+      TracingExecutionInterceptor config,
+      Context receiveContext) {
+    super(list);
+    this.instrumenter = instrumenter;
+    this.request = request;
+    this.config = config;
+    this.receiveContext = receiveContext;
+  }
+
+  public static TracingList wrap(
+      List<Message> list,
+      Instrumenter<SqsProcessRequest, Void> instrumenter,
+      ExecutionAttributes request,
+      TracingExecutionInterceptor config,
+      Context receiveContext) {
+    return new TracingList(list, instrumenter, request, config, receiveContext);
+  }
+
+  @Override
+  public Iterator<Message> iterator() {
+    Iterator<Message> it;
+    // We should only return one iterator with tracing.
+    // However, this is not thread-safe, but usually the first (hopefully only) traversal of
+    // List is performed in the same thread that called receiveMessage()
+    if (firstIterator) {
+      it = TracingIterator.wrap(super.iterator(), instrumenter, request, config, receiveContext);
+      firstIterator = false;
+    } else {
+      it = super.iterator();
+    }
+
+    return it;
+  }
+}

+ 109 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.groovy

@@ -0,0 +1,109 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.awssdk.v2_2
+
+import io.opentelemetry.instrumentation.test.LibraryTestTrait
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
+import software.amazon.awssdk.services.sqs.SqsAsyncClient
+import software.amazon.awssdk.services.sqs.SqsClient
+
+abstract class Aws2SqsSuppressReceiveSpansTest extends AbstractAws2SqsSuppressReceiveSpansTest implements LibraryTestTrait {
+  static AwsSdkTelemetry telemetry
+
+  def setupSpec() {
+    def telemetryBuilder = AwsSdkTelemetry.builder(getOpenTelemetry())
+      .setCaptureExperimentalSpanAttributes(true)
+    configure(telemetryBuilder)
+    telemetry = telemetryBuilder.build()
+  }
+
+  abstract void configure(AwsSdkTelemetryBuilder telemetryBuilder)
+
+  @Override
+  ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() {
+    return ClientOverrideConfiguration.builder()
+      .addExecutionInterceptor(
+        telemetry.newExecutionInterceptor())
+  }
+
+  @Override
+  SqsClient configureSqsClient(SqsClient sqsClient) {
+    return telemetry.wrap(sqsClient)
+  }
+
+  @Override
+  SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient) {
+    return telemetry.wrap(sqsClient)
+  }
+}
+
+class Aws2SqsSuppressReceiveSpansDefaultPropagatorTest extends Aws2SqsSuppressReceiveSpansTest {
+
+  @Override
+  void configure(AwsSdkTelemetryBuilder telemetryBuilder) {}
+
+  @Override
+  boolean isSqsAttributeInjectionEnabled() {
+    false
+  }
+
+  def "duplicate tracing interceptor"() {
+    setup:
+    def builder = SqsClient.builder()
+    configureSdkClient(builder)
+    def overrideConfiguration = ClientOverrideConfiguration.builder()
+      .addExecutionInterceptor(telemetry.newExecutionInterceptor())
+      .addExecutionInterceptor(telemetry.newExecutionInterceptor())
+      .build()
+    builder.overrideConfiguration(overrideConfiguration)
+    def client = configureSqsClient(builder.build())
+
+    client.createQueue(createQueueRequest)
+
+    when:
+    client.sendMessage(sendMessageRequest)
+
+    def resp = client.receiveMessage(receiveMessageRequest)
+
+    then:
+    resp.messages().size() == 1
+    resp.messages.each {message -> runWithSpan("process child") {}}
+    assertSqsTraces()
+  }
+}
+
+class Aws2SqsSuppressReceiveSpansW3CPropagatorTest extends Aws2SqsSuppressReceiveSpansTest {
+
+  @Override
+  void configure(AwsSdkTelemetryBuilder telemetryBuilder) {
+    telemetryBuilder.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
+      .setUseXrayPropagator(isXrayInjectionEnabled()) // Disable to confirm messaging propagator actually works
+  }
+
+  @Override
+  boolean isSqsAttributeInjectionEnabled() {
+    true
+  }
+
+  @Override
+  boolean isXrayInjectionEnabled() {
+    false
+  }
+}
+
+/** We want to test the combination of W3C + Xray, as that's what you'll get in prod if you enable W3C. */
+class Aws2SqsSuppressReceiveSpansW3CPropagatorAndXrayPropagatorTest extends Aws2SqsSuppressReceiveSpansTest {
+
+  @Override
+  void configure(AwsSdkTelemetryBuilder telemetryBuilder) {
+    telemetryBuilder.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
+  }
+
+  @Override
+  boolean isSqsAttributeInjectionEnabled() {
+    true
+  }
+}

+ 66 - 9
instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTest.groovy

@@ -7,18 +7,44 @@ package io.opentelemetry.instrumentation.awssdk.v2_2
 
 import io.opentelemetry.instrumentation.test.LibraryTestTrait
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
+import software.amazon.awssdk.services.sqs.SqsAsyncClient
 import software.amazon.awssdk.services.sqs.SqsClient
 
-class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements LibraryTestTrait {
+abstract class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements LibraryTestTrait {
+  static AwsSdkTelemetry telemetry
+
+  def setupSpec() {
+    def telemetryBuilder = AwsSdkTelemetry.builder(getOpenTelemetry())
+      .setCaptureExperimentalSpanAttributes(true)
+      .setMessagingReceiveInstrumentationEnabled(true)
+    configure(telemetryBuilder)
+    telemetry = telemetryBuilder.build()
+  }
+
+  abstract void configure(AwsSdkTelemetryBuilder telemetryBuilder)
+
   @Override
   ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() {
     return ClientOverrideConfiguration.builder()
       .addExecutionInterceptor(
-        AwsSdkTelemetry.builder(getOpenTelemetry())
-          .setCaptureExperimentalSpanAttributes(true)
-          .build()
-          .newExecutionInterceptor())
+        telemetry.newExecutionInterceptor())
+  }
+
+  @Override
+  SqsClient configureSqsClient(SqsClient sqsClient) {
+    return telemetry.wrap(sqsClient)
+  }
+
+  @Override
+  SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient) {
+    return telemetry.wrap(sqsClient)
   }
+}
+
+class Aws2SqsDefaultPropagatorTest extends Aws2SqsTracingTest {
+
+  @Override
+  void configure(AwsSdkTelemetryBuilder telemetryBuilder) {}
 
   @Override
   boolean isSqsAttributeInjectionEnabled() {
@@ -29,15 +55,12 @@ class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements LibraryTe
     setup:
     def builder = SqsClient.builder()
     configureSdkClient(builder)
-    def telemetry = AwsSdkTelemetry.builder(getOpenTelemetry())
-      .setCaptureExperimentalSpanAttributes(true)
-      .build()
     def overrideConfiguration = ClientOverrideConfiguration.builder()
       .addExecutionInterceptor(telemetry.newExecutionInterceptor())
       .addExecutionInterceptor(telemetry.newExecutionInterceptor())
       .build()
     builder.overrideConfiguration(overrideConfiguration)
-    def client = builder.build()
+    def client = configureSqsClient(builder.build())
 
     client.createQueue(createQueueRequest)
 
@@ -48,6 +71,40 @@ class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements LibraryTe
 
     then:
     resp.messages().size() == 1
+    resp.messages.each {message -> runWithSpan("process child") {}}
     assertSqsTraces()
   }
 }
+
+class Aws2SqsW3CPropagatorTest extends Aws2SqsTracingTest {
+
+  @Override
+  void configure(AwsSdkTelemetryBuilder telemetryBuilder) {
+    telemetryBuilder.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
+      .setUseXrayPropagator(isXrayInjectionEnabled()) // Disable to confirm messaging propagator actually works
+  }
+
+  @Override
+  boolean isSqsAttributeInjectionEnabled() {
+    true
+  }
+
+  @Override
+  boolean isXrayInjectionEnabled() {
+    false
+  }
+}
+
+/** We want to test the combination of W3C + Xray, as that's what you'll get in prod if you enable W3C. */
+class Aws2SqsW3CPropagatorAndXrayPropagatorTest extends Aws2SqsTracingTest {
+
+  @Override
+  void configure(AwsSdkTelemetryBuilder telemetryBuilder) {
+    telemetryBuilder.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
+  }
+
+  @Override
+  boolean isSqsAttributeInjectionEnabled() {
+    true
+  }
+}

+ 0 - 33
instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagator.groovy

@@ -1,33 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package io.opentelemetry.instrumentation.awssdk.v2_2
-
-import io.opentelemetry.instrumentation.test.LibraryTestTrait
-import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
-
-class Aws2SqsTracingTestWithW3CPropagator extends AbstractAws2SqsTracingTest implements LibraryTestTrait {
-  @Override
-  ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() {
-    return ClientOverrideConfiguration.builder()
-      .addExecutionInterceptor(
-        AwsSdkTelemetry.builder(getOpenTelemetry())
-          .setCaptureExperimentalSpanAttributes(true)
-          .setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
-          .setUseXrayPropagator(isXrayInjectionEnabled()) // Disable to confirm messaging propagator actually works
-          .build()
-          .newExecutionInterceptor())
-  }
-
-  @Override
-  boolean isSqsAttributeInjectionEnabled() {
-    true
-  }
-
-  @Override
-  boolean isXrayInjectionEnabled() {
-    false
-  }
-}

+ 0 - 28
instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator.groovy

@@ -1,28 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package io.opentelemetry.instrumentation.awssdk.v2_2
-
-import io.opentelemetry.instrumentation.test.LibraryTestTrait
-import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
-
-/** We want to test the combination of W3C + Xray, as that's what you'll get in prod if you enable W3C. */
-class Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator extends AbstractAws2SqsTracingTest implements LibraryTestTrait {
-  @Override
-  ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() {
-    return ClientOverrideConfiguration.builder()
-      .addExecutionInterceptor(
-        AwsSdkTelemetry.builder(getOpenTelemetry())
-          .setCaptureExperimentalSpanAttributes(true)
-          .setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
-          .build()
-          .newExecutionInterceptor())
-  }
-
-  @Override
-  boolean isSqsAttributeInjectionEnabled() {
-    true
-  }
-}

+ 2 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/testCoreOnly/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2ClientDynamodbTest.groovy

@@ -5,9 +5,11 @@
 
 package io.opentelemetry.instrumentation.awssdk.v2_2
 
+import groovy.transform.CompileStatic
 import io.opentelemetry.instrumentation.test.LibraryTestTrait
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
 
+@CompileStatic
 class Aws2ClientDynamodbTest extends AbstractAws2ClientCoreTest implements LibraryTestTrait {
   @Override
   ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() {

+ 394 - 0
instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.groovy

@@ -0,0 +1,394 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.awssdk.v2_2
+
+import io.opentelemetry.instrumentation.test.InstrumentationSpecification
+import io.opentelemetry.semconv.SemanticAttributes
+import org.elasticmq.rest.sqs.SQSRestServerBuilder
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
+import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.services.sqs.SqsAsyncClient
+import software.amazon.awssdk.services.sqs.SqsBaseClientBuilder
+import software.amazon.awssdk.services.sqs.SqsClient
+import software.amazon.awssdk.services.sqs.model.CreateQueueRequest
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest
+import spock.lang.Shared
+
+import static io.opentelemetry.api.trace.SpanKind.CLIENT
+import static io.opentelemetry.api.trace.SpanKind.CONSUMER
+import static io.opentelemetry.api.trace.SpanKind.PRODUCER
+
+abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSpecification {
+
+  private static final StaticCredentialsProvider CREDENTIALS_PROVIDER = StaticCredentialsProvider
+    .create(AwsBasicCredentials.create("my-access-key", "my-secret-key"))
+
+  @Shared
+  def sqs
+
+  @Shared
+  int sqsPort
+
+  static Map<String, MessageAttributeValue> dummyMessageAttributes(count) {
+    (0..<count).collectEntries {
+      [
+          "a$it".toString(),
+          MessageAttributeValue.builder().stringValue("v$it").dataType("String").build()]
+    }
+  }
+
+  String queueUrl = "http://localhost:$sqsPort/000000000000/testSdkSqs"
+
+  ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
+      .queueUrl(queueUrl)
+      .build()
+
+  ReceiveMessageRequest receiveMessageBatchRequest = ReceiveMessageRequest.builder()
+      .queueUrl(queueUrl)
+      .maxNumberOfMessages(3)
+      .messageAttributeNames("All")
+      .waitTimeSeconds(5)
+      .build()
+
+  CreateQueueRequest createQueueRequest = CreateQueueRequest.builder()
+      .queueName("testSdkSqs")
+      .build()
+
+  SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
+      .queueUrl(queueUrl)
+      .messageBody("{\"type\": \"hello\"}")
+      .build()
+
+  SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder()
+      .queueUrl(queueUrl)
+      .entries(
+          e -> e.messageBody("e1").id("i1"),
+          // 8 attributes, injection always possible
+      e -> e.messageBody("e2").id("i2")
+          .messageAttributes(dummyMessageAttributes(8)),
+          // 10 attributes, injection with custom propagator never possible
+      e -> e.messageBody("e3").id("i3").messageAttributes(dummyMessageAttributes(10)))
+      .build()
+
+  boolean isSqsAttributeInjectionEnabled() {
+    AbstractAws2ClientCoreTest.isSqsAttributeInjectionEnabled()
+  }
+
+  boolean isXrayInjectionEnabled() {
+    true
+  }
+
+  void configureSdkClient(SqsBaseClientBuilder builder) {
+    builder
+      .overrideConfiguration(createOverrideConfigurationBuilder().build())
+      .endpointOverride(new URI("http://localhost:" + sqsPort))
+    builder
+        .region(Region.AP_NORTHEAST_1)
+        .credentialsProvider(CREDENTIALS_PROVIDER)
+  }
+
+  abstract SqsClient configureSqsClient(SqsClient sqsClient)
+
+  abstract SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient)
+
+  abstract ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder()
+
+  def setupSpec() {
+    sqs = SQSRestServerBuilder.withPort(0).withInterface("localhost").start()
+    def server = sqs.waitUntilStarted()
+    sqsPort = server.localAddress().port
+    println getClass().name + " SQS server started at: localhost:$sqsPort/"
+  }
+
+  def cleanupSpec() {
+    if (sqs != null) {
+      sqs.stopAndWait()
+    }
+  }
+
+  void assertSqsTraces(withParent = false) {
+    assertTraces(2 + (withParent ? 1 : 0)) {
+      trace(0, 1) {
+
+        span(0) {
+          name "Sqs.CreateQueue"
+          kind CLIENT
+          hasNoParent()
+          attributes {
+            "aws.agent" "java-aws-sdk"
+            "aws.queue.name" "testSdkSqs"
+            "aws.requestId" "00000000-0000-0000-0000-000000000000"
+            "rpc.system" "aws-api"
+            "rpc.service" "Sqs"
+            "rpc.method" "CreateQueue"
+            "http.method" "POST"
+            "http.status_code" 200
+            "http.url" { it.startsWith("http://localhost:$sqsPort") }
+            "net.peer.name" "localhost"
+            "net.peer.port" sqsPort
+            "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
+            "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
+          }
+        }
+      }
+      trace(1, 3) {
+        span(0) {
+          name "testSdkSqs publish"
+          kind PRODUCER
+          hasNoParent()
+          attributes {
+            "aws.agent" "java-aws-sdk"
+            "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
+            "aws.requestId" "00000000-0000-0000-0000-000000000000"
+            "rpc.system" "aws-api"
+            "rpc.method" "SendMessage"
+            "rpc.service" "Sqs"
+            "http.method" "POST"
+            "http.status_code" 200
+            "http.url" { it.startsWith("http://localhost:$sqsPort") }
+            "net.peer.name" "localhost"
+            "net.peer.port" sqsPort
+            "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
+            "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
+            "$SemanticAttributes.MESSAGING_OPERATION" "publish"
+            "$SemanticAttributes.MESSAGING_MESSAGE_ID" String
+            "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
+            "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
+          }
+        }
+        span(1) {
+          name "testSdkSqs process"
+          kind CONSUMER
+          childOf span(0)
+          hasNoLinks()
+          attributes {
+            "aws.agent" "java-aws-sdk"
+            "rpc.method" "ReceiveMessage"
+            "rpc.system" "aws-api"
+            "rpc.service" "Sqs"
+            "http.method" "POST"
+            "http.url" { it.startsWith("http://localhost:$sqsPort") }
+            "net.peer.name" "localhost"
+            "net.peer.port" sqsPort
+            "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
+            "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
+            "$SemanticAttributes.MESSAGING_OPERATION" "process"
+            "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
+          }
+        }
+        span(2) {
+          name "process child"
+          childOf span(1)
+          attributes {
+          }
+        }
+      }
+      if (withParent) {
+        /**
+         * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message).
+         * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear
+         */
+        trace(2, 2) {
+          span(0) {
+            name "parent"
+            hasNoParent()
+          }
+          span(1) {
+            name "Sqs.ReceiveMessage"
+            kind CLIENT
+            childOf span(0)
+            hasNoLinks()
+            attributes {
+              "aws.agent" "java-aws-sdk"
+              "aws.requestId" "00000000-0000-0000-0000-000000000000"
+              "rpc.method" "ReceiveMessage"
+              "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
+              "rpc.system" "aws-api"
+              "rpc.service" "Sqs"
+              "http.method" "POST"
+              "http.status_code" 200
+              "http.url" { it.startsWith("http://localhost:$sqsPort") }
+              "net.peer.name" "localhost"
+              "net.peer.port" sqsPort
+              "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
+              "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  def "simple sqs producer-consumer services: sync"() {
+    setup:
+    def builder = SqsClient.builder()
+    configureSdkClient(builder)
+    def client = configureSqsClient(builder.build())
+
+    client.createQueue(createQueueRequest)
+
+    when:
+    client.sendMessage(sendMessageRequest)
+
+    def resp = client.receiveMessage(receiveMessageRequest)
+
+    then:
+    resp.messages.size() == 1
+    resp.messages.each {message -> runWithSpan("process child") {}}
+    assertSqsTraces()
+  }
+
+  def "simple sqs producer-consumer services with parent: sync"() {
+    setup:
+    def builder = SqsClient.builder()
+    configureSdkClient(builder)
+    def client = configureSqsClient(builder.build())
+
+    client.createQueue(createQueueRequest)
+
+    when:
+    client.sendMessage(sendMessageRequest)
+
+    def resp = runWithSpan("parent") {
+      client.receiveMessage(receiveMessageRequest)
+    }
+
+    then:
+    resp.messages.size() == 1
+    resp.messages.each {message -> runWithSpan("process child") {}}
+    assertSqsTraces(true)
+  }
+
+  def "simple sqs producer-consumer services: async"() {
+    setup:
+    def builder = SqsAsyncClient.builder()
+    configureSdkClient(builder)
+    def client = configureSqsClient(builder.build())
+
+    client.createQueue(createQueueRequest).get()
+
+    when:
+    client.sendMessage(sendMessageRequest).get()
+
+    def resp = client.receiveMessage(receiveMessageRequest).get()
+
+    then:
+    resp.messages.size() == 1
+    resp.messages.each {message -> runWithSpan("process child") {}}
+    assertSqsTraces()
+  }
+
+  def "batch sqs producer-consumer services: sync"() {
+    setup:
+    def builder = SqsClient.builder()
+    configureSdkClient(builder)
+    def client = configureSqsClient(builder.build())
+
+    client.createQueue(createQueueRequest)
+
+    when:
+    client.sendMessageBatch(sendMessageBatchRequest)
+
+    def resp = client.receiveMessage(receiveMessageBatchRequest)
+    def totalAttrs = resp.messages().sum {it.messageAttributes().size() }
+
+    then:
+    resp.messages().size() == 3
+
+    // +2: 3 messages, 2x traceparent, 1x not injected due to too many attrs
+    totalAttrs == 18 + (sqsAttributeInjectionEnabled ? 2 : 0)
+
+    assertTraces(xrayInjectionEnabled ? 2 : 3) {
+      trace(0, 1) {
+
+        span(0) {
+          name "Sqs.CreateQueue"
+          kind CLIENT
+        }
+      }
+      trace(1, xrayInjectionEnabled ? 4 : 3) {
+        span(0) {
+          name "testSdkSqs publish"
+          kind PRODUCER
+          hasNoParent()
+          attributes {
+            "aws.agent" "java-aws-sdk"
+            "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
+            "aws.requestId" "00000000-0000-0000-0000-000000000000"
+            "rpc.system" "aws-api"
+            "rpc.method" "SendMessageBatch"
+            "rpc.service" "Sqs"
+            "http.method" "POST"
+            "http.status_code" 200
+            "http.url" { it.startsWith("http://localhost:$sqsPort") }
+            "net.peer.name" "localhost"
+            "net.peer.port" sqsPort
+            "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
+            "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
+            "$SemanticAttributes.MESSAGING_OPERATION" "publish"
+            "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
+            "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
+          }
+        }
+        for (int i: 1..(xrayInjectionEnabled ? 3 : 2)) {
+          span(i) {
+            name "testSdkSqs process"
+            kind CONSUMER
+            childOf span(0)
+            hasNoLinks()
+
+            attributes {
+              "aws.agent" "java-aws-sdk"
+              "rpc.method" "ReceiveMessage"
+              "rpc.system" "aws-api"
+              "rpc.service" "Sqs"
+              "http.method" "POST"
+              "http.url" { it.startsWith("http://localhost:$sqsPort") }
+              "net.peer.name" "localhost"
+              "net.peer.port" sqsPort
+              "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
+              "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
+              "$SemanticAttributes.MESSAGING_OPERATION" "process"
+              "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
+            }
+          }
+        }
+      }
+      if (!xrayInjectionEnabled) {
+        trace(2, 1) {
+          span(0) {
+            name "testSdkSqs process"
+            kind CONSUMER
+
+            // TODO This is not nice at all, and can also happen if producer is not instrumented
+            hasNoParent()
+            hasNoLinks()
+
+            attributes {
+              "aws.agent" "java-aws-sdk"
+              "rpc.method" "ReceiveMessage"
+              "rpc.system" "aws-api"
+              "rpc.service" "Sqs"
+              "http.method" "POST"
+              "http.url" { it.startsWith("http://localhost:$sqsPort") }
+              "net.peer.name" "localhost"
+              "net.peer.port" sqsPort
+              "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
+              "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
+              "$SemanticAttributes.MESSAGING_OPERATION" "process"
+              "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
+            }
+          }
+        }
+      }
+    }
+  }
+}

+ 124 - 75
instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy

@@ -6,6 +6,7 @@
 package io.opentelemetry.instrumentation.awssdk.v2_2
 
 import io.opentelemetry.instrumentation.test.InstrumentationSpecification
+import io.opentelemetry.sdk.trace.data.SpanData
 import io.opentelemetry.semconv.SemanticAttributes
 import org.elasticmq.rest.sqs.SQSRestServerBuilder
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
@@ -95,6 +96,10 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
         .credentialsProvider(CREDENTIALS_PROVIDER)
   }
 
+  abstract SqsClient configureSqsClient(SqsClient sqsClient)
+
+  abstract SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient)
+
   abstract ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder()
 
   def setupSpec() {
@@ -111,7 +116,8 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
   }
 
   void assertSqsTraces(withParent = false) {
-    assertTraces(2 + (withParent ? 1 : 0)) {
+    assertTraces(3) {
+      SpanData publishSpan
       trace(0, 1) {
 
         span(0) {
@@ -135,7 +141,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
           }
         }
       }
-      trace(1, 2) {
+      trace(1, 1) {
         span(0) {
           name "testSdkSqs publish"
           kind PRODUCER
@@ -160,39 +166,19 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
             "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
           }
         }
-        span(1) {
-          name "testSdkSqs receive"
-          kind CONSUMER
-          childOf span(0)
-          hasNoLinks() // TODO: Link to receive operation?
-          attributes {
-            "aws.agent" "java-aws-sdk"
-            "rpc.method" "ReceiveMessage"
-            "rpc.system" "aws-api"
-            "rpc.service" "Sqs"
-            "http.method" "POST"
-            "http.status_code" 200
-            "http.url" { it.startsWith("http://localhost:$sqsPort") }
-            "net.peer.name" "localhost"
-            "net.peer.port" sqsPort
-            "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
-            "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
-            "$SemanticAttributes.MESSAGING_OPERATION" "receive"
-            "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
-            "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
-          }
-        }
+        publishSpan = span(0)
       }
-      if (withParent) {
-        /**
-         * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message).
-         * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear
-         */
-        trace(2, 2) {
+      def offset = withParent ? 2 : 0
+      trace(2, 3 + offset) {
+        if (withParent) {
           span(0) {
             name "parent"
             hasNoParent()
           }
+          /**
+           * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message).
+           * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear
+           */
           span(1) {
             name "Sqs.ReceiveMessage"
             kind CLIENT
@@ -215,6 +201,58 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
             }
           }
         }
+        span(0 + offset) {
+          name "testSdkSqs receive"
+          kind CONSUMER
+          if (withParent) {
+           childOf span(0)
+          } else {
+            hasNoParent()
+          }
+          hasNoLinks()
+          attributes {
+            "aws.agent" "java-aws-sdk"
+            "rpc.method" "ReceiveMessage"
+            "rpc.system" "aws-api"
+            "rpc.service" "Sqs"
+            "http.method" "POST"
+            "http.status_code" 200
+            "http.url" { it.startsWith("http://localhost:$sqsPort") }
+            "net.peer.name" "localhost"
+            "net.peer.port" sqsPort
+            "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
+            "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
+            "$SemanticAttributes.MESSAGING_OPERATION" "receive"
+            "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
+            "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
+          }
+        }
+        span(1 + offset) {
+          name "testSdkSqs process"
+          kind CONSUMER
+          childOf span(0 + offset)
+          hasLink(publishSpan)
+          attributes {
+            "aws.agent" "java-aws-sdk"
+            "rpc.method" "ReceiveMessage"
+            "rpc.system" "aws-api"
+            "rpc.service" "Sqs"
+            "http.method" "POST"
+            "http.url" { it.startsWith("http://localhost:$sqsPort") }
+            "net.peer.name" "localhost"
+            "net.peer.port" sqsPort
+            "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
+            "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
+            "$SemanticAttributes.MESSAGING_OPERATION" "process"
+            "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
+          }
+        }
+        span(2 + offset) {
+          name "process child"
+          childOf span(1 + offset)
+          attributes {
+          }
+        }
       }
     }
   }
@@ -223,7 +261,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
     setup:
     def builder = SqsClient.builder()
     configureSdkClient(builder)
-    def client = builder.build()
+    def client = configureSqsClient(builder.build())
 
     client.createQueue(createQueueRequest)
 
@@ -233,7 +271,8 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
     def resp = client.receiveMessage(receiveMessageRequest)
 
     then:
-    resp.messages().size() == 1
+    resp.messages.size() == 1
+    resp.messages.each {message -> runWithSpan("process child") {}}
     assertSqsTraces()
   }
 
@@ -241,7 +280,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
     setup:
     def builder = SqsClient.builder()
     configureSdkClient(builder)
-    def client = builder.build()
+    def client = configureSqsClient(builder.build())
 
     client.createQueue(createQueueRequest)
 
@@ -253,7 +292,8 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
     }
 
     then:
-    resp.messages().size() == 1
+    resp.messages.size() == 1
+    resp.messages.each {message -> runWithSpan("process child") {}}
     assertSqsTraces(true)
   }
 
@@ -261,7 +301,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
     setup:
     def builder = SqsAsyncClient.builder()
     configureSdkClient(builder)
-    def client = builder.build()
+    def client = configureSqsClient(builder.build())
 
     client.createQueue(createQueueRequest).get()
 
@@ -271,7 +311,8 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
     def resp = client.receiveMessage(receiveMessageRequest).get()
 
     then:
-    resp.messages().size() == 1
+    resp.messages.size() == 1
+    resp.messages.each {message -> runWithSpan("process child") {}}
     assertSqsTraces()
   }
 
@@ -279,7 +320,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
     setup:
     def builder = SqsClient.builder()
     configureSdkClient(builder)
-    def client = builder.build()
+    def client = configureSqsClient(builder.build())
 
     client.createQueue(createQueueRequest)
 
@@ -287,15 +328,17 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
     client.sendMessageBatch(sendMessageBatchRequest)
 
     def resp = client.receiveMessage(receiveMessageBatchRequest)
-    def totalAttrs = resp.messages().sum {it.messageAttributes().size() }
+    resp.messages.each {message -> runWithSpan("process child") {}}
+    def totalAttrs = resp.messages.sum {it.messageAttributes().size() }
 
     then:
-    resp.messages().size() == 3
+    resp.messages.size() == 3
 
     // +2: 3 messages, 2x traceparent, 1x not injected due to too many attrs
     totalAttrs == 18 + (sqsAttributeInjectionEnabled ? 2 : 0)
 
-    assertTraces(xrayInjectionEnabled ? 2 : 3) {
+    assertTraces(3) {
+      def publishSpan
       trace(0, 1) {
 
         span(0) {
@@ -303,7 +346,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
           kind CLIENT
         }
       }
-      trace(1, xrayInjectionEnabled ? 4 : 3) {
+      trace(1, 1) {
         span(0) {
           name "testSdkSqs publish"
           kind PRODUCER
@@ -327,57 +370,63 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
             "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
           }
         }
-        for (int i: 1..(xrayInjectionEnabled ? 3 : 2)) {
-          span(i) {
-            name "testSdkSqs receive"
+        publishSpan = span(0)
+      }
+      trace(2, 1 + 2 * 3) {
+        span(0) {
+          name "testSdkSqs receive"
+          kind CONSUMER
+          hasNoParent()
+          hasNoLinks()
+
+          attributes {
+            "aws.agent" "java-aws-sdk"
+            "rpc.method" "ReceiveMessage"
+            "rpc.system" "aws-api"
+            "rpc.service" "Sqs"
+            "http.method" "POST"
+            "http.status_code" 200
+            "http.url" { it.startsWith("http://localhost:$sqsPort") }
+            "net.peer.name" "localhost"
+            "net.peer.port" sqsPort
+            "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
+            "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
+            "$SemanticAttributes.MESSAGING_OPERATION" "receive"
+            "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
+            "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
+          }
+        }
+        for (int i: 0..2) {
+          span(1 + 2*i) {
+            name "testSdkSqs process"
             kind CONSUMER
             childOf span(0)
-            hasNoLinks() // TODO: Link to receive operation?
-
+            if (!xrayInjectionEnabled && i == 2) {
+              // last message in batch has too many attributes so injecting tracing header is not
+              // possible
+              hasNoLinks()
+            } else {
+              hasLink(publishSpan)
+            }
             attributes {
               "aws.agent" "java-aws-sdk"
               "rpc.method" "ReceiveMessage"
               "rpc.system" "aws-api"
               "rpc.service" "Sqs"
               "http.method" "POST"
-              "http.status_code" 200
               "http.url" { it.startsWith("http://localhost:$sqsPort") }
               "net.peer.name" "localhost"
               "net.peer.port" sqsPort
               "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
               "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
-              "$SemanticAttributes.MESSAGING_OPERATION" "receive"
+              "$SemanticAttributes.MESSAGING_OPERATION" "process"
               "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
-              "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
             }
           }
-        }
-      }
-      if (!xrayInjectionEnabled) {
-        trace(2, 1) {
-          span(0) {
-            name "testSdkSqs receive"
-            kind CONSUMER
-
-            // TODO This is not nice at all, and can also happen if producer is not instrumented
-            hasNoParent()
-            hasNoLinks() // TODO: Link to receive operation?
-
+          span(1 + 2*i + 1) {
+            name "process child"
+            childOf span(1 + 2*i)
             attributes {
-              "aws.agent" "java-aws-sdk"
-              "rpc.method" "ReceiveMessage"
-              "rpc.system" "aws-api"
-              "rpc.service" "Sqs"
-              "http.method" "POST"
-              "http.status_code" 200
-              "http.url" { it.startsWith("http://localhost:$sqsPort") }
-              "net.peer.name" "localhost"
-              "net.peer.port" sqsPort
-              "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
-              "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
-              "$SemanticAttributes.MESSAGING_OPERATION" "receive"
-              "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
-              "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
             }
           }
         }