logs.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package awscloudwatchreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchreceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "sync"
  9. "time"
  10. "github.com/aws/aws-sdk-go/aws"
  11. "github.com/aws/aws-sdk-go/aws/request"
  12. "github.com/aws/aws-sdk-go/aws/session"
  13. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  14. "go.opentelemetry.io/collector/component"
  15. "go.opentelemetry.io/collector/consumer"
  16. "go.opentelemetry.io/collector/pdata/pcommon"
  17. "go.opentelemetry.io/collector/pdata/plog"
  18. "go.uber.org/zap"
  19. )
  20. const (
  21. noStreamName = "THIS IS INVALID STREAM"
  22. )
  23. type logsReceiver struct {
  24. region string
  25. profile string
  26. imdsEndpoint string
  27. pollInterval time.Duration
  28. maxEventsPerRequest int
  29. nextStartTime time.Time
  30. groupRequests []groupRequest
  31. autodiscover *AutodiscoverConfig
  32. logger *zap.Logger
  33. client client
  34. consumer consumer.Logs
  35. wg *sync.WaitGroup
  36. doneChan chan bool
  37. }
  38. const maxLogGroupsPerDiscovery = int64(50)
  39. type client interface {
  40. DescribeLogGroupsWithContext(ctx context.Context, input *cloudwatchlogs.DescribeLogGroupsInput, opts ...request.Option) (*cloudwatchlogs.DescribeLogGroupsOutput, error)
  41. FilterLogEventsWithContext(ctx context.Context, input *cloudwatchlogs.FilterLogEventsInput, opts ...request.Option) (*cloudwatchlogs.FilterLogEventsOutput, error)
  42. }
  43. type streamNames struct {
  44. group string
  45. names []*string
  46. }
  47. func (sn *streamNames) request(limit int, nextToken string, st, et *time.Time) *cloudwatchlogs.FilterLogEventsInput {
  48. base := &cloudwatchlogs.FilterLogEventsInput{
  49. LogGroupName: &sn.group,
  50. StartTime: aws.Int64(st.UnixMilli()),
  51. EndTime: aws.Int64(et.UnixMilli()),
  52. Limit: aws.Int64(int64(limit)),
  53. }
  54. if len(sn.names) > 0 {
  55. base.LogStreamNames = sn.names
  56. }
  57. if nextToken != "" {
  58. base.NextToken = aws.String(nextToken)
  59. }
  60. return base
  61. }
  62. func (sn *streamNames) groupName() string {
  63. return sn.group
  64. }
  65. type streamPrefix struct {
  66. group string
  67. prefix *string
  68. }
  69. func (sp *streamPrefix) request(limit int, nextToken string, st, et *time.Time) *cloudwatchlogs.FilterLogEventsInput {
  70. base := &cloudwatchlogs.FilterLogEventsInput{
  71. LogGroupName: &sp.group,
  72. StartTime: aws.Int64(st.UnixMilli()),
  73. EndTime: aws.Int64(et.UnixMilli()),
  74. Limit: aws.Int64(int64(limit)),
  75. LogStreamNamePrefix: sp.prefix,
  76. }
  77. if nextToken != "" {
  78. base.NextToken = aws.String(nextToken)
  79. }
  80. return base
  81. }
  82. func (sp *streamPrefix) groupName() string {
  83. return sp.group
  84. }
  85. type groupRequest interface {
  86. request(limit int, nextToken string, st, et *time.Time) *cloudwatchlogs.FilterLogEventsInput
  87. groupName() string
  88. }
  89. func newLogsReceiver(cfg *Config, logger *zap.Logger, consumer consumer.Logs) *logsReceiver {
  90. groups := []groupRequest{}
  91. for logGroupName, sc := range cfg.Logs.Groups.NamedConfigs {
  92. for _, prefix := range sc.Prefixes {
  93. groups = append(groups, &streamPrefix{group: logGroupName, prefix: prefix})
  94. }
  95. if len(sc.Names) > 0 {
  96. groups = append(groups, &streamNames{group: logGroupName, names: sc.Names})
  97. }
  98. }
  99. // safeguard from using both
  100. autodiscover := cfg.Logs.Groups.AutodiscoverConfig
  101. if len(cfg.Logs.Groups.NamedConfigs) > 0 {
  102. autodiscover = nil
  103. }
  104. return &logsReceiver{
  105. region: cfg.Region,
  106. profile: cfg.Profile,
  107. consumer: consumer,
  108. maxEventsPerRequest: cfg.Logs.MaxEventsPerRequest,
  109. imdsEndpoint: cfg.IMDSEndpoint,
  110. autodiscover: autodiscover,
  111. pollInterval: cfg.Logs.PollInterval,
  112. nextStartTime: time.Now().Add(-cfg.Logs.PollInterval),
  113. groupRequests: groups,
  114. logger: logger,
  115. wg: &sync.WaitGroup{},
  116. doneChan: make(chan bool),
  117. }
  118. }
  119. func (l *logsReceiver) Start(ctx context.Context, _ component.Host) error {
  120. l.logger.Debug("starting to poll for Cloudwatch logs")
  121. l.wg.Add(1)
  122. go l.startPolling(ctx)
  123. return nil
  124. }
  125. func (l *logsReceiver) Shutdown(_ context.Context) error {
  126. l.logger.Debug("shutting down logs receiver")
  127. close(l.doneChan)
  128. l.wg.Wait()
  129. return nil
  130. }
  131. func (l *logsReceiver) startPolling(ctx context.Context) {
  132. defer l.wg.Done()
  133. t := time.NewTicker(l.pollInterval)
  134. for {
  135. select {
  136. case <-ctx.Done():
  137. return
  138. case <-l.doneChan:
  139. return
  140. case <-t.C:
  141. if l.autodiscover != nil {
  142. group, err := l.discoverGroups(ctx, l.autodiscover)
  143. if err != nil {
  144. l.logger.Error("unable to perform discovery of log groups", zap.Error(err))
  145. continue
  146. }
  147. l.groupRequests = group
  148. }
  149. err := l.poll(ctx)
  150. if err != nil {
  151. l.logger.Error("there was an error during the poll", zap.Error(err))
  152. }
  153. }
  154. }
  155. }
  156. func (l *logsReceiver) poll(ctx context.Context) error {
  157. var errs error
  158. startTime := l.nextStartTime
  159. endTime := time.Now()
  160. for _, r := range l.groupRequests {
  161. if err := l.pollForLogs(ctx, r, startTime, endTime); err != nil {
  162. errs = errors.Join(errs, err)
  163. }
  164. }
  165. l.nextStartTime = endTime
  166. return errs
  167. }
  168. func (l *logsReceiver) pollForLogs(ctx context.Context, pc groupRequest, startTime, endTime time.Time) error {
  169. err := l.ensureSession()
  170. if err != nil {
  171. return err
  172. }
  173. nextToken := aws.String("")
  174. for nextToken != nil {
  175. select {
  176. // if done, we want to stop processing paginated stream of events
  177. case _, ok := <-l.doneChan:
  178. if !ok {
  179. return nil
  180. }
  181. default:
  182. input := pc.request(l.maxEventsPerRequest, *nextToken, &startTime, &endTime)
  183. resp, err := l.client.FilterLogEventsWithContext(ctx, input)
  184. if err != nil {
  185. l.logger.Error("unable to retrieve logs from cloudwatch", zap.String("log group", pc.groupName()), zap.Error(err))
  186. break
  187. }
  188. observedTime := pcommon.NewTimestampFromTime(time.Now())
  189. logs := l.processEvents(observedTime, pc.groupName(), resp)
  190. if logs.LogRecordCount() > 0 {
  191. if err = l.consumer.ConsumeLogs(ctx, logs); err != nil {
  192. l.logger.Error("unable to consume logs", zap.Error(err))
  193. break
  194. }
  195. }
  196. nextToken = resp.NextToken
  197. }
  198. }
  199. return nil
  200. }
  201. func (l *logsReceiver) processEvents(now pcommon.Timestamp, logGroupName string, output *cloudwatchlogs.FilterLogEventsOutput) plog.Logs {
  202. logs := plog.NewLogs()
  203. resourceMap := map[string](map[string]*plog.ResourceLogs){}
  204. for _, e := range output.Events {
  205. if e.Timestamp == nil {
  206. l.logger.Error("unable to determine timestamp of event as the timestamp is nil")
  207. continue
  208. }
  209. if e.EventId == nil {
  210. l.logger.Error("no event ID was present on the event, skipping entry")
  211. continue
  212. }
  213. if e.Message == nil {
  214. l.logger.Error("no message was present on the event", zap.String("event.id", *e.EventId))
  215. continue
  216. }
  217. group, ok := resourceMap[logGroupName]
  218. if !ok {
  219. group = map[string]*plog.ResourceLogs{}
  220. resourceMap[logGroupName] = group
  221. }
  222. logStreamName := noStreamName
  223. if e.LogStreamName != nil {
  224. logStreamName = *e.LogStreamName
  225. }
  226. resourceLogs, ok := group[logStreamName]
  227. if !ok {
  228. rl := logs.ResourceLogs().AppendEmpty()
  229. resourceLogs = &rl
  230. resourceAttributes := resourceLogs.Resource().Attributes()
  231. resourceAttributes.PutStr("aws.region", l.region)
  232. resourceAttributes.PutStr("cloudwatch.log.group.name", logGroupName)
  233. resourceAttributes.PutStr("cloudwatch.log.stream", logStreamName)
  234. group[logStreamName] = resourceLogs
  235. // Ensure one scopeLogs is initialized so we can handle in standardized way going forward.
  236. _ = resourceLogs.ScopeLogs().AppendEmpty()
  237. }
  238. // Now we know resourceLogs is initialized and has one scopeLogs so we don't have to handle any special cases.
  239. logRecord := resourceLogs.ScopeLogs().At(0).LogRecords().AppendEmpty()
  240. logRecord.SetObservedTimestamp(now)
  241. ts := time.UnixMilli(*e.Timestamp)
  242. logRecord.SetTimestamp(pcommon.NewTimestampFromTime(ts))
  243. logRecord.Body().SetStr(*e.Message)
  244. logRecord.Attributes().PutStr("id", *e.EventId)
  245. }
  246. return logs
  247. }
  248. func (l *logsReceiver) discoverGroups(ctx context.Context, auto *AutodiscoverConfig) ([]groupRequest, error) {
  249. l.logger.Debug("attempting to discover log groups.", zap.Int("limit", auto.Limit))
  250. groups := []groupRequest{}
  251. err := l.ensureSession()
  252. if err != nil {
  253. return groups, fmt.Errorf("unable to establish a session to auto discover log groups: %w", err)
  254. }
  255. numGroups := 0
  256. var nextToken = aws.String("")
  257. for nextToken != nil {
  258. if numGroups >= auto.Limit {
  259. break
  260. }
  261. req := &cloudwatchlogs.DescribeLogGroupsInput{
  262. Limit: aws.Int64(maxLogGroupsPerDiscovery),
  263. }
  264. if auto.Prefix != "" {
  265. req.LogGroupNamePrefix = &auto.Prefix
  266. }
  267. dlgResults, err := l.client.DescribeLogGroupsWithContext(ctx, req)
  268. if err != nil {
  269. return groups, fmt.Errorf("unable to list log groups: %w", err)
  270. }
  271. for _, lg := range dlgResults.LogGroups {
  272. if numGroups == auto.Limit {
  273. l.logger.Debug("reached limit of the number of log groups to discover."+
  274. "To increase the number of groups able to be discovered, please increase the autodiscover limit field.",
  275. zap.Int("groups_discovered", numGroups), zap.Int("limit", auto.Limit))
  276. break
  277. }
  278. numGroups++
  279. l.logger.Debug("discovered log group", zap.String("log group", lg.GoString()))
  280. // default behavior is to collect all if not stream filtered
  281. if len(auto.Streams.Names) == 0 && len(auto.Streams.Prefixes) == 0 {
  282. groups = append(groups, &streamNames{group: *lg.LogGroupName})
  283. continue
  284. }
  285. for _, prefix := range auto.Streams.Prefixes {
  286. groups = append(groups, &streamPrefix{group: *lg.LogGroupName, prefix: prefix})
  287. }
  288. if len(auto.Streams.Names) > 0 {
  289. groups = append(groups, &streamNames{group: *lg.LogGroupName, names: auto.Streams.Names})
  290. }
  291. }
  292. nextToken = dlgResults.NextToken
  293. }
  294. return groups, nil
  295. }
  296. func (l *logsReceiver) ensureSession() error {
  297. if l.client != nil {
  298. return nil
  299. }
  300. awsConfig := aws.NewConfig().WithRegion(l.region)
  301. options := session.Options{
  302. Config: *awsConfig,
  303. }
  304. if l.imdsEndpoint != "" {
  305. options.EC2IMDSEndpoint = l.imdsEndpoint
  306. }
  307. if l.profile != "" {
  308. options.Profile = l.profile
  309. }
  310. s, err := session.NewSessionWithOptions(options)
  311. l.client = cloudwatchlogs.New(s)
  312. return err
  313. }