scraper.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package memcachedreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/memcachedreceiver"
  4. import (
  5. "context"
  6. "strconv"
  7. "time"
  8. "go.opentelemetry.io/collector/pdata/pcommon"
  9. "go.opentelemetry.io/collector/pdata/pmetric"
  10. "go.opentelemetry.io/collector/receiver"
  11. "go.uber.org/zap"
  12. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/memcachedreceiver/internal/metadata"
  13. )
  14. type memcachedScraper struct {
  15. logger *zap.Logger
  16. config *Config
  17. mb *metadata.MetricsBuilder
  18. newClient newMemcachedClientFunc
  19. }
  20. func newMemcachedScraper(
  21. settings receiver.CreateSettings,
  22. config *Config,
  23. ) memcachedScraper {
  24. return memcachedScraper{
  25. logger: settings.Logger,
  26. config: config,
  27. newClient: newMemcachedClient,
  28. mb: metadata.NewMetricsBuilder(config.MetricsBuilderConfig, settings),
  29. }
  30. }
  31. func (r *memcachedScraper) scrape(_ context.Context) (pmetric.Metrics, error) {
  32. // Init client in scrape method in case there are transient errors in the
  33. // constructor.
  34. statsClient, err := r.newClient(r.config.Endpoint, r.config.Timeout)
  35. if err != nil {
  36. r.logger.Error("Failed to establish client", zap.Error(err))
  37. return pmetric.Metrics{}, err
  38. }
  39. allServerStats, err := statsClient.Stats()
  40. if err != nil {
  41. r.logger.Error("Failed to fetch memcached stats", zap.Error(err))
  42. return pmetric.Metrics{}, err
  43. }
  44. now := pcommon.NewTimestampFromTime(time.Now())
  45. for _, stats := range allServerStats {
  46. for k, v := range stats.Stats {
  47. switch k {
  48. case "bytes":
  49. if parsedV, ok := r.parseInt(k, v); ok {
  50. r.mb.RecordMemcachedBytesDataPoint(now, parsedV)
  51. }
  52. case "curr_connections":
  53. if parsedV, ok := r.parseInt(k, v); ok {
  54. r.mb.RecordMemcachedConnectionsCurrentDataPoint(now, parsedV)
  55. }
  56. case "total_connections":
  57. if parsedV, ok := r.parseInt(k, v); ok {
  58. r.mb.RecordMemcachedConnectionsTotalDataPoint(now, parsedV)
  59. }
  60. case "cmd_get":
  61. if parsedV, ok := r.parseInt(k, v); ok {
  62. r.mb.RecordMemcachedCommandsDataPoint(now, parsedV, metadata.AttributeCommandGet)
  63. }
  64. case "cmd_set":
  65. if parsedV, ok := r.parseInt(k, v); ok {
  66. r.mb.RecordMemcachedCommandsDataPoint(now, parsedV, metadata.AttributeCommandSet)
  67. }
  68. case "cmd_flush":
  69. if parsedV, ok := r.parseInt(k, v); ok {
  70. r.mb.RecordMemcachedCommandsDataPoint(now, parsedV, metadata.AttributeCommandFlush)
  71. }
  72. case "cmd_touch":
  73. if parsedV, ok := r.parseInt(k, v); ok {
  74. r.mb.RecordMemcachedCommandsDataPoint(now, parsedV, metadata.AttributeCommandTouch)
  75. }
  76. case "curr_items":
  77. if parsedV, ok := r.parseInt(k, v); ok {
  78. r.mb.RecordMemcachedCurrentItemsDataPoint(now, parsedV)
  79. }
  80. case "threads":
  81. if parsedV, ok := r.parseInt(k, v); ok {
  82. r.mb.RecordMemcachedThreadsDataPoint(now, parsedV)
  83. }
  84. case "evictions":
  85. if parsedV, ok := r.parseInt(k, v); ok {
  86. r.mb.RecordMemcachedEvictionsDataPoint(now, parsedV)
  87. }
  88. case "bytes_read":
  89. if parsedV, ok := r.parseInt(k, v); ok {
  90. r.mb.RecordMemcachedNetworkDataPoint(now, parsedV, metadata.AttributeDirectionReceived)
  91. }
  92. case "bytes_written":
  93. if parsedV, ok := r.parseInt(k, v); ok {
  94. r.mb.RecordMemcachedNetworkDataPoint(now, parsedV, metadata.AttributeDirectionSent)
  95. }
  96. case "get_hits":
  97. if parsedV, ok := r.parseInt(k, v); ok {
  98. r.mb.RecordMemcachedOperationsDataPoint(now, parsedV, metadata.AttributeTypeHit,
  99. metadata.AttributeOperationGet)
  100. }
  101. case "get_misses":
  102. if parsedV, ok := r.parseInt(k, v); ok {
  103. r.mb.RecordMemcachedOperationsDataPoint(now, parsedV, metadata.AttributeTypeMiss,
  104. metadata.AttributeOperationGet)
  105. }
  106. case "incr_hits":
  107. if parsedV, ok := r.parseInt(k, v); ok {
  108. r.mb.RecordMemcachedOperationsDataPoint(now, parsedV, metadata.AttributeTypeHit,
  109. metadata.AttributeOperationIncrement)
  110. }
  111. case "incr_misses":
  112. if parsedV, ok := r.parseInt(k, v); ok {
  113. r.mb.RecordMemcachedOperationsDataPoint(now, parsedV, metadata.AttributeTypeMiss,
  114. metadata.AttributeOperationIncrement)
  115. }
  116. case "decr_hits":
  117. if parsedV, ok := r.parseInt(k, v); ok {
  118. r.mb.RecordMemcachedOperationsDataPoint(now, parsedV, metadata.AttributeTypeHit,
  119. metadata.AttributeOperationDecrement)
  120. }
  121. case "decr_misses":
  122. if parsedV, ok := r.parseInt(k, v); ok {
  123. r.mb.RecordMemcachedOperationsDataPoint(now, parsedV, metadata.AttributeTypeMiss,
  124. metadata.AttributeOperationDecrement)
  125. }
  126. case "rusage_system":
  127. if parsedV, ok := r.parseFloat(k, v); ok {
  128. r.mb.RecordMemcachedCPUUsageDataPoint(now, parsedV, metadata.AttributeStateSystem)
  129. }
  130. case "rusage_user":
  131. if parsedV, ok := r.parseFloat(k, v); ok {
  132. r.mb.RecordMemcachedCPUUsageDataPoint(now, parsedV, metadata.AttributeStateUser)
  133. }
  134. }
  135. }
  136. // Calculated Metrics
  137. parsedHit, okHit := r.parseInt("incr_hits", stats.Stats["incr_hits"])
  138. parsedMiss, okMiss := r.parseInt("incr_misses", stats.Stats["incr_misses"])
  139. if okHit && okMiss {
  140. r.mb.RecordMemcachedOperationHitRatioDataPoint(now, calculateHitRatio(parsedHit, parsedMiss),
  141. metadata.AttributeOperationIncrement)
  142. }
  143. parsedHit, okHit = r.parseInt("decr_hits", stats.Stats["decr_hits"])
  144. parsedMiss, okMiss = r.parseInt("decr_misses", stats.Stats["decr_misses"])
  145. if okHit && okMiss {
  146. r.mb.RecordMemcachedOperationHitRatioDataPoint(now, calculateHitRatio(parsedHit, parsedMiss),
  147. metadata.AttributeOperationDecrement)
  148. }
  149. parsedHit, okHit = r.parseInt("get_hits", stats.Stats["get_hits"])
  150. parsedMiss, okMiss = r.parseInt("get_misses", stats.Stats["get_misses"])
  151. if okHit && okMiss {
  152. r.mb.RecordMemcachedOperationHitRatioDataPoint(now, calculateHitRatio(parsedHit, parsedMiss), metadata.AttributeOperationGet)
  153. }
  154. }
  155. return r.mb.Emit(), nil
  156. }
  157. func calculateHitRatio(misses, hits int64) float64 {
  158. if misses+hits == 0 {
  159. return 0
  160. }
  161. hitsFloat := float64(hits)
  162. missesFloat := float64(misses)
  163. return hitsFloat / (hitsFloat + missesFloat) * 100
  164. }
  165. // parseInt converts string to int64.
  166. func (r *memcachedScraper) parseInt(key, value string) (int64, bool) {
  167. i, err := strconv.ParseInt(value, 10, 64)
  168. if err != nil {
  169. r.logInvalid("int", key, value)
  170. return 0, false
  171. }
  172. return i, true
  173. }
  174. // parseFloat converts string to float64.
  175. func (r *memcachedScraper) parseFloat(key, value string) (float64, bool) {
  176. i, err := strconv.ParseFloat(value, 64)
  177. if err != nil {
  178. r.logInvalid("float", key, value)
  179. return 0, false
  180. }
  181. return i, true
  182. }
  183. func (r *memcachedScraper) logInvalid(expectedType, key, value string) {
  184. r.logger.Info(
  185. "invalid value",
  186. zap.String("expectedType", expectedType),
  187. zap.String("key", key),
  188. zap.String("value", value),
  189. )
  190. }