integration_test.go 10 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. //go:build integration
  4. // +build integration
  5. package aerospikereceiver
  6. import (
  7. "context"
  8. "errors"
  9. "fmt"
  10. "os"
  11. "path/filepath"
  12. "testing"
  13. "time"
  14. as "github.com/aerospike/aerospike-client-go/v6"
  15. "github.com/docker/go-connections/nat"
  16. "github.com/testcontainers/testcontainers-go"
  17. "github.com/testcontainers/testcontainers-go/wait"
  18. "go.opentelemetry.io/collector/component"
  19. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/scraperinttest"
  20. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
  21. )
  22. var aerospikePort = "3000"
  23. func TestIntegration(t *testing.T) {
  24. t.Run("6.2", integrationTest(func(*Config) {}))
  25. t.Run("6.2-cluster", integrationTest(func(cfg *Config) {
  26. cfg.CollectClusterMetrics = true
  27. }))
  28. }
  29. func integrationTest(cfgMod func(*Config)) func(*testing.T) {
  30. return scraperinttest.NewIntegrationTest(
  31. NewFactory(),
  32. scraperinttest.WithContainerRequest(
  33. testcontainers.ContainerRequest{
  34. Image: "aerospike:ce-6.2.0.2",
  35. ExposedPorts: []string{aerospikePort},
  36. WaitingFor: waitStrategy{},
  37. LifecycleHooks: []testcontainers.ContainerLifecycleHooks{{
  38. PostStarts: []testcontainers.ContainerHook{
  39. func(ctx context.Context, container testcontainers.Container) error {
  40. host, err := aerospikeHost(ctx, container)
  41. if err != nil {
  42. return err
  43. }
  44. return populateMetrics(host)
  45. },
  46. },
  47. }},
  48. }),
  49. scraperinttest.WithCustomConfig(
  50. func(t *testing.T, cfg component.Config, ci *scraperinttest.ContainerInfo) {
  51. rCfg := cfg.(*Config)
  52. rCfg.Endpoint = fmt.Sprintf("%s:%s", ci.Host(t), ci.MappedPort(t, aerospikePort))
  53. rCfg.ScraperControllerSettings.CollectionInterval = 100 * time.Millisecond
  54. cfgMod(rCfg)
  55. }),
  56. scraperinttest.WithCompareOptions(
  57. pmetrictest.IgnoreMetricValues(),
  58. pmetrictest.IgnoreResourceAttributeValue("aerospike.node.name"),
  59. pmetrictest.IgnoreMetricDataPointsOrder(),
  60. pmetrictest.IgnoreStartTimestamp(),
  61. pmetrictest.IgnoreTimestamp(),
  62. ),
  63. ).Run
  64. }
  65. type waitStrategy struct{}
  66. func (ws waitStrategy) WaitUntilReady(ctx context.Context, st wait.StrategyTarget) error {
  67. if err := wait.ForListeningPort(nat.Port(aerospikePort)).
  68. WithStartupTimeout(time.Minute).
  69. WaitUntilReady(ctx, st); err != nil {
  70. return err
  71. }
  72. host, err := aerospikeHost(ctx, st)
  73. if err != nil {
  74. return err
  75. }
  76. var clientErr error
  77. for {
  78. select {
  79. case <-ctx.Done():
  80. return clientErr
  81. default:
  82. _, clientErr = as.NewClientWithPolicyAndHost(clientPolicy(), host)
  83. if clientErr == nil {
  84. return nil
  85. }
  86. }
  87. }
  88. }
  89. func aerospikeHost(ctx context.Context, st wait.StrategyTarget) (*as.Host, error) {
  90. host, err := st.Host(ctx)
  91. if err != nil {
  92. return nil, err
  93. }
  94. port, err := st.MappedPort(ctx, nat.Port(aerospikePort))
  95. if err != nil {
  96. return nil, err
  97. }
  98. return as.NewHost(host, port.Int()), nil
  99. }
  100. type doneCheckable interface {
  101. IsDone() (bool, as.Error)
  102. }
  103. type recordsCheckable interface {
  104. Results() <-chan *as.Result
  105. }
  106. type aeroDoneFunc func() (doneCheckable, as.Error)
  107. type aeroRecordsFunc func() (recordsCheckable, as.Error)
  108. func doneWaitAndCheck(f aeroDoneFunc) error {
  109. chk, err := f()
  110. if err != nil {
  111. return err
  112. }
  113. for res := false; !res; res, err = chk.IsDone() {
  114. if err != nil {
  115. return err
  116. }
  117. time.Sleep(time.Second / 3)
  118. }
  119. return nil
  120. }
  121. func recordsWaitAndCheck(f aeroRecordsFunc) error {
  122. chk, err := f()
  123. if err != nil {
  124. return err
  125. }
  126. // consume all records
  127. chk.Results()
  128. return nil
  129. }
  130. func clientPolicy() *as.ClientPolicy {
  131. clientPolicy := as.NewClientPolicy()
  132. clientPolicy.Timeout = 60 * time.Second
  133. // minconns is used to populate the client connections metric
  134. clientPolicy.MinConnectionsPerNode = 50
  135. return clientPolicy
  136. }
  137. func populateMetrics(host *as.Host) error {
  138. errSetFilter := errors.New("failed to set filter")
  139. errCreateSindex := errors.New("failed to create sindex")
  140. errRunningCreateSindex := errors.New("failed running create index")
  141. c, err := as.NewClientWithPolicyAndHost(clientPolicy(), host)
  142. if err != nil {
  143. return err
  144. }
  145. ns := "test"
  146. set := "integration"
  147. pibin := "bin1"
  148. sibin := "bin2"
  149. // write 100 records to get some memory usage
  150. for i := 0; i < 100; i++ {
  151. var key *as.Key
  152. key, err = as.NewKey(ns, set, i)
  153. if err != nil {
  154. return errors.New("failed to create key")
  155. }
  156. err = c.Put(nil, key, as.BinMap{pibin: i, sibin: i})
  157. if err != nil {
  158. return errors.New("failed to write record")
  159. }
  160. }
  161. // register UDFs for aggregation queries
  162. cwd, wderr := os.Getwd()
  163. if wderr != nil {
  164. return errors.New("can't get working directory")
  165. }
  166. udfFile := "udf"
  167. udfFunc := "sum_single_bin"
  168. luaPath := filepath.Join(cwd, "testdata", "integration/")
  169. as.SetLuaPath(luaPath)
  170. task, err := c.RegisterUDFFromFile(nil, filepath.Join(luaPath, udfFile+".lua"), udfFile+".lua", as.LUA)
  171. if err != nil {
  172. return errors.New("failed registering udf file")
  173. }
  174. if nil != <-task.OnComplete() {
  175. return errors.New("failed while registering udf file")
  176. }
  177. queryPolicy := as.NewQueryPolicy()
  178. queryPolicyShort := as.NewQueryPolicy()
  179. queryPolicyShort.ShortQuery = true
  180. var writePolicy *as.WritePolicy
  181. // *** Primary Index Queries *** //
  182. // perform a basic primary index query
  183. s1 := as.NewStatement(ns, set)
  184. if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
  185. return c.Query(queryPolicy, s1)
  186. }); err != nil {
  187. return err
  188. }
  189. // aggregation query on primary index
  190. s2 := as.NewStatement(ns, set)
  191. if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
  192. return c.QueryAggregate(queryPolicy, s2, "/"+udfFile, udfFunc, as.StringValue(pibin))
  193. }); err != nil {
  194. return err
  195. }
  196. // background udf query on primary index
  197. s3 := as.NewStatement(ns, set)
  198. if err := doneWaitAndCheck(func() (doneCheckable, as.Error) {
  199. return c.ExecuteUDF(queryPolicy, s3, "/"+udfFile, udfFunc, as.StringValue(pibin))
  200. }); err != nil {
  201. return err
  202. }
  203. // ops query on primary index
  204. s4 := as.NewStatement(ns, set)
  205. wbin := as.NewBin(pibin, 200)
  206. ops := as.PutOp(wbin)
  207. if err := doneWaitAndCheck(func() (doneCheckable, as.Error) {
  208. return c.QueryExecute(queryPolicy, writePolicy, s4, ops)
  209. }); err != nil {
  210. return err
  211. }
  212. // perform a basic short primary index query
  213. s5 := as.NewStatement(ns, set)
  214. if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
  215. return c.Query(queryPolicyShort, s5)
  216. }); err != nil {
  217. return err
  218. }
  219. // *** Secondary Index Queries *** //
  220. // create secondary index for SI queries
  221. itask, err := c.CreateIndex(writePolicy, ns, set, "sitest", "bin2", as.NUMERIC)
  222. if err != nil {
  223. return errCreateSindex
  224. }
  225. if err = <-itask.OnComplete(); err != nil {
  226. return errRunningCreateSindex
  227. }
  228. // SI filter
  229. filt := as.NewRangeFilter(sibin, 0, 100)
  230. // perform a basic secondary index query
  231. s6 := as.NewStatement(ns, set)
  232. if sferr := s6.SetFilter(filt); sferr != nil {
  233. return errSetFilter
  234. }
  235. if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
  236. return c.Query(queryPolicy, s6)
  237. }); err != nil {
  238. return err
  239. }
  240. // aggregation query on secondary index
  241. s7 := as.NewStatement(ns, set)
  242. if sferr := s7.SetFilter(filt); sferr != nil {
  243. return errSetFilter
  244. }
  245. if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
  246. return c.QueryAggregate(queryPolicy, s7, "/"+udfFile, udfFunc, as.StringValue(sibin))
  247. }); err != nil {
  248. return err
  249. }
  250. // background udf query on secondary index
  251. s8 := as.NewStatement(ns, set)
  252. if sferr := s8.SetFilter(filt); sferr != nil {
  253. return errSetFilter
  254. }
  255. if err := doneWaitAndCheck(func() (doneCheckable, as.Error) {
  256. return c.ExecuteUDF(queryPolicy, s8, "/"+udfFile, udfFunc, as.StringValue(sibin))
  257. }); err != nil {
  258. return err
  259. }
  260. // ops query on secondary index
  261. s9 := as.NewStatement(ns, set)
  262. if sferr := s9.SetFilter(filt); sferr != nil {
  263. return errSetFilter
  264. }
  265. siwbin := as.NewBin("bin4", 400)
  266. siops := as.PutOp(siwbin)
  267. if err := doneWaitAndCheck(func() (doneCheckable, as.Error) {
  268. return c.QueryExecute(queryPolicy, writePolicy, s9, siops)
  269. }); err != nil {
  270. return err
  271. }
  272. // perform a basic short secondary index query
  273. s10 := as.NewStatement(ns, set)
  274. if sferr := s10.SetFilter(filt); sferr != nil {
  275. return errSetFilter
  276. }
  277. if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
  278. return c.Query(queryPolicyShort, s10)
  279. }); err != nil {
  280. return err
  281. }
  282. // *** GeoJSON *** //
  283. bins := []as.BinMap{
  284. {
  285. "name": "Bike Shop",
  286. "demand": 17923,
  287. "capacity": 17,
  288. "coord": as.GeoJSONValue(`{"type" : "Point", "coordinates": [13.009318762,80.003157854]}`),
  289. },
  290. {
  291. "name": "Residential Block",
  292. "demand": 2429,
  293. "capacity": 2974,
  294. "coord": as.GeoJSONValue(`{"type" : "Point", "coordinates": [13.00961276, 80.003422154]}`),
  295. },
  296. {
  297. "name": "Restaurant",
  298. "demand": 49589,
  299. "capacity": 4231,
  300. "coord": as.GeoJSONValue(`{"type" : "Point", "coordinates": [13.009318762,80.003157854]}`),
  301. },
  302. {
  303. "name": "Cafe",
  304. "demand": 247859,
  305. "capacity": 26,
  306. "coord": as.GeoJSONValue(`{"type" : "Point", "coordinates": [13.00961276, 80.003422154]}`),
  307. },
  308. {
  309. "name": "Park",
  310. "demand": 247859,
  311. "capacity": 26,
  312. "coord": as.GeoJSONValue(`{"type" : "AeroCircle", "coordinates": [[0.0, 10.0], 10]}`),
  313. },
  314. }
  315. geoSet := "geoset"
  316. for i, b := range bins {
  317. key, _ := as.NewKey(ns, geoSet, i)
  318. err = c.Put(nil, key, b)
  319. if err != nil {
  320. return errors.New("failed to write geojson record")
  321. }
  322. }
  323. // create secondary index for geo queries
  324. itask, err = c.CreateIndex(writePolicy, ns, geoSet, "testset_geo_index", "coord", as.GEO2DSPHERE)
  325. if err != nil {
  326. return errCreateSindex
  327. }
  328. if err := <-itask.OnComplete(); err != nil {
  329. return errRunningCreateSindex
  330. }
  331. // run geoJSON query
  332. geoStm1 := as.NewStatement(ns, geoSet)
  333. geoFilt1 := as.NewGeoWithinRadiusFilter("coord", float64(13.009318762), float64(80.003157854), float64(50000))
  334. if sferr := geoStm1.SetFilter(geoFilt1); sferr != nil {
  335. return errSetFilter
  336. }
  337. return recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
  338. return c.Query(queryPolicy, geoStm1)
  339. })
  340. }