scraper.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package zookeeperreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zookeeperreceiver"
  4. import (
  5. "bufio"
  6. "context"
  7. "errors"
  8. "fmt"
  9. "net"
  10. "regexp"
  11. "strconv"
  12. "time"
  13. "go.opentelemetry.io/collector/pdata/pcommon"
  14. "go.opentelemetry.io/collector/pdata/pmetric"
  15. "go.opentelemetry.io/collector/receiver"
  16. "go.uber.org/zap"
  17. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zookeeperreceiver/internal/metadata"
  18. )
  19. var zookeeperFormatRE = regexp.MustCompile(`(^zk_\w+)\s+([\w\.\-]+)`)
  20. const (
  21. mntrCommand = "mntr"
  22. ruokCommand = "ruok"
  23. )
  24. type zookeeperMetricsScraper struct {
  25. logger *zap.Logger
  26. config *Config
  27. cancel context.CancelFunc
  28. rb *metadata.ResourceBuilder
  29. mb *metadata.MetricsBuilder
  30. // For mocking.
  31. closeConnection func(net.Conn) error
  32. setConnectionDeadline func(net.Conn, time.Time) error
  33. sendCmd func(net.Conn, string) (*bufio.Scanner, error)
  34. }
  35. func (z *zookeeperMetricsScraper) Name() string {
  36. return metadata.Type
  37. }
  38. func newZookeeperMetricsScraper(settings receiver.CreateSettings, config *Config) (*zookeeperMetricsScraper, error) {
  39. _, _, err := net.SplitHostPort(config.TCPAddr.Endpoint)
  40. if err != nil {
  41. return nil, err
  42. }
  43. if config.Timeout <= 0 {
  44. return nil, errors.New("timeout must be a positive duration")
  45. }
  46. z := &zookeeperMetricsScraper{
  47. logger: settings.Logger,
  48. config: config,
  49. rb: metadata.NewResourceBuilder(config.ResourceAttributes),
  50. mb: metadata.NewMetricsBuilder(config.MetricsBuilderConfig, settings),
  51. closeConnection: closeConnection,
  52. setConnectionDeadline: setConnectionDeadline,
  53. sendCmd: sendCmd,
  54. }
  55. return z, nil
  56. }
  57. func (z *zookeeperMetricsScraper) shutdown(_ context.Context) error {
  58. if z.cancel != nil {
  59. z.cancel()
  60. z.cancel = nil
  61. }
  62. return nil
  63. }
  64. func (z *zookeeperMetricsScraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
  65. responseMntr, err := z.runCommand(ctx, "mntr")
  66. if err != nil {
  67. return pmetric.NewMetrics(), err
  68. }
  69. responseRuok, err := z.runCommand(ctx, "ruok")
  70. if err != nil {
  71. return pmetric.NewMetrics(), err
  72. }
  73. z.processMntr(responseMntr)
  74. z.processRuok(responseRuok)
  75. return z.mb.Emit(metadata.WithResource(z.rb.Emit())), nil
  76. }
  77. func (z *zookeeperMetricsScraper) runCommand(ctx context.Context, command string) ([]string, error) {
  78. conn, err := z.config.Dial()
  79. if err != nil {
  80. z.logger.Error("failed to establish connection",
  81. zap.String("endpoint", z.config.Endpoint),
  82. zap.Error(err),
  83. )
  84. return nil, err
  85. }
  86. defer func() {
  87. if closeErr := z.closeConnection(conn); closeErr != nil {
  88. z.logger.Warn("failed to shutdown connection", zap.Error(closeErr))
  89. }
  90. }()
  91. deadline, ok := ctx.Deadline()
  92. if ok {
  93. if err = z.setConnectionDeadline(conn, deadline); err != nil {
  94. z.logger.Warn("failed to set deadline on connection", zap.Error(err))
  95. }
  96. }
  97. scanner, err := z.sendCmd(conn, command)
  98. if err != nil {
  99. z.logger.Error("failed to send command",
  100. zap.Error(err),
  101. zap.String("command", command),
  102. )
  103. return nil, err
  104. }
  105. var response []string
  106. for scanner.Scan() {
  107. response = append(response, scanner.Text())
  108. }
  109. return response, nil
  110. }
  111. func (z *zookeeperMetricsScraper) processMntr(response []string) {
  112. creator := newMetricCreator(z.mb)
  113. now := pcommon.NewTimestampFromTime(time.Now())
  114. for _, line := range response {
  115. parts := zookeeperFormatRE.FindStringSubmatch(line)
  116. if len(parts) != 3 {
  117. z.logger.Warn("unexpected line in response",
  118. zap.String("command", mntrCommand),
  119. zap.String("line", line),
  120. )
  121. continue
  122. }
  123. metricKey := parts[1]
  124. metricValue := parts[2]
  125. switch metricKey {
  126. case zkVersionKey:
  127. z.rb.SetZkVersion(metricValue)
  128. continue
  129. case serverStateKey:
  130. z.rb.SetServerState(metricValue)
  131. continue
  132. default:
  133. // Skip metric if there is no descriptor associated with it.
  134. recordDataPoints := creator.recordDataPointsFunc(metricKey)
  135. if recordDataPoints == nil {
  136. // Unexported metric, just move to the next line.
  137. continue
  138. }
  139. int64Val, err := strconv.ParseInt(metricValue, 10, 64)
  140. if err != nil {
  141. z.logger.Debug(
  142. fmt.Sprintf("non-integer value from %s", mntrCommand),
  143. zap.String("value", metricValue),
  144. )
  145. continue
  146. }
  147. recordDataPoints(now, int64Val)
  148. }
  149. }
  150. // Generate computed metrics
  151. creator.generateComputedMetrics(z.logger, now)
  152. }
  153. func (z *zookeeperMetricsScraper) processRuok(response []string) {
  154. creator := newMetricCreator(z.mb)
  155. now := pcommon.NewTimestampFromTime(time.Now())
  156. metricKey := "ruok"
  157. metricValue := int64(0)
  158. if len(response) > 0 {
  159. if response[0] == "imok" {
  160. metricValue = int64(1)
  161. } else {
  162. z.logger.Error("invalid response from ruok",
  163. zap.String("command", ruokCommand),
  164. )
  165. return
  166. }
  167. }
  168. recordDataPoints := creator.recordDataPointsFunc(metricKey)
  169. recordDataPoints(now, metricValue)
  170. }
  171. func closeConnection(conn net.Conn) error {
  172. return conn.Close()
  173. }
  174. func setConnectionDeadline(conn net.Conn, deadline time.Time) error {
  175. return conn.SetDeadline(deadline)
  176. }
  177. func sendCmd(conn net.Conn, cmd string) (*bufio.Scanner, error) {
  178. _, err := fmt.Fprintf(conn, "%s\n", cmd)
  179. if err != nil {
  180. return nil, err
  181. }
  182. reader := bufio.NewReader(conn)
  183. scanner := bufio.NewScanner(reader)
  184. return scanner, nil
  185. }