# Kafka Receiver

| Status        |           |
| ------------- |-----------|
| Stability     | [beta]: metrics, logs, traces |
| Distributions | [contrib], [aws], [grafana], [observiq], [splunk], [sumo] |
| Issues        | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fkafka%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fkafka) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fkafka%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fkafka) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@pavolloffay](https://www.github.com/pavolloffay), [@MovieStoreGuy](https://www.github.com/MovieStoreGuy) |

[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[aws]: https://github.com/aws-observability/aws-otel-collector
[grafana]: https://github.com/grafana/agent
[observiq]: https://github.com/observIQ/observiq-otel-collector
[splunk]: https://github.com/signalfx/splunk-otel-collector
[sumo]: https://github.com/SumoLogic/sumologic-otel-collector

The Kafka receiver receives traces, metrics, and logs from Kafka. Message payload encoding is configurable.

Note that metrics and logs only support OTLP.

## Getting Started

The following settings are required:

- `protocol_version` (no default): Kafka protocol version, e.g. `2.0.0`.

The following settings can be optionally configured:

- `brokers` (default = localhost:9092): The list of Kafka brokers.
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve and then reverse-lookup broker IPs during startup.
- `topic` (default = otlp_spans): The name of the Kafka topic to read from.
- `encoding` (default = otlp_proto): The encoding of the payload received from Kafka (a logs-oriented sketch appears at the end of this section). Available encodings:
  - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively.
  - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
  - `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`.
  - `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans.
  - `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans.
  - `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans.
  - `raw`: (logs only) the payload's bytes are inserted as the body of a log record.
  - `text`: (logs only) the payload is decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use `text_<encoding>`, e.g. `text_utf-8` or `text_shift_jis`, to customize this behavior.
  - `json`: (logs only) the payload is decoded as JSON and inserted as the body of a log record.
- `group_id` (default = otel-collector): The consumer group that the receiver will be consuming messages from.
- `client_id` (default = otel-collector): The consumer client ID that the receiver will use.
- `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`.
- `auth` (an example combining these options is sketched at the end of this section)
  - `plain_text`
    - `username`: The username to use.
    - `password`: The password to use.
  - `sasl`
    - `username`: The username to use.
    - `password`: The password to use.
    - `mechanism`: The SASL mechanism to use (`SCRAM-SHA-256`, `SCRAM-SHA-512`, `AWS_MSK_IAM` or `PLAIN`).
    - `aws_msk.region`: AWS region when the `AWS_MSK_IAM` mechanism is used.
    - `aws_msk.broker_addr`: MSK broker address when the `AWS_MSK_IAM` mechanism is used.
  - `tls`
    - `ca_file`: Path to the CA cert. For a client this verifies the server certificate. Should only be used if `insecure` is set to false.
    - `cert_file`: Path to the TLS cert to use for TLS required connections. Should only be used if `insecure` is set to false.
    - `key_file`: Path to the TLS key to use for TLS required connections. Should only be used if `insecure` is set to false.
    - `insecure` (default = false): Disable verifying the server's certificate chain and host name (`InsecureSkipVerify` in the TLS config).
    - `server_name_override`: ServerName indicates the name of the server requested by the client in order to support virtual hosting.
  - `kerberos`
    - `service_name`: Kerberos service name.
    - `realm`: Kerberos realm.
    - `use_keytab`: If true, the keytab file is used for authentication instead of the password.
    - `username`: The Kerberos username used to authenticate with the KDC.
    - `password`: The Kerberos password used to authenticate with the KDC.
    - `config_file`: Path to the Kerberos configuration, e.g. /etc/krb5.conf.
    - `keytab_file`: Path to the keytab file, e.g. /etc/security/kafka.keytab.
- `metadata`
  - `full` (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to the broker at startup.
  - `retry`
    - `max` (default = 3): The number of retries to get metadata.
    - `backoff` (default = 250ms): How long to wait between metadata retries.
- `autocommit`
  - `enable` (default = true): Whether or not to auto-commit updated offsets back to the broker.
  - `interval` (default = 1s): How frequently to commit updated offsets. Ineffective unless auto-commit is enabled.
- `message_marking`
  - `after` (default = false): If true, the messages are marked after the pipeline execution.
  - `on_error` (default = false): If false, only the successfully processed messages are marked. **Note: this can block the entire partition in case processing of a message returns a permanent error.**
- `header_extraction`
  - `extract_headers` (default = false): Whether to attach header fields to resource attributes in the OTel pipeline.
  - `headers` (default = []): List of headers to extract from the Kafka record. **Note: header matching is exact; regexes are not supported at this time.**

Example:

```yaml
receivers:
  kafka:
    protocol_version: 2.0.0
```

Example of header extraction:

```yaml
receivers:
  kafka:
    topic: test
    header_extraction:
      extract_headers: true
      headers: ["header1", "header2"]
```

- If we feed the following Kafka record to the `test` topic and use the configuration above:

```yaml
{
  event: Hello,
  headers: {
    header1: value1,
    header2: value2,
  }
}
```

we will get a log record in the collector similar to:

```yaml
{
  ...
  body: Hello,
  resource: {
    kafka.header.header1: value1,
    kafka.header.header2: value2,
  },
  ...
}
```

- Here you can see the Kafka record headers `header1` and `header2` being added as resource attributes.
- Every **matching** Kafka header key is prefixed with the `kafka.header` string and attached to the resource attributes.
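
For log pipelines, the `encoding` setting determines how each record's payload becomes the log record body. The following is a minimal sketch of the two logs-only encodings described above; the topic names `app_logs` and `app_text` are hypothetical placeholders, not defaults of the receiver:

```yaml
receivers:
  kafka/json_logs:
    protocol_version: 2.0.0
    topic: app_logs        # hypothetical topic carrying JSON payloads
    encoding: json         # each payload is decoded as JSON and inserted as the log body

  kafka/text_logs:
    protocol_version: 2.0.0
    topic: app_text        # hypothetical topic carrying plain-text payloads
    encoding: text_utf-8   # each payload is decoded as UTF-8 text and inserted as the log body
```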
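
To show how the authentication and offset-handling settings fit together, here is a minimal sketch of a receiver using SASL over TLS with explicit commit and marking behavior. The broker addresses, topic, credentials, and certificate path are placeholders for illustration only and must be replaced with values from your environment:

```yaml
receivers:
  kafka:
    protocol_version: 2.0.0
    brokers:
      - broker-1.example.com:9093   # placeholder broker addresses
      - broker-2.example.com:9093
    topic: otlp_spans
    group_id: otel-collector
    initial_offset: earliest        # start from the earliest offset if none was committed
    auth:
      sasl:
        username: otel              # placeholder credentials
        password: REDACTED
        mechanism: SCRAM-SHA-512
      tls:
        ca_file: /etc/ssl/certs/kafka-ca.pem   # placeholder path to the CA certificate
        insecure: false
    autocommit:
      enable: true
      interval: 5s                  # commit updated offsets every 5 seconds
    message_marking:
      after: true                   # mark messages only after the pipeline has processed them
      on_error: false               # do not mark messages whose processing returned an error
```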