|
@@ -25,6 +25,7 @@ package io.opentelemetry.instrumentation.rxjava3;
|
|
|
import io.opentelemetry.context.Context;
|
|
|
import io.opentelemetry.context.Scope;
|
|
|
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies;
|
|
|
+import io.opentelemetry.instrumentation.api.internal.GuardedBy;
|
|
|
import io.reactivex.rxjava3.core.Completable;
|
|
|
import io.reactivex.rxjava3.core.CompletableObserver;
|
|
|
import io.reactivex.rxjava3.core.Flowable;
|
|
@@ -40,7 +41,6 @@ import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber;
|
|
|
import io.reactivex.rxjava3.parallel.ParallelFlowable;
|
|
|
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
|
|
|
import javax.annotation.Nullable;
|
|
|
-import org.checkerframework.checker.lock.qual.GuardedBy;
|
|
|
import org.reactivestreams.Subscriber;
|
|
|
|
|
|
/**
|
|
@@ -158,6 +158,7 @@ public final class TracingAssembly {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @GuardedBy("TracingAssembly.class")
|
|
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
private static void enableParallel() {
|
|
|
oldOnParallelAssembly = RxJavaPlugins.getOnParallelAssembly();
|
|
@@ -167,6 +168,7 @@ public final class TracingAssembly {
|
|
|
parallelFlowable -> new TracingParallelFlowable(parallelFlowable, Context.current())));
|
|
|
}
|
|
|
|
|
|
+ @GuardedBy("TracingAssembly.class")
|
|
|
private static void enableCompletable() {
|
|
|
oldOnCompletableSubscribe = RxJavaPlugins.getOnCompletableSubscribe();
|
|
|
RxJavaPlugins.setOnCompletableSubscribe(
|
|
@@ -180,6 +182,7 @@ public final class TracingAssembly {
|
|
|
}));
|
|
|
}
|
|
|
|
|
|
+ @GuardedBy("TracingAssembly.class")
|
|
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
private static void enableFlowable() {
|
|
|
oldOnFlowableSubscribe = RxJavaPlugins.getOnFlowableSubscribe();
|
|
@@ -199,6 +202,7 @@ public final class TracingAssembly {
|
|
|
}));
|
|
|
}
|
|
|
|
|
|
+ @GuardedBy("TracingAssembly.class")
|
|
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
private static void enableObservable() {
|
|
|
oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
|
|
@@ -213,6 +217,7 @@ public final class TracingAssembly {
|
|
|
}));
|
|
|
}
|
|
|
|
|
|
+ @GuardedBy("TracingAssembly.class")
|
|
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
private static void enableSingle() {
|
|
|
oldOnSingleSubscribe = RxJavaPlugins.getOnSingleSubscribe();
|
|
@@ -227,6 +232,7 @@ public final class TracingAssembly {
|
|
|
}));
|
|
|
}
|
|
|
|
|
|
+ @GuardedBy("TracingAssembly.class")
|
|
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
private static void enableMaybe() {
|
|
|
oldOnMaybeSubscribe = RxJavaPlugins.getOnMaybeSubscribe();
|
|
@@ -254,31 +260,37 @@ public final class TracingAssembly {
|
|
|
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
|
|
|
}
|
|
|
|
|
|
+ @GuardedBy("TracingAssembly.class")
|
|
|
private static void disableParallel() {
|
|
|
RxJavaPlugins.setOnParallelAssembly(oldOnParallelAssembly);
|
|
|
oldOnParallelAssembly = null;
|
|
|
}
|
|
|
|
|
|
+ @GuardedBy("TracingAssembly.class")
|
|
|
private static void disableObservable() {
|
|
|
RxJavaPlugins.setOnObservableSubscribe(oldOnObservableSubscribe);
|
|
|
oldOnObservableSubscribe = null;
|
|
|
}
|
|
|
|
|
|
+ @GuardedBy("TracingAssembly.class")
|
|
|
private static void disableCompletable() {
|
|
|
RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe);
|
|
|
oldOnCompletableSubscribe = null;
|
|
|
}
|
|
|
|
|
|
+ @GuardedBy("TracingAssembly.class")
|
|
|
private static void disableFlowable() {
|
|
|
RxJavaPlugins.setOnFlowableSubscribe(oldOnFlowableSubscribe);
|
|
|
oldOnFlowableSubscribe = null;
|
|
|
}
|
|
|
|
|
|
+ @GuardedBy("TracingAssembly.class")
|
|
|
private static void disableSingle() {
|
|
|
RxJavaPlugins.setOnSingleSubscribe(oldOnSingleSubscribe);
|
|
|
oldOnSingleSubscribe = null;
|
|
|
}
|
|
|
|
|
|
+ @GuardedBy("TracingAssembly.class")
|
|
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
private static void disableMaybe() {
|
|
|
RxJavaPlugins.setOnMaybeSubscribe(
|