|
@@ -49,14 +49,17 @@ public final class KafkaTelemetry {
|
|
private final OpenTelemetry openTelemetry;
|
|
private final OpenTelemetry openTelemetry;
|
|
private final Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter;
|
|
private final Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter;
|
|
private final Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter;
|
|
private final Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter;
|
|
|
|
+ private final boolean producerPropagationEnabled;
|
|
|
|
|
|
KafkaTelemetry(
|
|
KafkaTelemetry(
|
|
OpenTelemetry openTelemetry,
|
|
OpenTelemetry openTelemetry,
|
|
Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter,
|
|
Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter,
|
|
- Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter) {
|
|
|
|
|
|
+ Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter,
|
|
|
|
+ boolean producerPropagationEnabled) {
|
|
this.openTelemetry = openTelemetry;
|
|
this.openTelemetry = openTelemetry;
|
|
this.producerInstrumenter = producerInstrumenter;
|
|
this.producerInstrumenter = producerInstrumenter;
|
|
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
|
|
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
|
|
|
|
+ this.producerPropagationEnabled = producerPropagationEnabled;
|
|
}
|
|
}
|
|
|
|
|
|
/** Returns a new {@link KafkaTelemetry} configured with the given {@link OpenTelemetry}. */
|
|
/** Returns a new {@link KafkaTelemetry} configured with the given {@link OpenTelemetry}. */
|
|
@@ -162,23 +165,22 @@ public final class KafkaTelemetry {
|
|
* @param record the producer record to inject span info.
|
|
* @param record the producer record to inject span info.
|
|
*/
|
|
*/
|
|
<K, V> void buildAndInjectSpan(ProducerRecord<K, V> record) {
|
|
<K, V> void buildAndInjectSpan(ProducerRecord<K, V> record) {
|
|
- Context currentContext = Context.current();
|
|
|
|
|
|
+ Context parentContext = Context.current();
|
|
|
|
|
|
- if (!producerInstrumenter.shouldStart(currentContext, record)) {
|
|
|
|
|
|
+ if (!producerInstrumenter.shouldStart(parentContext, record)) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- Context current = producerInstrumenter.start(currentContext, record);
|
|
|
|
- try (Scope ignored = current.makeCurrent()) {
|
|
|
|
|
|
+ Context context = producerInstrumenter.start(parentContext, record);
|
|
|
|
+ if (producerPropagationEnabled) {
|
|
try {
|
|
try {
|
|
- propagator().inject(current, record.headers(), SETTER);
|
|
|
|
|
|
+ propagator().inject(context, record.headers(), SETTER);
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
// it can happen if headers are read only (when record is sent second time)
|
|
// it can happen if headers are read only (when record is sent second time)
|
|
logger.log(WARNING, "failed to inject span context. sending record second time?", t);
|
|
logger.log(WARNING, "failed to inject span context. sending record second time?", t);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- producerInstrumenter.end(current, record, null, null);
|
|
|
|
|
|
+ producerInstrumenter.end(context, record, null, null);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|