blobclient.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package azureblobreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver"
  4. import (
  5. "bytes"
  6. "context"
  7. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
  8. "go.uber.org/zap"
  9. )
  10. type blobClient interface {
  11. readBlob(ctx context.Context, containerName string, blobName string) (*bytes.Buffer, error)
  12. }
  13. type azureBlobClient struct {
  14. serviceClient *azblob.Client
  15. logger *zap.Logger
  16. }
  17. var _ blobClient = (*azureBlobClient)(nil)
  18. func (bc *azureBlobClient) readBlob(ctx context.Context, containerName string, blobName string) (*bytes.Buffer, error) {
  19. defer func() {
  20. _, blobDeleteErr := bc.serviceClient.DeleteBlob(ctx, containerName, blobName, nil)
  21. if blobDeleteErr != nil {
  22. bc.logger.Error("failed to delete blob", zap.Error(blobDeleteErr))
  23. }
  24. }()
  25. get, err := bc.serviceClient.DownloadStream(ctx, containerName, blobName, nil)
  26. if err != nil {
  27. return nil, err
  28. }
  29. downloadedData := &bytes.Buffer{}
  30. retryReader := get.NewRetryReader(ctx, &azblob.RetryReaderOptions{})
  31. defer retryReader.Close()
  32. _, err = downloadedData.ReadFrom(retryReader)
  33. return downloadedData, err
  34. }
  35. func newBlobClient(connectionString string, logger *zap.Logger) (*azureBlobClient, error) {
  36. serviceClient, err := azblob.NewClientFromConnectionString(connectionString, nil)
  37. if err != nil {
  38. return nil, err
  39. }
  40. return &azureBlobClient{
  41. serviceClient,
  42. logger,
  43. }, nil
  44. }