{{- /*
Copyright VMware, Inc.
SPDX-License-Identifier: APACHE-2.0
*/}}

{{- /*
ConfigMap "<fullname>-scripts" with the shell scripts rendered by this template:
  - auto-discovery.sh: only rendered when externalAccess.autoDiscovery.enabled is set.
    Resolves the external LoadBalancer address or NodePort of the per-pod
    "<pod>-external" service with kubectl and writes it under /shared.
  - kafka-init.sh: copies /configmaps/server.properties to /config/server.properties
    and fills in ids, advertised listeners, TLS stores and SASL secrets.
Shell comments below end up in the rendered scripts; template comments do not.
*/}}

{{- $releaseNamespace := include "common.names.namespace" . }}
{{- $fullname := include "common.names.fullname" . }}
{{- $clusterDomain := .Values.clusterDomain }}
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ printf "%s-scripts" $fullname }}
  namespace: {{ $releaseNamespace | quote }}
  labels: {{- include "common.labels.standard" ( dict "customLabels" .Values.commonLabels "context" $ ) | nindent 4 }}
  {{- if .Values.commonAnnotations }}
  annotations: {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }}
  {{- end }}
data:
  {{- if .Values.externalAccess.autoDiscovery.enabled }}
  auto-discovery.sh: |-
    #!/bin/bash

    SVC_NAME="${MY_POD_NAME}-external"
    AUTODISCOVERY_SERVICE_TYPE="${AUTODISCOVERY_SERVICE_TYPE:-}"

    # Auxiliary functions

    # Run a command until it succeeds or the retries are exhausted.
    # Arguments:
    #   $1 - command line (split on whitespace into command + args)
    #   $2 - number of attempts (default: 12)
    #   $3 - seconds to sleep after a failed attempt (default: 5)
    # Returns: 0 if an attempt succeeded, 1 otherwise
    retry_while() {
        local -r cmd="${1:?cmd is missing}"
        local -r retries="${2:-12}"
        local -r sleep_time="${3:-5}"
        local return_value=1

        read -r -a command <<< "$cmd"
        for ((i = 1 ; i <= retries ; i+=1 )); do
            "${command[@]}" && return_value=0 && break
            sleep "$sleep_time"
        done
        return $return_value
    }

    # Print the first LoadBalancer ingress IP of a service, or its hostname
    # when no IP is reported.
    # Arguments:
    #   $1 - namespace
    #   $2 - service name
    k8s_svc_lb_ip() {
        local namespace=${1:?namespace is missing}
        local service=${2:?service is missing}
        # Declaration and assignment are kept separate so the kubectl exit
        # status is not masked by 'local'
        local service_ip
        local service_hostname
        service_ip=$(kubectl get svc "$service" -n "$namespace" -o jsonpath="{.status.loadBalancer.ingress[0].ip}")
        service_hostname=$(kubectl get svc "$service" -n "$namespace" -o jsonpath="{.status.loadBalancer.ingress[0].hostname}")

        if [[ -n ${service_ip} ]]; then
            echo "${service_ip}"
        else
            echo "${service_hostname}"
        fi
    }

    # Succeed when k8s_svc_lb_ip prints a non-empty address for the service.
    # Arguments:
    #   $1 - namespace
    #   $2 - service name
    k8s_svc_lb_ip_ready() {
        local namespace=${1:?namespace is missing}
        local service=${2:?service is missing}
        [[ -n "$(k8s_svc_lb_ip "$namespace" "$service")" ]]
    }

    # Print the nodePort of one port entry of a service.
    # Arguments:
    #   $1 - namespace
    #   $2 - service name
    #   $3 - index into .spec.ports (default: 0)
    k8s_svc_node_port() {
        local namespace=${1:?namespace is missing}
        local service=${2:?service is missing}
        local index=${3:-0}
        local node_port
        node_port="$(kubectl get svc "$service" -n "$namespace" -o jsonpath="{.spec.ports[$index].nodePort}")"
        echo "$node_port"
    }

    if [[ "$AUTODISCOVERY_SERVICE_TYPE" = "LoadBalancer" ]]; then
        # Wait until LoadBalancer IP is ready
        retry_while "k8s_svc_lb_ip_ready {{ $releaseNamespace }} $SVC_NAME" || exit 1
        # Obtain LoadBalancer external IP
        k8s_svc_lb_ip "{{ $releaseNamespace }}" "$SVC_NAME" | tee "/shared/external-host.txt"
    elif [[ "$AUTODISCOVERY_SERVICE_TYPE" = "NodePort" ]]; then
        k8s_svc_node_port "{{ $releaseNamespace }}" "$SVC_NAME" | tee "/shared/external-port.txt"
    else
        echo "Unsupported autodiscovery service type: '$AUTODISCOVERY_SERVICE_TYPE'"
        exit 1
    fi
  {{- end }}
  kafka-init.sh: |-
    #!/bin/bash

    set -o errexit
    set -o nounset
    set -o pipefail

    # Print an error message and terminate the script with status 1.
    # Arguments:
    #   $1 - message
    error(){
        local message="${1:?missing message}"
        echo "ERROR: ${message}"
        exit 1
    }

    # Run a command until it succeeds or the retries are exhausted.
    # Arguments:
    #   $1 - command line (split on whitespace into command + args)
    #   $2 - number of attempts (default: 12)
    #   $3 - seconds to sleep after a failed attempt (default: 5)
    # Returns: 0 if an attempt succeeded, 1 otherwise
    retry_while() {
        local -r cmd="${1:?cmd is missing}"
        local -r retries="${2:-12}"
        local -r sleep_time="${3:-5}"
        local return_value=1

        read -r -a command <<< "$cmd"
        for ((i = 1 ; i <= retries ; i+=1 )); do
            "${command[@]}" && return_value=0 && break
            sleep "$sleep_time"
        done
        return $return_value
    }

    # Apply a global sed substitution to a file and write the result back
    # without using 'sed -i'.
    # Arguments:
    #   $1 - file to modify
    #   $2 - match regex
    #   $3 - substitution
    #   $4 - use extended regex, 'sed -E' (default: true)
    replace_in_file() {
        local filename="${1:?filename is required}"
        local match_regex="${2:?match regex is required}"
        local substitute_regex="${3:?substitute regex is required}"
        local posix_regex=${4:-true}

        local result

        # We should avoid using 'sed in-place' substitutions
        # 1) They are not compatible with files mounted from ConfigMap(s)
        # 2) We found incompatibility issues with Debian10 and "in-place" substitutions
        local -r del=$'\001' # Use a non-printable character as a 'sed' delimiter to avoid issues
        if [[ $posix_regex = true ]]; then
            result="$(sed -E "s${del}${match_regex}${del}${substitute_regex}${del}g" "$filename")"
        else
            result="$(sed "s${del}${match_regex}${del}${substitute_regex}${del}g" "$filename")"
        fi
        echo "$result" > "$filename"
    }

    # Set key=value in a properties file: rewrite the line when the key is
    # already present (matched by the grep pattern below), append otherwise.
    # Arguments:
    #   $1 - properties file
    #   $2 - key
    #   $3 - value
    kafka_conf_set() {
        local file="${1:?missing file}"
        local key="${2:?missing key}"
        local value="${3:?missing value}"

        # Check if the value was set before
        if grep -q "^[#\\s]*$key\s*=.*" "$file"; then
            # Update the existing key
            replace_in_file "$file" "^[#\\s]*${key}\s*=.*" "${key}=${value}" false
        else
            # Add a new key
            printf '\n%s=%s' "$key" "$value" >>"$file"
        fi
    }

    # Replace every occurrence of a placeholder in $KAFKA_CONFIG_FILE.
    # Arguments:
    #   $1 - placeholder text (used as a sed pattern)
    #   $2 - replacement value (password, secret, address...)
    replace_placeholder() {
        local placeholder="${1:?missing placeholder value}"
        local password="${2:?missing password value}"
        # Use a non-printable character as the 'sed' delimiter so values
        # containing '/' (or any other printable delimiter) do not break the
        # expression.
        # NOTE: sed still interprets '&' and backslashes in the replacement;
        # values containing them are not escaped here.
        local -r del=$'\001'
        sed -i "s${del}${placeholder}${del}${password}${del}g" "$KAFKA_CONFIG_FILE"
    }

    # Append the contents of a file to a Kafka configuration file.
    # Arguments:
    #   $1 - source file
    #   $2 - destination configuration file
    append_file_to_kafka_conf() {
        local file="${1:?missing source file}"
        local conf="${2:?missing kafka conf file}"

        cat "$file" >> "$conf"
    }

    # Resolve the external host and port of this pod and append the external
    # listener to 'advertised.listeners' in $KAFKA_CONFIG_FILE.
    # Reads: POD_ID, /shared/external-host.txt, /shared/external-port.txt and
    # the EXTERNAL_ACCESS_* variables. Sets the (global) 'host' and 'port'.
    configure_external_access() {
        # Configure external hostname
        if [[ -f "/shared/external-host.txt" ]]; then
            host=$(cat "/shared/external-host.txt")
        elif [[ -n "${EXTERNAL_ACCESS_HOST:-}" ]]; then
            host="$EXTERNAL_ACCESS_HOST"
        elif [[ -n "${EXTERNAL_ACCESS_HOSTS_LIST:-}" ]]; then
            read -r -a hosts <<<"$(tr ',' ' ' <<<"${EXTERNAL_ACCESS_HOSTS_LIST}")"
            host="${hosts[$POD_ID]}"
        # Default to empty: under 'nounset' an unset variable would abort here
        # instead of reaching the explicit error below
        elif [[ "${EXTERNAL_ACCESS_HOST_USE_PUBLIC_IP:-}" =~ ^(yes|true)$ ]]; then
            host=$(curl -s https://ipinfo.io/ip)
        else
            error "External access hostname not provided"
        fi

        # Configure external port
        if [[ -f "/shared/external-port.txt" ]]; then
            port=$(cat "/shared/external-port.txt")
        elif [[ -n "${EXTERNAL_ACCESS_PORT:-}" ]]; then
            if [[ "${EXTERNAL_ACCESS_PORT_AUTOINCREMENT:-}" =~ ^(yes|true)$ ]]; then
                port="$((EXTERNAL_ACCESS_PORT + POD_ID))"
            else
                port="$EXTERNAL_ACCESS_PORT"
            fi
        elif [[ -n "${EXTERNAL_ACCESS_PORTS_LIST:-}" ]]; then
            read -r -a ports <<<"$(tr ',' ' ' <<<"${EXTERNAL_ACCESS_PORTS_LIST}")"
            port="${ports[$POD_ID]}"
        else
            error "External access port not provided"
        fi
        # Configure Kafka advertised listeners
        sed -i -E "s|^(advertised\.listeners=\S+)$|\1,{{ upper .Values.listeners.external.name }}://${host}:${port}|" "$KAFKA_CONFIG_FILE"
    }
    {{- if (include "kafka.sslEnabled" .) }}

    # Build /certs/kafka.keystore.jks and /certs/kafka.truststore.jks from the
    # material mounted under /mounted-certs (PEM or JKS, per KAFKA_TLS_TYPE)
    # and store the TLS passwords in $KAFKA_CONFIG_FILE.
    # Reads: KAFKA_TLS_TYPE, POD_ROLE, POD_ID and the KAFKA_TLS_*_PASSWORD variables.
    configure_kafka_tls() {
        # Remove previously existing keystores and certificates, if any
        rm -f /certs/kafka.keystore.jks /certs/kafka.truststore.jks
        rm -f /certs/tls.crt /certs/tls.key /certs/ca.crt
        find /certs -name "xx*" -exec rm {} \;
        if [[ "${KAFKA_TLS_TYPE}" = "PEM" ]]; then
            # Copy PEM certificate and key
            # Both the per-pod certificate and its key must exist; otherwise
            # fall through to the shared key/cert pairs
            if [[ -f "/mounted-certs/kafka-${POD_ROLE}-${POD_ID}.crt" && -f "/mounted-certs/kafka-${POD_ROLE}-${POD_ID}.key" ]]; then
                cp "/mounted-certs/kafka-${POD_ROLE}-${POD_ID}.crt" /certs/tls.crt
                # Copy the PEM key ensuring the key used PEM format with PKCS#8
                openssl pkcs8 -topk8 -nocrypt -in "/mounted-certs/kafka-${POD_ROLE}-${POD_ID}.key" > /certs/tls.key
            elif [[ -f /mounted-certs/kafka.crt && -f /mounted-certs/kafka.key ]]; then
                cp "/mounted-certs/kafka.crt" /certs/tls.crt
                # Copy the PEM key ensuring the key used PEM format with PKCS#8
                openssl pkcs8 -topk8 -nocrypt -in "/mounted-certs/kafka.key" > /certs/tls.key
            elif [[ -f /mounted-certs/tls.crt && -f /mounted-certs/tls.key ]]; then
                cp "/mounted-certs/tls.crt" /certs/tls.crt
                # Copy the PEM key ensuring the key used PEM format with PKCS#8
                openssl pkcs8 -topk8 -nocrypt -in "/mounted-certs/tls.key" > /certs/tls.key
            else
                error "PEM key and cert files not found"
            fi

            {{- if not .Values.tls.pemChainIncluded }}
            # Copy CA certificate
            if [[ -f /mounted-certs/kafka-ca.crt ]]; then
                cp /mounted-certs/kafka-ca.crt /certs/ca.crt
            elif [[ -f /mounted-certs/ca.crt ]]; then
                cp /mounted-certs/ca.crt /certs/ca.crt
            else
                error "CA certificate file not found"
            fi
            {{- else }}
            # CA certificates are also included in the same certificate
            # All public certs will be included in the truststore
            cp /certs/tls.crt /certs/ca.crt
            {{- end }}

            # Create JKS keystore from PEM cert and key
            openssl pkcs12 -export -in "/certs/tls.crt" \
                -passout pass:"${KAFKA_TLS_KEYSTORE_PASSWORD}" \
                -inkey "/certs/tls.key" \
                -out "/certs/kafka.keystore.p12"
            keytool -importkeystore -srckeystore "/certs/kafka.keystore.p12" \
                -srcstoretype PKCS12 \
                -srcstorepass "${KAFKA_TLS_KEYSTORE_PASSWORD}" \
                -deststorepass "${KAFKA_TLS_KEYSTORE_PASSWORD}" \
                -destkeystore "/certs/kafka.keystore.jks" \
                -noprompt
            # Create JKS truststore from CA cert
            keytool -keystore /certs/kafka.truststore.jks -alias CARoot -import -file /certs/ca.crt -storepass "${KAFKA_TLS_TRUSTSTORE_PASSWORD}" -noprompt
            # Remove extra files
            rm -f "/certs/kafka.keystore.p12" "/certs/tls.crt" "/certs/tls.key" "/certs/ca.crt"
        elif [[ "${KAFKA_TLS_TYPE}" = "JKS" ]]; then
            if [[ -f "/mounted-certs/kafka-${POD_ROLE}-${POD_ID}.keystore.jks" ]]; then
                cp "/mounted-certs/kafka-${POD_ROLE}-${POD_ID}.keystore.jks" /certs/kafka.keystore.jks
            elif [[ -f {{ printf "/mounted-certs/%s" ( default "kafka.keystore.jks" .Values.tls.jksKeystoreKey) | quote }} ]]; then
                cp {{ printf "/mounted-certs/%s" ( default "kafka.keystore.jks" .Values.tls.jksKeystoreKey) | quote }} /certs/kafka.keystore.jks
            else
                error "Keystore file not found"
            fi

            if [[ -f {{ printf "/mounted-certs/%s" ( default "kafka.truststore.jks" .Values.tls.jksTruststoreKey) | quote }} ]]; then
                cp {{ printf "/mounted-certs/%s" ( default "kafka.truststore.jks" .Values.tls.jksTruststoreKey) | quote }} /certs/kafka.truststore.jks
            else
                error "Truststore file not found"
            fi
        else
            error "Invalid type ${KAFKA_TLS_TYPE}"
        fi

        # Configure TLS password settings in Kafka configuration
        [[ -n "${KAFKA_TLS_KEYSTORE_PASSWORD:-}" ]] && kafka_conf_set "$KAFKA_CONFIG_FILE" "ssl.keystore.password" "$KAFKA_TLS_KEYSTORE_PASSWORD"
        [[ -n "${KAFKA_TLS_TRUSTSTORE_PASSWORD:-}" ]] && kafka_conf_set "$KAFKA_CONFIG_FILE" "ssl.truststore.password" "$KAFKA_TLS_TRUSTSTORE_PASSWORD"
        [[ -n "${KAFKA_TLS_PEM_KEY_PASSWORD:-}" ]] && kafka_conf_set "$KAFKA_CONFIG_FILE" "ssl.key.password" "$KAFKA_TLS_PEM_KEY_PASSWORD"
        # Avoid errors caused by previous checks
        true
    }
    {{- end }}
    {{- if and .Values.tls.zookeeper.enabled .Values.tls.zookeeper.existingSecret }}

    # Copy the Zookeeper keystore and truststore from /zookeeper-certs into
    # /certs and store their passwords in $KAFKA_CONFIG_FILE.
    configure_zookeeper_tls() {
        # Remove previously existing keystores
        rm -f /certs/zookeeper.keystore.jks /certs/zookeeper.truststore.jks
        ZOOKEEPER_TRUSTSTORE={{ printf "/zookeeper-certs/%s" .Values.tls.zookeeper.existingSecretTruststoreKey | quote }}
        ZOOKEEPER_KEYSTORE={{ printf "/zookeeper-certs/%s" .Values.tls.zookeeper.existingSecretKeystoreKey | quote }}
        if [[ -f "$ZOOKEEPER_KEYSTORE" ]]; then
            cp "$ZOOKEEPER_KEYSTORE" "/certs/zookeeper.keystore.jks"
        else
            error "Zookeeper keystore file not found"
        fi
        if [[ -f "$ZOOKEEPER_TRUSTSTORE" ]]; then
            cp "$ZOOKEEPER_TRUSTSTORE" "/certs/zookeeper.truststore.jks"
        else
            error "Zookeeper truststore file not found"
        fi
        [[ -n "${KAFKA_ZOOKEEPER_TLS_KEYSTORE_PASSWORD:-}" ]] && kafka_conf_set "$KAFKA_CONFIG_FILE" "zookeeper.ssl.keystore.password" "${KAFKA_ZOOKEEPER_TLS_KEYSTORE_PASSWORD}"
        [[ -n "${KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_PASSWORD:-}" ]] && kafka_conf_set "$KAFKA_CONFIG_FILE" "zookeeper.ssl.truststore.password" "${KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_PASSWORD}"
        # Avoid errors caused by previous checks
        true
    }
    {{- end }}
    {{- if (include "kafka.saslEnabled" .) }}

    # Substitute the SASL password / client-secret placeholders present in
    # $KAFKA_CONFIG_FILE with the values taken from the environment.
    configure_kafka_sasl() {
        # Replace placeholders with passwords
        {{- if regexFind "SASL" (upper .Values.listeners.interbroker.protocol) }}
        {{- if (include "kafka.saslUserPasswordsEnabled" .) }}
        replace_placeholder "interbroker-password-placeholder" "$KAFKA_INTER_BROKER_PASSWORD"
        {{- end }}
        {{- if (include "kafka.saslClientSecretsEnabled" .) }}
        replace_placeholder "interbroker-client-secret-placeholder" "$KAFKA_INTER_BROKER_CLIENT_SECRET"
        {{- end }}
        {{- end -}}
        {{- if and .Values.kraft.enabled (regexFind "SASL" (upper .Values.listeners.controller.protocol)) }}
        {{- if (include "kafka.saslUserPasswordsEnabled" .) }}
        replace_placeholder "controller-password-placeholder" "$KAFKA_CONTROLLER_PASSWORD"
        {{- end }}
        {{- if (include "kafka.saslClientSecretsEnabled" .) }}
        replace_placeholder "controller-client-secret-placeholder" "$KAFKA_CONTROLLER_CLIENT_SECRET"
        {{- end }}
        {{- end }}
        {{- if (include "kafka.client.saslEnabled" .)}}
        # KAFKA_CLIENT_PASSWORDS is a ',' or ';' separated list; entry N
        # replaces 'password-placeholder-N'
        read -r -a passwords <<<"$(tr ',;' ' ' <<<"${KAFKA_CLIENT_PASSWORDS:-}")"
        for ((i = 0; i < ${#passwords[@]}; i++)); do
            replace_placeholder "password-placeholder-${i}" "${passwords[i]}"
        done
        {{- end }}
        {{- if .Values.sasl.zookeeper.user }}
        replace_placeholder "zookeeper-password-placeholder" "$KAFKA_ZOOKEEPER_PASSWORD"
        {{- end }}
    }
    {{- end }}

    {{- if .Values.externalAccess.autoDiscovery.enabled }}
    # Wait for autodiscovery to finish
    if [[ "${EXTERNAL_ACCESS_ENABLED:-false}" =~ ^(yes|true)$ ]]; then
        retry_while "test -f /shared/external-host.txt -o -f /shared/external-port.txt" || error "Timed out waiting for autodiscovery init-container"
    fi
    {{- end }}

    export KAFKA_CONFIG_FILE=/config/server.properties
    cp /configmaps/server.properties "$KAFKA_CONFIG_FILE"

    # Get pod ID and role, last and second last fields in the pod name respectively
    POD_ID=$(echo "$MY_POD_NAME" | rev | cut -d'-' -f 1 | rev)
    POD_ROLE=$(echo "$MY_POD_NAME" | rev | cut -d'-' -f 2 | rev)

    # Configure node.id and/or broker.id
    if [[ -f "/bitnami/kafka/data/meta.properties" ]]; then
        # Reuse the id already recorded in the data directory
        if grep -q "broker.id" /bitnami/kafka/data/meta.properties; then
          ID="$(grep "broker.id" /bitnami/kafka/data/meta.properties | awk -F '=' '{print $2}')"
          {{- if or (and .Values.kraft.enabled (not .Values.broker.zookeeperMigrationMode)) (and (not .Values.zookeeper.enabled) (not .Values.externalZookeeper.servers)) }}
          kafka_conf_set "$KAFKA_CONFIG_FILE" "node.id" "$ID"
          {{- else }}
          kafka_conf_set "$KAFKA_CONFIG_FILE" "broker.id" "$ID"
          {{- end }}
        else
          ID="$(grep "node.id" /bitnami/kafka/data/meta.properties | awk -F '=' '{print $2}')"
          kafka_conf_set "$KAFKA_CONFIG_FILE" "node.id" "$ID"
        fi
    else
        # No previous metadata: derive the id from the pod ordinal
        ID=$((POD_ID + KAFKA_MIN_ID))
        {{- if .Values.kraft.enabled }}
        kafka_conf_set "$KAFKA_CONFIG_FILE" "node.id" "$ID"
        {{- end }}
        {{- if or .Values.zookeeper.enabled .Values.externalZookeeper.servers }}
        kafka_conf_set "$KAFKA_CONFIG_FILE" "broker.id" "$ID"
        {{- end }}
    fi

    {{- if not .Values.listeners.advertisedListeners }}
    replace_placeholder "advertised-address-placeholder" "${MY_POD_NAME}.{{ $fullname }}-${POD_ROLE}-headless.{{ $releaseNamespace }}.svc.{{ $clusterDomain }}"
    if [[ "${EXTERNAL_ACCESS_ENABLED:-false}" =~ ^(yes|true)$ ]]; then
        configure_external_access
    fi
    {{- end }}
    {{- if (include "kafka.sslEnabled" .) }}
    configure_kafka_tls
    {{- end }}
    {{- if (include "kafka.saslEnabled" .) }}
    configure_kafka_sasl
    {{- end }}
    {{- if and .Values.tls.zookeeper.enabled .Values.tls.zookeeper.existingSecret }}
    configure_zookeeper_tls
    {{- end }}
    if [[ -f /secret-config/server-secret.properties ]]; then
        append_file_to_kafka_conf /secret-config/server-secret.properties "$KAFKA_CONFIG_FILE"
    fi

    {{- include "common.tplvalues.render" ( dict "value" .Values.extraInit "context" $ ) | nindent 4 }}