loki.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package lokireceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "sync"
  11. "github.com/grafana/loki/pkg/push"
  12. "go.opentelemetry.io/collector/component"
  13. "go.opentelemetry.io/collector/config/confighttp"
  14. "go.opentelemetry.io/collector/consumer"
  15. "go.opentelemetry.io/collector/receiver"
  16. "go.opentelemetry.io/collector/receiver/receiverhelper"
  17. "go.uber.org/zap"
  18. "google.golang.org/grpc"
  19. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki"
  20. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver/internal"
  21. )
  22. const (
  23. pbContentType = "application/x-protobuf"
  24. jsonContentType = "application/json"
  25. )
  26. const ErrAtLeastOneEntryFailedToProcess = "at least one entry in the push request failed to process"
  27. type lokiReceiver struct {
  28. conf *Config
  29. nextConsumer consumer.Logs
  30. settings receiver.CreateSettings
  31. httpMux *http.ServeMux
  32. serverHTTP *http.Server
  33. serverGRPC *grpc.Server
  34. shutdownWG sync.WaitGroup
  35. obsrepGRPC *receiverhelper.ObsReport
  36. obsrepHTTP *receiverhelper.ObsReport
  37. }
  38. func newLokiReceiver(conf *Config, nextConsumer consumer.Logs, settings receiver.CreateSettings) (*lokiReceiver, error) {
  39. r := &lokiReceiver{
  40. conf: conf,
  41. nextConsumer: nextConsumer,
  42. settings: settings,
  43. }
  44. var err error
  45. r.obsrepGRPC, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  46. ReceiverID: settings.ID,
  47. Transport: "grpc",
  48. ReceiverCreateSettings: settings,
  49. })
  50. if err != nil {
  51. return nil, err
  52. }
  53. r.obsrepHTTP, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  54. ReceiverID: settings.ID,
  55. Transport: "http",
  56. ReceiverCreateSettings: settings,
  57. })
  58. if err != nil {
  59. return nil, err
  60. }
  61. if nextConsumer == nil {
  62. return nil, component.ErrNilNextConsumer
  63. }
  64. if conf.HTTP != nil {
  65. r.httpMux = http.NewServeMux()
  66. r.httpMux.HandleFunc("/loki/api/v1/push", func(resp http.ResponseWriter, req *http.Request) {
  67. if req.Method != http.MethodPost {
  68. handleUnmatchedMethod(resp)
  69. return
  70. }
  71. switch req.Header.Get("Content-Type") {
  72. case jsonContentType, pbContentType:
  73. handleLogs(resp, req, r)
  74. default:
  75. handleUnmatchedContentType(resp)
  76. }
  77. })
  78. }
  79. return r, nil
  80. }
  81. func (r *lokiReceiver) startProtocolsServers(host component.Host) error {
  82. var err error
  83. if r.conf.HTTP != nil {
  84. r.serverHTTP, err = r.conf.HTTP.ToServer(host, r.settings.TelemetrySettings, r.httpMux, confighttp.WithDecoder("snappy", func(body io.ReadCloser) (io.ReadCloser, error) { return body, nil }))
  85. if err != nil {
  86. return fmt.Errorf("failed create http server error: %w", err)
  87. }
  88. err = r.startHTTPServer(host)
  89. if err != nil {
  90. return fmt.Errorf("failed to start http server error: %w", err)
  91. }
  92. }
  93. if r.conf.GRPC != nil {
  94. r.serverGRPC, err = r.conf.GRPC.ToServer(host, r.settings.TelemetrySettings)
  95. if err != nil {
  96. return fmt.Errorf("failed create grpc server error: %w", err)
  97. }
  98. push.RegisterPusherServer(r.serverGRPC, r)
  99. err = r.startGRPCServer(host)
  100. if err != nil {
  101. return fmt.Errorf("failed to start grpc server error: %w", err)
  102. }
  103. }
  104. return err
  105. }
  106. func (r *lokiReceiver) startHTTPServer(host component.Host) error {
  107. r.settings.Logger.Info("Starting HTTP server", zap.String("endpoint", r.conf.HTTP.Endpoint))
  108. listener, err := r.conf.HTTP.ToListener()
  109. if err != nil {
  110. return err
  111. }
  112. r.shutdownWG.Add(1)
  113. go func() {
  114. defer r.shutdownWG.Done()
  115. if errHTTP := r.serverHTTP.Serve(listener); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil {
  116. host.ReportFatalError(errHTTP)
  117. }
  118. }()
  119. return nil
  120. }
  121. func (r *lokiReceiver) startGRPCServer(host component.Host) error {
  122. r.settings.Logger.Info("Starting GRPC server", zap.String("endpoint", r.conf.GRPC.NetAddr.Endpoint))
  123. listener, err := r.conf.GRPC.ToListener()
  124. if err != nil {
  125. return err
  126. }
  127. r.shutdownWG.Add(1)
  128. go func() {
  129. defer r.shutdownWG.Done()
  130. if errGRPC := r.serverGRPC.Serve(listener); !errors.Is(errGRPC, grpc.ErrServerStopped) && errGRPC != nil {
  131. host.ReportFatalError(errGRPC)
  132. }
  133. }()
  134. return nil
  135. }
  136. func (r *lokiReceiver) Push(ctx context.Context, pushRequest *push.PushRequest) (*push.PushResponse, error) {
  137. logs, err := loki.PushRequestToLogs(pushRequest, r.conf.KeepTimestamp)
  138. if err != nil {
  139. r.settings.Logger.Warn(ErrAtLeastOneEntryFailedToProcess, zap.Error(err))
  140. return &push.PushResponse{}, err
  141. }
  142. ctx = r.obsrepGRPC.StartLogsOp(ctx)
  143. logRecordCount := logs.LogRecordCount()
  144. err = r.nextConsumer.ConsumeLogs(ctx, logs)
  145. r.obsrepGRPC.EndLogsOp(ctx, "protobuf", logRecordCount, err)
  146. return &push.PushResponse{}, nil
  147. }
  148. func (r *lokiReceiver) Start(_ context.Context, host component.Host) error {
  149. return r.startProtocolsServers(host)
  150. }
  151. func (r *lokiReceiver) Shutdown(ctx context.Context) error {
  152. var err error
  153. if r.serverHTTP != nil {
  154. err = r.serverHTTP.Shutdown(ctx)
  155. }
  156. if r.serverGRPC != nil {
  157. r.serverGRPC.GracefulStop()
  158. }
  159. r.shutdownWG.Wait()
  160. return err
  161. }
  162. func handleUnmatchedMethod(resp http.ResponseWriter) {
  163. status := http.StatusMethodNotAllowed
  164. writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v method not allowed, supported: [POST]", status)))
  165. }
  166. func handleUnmatchedContentType(resp http.ResponseWriter) {
  167. status := http.StatusUnsupportedMediaType
  168. writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v unsupported media type, supported: [%s, %s]", status, jsonContentType, pbContentType)))
  169. }
  170. func writeResponse(w http.ResponseWriter, contentType string, statusCode int, msg []byte) {
  171. w.Header().Set("Content-Type", contentType)
  172. w.WriteHeader(statusCode)
  173. // Nothing we can do with the error if we cannot write to the response.
  174. _, _ = w.Write(msg)
  175. }
  176. func handleLogs(resp http.ResponseWriter, req *http.Request, r *lokiReceiver) {
  177. pushRequest, err := internal.ParseRequest(req)
  178. if err != nil {
  179. http.Error(resp, err.Error(), http.StatusBadRequest)
  180. return
  181. }
  182. logs, err := loki.PushRequestToLogs(pushRequest, r.conf.KeepTimestamp)
  183. if err != nil {
  184. r.settings.Logger.Warn(ErrAtLeastOneEntryFailedToProcess, zap.Error(err))
  185. http.Error(resp, err.Error(), http.StatusBadRequest)
  186. return
  187. }
  188. ctx := r.obsrepHTTP.StartLogsOp(req.Context())
  189. logRecordCount := logs.LogRecordCount()
  190. err = r.nextConsumer.ConsumeLogs(ctx, logs)
  191. r.obsrepHTTP.EndLogsOp(ctx, "json", logRecordCount, err)
  192. resp.WriteHeader(http.StatusNoContent)
  193. }