logs_receiver.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package sqlqueryreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlqueryreceiver"
  4. import (
  5. "context"
  6. "database/sql"
  7. "fmt"
  8. "time"
  9. "go.opentelemetry.io/collector/component"
  10. "go.opentelemetry.io/collector/consumer"
  11. "go.opentelemetry.io/collector/extension/experimental/storage"
  12. "go.opentelemetry.io/collector/pdata/pcommon"
  13. "go.opentelemetry.io/collector/pdata/plog"
  14. "go.opentelemetry.io/collector/receiver"
  15. "go.opentelemetry.io/collector/receiver/receiverhelper"
  16. "go.uber.org/multierr"
  17. "go.uber.org/zap"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
  19. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlqueryreceiver/internal/metadata"
  20. )
  21. type logsReceiver struct {
  22. config *Config
  23. settings receiver.CreateSettings
  24. createConnection dbProviderFunc
  25. createClient clientProviderFunc
  26. queryReceivers []*logsQueryReceiver
  27. nextConsumer consumer.Logs
  28. isStarted bool
  29. collectionIntervalTicker *time.Ticker
  30. shutdownRequested chan struct{}
  31. id component.ID
  32. storageClient storage.Client
  33. obsrecv *receiverhelper.ObsReport
  34. }
  35. func newLogsReceiver(
  36. config *Config,
  37. settings receiver.CreateSettings,
  38. sqlOpenerFunc sqlOpenerFunc,
  39. createClient clientProviderFunc,
  40. nextConsumer consumer.Logs,
  41. ) (*logsReceiver, error) {
  42. obsr, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  43. ReceiverID: settings.ID,
  44. ReceiverCreateSettings: settings,
  45. })
  46. if err != nil {
  47. return nil, err
  48. }
  49. receiver := &logsReceiver{
  50. config: config,
  51. settings: settings,
  52. createConnection: func() (*sql.DB, error) {
  53. return sqlOpenerFunc(config.Driver, config.DataSource)
  54. },
  55. createClient: createClient,
  56. nextConsumer: nextConsumer,
  57. shutdownRequested: make(chan struct{}),
  58. id: settings.ID,
  59. obsrecv: obsr,
  60. }
  61. return receiver, nil
  62. }
  63. func (receiver *logsReceiver) Start(ctx context.Context, host component.Host) error {
  64. if receiver.isStarted {
  65. receiver.settings.Logger.Debug("requested start, but already started, ignoring.")
  66. return nil
  67. }
  68. receiver.settings.Logger.Debug("starting...")
  69. receiver.isStarted = true
  70. var err error
  71. receiver.storageClient, err = adapter.GetStorageClient(ctx, host, receiver.config.StorageID, receiver.settings.ID)
  72. if err != nil {
  73. return fmt.Errorf("error connecting to storage: %w", err)
  74. }
  75. err = receiver.createQueryReceivers()
  76. if err != nil {
  77. return err
  78. }
  79. for _, queryReceiver := range receiver.queryReceivers {
  80. err := queryReceiver.start(ctx)
  81. if err != nil {
  82. return err
  83. }
  84. }
  85. receiver.startCollecting()
  86. receiver.settings.Logger.Debug("started.")
  87. return nil
  88. }
  89. func (receiver *logsReceiver) createQueryReceivers() error {
  90. receiver.queryReceivers = nil
  91. for i, query := range receiver.config.Queries {
  92. if len(query.Logs) == 0 {
  93. continue
  94. }
  95. id := fmt.Sprintf("query-%d: %s", i, query.SQL)
  96. queryReceiver := newLogsQueryReceiver(
  97. id,
  98. query,
  99. receiver.createConnection,
  100. receiver.createClient,
  101. receiver.settings.Logger,
  102. receiver.storageClient,
  103. )
  104. receiver.queryReceivers = append(receiver.queryReceivers, queryReceiver)
  105. }
  106. return nil
  107. }
  108. func (receiver *logsReceiver) startCollecting() {
  109. receiver.collectionIntervalTicker = time.NewTicker(receiver.config.CollectionInterval)
  110. go func() {
  111. for {
  112. select {
  113. case <-receiver.collectionIntervalTicker.C:
  114. receiver.collect()
  115. case <-receiver.shutdownRequested:
  116. return
  117. }
  118. }
  119. }()
  120. }
  121. func (receiver *logsReceiver) collect() {
  122. logsChannel := make(chan plog.Logs)
  123. for _, queryReceiver := range receiver.queryReceivers {
  124. go func(queryReceiver *logsQueryReceiver) {
  125. logs, err := queryReceiver.collect(context.Background())
  126. if err != nil {
  127. receiver.settings.Logger.Error("error collecting logs", zap.Error(err), zap.String("query", queryReceiver.ID()))
  128. }
  129. logsChannel <- logs
  130. }(queryReceiver)
  131. }
  132. allLogs := plog.NewLogs()
  133. for range receiver.queryReceivers {
  134. logs := <-logsChannel
  135. logs.ResourceLogs().MoveAndAppendTo(allLogs.ResourceLogs())
  136. }
  137. logRecordCount := allLogs.LogRecordCount()
  138. if logRecordCount > 0 {
  139. ctx := receiver.obsrecv.StartLogsOp(context.Background())
  140. err := receiver.nextConsumer.ConsumeLogs(context.Background(), allLogs)
  141. receiver.obsrecv.EndLogsOp(ctx, metadata.Type, logRecordCount, err)
  142. if err != nil {
  143. receiver.settings.Logger.Error("failed to send logs: %w", zap.Error(err))
  144. }
  145. }
  146. }
  147. func (receiver *logsReceiver) Shutdown(ctx context.Context) error {
  148. if !receiver.isStarted {
  149. receiver.settings.Logger.Debug("Requested shutdown, but not started, ignoring.")
  150. return nil
  151. }
  152. receiver.settings.Logger.Debug("stopping...")
  153. receiver.stopCollecting()
  154. for _, queryReceiver := range receiver.queryReceivers {
  155. queryReceiver.shutdown(ctx)
  156. }
  157. var errors error
  158. if receiver.storageClient != nil {
  159. errors = multierr.Append(errors, receiver.storageClient.Close(ctx))
  160. }
  161. receiver.isStarted = false
  162. receiver.settings.Logger.Debug("stopped.")
  163. return errors
  164. }
  165. func (receiver *logsReceiver) stopCollecting() {
  166. if receiver.collectionIntervalTicker != nil {
  167. receiver.collectionIntervalTicker.Stop()
  168. }
  169. close(receiver.shutdownRequested)
  170. }
  171. type logsQueryReceiver struct {
  172. id string
  173. query Query
  174. createDb dbProviderFunc
  175. createClient clientProviderFunc
  176. logger *zap.Logger
  177. db *sql.DB
  178. client dbClient
  179. trackingValue string
  180. // TODO: Extract persistence into its own component
  181. storageClient storage.Client
  182. trackingValueStorageKey string
  183. }
  184. func newLogsQueryReceiver(
  185. id string,
  186. query Query,
  187. dbProviderFunc dbProviderFunc,
  188. clientProviderFunc clientProviderFunc,
  189. logger *zap.Logger,
  190. storageClient storage.Client,
  191. ) *logsQueryReceiver {
  192. queryReceiver := &logsQueryReceiver{
  193. id: id,
  194. query: query,
  195. createDb: dbProviderFunc,
  196. createClient: clientProviderFunc,
  197. logger: logger,
  198. storageClient: storageClient,
  199. }
  200. queryReceiver.trackingValue = queryReceiver.query.TrackingStartValue
  201. queryReceiver.trackingValueStorageKey = fmt.Sprintf("%s.%s", queryReceiver.id, "trackingValue")
  202. return queryReceiver
  203. }
  204. func (queryReceiver *logsQueryReceiver) ID() string {
  205. return queryReceiver.id
  206. }
  207. func (queryReceiver *logsQueryReceiver) start(ctx context.Context) error {
  208. var err error
  209. queryReceiver.db, err = queryReceiver.createDb()
  210. if err != nil {
  211. return fmt.Errorf("failed to open db connection: %w", err)
  212. }
  213. queryReceiver.client = queryReceiver.createClient(dbWrapper{queryReceiver.db}, queryReceiver.query.SQL, queryReceiver.logger)
  214. queryReceiver.trackingValue = queryReceiver.retrieveTrackingValue(ctx)
  215. return nil
  216. }
  217. // retrieveTrackingValue retrieves the tracking value from storage, if storage is configured.
  218. // Otherwise, it returns the tracking value configured in `tracking_start_value`.
  219. func (queryReceiver *logsQueryReceiver) retrieveTrackingValue(ctx context.Context) string {
  220. trackingValueFromConfig := queryReceiver.query.TrackingStartValue
  221. if queryReceiver.storageClient == nil {
  222. return trackingValueFromConfig
  223. }
  224. storedTrackingValueBytes, err := queryReceiver.storageClient.Get(ctx, queryReceiver.trackingValueStorageKey)
  225. if err != nil || storedTrackingValueBytes == nil {
  226. return trackingValueFromConfig
  227. }
  228. return string(storedTrackingValueBytes)
  229. }
  230. func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs, error) {
  231. logs := plog.NewLogs()
  232. var rows []stringMap
  233. var err error
  234. observedAt := pcommon.NewTimestampFromTime(time.Now())
  235. if queryReceiver.query.TrackingColumn != "" {
  236. rows, err = queryReceiver.client.queryRows(ctx, queryReceiver.trackingValue)
  237. } else {
  238. rows, err = queryReceiver.client.queryRows(ctx)
  239. }
  240. if err != nil {
  241. return logs, fmt.Errorf("error getting rows: %w", err)
  242. }
  243. var errs error
  244. scopeLogs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
  245. for logsConfigIndex, logsConfig := range queryReceiver.query.Logs {
  246. for _, row := range rows {
  247. logRecord := scopeLogs.AppendEmpty()
  248. rowToLog(row, logsConfig, logRecord)
  249. logRecord.SetObservedTimestamp(observedAt)
  250. if logsConfigIndex == 0 {
  251. errs = multierr.Append(errs, queryReceiver.storeTrackingValue(ctx, row))
  252. }
  253. }
  254. }
  255. return logs, nil
  256. }
  257. func (queryReceiver *logsQueryReceiver) storeTrackingValue(ctx context.Context, row stringMap) error {
  258. if queryReceiver.query.TrackingColumn == "" {
  259. return nil
  260. }
  261. queryReceiver.trackingValue = row[queryReceiver.query.TrackingColumn]
  262. if queryReceiver.storageClient != nil {
  263. err := queryReceiver.storageClient.Set(ctx, queryReceiver.trackingValueStorageKey, []byte(queryReceiver.trackingValue))
  264. if err != nil {
  265. return err
  266. }
  267. }
  268. return nil
  269. }
  270. func rowToLog(row stringMap, config LogsCfg, logRecord plog.LogRecord) {
  271. logRecord.Body().SetStr(row[config.BodyColumn])
  272. }
  273. func (queryReceiver *logsQueryReceiver) shutdown(_ context.Context) {
  274. }