123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- /*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
- package io.opentelemetry.instrumentation.testing.util;
- import static java.util.stream.Collectors.toList;
- import static org.assertj.core.api.Assertions.assertThat;
- import io.opentelemetry.api.trace.SpanId;
- import io.opentelemetry.api.trace.SpanKind;
- import io.opentelemetry.sdk.trace.data.SpanData;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Comparator;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
- import java.util.function.Supplier;
- import java.util.stream.Collectors;
- public final class TelemetryDataUtil {
- public static Comparator<List<SpanData>> orderByRootSpanKind(SpanKind... spanKinds) {
- List<SpanKind> list = Arrays.asList(spanKinds);
- return Comparator.comparing(span -> list.indexOf(span.get(0).getKind()));
- }
- public static Comparator<List<SpanData>> orderByRootSpanName(String... names) {
- List<String> list = Arrays.asList(names);
- return Comparator.comparing(span -> list.indexOf(span.get(0).getName()));
- }
- public static List<List<SpanData>> groupTraces(List<SpanData> spans) {
- List<List<SpanData>> traces =
- new ArrayList<>(
- spans.stream().collect(Collectors.groupingBy(SpanData::getTraceId)).values());
- sortTraces(traces);
- for (int i = 0; i < traces.size(); i++) {
- List<SpanData> trace = traces.get(i);
- traces.set(i, sort(trace));
- }
- return traces;
- }
- public static List<List<SpanData>> waitForTraces(
- Supplier<List<SpanData>> supplier, int number, boolean verifyScopeVersion)
- throws InterruptedException, TimeoutException {
- return waitForTraces(supplier, number, 20, TimeUnit.SECONDS, verifyScopeVersion);
- }
- public static List<List<SpanData>> waitForTraces(
- Supplier<List<SpanData>> supplier,
- int number,
- long timeout,
- TimeUnit unit,
- boolean verifyScopeVersion)
- throws InterruptedException, TimeoutException {
- long startTime = System.nanoTime();
- List<List<SpanData>> allTraces = groupTraces(supplier.get());
- List<List<SpanData>> completeTraces =
- allTraces.stream().filter(TelemetryDataUtil::isCompleted).collect(toList());
- while (completeTraces.size() < number && elapsedSeconds(startTime) < unit.toSeconds(timeout)) {
- allTraces = groupTraces(supplier.get());
- completeTraces = allTraces.stream().filter(TelemetryDataUtil::isCompleted).collect(toList());
- Thread.sleep(10);
- }
- if (completeTraces.size() < number) {
- throw new TimeoutException(
- "Timeout waiting for "
- + number
- + " completed trace(s), found "
- + completeTraces.size()
- + " completed trace(s) and "
- + allTraces.size()
- + " total trace(s): "
- + allTraces);
- }
- if (verifyScopeVersion) {
- // TODO (trask) is there a better location for this assertion?
- for (List<SpanData> trace : completeTraces) {
- for (SpanData span : trace) {
- if (!span.getInstrumentationScopeInfo().getName().equals("test")) {
- assertThat(span.getInstrumentationScopeInfo().getVersion())
- .as(
- "Instrumentation version was empty; make sure that the instrumentation name matches the gradle module name")
- .isNotNull();
- }
- }
- }
- }
- return completeTraces;
- }
- private static long elapsedSeconds(long startTime) {
- return TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
- }
- // must be called under tracesLock
- private static void sortTraces(List<List<SpanData>> traces) {
- traces.sort(Comparator.comparingLong(TelemetryDataUtil::getMinSpanOrder));
- }
- private static long getMinSpanOrder(List<SpanData> spans) {
- return spans.stream().mapToLong(SpanData::getStartEpochNanos).min().orElse(0);
- }
- @SuppressWarnings("UnstableApiUsage")
- private static List<SpanData> sort(List<SpanData> trace) {
- Map<String, Node> lookup = new HashMap<>();
- for (SpanData span : trace) {
- lookup.put(span.getSpanId(), new Node(span));
- }
- for (Node node : lookup.values()) {
- String parentSpanId = node.span.getParentSpanId();
- if (SpanId.isValid(parentSpanId)) {
- Node parentNode = lookup.get(parentSpanId);
- if (parentNode != null) {
- parentNode.childNodes.add(node);
- node.root = false;
- }
- }
- }
- List<Node> rootNodes = new ArrayList<>();
- for (Node node : lookup.values()) {
- sortOneLevel(node.childNodes);
- if (node.root) {
- rootNodes.add(node);
- }
- }
- sortOneLevel(rootNodes);
- List<Node> orderedNodes = new ArrayList<>();
- for (Node rootNode : rootNodes) {
- traversePreOrder(rootNode, orderedNodes);
- }
- List<SpanData> orderedSpans = new ArrayList<>();
- for (Node node : orderedNodes) {
- orderedSpans.add(node.span);
- }
- return orderedSpans;
- }
- private static void sortOneLevel(List<Node> nodes) {
- nodes.sort(Comparator.comparingLong(node -> node.span.getStartEpochNanos()));
- }
- private static void traversePreOrder(Node node, List<Node> accumulator) {
- accumulator.add(node);
- for (Node child : node.childNodes) {
- traversePreOrder(child, accumulator);
- }
- }
- // trace is completed if root span is present
- private static boolean isCompleted(List<SpanData> trace) {
- for (SpanData span : trace) {
- if (!SpanId.isValid(span.getParentSpanId())) {
- return true;
- }
- if (span.getParentSpanId().equals("0000000000000456")) {
- // this is a special parent id that some tests use
- return true;
- }
- }
- return false;
- }
- private static class Node {
- private final SpanData span;
- private final List<Node> childNodes = new ArrayList<>();
- private boolean root = true;
- private Node(SpanData span) {
- this.span = span;
- }
- }
- private TelemetryDataUtil() {}
- }
|