persister.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver"
  4. import (
  5. "context"
  6. "fmt"
  7. "github.com/Azure/azure-event-hubs-go/v3/persist"
  8. jsoniter "github.com/json-iterator/go"
  9. "go.opentelemetry.io/collector/extension/experimental/storage"
  10. )
  11. const (
  12. storageKeyFormat = "%s/%s/%s/%s"
  13. )
// storageCheckpointPersister saves and loads persist.Checkpoint values through
// a storage.Client. Its Write and Read methods key each checkpoint by
// namespace, name, consumer group and partition ID.
// NOTE(review): presumably this satisfies the Event Hubs SDK's checkpoint
// persister interface; confirm against the caller.
type storageCheckpointPersister struct {
	// storageClient is the backing store used for Set (Write) and Get (Read).
	storageClient storage.Client
}
  17. func (s *storageCheckpointPersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint persist.Checkpoint) error {
  18. b, err := jsoniter.Marshal(checkpoint)
  19. if err != nil {
  20. return err
  21. }
  22. return s.storageClient.Set(context.Background(), fmt.Sprintf(storageKeyFormat, namespace, name, consumerGroup, partitionID), b)
  23. }
  24. func (s *storageCheckpointPersister) Read(namespace, name, consumerGroup, partitionID string) (persist.Checkpoint, error) {
  25. var checkpoint persist.Checkpoint
  26. bytes, err := s.storageClient.Get(context.Background(), fmt.Sprintf(storageKeyFormat, namespace, name, consumerGroup, partitionID))
  27. if err != nil {
  28. return persist.NewCheckpointFromStartOfStream(), err
  29. }
  30. if len(bytes) == 0 {
  31. return persist.NewCheckpointFromStartOfStream(), err
  32. }
  33. err = jsoniter.Unmarshal(bytes, &checkpoint)
  34. return checkpoint, err
  35. }