123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- {{- $componet := include "kafka.broker.componet" . }}
- apiVersion: v1
- kind: ConfigMap
- metadata:
- name: {{ include "kafka.entrypoint.configmapName" . }}
- labels:
- {{- include "kafka.labels" $ | nindent 4 }}
- component: {{ $componet | quote }}
- data:
- entrypoint.sh: |
- #!/bin/bash
-
# Location of the generated broker configuration (always this fixed path) and
# of the optional base configuration (overridable through the environment).
KAFKA_CONF_FILE="/etc/kafka/server.properties"
: "${KAFKA_BASE_CONF_FILE:=/etc/kafka/base/server.properties}"
export KAFKA_CONF_FILE KAFKA_BASE_CONF_FILE

# Default the installation directory, then derive the log directory from it.
if [[ -z "$KAFKA_HOME" ]]; then
  export KAFKA_HOME="/opt/kafka"
fi
export KAFKA_CFG_LOG_DIR="${KAFKA_HOME}/data"

# Make sure the installation directory exists.
[[ -d "$KAFKA_HOME" ]] || mkdir -p "$KAFKA_HOME"
-
#######################################
# Verify that `java -version` can be run successfully.
# Outputs: whatever `java -version` prints; an error line on stderr on failure.
# Exits:   with status 1 when `java -version` fails.
#######################################
check_runtime() {
  if ! java -version; then
    echo "[ERROR] Missing java" >&2
    # Exit statuses are limited to 0-255; the previous value of 500 was
    # silently truncated by the shell (500 mod 256 = 244).
    exit 1
  fi
}
-
#######################################
# Replace the current shell with the given command.
# When the caller is root (uid 0) the command is started through
# `chroot --userspec=1000:0 /`, i.e. as uid 1000 / gid 0; otherwise it is
# exec'ed unchanged under the current identity.
# Arguments: the command and its arguments.
#######################################
run_as_other_user_if_needed() {
  local current_uid
  current_uid="$(id -u)"

  if [[ "$current_uid" != "0" ]]; then
    # Already unprivileged (e.g. an arbitrary uid in the root group, or a
    # custom --user): run the command as-is.
    exec "${@}"
    return
  fi

  # Running as root: drop to the fixed uid before running the command.
  exec chroot --userspec=1000:0 / "${@}"
}
-
#######################################
# Derive KAFKA_CFG_NODE_ID from the numeric suffix after the last "-" of a
# name (for example "kafka-2" -> 2).
# Globals:   KAFKA_NODE_ID_OFFSET (read) - added to the suffix when it is a
#            non-negative integer; KAFKA_CFG_NODE_ID (exported).
# Arguments: $1 - the name to inspect.
# Leaves KAFKA_CFG_NODE_ID untouched when the suffix is not a plain integer.
#######################################
get_nodeid_from_suffix() {
  local name="$1"
  local ordinal="${name##*-}"

  if [[ ! "$ordinal" =~ ^[0-9]+$ ]]; then
    return
  fi

  # "10#" forces base 10 so zero-padded values such as "08" are not rejected
  # as invalid octal numbers by shell arithmetic.
  local node_id=$((10#$ordinal))
  if [[ "$KAFKA_NODE_ID_OFFSET" =~ ^[0-9]+$ ]]; then
    node_id=$((node_id + 10#$KAFKA_NODE_ID_OFFSET))
  fi
  export KAFKA_CFG_NODE_ID="$node_id"
}
-
#######################################
# Append this pod's external listener to KAFKA_CFG_ADVERTISED_LISTENERS.
# KAFKA_EXTERNAL_ADVERTISED_LISTENERS is a comma-separated list; the entry at
# position (numeric suffix of POD_NAME + 1) is selected and stripped of spaces.
# For the "NodePort" service type the host part of the entry is replaced with
# POD_HOST_IP.
# Globals:   KAFKA_EXTERNAL_SERVICE_TYPE, KAFKA_EXTERNAL_ADVERTISED_LISTENERS,
#            POD_NAME, POD_HOST_IP (read);
#            KAFKA_CFG_ADVERTISED_LISTENERS (read, exported).
# Outputs:   the resulting listener list on stdout, or a warning on stderr
#            when the selected entry does not look like "...://host:port".
# Does nothing when either of the two KAFKA_EXTERNAL_* variables is empty.
#######################################
fix_external_advertised_listeners() {
  if [[ -z "$KAFKA_EXTERNAL_SERVICE_TYPE" || -z "$KAFKA_EXTERNAL_ADVERTISED_LISTENERS" ]]; then
    return
  fi

  # Only a plain integer suffix is used as an index; anything else selects the
  # first entry instead of being evaluated as an arithmetic expression.
  local ordinal="${POD_NAME##*-}"
  if [[ ! "$ordinal" =~ ^[0-9]+$ ]]; then
    ordinal=0
  fi

  # "10#" keeps zero-padded suffixes ("08") from being parsed as octal.
  # Note: when the list contains no comma, cut returns the whole value for
  # every index, so a single entry is shared by all pods.
  local listener
  listener="$(printf '%s\n' "$KAFKA_EXTERNAL_ADVERTISED_LISTENERS" \
    | cut -d "," -f "$((10#$ordinal + 1))")"
  listener="${listener// /}"

  if [[ "$KAFKA_EXTERNAL_SERVICE_TYPE" == "NodePort" ]]; then
    listener="$(printf '%s\n' "$listener" \
      | sed -E "s%://[^:]*:%://${POD_HOST_IP}:%")"
  fi

  if [[ "$listener" =~ ://[^:]*:[0-9]+$ ]]; then
    # Only insert the separating comma when there is something to separate,
    # otherwise the list would start with an empty element.
    export KAFKA_CFG_ADVERTISED_LISTENERS="${KAFKA_CFG_ADVERTISED_LISTENERS:+${KAFKA_CFG_ADVERTISED_LISTENERS},}${listener}"
    echo "KAFKA_CFG_ADVERTISED_LISTENERS: $KAFKA_CFG_ADVERTISED_LISTENERS"
  else
    echo "[WARN] KAFKA_EXTERNAL_ADVERTISED_LISTENER invalid, value: [$listener]" >&2
  fi
}
-
#######################################
# Resolve KAFKA_CFG_NODE_ID from KAFKA_NODE_ID.
#   - a plain integer is used verbatim;
#   - a value starting with "hostname" derives the id from $HOSTNAME;
#   - a value starting with "pod" derives the id from $POD_NAME (when set);
#   - if no id has been determined afterwards, "1" is used.
# Globals: KAFKA_NODE_ID, HOSTNAME, POD_NAME (read);
#          KAFKA_CFG_NODE_ID (exported).
#######################################
init_nodeid() {
  # An explicit numeric id always wins.
  if [[ "$KAFKA_NODE_ID" =~ ^[0-9]+$ ]]; then
    export KAFKA_CFG_NODE_ID="$KAFKA_NODE_ID"
    return
  fi

  case "$KAFKA_NODE_ID" in
    hostname*)
      get_nodeid_from_suffix "$HOSTNAME"
      ;;
    pod*)
      if [[ -n "$POD_NAME" ]]; then
        get_nodeid_from_suffix "$POD_NAME"
      fi
      ;;
  esac

  # Fallback when neither source produced an id.
  [[ -n "$KAFKA_CFG_NODE_ID" ]] || export KAFKA_CFG_NODE_ID="1"
}
-
#######################################
# When running as root, recursively hand $KAFKA_HOME - and the log directory,
# if it exists - over to uid 1000 / gid 0. Does nothing for other users.
# Globals: KAFKA_HOME, KAFKA_CFG_LOG_DIR (read)
#######################################
take_file_ownership() {
  # Only root can change ownership; everyone else skips this step.
  [[ "$(id -u)" == "0" ]] || return 0

  chown -R 1000:0 "$KAFKA_HOME"
  [[ ! -d "$KAFKA_CFG_LOG_DIR" ]] || chown -R 1000:0 "$KAFKA_CFG_LOG_DIR"
}
-
#######################################
# Set a property in $KAFKA_CONF_FILE: every existing "key =" / "key=" line is
# removed and "key=value" is appended at the end of the file.
# Globals:   KAFKA_CONF_FILE (read; the file is modified in place)
# Arguments: $1 - property key, $2 - property value
#######################################
update_server_conf() {
  local key="$1"
  local value="$2"

  # Escape every dot of the key (not just the first one) so that it only
  # matches itself in the sed address below.
  local pattern="${key//./\\.}"
  sed -i "/^${pattern} *=/d" "$KAFKA_CONF_FILE"

  # If the file does not end with a newline, add one first; otherwise the new
  # entry would be glued onto the last existing line.
  if [[ -s "$KAFKA_CONF_FILE" && -n "$(tail -c 1 "$KAFKA_CONF_FILE")" ]]; then
    echo >> "$KAFKA_CONF_FILE"
  fi
  printf '%s=%s\n' "$key" "$value" >> "$KAFKA_CONF_FILE"
}
-
#######################################
# Export a default for every KAFKA_CFG_* setting listed below that is still
# unset or empty; values that are already set are left alone.
# Globals: KAFKA_CONTROLLER_LISTENER_PORT (read, default 19091),
#          KAFKA_BROKER_LISTENER_PORT (read, default 9092),
#          KAFKA_CFG_* (read, exported)
#######################################
set_kafka_cfg_default() {
  local entry name
  local -a defaults=(
    "KAFKA_CFG_NODE_ID=1"
    "KAFKA_CFG_PROCESS_ROLES=broker,controller"
    "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
    "KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT"
    "KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER"
    "KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1"
    "KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1"
    "KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1"
  )
  for entry in "${defaults[@]}"; do
    name="${entry%%=*}"
    if [[ -z "${!name}" ]]; then
      export "$entry"
    fi
  done

  # ":-" (instead of "-") also applies the default when the variable is set
  # but empty, which would otherwise produce a listener without a port.
  local ctl_port="${KAFKA_CONTROLLER_LISTENER_PORT:-19091}"
  local broker_port="${KAFKA_BROKER_LISTENER_PORT:-9092}"

  if [[ -z "$KAFKA_CFG_LISTENERS" ]]; then
    export KAFKA_CFG_LISTENERS="CONTROLLER://:${ctl_port},PLAINTEXT://:${broker_port}"
  fi
  if [[ -z "$KAFKA_CFG_CONTROLLER_QUORUM_VOTERS" ]]; then
    export KAFKA_CFG_CONTROLLER_QUORUM_VOTERS="${KAFKA_CFG_NODE_ID}@127.0.0.1:${ctl_port}"
  fi
}
-
#######################################
# Build $KAFKA_CONF_FILE.
#   1. Resolve the node id, fill in defaults and add the external listener.
#   2. If the configuration file does not exist yet, create it (and its
#      directory), seeded from $KAFKA_BASE_CONF_FILE when that file exists.
#   3. Write every KAFKA_CFG_* variable into the file as a property: the
#      prefix is dropped, "_" becomes "." and the name is lower-cased
#      (KAFKA_CFG_NODE_ID -> node.id).
# Globals: KAFKA_CONF_FILE, KAFKA_BASE_CONF_FILE, KAFKA_CFG_* (read)
#######################################
init_server_conf() {
  init_nodeid
  set_kafka_cfg_default
  fix_external_advertised_listeners

  if [[ ! -f "$KAFKA_CONF_FILE" ]]; then
    # Paths are quoted so that directories containing spaces work.
    mkdir -p "$(dirname "$KAFKA_CONF_FILE")"
    if [[ -f "$KAFKA_BASE_CONF_FILE" ]]; then
      cat "$KAFKA_BASE_CONF_FILE" > "$KAFKA_CONF_FILE"
    fi
    touch "$KAFKA_CONF_FILE"
  fi

  # Loop variables are local so they do not leak into the rest of the script.
  local var key
  for var in "${!KAFKA_CFG_@}"; do
    key="${var#KAFKA_CFG_}"
    key="$(printf '%s' "${key//_/.}" | tr 'A-Z' 'a-z')"
    update_server_conf "$key" "${!var}"
  done
}
-
#######################################
# Force the storage location: drop any "log.dir" entry from the configuration
# file and set "log.dirs" to $KAFKA_CFG_LOG_DIR.
# Globals: KAFKA_CONF_FILE, KAFKA_CFG_LOG_DIR (read)
#######################################
reset_log_dirs() {
  # The dot is escaped so that only the literal key "log.dir" is removed.
  sed -i '/^log\.dir *=/d' "$KAFKA_CONF_FILE"
  update_server_conf "log.dirs" "$KAFKA_CFG_LOG_DIR"
}
-
#######################################
# Start the Kafka server.
#   1. Check the Java runtime and pin log.dirs in the configuration file.
#   2. Append KAFKA_HEAP_OPTS (when set) to JAVA_TOOL_OPTIONS.
#   3. If $KAFKA_CFG_LOG_DIR/meta.properties is missing, format the storage
#      with kafka-storage.sh, generating a cluster id when KAFKA_CLUSTER_ID
#      is empty. As root the format step runs as uid 1000 / gid 0.
#   4. Hand over to kafka-server-start.sh via run_as_other_user_if_needed.
# Globals: KAFKA_HOME, KAFKA_CONF_FILE, KAFKA_CFG_LOG_DIR, KAFKA_HEAP_OPTS
#          (read); JAVA_TOOL_OPTIONS, KAFKA_CLUSTER_ID (read, exported)
# Exits:   1 when no cluster id can be determined or formatting fails.
#######################################
start_server() {
  check_runtime
  reset_log_dirs
  if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
    export JAVA_TOOL_OPTIONS="${JAVA_TOOL_OPTIONS} ${KAFKA_HEAP_OPTS}"
  fi

  local storage_tool="${KAFKA_HOME}/bin/kafka-storage.sh"
  if [[ ! -f "$KAFKA_CFG_LOG_DIR/meta.properties" ]]; then
    echo ">>> Format Log Directories <<<"
    if [[ -z "$KAFKA_CLUSTER_ID" ]]; then
      echo "Generate a Cluster UUID"
      # Assignment is kept separate from export so a failing command is not
      # hidden; the emptiness check below catches it.
      KAFKA_CLUSTER_ID="$("$storage_tool" random-uuid)"
      export KAFKA_CLUSTER_ID
    fi
    if [[ -z "$KAFKA_CLUSTER_ID" ]]; then
      echo "[ERROR] Unable to determine a Kafka cluster id" >&2
      exit 1
    fi

    # NOTE(review): this prints the whole configuration to the container log;
    # confirm that no secrets are injected through KAFKA_CFG_* variables.
    cat "$KAFKA_CONF_FILE"

    # The command is built as an array so paths and ids are passed verbatim.
    local -a format_cmd=("$storage_tool" format -t "$KAFKA_CLUSTER_ID" -c "$KAFKA_CONF_FILE")
    if [[ "$(id -u)" == "0" ]]; then
      format_cmd=(chroot --userspec=1000:0 / "${format_cmd[@]}")
    fi
    # Starting the broker on storage that failed to format cannot succeed, so
    # stop here with a clear message instead.
    if ! "${format_cmd[@]}"; then
      echo "[ERROR] Failed to format Kafka log directory: $KAFKA_CFG_LOG_DIR" >&2
      exit 1
    fi
  fi

  run_as_other_user_if_needed "${KAFKA_HOME}/bin/kafka-server-start.sh" "$KAFKA_CONF_FILE"
}
-
# Entry point: always render the configuration and fix file ownership, then
# either start Kafka (single argument "start") or run the given command.
init_server_conf
take_file_ownership
# The argument is tested explicitly: "$@" inside [[ ]] silently concatenates
# all arguments into one string.
if [[ $# -eq 1 && "$1" == "start" ]]; then
  start_server
else
  exec "$@"
fi
|