// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build integration
// +build integration

package aerospikereceiver

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"testing"
	"time"

	as "github.com/aerospike/aerospike-client-go/v6"
	"github.com/docker/go-connections/nat"
	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"
	"go.opentelemetry.io/collector/component"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/scraperinttest"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
)

var aerospikePort = "3000"
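
// TestIntegration runs the scraper integration test against an Aerospike 6.2
// container, once with the default config and once with cluster metric
// collection enabled.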
func TestIntegration(t *testing.T) {
	t.Run("6.2", integrationTest(func(*Config) {}))
	t.Run("6.2-cluster", integrationTest(func(cfg *Config) {
		cfg.CollectClusterMetrics = true
	}))
}
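
// integrationTest builds a scraper integration test that starts the Aerospike
// container, seeds it with data via populateMetrics after startup, applies
// cfgMod to the receiver config, and compares the scraped metrics while
// ignoring unstable values such as node names and timestamps.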
func integrationTest(cfgMod func(*Config)) func(*testing.T) {
	return scraperinttest.NewIntegrationTest(
		NewFactory(),
		scraperinttest.WithContainerRequest(
			testcontainers.ContainerRequest{
				Image:        "aerospike:ce-6.2.0.2",
				ExposedPorts: []string{aerospikePort},
				WaitingFor:   waitStrategy{},
				LifecycleHooks: []testcontainers.ContainerLifecycleHooks{{
					PostStarts: []testcontainers.ContainerHook{
						func(ctx context.Context, container testcontainers.Container) error {
							host, err := aerospikeHost(ctx, container)
							if err != nil {
								return err
							}
							return populateMetrics(host)
						},
					},
				}},
			}),
		scraperinttest.WithCustomConfig(
			func(t *testing.T, cfg component.Config, ci *scraperinttest.ContainerInfo) {
				rCfg := cfg.(*Config)
				rCfg.Endpoint = fmt.Sprintf("%s:%s", ci.Host(t), ci.MappedPort(t, aerospikePort))
				rCfg.ScraperControllerSettings.CollectionInterval = 100 * time.Millisecond
				cfgMod(rCfg)
			}),
		scraperinttest.WithCompareOptions(
			pmetrictest.IgnoreMetricValues(),
			pmetrictest.IgnoreResourceAttributeValue("aerospike.node.name"),
			pmetrictest.IgnoreMetricDataPointsOrder(),
			pmetrictest.IgnoreStartTimestamp(),
			pmetrictest.IgnoreTimestamp(),
		),
	).Run
}
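
// waitStrategy waits until the Aerospike server accepts client connections,
// not just until the port is listening.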
type waitStrategy struct{}

func (ws waitStrategy) WaitUntilReady(ctx context.Context, st wait.StrategyTarget) error {
	if err := wait.ForListeningPort(nat.Port(aerospikePort)).
		WithStartupTimeout(time.Minute).
		WaitUntilReady(ctx, st); err != nil {
		return err
	}

	host, err := aerospikeHost(ctx, st)
	if err != nil {
		return err
	}

	var clientErr error
	for {
		select {
		case <-ctx.Done():
			return clientErr
		default:
			_, clientErr = as.NewClientWithPolicyAndHost(clientPolicy(), host)
			if clientErr == nil {
				return nil
			}
		}
	}
}
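
// aerospikeHost resolves the container's host and mapped port into an
// Aerospike host descriptor the client can connect to.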
func aerospikeHost(ctx context.Context, st wait.StrategyTarget) (*as.Host, error) {
	host, err := st.Host(ctx)
	if err != nil {
		return nil, err
	}
	port, err := st.MappedPort(ctx, nat.Port(aerospikePort))
	if err != nil {
		return nil, err
	}
	return as.NewHost(host, port.Int()), nil
}
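
// doneCheckable and recordsCheckable abstract the two kinds of query results
// returned by the Aerospike client: tasks that are polled for completion and
// queries that stream records over a channel.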
type doneCheckable interface {
	IsDone() (bool, as.Error)
}

type recordsCheckable interface {
	Results() <-chan *as.Result
}

type aeroDoneFunc func() (doneCheckable, as.Error)

type aeroRecordsFunc func() (recordsCheckable, as.Error)
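
// doneWaitAndCheck runs the given operation and polls the returned task until
// it reports completion.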
func doneWaitAndCheck(f aeroDoneFunc) error {
	chk, err := f()
	if err != nil {
		return err
	}
	for res := false; !res; res, err = chk.IsDone() {
		if err != nil {
			return err
		}
		time.Sleep(time.Second / 3)
	}
	return nil
}
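
// recordsWaitAndCheck runs the given query and drains its result channel so
// the query runs to completion.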
func recordsWaitAndCheck(f aeroRecordsFunc) error {
	chk, err := f()
	if err != nil {
		return err
	}
	// consume all records
	for range chk.Results() {
	}
	return nil
}
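
// clientPolicy returns the client policy shared by the readiness check and
// populateMetrics.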
func clientPolicy() *as.ClientPolicy {
	clientPolicy := as.NewClientPolicy()
	clientPolicy.Timeout = 60 * time.Second
	// minconns is used to populate the client connections metric
	clientPolicy.MinConnectionsPerNode = 50
	return clientPolicy
}
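
// populateMetrics writes records and runs primary index, secondary index, UDF,
// and GeoJSON queries against the server so the corresponding Aerospike
// statistics are populated before the receiver scrapes them.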
func populateMetrics(host *as.Host) error {
	errSetFilter := errors.New("failed to set filter")
	errCreateSindex := errors.New("failed to create sindex")
	errRunningCreateSindex := errors.New("failed running create index")

	c, err := as.NewClientWithPolicyAndHost(clientPolicy(), host)
	if err != nil {
		return err
	}

	ns := "test"
	set := "integration"
	pibin := "bin1"
	sibin := "bin2"

	// write 100 records to get some memory usage
	for i := 0; i < 100; i++ {
		var key *as.Key
		key, err = as.NewKey(ns, set, i)
		if err != nil {
			return errors.New("failed to create key")
		}
		err = c.Put(nil, key, as.BinMap{pibin: i, sibin: i})
		if err != nil {
			return errors.New("failed to write record")
		}
	}

	// register UDFs for aggregation queries
	cwd, wderr := os.Getwd()
	if wderr != nil {
		return errors.New("can't get working directory")
	}
	udfFile := "udf"
	udfFunc := "sum_single_bin"
	luaPath := filepath.Join(cwd, "testdata", "integration/")
	as.SetLuaPath(luaPath)
	task, err := c.RegisterUDFFromFile(nil, filepath.Join(luaPath, udfFile+".lua"), udfFile+".lua", as.LUA)
	if err != nil {
		return errors.New("failed registering udf file")
	}
	if nil != <-task.OnComplete() {
		return errors.New("failed while registering udf file")
	}

	queryPolicy := as.NewQueryPolicy()
	queryPolicyShort := as.NewQueryPolicy()
	queryPolicyShort.ShortQuery = true
	var writePolicy *as.WritePolicy

	// *** Primary Index Queries *** //

	// perform a basic primary index query
	s1 := as.NewStatement(ns, set)
	if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
		return c.Query(queryPolicy, s1)
	}); err != nil {
		return err
	}

	// aggregation query on primary index
	s2 := as.NewStatement(ns, set)
	if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
		return c.QueryAggregate(queryPolicy, s2, "/"+udfFile, udfFunc, as.StringValue(pibin))
	}); err != nil {
		return err
	}

	// background udf query on primary index
	s3 := as.NewStatement(ns, set)
	if err := doneWaitAndCheck(func() (doneCheckable, as.Error) {
		return c.ExecuteUDF(queryPolicy, s3, "/"+udfFile, udfFunc, as.StringValue(pibin))
	}); err != nil {
		return err
	}

	// ops query on primary index
	s4 := as.NewStatement(ns, set)
	wbin := as.NewBin(pibin, 200)
	ops := as.PutOp(wbin)
	if err := doneWaitAndCheck(func() (doneCheckable, as.Error) {
		return c.QueryExecute(queryPolicy, writePolicy, s4, ops)
	}); err != nil {
		return err
	}

	// perform a basic short primary index query
	s5 := as.NewStatement(ns, set)
	if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
		return c.Query(queryPolicyShort, s5)
	}); err != nil {
		return err
	}

	// *** Secondary Index Queries *** //

	// create secondary index for SI queries
	itask, err := c.CreateIndex(writePolicy, ns, set, "sitest", "bin2", as.NUMERIC)
	if err != nil {
		return errCreateSindex
	}
	if err = <-itask.OnComplete(); err != nil {
		return errRunningCreateSindex
	}

	// SI filter
	filt := as.NewRangeFilter(sibin, 0, 100)

	// perform a basic secondary index query
	s6 := as.NewStatement(ns, set)
	if sferr := s6.SetFilter(filt); sferr != nil {
		return errSetFilter
	}
	if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
		return c.Query(queryPolicy, s6)
	}); err != nil {
		return err
	}

	// aggregation query on secondary index
	s7 := as.NewStatement(ns, set)
	if sferr := s7.SetFilter(filt); sferr != nil {
		return errSetFilter
	}
	if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
		return c.QueryAggregate(queryPolicy, s7, "/"+udfFile, udfFunc, as.StringValue(sibin))
	}); err != nil {
		return err
	}

	// background udf query on secondary index
	s8 := as.NewStatement(ns, set)
	if sferr := s8.SetFilter(filt); sferr != nil {
		return errSetFilter
	}
	if err := doneWaitAndCheck(func() (doneCheckable, as.Error) {
		return c.ExecuteUDF(queryPolicy, s8, "/"+udfFile, udfFunc, as.StringValue(sibin))
	}); err != nil {
		return err
	}

	// ops query on secondary index
	s9 := as.NewStatement(ns, set)
	if sferr := s9.SetFilter(filt); sferr != nil {
		return errSetFilter
	}
	siwbin := as.NewBin("bin4", 400)
	siops := as.PutOp(siwbin)
	if err := doneWaitAndCheck(func() (doneCheckable, as.Error) {
		return c.QueryExecute(queryPolicy, writePolicy, s9, siops)
	}); err != nil {
		return err
	}

	// perform a basic short secondary index query
	s10 := as.NewStatement(ns, set)
	if sferr := s10.SetFilter(filt); sferr != nil {
		return errSetFilter
	}
	if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
		return c.Query(queryPolicyShort, s10)
	}); err != nil {
		return err
	}

	// *** GeoJSON *** //

	bins := []as.BinMap{
		{
			"name":     "Bike Shop",
			"demand":   17923,
			"capacity": 17,
			"coord":    as.GeoJSONValue(`{"type" : "Point", "coordinates": [13.009318762,80.003157854]}`),
		},
		{
			"name":     "Residential Block",
			"demand":   2429,
			"capacity": 2974,
			"coord":    as.GeoJSONValue(`{"type" : "Point", "coordinates": [13.00961276, 80.003422154]}`),
		},
		{
			"name":     "Restaurant",
			"demand":   49589,
			"capacity": 4231,
			"coord":    as.GeoJSONValue(`{"type" : "Point", "coordinates": [13.009318762,80.003157854]}`),
		},
		{
			"name":     "Cafe",
			"demand":   247859,
			"capacity": 26,
			"coord":    as.GeoJSONValue(`{"type" : "Point", "coordinates": [13.00961276, 80.003422154]}`),
		},
		{
			"name":     "Park",
			"demand":   247859,
			"capacity": 26,
			"coord":    as.GeoJSONValue(`{"type" : "AeroCircle", "coordinates": [[0.0, 10.0], 10]}`),
		},
	}

	geoSet := "geoset"
	for i, b := range bins {
		key, _ := as.NewKey(ns, geoSet, i)
		err = c.Put(nil, key, b)
		if err != nil {
			return errors.New("failed to write geojson record")
		}
	}

	// create secondary index for geo queries
	itask, err = c.CreateIndex(writePolicy, ns, geoSet, "testset_geo_index", "coord", as.GEO2DSPHERE)
	if err != nil {
		return errCreateSindex
	}
	if err := <-itask.OnComplete(); err != nil {
		return errRunningCreateSindex
	}

	// run geoJSON query
	geoStm1 := as.NewStatement(ns, geoSet)
	geoFilt1 := as.NewGeoWithinRadiusFilter("coord", float64(13.009318762), float64(80.003157854), float64(50000))
	if sferr := geoStm1.SetFilter(geoFilt1); sferr != nil {
		return errSetFilter
	}
	return recordsWaitAndCheck(func() (recordsCheckable, as.Error) {
		return c.Query(queryPolicy, geoStm1)
	})
}