Browse Source

Implement capturing message headers for aws2 sqs spans (#9842)

Lauri Tulmin 1 year ago
parent
commit
a6be8ce0c4
19 changed files with 406 additions and 113 deletions
  1. 3 0
      instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/build.gradle.kts
  2. 58 18
      instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/AwsSdkSingletons.java
  3. 13 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractSqsRequest.java
  4. 94 69
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkInstrumenterFactory.java
  5. 12 14
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetry.java
  6. 16 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java
  7. 5 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAccess.java
  8. 3 2
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAttributesGetter.java
  9. 13 4
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsImpl.java
  10. 4 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessage.java
  11. 21 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageImpl.java
  12. 2 1
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequest.java
  13. 9 1
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequestAttributesGetter.java
  14. 32 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveRequest.java
  15. 74 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveRequestAttributesGetter.java
  16. 3 3
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java
  17. 4 0
      instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTest.groovy
  18. 3 0
      instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.groovy
  19. 37 1
      instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy

+ 3 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/build.gradle.kts

@@ -5,6 +5,8 @@ plugins {
 base.archivesName.set("${base.archivesName.get()}-autoconfigure")
 
 dependencies {
+  compileOnly(project(":javaagent-extension-api"))
+
   implementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:library"))
 
   library("software.amazon.awssdk:aws-core:2.2.0")
@@ -29,5 +31,6 @@ tasks {
     systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", true)
     systemProperty("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", true)
     systemProperty("otel.instrumentation.aws-sdk.experimental-record-individual-http-error", true)
+    systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "test-message-header")
   }
 }

+ 58 - 18
instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/AwsSdkSingletons.java

@@ -5,35 +5,75 @@
 
 package io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure;
 
+import static java.util.Collections.emptyList;
+
 import io.opentelemetry.api.GlobalOpenTelemetry;
 import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
 import io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkTelemetry;
+import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
+import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
+import java.util.List;
 
 public final class AwsSdkSingletons {
 
-  private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES =
-      ConfigPropertiesUtil.getBoolean(
-          "otel.instrumentation.aws-sdk.experimental-span-attributes", false);
+  private static final boolean HAS_INSTRUMENTATION_CONFIG = hasAgentConfiguration();
+  private static final AwsSdkTelemetry TELEMETRY =
+      AwsSdkTelemetry.builder(GlobalOpenTelemetry.get())
+          .setCapturedHeaders(getCapturedHeaders())
+          .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes())
+          .setMessagingReceiveInstrumentationEnabled(messagingReceiveInstrumentationEnabled())
+          .setUseConfiguredPropagatorForMessaging(useMessagingPropagator())
+          .setRecordIndividualHttpError(recordIndividualHttpError())
+          .build();
 
-  private static final boolean USE_MESSAGING_PROPAGATOR =
-      ConfigPropertiesUtil.getBoolean(
-          "otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false);
+  private static boolean hasAgentConfiguration() {
+    try {
+      Class.forName("io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig");
+      return true;
+    } catch (ClassNotFoundException e) {
+      return false;
+    }
+  }
 
-  private static final boolean RECORD_INDIVIDUAL_HTTP_ERROR =
-      ConfigPropertiesUtil.getBoolean(
-          "otel.instrumentation.aws-sdk.experimental-record-individual-http-error", false);
+  private static List<String> getCapturedHeaders() {
+    if (HAS_INSTRUMENTATION_CONFIG) {
+      return ExperimentalConfig.get().getMessagingHeaders();
+    } else {
+      return ConfigPropertiesUtil.getList(
+          "otel.instrumentation.messaging.experimental.capture-headers", emptyList());
+    }
+  }
+
+  private static boolean captureExperimentalSpanAttributes() {
+    return getBoolean("otel.instrumentation.aws-sdk.experimental-span-attributes", false);
+  }
 
-  private static final boolean RECEIVE_TELEMETRY_ENABLED =
-      ConfigPropertiesUtil.getBoolean(
+  private static boolean messagingReceiveInstrumentationEnabled() {
+    if (HAS_INSTRUMENTATION_CONFIG) {
+      return ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();
+    } else {
+      return ConfigPropertiesUtil.getBoolean(
           "otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false);
+    }
+  }
 
-  private static final AwsSdkTelemetry TELEMETRY =
-      AwsSdkTelemetry.builder(GlobalOpenTelemetry.get())
-          .setCaptureExperimentalSpanAttributes(CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES)
-          .setMessagingReceiveInstrumentationEnabled(RECEIVE_TELEMETRY_ENABLED)
-          .setUseConfiguredPropagatorForMessaging(USE_MESSAGING_PROPAGATOR)
-          .setRecordIndividualHttpError(RECORD_INDIVIDUAL_HTTP_ERROR)
-          .build();
+  private static boolean useMessagingPropagator() {
+    return getBoolean(
+        "otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false);
+  }
+
+  private static boolean recordIndividualHttpError() {
+    return getBoolean(
+        "otel.instrumentation.aws-sdk.experimental-record-individual-http-error", false);
+  }
+
+  private static boolean getBoolean(String name, boolean defaultValue) {
+    if (HAS_INSTRUMENTATION_CONFIG) {
+      return InstrumentationConfig.get().getBoolean(name, defaultValue);
+    } else {
+      return ConfigPropertiesUtil.getBoolean(name, defaultValue);
+    }
+  }
 
   public static AwsSdkTelemetry telemetry() {
     return TELEMETRY;

+ 13 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractSqsRequest.java

@@ -0,0 +1,13 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.awssdk.v2_2;
+
+import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
+
+abstract class AbstractSqsRequest {
+
+  public abstract ExecutionAttributes getRequest();
+}

+ 94 - 69
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkInstrumenterFactory.java

@@ -5,6 +5,9 @@
 
 package io.opentelemetry.instrumentation.awssdk.v2_2;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.common.AttributesBuilder;
 import io.opentelemetry.api.trace.Span;
@@ -18,11 +21,13 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
 import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
 import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
 import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
+import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
 import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
 import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.Function;
 import javax.annotation.Nullable;
 import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
 import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
@@ -63,38 +68,73 @@ final class AwsSdkInstrumenterFactory {
           Arrays.asList(
               rpcAttributesExtractor, httpAttributesExtractor, experimentalAttributesExtractor);
 
-  static Instrumenter<ExecutionAttributes, Response> requestInstrumenter(
-      OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
+  private final OpenTelemetry openTelemetry;
+  @Nullable private final TextMapPropagator messagingPropagator;
+  private final List<String> capturedHeaders;
+  private final boolean captureExperimentalSpanAttributes;
+  private final boolean messagingReceiveInstrumentationEnabled;
+  private final boolean useXrayPropagator;
 
+  AwsSdkInstrumenterFactory(
+      OpenTelemetry openTelemetry,
+      @Nullable TextMapPropagator messagingPropagator,
+      List<String> capturedHeaders,
+      boolean captureExperimentalSpanAttributes,
+      boolean messagingReceiveInstrumentationEnabled,
+      boolean useXrayPropagator) {
+    this.openTelemetry = openTelemetry;
+    this.messagingPropagator = messagingPropagator;
+    this.capturedHeaders = capturedHeaders;
+    this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
+    this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled;
+    this.useXrayPropagator = useXrayPropagator;
+  }
+
+  Instrumenter<ExecutionAttributes, Response> requestInstrumenter() {
     return createInstrumenter(
         openTelemetry,
-        captureExperimentalSpanAttributes
-            ? extendedAttributesExtractors
-            : defaultAttributesExtractors,
         AwsSdkInstrumenterFactory::spanName,
         SpanKindExtractor.alwaysClient(),
+        attributesExtractors(),
+        emptyList(),
         true);
   }
 
-  static Instrumenter<ExecutionAttributes, Response> consumerReceiveInstrumenter(
-      OpenTelemetry openTelemetry,
-      boolean captureExperimentalSpanAttributes,
-      boolean messagingReceiveInstrumentationEnabled) {
-    return sqsInstrumenter(
+  private List<AttributesExtractor<ExecutionAttributes, Response>> attributesExtractors() {
+    return captureExperimentalSpanAttributes
+        ? extendedAttributesExtractors
+        : defaultAttributesExtractors;
+  }
+
+  private List<AttributesExtractor<ExecutionAttributes, Response>> consumerAttributesExtractors() {
+    return captureExperimentalSpanAttributes
+        ? extendedConsumerAttributesExtractors
+        : defaultConsumerAttributesExtractors;
+  }
+
+  private <REQUEST, RESPONSE> AttributesExtractor<REQUEST, RESPONSE> messagingAttributesExtractor(
+      MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
+    return MessagingAttributesExtractor.builder(getter, operation)
+        .setCapturedHeaders(capturedHeaders)
+        .build();
+  }
+
+  Instrumenter<SqsReceiveRequest, Response> consumerReceiveInstrumenter() {
+    MessageOperation operation = MessageOperation.RECEIVE;
+    SqsReceiveRequestAttributesGetter getter = SqsReceiveRequestAttributesGetter.INSTANCE;
+    AttributesExtractor<SqsReceiveRequest, Response> messagingAttributeExtractor =
+        messagingAttributesExtractor(getter, operation);
+
+    return createInstrumenter(
         openTelemetry,
-        MessageOperation.RECEIVE,
-        captureExperimentalSpanAttributes
-            ? extendedConsumerAttributesExtractors
-            : defaultConsumerAttributesExtractors,
+        MessagingSpanNameExtractor.create(getter, operation),
+        SpanKindExtractor.alwaysConsumer(),
+        toSqsRequestExtractors(consumerAttributesExtractors(), Function.identity()),
+        singletonList(messagingAttributeExtractor),
         messagingReceiveInstrumentationEnabled);
   }
 
-  static Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter(
-      OpenTelemetry openTelemetry,
-      TextMapPropagator messagingPropagator,
-      boolean captureExperimentalSpanAttributes,
-      boolean messagingReceiveInstrumentationEnabled,
-      boolean shouldUseXrayPropagator) {
+  Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter() {
     MessageOperation operation = MessageOperation.PROCESS;
     SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE;
 
@@ -104,96 +144,83 @@ final class AwsSdkInstrumenterFactory {
                 INSTRUMENTATION_NAME,
                 MessagingSpanNameExtractor.create(getter, operation))
             .addAttributesExtractors(
-                toProcessRequestExtractors(
-                    captureExperimentalSpanAttributes
-                        ? extendedConsumerAttributesExtractors
-                        : defaultConsumerAttributesExtractors))
-            .addAttributesExtractor(
-                MessagingAttributesExtractor.builder(getter, operation).build());
+                toSqsRequestExtractors(consumerAttributesExtractors(), unused -> null))
+            .addAttributesExtractor(messagingAttributesExtractor(getter, operation));
 
     if (messagingReceiveInstrumentationEnabled) {
       builder.addSpanLinksExtractor(
           (spanLinks, parentContext, request) -> {
             Context extracted =
                 SqsParentContext.ofMessage(
-                    request.getMessage(), messagingPropagator, shouldUseXrayPropagator);
+                    request.getMessage(), messagingPropagator, useXrayPropagator);
             spanLinks.addLink(Span.fromContext(extracted).getSpanContext());
           });
     }
     return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
   }
 
-  private static List<AttributesExtractor<SqsProcessRequest, Void>> toProcessRequestExtractors(
-      List<AttributesExtractor<ExecutionAttributes, Response>> extractors) {
-    List<AttributesExtractor<SqsProcessRequest, Void>> result = new ArrayList<>();
+  private static <RESPONSE>
+      List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> toSqsRequestExtractors(
+          List<AttributesExtractor<ExecutionAttributes, Response>> extractors,
+          Function<RESPONSE, Response> responseConverter) {
+    List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> result = new ArrayList<>();
     for (AttributesExtractor<ExecutionAttributes, Response> extractor : extractors) {
       result.add(
-          new AttributesExtractor<SqsProcessRequest, Void>() {
+          new AttributesExtractor<AbstractSqsRequest, RESPONSE>() {
             @Override
             public void onStart(
                 AttributesBuilder attributes,
                 Context parentContext,
-                SqsProcessRequest sqsProcessRequest) {
-              extractor.onStart(attributes, parentContext, sqsProcessRequest.getRequest());
+                AbstractSqsRequest sqsRequest) {
+              extractor.onStart(attributes, parentContext, sqsRequest.getRequest());
             }
 
             @Override
             public void onEnd(
                 AttributesBuilder attributes,
                 Context context,
-                SqsProcessRequest sqsProcessRequest,
-                @Nullable Void unused,
+                AbstractSqsRequest sqsRequest,
+                @Nullable RESPONSE response,
                 @Nullable Throwable error) {
-              extractor.onEnd(attributes, context, sqsProcessRequest.getRequest(), null, error);
+              extractor.onEnd(
+                  attributes,
+                  context,
+                  sqsRequest.getRequest(),
+                  responseConverter.apply(response),
+                  error);
             }
           });
     }
     return result;
   }
 
-  static Instrumenter<ExecutionAttributes, Response> producerInstrumenter(
-      OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
-    return sqsInstrumenter(
-        openTelemetry,
-        MessageOperation.PUBLISH,
-        captureExperimentalSpanAttributes
-            ? extendedAttributesExtractors
-            : defaultAttributesExtractors,
-        true);
-  }
-
-  private static Instrumenter<ExecutionAttributes, Response> sqsInstrumenter(
-      OpenTelemetry openTelemetry,
-      MessageOperation operation,
-      List<AttributesExtractor<ExecutionAttributes, Response>> extractors,
-      boolean enabled) {
+  Instrumenter<ExecutionAttributes, Response> producerInstrumenter() {
+    MessageOperation operation = MessageOperation.PUBLISH;
     SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE;
     AttributesExtractor<ExecutionAttributes, Response> messagingAttributeExtractor =
-        MessagingAttributesExtractor.builder(getter, operation).build();
-    List<AttributesExtractor<ExecutionAttributes, Response>> newExtractors =
-        new ArrayList<>(extractors);
-    newExtractors.add(messagingAttributeExtractor);
+        messagingAttributesExtractor(getter, operation);
 
     return createInstrumenter(
         openTelemetry,
-        newExtractors,
         MessagingSpanNameExtractor.create(getter, operation),
-        operation == MessageOperation.PUBLISH
-            ? SpanKindExtractor.alwaysProducer()
-            : SpanKindExtractor.alwaysConsumer(),
-        enabled);
+        SpanKindExtractor.alwaysProducer(),
+        attributesExtractors(),
+        singletonList(messagingAttributeExtractor),
+        true);
   }
 
-  private static Instrumenter<ExecutionAttributes, Response> createInstrumenter(
+  private static <REQUEST, RESPONSE> Instrumenter<REQUEST, RESPONSE> createInstrumenter(
       OpenTelemetry openTelemetry,
-      List<AttributesExtractor<ExecutionAttributes, Response>> extractors,
-      SpanNameExtractor<ExecutionAttributes> spanNameExtractor,
-      SpanKindExtractor<ExecutionAttributes> spanKindExtractor,
+      SpanNameExtractor<REQUEST> spanNameExtractor,
+      SpanKindExtractor<REQUEST> spanKindExtractor,
+      List<? extends AttributesExtractor<? super REQUEST, ? super RESPONSE>> attributeExtractors,
+      List<AttributesExtractor<REQUEST, RESPONSE>> additionalAttributeExtractors,
       boolean enabled) {
 
-    return Instrumenter.<ExecutionAttributes, Response>builder(
+    return Instrumenter.<REQUEST, RESPONSE>builder(
             openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor)
-        .addAttributesExtractors(extractors)
+        .addAttributesExtractors(attributeExtractors)
+        .addAttributesExtractors(additionalAttributeExtractors)
         .setEnabled(enabled)
         .buildInstrumenter(spanKindExtractor);
   }
@@ -203,6 +230,4 @@ final class AwsSdkInstrumenterFactory {
     String awsOperation = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
     return awsServiceName + "." + awsOperation;
   }
-
-  private AwsSdkInstrumenterFactory() {}
 }

+ 12 - 14
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetry.java

@@ -9,6 +9,7 @@ import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.context.propagation.TextMapPropagator;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
+import java.util.List;
 import javax.annotation.Nullable;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
@@ -44,7 +45,7 @@ public class AwsSdkTelemetry {
   }
 
   private final Instrumenter<ExecutionAttributes, Response> requestInstrumenter;
-  private final Instrumenter<ExecutionAttributes, Response> consumerReceiveInstrumenter;
+  private final Instrumenter<SqsReceiveRequest, Response> consumerReceiveInstrumenter;
   private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
   private final Instrumenter<ExecutionAttributes, Response> producerInstrumenter;
   private final boolean captureExperimentalSpanAttributes;
@@ -54,6 +55,7 @@ public class AwsSdkTelemetry {
 
   AwsSdkTelemetry(
       OpenTelemetry openTelemetry,
+      List<String> capturedHeaders,
       boolean captureExperimentalSpanAttributes,
       boolean useMessagingPropagator,
       boolean useXrayPropagator,
@@ -62,24 +64,20 @@ public class AwsSdkTelemetry {
     this.useXrayPropagator = useXrayPropagator;
     this.messagingPropagator =
         useMessagingPropagator ? openTelemetry.getPropagators().getTextMapPropagator() : null;
-    this.requestInstrumenter =
-        AwsSdkInstrumenterFactory.requestInstrumenter(
-            openTelemetry, captureExperimentalSpanAttributes);
-    this.consumerReceiveInstrumenter =
-        AwsSdkInstrumenterFactory.consumerReceiveInstrumenter(
-            openTelemetry,
-            captureExperimentalSpanAttributes,
-            messagingReceiveInstrumentationEnabled);
-    this.consumerProcessInstrumenter =
-        AwsSdkInstrumenterFactory.consumerProcessInstrumenter(
+
+    AwsSdkInstrumenterFactory instrumenterFactory =
+        new AwsSdkInstrumenterFactory(
             openTelemetry,
             messagingPropagator,
+            capturedHeaders,
             captureExperimentalSpanAttributes,
             messagingReceiveInstrumentationEnabled,
             useXrayPropagator);
-    this.producerInstrumenter =
-        AwsSdkInstrumenterFactory.producerInstrumenter(
-            openTelemetry, captureExperimentalSpanAttributes);
+
+    this.requestInstrumenter = instrumenterFactory.requestInstrumenter();
+    this.consumerReceiveInstrumenter = instrumenterFactory.consumerReceiveInstrumenter();
+    this.consumerProcessInstrumenter = instrumenterFactory.consumerProcessInstrumenter();
+    this.producerInstrumenter = instrumenterFactory.producerInstrumenter();
     this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
     this.recordIndividualHttpError = recordIndividualHttpError;
   }

+ 16 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java

@@ -5,14 +5,18 @@
 
 package io.opentelemetry.instrumentation.awssdk.v2_2;
 
+import static java.util.Collections.emptyList;
+
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import io.opentelemetry.api.OpenTelemetry;
+import java.util.List;
 
 /** A builder of {@link AwsSdkTelemetry}. */
 public final class AwsSdkTelemetryBuilder {
 
   private final OpenTelemetry openTelemetry;
 
+  private List<String> capturedHeaders = emptyList();
   private boolean captureExperimentalSpanAttributes;
   private boolean useMessagingPropagator;
   private boolean recordIndividualHttpError;
@@ -23,6 +27,17 @@ public final class AwsSdkTelemetryBuilder {
     this.openTelemetry = openTelemetry;
   }
 
+  /**
+   * Configures the messaging headers that will be captured as span attributes.
+   *
+   * @param capturedHeaders A list of messaging header names.
+   */
+  @CanIgnoreReturnValue
+  public AwsSdkTelemetryBuilder setCapturedHeaders(List<String> capturedHeaders) {
+    this.capturedHeaders = capturedHeaders;
+    return this;
+  }
+
   /**
    * Sets whether experimental attributes should be set to spans. These attributes may be changed or
    * removed in the future, so only enable this if you know you do not require attributes filled by
@@ -104,6 +119,7 @@ public final class AwsSdkTelemetryBuilder {
   public AwsSdkTelemetry build() {
     return new AwsSdkTelemetry(
         openTelemetry,
+        capturedHeaders,
         captureExperimentalSpanAttributes,
         useMessagingPropagator,
         useXrayPropagator,

+ 5 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAccess.java

@@ -57,6 +57,11 @@ final class SqsAccess {
     return enabled ? SqsImpl.getQueueUrl(request) : null;
   }
 
+  @NoMuzzle
+  static String getMessageAttribute(SdkRequest request, String name) {
+    return enabled ? SqsImpl.getMessageAttribute(request, name) : null;
+  }
+
   @NoMuzzle
   static String getMessageId(SdkResponse response) {
     return enabled ? SqsImpl.getMessageId(response) : null;

+ 3 - 2
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAttributesGetter.java

@@ -69,7 +69,8 @@ enum SqsAttributesGetter implements MessagingAttributesGetter<ExecutionAttribute
 
   @Override
   public List<String> getMessageHeader(ExecutionAttributes request, String name) {
-    // TODO: not implemented
-    return Collections.emptyList();
+    SdkRequest sdkRequest = request.getAttribute(TracingExecutionInterceptor.SDK_REQUEST_ATTRIBUTE);
+    String value = SqsAccess.getMessageAttribute(sdkRequest, name);
+    return value != null ? Collections.singletonList(value) : Collections.emptyList();
   }
 }

+ 13 - 4
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsImpl.java

@@ -68,16 +68,17 @@ final class SqsImpl {
 
     io.opentelemetry.context.Context parentContext =
         TracingExecutionInterceptor.getParentContext(executionAttributes);
-    Instrumenter<ExecutionAttributes, Response> consumerReceiveInstrumenter =
+    Instrumenter<SqsReceiveRequest, Response> consumerReceiveInstrumenter =
         config.getConsumerReceiveInstrumenter();
     io.opentelemetry.context.Context receiveContext = null;
-    if (timer != null
-        && consumerReceiveInstrumenter.shouldStart(parentContext, executionAttributes)) {
+    SqsReceiveRequest receiveRequest =
+        SqsReceiveRequest.create(executionAttributes, SqsMessageImpl.wrap(response.messages()));
+    if (timer != null && consumerReceiveInstrumenter.shouldStart(parentContext, receiveRequest)) {
       receiveContext =
           InstrumenterUtil.startAndEnd(
               consumerReceiveInstrumenter,
               parentContext,
-              executionAttributes,
+              receiveRequest,
               new Response(context.httpResponse(), response),
               null,
               timer.startTime(),
@@ -258,6 +259,14 @@ final class SqsImpl {
     return null;
   }
 
+  static String getMessageAttribute(SdkRequest request, String name) {
+    if (request instanceof SendMessageRequest) {
+      MessageAttributeValue value = ((SendMessageRequest) request).messageAttributes().get(name);
+      return value != null ? value.stringValue() : null;
+    }
+    return null;
+  }
+
   static String getMessageId(SdkResponse response) {
     if (response instanceof SendMessageResponse) {
       return ((SendMessageResponse) response).messageId();

+ 4 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessage.java

@@ -17,4 +17,8 @@ interface SqsMessage {
   Map<String, MessageAttributeValue> messageAttributes();
 
   Map<String, String> attributesAsStrings();
+
+  String getMessageAttribute(String name);
+
+  String getMessageId();
 }

+ 21 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageImpl.java

@@ -5,6 +5,8 @@
 
 package io.opentelemetry.instrumentation.awssdk.v2_2;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import software.amazon.awssdk.services.sqs.model.Message;
 import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
@@ -21,6 +23,14 @@ final class SqsMessageImpl implements SqsMessage {
     return new SqsMessageImpl(message);
   }
 
+  static List<SqsMessage> wrap(List<Message> messages) {
+    List<SqsMessage> result = new ArrayList<>();
+    for (Message message : messages) {
+      result.add(wrap(message));
+    }
+    return result;
+  }
+
   @Override
   public Map<String, MessageAttributeValue> messageAttributes() {
     return message.messageAttributes();
@@ -30,4 +40,15 @@ final class SqsMessageImpl implements SqsMessage {
   public Map<String, String> attributesAsStrings() {
     return message.attributesAsStrings();
   }
+
+  @Override
+  public String getMessageAttribute(String name) {
+    MessageAttributeValue value = message.messageAttributes().get(name);
+    return value != null ? value.stringValue() : null;
+  }
+
+  @Override
+  public String getMessageId() {
+    return message.messageId();
+  }
 }

+ 2 - 1
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequest.java

@@ -7,7 +7,7 @@ package io.opentelemetry.instrumentation.awssdk.v2_2;
 
 import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
 
-final class SqsProcessRequest {
+final class SqsProcessRequest extends AbstractSqsRequest {
   private final ExecutionAttributes request;
   private final SqsMessage message;
 
@@ -20,6 +20,7 @@ final class SqsProcessRequest {
     return new SqsProcessRequest(request, message);
   }
 
+  @Override
   public ExecutionAttributes getRequest() {
     return request;
   }

+ 9 - 1
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequestAttributesGetter.java

@@ -6,6 +6,8 @@
 package io.opentelemetry.instrumentation.awssdk.v2_2;
 
 import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
+import java.util.Collections;
+import java.util.List;
 import javax.annotation.Nullable;
 import software.amazon.awssdk.core.SdkRequest;
 
@@ -58,6 +60,12 @@ enum SqsProcessRequestAttributesGetter
   @Override
   @Nullable
   public String getMessageId(SqsProcessRequest request, @Nullable Void response) {
-    return null;
+    return request.getMessage().getMessageId();
+  }
+
+  @Override
+  public List<String> getMessageHeader(SqsProcessRequest request, String name) {
+    String value = request.getMessage().getMessageAttribute(name);
+    return value != null ? Collections.singletonList(value) : Collections.emptyList();
   }
 }

+ 32 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveRequest.java

@@ -0,0 +1,32 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.awssdk.v2_2;
+
+import java.util.List;
+import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
+
+final class SqsReceiveRequest extends AbstractSqsRequest {
+  private final ExecutionAttributes request;
+  private final List<SqsMessage> messages;
+
+  private SqsReceiveRequest(ExecutionAttributes request, List<SqsMessage> messages) {
+    this.request = request;
+    this.messages = messages;
+  }
+
+  public static SqsReceiveRequest create(ExecutionAttributes request, List<SqsMessage> messages) {
+    return new SqsReceiveRequest(request, messages);
+  }
+
+  @Override
+  public ExecutionAttributes getRequest() {
+    return request;
+  }
+
+  public List<SqsMessage> getMessages() {
+    return messages;
+  }
+}

+ 74 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveRequestAttributesGetter.java

@@ -0,0 +1,74 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.awssdk.v2_2;
+
+import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import software.amazon.awssdk.core.SdkRequest;
+
+enum SqsReceiveRequestAttributesGetter
+    implements MessagingAttributesGetter<SqsReceiveRequest, Response> {
+  INSTANCE;
+
+  @Override
+  public String getSystem(SqsReceiveRequest request) {
+    return "AmazonSQS";
+  }
+
+  @Override
+  public String getDestination(SqsReceiveRequest request) {
+    SdkRequest sdkRequest =
+        request.getRequest().getAttribute(TracingExecutionInterceptor.SDK_REQUEST_ATTRIBUTE);
+    String queueUrl = SqsAccess.getQueueUrl(sdkRequest);
+    if (queueUrl != null) {
+      int i = queueUrl.lastIndexOf('/');
+      if (i > 0) {
+        return queueUrl.substring(i + 1);
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean isTemporaryDestination(SqsReceiveRequest request) {
+    return false;
+  }
+
+  @Override
+  @Nullable
+  public String getConversationId(SqsReceiveRequest request) {
+    return null;
+  }
+
+  @Override
+  @Nullable
+  public Long getMessagePayloadSize(SqsReceiveRequest request) {
+    return null;
+  }
+
+  @Override
+  @Nullable
+  public Long getMessagePayloadCompressedSize(SqsReceiveRequest request) {
+    return null;
+  }
+
+  @Override
+  @Nullable
+  public String getMessageId(SqsReceiveRequest request, @Nullable Response response) {
+    return null;
+  }
+
+  @Override
+  public List<String> getMessageHeader(SqsReceiveRequest request, String name) {
+    return StreamSupport.stream(request.getMessages().spliterator(), false)
+        .map(message -> message.getMessageAttribute(name))
+        .filter(value -> value != null)
+        .collect(Collectors.toList());
+  }
+}

+ 3 - 3
instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java

@@ -64,7 +64,7 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
       new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".TracingMessages");
 
   private final Instrumenter<ExecutionAttributes, Response> requestInstrumenter;
-  private final Instrumenter<ExecutionAttributes, Response> consumerReceiveInstrumenter;
+  private final Instrumenter<SqsReceiveRequest, Response> consumerReceiveInstrumenter;
   private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
   private final Instrumenter<ExecutionAttributes, Response> producerInstrumenter;
   private final boolean captureExperimentalSpanAttributes;
@@ -73,7 +73,7 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
       AttributeKey.stringKey("aws.http.error_message");
   static final String HTTP_FAILURE_EVENT = "HTTP request failure";
 
-  Instrumenter<ExecutionAttributes, Response> getConsumerReceiveInstrumenter() {
+  Instrumenter<SqsReceiveRequest, Response> getConsumerReceiveInstrumenter() {
     return consumerReceiveInstrumenter;
   }
 
@@ -97,7 +97,7 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
 
   TracingExecutionInterceptor(
       Instrumenter<ExecutionAttributes, Response> requestInstrumenter,
-      Instrumenter<ExecutionAttributes, Response> consumerReceiveInstrumenter,
+      Instrumenter<SqsReceiveRequest, Response> consumerReceiveInstrumenter,
       Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter,
       Instrumenter<ExecutionAttributes, Response> producerInstrumenter,
       boolean captureExperimentalSpanAttributes,

+ 4 - 0
instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTest.groovy

@@ -10,6 +10,8 @@ import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
 import software.amazon.awssdk.services.sqs.SqsAsyncClient
 import software.amazon.awssdk.services.sqs.SqsClient
 
+import static java.util.Collections.singletonList
+
 abstract class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements LibraryTestTrait {
   static AwsSdkTelemetry telemetry
 
@@ -17,6 +19,8 @@ abstract class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements
     def telemetryBuilder = AwsSdkTelemetry.builder(getOpenTelemetry())
       .setCaptureExperimentalSpanAttributes(true)
       .setMessagingReceiveInstrumentationEnabled(true)
+      .setCapturedHeaders(singletonList("test-message-header"))
+
     configure(telemetryBuilder)
     telemetry = telemetryBuilder.build()
   }

+ 3 - 0
instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.groovy

@@ -181,6 +181,7 @@ abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSp
             "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
             "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
             "$SemanticAttributes.MESSAGING_OPERATION" "process"
+            "$SemanticAttributes.MESSAGING_MESSAGE_ID" String
             "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
           }
         }
@@ -357,6 +358,7 @@ abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSp
               "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
               "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
               "$SemanticAttributes.MESSAGING_OPERATION" "process"
+              "$SemanticAttributes.MESSAGING_MESSAGE_ID" String
               "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
             }
           }
@@ -384,6 +386,7 @@ abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSp
               "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
               "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
               "$SemanticAttributes.MESSAGING_OPERATION" "process"
+              "$SemanticAttributes.MESSAGING_MESSAGE_ID" String
               "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
             }
           }

+ 37 - 1
instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy

@@ -115,7 +115,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
     }
   }
 
-  void assertSqsTraces(withParent = false) {
+  void assertSqsTraces(withParent = false, captureHeaders = false) {
     assertTraces(3) {
       SpanData publishSpan
       trace(0, 1) {
@@ -164,6 +164,9 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
             "$SemanticAttributes.MESSAGING_MESSAGE_ID" String
             "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
             "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
+            if (captureHeaders) {
+              "messaging.header.test_message_header" { it == ["test"] }
+            }
           }
         }
         publishSpan = span(0)
@@ -225,6 +228,9 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
             "$SemanticAttributes.MESSAGING_OPERATION" "receive"
             "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
             "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
+            if (captureHeaders) {
+              "messaging.header.test_message_header" { it == ["test"] }
+            }
           }
         }
         span(1 + offset) {
@@ -244,7 +250,11 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
             "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
             "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
             "$SemanticAttributes.MESSAGING_OPERATION" "process"
+            "$SemanticAttributes.MESSAGING_MESSAGE_ID" String
             "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
+            if (captureHeaders) {
+              "messaging.header.test_message_header" { it == ["test"] }
+            }
           }
         }
         span(2 + offset) {
@@ -276,6 +286,31 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
     assertSqsTraces()
   }
 
+  def "capture message header as span attribute"() {
+    setup:
+    def builder = SqsClient.builder()
+    configureSdkClient(builder)
+    def client = configureSqsClient(builder.build())
+
+    client.createQueue(createQueueRequest)
+
+    when:
+    SendMessageRequest newSendMessageRequest = sendMessageRequest.toBuilder().messageAttributes(
+      Collections.singletonMap("test-message-header",
+      MessageAttributeValue.builder().dataType("String").stringValue("test").build())
+    ).build()
+    client.sendMessage(newSendMessageRequest)
+
+    ReceiveMessageRequest newReceiveMessageRequest = receiveMessageRequest.toBuilder()
+      .messageAttributeNames("test-message-header").build()
+    def resp = client.receiveMessage(newReceiveMessageRequest)
+
+    then:
+    resp.messages.size() == 1
+    resp.messages.each {message -> runWithSpan("process child") {}}
+    assertSqsTraces(false, true)
+  }
+
   def "simple sqs producer-consumer services with parent: sync"() {
     setup:
     def builder = SqsClient.builder()
@@ -420,6 +455,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
               "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
               "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
               "$SemanticAttributes.MESSAGING_OPERATION" "process"
+              "$SemanticAttributes.MESSAGING_MESSAGE_ID" String
               "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
             }
           }