client.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package aerospikereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/aerospikereceiver"
  4. import (
  5. "crypto/tls"
  6. "fmt"
  7. "strings"
  8. "sync"
  9. "time"
  10. as "github.com/aerospike/aerospike-client-go/v6"
  11. "go.uber.org/zap"
  12. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/aerospikereceiver/cluster"
  13. )
  14. var defaultNodeInfoCommands = []string{
  15. "node",
  16. "statistics",
  17. }
  18. // nodeName: metricName: stats
  19. type clusterInfo = map[string]map[string]string
  20. // Aerospike is the interface that provides information about a given node
  21. type Aerospike interface {
  22. // NamespaceInfo gets information about a specific namespace
  23. NamespaceInfo() namespaceInfo
  24. // Info gets high-level information about the node/system.
  25. Info() clusterInfo
  26. // Close closes the connection to the Aerospike node
  27. Close()
  28. }
  29. type clientConfig struct {
  30. host *as.Host
  31. username string
  32. password string
  33. timeout time.Duration
  34. logger *zap.SugaredLogger
  35. collectClusterMetrics bool
  36. tls *tls.Config
  37. }
  38. type nodeGetter interface {
  39. GetNodes() []cluster.Node
  40. Close()
  41. }
  42. type defaultASClient struct {
  43. cluster nodeGetter
  44. policy *as.ClientPolicy // Timeout and authentication information
  45. logger *zap.SugaredLogger // logs malformed metrics in responses
  46. }
  47. type nodeGetterFactoryFunc func(cfg *clientConfig, policy *as.ClientPolicy, authEnabled bool) (nodeGetter, error)
  48. func nodeGetterFactory(cfg *clientConfig, policy *as.ClientPolicy, authEnabled bool) (nodeGetter, error) {
  49. hosts := []*as.Host{cfg.host}
  50. if cfg.collectClusterMetrics {
  51. cluster, err := cluster.NewCluster(policy, hosts)
  52. return cluster, err
  53. }
  54. cluster, err := cluster.NewSubsetCluster(
  55. policy,
  56. hosts,
  57. authEnabled,
  58. )
  59. return cluster, err
  60. }
  61. // newASClient creates a new defaultASClient connected to the given host and port
  62. // If collectClusterMetrics is true, the client will connect to and tend all nodes in the cluster
  63. // If username and password aren't blank, they're used to authenticate
  64. func newASClient(cfg *clientConfig, ngf nodeGetterFactoryFunc) (*defaultASClient, error) {
  65. authEnabled := cfg.username != "" && cfg.password != ""
  66. policy := as.NewClientPolicy()
  67. policy.Timeout = cfg.timeout
  68. if authEnabled {
  69. policy.User = cfg.username
  70. policy.Password = cfg.password
  71. }
  72. if cfg.tls != nil {
  73. // enable TLS
  74. policy.TlsConfig = cfg.tls
  75. }
  76. cluster, err := ngf(cfg, policy, authEnabled)
  77. if err != nil {
  78. return nil, err
  79. }
  80. return &defaultASClient{
  81. cluster: cluster,
  82. logger: cfg.logger,
  83. policy: policy,
  84. }, nil
  85. }
  86. // useNodeFunc maps a nodeFunc to all the client's nodes
  87. func (c *defaultASClient) useNodeFunc(nf nodeFunc) clusterInfo {
  88. var res clusterInfo
  89. nodes := c.cluster.GetNodes()
  90. policy := as.NewInfoPolicy()
  91. policy.Timeout = c.policy.Timeout
  92. res = mapNodeInfoFunc(
  93. nodes,
  94. nf,
  95. policy,
  96. c.logger,
  97. )
  98. return res
  99. }
  100. // metricName: stat
  101. // may be used as, commandName: stat
  102. type metricsMap = map[string]string
  103. // Info returns a clusterInfo map of node names to metricMaps
  104. // it uses the info commands defined in defaultNodeInfoCommands
  105. func (c *defaultASClient) Info() clusterInfo {
  106. res := clusterInfo{}
  107. // NOTE this discards the command names
  108. metricsToParse := c.useNodeFunc(allNodeInfo)
  109. for node, commands := range metricsToParse {
  110. res[node] = metricsMap{}
  111. for command, stats := range commands {
  112. ps := parseStats(command, stats, ";")
  113. mergeMetricsMap(res[node], ps, c.logger)
  114. }
  115. }
  116. return res
  117. }
  118. // nodeName: namespaceName: metricName: stats
  119. type namespaceInfo = map[string]map[string]map[string]string
  120. // NamespaceInfo returns a namespaceInfo map
  121. // the map contains the results of the "namespace/<name>" info command
  122. // for all nodes' namespaces
  123. func (c *defaultASClient) NamespaceInfo() namespaceInfo {
  124. res := namespaceInfo{}
  125. metricsToParse := c.useNodeFunc(allNamespaceInfo)
  126. for node, namespaces := range metricsToParse {
  127. res[node] = map[string]map[string]string{}
  128. for ns, stats := range namespaces {
  129. // ns == "namespace/<namespaceName>"
  130. nsData := strings.SplitN(ns, "/", 2)
  131. if len(nsData) < 2 {
  132. c.logger.Warn("NamespaceInfo nsData len < 2")
  133. continue
  134. }
  135. nsName := nsData[1]
  136. res[node][nsName] = parseStats(ns, stats, ";")
  137. }
  138. }
  139. return res
  140. }
  141. // Close closes the client's connections to all nodes
  142. func (c *defaultASClient) Close() {
  143. c.cluster.Close()
  144. }
  145. // mapNodeInfoFunc maps a nodeFunc to all nodes in the list in parallel
  146. // if an error occurs during any of the nodeFuncs' execution, it is logged but not returned
  147. // the return value is a clusterInfo map from node name to command to unparsed metric string
  148. func mapNodeInfoFunc(nodes []cluster.Node, nodeF nodeFunc, policy *as.InfoPolicy, logger *zap.SugaredLogger) clusterInfo {
  149. numNodes := len(nodes)
  150. res := make(clusterInfo, numNodes)
  151. type nodeStats struct {
  152. name string
  153. metrics metricsMap
  154. error error
  155. }
  156. var wg sync.WaitGroup
  157. resChan := make(chan nodeStats, numNodes)
  158. for _, nd := range nodes {
  159. wg.Add(1)
  160. go func(nd cluster.Node) {
  161. defer wg.Done()
  162. name := nd.GetName()
  163. metrics, err := nodeF(nd, policy)
  164. if err != nil {
  165. logger.Errorf("mapNodeInfoFunc err: %w", err)
  166. }
  167. ns := nodeStats{
  168. name: name,
  169. metrics: metrics,
  170. error: err,
  171. }
  172. resChan <- ns
  173. }(nd)
  174. }
  175. wg.Wait()
  176. close(resChan)
  177. for ns := range resChan {
  178. res[ns.name] = ns.metrics
  179. }
  180. return res
  181. }
  182. // node wise info functions
  183. // nodeFunc is a function that requests info commands from a node
  184. // and returns a metricsMap from command to metric string
  185. type nodeFunc func(n cluster.Node, policy *as.InfoPolicy) (metricsMap, error)
  186. // namespaceNames is used by nodeFuncs to get the names of all namespaces ona node
  187. func namespaceNames(n cluster.Node, policy *as.InfoPolicy) ([]string, error) {
  188. var namespaces []string
  189. info, err := n.RequestInfo(policy, "namespaces")
  190. if err != nil {
  191. return nil, err
  192. }
  193. namespaces = strings.Split(info["namespaces"], ";")
  194. return namespaces, err
  195. }
  196. // allNodeInfo returns the results of defaultNodeInfoCommands for a node
  197. func allNodeInfo(n cluster.Node, policy *as.InfoPolicy) (metricsMap, error) {
  198. var res metricsMap
  199. commands := defaultNodeInfoCommands
  200. res, err := n.RequestInfo(policy, commands...)
  201. if err != nil {
  202. return nil, err
  203. }
  204. return res, nil
  205. }
  206. // allNamespaceInfo returns the results of namespace/%s for each namespace on the node
  207. func allNamespaceInfo(n cluster.Node, policy *as.InfoPolicy) (metricsMap, error) {
  208. var res metricsMap
  209. names, err := namespaceNames(n, policy)
  210. if err != nil {
  211. return nil, err
  212. }
  213. commands := make([]string, len(names))
  214. for i, name := range names {
  215. commands[i] = fmt.Sprintf("namespace/%s", name)
  216. }
  217. res, err = n.RequestInfo(policy, commands...)
  218. if err != nil {
  219. return nil, err
  220. }
  221. return res, nil
  222. }
  223. func parseStats(defaultKey, s, sep string) metricsMap {
  224. stats := make(metricsMap, strings.Count(s, sep)+1)
  225. s2 := strings.Split(s, sep)
  226. for _, s := range s2 {
  227. list := strings.SplitN(s, "=", 2)
  228. switch len(list) {
  229. case 0:
  230. case 1:
  231. stats[defaultKey] = list[0]
  232. case 2:
  233. stats[list[0]] = list[1]
  234. default:
  235. stats[list[0]] = strings.Join(list[1:], "=")
  236. }
  237. }
  238. return stats
  239. }
  240. // mergeMetricsMap merges values from rm into lm
  241. // logs a warning if a duplicate key is found
  242. func mergeMetricsMap(lm, rm metricsMap, logger *zap.SugaredLogger) {
  243. for k, v := range rm {
  244. if _, ok := lm[k]; ok {
  245. logger.Warnf("duplicate key: %s", k)
  246. }
  247. lm[k] = v
  248. }
  249. }