sumo_marshaler.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package awss3exporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter"
  4. import (
  5. "bytes"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "strconv"
  10. "strings"
  11. "go.opentelemetry.io/collector/pdata/pcommon"
  12. "go.opentelemetry.io/collector/pdata/plog"
  13. "go.opentelemetry.io/collector/pdata/pmetric"
  14. "go.opentelemetry.io/collector/pdata/ptrace"
  15. )
const (
	// logBodyKey is the key under which getMessageJSON stores the log record
	// body inside the "message" JSON object.
	logBodyKey = "log"
)
// sumoMarshaler marshals logs into newline-delimited JSON entries carrying
// the sourceName, sourceHost, sourceCategory, fields and message properties.
// It holds no state; traces and metrics are rejected with an error.
type sumoMarshaler struct{}
  20. func (*sumoMarshaler) format() string {
  21. return string(SumoIC)
  22. }
  23. func newSumoICMarshaler() sumoMarshaler {
  24. return sumoMarshaler{}
  25. }
  26. func logEntry(buf *bytes.Buffer, format string, a ...any) {
  27. buf.WriteString(fmt.Sprintf(format, a...))
  28. buf.WriteString("\n")
  29. }
  30. func attributeValueToString(v pcommon.Value) (string, error) {
  31. switch v.Type() {
  32. case pcommon.ValueTypeStr:
  33. return v.Str(), nil
  34. case pcommon.ValueTypeBool:
  35. return strconv.FormatBool(v.Bool()), nil
  36. case pcommon.ValueTypeBytes:
  37. return valueToJSON(v.Bytes().AsRaw())
  38. case pcommon.ValueTypeDouble:
  39. return strconv.FormatFloat(v.Double(), 'f', -1, 64), nil
  40. case pcommon.ValueTypeInt:
  41. return strconv.FormatInt(v.Int(), 10), nil
  42. case pcommon.ValueTypeSlice:
  43. return valueToJSON(v.Slice().AsRaw())
  44. case pcommon.ValueTypeMap:
  45. return valueToJSON(v.Map().AsRaw())
  46. case pcommon.ValueTypeEmpty:
  47. return "", nil
  48. default:
  49. return "", fmt.Errorf("unknown OpenTelemetry attribute value type: %q", v.Type())
  50. }
  51. }
  52. func valueToJSON(m any) (string, error) {
  53. jsonString := new(bytes.Buffer)
  54. enc := json.NewEncoder(jsonString)
  55. err := enc.Encode(m)
  56. return strings.Trim(jsonString.String(), "\n"), err
  57. }
// Resource attribute keys that MarshalLogs requires on every resource. Their
// values are emitted as the top-level "sourceCategory", "sourceHost" and
// "sourceName" properties and are left out of the "fields" object.
const (
	// SourceCategoryKey is the resource attribute holding the source category.
	SourceCategoryKey = "_sourceCategory"
	// SourceHostKey is the resource attribute holding the source host.
	SourceHostKey = "_sourceHost"
	// SourceNameKey is the resource attribute holding the source name.
	SourceNameKey = "_sourceName"
)
  63. func (sumoMarshaler) MarshalLogs(ld plog.Logs) ([]byte, error) {
  64. buf := bytes.Buffer{}
  65. rls := ld.ResourceLogs()
  66. for i := 0; i < rls.Len(); i++ {
  67. rl := rls.At(i)
  68. ra := rl.Resource().Attributes()
  69. sourceCategory, exists := ra.Get(SourceCategoryKey)
  70. if !exists {
  71. return nil, errors.New("_sourceCategory attribute does not exist")
  72. }
  73. sourceHost, exists := ra.Get(SourceHostKey)
  74. if !exists {
  75. return nil, errors.New("_sourceHost attribute does not exist")
  76. }
  77. sourceName, exists := ra.Get(SourceNameKey)
  78. if !exists {
  79. return nil, errors.New("_sourceName attribute does not exist")
  80. }
  81. sc, err := attributeValueToString(sourceCategory)
  82. if err != nil {
  83. return nil, err
  84. }
  85. sh, err := attributeValueToString(sourceHost)
  86. if err != nil {
  87. return nil, err
  88. }
  89. sn, err := attributeValueToString(sourceName)
  90. if err != nil {
  91. return nil, err
  92. }
  93. sc = strconv.Quote(sc)
  94. sh = strconv.Quote(sh)
  95. sn = strconv.Quote(sn)
  96. // Remove the source attributes so that they won't be included in "fields" value.
  97. ra.Remove(SourceCategoryKey)
  98. ra.Remove(SourceHostKey)
  99. ra.Remove(SourceNameKey)
  100. fields, err := valueToJSON(ra.AsRaw())
  101. if err != nil {
  102. return nil, err
  103. }
  104. ills := rl.ScopeLogs()
  105. for j := 0; j < ills.Len(); j++ {
  106. ils := ills.At(j)
  107. logs := ils.LogRecords()
  108. for k := 0; k < logs.Len(); k++ {
  109. lr := logs.At(k)
  110. dateVal := lr.ObservedTimestamp()
  111. message, err := getMessageJSON(lr)
  112. if err != nil {
  113. return nil, err
  114. }
  115. logEntry(&buf, "{\"date\": \"%s\",\"sourceName\":%s,\"sourceHost\":%s,\"sourceCategory\":%s,\"fields\":%s,\"message\":%s}",
  116. dateVal, sn, sh, sc, fields, message)
  117. }
  118. }
  119. }
  120. return buf.Bytes(), nil
  121. }
  122. func getMessageJSON(lr plog.LogRecord) (string, error) {
  123. // The "message" fields is a JSON created from combining the actual log body and log-level attributes,
  124. // where the log body is stored under "log" key.
  125. // More info:
  126. // https://help.sumologic.com/docs/send-data/opentelemetry-collector/data-source-configurations/additional-configurations-reference/#mapping-opentelemetry-concepts-to-sumo-logic
  127. message := new(bytes.Buffer)
  128. enc := json.NewEncoder(message)
  129. lr.Body().CopyTo(lr.Attributes().PutEmpty(logBodyKey))
  130. err := enc.Encode(lr.Attributes().AsRaw())
  131. return strings.Trim(message.String(), "\n"), err
  132. }
  133. func (s sumoMarshaler) MarshalTraces(_ ptrace.Traces) ([]byte, error) {
  134. return nil, fmt.Errorf("traces can't be marshaled into %s format", s.format())
  135. }
  136. func (s sumoMarshaler) MarshalMetrics(_ pmetric.Metrics) ([]byte, error) {
  137. return nil, fmt.Errorf("metrics can't be marshaled into %s format", s.format())
  138. }