RxJava3AsyncOperationEndStrategyTest.groovy 27 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045
  1. /*
  2. * Copyright The OpenTelemetry Authors
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. import io.opentelemetry.api.trace.Span
  6. import io.opentelemetry.context.Context
  7. import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter
  8. import io.opentelemetry.instrumentation.rxjava.v3.common.RxJava3AsyncOperationEndStrategy
  9. import io.reactivex.rxjava3.core.Completable
  10. import io.reactivex.rxjava3.core.Flowable
  11. import io.reactivex.rxjava3.core.Maybe
  12. import io.reactivex.rxjava3.core.Observable
  13. import io.reactivex.rxjava3.core.Single
  14. import io.reactivex.rxjava3.observers.TestObserver
  15. import io.reactivex.rxjava3.parallel.ParallelFlowable
  16. import io.reactivex.rxjava3.processors.ReplayProcessor
  17. import io.reactivex.rxjava3.processors.UnicastProcessor
  18. import io.reactivex.rxjava3.subjects.CompletableSubject
  19. import io.reactivex.rxjava3.subjects.MaybeSubject
  20. import io.reactivex.rxjava3.subjects.ReplaySubject
  21. import io.reactivex.rxjava3.subjects.SingleSubject
  22. import io.reactivex.rxjava3.subjects.UnicastSubject
  23. import io.reactivex.rxjava3.subscribers.TestSubscriber
  24. import org.reactivestreams.Publisher
  25. import org.reactivestreams.Subscriber
  26. import org.reactivestreams.Subscription
  27. import spock.lang.Specification
  28. class RxJava3AsyncOperationEndStrategyTest extends Specification {
  29. String request = "request"
  30. String response = "response"
  31. Instrumenter<String, String> instrumenter
  32. Context context
  33. Span span
  34. def underTest = RxJava3AsyncOperationEndStrategy.create()
  35. def underTestWithExperimentalAttributes = RxJava3AsyncOperationEndStrategy.builder()
  36. .setCaptureExperimentalSpanAttributes(true)
  37. .build()
  38. void setup() {
  39. instrumenter = Mock()
  40. context = Mock()
  41. span = Mock()
  42. span.storeInContext(_) >> { callRealMethod() }
  43. }
  44. static class CompletableTest extends RxJava3AsyncOperationEndStrategyTest {
  45. def "is supported"() {
  46. expect:
  47. underTest.supports(Completable)
  48. }
  49. def "ends span on already completed"() {
  50. given:
  51. def observer = new TestObserver()
  52. when:
  53. def result = (Completable) underTest.end(instrumenter, context, request, Completable.complete(), String)
  54. result.subscribe(observer)
  55. then:
  56. 1 * instrumenter.end(context, request, null, null)
  57. observer.assertComplete()
  58. }
  59. def "ends span on already errored"() {
  60. given:
  61. def exception = new IllegalStateException()
  62. def observer = new TestObserver()
  63. when:
  64. def result = (Completable) underTest.end(instrumenter, context, request, Completable.error(exception), String)
  65. result.subscribe(observer)
  66. then:
  67. 1 * instrumenter.end(context, request, null, exception)
  68. observer.assertError(exception)
  69. }
  70. def "ends span when completed"() {
  71. given:
  72. def source = CompletableSubject.create()
  73. def observer = new TestObserver()
  74. when:
  75. def result = (Completable) underTest.end(instrumenter, context, request, source, String)
  76. result.subscribe(observer)
  77. then:
  78. 0 * instrumenter._
  79. when:
  80. source.onComplete()
  81. then:
  82. 1 * instrumenter.end(context, request, null, null)
  83. observer.assertComplete()
  84. }
  85. def "ends span when errored"() {
  86. given:
  87. def exception = new IllegalStateException()
  88. def source = CompletableSubject.create()
  89. def observer = new TestObserver()
  90. when:
  91. def result = (Completable) underTest.end(instrumenter, context, request, source, String)
  92. result.subscribe(observer)
  93. then:
  94. 0 * instrumenter._
  95. when:
  96. source.onError(exception)
  97. then:
  98. 1 * instrumenter.end(context, request, null, exception)
  99. observer.assertError(exception)
  100. }
  101. def "ends span when cancelled"() {
  102. given:
  103. def source = CompletableSubject.create()
  104. def observer = new TestObserver()
  105. def context = span.storeInContext(Context.root())
  106. when:
  107. def result = (Completable) underTest.end(instrumenter, context, request, source, String)
  108. result.subscribe(observer)
  109. then:
  110. 0 * instrumenter._
  111. 0 * span._
  112. when:
  113. observer.dispose()
  114. then:
  115. 1 * instrumenter.end(context, request, null, null)
  116. 0 * span.setAttribute(_)
  117. }
  118. def "ends span when cancelled and capturing experimental span attributes"() {
  119. given:
  120. def source = CompletableSubject.create()
  121. def observer = new TestObserver()
  122. def context = span.storeInContext(Context.root())
  123. when:
  124. def result = (Completable) underTestWithExperimentalAttributes.end(instrumenter, context, request, source, String)
  125. result.subscribe(observer)
  126. then:
  127. 0 * instrumenter._
  128. 0 * span._
  129. when:
  130. observer.dispose()
  131. then:
  132. 1 * instrumenter.end(context, request, null, null)
  133. 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true)
  134. }
  135. def "ends span once for multiple subscribers"() {
  136. given:
  137. def source = CompletableSubject.create()
  138. def observer1 = new TestObserver()
  139. def observer2 = new TestObserver()
  140. def observer3 = new TestObserver()
  141. when:
  142. def result = (Completable) underTest.end(instrumenter, context, request, source, String)
  143. result.subscribe(observer1)
  144. result.subscribe(observer2)
  145. result.subscribe(observer3)
  146. then:
  147. 0 * instrumenter._
  148. when:
  149. source.onComplete()
  150. then:
  151. 1 * instrumenter.end(context, request, null, null)
  152. observer1.assertComplete()
  153. observer2.assertComplete()
  154. observer3.assertComplete()
  155. }
  156. }
  157. static class MaybeTest extends RxJava3AsyncOperationEndStrategyTest {
  158. def "is supported"() {
  159. expect:
  160. underTest.supports(Maybe)
  161. }
  162. def "ends span on already completed"() {
  163. given:
  164. def observer = new TestObserver()
  165. when:
  166. def result = (Maybe<?>) underTest.end(instrumenter, context, request, Maybe.just(response), String)
  167. result.subscribe(observer)
  168. then:
  169. 1 * instrumenter.end(context, request, response, null)
  170. observer.assertComplete()
  171. }
  172. def "ends span on already empty"() {
  173. given:
  174. def observer = new TestObserver()
  175. when:
  176. def result = (Maybe<?>) underTest.end(instrumenter, context, request, Maybe.empty(), String)
  177. result.subscribe(observer)
  178. then:
  179. 1 * instrumenter.end(context, request, null, null)
  180. observer.assertComplete()
  181. }
  182. def "ends span on already errored"() {
  183. given:
  184. def exception = new IllegalStateException()
  185. def observer = new TestObserver()
  186. when:
  187. def result = (Maybe<?>) underTest.end(instrumenter, context, request, Maybe.error(exception), String)
  188. result.subscribe(observer)
  189. then:
  190. 1 * instrumenter.end(context, request, null, exception)
  191. observer.assertError(exception)
  192. }
  193. def "ends span when completed"() {
  194. given:
  195. def source = MaybeSubject.create()
  196. def observer = new TestObserver()
  197. when:
  198. def result = (Maybe<?>) underTest.end(instrumenter, context, request, source, String)
  199. result.subscribe(observer)
  200. then:
  201. 0 * instrumenter._
  202. when:
  203. source.onSuccess(response)
  204. then:
  205. 1 * instrumenter.end(context, request, response, null)
  206. observer.assertComplete()
  207. }
  208. def "ends span when empty"() {
  209. given:
  210. def source = MaybeSubject.create()
  211. def observer = new TestObserver()
  212. when:
  213. def result = (Maybe<?>) underTest.end(instrumenter, context, request, source, String)
  214. result.subscribe(observer)
  215. then:
  216. 0 * instrumenter._
  217. when:
  218. source.onComplete()
  219. then:
  220. 1 * instrumenter.end(context, request, null, null)
  221. observer.assertComplete()
  222. }
  223. def "ends span when errored"() {
  224. given:
  225. def exception = new IllegalStateException()
  226. def source = MaybeSubject.create()
  227. def observer = new TestObserver()
  228. when:
  229. def result = (Maybe<?>) underTest.end(instrumenter, context, request, source, String)
  230. result.subscribe(observer)
  231. then:
  232. 0 * instrumenter._
  233. when:
  234. source.onError(exception)
  235. then:
  236. 1 * instrumenter.end(context, request, null, exception)
  237. observer.assertError(exception)
  238. }
  239. def "ends span when cancelled"() {
  240. given:
  241. def source = MaybeSubject.create()
  242. def observer = new TestObserver()
  243. def context = span.storeInContext(Context.root())
  244. when:
  245. def result = (Maybe<?>) underTest.end(instrumenter, context, request, source, String)
  246. result.subscribe(observer)
  247. then:
  248. 0 * instrumenter._
  249. when:
  250. observer.dispose()
  251. then:
  252. 1 * instrumenter.end(context, request, null, null)
  253. 0 * span.setAttribute(_)
  254. }
  255. def "ends span when cancelled and capturing experimental span attributes"() {
  256. given:
  257. def source = MaybeSubject.create()
  258. def observer = new TestObserver()
  259. def context = span.storeInContext(Context.root())
  260. when:
  261. def result = (Maybe<?>) underTestWithExperimentalAttributes.end(instrumenter, context, request, source, String)
  262. result.subscribe(observer)
  263. then:
  264. 0 * instrumenter._
  265. 0 * span._
  266. when:
  267. observer.dispose()
  268. then:
  269. 1 * instrumenter.end(context, request, null, null)
  270. 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true)
  271. }
  272. def "ends span once for multiple subscribers"() {
  273. given:
  274. def source = MaybeSubject.create()
  275. def observer1 = new TestObserver()
  276. def observer2 = new TestObserver()
  277. def observer3 = new TestObserver()
  278. when:
  279. def result = (Maybe<?>) underTest.end(instrumenter, context, request, source, String)
  280. result.subscribe(observer1)
  281. result.subscribe(observer2)
  282. result.subscribe(observer3)
  283. then:
  284. 0 * instrumenter._
  285. when:
  286. source.onSuccess(response)
  287. then:
  288. 1 * instrumenter.end(context, request, response, null)
  289. observer1.assertValue(response)
  290. observer1.assertComplete()
  291. observer2.assertValue(response)
  292. observer2.assertComplete()
  293. observer3.assertValue(response)
  294. observer3.assertComplete()
  295. }
  296. }
  297. static class SingleTest extends RxJava3AsyncOperationEndStrategyTest {
  298. def "is supported"() {
  299. expect:
  300. underTest.supports(Single)
  301. }
  302. def "ends span on already completed"() {
  303. given:
  304. def observer = new TestObserver()
  305. when:
  306. def result = (Single<?>) underTest.end(instrumenter, context, request, Single.just(response), String)
  307. result.subscribe(observer)
  308. then:
  309. 1 * instrumenter.end(context, request, response, null)
  310. observer.assertComplete()
  311. }
  312. def "ends span on already errored"() {
  313. given:
  314. def exception = new IllegalStateException()
  315. def observer = new TestObserver()
  316. when:
  317. def result = (Single<?>) underTest.end(instrumenter, context, request, Single.error(exception), String)
  318. result.subscribe(observer)
  319. then:
  320. 1 * instrumenter.end(context, request, null, exception)
  321. observer.assertError(exception)
  322. }
  323. def "ends span when completed"() {
  324. given:
  325. def source = SingleSubject.create()
  326. def observer = new TestObserver()
  327. when:
  328. def result = (Single<?>) underTest.end(instrumenter, context, request, source, String)
  329. result.subscribe(observer)
  330. then:
  331. 0 * instrumenter._
  332. when:
  333. source.onSuccess(response)
  334. then:
  335. 1 * instrumenter.end(context, request, response, null)
  336. observer.assertComplete()
  337. }
  338. def "ends span when errored"() {
  339. given:
  340. def exception = new IllegalStateException()
  341. def source = SingleSubject.create()
  342. def observer = new TestObserver()
  343. when:
  344. def result = (Single<?>) underTest.end(instrumenter, context, request, source, String)
  345. result.subscribe(observer)
  346. then:
  347. 0 * instrumenter._
  348. when:
  349. source.onError(exception)
  350. then:
  351. 1 * instrumenter.end(context, request, null, exception)
  352. observer.assertError(exception)
  353. }
  354. def "ends span when cancelled"() {
  355. given:
  356. def source = SingleSubject.create()
  357. def observer = new TestObserver()
  358. def context = span.storeInContext(Context.root())
  359. when:
  360. def result = (Single<?>) underTest.end(instrumenter, context, request, source, String)
  361. result.subscribe(observer)
  362. then:
  363. 0 * instrumenter._
  364. when:
  365. observer.dispose()
  366. then:
  367. 1 * instrumenter.end(context, request, null, null)
  368. 0 * span.setAttribute(_)
  369. }
  370. def "ends span when cancelled and capturing experimental span attributes"() {
  371. given:
  372. def source = SingleSubject.create()
  373. def observer = new TestObserver()
  374. def context = span.storeInContext(Context.root())
  375. when:
  376. def result = (Single<?>) underTestWithExperimentalAttributes.end(instrumenter, context, request, source, String)
  377. result.subscribe(observer)
  378. then:
  379. 0 * instrumenter._
  380. 0 * span._
  381. when:
  382. observer.dispose()
  383. then:
  384. 1 * instrumenter.end(context, request, null, null)
  385. 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true)
  386. }
  387. def "ends span once for multiple subscribers"() {
  388. given:
  389. def source = SingleSubject.create()
  390. def observer1 = new TestObserver()
  391. def observer2 = new TestObserver()
  392. def observer3 = new TestObserver()
  393. when:
  394. def result = (Single<?>) underTest.end(instrumenter, context, request, source, String)
  395. result.subscribe(observer1)
  396. result.subscribe(observer2)
  397. result.subscribe(observer3)
  398. then:
  399. 0 * instrumenter._
  400. when:
  401. source.onSuccess(response)
  402. then:
  403. 1 * instrumenter.end(context, request, response, null)
  404. observer1.assertValue(response)
  405. observer1.assertComplete()
  406. observer2.assertValue(response)
  407. observer2.assertComplete()
  408. observer3.assertValue(response)
  409. observer3.assertComplete()
  410. }
  411. }
  412. static class ObservableTest extends RxJava3AsyncOperationEndStrategyTest {
  413. def "is supported"() {
  414. expect:
  415. underTest.supports(Observable)
  416. }
  417. def "ends span on already completed"() {
  418. given:
  419. def observer = new TestObserver()
  420. when:
  421. def result = (Observable<?>) underTest.end(instrumenter, context, request, Observable.just(response), String)
  422. result.subscribe(observer)
  423. then:
  424. 1 * instrumenter.end(context, request, null, null)
  425. observer.assertComplete()
  426. }
  427. def "ends span on already errored"() {
  428. given:
  429. def exception = new IllegalStateException()
  430. def observer = new TestObserver()
  431. when:
  432. def result = (Observable<?>) underTest.end(instrumenter, context, request, Observable.error(exception), String)
  433. result.subscribe(observer)
  434. then:
  435. 1 * instrumenter.end(context, request, null, exception)
  436. observer.assertError(exception)
  437. }
  438. def "ends span when completed"() {
  439. given:
  440. def source = UnicastSubject.create()
  441. def observer = new TestObserver()
  442. when:
  443. def result = (Observable<?>) underTest.end(instrumenter, context, request, source, String)
  444. result.subscribe(observer)
  445. then:
  446. 0 * instrumenter._
  447. when:
  448. source.onComplete()
  449. then:
  450. 1 * instrumenter.end(context, request, null, null)
  451. observer.assertComplete()
  452. }
  453. def "ends span when errored"() {
  454. given:
  455. def exception = new IllegalStateException()
  456. def source = UnicastSubject.create()
  457. def observer = new TestObserver()
  458. when:
  459. def result = (Observable<?>) underTest.end(instrumenter, context, request, source, String)
  460. result.subscribe(observer)
  461. then:
  462. 0 * instrumenter._
  463. when:
  464. source.onError(exception)
  465. then:
  466. 1 * instrumenter.end(context, request, null, exception)
  467. observer.assertError(exception)
  468. }
  469. def "ends span when cancelled"() {
  470. given:
  471. def source = UnicastSubject.create()
  472. def observer = new TestObserver()
  473. def context = span.storeInContext(Context.root())
  474. when:
  475. def result = (Observable<?>) underTest.end(instrumenter, context, request, source, String)
  476. result.subscribe(observer)
  477. then:
  478. 0 * instrumenter._
  479. when:
  480. observer.dispose()
  481. then:
  482. 1 * instrumenter.end(context, request, null, null)
  483. 0 * span.setAttribute(_)
  484. }
  485. def "ends span when cancelled and capturing experimental span attributes"() {
  486. given:
  487. def source = UnicastSubject.create()
  488. def observer = new TestObserver()
  489. def context = span.storeInContext(Context.root())
  490. when:
  491. def result = (Observable<?>) underTestWithExperimentalAttributes.end(instrumenter, context, request, source, String)
  492. result.subscribe(observer)
  493. then:
  494. 0 * instrumenter._
  495. 0 * span._
  496. when:
  497. observer.dispose()
  498. then:
  499. 1 * instrumenter.end(context, request, null, null)
  500. 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true)
  501. }
  502. def "ends span once for multiple subscribers"() {
  503. given:
  504. def source = ReplaySubject.create()
  505. def observer1 = new TestObserver()
  506. def observer2 = new TestObserver()
  507. def observer3 = new TestObserver()
  508. when:
  509. def result = (Observable<?>) underTest.end(instrumenter, context, request, source, String)
  510. result.subscribe(observer1)
  511. result.subscribe(observer2)
  512. result.subscribe(observer3)
  513. then:
  514. 0 * instrumenter._
  515. when:
  516. source.onComplete()
  517. then:
  518. 1 * instrumenter.end(context, request, null, null)
  519. observer1.assertComplete()
  520. observer2.assertComplete()
  521. observer3.assertComplete()
  522. }
  523. }
  524. static class FlowableTest extends RxJava3AsyncOperationEndStrategyTest {
  525. def "is supported"() {
  526. expect:
  527. underTest.supports(Flowable)
  528. }
  529. def "ends span on already completed"() {
  530. given:
  531. def observer = new TestSubscriber()
  532. when:
  533. def result = (Flowable<?>) underTest.end(instrumenter, context, request, Flowable.just(response), String)
  534. result.subscribe(observer)
  535. then:
  536. 1 * instrumenter.end(context, request, null, null)
  537. observer.assertComplete()
  538. }
  539. def "ends span on already errored"() {
  540. given:
  541. def exception = new IllegalStateException()
  542. def observer = new TestSubscriber()
  543. when:
  544. def result = (Flowable<?>) underTest.end(instrumenter, context, request, Flowable.error(exception), String)
  545. result.subscribe(observer)
  546. then:
  547. 1 * instrumenter.end(context, request, null, exception)
  548. observer.assertError(exception)
  549. }
  550. def "ends span when completed"() {
  551. given:
  552. def source = UnicastProcessor.create()
  553. def observer = new TestSubscriber()
  554. when:
  555. def result = (Flowable<?>) underTest.end(instrumenter, context, request, source, String)
  556. result.subscribe(observer)
  557. then:
  558. 0 * instrumenter._
  559. when:
  560. source.onComplete()
  561. then:
  562. 1 * instrumenter.end(context, request, null, null)
  563. observer.assertComplete()
  564. }
  565. def "ends span when errored"() {
  566. given:
  567. def exception = new IllegalStateException()
  568. def source = UnicastProcessor.create()
  569. def observer = new TestSubscriber()
  570. when:
  571. def result = (Flowable<?>) underTest.end(instrumenter, context, request, source, String)
  572. result.subscribe(observer)
  573. then:
  574. 0 * instrumenter._
  575. when:
  576. source.onError(exception)
  577. then:
  578. 1 * instrumenter.end(context, request, null, exception)
  579. observer.assertError(exception)
  580. }
  581. def "ends span when cancelled"() {
  582. given:
  583. def source = UnicastProcessor.create()
  584. def observer = new TestSubscriber()
  585. def context = span.storeInContext(Context.root())
  586. when:
  587. def result = (Flowable<?>) underTest.end(instrumenter, context, request, source, String)
  588. result.subscribe(observer)
  589. then:
  590. 0 * instrumenter._
  591. when:
  592. observer.cancel()
  593. then:
  594. 1 * instrumenter.end(context, request, null, null)
  595. 0 * span.setAttribute(_)
  596. }
  597. def "ends span when cancelled and capturing experimental span attributes"() {
  598. given:
  599. def source = UnicastProcessor.create()
  600. def observer = new TestSubscriber()
  601. def context = span.storeInContext(Context.root())
  602. when:
  603. def result = (Flowable<?>) underTestWithExperimentalAttributes.end(instrumenter, context, request, source, String)
  604. result.subscribe(observer)
  605. then:
  606. 0 * instrumenter._
  607. 0 * span._
  608. when:
  609. observer.cancel()
  610. then:
  611. 1 * instrumenter.end(context, request, null, null)
  612. 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true)
  613. }
  614. def "ends span once for multiple subscribers"() {
  615. given:
  616. def source = ReplayProcessor.create()
  617. def observer1 = new TestSubscriber()
  618. def observer2 = new TestSubscriber()
  619. def observer3 = new TestSubscriber()
  620. when:
  621. def result = (Flowable<?>) underTest.end(instrumenter, context, request, source, String)
  622. result.subscribe(observer1)
  623. result.subscribe(observer2)
  624. result.subscribe(observer3)
  625. then:
  626. 0 * instrumenter._
  627. when:
  628. source.onComplete()
  629. then:
  630. 1 * instrumenter.end(context, request, null, null)
  631. observer1.assertComplete()
  632. observer2.assertComplete()
  633. observer3.assertComplete()
  634. }
  635. }
  636. static class ParallelFlowableTest extends RxJava3AsyncOperationEndStrategyTest {
  637. def "is supported"() {
  638. expect:
  639. underTest.supports(ParallelFlowable)
  640. }
  641. def "ends span on already completed"() {
  642. given:
  643. def observer = new TestSubscriber()
  644. when:
  645. def result = (ParallelFlowable<?>) underTest.end(instrumenter, context, request, Flowable.just(response).parallel(), String)
  646. result.sequential().subscribe(observer)
  647. then:
  648. observer.assertComplete()
  649. 1 * instrumenter.end(context, request, null, null)
  650. }
  651. def "ends span on already errored"() {
  652. given:
  653. def exception = new IllegalStateException()
  654. def observer = new TestSubscriber()
  655. when:
  656. def result = (ParallelFlowable<?>) underTest.end(instrumenter, context, request, Flowable.error(exception).parallel(), String)
  657. result.sequential().subscribe(observer)
  658. then:
  659. observer.assertError(exception)
  660. 1 * instrumenter.end(context, request, null, exception)
  661. }
  662. def "ends span when completed"() {
  663. given:
  664. def source = UnicastProcessor.create()
  665. def observer = new TestSubscriber()
  666. when:
  667. def result = (ParallelFlowable<?>) underTest.end(instrumenter, context, request, source.parallel(), String)
  668. result.sequential().subscribe(observer)
  669. then:
  670. 0 * instrumenter._
  671. when:
  672. source.onComplete()
  673. then:
  674. observer.assertComplete()
  675. 1 * instrumenter.end(context, request, null, null)
  676. }
  677. def "ends span when errored"() {
  678. given:
  679. def exception = new IllegalStateException()
  680. def source = UnicastProcessor.create()
  681. def observer = new TestSubscriber()
  682. when:
  683. def result = (ParallelFlowable<?>) underTest.end(instrumenter, context, request, source.parallel(), String)
  684. result.sequential().subscribe(observer)
  685. then:
  686. 0 * instrumenter._
  687. when:
  688. source.onError(exception)
  689. then:
  690. observer.assertError(exception)
  691. 1 * instrumenter.end(context, request, null, exception)
  692. }
  693. def "ends span when cancelled"() {
  694. given:
  695. def source = UnicastProcessor.create()
  696. def observer = new TestSubscriber()
  697. def context = span.storeInContext(Context.root())
  698. when:
  699. def result = (ParallelFlowable<?>) underTest.end(instrumenter, context, request, source.parallel(), String)
  700. result.sequential().subscribe(observer)
  701. then:
  702. 0 * instrumenter._
  703. when:
  704. observer.cancel()
  705. then:
  706. 1 * instrumenter.end(context, request, null, null)
  707. 0 * span.setAttribute(_)
  708. }
  709. def "ends span when cancelled and capturing experimental span attributes"() {
  710. given:
  711. def source = UnicastProcessor.create()
  712. def observer = new TestSubscriber()
  713. def context = span.storeInContext(Context.root())
  714. when:
  715. def result = (ParallelFlowable<?>) underTestWithExperimentalAttributes.end(instrumenter, context, request, source.parallel(), String)
  716. result.sequential().subscribe(observer)
  717. then:
  718. 0 * instrumenter._
  719. 0 * span._
  720. when:
  721. observer.cancel()
  722. then:
  723. 1 * instrumenter.end(context, request, null, null)
  724. 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true)
  725. }
  726. }
  727. static class PublisherTest extends RxJava3AsyncOperationEndStrategyTest {
  728. def "is supported"() {
  729. expect:
  730. underTest.supports(Publisher)
  731. }
  732. def "ends span when completed"() {
  733. given:
  734. def source = new CustomPublisher()
  735. def observer = new TestSubscriber()
  736. when:
  737. def result = (Flowable<?>) underTest.end(instrumenter, context, request, source, String)
  738. result.subscribe(observer)
  739. then:
  740. 0 * instrumenter._
  741. when:
  742. source.onComplete()
  743. then:
  744. 1 * instrumenter.end(context, request, null, null)
  745. observer.assertComplete()
  746. }
  747. def "ends span when errored"() {
  748. given:
  749. def exception = new IllegalStateException()
  750. def source = new CustomPublisher()
  751. def observer = new TestSubscriber()
  752. when:
  753. def result = (Flowable<?>) underTest.end(instrumenter, context, request, source, String)
  754. result.subscribe(observer)
  755. then:
  756. 0 * instrumenter._
  757. when:
  758. source.onError(exception)
  759. then:
  760. 1 * instrumenter.end(context, request, null, exception)
  761. observer.assertError(exception)
  762. }
  763. def "ends span when cancelled"() {
  764. given:
  765. def source = new CustomPublisher()
  766. def observer = new TestSubscriber()
  767. def context = span.storeInContext(Context.root())
  768. when:
  769. def result = (Flowable<?>) underTest.end(instrumenter, context, request, source, String)
  770. result.subscribe(observer)
  771. then:
  772. 0 * instrumenter._
  773. when:
  774. observer.cancel()
  775. then:
  776. 1 * instrumenter.end(context, request, null, null)
  777. 0 * span.setAttribute(_)
  778. }
  779. def "ends span when cancelled and capturing experimental span attributes"() {
  780. given:
  781. def source = new CustomPublisher()
  782. def observer = new TestSubscriber()
  783. def context = span.storeInContext(Context.root())
  784. when:
  785. def result = (Flowable<?>) underTestWithExperimentalAttributes.end(instrumenter, context, request, source, String)
  786. result.subscribe(observer)
  787. then:
  788. 0 * instrumenter._
  789. 0 * span._
  790. when:
  791. observer.cancel()
  792. then:
  793. 1 * instrumenter.end(context, request, null, null)
  794. 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true)
  795. }
  796. }
  797. static class CustomPublisher implements Publisher<String>, Subscription {
  798. Subscriber<? super String> subscriber
  799. @Override
  800. void subscribe(Subscriber<? super String> subscriber) {
  801. this.subscriber = subscriber
  802. subscriber.onSubscribe(this)
  803. }
  804. def onComplete() {
  805. this.subscriber.onComplete()
  806. }
  807. def onError(Throwable exception) {
  808. this.subscriber.onError(exception)
  809. }
  810. @Override
  811. void request(long l) {}
  812. @Override
  813. void cancel() {}
  814. }
  815. }