Status | |
---|---|
Stability | beta: metrics, logs, traces |
Distributions | contrib, aws, grafana, observiq, splunk, sumo |
Issues | |
Code Owners | @pavolloffay, @MovieStoreGuy |
Kafka receiver receives traces, metrics, and logs from Kafka. Message payload encoding is configurable.
Note that metrics and logs only support OTLP.
The following settings are required:
protocol_version
(no default): Kafka protocol version e.g. 2.0.0The following settings can be optionally configured:
brokers
(default = localhost:9092): The list of kafka brokersresolve_canonical_bootstrap_servers_only
(default = false): Whether to resolve then reverse-lookup broker IPs during startuptopic
(default = otlp_spans): The name of the kafka topic to read fromencoding
(default = otlp_proto): The encoding of the payload received from kafka. Available encodings:
otlp_proto
: the payload is deserialized to ExportTraceServiceRequest
, ExportLogsServiceRequest
or ExportMetricsServiceRequest
respectively.jaeger_proto
: the payload is deserialized to a single Jaeger proto Span
.jaeger_json
: the payload is deserialized to a single Jaeger JSON Span using jsonpb
.zipkin_proto
: the payload is deserialized into a list of Zipkin proto spans.zipkin_json
: the payload is deserialized into a list of Zipkin V2 JSON spans.zipkin_thrift
: the payload is deserialized into a list of Zipkin Thrift spans.raw
: (logs only) the payload's bytes are inserted as the body of a log record.text
: (logs only) the payload are decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use text_<ENCODING>
, like text_utf-8
, text_shift_jis
, etc., to customize this behavior.json
: (logs only) the payload is decoded as JSON and inserted as the body of a log record.group_id
(default = otel-collector): The consumer group that receiver will be consuming messages fromclient_id
(default = otel-collector): The consumer client ID that receiver will useinitial_offset
(default = latest): The initial offset to use if no offset was previously committed. Must be latest
or earliest
.auth
plain_text
username
: The username to use.password
: The password to usesasl
username
: The username to use.password
: The password to usemechanism
: The sasl mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM or PLAIN)aws_msk.region
: AWS Region in case of AWS_MSK_IAM mechanismaws_msk.broker_addr
: MSK Broker address in case of AWS_MSK_IAM mechanismtls
ca_file
: path to the CA cert. For a client this verifies the server certificate. Should
only be used if insecure
is set to false.cert_file
: path to the TLS cert to use for TLS required connections. Should
only be used if insecure
is set to false.key_file
: path to the TLS key to use for TLS required connections. Should
only be used if insecure
is set to false.insecure
(default = false): Disable verifying the server's certificate
chain and host name (InsecureSkipVerify
in the tls config)server_name_override
: ServerName indicates the name of the server requested by the client
in order to support virtual hosting.kerberos
service_name
: Kerberos service namerealm
: Kerberos realmuse_keytab
: Use of keytab instead of password, if this is true, keytab file will be used instead of passwordusername
: The Kerberos username used for authenticate with KDCpassword
: The Kerberos password used for authenticate with KDCconfig_file
: Path to Kerberos configuration. i.e /etc/krb5.confkeytab_file
: Path to keytab file. i.e /etc/security/kafka.keytabmetadata
full
(default = true): Whether to maintain a full set of metadata. When
disabled, the client does not make the initial request to broker at the
startup.retry
max
(default = 3): The number of retries to get metadatabackoff
(default = 250ms): How long to wait between metadata retriesautocommit
enable
: (default = true) Whether or not to auto-commit updated offsets back to the brokerinterval
: (default = 1s) How frequently to commit updated offsets. Ineffective unless auto-commit is enabledmessage_marking
:
after
: (default = false) If true, the messages are marked after the pipeline executionon_error
: (default = false) If false, only the successfully processed messages are marked
Note: this can block the entire partition in case a message processing returns a permanent errorheader_extraction
:
extract_headers
(default = false): Allows user to attach header fields to resource attributes in otel pieplineheaders
(default = []): List of headers they'd like to extract from kafka record.
Note: Matching pattern will be exact
. Regexes are not supported as of now.
Example:
receivers:
kafka:
protocol_version: 2.0.0
Example of header extraction:
receivers:
kafka:
topic: test
header_extraction:
extract_headers: true
headers: ["header1", "header2"]
If we feed following kafka record to test
topic and use above configs:
{
event: Hello,
headers: {
header1: value1,
header2: value2,
}
}
we will get a log record in collector similar to:
{
...
body: Hello,
resource: {
kafka.header.header1: value1,
kafka.header.header2: value2,
},
...
}
Here you can see the kafka record header header1
and header2
being added to resource attribute.
Every matching kafka header key is prefixed with kafka.header
string and attached to resource attributes.