Browse Source

Merge changes from dd-trace-java 0.53.0 (#456)

* Add time in queue (DataDog/dd-trace-java#1481)

* Minor upgrades (DataDog/dd-trace-java#1495)

* Allow user to disable kafka time in queue tag (DataDog/dd-trace-java#1487)

* Replace Set<Integer> with BitSet for HTTP statuses (DataDog/dd-trace-java#1496)

* Register WeakMapProvider earlier in AgentInstaller (DataDog/dd-trace-java#1480)

* Update codenarc (DataDog/dd-trace-java#1500)

Co-authored-by: Tyler Benson <tyler.benson@datadoghq.com>
Co-authored-by: Nikolay Martynov <mar.kolya@gmail.com>
Co-authored-by: Richard Startin <richard.startin@datadoghq.com>
Trask Stalnaker 4 years ago
parent
commit
96fa7b8c31

+ 1 - 1
agent-bootstrap/src/main/java/io/opentelemetry/auto/bootstrap/WeakMap.java

@@ -67,7 +67,7 @@ public interface WeakMap<K, V> {
 
       @Override
       public <K, V> WeakMap<K, V> get() {
-        log.warn("WeakMap.Supplier not registered. Returning a synchronized WeakHashMap.");
+        log.debug("WeakMap.Supplier not registered. Returning a synchronized WeakHashMap.");
         return new MapAdapter<>(Collections.synchronizedMap(new WeakHashMap<K, V>()));
       }
     }

+ 1 - 1
agent-bootstrap/src/main/java/io/opentelemetry/auto/bootstrap/instrumentation/decorator/HttpClientDecorator.java

@@ -104,7 +104,7 @@ public abstract class HttpClientDecorator<REQUEST, RESPONSE> extends ClientDecor
       if (status != null) {
         span.setAttribute(Tags.HTTP_STATUS, status);
 
-        if (Config.get().getHttpClientErrorStatuses().contains(status)) {
+        if (Config.get().getHttpClientErrorStatuses().get(status)) {
           span.setStatus(Status.UNKNOWN);
         }
       }

+ 1 - 1
agent-bootstrap/src/main/java/io/opentelemetry/auto/bootstrap/instrumentation/decorator/HttpServerDecorator.java

@@ -119,7 +119,7 @@ public abstract class HttpServerDecorator<REQUEST, CONNECTION, RESPONSE> extends
       if (status != null) {
         span.setAttribute(Tags.HTTP_STATUS, status);
 
-        if (Config.get().getHttpServerErrorStatuses().contains(status)) {
+        if (Config.get().getHttpServerErrorStatuses().get(status)) {
           span.setStatus(Status.UNKNOWN);
         }
       }

+ 21 - 23
agent-bootstrap/src/main/java/io/opentelemetry/auto/config/Config.java

@@ -22,12 +22,11 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.MethodType;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
-import java.util.Set;
 import java.util.SortedSet;
 import java.util.regex.Pattern;
 import lombok.Getter;
@@ -84,9 +83,9 @@ public class Config {
 
   private static final boolean DEFAULT_RUNTIME_CONTEXT_FIELD_INJECTION = true;
 
-  private static final Set<Integer> DEFAULT_HTTP_SERVER_ERROR_STATUSES =
+  private static final BitSet DEFAULT_HTTP_SERVER_ERROR_STATUSES =
       parseIntegerRangeSet("500-599", "default");
-  private static final Set<Integer> DEFAULT_HTTP_CLIENT_ERROR_STATUSES =
+  private static final BitSet DEFAULT_HTTP_CLIENT_ERROR_STATUSES =
       parseIntegerRangeSet("400-599", "default");
   private static final boolean DEFAULT_HTTP_SERVER_TAG_QUERY_STRING = false;
   private static final boolean DEFAULT_HTTP_CLIENT_TAG_QUERY_STRING = false;
@@ -111,8 +110,8 @@ public class Config {
   @Getter private final boolean traceEnabled;
   @Getter private final boolean integrationsEnabled;
   @Getter private final List<String> excludedClasses;
-  @Getter private final Set<Integer> httpServerErrorStatuses;
-  @Getter private final Set<Integer> httpClientErrorStatuses;
+  @Getter private final BitSet httpServerErrorStatuses;
+  @Getter private final BitSet httpClientErrorStatuses;
   @Getter private final boolean httpServerTagQueryString;
   @Getter private final boolean httpClientTagQueryString;
   @Getter private final Integer scopeDepthLimit;
@@ -368,8 +367,8 @@ public class Config {
     }
   }
 
-  private static Set<Integer> getIntegerRangeSettingFromEnvironment(
-      final String name, final Set<Integer> defaultValue) {
+  private static BitSet getIntegerRangeSettingFromEnvironment(
+      final String name, final BitSet defaultValue) {
     final String value = getSettingFromEnvironment(name, null);
     try {
       return value == null ? defaultValue : parseIntegerRangeSet(value, name);
@@ -450,8 +449,8 @@ public class Config {
     return valueOf(properties.getProperty(name), Integer.class, defaultValue);
   }
 
-  private static Set<Integer> getPropertyIntegerRangeValue(
-      final Properties properties, final String name, final Set<Integer> defaultValue) {
+  private static BitSet getPropertyIntegerRangeValue(
+      final Properties properties, final String name, final BitSet defaultValue) {
     final String value = properties.getProperty(name);
     try {
       return value == null ? defaultValue : parseIntegerRangeSet(value, name);
@@ -462,7 +461,7 @@ public class Config {
   }
 
   @NonNull
-  private static Set<Integer> parseIntegerRangeSet(@NonNull String str, final String settingName)
+  private static BitSet parseIntegerRangeSet(@NonNull String str, final String settingName)
       throws NumberFormatException {
     str = str.replaceAll("\\s", "");
     if (!str.matches("\\d{3}(?:-\\d{3})?(?:,\\d{3}(?:-\\d{3})?)*")) {
@@ -473,24 +472,23 @@ public class Config {
       throw new NumberFormatException();
     }
 
+    final int lastSeparator = Math.max(str.lastIndexOf(','), str.lastIndexOf('-'));
+    final int maxValue = Integer.parseInt(str.substring(lastSeparator + 1));
+    final BitSet set = new BitSet(maxValue);
     final String[] tokens = str.split(",", -1);
-    final Set<Integer> set = new HashSet<>();
-
     for (final String token : tokens) {
-      final String[] range = token.split("-", -1);
-      if (range.length == 1) {
-        set.add(Integer.parseInt(range[0]));
-      } else if (range.length == 2) {
-        final int left = Integer.parseInt(range[0]);
-        final int right = Integer.parseInt(range[1]);
+      final int separator = token.indexOf('-');
+      if (separator == -1) {
+        set.set(Integer.parseInt(token));
+      } else if (separator > 0) {
+        final int left = Integer.parseInt(token.substring(0, separator));
+        final int right = Integer.parseInt(token.substring(separator + 1));
         final int min = Math.min(left, right);
         final int max = Math.max(left, right);
-        for (int i = min; i <= max; i++) {
-          set.add(i);
-        }
+        set.set(min, max + 1);
       }
     }
-    return Collections.unmodifiableSet(set);
+    return set;
   }
 
   @NonNull

+ 22 - 14
agent-bootstrap/src/test/groovy/io/opentelemetry/auto/config/ConfigTest.groovy

@@ -43,8 +43,8 @@ class ConfigTest extends AgentSpecification {
 
     then:
     config.traceEnabled == true
-    config.httpServerErrorStatuses == (500..599).toSet()
-    config.httpClientErrorStatuses == (400..599).toSet()
+    config.httpServerErrorStatuses == toBitSet((500..599))
+    config.httpClientErrorStatuses == toBitSet((400..599))
     config.runtimeContextFieldInjection == true
     config.toString().contains("traceEnabled=true")
 
@@ -71,8 +71,8 @@ class ConfigTest extends AgentSpecification {
     then:
     config.traceEnabled == false
     config.traceMethods == "mypackage.MyClass[myMethod]"
-    config.httpServerErrorStatuses == (122..457).toSet()
-    config.httpClientErrorStatuses == (111..111).toSet()
+    config.httpServerErrorStatuses == toBitSet((122..457))
+    config.httpClientErrorStatuses == toBitSet((111..111))
     config.runtimeContextFieldInjection == false
   }
 
@@ -90,8 +90,8 @@ class ConfigTest extends AgentSpecification {
     then:
     config.traceEnabled == false
     config.traceMethods == "mypackage.MyClass[myMethod]"
-    config.httpServerErrorStatuses == (122..457).toSet()
-    config.httpClientErrorStatuses == (111..111).toSet()
+    config.httpServerErrorStatuses == toBitSet((122..457))
+    config.httpClientErrorStatuses == toBitSet((111..111))
     config.runtimeContextFieldInjection == false
   }
 
@@ -134,8 +134,8 @@ class ConfigTest extends AgentSpecification {
     then:
     config.traceEnabled == true
     config.traceMethods == " "
-    config.httpServerErrorStatuses == (500..599).toSet()
-    config.httpClientErrorStatuses == (400..599).toSet()
+    config.httpServerErrorStatuses == toBitSet((500..599))
+    config.httpClientErrorStatuses == toBitSet((400..599))
   }
 
   def "sys props override properties"() {
@@ -152,8 +152,8 @@ class ConfigTest extends AgentSpecification {
     then:
     config.traceEnabled == false
     config.traceMethods == "mypackage.MyClass[myMethod]"
-    config.httpServerErrorStatuses == (122..457).toSet()
-    config.httpClientErrorStatuses == (111..111).toSet()
+    config.httpServerErrorStatuses == toBitSet((122..457))
+    config.httpClientErrorStatuses == toBitSet((111..111))
   }
 
   def "override null properties"() {
@@ -235,10 +235,10 @@ class ConfigTest extends AgentSpecification {
 
     then:
     if (expected) {
-      assert config.httpServerErrorStatuses == expected.toSet()
-      assert config.httpClientErrorStatuses == expected.toSet()
-      assert propConfig.httpServerErrorStatuses == expected.toSet()
-      assert propConfig.httpClientErrorStatuses == expected.toSet()
+      assert config.httpServerErrorStatuses == toBitSet(expected)
+      assert config.httpClientErrorStatuses == toBitSet(expected)
+      assert propConfig.httpServerErrorStatuses == toBitSet(expected)
+      assert propConfig.httpClientErrorStatuses == toBitSet(expected)
     } else {
       assert config.httpServerErrorStatuses == Config.DEFAULT_HTTP_SERVER_ERROR_STATUSES
       assert config.httpClientErrorStatuses == Config.DEFAULT_HTTP_CLIENT_ERROR_STATUSES
@@ -320,4 +320,12 @@ class ConfigTest extends AgentSpecification {
     cleanup:
     System.clearProperty(PREFIX + CONFIGURATION_FILE)
   }
+
+  static BitSet toBitSet(Collection<Integer> set) {
+    BitSet bs = new BitSet()
+    for (Integer i : set) {
+      bs.set(i)
+    }
+    return bs
+  }
 }

+ 5 - 0
agent-tooling/src/main/java/io/opentelemetry/auto/tooling/AgentInstaller.java

@@ -50,6 +50,11 @@ public class AgentInstaller {
     return INSTRUMENTATION;
   }
 
+  static {
+    // WeakMap is used by other classes below, so we need to register the provider first.
+    AgentTooling.registerWeakMapProvider();
+  }
+
   public static void installBytebuddyAgent(final Instrumentation inst) {
     if (Config.get().isTraceEnabled()) {
       installBytebuddyAgent(inst, false, new AgentBuilder.Listener[0]);

+ 1 - 1
agent-tooling/src/main/java/io/opentelemetry/auto/tooling/AgentTooling.java

@@ -35,7 +35,7 @@ public class AgentTooling {
     registerWeakMapProvider();
   }
 
-  private static void registerWeakMapProvider() {
+  static void registerWeakMapProvider() {
     if (!WeakMap.Provider.isProviderRegistered()) {
       WeakMap.Provider.registerIfAbsent(new WeakMapSuppliers.WeakConcurrent());
       //    WeakMap.Provider.registerIfAbsent(new WeakMapSuppliers.WeakConcurrent.Inline());

+ 1 - 1
gradle/codenarc.gradle

@@ -1,7 +1,7 @@
 apply plugin: "codenarc"
 
 dependencies {
-  codenarc "org.codenarc:CodeNarc:0.27.0"
+  codenarc "org.codenarc:CodeNarc:1.5"
 }
 
 codenarc {

+ 1 - 1
gradle/wrapper/gradle-wrapper.properties

@@ -1,5 +1,5 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.4-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.4.1-all.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists

+ 1 - 1
instrumentation/finatra-2.9/src/main/java/io/opentelemetry/auto/instrumentation/finatra/FinatraInstrumentation.java

@@ -130,7 +130,7 @@ public class FinatraInstrumentation extends Instrumenter.Default {
       final Span span = spanWithScope.getSpan();
 
       // Don't use DECORATE.onResponse because this is the controller span
-      if (Config.get().getHttpServerErrorStatuses().contains(DECORATE.status(response))) {
+      if (Config.get().getHttpServerErrorStatuses().get(DECORATE.status(response))) {
         span.setStatus(Status.UNKNOWN);
       }
 

+ 10 - 2
instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/KafkaDecorator.java

@@ -21,6 +21,7 @@ import io.opentelemetry.trace.Span;
 import io.opentelemetry.trace.Tracer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.record.TimestampType;
 
 public class KafkaDecorator extends ClientDecorator {
   public static final KafkaDecorator DECORATE = new KafkaDecorator();
@@ -47,16 +48,23 @@ public class KafkaDecorator extends ClientDecorator {
     return "destination";
   }
 
-  public void onConsume(final Span span, final ConsumerRecord record) {
+  public void onConsume(final Span span, final long startTimeMillis, final ConsumerRecord record) {
     span.setAttribute("partition", record.partition());
     span.setAttribute("offset", record.offset());
+    // don't record a duration if the message was sent from an old Kafka client
+    if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) {
+      final long produceTime = record.timestamp();
+      // this attribute shows how much time elapsed between the producer and the consumer of this
+      // message, which can be helpful for identifying queue bottlenecks
+      span.setAttribute("record.queue_time_ms", Math.max(0L, startTimeMillis - produceTime));
+    }
   }
 
   public void onProduce(final Span span, final ProducerRecord record) {
     if (record != null) {
       final Integer partition = record.partition();
       if (partition != null) {
-        span.setAttribute("kafka.partition", partition);
+        span.setAttribute("partition", partition);
       }
     }
   }

+ 4 - 1
instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/TracingIterator.java

@@ -25,6 +25,7 @@ import io.opentelemetry.auto.instrumentation.api.SpanWithScope;
 import io.opentelemetry.trace.Span;
 import io.opentelemetry.trace.SpanContext;
 import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
@@ -81,9 +82,11 @@ public class TracingIterator implements Iterator<ConsumerRecord> {
             spanBuilder.addLink(spanContext);
           }
         }
+        final long startTimeMillis = System.currentTimeMillis();
+        spanBuilder.setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTimeMillis));
         final Span span = spanBuilder.startSpan();
         decorator.afterStart(span);
-        decorator.onConsume(span, next);
+        decorator.onConsume(span, startTimeMillis, next);
         currentSpanWithScope = new SpanWithScope(span, currentContextWith(span));
       }
     } catch (final Exception e) {

+ 3 - 1
instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy

@@ -115,6 +115,7 @@ class KafkaClientTest extends AgentTestRunner {
           tags {
             "partition" { it >= 0 }
             "offset" 0
+            "record.queue_time_ms" { it >= 0 }
           }
         }
       }
@@ -168,7 +169,7 @@ class KafkaClientTest extends AgentTestRunner {
           errored false
           parent()
           tags {
-            "kafka.partition" { it >= 0 }
+            "partition" { it >= 0 }
           }
         }
         span(1) {
@@ -179,6 +180,7 @@ class KafkaClientTest extends AgentTestRunner {
           tags {
             "partition" { it >= 0 }
             "offset" 0
+            "record.queue_time_ms" { it >= 0 }
           }
         }
       }

+ 2 - 0
instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy

@@ -153,6 +153,7 @@ class KafkaStreamsTest extends AgentTestRunner {
           tags {
             "partition" { it >= 0 }
             "offset" 0
+            "record.queue_time_ms" { it >= 0 }
           }
         }
         // STREAMING span 1
@@ -185,6 +186,7 @@ class KafkaStreamsTest extends AgentTestRunner {
           tags {
             "partition" { it >= 0 }
             "offset" 0
+            "record.queue_time_ms" { it >= 0 }
             "testing" 123
           }
         }