sender.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter"
  4. import (
  5. "bytes"
  6. "context"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net/http"
  12. "strings"
  13. "go.opentelemetry.io/collector/pdata/pcommon"
  14. "go.opentelemetry.io/collector/pdata/plog"
  15. "go.opentelemetry.io/collector/pdata/pmetric"
  16. )
// appendResponse reports the outcome of a single appendAndSend call:
// whether a flush of the accumulated body happened and whether the new
// line made it into the body.
type appendResponse struct {
	// sent gives information if the data was sent or not
	sent bool
	// appended keeps state of appending new log line to the body
	appended bool
}
// metricPair represents information required to send one metric to the Sumo Logic
type metricPair struct {
	// attributes are the resource/record attributes associated with the metric
	attributes pcommon.Map
	// metric is the metric payload itself
	metric pmetric.Metric
}
// sender buffers log and metric records and ships them to Sumo Logic over
// HTTP using the format and compression selected in config.
type sender struct {
	// logBuffer accumulates log records between flushes (see batchLog)
	logBuffer []plog.LogRecord
	// metricBuffer accumulates metric pairs between flushes (see batchMetric)
	metricBuffer []metricPair
	config *Config
	client *http.Client
	// filter is applied to record attributes when building JSON log lines (see logToJSON)
	filter filter
	// sources describes how the X-Sumo-Host/Name/Category headers are derived (see send)
	sources sourceFormats
	// compressor compresses request bodies according to config.CompressEncoding
	compressor compressor
	// prometheusFormatter serializes metrics for PrometheusFormat
	prometheusFormatter prometheusFormatter
	// graphiteFormatter serializes metrics for GraphiteFormat
	graphiteFormatter graphiteFormatter
}
const (
	// logKey is the JSON key under which the log body is placed (see logToJSON)
	logKey string = "log"
	// maxBufferSize defines size of the logBuffer (maximum number of plog.LogRecord entries)
	maxBufferSize int = 1024 * 1024

	// HTTP header names used when sending data to Sumo Logic.
	headerContentType string = "Content-Type"
	headerContentEncoding string = "Content-Encoding"
	headerClient string = "X-Sumo-Client"
	headerHost string = "X-Sumo-Host"
	headerName string = "X-Sumo-Name"
	headerCategory string = "X-Sumo-Category"
	headerFields string = "X-Sumo-Fields"

	// Content-Type values per pipeline/metric format.
	contentTypeLogs string = "application/x-www-form-urlencoded"
	contentTypePrometheus string = "application/vnd.sumologic.prometheus"
	contentTypeCarbon2 string = "application/vnd.sumologic.carbon2"
	contentTypeGraphite string = "application/vnd.sumologic.graphite"

	// Content-Encoding values matching the configured compression.
	contentEncodingGzip string = "gzip"
	contentEncodingDeflate string = "deflate"
)
  57. func newAppendResponse() appendResponse {
  58. return appendResponse{
  59. appended: true,
  60. }
  61. }
  62. func newSender(
  63. cfg *Config,
  64. cl *http.Client,
  65. f filter,
  66. s sourceFormats,
  67. c compressor,
  68. pf prometheusFormatter,
  69. gf graphiteFormatter,
  70. ) *sender {
  71. return &sender{
  72. config: cfg,
  73. client: cl,
  74. filter: f,
  75. sources: s,
  76. compressor: c,
  77. prometheusFormatter: pf,
  78. graphiteFormatter: gf,
  79. }
  80. }
  81. // send sends data to sumologic
  82. func (s *sender) send(ctx context.Context, pipeline PipelineType, body io.Reader, flds fields) error {
  83. data, err := s.compressor.compress(body)
  84. if err != nil {
  85. return err
  86. }
  87. req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.config.HTTPClientSettings.Endpoint, data)
  88. if err != nil {
  89. return err
  90. }
  91. // Add headers
  92. switch s.config.CompressEncoding {
  93. case GZIPCompression:
  94. req.Header.Set(headerContentEncoding, contentEncodingGzip)
  95. case DeflateCompression:
  96. req.Header.Set(headerContentEncoding, contentEncodingDeflate)
  97. case NoCompression:
  98. default:
  99. return fmt.Errorf("invalid content encoding: %s", s.config.CompressEncoding)
  100. }
  101. req.Header.Add(headerClient, s.config.Client)
  102. if s.sources.host.isSet() {
  103. req.Header.Add(headerHost, s.sources.host.format(flds))
  104. }
  105. if s.sources.name.isSet() {
  106. req.Header.Add(headerName, s.sources.name.format(flds))
  107. }
  108. if s.sources.category.isSet() {
  109. req.Header.Add(headerCategory, s.sources.category.format(flds))
  110. }
  111. switch pipeline {
  112. case LogsPipeline:
  113. req.Header.Add(headerContentType, contentTypeLogs)
  114. req.Header.Add(headerFields, flds.string())
  115. case MetricsPipeline:
  116. switch s.config.MetricFormat {
  117. case PrometheusFormat:
  118. req.Header.Add(headerContentType, contentTypePrometheus)
  119. case Carbon2Format:
  120. req.Header.Add(headerContentType, contentTypeCarbon2)
  121. case GraphiteFormat:
  122. req.Header.Add(headerContentType, contentTypeGraphite)
  123. default:
  124. return fmt.Errorf("unsupported metrics format: %s", s.config.MetricFormat)
  125. }
  126. default:
  127. return errors.New("unexpected pipeline")
  128. }
  129. resp, err := s.client.Do(req)
  130. if err != nil {
  131. return err
  132. }
  133. if resp.StatusCode < 200 || resp.StatusCode >= 400 {
  134. return fmt.Errorf("error during sending data: %s", resp.Status)
  135. }
  136. return nil
  137. }
  138. // logToText converts LogRecord to a plain text line, returns it and error eventually
  139. func (s *sender) logToText(record plog.LogRecord) string {
  140. return record.Body().AsString()
  141. }
  142. // logToJSON converts LogRecord to a json line, returns it and error eventually
  143. func (s *sender) logToJSON(record plog.LogRecord) (string, error) {
  144. data := s.filter.filterOut(record.Attributes())
  145. record.Body().CopyTo(data.orig.PutEmpty(logKey))
  146. nextLine, err := json.Marshal(data.orig.AsRaw())
  147. if err != nil {
  148. return "", err
  149. }
  150. return bytes.NewBuffer(nextLine).String(), nil
  151. }
// sendLogs sends log records from the logBuffer formatted according
// to configured LogFormat and as the result of execution
// returns array of records which has not been sent correctly and error
func (s *sender) sendLogs(ctx context.Context, flds fields) ([]plog.LogRecord, error) {
	var (
		// body accumulates formatted lines until MaxRequestBodySize would be exceeded
		body strings.Builder
		// errs collects every formatting/sending error; joined on return
		errs []error
		// droppedRecords are records that failed to format or send
		droppedRecords []plog.LogRecord
		// currentRecords are records already appended to body but not yet sent
		currentRecords []plog.LogRecord
	)
	for _, record := range s.logBuffer {
		var formattedLine string
		var err error
		// Format the record according to the configured log format.
		switch s.config.LogFormat {
		case TextFormat:
			formattedLine = s.logToText(record)
		case JSONFormat:
			formattedLine, err = s.logToJSON(record)
		default:
			err = errors.New("unexpected log format")
		}
		if err != nil {
			// Formatting failed: drop this record and continue with the rest.
			droppedRecords = append(droppedRecords, record)
			errs = append(errs, err)
			continue
		}
		ar, err := s.appendAndSend(ctx, formattedLine, LogsPipeline, &body, flds)
		if err != nil {
			errs = append(errs, err)
			// A flush happened during this call and failed: everything that
			// was buffered in the body counts as dropped.
			if ar.sent {
				droppedRecords = append(droppedRecords, currentRecords...)
			}
			// The line itself did not make it into the body.
			if !ar.appended {
				droppedRecords = append(droppedRecords, record)
			}
		}
		// If data was sent, cleanup the currentTimeSeries counter
		if ar.sent {
			currentRecords = currentRecords[:0]
		}
		// If log has been appended to body, increment the currentTimeSeries
		if ar.appended {
			currentRecords = append(currentRecords, record)
		}
	}
	// Flush whatever remains in the body after the loop.
	if body.Len() > 0 {
		if err := s.send(ctx, LogsPipeline, strings.NewReader(body.String()), flds); err != nil {
			errs = append(errs, err)
			droppedRecords = append(droppedRecords, currentRecords...)
		}
	}
	return droppedRecords, errors.Join(errs...)
}
  205. // sendMetrics sends metrics in right format basing on the s.config.MetricFormat
// sendMetrics sends metrics in right format basing on the s.config.MetricFormat
// and returns the metric pairs which were not sent correctly, plus a joined error.
func (s *sender) sendMetrics(ctx context.Context, flds fields) ([]metricPair, error) {
	var (
		// body accumulates formatted lines until MaxRequestBodySize would be exceeded
		body strings.Builder
		// errs collects every formatting/sending error; joined on return
		errs []error
		// droppedRecords are metric pairs that failed to format or send
		droppedRecords []metricPair
		// currentRecords are metric pairs already appended to body but not yet sent
		currentRecords []metricPair
	)
	for _, record := range s.metricBuffer {
		var formattedLine string
		var err error
		// Serialize the metric according to the configured metric format.
		switch s.config.MetricFormat {
		case PrometheusFormat:
			formattedLine = s.prometheusFormatter.metric2String(record)
		case Carbon2Format:
			formattedLine = carbon2Metric2String(record)
		case GraphiteFormat:
			formattedLine = s.graphiteFormatter.metric2String(record)
		default:
			err = fmt.Errorf("unexpected metric format: %s", s.config.MetricFormat)
		}
		if err != nil {
			// Formatting failed: drop this record and continue with the rest.
			droppedRecords = append(droppedRecords, record)
			errs = append(errs, err)
			continue
		}
		ar, err := s.appendAndSend(ctx, formattedLine, MetricsPipeline, &body, flds)
		if err != nil {
			errs = append(errs, err)
			// A flush happened during this call and failed: everything that
			// was buffered in the body counts as dropped.
			if ar.sent {
				droppedRecords = append(droppedRecords, currentRecords...)
			}
			// The line itself did not make it into the body.
			if !ar.appended {
				droppedRecords = append(droppedRecords, record)
			}
		}
		// If data was sent, cleanup the currentTimeSeries counter
		if ar.sent {
			currentRecords = currentRecords[:0]
		}
		// If the metric has been appended to body, increment the currentTimeSeries
		if ar.appended {
			currentRecords = append(currentRecords, record)
		}
	}
	// Flush whatever remains in the body after the loop.
	if body.Len() > 0 {
		if err := s.send(ctx, MetricsPipeline, strings.NewReader(body.String()), flds); err != nil {
			errs = append(errs, err)
			droppedRecords = append(droppedRecords, currentRecords...)
		}
	}
	return droppedRecords, errors.Join(errs...)
}
  258. // appendAndSend appends line to the request body that will be sent and sends
  259. // the accumulated data if the internal logBuffer has been filled (with maxBufferSize elements).
  260. // It returns appendResponse
  261. func (s *sender) appendAndSend(
  262. ctx context.Context,
  263. line string,
  264. pipeline PipelineType,
  265. body *strings.Builder,
  266. flds fields,
  267. ) (appendResponse, error) {
  268. var errs []error
  269. ar := newAppendResponse()
  270. if body.Len() > 0 && body.Len()+len(line) >= s.config.MaxRequestBodySize {
  271. ar.sent = true
  272. errs = append(errs, s.send(ctx, pipeline, strings.NewReader(body.String()), flds))
  273. body.Reset()
  274. }
  275. if body.Len() > 0 {
  276. // Do not add newline if the body is empty
  277. if _, err := body.WriteString("\n"); err != nil {
  278. errs = append(errs, err)
  279. ar.appended = false
  280. }
  281. }
  282. if ar.appended {
  283. // Do not append new line if separator was not appended
  284. if _, err := body.WriteString(line); err != nil {
  285. errs = append(errs, err)
  286. ar.appended = false
  287. }
  288. }
  289. return ar, errors.Join(errs...)
  290. }
  291. // cleanLogsBuffer zeroes logBuffer
  292. func (s *sender) cleanLogsBuffer() {
  293. s.logBuffer = (s.logBuffer)[:0]
  294. }
  295. // batchLog adds log to the logBuffer and flushes them if logBuffer is full to avoid overflow
  296. // returns list of log records which were not sent successfully
  297. func (s *sender) batchLog(ctx context.Context, log plog.LogRecord, metadata fields) ([]plog.LogRecord, error) {
  298. s.logBuffer = append(s.logBuffer, log)
  299. if s.countLogs() >= maxBufferSize {
  300. dropped, err := s.sendLogs(ctx, metadata)
  301. s.cleanLogsBuffer()
  302. return dropped, err
  303. }
  304. return nil, nil
  305. }
  306. // countLogs returns number of logs in logBuffer
  307. func (s *sender) countLogs() int {
  308. return len(s.logBuffer)
  309. }
  310. // cleanMetricBuffer zeroes metricBuffer
  311. func (s *sender) cleanMetricBuffer() {
  312. s.metricBuffer = (s.metricBuffer)[:0]
  313. }
  314. // batchMetric adds metric to the metricBuffer and flushes them if metricBuffer is full to avoid overflow
  315. // returns list of metric records which were not sent successfully
  316. func (s *sender) batchMetric(ctx context.Context, metric metricPair, metadata fields) ([]metricPair, error) {
  317. s.metricBuffer = append(s.metricBuffer, metric)
  318. if s.countMetrics() >= maxBufferSize {
  319. dropped, err := s.sendMetrics(ctx, metadata)
  320. s.cleanMetricBuffer()
  321. return dropped, err
  322. }
  323. return nil, nil
  324. }
  325. // countMetrics returns number of metrics in metricBuffer
  326. func (s *sender) countMetrics() int {
  327. return len(s.metricBuffer)
  328. }