123456789101112131415161718192021222324252627282930313233343536373839404142 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver"
- import (
- "context"
- "fmt"
- "github.com/Azure/azure-event-hubs-go/v3/persist"
- jsoniter "github.com/json-iterator/go"
- "go.opentelemetry.io/collector/extension/experimental/storage"
- )
// storageKeyFormat is the fmt layout used to build the storage key for a
// checkpoint. Its four %s verbs are filled, in order, with the namespace,
// name, consumer group and partition ID, separated by slashes.
const storageKeyFormat = "%s/%s/%s/%s"
- type storageCheckpointPersister struct {
- storageClient storage.Client
- }
- func (s *storageCheckpointPersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint persist.Checkpoint) error {
- b, err := jsoniter.Marshal(checkpoint)
- if err != nil {
- return err
- }
- return s.storageClient.Set(context.Background(), fmt.Sprintf(storageKeyFormat, namespace, name, consumerGroup, partitionID), b)
- }
- func (s *storageCheckpointPersister) Read(namespace, name, consumerGroup, partitionID string) (persist.Checkpoint, error) {
- var checkpoint persist.Checkpoint
- bytes, err := s.storageClient.Get(context.Background(), fmt.Sprintf(storageKeyFormat, namespace, name, consumerGroup, partitionID))
- if err != nil {
- return persist.NewCheckpointFromStartOfStream(), err
- }
- if len(bytes) == 0 {
- return persist.NewCheckpointFromStartOfStream(), err
- }
- err = jsoniter.Unmarshal(bytes, &checkpoint)
- return checkpoint, err
- }
|