exporter_test.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package mezmoexporter
  4. import (
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "net/http/httptest"
  11. "net/url"
  12. "testing"
  13. "time"
  14. "github.com/stretchr/testify/assert"
  15. "github.com/stretchr/testify/require"
  16. "go.opentelemetry.io/collector/component"
  17. "go.opentelemetry.io/collector/component/componenttest"
  18. "go.opentelemetry.io/collector/pdata/pcommon"
  19. "go.opentelemetry.io/collector/pdata/plog"
  20. conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
  21. "go.uber.org/zap"
  22. "go.uber.org/zap/zapcore"
  23. "go.uber.org/zap/zaptest/observer"
  24. )
  25. var buildInfo = component.BuildInfo{
  26. Version: "1.0",
  27. }
  28. func createSimpleLogData(numberOfLogs int) plog.Logs {
  29. logs := plog.NewLogs()
  30. logs.ResourceLogs().AppendEmpty() // Add an empty ResourceLogs
  31. rl := logs.ResourceLogs().AppendEmpty()
  32. rl.ScopeLogs().AppendEmpty() // Add an empty ScopeLogs
  33. sl := rl.ScopeLogs().AppendEmpty()
  34. for i := 0; i < numberOfLogs; i++ {
  35. ts := pcommon.Timestamp(int64(i) * time.Millisecond.Nanoseconds())
  36. logRecord := sl.LogRecords().AppendEmpty()
  37. logRecord.Body().SetStr("10byteslog")
  38. logRecord.Attributes().PutStr(conventions.AttributeServiceName, "myapp")
  39. logRecord.Attributes().PutStr("my-label", "myapp-type")
  40. logRecord.Attributes().PutStr(conventions.AttributeHostName, "myhost")
  41. logRecord.Attributes().PutStr("custom", "custom")
  42. logRecord.SetTimestamp(ts)
  43. }
  44. return logs
  45. }
  46. func createMinimalAttributesLogData(numberOfLogs int) plog.Logs {
  47. logs := plog.NewLogs()
  48. logs.ResourceLogs().AppendEmpty()
  49. rl := logs.ResourceLogs().AppendEmpty()
  50. rl.ScopeLogs().AppendEmpty()
  51. sl := rl.ScopeLogs().AppendEmpty()
  52. for i := 0; i < numberOfLogs; i++ {
  53. logRecord := sl.LogRecords().AppendEmpty()
  54. logRecord.Body().SetStr("minimal attribute log")
  55. }
  56. return logs
  57. }
  58. // Creates a logs set that exceeds the maximum message side we can send in one HTTP POST
  59. func createMaxLogData() plog.Logs {
  60. logs := plog.NewLogs()
  61. logs.ResourceLogs().AppendEmpty() // Add an empty ResourceLogs
  62. rl := logs.ResourceLogs().AppendEmpty()
  63. rl.ScopeLogs().AppendEmpty() // Add an empty ScopeLogs
  64. sl := rl.ScopeLogs().AppendEmpty()
  65. var lineLen = maxMessageSize
  66. var lineCnt = (maxBodySize / lineLen) * 2
  67. for i := 0; i < lineCnt; i++ {
  68. ts := pcommon.Timestamp(int64(i) * time.Millisecond.Nanoseconds())
  69. logRecord := sl.LogRecords().AppendEmpty()
  70. logRecord.Body().SetStr(randString(maxMessageSize))
  71. logRecord.SetTimestamp(ts)
  72. }
  73. return logs
  74. }
  75. func createSizedPayloadLogData(payloadSize int) plog.Logs {
  76. logs := plog.NewLogs()
  77. logs.ResourceLogs().AppendEmpty() // Add an empty ResourceLogs
  78. rl := logs.ResourceLogs().AppendEmpty()
  79. rl.ScopeLogs().AppendEmpty() // Add an empty ScopeLogs
  80. sl := rl.ScopeLogs().AppendEmpty()
  81. maxMsg := randString(payloadSize)
  82. ts := pcommon.Timestamp(0)
  83. logRecord := sl.LogRecords().AppendEmpty()
  84. logRecord.Body().SetStr(maxMsg)
  85. logRecord.SetTimestamp(ts)
  86. return logs
  87. }
  88. type testServer struct {
  89. instance *httptest.Server
  90. url string
  91. }
  92. type httpAssertionCallback func(req *http.Request, body mezmoLogBody) (int, string)
  93. type testServerParams struct {
  94. t *testing.T
  95. assertionsCallback httpAssertionCallback
  96. }
  97. // Creates an HTTP server to test log delivery payloads by applying a set of
  98. // assertions through the assertCB function.
  99. func createHTTPServer(params *testServerParams) testServer {
  100. httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  101. body, err := io.ReadAll(r.Body)
  102. if err != nil {
  103. params.t.Fatal(err)
  104. }
  105. var logBody mezmoLogBody
  106. if err = json.Unmarshal(body, &logBody); err != nil {
  107. w.WriteHeader(http.StatusUnprocessableEntity)
  108. }
  109. statusCode, responseBody := params.assertionsCallback(r, logBody)
  110. w.WriteHeader(statusCode)
  111. if len(responseBody) > 0 {
  112. _, err = w.Write([]byte(responseBody))
  113. assert.NoError(params.t, err)
  114. }
  115. }))
  116. serverURL, err := url.Parse(httpServer.URL)
  117. assert.NoError(params.t, err)
  118. server := testServer{
  119. instance: httpServer,
  120. url: serverURL.String(),
  121. }
  122. return server
  123. }
  124. func createExporter(t *testing.T, config *Config, logger *zap.Logger) *mezmoExporter {
  125. exporter := newLogsExporter(config, componenttest.NewNopTelemetrySettings(), buildInfo, logger)
  126. require.NotNil(t, exporter)
  127. err := exporter.start(context.Background(), componenttest.NewNopHost())
  128. require.NoError(t, err)
  129. return exporter
  130. }
  131. func createLogger() (*zap.Logger, *observer.ObservedLogs) {
  132. core, logObserver := observer.New(zap.DebugLevel)
  133. logger := zap.New(core)
  134. return logger, logObserver
  135. }
  136. func TestLogsExporter(t *testing.T) {
  137. httpServerParams := testServerParams{
  138. t: t,
  139. assertionsCallback: func(req *http.Request, body mezmoLogBody) (int, string) {
  140. assert.Equal(t, "application/json", req.Header.Get("Content-Type"))
  141. assert.Equal(t, "mezmo-otel-exporter/"+buildInfo.Version, req.Header.Get("User-Agent"))
  142. return http.StatusOK, ""
  143. },
  144. }
  145. server := createHTTPServer(&httpServerParams)
  146. defer server.instance.Close()
  147. log, _ := createLogger()
  148. config := &Config{
  149. IngestURL: server.url,
  150. }
  151. exporter := createExporter(t, config, log)
  152. t.Run("Test simple log data", func(t *testing.T) {
  153. var logs = createSimpleLogData(3)
  154. err := exporter.pushLogData(context.Background(), logs)
  155. require.NoError(t, err)
  156. })
  157. t.Run("Test max message size", func(t *testing.T) {
  158. var logs = createSizedPayloadLogData(maxMessageSize)
  159. err := exporter.pushLogData(context.Background(), logs)
  160. require.NoError(t, err)
  161. })
  162. t.Run("Test max body size", func(t *testing.T) {
  163. var logs = createMaxLogData()
  164. err := exporter.pushLogData(context.Background(), logs)
  165. require.NoError(t, err)
  166. })
  167. }
  168. func TestAddsRequiredAttributes(t *testing.T) {
  169. httpServerParams := testServerParams{
  170. t: t,
  171. assertionsCallback: func(req *http.Request, body mezmoLogBody) (int, string) {
  172. assert.Equal(t, "application/json", req.Header.Get("Content-Type"))
  173. assert.Equal(t, "mezmo-otel-exporter/"+buildInfo.Version, req.Header.Get("User-Agent"))
  174. lines := body.Lines
  175. for _, line := range lines {
  176. assert.True(t, line.Timestamp > 0)
  177. assert.Equal(t, line.Level, "info")
  178. assert.Equal(t, line.App, "")
  179. assert.Equal(t, line.Line, "minimal attribute log")
  180. }
  181. return http.StatusOK, ""
  182. },
  183. }
  184. server := createHTTPServer(&httpServerParams)
  185. defer server.instance.Close()
  186. log, _ := createLogger()
  187. config := &Config{
  188. IngestURL: server.url,
  189. }
  190. exporter := createExporter(t, config, log)
  191. logs := createMinimalAttributesLogData(4)
  192. err := exporter.pushLogData(context.Background(), logs)
  193. require.NoError(t, err)
  194. }
  195. func Test404IngestError(t *testing.T) {
  196. log, logObserver := createLogger()
  197. httpServerParams := testServerParams{
  198. t: t,
  199. assertionsCallback: func(req *http.Request, body mezmoLogBody) (int, string) {
  200. return http.StatusNotFound, `{"foo":"bar"}`
  201. },
  202. }
  203. server := createHTTPServer(&httpServerParams)
  204. defer server.instance.Close()
  205. config := &Config{
  206. IngestURL: fmt.Sprintf("%s/foobar", server.url),
  207. }
  208. exporter := createExporter(t, config, log)
  209. logs := createSizedPayloadLogData(1)
  210. err := exporter.pushLogData(context.Background(), logs)
  211. require.NoError(t, err)
  212. assert.Equal(t, logObserver.Len(), 2)
  213. logLine := logObserver.All()[0]
  214. assert.Equal(t, logLine.Message, "got http status (/foobar): 404 Not Found")
  215. assert.Equal(t, logLine.Level, zapcore.ErrorLevel)
  216. logLine = logObserver.All()[1]
  217. assert.Equal(t, logLine.Message, "http response")
  218. assert.Equal(t, logLine.Level, zapcore.DebugLevel)
  219. responseField := logLine.Context[0]
  220. assert.Equal(t, responseField.Key, "response")
  221. assert.Equal(t, responseField.String, `{"foo":"bar"}`)
  222. }