metrics.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
  4. import (
  5. "go.opencensus.io/stats"
  6. "go.opencensus.io/stats/view"
  7. "go.opencensus.io/tag"
  8. )
  9. var (
  10. tagInstanceName, _ = tag.NewKey("name")
  11. statMessageCount = stats.Int64("kafka_receiver_messages", "Number of received messages", stats.UnitDimensionless)
  12. statMessageOffset = stats.Int64("kafka_receiver_current_offset", "Current message offset", stats.UnitDimensionless)
  13. statMessageOffsetLag = stats.Int64("kafka_receiver_offset_lag", "Current offset lag", stats.UnitDimensionless)
  14. statPartitionStart = stats.Int64("kafka_receiver_partition_start", "Number of started partitions", stats.UnitDimensionless)
  15. statPartitionClose = stats.Int64("kafka_receiver_partition_close", "Number of finished partitions", stats.UnitDimensionless)
  16. )
  17. // metricViews return metric views for Kafka receiver.
  18. func metricViews() []*view.View {
  19. tagKeys := []tag.Key{tagInstanceName}
  20. countMessages := &view.View{
  21. Name: statMessageCount.Name(),
  22. Measure: statMessageCount,
  23. Description: statMessageCount.Description(),
  24. TagKeys: tagKeys,
  25. Aggregation: view.Sum(),
  26. }
  27. lastValueOffset := &view.View{
  28. Name: statMessageOffset.Name(),
  29. Measure: statMessageOffset,
  30. Description: statMessageOffset.Description(),
  31. TagKeys: tagKeys,
  32. Aggregation: view.LastValue(),
  33. }
  34. lastValueOffsetLag := &view.View{
  35. Name: statMessageOffsetLag.Name(),
  36. Measure: statMessageOffsetLag,
  37. Description: statMessageOffsetLag.Description(),
  38. TagKeys: tagKeys,
  39. Aggregation: view.LastValue(),
  40. }
  41. countPartitionStart := &view.View{
  42. Name: statPartitionStart.Name(),
  43. Measure: statPartitionStart,
  44. Description: statPartitionStart.Description(),
  45. TagKeys: tagKeys,
  46. Aggregation: view.Sum(),
  47. }
  48. countPartitionClose := &view.View{
  49. Name: statPartitionClose.Name(),
  50. Measure: statPartitionClose,
  51. Description: statPartitionClose.Description(),
  52. TagKeys: tagKeys,
  53. Aggregation: view.Sum(),
  54. }
  55. return []*view.View{
  56. countMessages,
  57. lastValueOffset,
  58. lastValueOffsetLag,
  59. countPartitionStart,
  60. countPartitionClose,
  61. }
  62. }