123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter"
- import (
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net/http"
- "strings"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.opentelemetry.io/collector/pdata/pmetric"
- )
- type appendResponse struct {
- // sent gives information if the data was sent or not
- sent bool
- // appended keeps state of appending new log line to the body
- appended bool
- }
- // metricPair represents information required to send one metric to the Sumo Logic
- type metricPair struct {
- attributes pcommon.Map
- metric pmetric.Metric
- }
- type sender struct {
- logBuffer []plog.LogRecord
- metricBuffer []metricPair
- config *Config
- client *http.Client
- filter filter
- sources sourceFormats
- compressor compressor
- prometheusFormatter prometheusFormatter
- graphiteFormatter graphiteFormatter
- }
- const (
- logKey string = "log"
- // maxBufferSize defines size of the logBuffer (maximum number of plog.LogRecord entries)
- maxBufferSize int = 1024 * 1024
- headerContentType string = "Content-Type"
- headerContentEncoding string = "Content-Encoding"
- headerClient string = "X-Sumo-Client"
- headerHost string = "X-Sumo-Host"
- headerName string = "X-Sumo-Name"
- headerCategory string = "X-Sumo-Category"
- headerFields string = "X-Sumo-Fields"
- contentTypeLogs string = "application/x-www-form-urlencoded"
- contentTypePrometheus string = "application/vnd.sumologic.prometheus"
- contentTypeCarbon2 string = "application/vnd.sumologic.carbon2"
- contentTypeGraphite string = "application/vnd.sumologic.graphite"
- contentEncodingGzip string = "gzip"
- contentEncodingDeflate string = "deflate"
- )
- func newAppendResponse() appendResponse {
- return appendResponse{
- appended: true,
- }
- }
- func newSender(
- cfg *Config,
- cl *http.Client,
- f filter,
- s sourceFormats,
- c compressor,
- pf prometheusFormatter,
- gf graphiteFormatter,
- ) *sender {
- return &sender{
- config: cfg,
- client: cl,
- filter: f,
- sources: s,
- compressor: c,
- prometheusFormatter: pf,
- graphiteFormatter: gf,
- }
- }
- // send sends data to sumologic
- func (s *sender) send(ctx context.Context, pipeline PipelineType, body io.Reader, flds fields) error {
- data, err := s.compressor.compress(body)
- if err != nil {
- return err
- }
- req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.config.HTTPClientSettings.Endpoint, data)
- if err != nil {
- return err
- }
- // Add headers
- switch s.config.CompressEncoding {
- case GZIPCompression:
- req.Header.Set(headerContentEncoding, contentEncodingGzip)
- case DeflateCompression:
- req.Header.Set(headerContentEncoding, contentEncodingDeflate)
- case NoCompression:
- default:
- return fmt.Errorf("invalid content encoding: %s", s.config.CompressEncoding)
- }
- req.Header.Add(headerClient, s.config.Client)
- if s.sources.host.isSet() {
- req.Header.Add(headerHost, s.sources.host.format(flds))
- }
- if s.sources.name.isSet() {
- req.Header.Add(headerName, s.sources.name.format(flds))
- }
- if s.sources.category.isSet() {
- req.Header.Add(headerCategory, s.sources.category.format(flds))
- }
- switch pipeline {
- case LogsPipeline:
- req.Header.Add(headerContentType, contentTypeLogs)
- req.Header.Add(headerFields, flds.string())
- case MetricsPipeline:
- switch s.config.MetricFormat {
- case PrometheusFormat:
- req.Header.Add(headerContentType, contentTypePrometheus)
- case Carbon2Format:
- req.Header.Add(headerContentType, contentTypeCarbon2)
- case GraphiteFormat:
- req.Header.Add(headerContentType, contentTypeGraphite)
- default:
- return fmt.Errorf("unsupported metrics format: %s", s.config.MetricFormat)
- }
- default:
- return errors.New("unexpected pipeline")
- }
- resp, err := s.client.Do(req)
- if err != nil {
- return err
- }
- if resp.StatusCode < 200 || resp.StatusCode >= 400 {
- return fmt.Errorf("error during sending data: %s", resp.Status)
- }
- return nil
- }
- // logToText converts LogRecord to a plain text line, returns it and error eventually
- func (s *sender) logToText(record plog.LogRecord) string {
- return record.Body().AsString()
- }
- // logToJSON converts LogRecord to a json line, returns it and error eventually
- func (s *sender) logToJSON(record plog.LogRecord) (string, error) {
- data := s.filter.filterOut(record.Attributes())
- record.Body().CopyTo(data.orig.PutEmpty(logKey))
- nextLine, err := json.Marshal(data.orig.AsRaw())
- if err != nil {
- return "", err
- }
- return bytes.NewBuffer(nextLine).String(), nil
- }
- // sendLogs sends log records from the logBuffer formatted according
- // to configured LogFormat and as the result of execution
- // returns array of records which has not been sent correctly and error
- func (s *sender) sendLogs(ctx context.Context, flds fields) ([]plog.LogRecord, error) {
- var (
- body strings.Builder
- errs []error
- droppedRecords []plog.LogRecord
- currentRecords []plog.LogRecord
- )
- for _, record := range s.logBuffer {
- var formattedLine string
- var err error
- switch s.config.LogFormat {
- case TextFormat:
- formattedLine = s.logToText(record)
- case JSONFormat:
- formattedLine, err = s.logToJSON(record)
- default:
- err = errors.New("unexpected log format")
- }
- if err != nil {
- droppedRecords = append(droppedRecords, record)
- errs = append(errs, err)
- continue
- }
- ar, err := s.appendAndSend(ctx, formattedLine, LogsPipeline, &body, flds)
- if err != nil {
- errs = append(errs, err)
- if ar.sent {
- droppedRecords = append(droppedRecords, currentRecords...)
- }
- if !ar.appended {
- droppedRecords = append(droppedRecords, record)
- }
- }
- // If data was sent, cleanup the currentTimeSeries counter
- if ar.sent {
- currentRecords = currentRecords[:0]
- }
- // If log has been appended to body, increment the currentTimeSeries
- if ar.appended {
- currentRecords = append(currentRecords, record)
- }
- }
- if body.Len() > 0 {
- if err := s.send(ctx, LogsPipeline, strings.NewReader(body.String()), flds); err != nil {
- errs = append(errs, err)
- droppedRecords = append(droppedRecords, currentRecords...)
- }
- }
- return droppedRecords, errors.Join(errs...)
- }
- // sendMetrics sends metrics in right format basing on the s.config.MetricFormat
- func (s *sender) sendMetrics(ctx context.Context, flds fields) ([]metricPair, error) {
- var (
- body strings.Builder
- errs []error
- droppedRecords []metricPair
- currentRecords []metricPair
- )
- for _, record := range s.metricBuffer {
- var formattedLine string
- var err error
- switch s.config.MetricFormat {
- case PrometheusFormat:
- formattedLine = s.prometheusFormatter.metric2String(record)
- case Carbon2Format:
- formattedLine = carbon2Metric2String(record)
- case GraphiteFormat:
- formattedLine = s.graphiteFormatter.metric2String(record)
- default:
- err = fmt.Errorf("unexpected metric format: %s", s.config.MetricFormat)
- }
- if err != nil {
- droppedRecords = append(droppedRecords, record)
- errs = append(errs, err)
- continue
- }
- ar, err := s.appendAndSend(ctx, formattedLine, MetricsPipeline, &body, flds)
- if err != nil {
- errs = append(errs, err)
- if ar.sent {
- droppedRecords = append(droppedRecords, currentRecords...)
- }
- if !ar.appended {
- droppedRecords = append(droppedRecords, record)
- }
- }
- // If data was sent, cleanup the currentTimeSeries counter
- if ar.sent {
- currentRecords = currentRecords[:0]
- }
- // If log has been appended to body, increment the currentTimeSeries
- if ar.appended {
- currentRecords = append(currentRecords, record)
- }
- }
- if body.Len() > 0 {
- if err := s.send(ctx, MetricsPipeline, strings.NewReader(body.String()), flds); err != nil {
- errs = append(errs, err)
- droppedRecords = append(droppedRecords, currentRecords...)
- }
- }
- return droppedRecords, errors.Join(errs...)
- }
- // appendAndSend appends line to the request body that will be sent and sends
- // the accumulated data if the internal logBuffer has been filled (with maxBufferSize elements).
- // It returns appendResponse
- func (s *sender) appendAndSend(
- ctx context.Context,
- line string,
- pipeline PipelineType,
- body *strings.Builder,
- flds fields,
- ) (appendResponse, error) {
- var errs []error
- ar := newAppendResponse()
- if body.Len() > 0 && body.Len()+len(line) >= s.config.MaxRequestBodySize {
- ar.sent = true
- errs = append(errs, s.send(ctx, pipeline, strings.NewReader(body.String()), flds))
- body.Reset()
- }
- if body.Len() > 0 {
- // Do not add newline if the body is empty
- if _, err := body.WriteString("\n"); err != nil {
- errs = append(errs, err)
- ar.appended = false
- }
- }
- if ar.appended {
- // Do not append new line if separator was not appended
- if _, err := body.WriteString(line); err != nil {
- errs = append(errs, err)
- ar.appended = false
- }
- }
- return ar, errors.Join(errs...)
- }
- // cleanLogsBuffer zeroes logBuffer
- func (s *sender) cleanLogsBuffer() {
- s.logBuffer = (s.logBuffer)[:0]
- }
- // batchLog adds log to the logBuffer and flushes them if logBuffer is full to avoid overflow
- // returns list of log records which were not sent successfully
- func (s *sender) batchLog(ctx context.Context, log plog.LogRecord, metadata fields) ([]plog.LogRecord, error) {
- s.logBuffer = append(s.logBuffer, log)
- if s.countLogs() >= maxBufferSize {
- dropped, err := s.sendLogs(ctx, metadata)
- s.cleanLogsBuffer()
- return dropped, err
- }
- return nil, nil
- }
- // countLogs returns number of logs in logBuffer
- func (s *sender) countLogs() int {
- return len(s.logBuffer)
- }
- // cleanMetricBuffer zeroes metricBuffer
- func (s *sender) cleanMetricBuffer() {
- s.metricBuffer = (s.metricBuffer)[:0]
- }
- // batchMetric adds metric to the metricBuffer and flushes them if metricBuffer is full to avoid overflow
- // returns list of metric records which were not sent successfully
- func (s *sender) batchMetric(ctx context.Context, metric metricPair, metadata fields) ([]metricPair, error) {
- s.metricBuffer = append(s.metricBuffer, metric)
- if s.countMetrics() >= maxBufferSize {
- dropped, err := s.sendMetrics(ctx, metadata)
- s.cleanMetricBuffer()
- return dropped, err
- }
- return nil, nil
- }
- // countMetrics returns number of metrics in metricBuffer
- func (s *sender) countMetrics() int {
- return len(s.metricBuffer)
- }
|