123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package aerospikereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/aerospikereceiver"
- import (
- "crypto/tls"
- "fmt"
- "strings"
- "sync"
- "time"
- as "github.com/aerospike/aerospike-client-go/v6"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/aerospikereceiver/cluster"
- )
- var defaultNodeInfoCommands = []string{
- "node",
- "statistics",
- }
- // nodeName: metricName: stats
- type clusterInfo = map[string]map[string]string
- // Aerospike is the interface that provides information about a given node
- type Aerospike interface {
- // NamespaceInfo gets information about a specific namespace
- NamespaceInfo() namespaceInfo
- // Info gets high-level information about the node/system.
- Info() clusterInfo
- // Close closes the connection to the Aerospike node
- Close()
- }
- type clientConfig struct {
- host *as.Host
- username string
- password string
- timeout time.Duration
- logger *zap.SugaredLogger
- collectClusterMetrics bool
- tls *tls.Config
- }
- type nodeGetter interface {
- GetNodes() []cluster.Node
- Close()
- }
- type defaultASClient struct {
- cluster nodeGetter
- policy *as.ClientPolicy // Timeout and authentication information
- logger *zap.SugaredLogger // logs malformed metrics in responses
- }
- type nodeGetterFactoryFunc func(cfg *clientConfig, policy *as.ClientPolicy, authEnabled bool) (nodeGetter, error)
- func nodeGetterFactory(cfg *clientConfig, policy *as.ClientPolicy, authEnabled bool) (nodeGetter, error) {
- hosts := []*as.Host{cfg.host}
- if cfg.collectClusterMetrics {
- cluster, err := cluster.NewCluster(policy, hosts)
- return cluster, err
- }
- cluster, err := cluster.NewSubsetCluster(
- policy,
- hosts,
- authEnabled,
- )
- return cluster, err
- }
- // newASClient creates a new defaultASClient connected to the given host and port
- // If collectClusterMetrics is true, the client will connect to and tend all nodes in the cluster
- // If username and password aren't blank, they're used to authenticate
- func newASClient(cfg *clientConfig, ngf nodeGetterFactoryFunc) (*defaultASClient, error) {
- authEnabled := cfg.username != "" && cfg.password != ""
- policy := as.NewClientPolicy()
- policy.Timeout = cfg.timeout
- if authEnabled {
- policy.User = cfg.username
- policy.Password = cfg.password
- }
- if cfg.tls != nil {
- // enable TLS
- policy.TlsConfig = cfg.tls
- }
- cluster, err := ngf(cfg, policy, authEnabled)
- if err != nil {
- return nil, err
- }
- return &defaultASClient{
- cluster: cluster,
- logger: cfg.logger,
- policy: policy,
- }, nil
- }
- // useNodeFunc maps a nodeFunc to all the client's nodes
- func (c *defaultASClient) useNodeFunc(nf nodeFunc) clusterInfo {
- var res clusterInfo
- nodes := c.cluster.GetNodes()
- policy := as.NewInfoPolicy()
- policy.Timeout = c.policy.Timeout
- res = mapNodeInfoFunc(
- nodes,
- nf,
- policy,
- c.logger,
- )
- return res
- }
- // metricName: stat
- // may be used as, commandName: stat
- type metricsMap = map[string]string
- // Info returns a clusterInfo map of node names to metricMaps
- // it uses the info commands defined in defaultNodeInfoCommands
- func (c *defaultASClient) Info() clusterInfo {
- res := clusterInfo{}
- // NOTE this discards the command names
- metricsToParse := c.useNodeFunc(allNodeInfo)
- for node, commands := range metricsToParse {
- res[node] = metricsMap{}
- for command, stats := range commands {
- ps := parseStats(command, stats, ";")
- mergeMetricsMap(res[node], ps, c.logger)
- }
- }
- return res
- }
- // nodeName: namespaceName: metricName: stats
- type namespaceInfo = map[string]map[string]map[string]string
- // NamespaceInfo returns a namespaceInfo map
- // the map contains the results of the "namespace/<name>" info command
- // for all nodes' namespaces
- func (c *defaultASClient) NamespaceInfo() namespaceInfo {
- res := namespaceInfo{}
- metricsToParse := c.useNodeFunc(allNamespaceInfo)
- for node, namespaces := range metricsToParse {
- res[node] = map[string]map[string]string{}
- for ns, stats := range namespaces {
- // ns == "namespace/<namespaceName>"
- nsData := strings.SplitN(ns, "/", 2)
- if len(nsData) < 2 {
- c.logger.Warn("NamespaceInfo nsData len < 2")
- continue
- }
- nsName := nsData[1]
- res[node][nsName] = parseStats(ns, stats, ";")
- }
- }
- return res
- }
- // Close closes the client's connections to all nodes
- func (c *defaultASClient) Close() {
- c.cluster.Close()
- }
- // mapNodeInfoFunc maps a nodeFunc to all nodes in the list in parallel
- // if an error occurs during any of the nodeFuncs' execution, it is logged but not returned
- // the return value is a clusterInfo map from node name to command to unparsed metric string
- func mapNodeInfoFunc(nodes []cluster.Node, nodeF nodeFunc, policy *as.InfoPolicy, logger *zap.SugaredLogger) clusterInfo {
- numNodes := len(nodes)
- res := make(clusterInfo, numNodes)
- type nodeStats struct {
- name string
- metrics metricsMap
- error error
- }
- var wg sync.WaitGroup
- resChan := make(chan nodeStats, numNodes)
- for _, nd := range nodes {
- wg.Add(1)
- go func(nd cluster.Node) {
- defer wg.Done()
- name := nd.GetName()
- metrics, err := nodeF(nd, policy)
- if err != nil {
- logger.Errorf("mapNodeInfoFunc err: %w", err)
- }
- ns := nodeStats{
- name: name,
- metrics: metrics,
- error: err,
- }
- resChan <- ns
- }(nd)
- }
- wg.Wait()
- close(resChan)
- for ns := range resChan {
- res[ns.name] = ns.metrics
- }
- return res
- }
- // node wise info functions
- // nodeFunc is a function that requests info commands from a node
- // and returns a metricsMap from command to metric string
- type nodeFunc func(n cluster.Node, policy *as.InfoPolicy) (metricsMap, error)
- // namespaceNames is used by nodeFuncs to get the names of all namespaces ona node
- func namespaceNames(n cluster.Node, policy *as.InfoPolicy) ([]string, error) {
- var namespaces []string
- info, err := n.RequestInfo(policy, "namespaces")
- if err != nil {
- return nil, err
- }
- namespaces = strings.Split(info["namespaces"], ";")
- return namespaces, err
- }
- // allNodeInfo returns the results of defaultNodeInfoCommands for a node
- func allNodeInfo(n cluster.Node, policy *as.InfoPolicy) (metricsMap, error) {
- var res metricsMap
- commands := defaultNodeInfoCommands
- res, err := n.RequestInfo(policy, commands...)
- if err != nil {
- return nil, err
- }
- return res, nil
- }
- // allNamespaceInfo returns the results of namespace/%s for each namespace on the node
- func allNamespaceInfo(n cluster.Node, policy *as.InfoPolicy) (metricsMap, error) {
- var res metricsMap
- names, err := namespaceNames(n, policy)
- if err != nil {
- return nil, err
- }
- commands := make([]string, len(names))
- for i, name := range names {
- commands[i] = fmt.Sprintf("namespace/%s", name)
- }
- res, err = n.RequestInfo(policy, commands...)
- if err != nil {
- return nil, err
- }
- return res, nil
- }
- func parseStats(defaultKey, s, sep string) metricsMap {
- stats := make(metricsMap, strings.Count(s, sep)+1)
- s2 := strings.Split(s, sep)
- for _, s := range s2 {
- list := strings.SplitN(s, "=", 2)
- switch len(list) {
- case 0:
- case 1:
- stats[defaultKey] = list[0]
- case 2:
- stats[list[0]] = list[1]
- default:
- stats[list[0]] = strings.Join(list[1:], "=")
- }
- }
- return stats
- }
- // mergeMetricsMap merges values from rm into lm
- // logs a warning if a duplicate key is found
- func mergeMetricsMap(lm, rm metricsMap, logger *zap.SugaredLogger) {
- for k, v := range rm {
- if _, ok := lm[k]; ok {
- logger.Warnf("duplicate key: %s", k)
- }
- lm[k] = v
- }
- }
|