cm-entrypoint.yaml 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. {{- $componet := include "kafka.broker.componet" . }}
  2. apiVersion: v1
  3. kind: ConfigMap
  4. metadata:
  5. name: {{ include "kafka.entrypoint.configmapName" . }}
  6. labels:
  7. {{- include "kafka.labels" $ | nindent 4 }}
  8. component: {{ $componet | quote }}
  9. data:
  10. entrypoint.sh: |
  #!/bin/bash
  # Environment bootstrap: fixes the configuration file locations and derives
  # the data directory from KAFKA_HOME before any function is called.

  # Effective broker configuration, and the optional base file it is seeded from.
  export KAFKA_CONF_FILE="/etc/kafka/server.properties"
  export KAFKA_BASE_CONF_FILE="${KAFKA_BASE_CONF_FILE:=/etc/kafka/base/server.properties}"

  # Use the default install prefix when KAFKA_HOME is unset or empty.
  if [[ -z "${KAFKA_HOME}" ]]; then
    export KAFKA_HOME="/opt/kafka"
  fi

  # The data directory is always "<KAFKA_HOME>/data"; any inherited value of
  # KAFKA_CFG_LOG_DIR is overwritten.
  export KAFKA_CFG_LOG_DIR="${KAFKA_HOME}/data"

  # Make sure the home directory exists.
  [[ -d "${KAFKA_HOME}" ]] || mkdir -p "${KAFKA_HOME}"
  #######################################
  # Verify that a Java runtime can be executed.
  # Runs `java -version` (its output is left visible in the container log).
  # Outputs: an error message on stderr when the command fails.
  # Returns: nothing on success; terminates the script with status 1 on
  #          failure. Exit statuses are limited to 0-255, so a value such as
  #          500 would be silently truncated by the shell.
  #######################################
  check_runtime() {
    if ! java -version; then
      echo "[ERROR] Missing java" >&2
      exit 1
    fi
  }
  #######################################
  # Replace the current shell with the given command, dropping privileges
  # when the script runs as root.
  # Arguments: the command and its arguments.
  # Returns:   does not return when a command is given (the shell is exec'd).
  #######################################
  run_as_other_user_if_needed() {
    local current_uid
    current_uid="$(id -u)"
    if [[ "${current_uid}" != "0" ]]; then
      # Not root: either an arbitrary uid that is a member of the root group
      # (e.g. OpenShift) or a custom --user; run the command as-is.
      exec "$@"
    else
      # Root: switch to uid 1000 / gid 0 before running the command.
      exec chroot --userspec=1000:0 / "$@"
    fi
  }
  #######################################
  # Derive KAFKA_CFG_NODE_ID from the numeric suffix of a name such as
  # "kafka-2" (everything after the last "-").
  # Globals:   KAFKA_NODE_ID_OFFSET (read) - optional positive integer added
  #            to the suffix; KAFKA_CFG_NODE_ID (exported).
  # Arguments: $1 - host or pod name.
  # Returns:   0; KAFKA_CFG_NODE_ID is left untouched when the suffix is not
  #            a decimal number.
  #######################################
  get_nodeid_from_suffix() {
    local name="$1"
    local ordinal="${name##*-}"

    [[ "$ordinal" =~ ^[0-9]+$ ]] || return 0

    # Force base 10: a leading zero ("08") would otherwise be parsed as octal
    # by shell arithmetic and either fail or produce a wrong id.
    export KAFKA_CFG_NODE_ID="$(( 10#$ordinal ))"

    if [[ "$KAFKA_NODE_ID_OFFSET" =~ ^[0-9]+$ ]] && (( 10#$KAFKA_NODE_ID_OFFSET > 0 )); then
      export KAFKA_CFG_NODE_ID="$(( 10#$ordinal + 10#$KAFKA_NODE_ID_OFFSET ))"
    fi
  }
  #######################################
  # Append this pod's external listener to KAFKA_CFG_ADVERTISED_LISTENERS.
  # KAFKA_EXTERNAL_ADVERTISED_LISTENERS is a comma-separated list; the entry
  # whose position equals the numeric suffix of POD_NAME is selected.
  # Globals:
  #   KAFKA_EXTERNAL_SERVICE_TYPE, KAFKA_EXTERNAL_ADVERTISED_LISTENERS,
  #   POD_NAME, POD_HOST_IP (read); KAFKA_CFG_ADVERTISED_LISTENERS (exported).
  # Outputs: the resulting listener list, or a warning when the selected
  #          entry is missing or malformed.
  # Returns: 0
  #######################################
  fix_external_advertised_listeners() {
    if [[ -z "$KAFKA_EXTERNAL_SERVICE_TYPE" || -z "$KAFKA_EXTERNAL_ADVERTISED_LISTENERS" ]]; then
      return 0
    fi

    # Pod ordinal; a non-numeric suffix falls back to the first entry.
    # 10# prevents a leading zero from being interpreted as octal.
    local ordinal="${POD_NAME##*-}"
    if [[ "$ordinal" =~ ^[0-9]+$ ]]; then
      ordinal="$(( 10#$ordinal ))"
    else
      ordinal=0
    fi

    # Split into an array so that an out-of-range ordinal yields an empty
    # entry instead of accidentally reusing another pod's listener.
    local -a candidates=()
    IFS=',' read -r -a candidates <<< "$KAFKA_EXTERNAL_ADVERTISED_LISTENERS"
    local listener="${candidates[ordinal]:-}"
    listener="${listener// /}"

    if [[ "$KAFKA_EXTERNAL_SERVICE_TYPE" == "NodePort" ]]; then
      # For NodePort services the host part is replaced by the node address.
      listener="$(sed -E "s%://[^:]*:%://${POD_HOST_IP}:%" <<< "$listener")"
    fi

    local -r listener_re='://[^:]*:[0-9]+$'
    if [[ "$listener" =~ $listener_re ]]; then
      # Only insert the separator when there is already a value, otherwise the
      # property would start with a stray comma.
      export KAFKA_CFG_ADVERTISED_LISTENERS="${KAFKA_CFG_ADVERTISED_LISTENERS:+${KAFKA_CFG_ADVERTISED_LISTENERS},}${listener}"
      echo "KAFKA_CFG_ADVERTISED_LISTENERS: $KAFKA_CFG_ADVERTISED_LISTENERS"
    else
      echo "[WARN] KAFKA_EXTERNAL_ADVERTISED_LISTENER invalid, value: [$listener]"
    fi
  }
  #######################################
  # Resolve KAFKA_CFG_NODE_ID from KAFKA_NODE_ID.
  #   - a decimal number is used verbatim;
  #   - a value starting with "hostname" derives the id from $HOSTNAME;
  #   - a value starting with "pod" derives the id from $POD_NAME (if set);
  #   - if no id has been determined afterwards, "1" is used.
  # Globals: KAFKA_NODE_ID, HOSTNAME, POD_NAME (read);
  #          KAFKA_CFG_NODE_ID (exported).
  #######################################
  init_nodeid() {
    if [[ "$KAFKA_NODE_ID" =~ ^[0-9]+$ ]]; then
      export KAFKA_CFG_NODE_ID="$KAFKA_NODE_ID"
      return
    fi

    case "$KAFKA_NODE_ID" in
      hostname*)
        get_nodeid_from_suffix "$HOSTNAME"
        ;;
      pod*)
        if [[ -n "$POD_NAME" ]]; then
          get_nodeid_from_suffix "$POD_NAME"
        fi
        ;;
    esac

    # Nothing usable found (or no strategy requested): default id.
    [[ -n "$KAFKA_CFG_NODE_ID" ]] || export KAFKA_CFG_NODE_ID="1"
  }
  #######################################
  # When running as root, hand KAFKA_HOME (and the log directory, if it
  # exists) over to uid 1000 / gid 0. Does nothing for any other user.
  # Globals: KAFKA_HOME, KAFKA_CFG_LOG_DIR (read)
  #######################################
  take_file_ownership() {
    # Only root is able to change ownership.
    [[ "$(id -u)" == "0" ]] || return 0

    chown -R 1000:0 "$KAFKA_HOME"
    [[ ! -d "$KAFKA_CFG_LOG_DIR" ]] || chown -R 1000:0 "$KAFKA_CFG_LOG_DIR"
  }
  #######################################
  # Set a property in the broker configuration file: every existing
  # "key=" / "key =" line is removed and "key=value" is appended.
  # Globals:   KAFKA_CONF_FILE (read; file is modified in place)
  # Arguments: $1 - property key, e.g. "log.retention.hours"
  #            $2 - property value
  #######################################
  update_server_conf() {
    local key=$1
    local value=$2
    # Escape every dot so the key is matched literally; an unescaped dot would
    # match any character and could delete unrelated properties.
    local pattern="${key//./\\.}"

    sed -i "/^${pattern} *=/d" "$KAFKA_CONF_FILE"

    # If the file does not end with a newline, add one first so the new
    # property is not glued onto the previous line.
    if [[ -s "$KAFKA_CONF_FILE" && -n "$(tail -c 1 "$KAFKA_CONF_FILE")" ]]; then
      echo >> "$KAFKA_CONF_FILE"
    fi
    printf '%s=%s\n' "$key" "$value" >> "$KAFKA_CONF_FILE"
  }
  #######################################
  # Export default values for every KAFKA_CFG_* setting that is unset or
  # empty. Values that are already non-empty are left alone.
  # Globals:
  #   KAFKA_CONTROLLER_LISTENER_PORT (read, default 19091)
  #   KAFKA_BROKER_LISTENER_PORT     (read, default 9092)
  #   KAFKA_CFG_*                    (exported when empty)
  #######################################
  set_kafka_cfg_default() {
    # NAME=default pairs; none of the default values contains "=".
    local -a defaults=(
      "KAFKA_CFG_NODE_ID=1"
      "KAFKA_CFG_PROCESS_ROLES=broker,controller"
      "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      "KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT"
      "KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER"
      "KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1"
      "KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1"
      "KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1"
    )
    local entry name
    for entry in "${defaults[@]}"; do
      name="${entry%%=*}"
      if [[ -z "${!name}" ]]; then
        export "$entry"
      fi
    done

    # ":-" (not "-") so that a port variable which is set but empty also gets
    # the default; an empty port would render an unusable "NAME://:" listener.
    local ctl_port="${KAFKA_CONTROLLER_LISTENER_PORT:-19091}"
    local broker_port="${KAFKA_BROKER_LISTENER_PORT:-9092}"

    if [[ -z "$KAFKA_CFG_LISTENERS" ]]; then
      export KAFKA_CFG_LISTENERS="CONTROLLER://:${ctl_port},PLAINTEXT://:${broker_port}"
    fi
    if [[ -z "$KAFKA_CFG_CONTROLLER_QUORUM_VOTERS" ]]; then
      export KAFKA_CFG_CONTROLLER_QUORUM_VOTERS="${KAFKA_CFG_NODE_ID}@127.0.0.1:${ctl_port}"
    fi
  }
  #######################################
  # Build the effective broker configuration file.
  #   1. Resolve the node id, apply defaults and the external listener.
  #   2. If KAFKA_CONF_FILE does not exist yet, create it (seeded from
  #      KAFKA_BASE_CONF_FILE when that file exists).
  #   3. Write every KAFKA_CFG_<NAME> variable into the file as a property;
  #      the key is <NAME> lower-cased with "_" replaced by ".".
  # Globals: KAFKA_CONF_FILE, KAFKA_BASE_CONF_FILE, KAFKA_CFG_* (read)
  #######################################
  init_server_conf() {
    init_nodeid
    set_kafka_cfg_default
    fix_external_advertised_listeners

    if [[ ! -f "$KAFKA_CONF_FILE" ]]; then
      # Paths are quoted so that directories containing spaces work.
      mkdir -p "$(dirname "$KAFKA_CONF_FILE")"
      if [[ -f "$KAFKA_BASE_CONF_FILE" ]]; then
        cat "$KAFKA_BASE_CONF_FILE" > "$KAFKA_CONF_FILE"
      fi
      touch "$KAFKA_CONF_FILE"
    fi

    # Loop variables are local so they do not leak into the global scope.
    local var key value
    for var in "${!KAFKA_CFG_@}"; do
      key="${var#KAFKA_CFG_}"
      key="$(printf '%s\n' "${key//_/.}" | tr 'A-Z' 'a-z')"
      value="${!var}"
      update_server_conf "$key" "$value"
    done
  }
  #######################################
  # Force the data directory in the configuration file: drop any "log.dir"
  # property and set "log.dirs" to KAFKA_CFG_LOG_DIR.
  # Globals: KAFKA_CONF_FILE (file is modified), KAFKA_CFG_LOG_DIR (read)
  #######################################
  reset_log_dirs() {
    # The dot is escaped so only the literal key "log.dir" is removed.
    sed -i '/^log\.dir *=/d' "$KAFKA_CONF_FILE"
    update_server_conf "log.dirs" "$KAFKA_CFG_LOG_DIR"
  }
  #######################################
  # Format the storage directory if necessary and start the Kafka server.
  # Globals:
  #   KAFKA_HOME, KAFKA_CONF_FILE, KAFKA_CFG_LOG_DIR, KAFKA_HEAP_OPTS (read)
  #   JAVA_TOOL_OPTIONS (exported when KAFKA_HEAP_OPTS is set)
  #   KAFKA_CLUSTER_ID  (generated and exported when empty and a format
  #                      is required)
  # Outputs: progress messages and the configuration file on stdout;
  #          errors on stderr.
  # Returns: does not return on success (the server replaces the shell);
  #          exits with status 1 if no cluster id can be obtained or the
  #          storage format fails.
  #######################################
  start_server() {
    check_runtime
    reset_log_dirs

    if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
      export JAVA_TOOL_OPTIONS="${JAVA_TOOL_OPTIONS} ${KAFKA_HEAP_OPTS}"
    fi

    local storage_tool="${KAFKA_HOME}/bin/kafka-storage.sh"

    # A missing meta.properties means the log directory was never formatted.
    if [[ ! -f "$KAFKA_CFG_LOG_DIR/meta.properties" ]]; then
      echo ">>> Format Log Directories <<<"

      if [[ -z "$KAFKA_CLUSTER_ID" ]]; then
        echo "Generate a Cluster UUID"
        # Assignment is separate from export so a failing tool is detected
        # by the emptiness check below.
        KAFKA_CLUSTER_ID="$("$storage_tool" random-uuid)"
        export KAFKA_CLUSTER_ID
      fi
      if [[ -z "$KAFKA_CLUSTER_ID" ]]; then
        echo "[ERROR] Unable to determine KAFKA_CLUSTER_ID" >&2
        exit 1
      fi

      # NOTE(review): this prints the whole configuration to the log; confirm
      # that it never contains credentials.
      cat "$KAFKA_CONF_FILE"

      # Build the command once; as root it is run as uid 1000 / gid 0 so the
      # formatted files have the same owner as the server process.
      local -a format_cmd=("$storage_tool" format -t "$KAFKA_CLUSTER_ID" -c "$KAFKA_CONF_FILE")
      if [[ "$(id -u)" == "0" ]]; then
        format_cmd=(chroot --userspec=1000:0 / "${format_cmd[@]}")
      fi
      if ! "${format_cmd[@]}"; then
        echo "[ERROR] Formatting log directory failed" >&2
        exit 1
      fi
    fi

    run_as_other_user_if_needed "${KAFKA_HOME}/bin/kafka-server-start.sh" "$KAFKA_CONF_FILE"
  }
  # Main flow: render the configuration, fix ownership, then either start the
  # broker (sole argument "start") or run the given command in place of this
  # shell.
  init_server_conf
  take_file_ownership
  # Explicit test instead of comparing "$@" as a string: inside [[ ]] the
  # argument list is implicitly joined, which hides the real intent.
  if [[ $# -eq 1 && "$1" == "start" ]]; then
    start_server
  else
    exec "$@"
  fi