loki_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package lokireceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver"
  4. import (
  5. "bytes"
  6. "compress/gzip"
  7. "compress/zlib"
  8. "context"
  9. "fmt"
  10. "net"
  11. "net/http"
  12. "testing"
  13. "time"
  14. "github.com/gogo/protobuf/proto"
  15. "github.com/golang/snappy"
  16. "github.com/grafana/loki/pkg/push"
  17. "github.com/stretchr/testify/assert"
  18. "github.com/stretchr/testify/require"
  19. "go.opentelemetry.io/collector/component/componenttest"
  20. "go.opentelemetry.io/collector/config/configgrpc"
  21. "go.opentelemetry.io/collector/config/confighttp"
  22. "go.opentelemetry.io/collector/config/confignet"
  23. "go.opentelemetry.io/collector/consumer/consumertest"
  24. "go.opentelemetry.io/collector/pdata/pcommon"
  25. "go.opentelemetry.io/collector/pdata/plog"
  26. "go.opentelemetry.io/collector/receiver/receivertest"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/credentials/insecure"
  29. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
  30. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
  31. )
  32. func sendToCollector(endpoint string, contentType string, contentEncoding string, body []byte) error {
  33. var buf bytes.Buffer
  34. switch contentEncoding {
  35. case "":
  36. buf = *bytes.NewBuffer(body)
  37. case "snappy":
  38. if contentType == jsonContentType {
  39. buf = *bytes.NewBuffer(body)
  40. } else {
  41. data := snappy.Encode(nil, body)
  42. buf = *bytes.NewBuffer(data)
  43. }
  44. case "gzip":
  45. zw := gzip.NewWriter(&buf)
  46. if _, err := zw.Write(body); err != nil {
  47. return err
  48. }
  49. if err := zw.Close(); err != nil {
  50. return err
  51. }
  52. case "deflate":
  53. fw := zlib.NewWriter(&buf)
  54. if _, err := fw.Write(body); err != nil {
  55. return nil
  56. }
  57. if err := fw.Close(); err != nil {
  58. return err
  59. }
  60. }
  61. req, err := http.NewRequest("POST", endpoint, &buf)
  62. if err != nil {
  63. return err
  64. }
  65. req.Header.Set("Content-Type", contentType)
  66. req.Header.Set("Content-Encoding", contentEncoding)
  67. resp, err := http.DefaultClient.Do(req)
  68. if err != nil {
  69. return err
  70. }
  71. resp.Body.Close()
  72. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  73. return fmt.Errorf("failed to upload logs; HTTP status code: %d", resp.StatusCode)
  74. }
  75. return nil
  76. }
  77. func startGRPCServer(t *testing.T) (*grpc.ClientConn, *consumertest.LogsSink) {
  78. config := &Config{
  79. Protocols: Protocols{
  80. GRPC: &configgrpc.GRPCServerSettings{
  81. NetAddr: confignet.NetAddr{
  82. Endpoint: testutil.GetAvailableLocalAddress(t),
  83. Transport: "tcp",
  84. },
  85. },
  86. },
  87. KeepTimestamp: true,
  88. }
  89. sink := new(consumertest.LogsSink)
  90. set := receivertest.NewNopCreateSettings()
  91. lr, err := newLokiReceiver(config, sink, set)
  92. require.NoError(t, err)
  93. require.NoError(t, lr.Start(context.Background(), componenttest.NewNopHost()))
  94. t.Cleanup(func() { require.NoError(t, lr.Shutdown(context.Background())) })
  95. conn, err := grpc.Dial(config.GRPC.NetAddr.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
  96. require.NoError(t, err)
  97. return conn, sink
  98. }
  99. func startHTTPServer(t *testing.T) (string, *consumertest.LogsSink) {
  100. addr := testutil.GetAvailableLocalAddress(t)
  101. config := &Config{
  102. Protocols: Protocols{
  103. HTTP: &confighttp.HTTPServerSettings{
  104. Endpoint: addr,
  105. },
  106. },
  107. KeepTimestamp: true,
  108. }
  109. sink := new(consumertest.LogsSink)
  110. set := receivertest.NewNopCreateSettings()
  111. lr, err := newLokiReceiver(config, sink, set)
  112. require.NoError(t, err)
  113. require.NoError(t, lr.Start(context.Background(), componenttest.NewNopHost()))
  114. t.Cleanup(func() { require.NoError(t, lr.Shutdown(context.Background())) })
  115. return addr, sink
  116. }
  117. func TestSendingProtobufPushRequestToHTTPEndpoint(t *testing.T) {
  118. tests := []struct {
  119. name string
  120. contentEncoding string
  121. contentType string
  122. body *push.PushRequest
  123. expected plog.Logs
  124. err error
  125. }{
  126. {
  127. name: "Sending contentEncoding=\"snappy\" contentType=application/x-protobuf to http endpoint",
  128. contentEncoding: "snappy",
  129. contentType: pbContentType,
  130. body: &push.PushRequest{
  131. Streams: []push.Stream{
  132. {
  133. Labels: "{foo=\"bar\"}",
  134. Entries: []push.Entry{
  135. {
  136. Timestamp: time.Unix(0, 1676888496000000000),
  137. Line: "logline 1",
  138. },
  139. },
  140. },
  141. },
  142. },
  143. expected: generateLogs([]Log{
  144. {
  145. Timestamp: 1676888496000000000,
  146. Attributes: map[string]any{
  147. "foo": "bar",
  148. },
  149. Body: pcommon.NewValueStr("logline 1"),
  150. },
  151. }),
  152. err: nil,
  153. },
  154. }
  155. // Start http server
  156. addr, sink := startHTTPServer(t)
  157. for i, tt := range tests {
  158. t.Run(tt.name, func(t *testing.T) {
  159. // Send push request to the Loki receiver.
  160. _, port, _ := net.SplitHostPort(addr)
  161. collectorAddr := fmt.Sprintf("http://localhost:%s/loki/api/v1/push", port)
  162. buf, err := proto.Marshal(tt.body)
  163. require.NoError(t, err)
  164. require.NoError(t, sendToCollector(collectorAddr, tt.contentType, tt.contentEncoding, buf))
  165. gotLogs := sink.AllLogs()
  166. require.NoError(t, plogtest.CompareLogs(tt.expected, gotLogs[i], plogtest.IgnoreObservedTimestamp()))
  167. })
  168. }
  169. }
  170. func TestSendingPushRequestToHTTPEndpoint(t *testing.T) {
  171. tests := []struct {
  172. name string
  173. contentEncoding string
  174. contentType string
  175. body []byte
  176. expected plog.Logs
  177. err error
  178. }{
  179. {
  180. name: "Sending contentEncoding=\"\" contentType=application/json to http endpoint",
  181. contentEncoding: "",
  182. contentType: jsonContentType,
  183. body: []byte(`{"streams": [{"stream": {"foo": "bar"},"values": [[ "1676888496000000000", "logline 1" ], [ "1676888497000000000", "logline 2" ]]}]}`),
  184. expected: generateLogs([]Log{
  185. {
  186. Timestamp: 1676888496000000000,
  187. Attributes: map[string]any{
  188. "foo": "bar",
  189. },
  190. Body: pcommon.NewValueStr("logline 1"),
  191. },
  192. {
  193. Timestamp: 1676888497000000000,
  194. Attributes: map[string]any{
  195. "foo": "bar",
  196. },
  197. Body: pcommon.NewValueStr("logline 2"),
  198. },
  199. }),
  200. err: nil,
  201. },
  202. {
  203. name: "Sending contentEncoding=\"snappy\" contentType=application/json to http endpoint",
  204. contentEncoding: "snappy",
  205. contentType: jsonContentType,
  206. body: []byte(`{"streams": [{"stream": {"foo": "bar"},"values": [[ "1676888496000000000", "logline 1" ], [ "1676888497000000000", "logline 2" ]]}]}`),
  207. expected: generateLogs([]Log{
  208. {
  209. Timestamp: 1676888496000000000,
  210. Attributes: map[string]any{
  211. "foo": "bar",
  212. },
  213. Body: pcommon.NewValueStr("logline 1"),
  214. },
  215. {
  216. Timestamp: 1676888497000000000,
  217. Attributes: map[string]any{
  218. "foo": "bar",
  219. },
  220. Body: pcommon.NewValueStr("logline 2"),
  221. },
  222. }),
  223. err: nil,
  224. },
  225. {
  226. name: "Sending contentEncoding=\"gzip\" contentType=application/json to http endpoint",
  227. contentEncoding: "gzip",
  228. contentType: jsonContentType,
  229. body: []byte(`{"streams": [{"stream": {"foo": "bar"},"values": [[ "1676888496000000000", "logline 1" ], [ "1676888497000000000", "logline 2" ]]}]}`),
  230. expected: generateLogs([]Log{
  231. {
  232. Timestamp: 1676888496000000000,
  233. Attributes: map[string]any{
  234. "foo": "bar",
  235. },
  236. Body: pcommon.NewValueStr("logline 1"),
  237. },
  238. {
  239. Timestamp: 1676888497000000000,
  240. Attributes: map[string]any{
  241. "foo": "bar",
  242. },
  243. Body: pcommon.NewValueStr("logline 2"),
  244. },
  245. }),
  246. err: nil,
  247. },
  248. {
  249. name: "Sending contentEncoding=\"deflate\" contentType=application/json to http endpoint",
  250. contentEncoding: "deflate",
  251. contentType: jsonContentType,
  252. body: []byte(`{"streams": [{"stream": {"foo": "bar"},"values": [[ "1676888496000000000", "logline 1" ], [ "1676888497000000000", "logline 2" ]]}]}`),
  253. expected: generateLogs([]Log{
  254. {
  255. Timestamp: 1676888496000000000,
  256. Attributes: map[string]any{
  257. "foo": "bar",
  258. },
  259. Body: pcommon.NewValueStr("logline 1"),
  260. },
  261. {
  262. Timestamp: 1676888497000000000,
  263. Attributes: map[string]any{
  264. "foo": "bar",
  265. },
  266. Body: pcommon.NewValueStr("logline 2"),
  267. },
  268. }),
  269. err: nil,
  270. },
  271. }
  272. // Start http server
  273. addr, sink := startHTTPServer(t)
  274. for _, tt := range tests {
  275. t.Run(tt.name, func(t *testing.T) {
  276. // Send push request to the Loki receiver.
  277. _, port, _ := net.SplitHostPort(addr)
  278. collectorAddr := fmt.Sprintf("http://localhost:%s/loki/api/v1/push", port)
  279. require.NoError(t, sendToCollector(collectorAddr, tt.contentType, tt.contentEncoding, tt.body), "sending logs to http endpoint shouldn't have been failed")
  280. gotLogs := sink.AllLogs()
  281. require.NoError(t, plogtest.CompareLogs(tt.expected, gotLogs[0], plogtest.IgnoreObservedTimestamp()))
  282. sink.Reset()
  283. })
  284. }
  285. }
  286. func TestSendingPushRequestToGRPCEndpoint(t *testing.T) {
  287. // Start grpc server
  288. conn, sink := startGRPCServer(t)
  289. defer conn.Close()
  290. client := push.NewPusherClient(conn)
  291. tests := []struct {
  292. name string
  293. body *push.PushRequest
  294. expected plog.Logs
  295. err error
  296. }{
  297. {
  298. name: "Sending logs to grpc endpoint",
  299. body: &push.PushRequest{
  300. Streams: []push.Stream{
  301. {
  302. Labels: "{foo=\"bar\"}",
  303. Entries: []push.Entry{
  304. {
  305. Timestamp: time.Unix(0, 1676888496000000000),
  306. Line: "logline 1",
  307. },
  308. },
  309. },
  310. },
  311. },
  312. expected: generateLogs([]Log{
  313. {
  314. Timestamp: 1676888496000000000,
  315. Attributes: map[string]any{
  316. "foo": "bar",
  317. },
  318. Body: pcommon.NewValueStr("logline 1"),
  319. },
  320. }),
  321. },
  322. }
  323. for i, tt := range tests {
  324. t.Run(tt.name, func(t *testing.T) {
  325. resp, err := client.Push(context.Background(), tt.body)
  326. assert.NoError(t, err, "should not have failed to post logs")
  327. assert.NotNil(t, resp, "response should not have been nil")
  328. gotLogs := sink.AllLogs()
  329. require.NoError(t, plogtest.CompareLogs(tt.expected, gotLogs[i], plogtest.IgnoreObservedTimestamp()))
  330. })
  331. }
  332. }
  333. type Log struct {
  334. Timestamp int64
  335. Body pcommon.Value
  336. Attributes map[string]any
  337. }
  338. func generateLogs(logs []Log) plog.Logs {
  339. ld := plog.NewLogs()
  340. logSlice := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
  341. for _, log := range logs {
  342. lr := logSlice.AppendEmpty()
  343. _ = lr.Attributes().FromRaw(log.Attributes)
  344. lr.SetTimestamp(pcommon.Timestamp(log.Timestamp))
  345. lr.Body().SetStr(log.Body.AsString())
  346. }
  347. return ld
  348. }