Browse Source

Use internal Marshaler in testing exporters instead of gRPC. (#5332)

Anuraag Agrawal 3 years ago
parent
commit
85f9f0192b

+ 1 - 7
testing/agent-exporter/build.gradle.kts

@@ -18,13 +18,7 @@ dependencies {
   compileOnly(project(":instrumentation-appender-api-internal"))
   compileOnly(project(":instrumentation-appender-sdk-internal"))
 
-  implementation("io.grpc:grpc-core:1.33.1")
-  implementation("io.grpc:grpc-protobuf:1.33.1")
-  implementation("io.grpc:grpc-stub:1.33.1")
-  implementation("io.opentelemetry:opentelemetry-exporter-otlp")
-  implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics")
-  implementation("io.opentelemetry:opentelemetry-exporter-otlp-logs")
-  implementation("io.opentelemetry.proto:opentelemetry-proto")
+  implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
   compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
   compileOnly("org.slf4j:slf4j-api")
 }

+ 2 - 1
testing/agent-exporter/src/main/java/io/opentelemetry/javaagent/testing/exporter/AgentTestingExporterPropertySource.java

@@ -15,8 +15,9 @@ public class AgentTestingExporterPropertySource implements ConfigPropertySource
   @Override
   public Map<String, String> getProperties() {
     Map<String, String> properties = new HashMap<>();
-    properties.put("otel.traces.exporter", "none");
+    properties.put("otel.logs.exporter", "none");
     properties.put("otel.metrics.exporter", "none");
+    properties.put("otel.traces.exporter", "none");
     return properties;
   }
 }

+ 2 - 3
testing/agent-exporter/src/main/java/io/opentelemetry/javaagent/testing/exporter/AgentTestingLogsCustomizer.java

@@ -12,7 +12,7 @@ import io.opentelemetry.javaagent.extension.AgentListener;
 import io.opentelemetry.javaagent.instrumentation.api.appender.internal.AgentLogEmitterProvider;
 import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
 import io.opentelemetry.sdk.logs.SdkLogEmitterProvider;
-import io.opentelemetry.sdk.logs.export.BatchLogProcessor;
+import io.opentelemetry.sdk.logs.export.SimpleLogProcessor;
 
 @AutoService(AgentListener.class)
 public class AgentTestingLogsCustomizer implements AgentListener {
@@ -24,8 +24,7 @@ public class AgentTestingLogsCustomizer implements AgentListener {
     SdkLogEmitterProvider logEmitterProvider =
         SdkLogEmitterProvider.builder()
             .setResource(autoConfiguredOpenTelemetrySdk.getResource())
-            .addLogProcessor(
-                BatchLogProcessor.builder(AgentTestingExporterFactory.logExporter).build())
+            .addLogProcessor(SimpleLogProcessor.create(AgentTestingExporterFactory.logExporter))
             .build();
 
     AgentLogEmitterProvider.resetForTest();

+ 19 - 59
testing/agent-exporter/src/main/java/io/opentelemetry/javaagent/testing/exporter/OtlpInMemoryLogExporter.java

@@ -5,24 +5,18 @@
 
 package io.opentelemetry.javaagent.testing.exporter;
 
-import io.grpc.Server;
-import io.grpc.inprocess.InProcessChannelBuilder;
-import io.grpc.inprocess.InProcessServerBuilder;
-import io.grpc.stub.StreamObserver;
-import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogExporter;
-import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
-import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
-import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc;
+import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
 import io.opentelemetry.sdk.common.CompletableResultCode;
 import io.opentelemetry.sdk.logs.data.LogData;
 import io.opentelemetry.sdk.logs.export.LogExporter;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,73 +24,39 @@ class OtlpInMemoryLogExporter implements LogExporter {
 
   private static final Logger logger = LoggerFactory.getLogger(OtlpInMemoryLogExporter.class);
 
-  private final BlockingQueue<ExportLogsServiceRequest> collectedRequests =
-      new LinkedBlockingQueue<>();
+  private final Queue<byte[]> collectedRequests = new ConcurrentLinkedQueue<>();
 
   List<byte[]> getCollectedExportRequests() {
-    return collectedRequests.stream()
-        .map(ExportLogsServiceRequest::toByteArray)
-        .collect(Collectors.toList());
+    return new ArrayList<>(collectedRequests);
   }
 
   void reset() {
-    delegate.flush().join(1, TimeUnit.SECONDS);
     collectedRequests.clear();
   }
 
-  private final Server collector;
-  private final LogExporter delegate;
-
-  // TODO(anuraaga): https://github.com/open-telemetry/opentelemetry-java-instrumentation/pull/5314
-  @SuppressWarnings("deprecation")
-  OtlpInMemoryLogExporter() {
-    String serverName = InProcessServerBuilder.generateName();
-
-    collector =
-        InProcessServerBuilder.forName(serverName)
-            .directExecutor()
-            .addService(new InMemoryOtlpCollector())
-            .build();
-    try {
-      collector.start();
-    } catch (IOException e) {
-      throw new AssertionError("Could not start in-process collector.", e);
-    }
-
-    delegate =
-        OtlpGrpcLogExporter.builder()
-            .setChannel(InProcessChannelBuilder.forName(serverName).directExecutor().build())
-            .build();
-  }
-
   @Override
   public CompletableResultCode export(Collection<LogData> logs) {
     for (LogData log : logs) {
       logger.info("Exporting log {}", log);
     }
-    return delegate.export(logs);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    try {
+      LogsRequestMarshaler.create(logs).writeBinaryTo(bos);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    collectedRequests.add(bos.toByteArray());
+    return CompletableResultCode.ofSuccess();
   }
 
   @Override
   public CompletableResultCode flush() {
-    return delegate.flush();
+    return CompletableResultCode.ofSuccess();
   }
 
   @Override
   public CompletableResultCode shutdown() {
-    collector.shutdown();
-    return delegate.shutdown();
-  }
-
-  private class InMemoryOtlpCollector extends LogsServiceGrpc.LogsServiceImplBase {
-
-    @Override
-    public void export(
-        ExportLogsServiceRequest request,
-        StreamObserver<ExportLogsServiceResponse> responseObserver) {
-      collectedRequests.add(request);
-      responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance());
-      responseObserver.onCompleted();
-    }
+    reset();
+    return CompletableResultCode.ofSuccess();
   }
 }

+ 18 - 58
testing/agent-exporter/src/main/java/io/opentelemetry/javaagent/testing/exporter/OtlpInMemoryMetricExporter.java

@@ -5,24 +5,18 @@
 
 package io.opentelemetry.javaagent.testing.exporter;
 
-import io.grpc.Server;
-import io.grpc.inprocess.InProcessChannelBuilder;
-import io.grpc.inprocess.InProcessServerBuilder;
-import io.grpc.stub.StreamObserver;
-import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
-import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
-import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
-import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
+import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
 import io.opentelemetry.sdk.common.CompletableResultCode;
 import io.opentelemetry.sdk.metrics.data.MetricData;
 import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,70 +24,36 @@ class OtlpInMemoryMetricExporter implements MetricExporter {
 
   private static final Logger logger = LoggerFactory.getLogger(OtlpInMemoryMetricExporter.class);
 
-  private final BlockingQueue<ExportMetricsServiceRequest> collectedRequests =
-      new LinkedBlockingQueue<>();
+  private final Queue<byte[]> collectedRequests = new ConcurrentLinkedQueue<>();
 
   List<byte[]> getCollectedExportRequests() {
-    return collectedRequests.stream()
-        .map(ExportMetricsServiceRequest::toByteArray)
-        .collect(Collectors.toList());
+    return new ArrayList<>(collectedRequests);
   }
 
   void reset() {
-    delegate.flush().join(1, TimeUnit.SECONDS);
     collectedRequests.clear();
   }
 
-  private final Server collector;
-  private final MetricExporter delegate;
-
-  // TODO(anuraaga): https://github.com/open-telemetry/opentelemetry-java-instrumentation/pull/5314
-  @SuppressWarnings("deprecation")
-  OtlpInMemoryMetricExporter() {
-    String serverName = InProcessServerBuilder.generateName();
-
-    collector =
-        InProcessServerBuilder.forName(serverName)
-            .directExecutor()
-            .addService(new InMemoryOtlpCollector())
-            .build();
+  @Override
+  public CompletableResultCode export(Collection<MetricData> metrics) {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
     try {
-      collector.start();
+      MetricsRequestMarshaler.create(metrics).writeBinaryTo(bos);
     } catch (IOException e) {
-      throw new AssertionError("Could not start in-process collector.", e);
+      throw new UncheckedIOException(e);
     }
-
-    delegate =
-        OtlpGrpcMetricExporter.builder()
-            .setChannel(InProcessChannelBuilder.forName(serverName).directExecutor().build())
-            .build();
-  }
-
-  @Override
-  public CompletableResultCode export(Collection<MetricData> metrics) {
-    return delegate.export(metrics);
+    collectedRequests.add(bos.toByteArray());
+    return CompletableResultCode.ofSuccess();
   }
 
   @Override
   public CompletableResultCode flush() {
-    return delegate.flush();
+    return CompletableResultCode.ofSuccess();
   }
 
   @Override
   public CompletableResultCode shutdown() {
-    collector.shutdown();
-    return delegate.shutdown();
-  }
-
-  private class InMemoryOtlpCollector extends MetricsServiceGrpc.MetricsServiceImplBase {
-
-    @Override
-    public void export(
-        ExportMetricsServiceRequest request,
-        StreamObserver<ExportMetricsServiceResponse> responseObserver) {
-      collectedRequests.add(request);
-      responseObserver.onNext(ExportMetricsServiceResponse.getDefaultInstance());
-      responseObserver.onCompleted();
-    }
+    reset();
+    return CompletableResultCode.ofSuccess();
   }
 }

+ 19 - 59
testing/agent-exporter/src/main/java/io/opentelemetry/javaagent/testing/exporter/OtlpInMemorySpanExporter.java

@@ -5,24 +5,18 @@
 
 package io.opentelemetry.javaagent.testing.exporter;
 
-import io.grpc.Server;
-import io.grpc.inprocess.InProcessChannelBuilder;
-import io.grpc.inprocess.InProcessServerBuilder;
-import io.grpc.stub.StreamObserver;
-import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
-import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
-import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
-import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc;
+import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
 import io.opentelemetry.sdk.common.CompletableResultCode;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import io.opentelemetry.sdk.trace.export.SpanExporter;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,73 +24,39 @@ class OtlpInMemorySpanExporter implements SpanExporter {
 
   private static final Logger logger = LoggerFactory.getLogger(OtlpInMemorySpanExporter.class);
 
-  private final BlockingQueue<ExportTraceServiceRequest> collectedRequests =
-      new LinkedBlockingQueue<>();
+  private final Queue<byte[]> collectedRequests = new ConcurrentLinkedQueue<>();
 
   List<byte[]> getCollectedExportRequests() {
-    return collectedRequests.stream()
-        .map(ExportTraceServiceRequest::toByteArray)
-        .collect(Collectors.toList());
+    return new ArrayList<>(collectedRequests);
   }
 
   void reset() {
-    delegate.flush().join(1, TimeUnit.SECONDS);
     collectedRequests.clear();
   }
 
-  private final Server collector;
-  private final SpanExporter delegate;
-
-  // TODO(anuraaga): https://github.com/open-telemetry/opentelemetry-java-instrumentation/pull/5314
-  @SuppressWarnings("deprecation")
-  OtlpInMemorySpanExporter() {
-    String serverName = InProcessServerBuilder.generateName();
-
-    collector =
-        InProcessServerBuilder.forName(serverName)
-            .directExecutor()
-            .addService(new InMemoryOtlpCollector())
-            .build();
-    try {
-      collector.start();
-    } catch (IOException e) {
-      throw new AssertionError("Could not start in-process collector.", e);
-    }
-
-    delegate =
-        OtlpGrpcSpanExporter.builder()
-            .setChannel(InProcessChannelBuilder.forName(serverName).directExecutor().build())
-            .build();
-  }
-
   @Override
   public CompletableResultCode export(Collection<SpanData> spans) {
     for (SpanData span : spans) {
       logger.info("Exporting span {}", span);
     }
-    return delegate.export(spans);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    try {
+      TraceRequestMarshaler.create(spans).writeBinaryTo(bos);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    collectedRequests.add(bos.toByteArray());
+    return CompletableResultCode.ofSuccess();
   }
 
   @Override
   public CompletableResultCode flush() {
-    return delegate.flush();
+    return CompletableResultCode.ofSuccess();
   }
 
   @Override
   public CompletableResultCode shutdown() {
-    collector.shutdown();
-    return delegate.shutdown();
-  }
-
-  private class InMemoryOtlpCollector extends TraceServiceGrpc.TraceServiceImplBase {
-
-    @Override
-    public void export(
-        ExportTraceServiceRequest request,
-        StreamObserver<ExportTraceServiceResponse> responseObserver) {
-      collectedRequests.add(request);
-      responseObserver.onNext(ExportTraceServiceResponse.getDefaultInstance());
-      responseObserver.onCompleted();
-    }
+    reset();
+    return CompletableResultCode.ofSuccess();
   }
 }