_kafka-env.tpl 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  {{/*
Broker domain suffix.
Renders "<broker headless service name>.<release namespace>.svc.<cluster domain>",
using the "kafka.broker.headless.serviceName" and "kafka.clusterDomain" templates.
*/}}
{{- define "kafka.broker.domainSuffix" -}}
{{- $headless := include "kafka.broker.headless.serviceName" . -}}
{{- $domain := include "kafka.clusterDomain" . -}}
{{- join "." (list $headless .Release.Namespace "svc" $domain) -}}
{{- end -}}
  {{/*
Value for KAFKA_CFG_CONTROLLER_QUORUM_VOTERS.
Renders a comma-separated list with one entry per controller replica:
  <i>@<fullname>-<i>.<headless service>.<namespace>.svc.<cluster domain>:<controller port>
When "kafka.combinedMode" renders "true", the broker replica count, fullname and
headless service are used in place of the controller ones.
*/}}
{{- define "kafka.controller.quorum.voters" -}}
{{- $voterCount := int .Values.controller.replicaCount -}}
{{- $podPrefix := include "kafka.controller.fullname" . -}}
{{- $headless := include "kafka.controller.headless.serviceName" . -}}
{{- if eq "true" (include "kafka.combinedMode" .) -}}
  {{- $voterCount = int .Values.broker.replicaCount -}}
  {{- $podPrefix = include "kafka.broker.fullname" . -}}
  {{- $headless = include "kafka.broker.headless.serviceName" . -}}
{{- end -}}
{{- $domain := include "kafka.clusterDomain" . -}}
{{- $controllerPort := int .Values.containerPort.controller -}}
{{- $zone := printf "%s.%s.svc.%s" $headless .Release.Namespace $domain -}}
{{- $voters := list -}}
{{- range $id := until $voterCount -}}
  {{- $host := printf "%s-%d.%s" $podPrefix $id $zone -}}
  {{- $voters = append $voters (printf "%d@%s:%d" $id $host $controllerPort) -}}
{{- end -}}
{{- join "," $voters -}}
{{- end -}}
  {{/*
Kafka broker component label.
Renders "broker_controller" when "kafka.combinedMode" renders "true", otherwise "broker".
NOTE: the template name is spelled "componet"; it is left unchanged because callers reference it.
*/}}
{{- define "kafka.broker.componet" -}}
{{- $combined := eq "true" (include "kafka.combinedMode" .) -}}
{{- ternary "broker_controller" "broker" $combined -}}
{{- end -}}
  {{/*
Environment variables for the controller container (a YAML list of EnvVar entries).
Any entries in .Values.controller.extraEnvs are appended verbatim at the end.
NOTE(review): KAFKA_NODE_ID is set to the literal "podnameSuffix"; presumably the
image entrypoint resolves it from POD_NAME - confirm against the container image.
*/}}
{{- define "kafka.controller.containerEnv" -}}
- name: KAFKA_HEAP_OPTS
  value: {{ quote .Values.controller.heapOpts }}
- name: KAFKA_CFG_PROCESS_ROLES
  value: controller
- name: KAFKA_CFG_LISTENERS
  value: "CONTROLLER://0.0.0.0:{{ .Values.containerPort.controller }}"
- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
  value: CONTROLLER
- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
  value: {{ include "kafka.controller.quorum.voters" . }}
- name: KAFKA_CLUSTER_ID
  valueFrom:
    secretKeyRef:
      name: {{ include "kafka.clusterId.SecretName" . }}
      key: clusterId
- name: KAFKA_NODE_ID
  value: "podnameSuffix"
- name: POD_NAME
  valueFrom:
    fieldRef:
      fieldPath: metadata.name
{{- with .Values.controller.extraEnvs }}
{{ toYaml . }}
{{- end }}
{{- end }}
  {{/*
Environment variables for the broker container (a YAML list of EnvVar entries).
- POD_NAME is declared before KAFKA_CFG_ADVERTISED_LISTENERS, which references it as $(POD_NAME).
- When "kafka.combinedMode" renders "true" the process roles and listeners also include the controller.
- The replication-factor / min-ISR variables are only emitted when broker.replicaCount >= 3;
  each one takes its value from .Values.broker.config when set, otherwise the default shown.
  A missing or null .Values.broker.config is treated as an empty map instead of aborting the render.
- KAFKA_NODE_ID_OFFSET is only emitted when "kafka.combinedMode" renders "false".
- Any entries in .Values.broker.extraEnvs are appended verbatim at the end.
NOTE(review): KAFKA_NODE_ID is set to the literal "podnameSuffix"; presumably the
image entrypoint resolves it from POD_NAME - confirm against the container image.
*/}}
{{- define "kafka.broker.containerEnv" -}}
{{- $mode := include "kafka.combinedMode" . -}}
{{- $combined := eq $mode "true" -}}
{{- $ports := .Values.containerPort -}}
{{- $brokerConfig := .Values.broker.config | default (dict) -}}
{{- $replicaCount := int .Values.broker.replicaCount -}}
- name: POD_HOST_IP
  valueFrom:
    fieldRef:
      fieldPath: status.hostIP
- name: POD_IP
  valueFrom:
    fieldRef:
      fieldPath: status.podIP
- name: POD_NAME
  valueFrom:
    fieldRef:
      fieldPath: metadata.name
- name: KAFKA_HEAP_OPTS
  value: {{ .Values.broker.heapOpts | quote }}
- name: KAFKA_CFG_PROCESS_ROLES
  value: {{ ternary "broker,controller" "broker" $combined | quote }}
- name: KAFKA_CFG_LISTENERS
  value: "BROKER://0.0.0.0:{{ $ports.broker }},EXTERNAL://0.0.0.0:{{ $ports.brokerExternal }}{{ if $combined }},CONTROLLER://0.0.0.0:{{ $ports.controller }}{{ end }}"
- name: KAFKA_CFG_ADVERTISED_LISTENERS
  value: "BROKER://$(POD_NAME).{{ include "kafka.broker.domainSuffix" . }}:{{ $ports.broker }}"
{{- if .Values.broker.external.enabled }}
- name: KAFKA_EXTERNAL_SERVICE_TYPE
  value: {{ .Values.broker.external.service.type | quote }}
- name: KAFKA_EXTERNAL_ADVERTISED_LISTENERS
  value: {{ include "kafka.external.advertisedListeners" . | quote }}
{{- end }}
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
  value: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
  value: BROKER
- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
  value: CONTROLLER
- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
  value: {{ include "kafka.controller.quorum.voters" . }}
{{- if ge $replicaCount 3 }}
- name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
  value: "{{ index $brokerConfig "offsets.topic.replication.factor" | default "3" }}"
- name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
  value: "{{ index $brokerConfig "transaction.state.log.replication.factor" | default "3" }}"
- name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
  value: "{{ index $brokerConfig "transaction.state.log.min.isr" | default "2" }}"
{{- end }}
- name: KAFKA_CLUSTER_ID
  valueFrom:
    secretKeyRef:
      name: {{ include "kafka.clusterId.SecretName" . }}
      key: clusterId
- name: KAFKA_NODE_ID
  value: "podnameSuffix"
- name: KAFKA_BASE_CONF_FILE
  value: {{ include "kafka.broker.baseConfigFile" . }}
{{- if eq $mode "false" }}
- name: KAFKA_NODE_ID_OFFSET
  value: "1000"
{{- end }}
{{- with .Values.broker.extraEnvs }}
{{ toYaml . }}
{{- end }}
{{- end }}
  {{/*
Container ports for the broker container (a YAML list of ContainerPort entries).
The "broker" port is always listed; "external" is added when
.Values.broker.external.enabled is set, and "controller" when
"kafka.combinedMode" renders "true".
*/}}
{{- define "kafka.broker.containerPorts" -}}
{{- $ports := .Values.containerPort -}}
{{- $combined := eq "true" (include "kafka.combinedMode" .) -}}
- containerPort: {{ $ports.broker }}
  name: broker
  protocol: TCP
{{- if .Values.broker.external.enabled }}
- containerPort: {{ $ports.brokerExternal }}
  name: external
  protocol: TCP
{{- end }}
{{- if $combined }}
- containerPort: {{ $ports.controller }}
  name: controller
  protocol: TCP
{{- end }}
{{- end }}
  {{/*
Full node-port list: one port per broker replica, comma-separated.
Broker i uses .Values.broker.external.nodePorts[i] while the configured list lasts;
brokers past the end of the list continue sequentially from the last configured
port (last+1, last+2, ...).
The configured list must contain at least one port, otherwise the lookup of its
last element fails.
*/}}
{{- define "kafka.fullNodePorts" -}}
{{- $configured := .Values.broker.external.nodePorts -}}
{{- $brokers := int .Values.broker.replicaCount -}}
{{- $lastIdx := int (sub (len $configured) 1) -}}
{{- $lastPort := int (index $configured $lastIdx) -}}
{{- $ports := list -}}
{{- range $n := until $brokers -}}
  {{- $port := $lastPort -}}
  {{- if gt $n $lastIdx -}}
    {{- $port = int (add (sub $n $lastIdx) $lastPort) -}}
  {{- else -}}
    {{- $port = int (index $configured $n) -}}
  {{- end -}}
  {{- $ports = append $ports (printf "%d" $port) -}}
{{- end -}}
{{- join "," $ports -}}
{{- end -}}
  {{/*
In-cluster bootstrap servers.
Renders a comma-separated list with one entry per broker replica:
  <broker fullname>-<i>.<broker domain suffix>:<broker container port>
*/}}
{{- define "kafka.bootstrapServers" -}}
{{- $podPrefix := include "kafka.broker.fullname" . -}}
{{- $zone := include "kafka.broker.domainSuffix" . -}}
{{- $port := int .Values.containerPort.broker -}}
{{- $endpoints := list -}}
{{- range $ordinal := until (int .Values.broker.replicaCount) -}}
  {{- $endpoints = append $endpoints (printf "%s-%d.%s:%d" $podPrefix $ordinal $zone $port) -}}
{{- end -}}
{{- join "," $endpoints -}}
{{- end -}}
  {{/*
External bootstrap servers, comma-separated, one entry per broker replica.
The shape depends on .Values.broker.external.service.type:
  NodePort     -> KUBERNETES_NODE_IP_<i>:<node port i>, ports taken from "kafka.fullNodePorts"
  LoadBalancer -> <hosts[i]>:<service port> while .Values.broker.external.hosts lasts,
                  then <broker fullname>-<i><domainSuffix>:<service port>
  anything else -> empty string

The node-port string is split with splitList, not split: sprig's split returns a
dict keyed "_0", "_1", ..., "_10", and ranging over a dict visits keys in sorted
order ("_10" before "_2"), which assigns ports to the wrong broker once there are
more than 10 replicas.
Node ports are only resolved for the NodePort type, so a LoadBalancer setup does
not need .Values.broker.external.nodePorts. A missing hosts value is treated as
an empty list.
*/}}
{{- define "kafka.external.bootstrapServers" -}}
{{- $external := .Values.broker.external -}}
{{- $serviceType := $external.service.type -}}
{{- $replicaCount := int .Values.broker.replicaCount -}}
{{- $servers := list -}}
{{- if eq $serviceType "NodePort" -}}
  {{- $nodePorts := splitList "," (include "kafka.fullNodePorts" .) -}}
  {{- range $i := until $replicaCount -}}
    {{- $servers = append $servers (printf "KUBERNETES_NODE_IP_%d:%d" $i (int (index $nodePorts $i))) -}}
  {{- end -}}
{{- else if eq $serviceType "LoadBalancer" -}}
  {{- $brokerFullname := include "kafka.broker.fullname" . -}}
  {{- $servicePort := int $external.service.port -}}
  {{- $domain := $external.domainSuffix -}}
  {{- $hosts := $external.hosts | default (list) -}}
  {{- range $i := until $replicaCount -}}
    {{- if lt $i (len $hosts) -}}
      {{- $servers = append $servers (printf "%s:%d" (index $hosts $i) $servicePort) -}}
    {{- else -}}
      {{- $servers = append $servers (printf "%s-%d%s:%d" $brokerFullname $i $domain $servicePort) -}}
    {{- end -}}
  {{- end -}}
{{- end -}}
{{- join "," $servers -}}
{{- end -}}
  {{/*
External advertised listeners.
Prefixes every address rendered by "kafka.external.bootstrapServers" with
"EXTERNAL://" and joins them with commas, keeping the broker order.

The address string is split with splitList, not split: sprig's split returns a
dict keyed "_0", "_1", ..., "_10", and ranging over a dict visits keys in sorted
order ("_10" before "_2"), which scrambles the listener order once there are more
than 10 brokers.
*/}}
{{- define "kafka.external.advertisedListeners" -}}
{{- $listeners := list -}}
{{- range $addr := splitList "," (include "kafka.external.bootstrapServers" .) -}}
  {{- $listeners = append $listeners (printf "EXTERNAL://%s" $addr) -}}
{{- end -}}
{{- join "," $listeners -}}
{{- end -}}