node.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package cluster // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/aerospikereceiver/cluster"
  4. import (
  5. "fmt"
  6. "strings"
  7. "time"
  8. as "github.com/aerospike/aerospike-client-go/v6"
  9. "github.com/aerospike/aerospike-client-go/v6/types"
  10. )
  11. // asconn is used to mock aerospike connections
  12. type asconn interface {
  13. RequestInfo(...string) (map[string]string, as.Error)
  14. Login(*as.ClientPolicy) as.Error
  15. Close()
  16. SetTimeout(time.Time, time.Duration) as.Error
  17. }
  18. type Node interface {
  19. RequestInfo(*as.InfoPolicy, ...string) (map[string]string, as.Error)
  20. GetName() string
  21. Close()
  22. }
  23. // connNode is for single node scraping
  24. type connNode struct {
  25. conn asconn
  26. policy *as.ClientPolicy
  27. name string
  28. }
  29. type connFactoryFunc func(*as.ClientPolicy, *as.Host) (asconn, as.Error)
  30. func newASConn(policy *as.ClientPolicy, host *as.Host) (asconn, as.Error) {
  31. return as.NewConnection(policy, host)
  32. }
  33. func newConnNode(policy *as.ClientPolicy, host *as.Host, authEnabled bool) (Node, error) {
  34. return _newConnNode(policy, host, authEnabled, newASConn)
  35. }
  36. func _newConnNode(policy *as.ClientPolicy, host *as.Host, authEnabled bool, connF connFactoryFunc) (Node, error) {
  37. conn, err := connF(policy, host)
  38. if err != nil {
  39. return nil, err
  40. }
  41. var deadline time.Time
  42. // Set deadline to 0 (inf) so we can always reuse this connection
  43. if err = conn.SetTimeout(deadline, policy.Timeout); err != nil {
  44. return nil, fmt.Errorf("failed to set timeout: %w", err)
  45. }
  46. if authEnabled {
  47. if err = conn.Login(policy); err != nil {
  48. return nil, err
  49. }
  50. }
  51. m, err := conn.RequestInfo("node")
  52. if err != nil {
  53. return nil, err
  54. }
  55. for k := range m {
  56. if strings.HasPrefix(strings.ToUpper(k), "ERROR:") {
  57. return nil, as.ErrNotAuthenticated
  58. }
  59. }
  60. name := m["node"]
  61. res := connNode{
  62. conn: conn,
  63. policy: policy,
  64. name: name,
  65. }
  66. return &res, nil
  67. }
  68. func (n *connNode) RequestInfo(_ *as.InfoPolicy, commands ...string) (map[string]string, as.Error) {
  69. res, err := n.conn.RequestInfo(commands...)
  70. // Try to login and get a new session
  71. if err != nil && err.Matches(types.EXPIRED_SESSION) {
  72. if loginErr := n.conn.Login(n.policy); loginErr != nil {
  73. return nil, loginErr
  74. }
  75. }
  76. if err != nil {
  77. return nil, err
  78. }
  79. for k := range res {
  80. if strings.HasPrefix(strings.ToUpper(k), "ERROR:") {
  81. return nil, as.ErrNotAuthenticated
  82. }
  83. }
  84. return res, nil
  85. }
  86. func (n *connNode) GetName() string {
  87. return n.name
  88. }
  89. func (n *connNode) Close() {
  90. n.conn.Close()
  91. }