
Kafka Receiver

Status
  Stability: beta (metrics, logs, traces)
  Distributions: contrib, aws, grafana, observiq, splunk, sumo
  Code Owners: @pavolloffay, @MovieStoreGuy

The Kafka receiver receives traces, metrics, and logs from Kafka. The message payload encoding is configurable.

Note that metrics and logs only support OTLP.
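For example, a single collector can consume all three signals by defining one receiver instance per signal; the topic names below are illustrative, not defaults:

```yaml
receivers:
  kafka/traces:
    protocol_version: 2.0.0
    topic: otlp_spans
    encoding: otlp_proto
  kafka/metrics:
    protocol_version: 2.0.0
    topic: otlp_metrics
    encoding: otlp_proto
  kafka/logs:
    protocol_version: 2.0.0
    topic: otlp_logs
    encoding: otlp_proto
```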

Getting Started

The following settings are required:

  • protocol_version (no default): Kafka protocol version e.g. 2.0.0

The following settings can be optionally configured:

  • brokers (default = localhost:9092): The list of kafka brokers
  • resolve_canonical_bootstrap_servers_only (default = false): Whether to resolve then reverse-lookup broker IPs during startup
  • topic (default = otlp_spans): The name of the kafka topic to read from
  • encoding (default = otlp_proto): The encoding of the payload received from kafka. Available encodings:
    • otlp_proto: the payload is deserialized to ExportTraceServiceRequest, ExportLogsServiceRequest or ExportMetricsServiceRequest respectively.
    • jaeger_proto: the payload is deserialized to a single Jaeger proto Span.
    • jaeger_json: the payload is deserialized to a single Jaeger JSON Span using jsonpb.
    • zipkin_proto: the payload is deserialized into a list of Zipkin proto spans.
    • zipkin_json: the payload is deserialized into a list of Zipkin V2 JSON spans.
    • zipkin_thrift: the payload is deserialized into a list of Zipkin Thrift spans.
    • raw: (logs only) the payload's bytes are inserted as the body of a log record.
    • text: (logs only) the payload is decoded as text and inserted as the body of a log record. UTF-8 is used by default; use text_<ENCODING>, like text_utf-8 or text_shift_jis, to customize this behavior.
    • json: (logs only) the payload is decoded as JSON and inserted as the body of a log record.
  • group_id (default = otel-collector): The consumer group that receiver will be consuming messages from
  • client_id (default = otel-collector): The consumer client ID that receiver will use
  • initial_offset (default = latest): The initial offset to use if no offset was previously committed. Must be latest or earliest.
  • auth
    • plain_text
      • username: The username to use.
      • password: The password to use.
    • sasl
      • username: The username to use.
      • password: The password to use.
      • mechanism: The SASL mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM or PLAIN)
      • aws_msk.region: AWS region, required for the AWS_MSK_IAM mechanism
      • aws_msk.broker_addr: MSK broker address, required for the AWS_MSK_IAM mechanism
    • tls
      • ca_file: path to the CA cert. For a client this verifies the server certificate. Should only be used if insecure is set to false.
      • cert_file: path to the TLS cert to use for TLS required connections. Should only be used if insecure is set to false.
      • key_file: path to the TLS key to use for TLS required connections. Should only be used if insecure is set to false.
      • insecure (default = false): Disable verifying the server's certificate chain and host name (InsecureSkipVerify in the tls config)
      • server_name_override: ServerName indicates the name of the server requested by the client in order to support virtual hosting.
    • kerberos
      • service_name: Kerberos service name
      • realm: Kerberos realm
      • use_keytab: If true, the keytab file is used for authentication instead of the password
      • username: The Kerberos username used to authenticate with the KDC
      • password: The Kerberos password used to authenticate with the KDC
      • config_file: Path to the Kerberos configuration, e.g. /etc/krb5.conf
      • keytab_file: Path to the keytab file, e.g. /etc/security/kafka.keytab
  • metadata
    • full (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to the broker at startup.
    • retry
      • max (default = 3): The number of retries to get metadata
      • backoff (default = 250ms): How long to wait between metadata retries
  • autocommit
    • enable: (default = true) Whether or not to auto-commit updated offsets back to the broker
    • interval: (default = 1s) How frequently to commit updated offsets. Ineffective unless auto-commit is enabled
  • message_marking:
    • after: (default = false) If true, the messages are marked after the pipeline execution
    • on_error: (default = false) If false, only successfully processed messages are marked. Note: this can block the entire partition if processing a message returns a permanent error
  • header_extraction:
    • extract_headers (default = false): Whether to attach Kafka header fields to resource attributes in the OTel pipeline
    • headers (default = []): List of headers to extract from the Kafka record. Note: matching is exact; regexes are not supported as of now.

Example:

```yaml
receivers:
  kafka:
    protocol_version: 2.0.0
```
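Putting several of the optional settings together, a sketch of an authenticated consumer might look like the following; the broker addresses, credentials, and file paths are placeholders, not recommended values:

```yaml
receivers:
  kafka:
    protocol_version: 2.0.0
    brokers: ["broker-1:9092", "broker-2:9092"]
    topic: otlp_spans
    group_id: otel-collector
    initial_offset: earliest
    auth:
      sasl:
        username: example-user
        password: example-password
        mechanism: SCRAM-SHA-512
      tls:
        ca_file: /etc/ssl/certs/ca.pem
        insecure: false
```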
Example of header extraction:

```yaml
receivers:
  kafka:
    topic: test
    header_extraction:
      extract_headers: true
      headers: ["header1", "header2"]
```
  • If we feed the following Kafka record into the test topic using the configuration above:

```json
{
  "event": "Hello",
  "headers": {
    "header1": "value1",
    "header2": "value2"
  }
}
```

    we will get a log record in the collector similar to:

```json
{
  ...
  "body": "Hello",
  "resource": {
    "kafka.header.header1": "value1",
    "kafka.header.header2": "value2"
  },
  ...
}
```

  • Here you can see the Kafka record headers header1 and header2 being added to the resource attributes.

  • Every matching Kafka header key is prefixed with the kafka.header string and attached to the resource attributes.
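A minimal end-to-end sketch wiring the receiver with header extraction into a logs pipeline; the exporter choice here is illustrative:

```yaml
receivers:
  kafka:
    protocol_version: 2.0.0
    topic: test
    encoding: raw
    header_extraction:
      extract_headers: true
      headers: ["header1", "header2"]

exporters:
  debug:

service:
  pipelines:
    logs:
      receivers: [kafka]
      exporters: [debug]
```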