handler_test.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package internal
  4. import (
  5. "context"
  6. "testing"
  7. "time"
  8. pubsub "cloud.google.com/go/pubsub/apiv1"
  9. "cloud.google.com/go/pubsub/apiv1/pubsubpb"
  10. "cloud.google.com/go/pubsub/pstest"
  11. "github.com/stretchr/testify/assert"
  12. "go.uber.org/zap/zaptest"
  13. "google.golang.org/api/option"
  14. "google.golang.org/grpc"
  15. "google.golang.org/grpc/credentials/insecure"
  16. )
  17. func TestCancelStream(t *testing.T) {
  18. ctx := context.Background()
  19. srv := pstest.NewServer()
  20. defer srv.Close()
  21. var copts []option.ClientOption
  22. var dialOpts []grpc.DialOption
  23. conn, err := grpc.Dial(srv.Addr, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
  24. assert.NoError(t, err)
  25. copts = append(copts, option.WithGRPCConn(conn))
  26. _, err = srv.GServer.CreateTopic(ctx, &pubsubpb.Topic{
  27. Name: "projects/my-project/topics/otlp",
  28. })
  29. assert.NoError(t, err)
  30. _, err = srv.GServer.CreateSubscription(ctx, &pubsubpb.Subscription{
  31. Topic: "projects/my-project/topics/otlp",
  32. Name: "projects/my-project/subscriptions/otlp",
  33. AckDeadlineSeconds: 10,
  34. })
  35. assert.NoError(t, err)
  36. client, err := pubsub.NewSubscriberClient(ctx, copts...)
  37. assert.NoError(t, err)
  38. handler, err := NewHandler(context.Background(), zaptest.NewLogger(t), client, "client-id", "projects/my-project/subscriptions/otlp",
  39. func(ctx context.Context, message *pubsubpb.ReceivedMessage) error {
  40. return nil
  41. })
  42. handler.ackBatchWait = 10 * time.Millisecond
  43. assert.NoError(t, err)
  44. srv.Publish("projects/my-project/topics/otlp", []byte{}, map[string]string{
  45. "ce-type": "org.opentelemetry.otlp.traces.v1",
  46. "content-type": "application/protobuf",
  47. })
  48. handler.RecoverableStream(ctx)
  49. go func() {
  50. time.Sleep(100 * time.Millisecond)
  51. handler.CancelNow()
  52. }()
  53. handler.Wait()
  54. }