KafkaStreamsDefaultTest.groovy 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */
  5. import io.opentelemetry.api.trace.Span
  6. import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
  7. import io.opentelemetry.context.Context
  8. import io.opentelemetry.context.propagation.TextMapGetter
  9. import io.opentelemetry.sdk.trace.data.SpanData
  10. import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
  11. import org.apache.kafka.clients.producer.ProducerRecord
  12. import org.apache.kafka.common.header.Headers
  13. import org.apache.kafka.common.serialization.Serdes
  14. import org.apache.kafka.streams.KafkaStreams
  15. import org.apache.kafka.streams.StreamsConfig
  16. import org.apache.kafka.streams.kstream.KStream
  17. import org.apache.kafka.streams.kstream.ValueMapper
  18. import java.time.Duration
  19. import static io.opentelemetry.api.trace.SpanKind.CONSUMER
  20. import static io.opentelemetry.api.trace.SpanKind.PRODUCER
/**
 * End-to-end tracing test: a kafka-clients producer sends to {@code STREAM_PENDING},
 * a kafka-streams topology lowercases the value and forwards it to
 * {@code STREAM_PROCESSED}, and a kafka-clients consumer reads the result.
 * Verifies the span topology (3 traces), span attributes, links between producer
 * and streams "process" spans, and that the {@code traceparent} header injected
 * into the final record matches the streams send span.
 */
class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {

  def "test kafka produce and consume with streams in-between"() {
    setup:
    // Streams config reuses the shared producer properties from the base test;
    // default serdes match the Integer-key / String-value record sent below.
    def config = new Properties()
    config.putAll(producerProps(KafkaStreamsBaseTest.kafka.bootstrapServers))
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName())
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())

    // CONFIGURE PROCESSOR
    // The topology builder is loaded reflectively because the class differs
    // between the kafka-streams versions covered by this suite.
    def builder
    try {
      // Different class names for test and latestDepTest.
      builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance()
    } catch (ClassNotFoundException | NoClassDefFoundError e) {
      builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance()
    }
    KStream<Integer, String> textLines = builder.stream(STREAM_PENDING)
    def values = textLines
      .mapValues(new ValueMapper<String, String>() {
        @Override
        String apply(String textLine) {
          // Executes inside the kafka-streams "process" span; the attribute
          // is asserted on trace(1)/span(1) below.
          Span.current().setAttribute("asdf", "testing")
          return textLine.toLowerCase()
        }
      })

    KafkaStreams streams
    try {
      // Different api for test and latestDepTest.
      // Older kafka-streams: to(keySerde, valueSerde, topic) plus
      // KafkaStreams(builder, config).
      values.to(Serdes.Integer(), Serdes.String(), STREAM_PROCESSED)
      streams = new KafkaStreams(builder, config)
    } catch (MissingMethodException e) {
      // Newer kafka-streams: the overload above is gone; use Produced.with(...)
      // (loaded reflectively, static method dispatched dynamically by Groovy)
      // and build the topology explicitly.
      def producer = Class.forName("org.apache.kafka.streams.kstream.Produced")
        .with(Serdes.Integer(), Serdes.String())
      values.to(STREAM_PROCESSED, producer)
      streams = new KafkaStreams(builder.build(), config)
    }
    streams.start()

    when:
    String greeting = "TESTING TESTING 123!"
    KafkaStreamsBaseTest.producer.send(new ProducerRecord<>(STREAM_PENDING, 10, greeting))

    then:
    awaitUntilConsumerIsReady()
    def records = KafkaStreamsBaseTest.consumer.poll(Duration.ofSeconds(10).toMillis())
    Headers receivedHeaders = null
    for (record in records) {
      // Sets an attribute on the current (consumer "process") span; asserted
      // on trace(2)/span(1) below.
      Span.current().setAttribute("testing", 123)
      assert record.key() == 10
      assert record.value() == greeting.toLowerCase()
      if (receivedHeaders == null) {
        receivedHeaders = record.headers()
      }
    }

    // Expected trace layout (sorted by root span name):
    //   trace 0: STREAM_PENDING send                        (kafka-clients producer)
    //   trace 1: STREAM_PENDING receive -> process -> STREAM_PROCESSED send
    //   trace 2: STREAM_PROCESSED receive -> process        (kafka-clients consumer)
    assertTraces(3) {
      traces.sort(orderByRootSpanName(
        STREAM_PENDING + " send",
        STREAM_PENDING + " receive",
        STREAM_PROCESSED + " receive"))

      SpanData producerPending, producerProcessed
      trace(0, 1) {
        // kafka-clients PRODUCER
        span(0) {
          name STREAM_PENDING + " send"
          kind PRODUCER
          hasNoParent()
          attributes {
            "$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
            "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
            "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
            "$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
            "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
            "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
          }
        }
        // Remembered so the streams "process" span can assert a link to it.
        producerPending = span(0)
      }
      trace(1, 3) {
        // kafka-clients CONSUMER receive
        span(0) {
          name STREAM_PENDING + " receive"
          kind CONSUMER
          hasNoParent()
          attributes {
            "$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
            "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
            "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
            "$SemanticAttributes.MESSAGING_OPERATION" "receive"
            if (Boolean.getBoolean("testLatestDeps")) {
              // Consumer-group attribute is only captured on latest deps.
              "$SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test-application"
            }
          }
        }
        // kafka-stream CONSUMER
        span(1) {
          name STREAM_PENDING + " process"
          kind CONSUMER
          childOf span(0)
          // Linked back to the original producer span from trace 0.
          hasLink(producerPending)
          attributes {
            "$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
            "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
            "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
            "$SemanticAttributes.MESSAGING_OPERATION" "process"
            "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
            "$SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION" { it >= 0 }
            "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
            "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
            "kafka.record.queue_time_ms" { it >= 0 }
            // Set by the ValueMapper above.
            "asdf" "testing"
          }
        }
        // kafka-clients PRODUCER
        span(2) {
          name STREAM_PROCESSED + " send"
          kind PRODUCER
          childOf span(1)
          attributes {
            "$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
            "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
            "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
            "$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
            "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
          }
        }
        // Remembered so the final consumer "process" span can assert a link to it.
        producerProcessed = span(2)
      }
      trace(2, 2) {
        // kafka-clients CONSUMER receive
        span(0) {
          name STREAM_PROCESSED + " receive"
          kind CONSUMER
          hasNoParent()
          attributes {
            "$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
            "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
            "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
            "$SemanticAttributes.MESSAGING_OPERATION" "receive"
            if (Boolean.getBoolean("testLatestDeps")) {
              "$SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test"
            }
          }
        }
        // kafka-clients CONSUMER process
        span(1) {
          name STREAM_PROCESSED + " process"
          kind CONSUMER
          childOf span(0)
          hasLink producerProcessed
          attributes {
            "$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
            "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
            "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
            "$SemanticAttributes.MESSAGING_OPERATION" "process"
            "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
            "$SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION" { it >= 0 }
            "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
            "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
            if (Boolean.getBoolean("testLatestDeps")) {
              "$SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test"
            }
            "kafka.record.queue_time_ms" { it >= 0 }
            // Set inside the record loop above.
            "testing" 123
          }
        }
      }
    }

    // Spock treats these bare expressions in the then: block as assertions.
    receivedHeaders.iterator().hasNext()
    // Extract the trace context from the "traceparent" header that the producer
    // instrumentation injected into the processed record, then verify it points
    // at the streams send span (trace 1, span index 2).
    def traceparent = new String(receivedHeaders.headers("traceparent").iterator().next().value())
    Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter<String>() {
      @Override
      Iterable<String> keys(String carrier) {
        return Collections.singleton("traceparent")
      }

      @Override
      String get(String carrier, String key) {
        // The carrier argument is unused; the header value is captured above.
        if (key == "traceparent") {
          return traceparent
        }
        return null
      }
    })
    def spanContext = Span.fromContext(context).getSpanContext()
    def streamTrace = traces.find { it.size() == 3 }
    def streamSendSpan = streamTrace[2]
    spanContext.traceId == streamSendSpan.traceId
    spanContext.spanId == streamSendSpan.spanId
  }
}