# Kafka broker StatefulSet (Helm template).
#
# Renders one StatefulSet whose pods run the Confluent Kafka entrypoint
# (/etc/confluent/docker/run). Broker settings are passed as KAFKA_* environment
# variables taken from chart values; each broker derives its id and its
# advertised listeners at start-up from the pod hostname.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: {{ include "kafka.fullname" . }}
  labels:
    {{- include "kafka.labels" . | nindent 4 }}
spec:
  selector:
    matchLabels:
      {{- include "kafka.selectorLabels" . | nindent 6 }}
  serviceName: {{ include "kafka.fullname" . }}-headless
  replicas: {{ .Values.replicaCount }}
  updateStrategy:
    type: RollingUpdate
  # Pods are created and deleted without waiting for their predecessors.
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        {{- include "kafka.selectorLabels" . | nindent 8 }}
    spec:
      serviceAccountName: {{ include "kafka.fullname" . }}-sa
      securityContext:
        {{- toYaml .Values.podSecurityContext | nindent 8 }}
      # A user-supplied affinity replaces the default entirely; otherwise a soft
      # anti-affinity on the node hostname is applied.
      affinity:
        {{- if .Values.affinity }}
        {{- toYaml .Values.affinity | nindent 8 }}
        {{- else }}
        # NOTE(review): this selector matches the pod label "app"; confirm that
        # the selector-labels helper actually emits that label, otherwise the
        # term never matches any pod.
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: app
                      operator: In
                      values:
                        - {{ include "kafka.name" . }}
                topologyKey: kubernetes.io/hostname
              weight: 1
        {{- end }}
      # imagePullSecrets is a pod-level field, so it is emitted here rather than
      # inside the container entry.
      {{- with .Values.imagePullSecrets }}
      imagePullSecrets:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      containers:
        - name: {{ .Chart.Name }}
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          image: "{{ .Values.image.registry }}/{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
          env:
            # Downward-API values; HOST_IP and POD_NAME are consumed by the
            # start-up script in "command" below.
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
            - name: HOST_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.name
            - name: POD_NAMESPACE
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.namespace
            # Broker configuration. Numeric values are cast with "int" and then
            # quoted because env values must be strings.
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: {{ include "kafka.zookeeper.ensemble" . | quote }}
            - name: KAFKA_HEAP_OPTS
              value: {{ .Values.heapOpts | quote }}
            - name: KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE
              value: {{ .Values.confluentSupportMetricsEnable | quote }}
            # Must stay in sync with the mountPath of the "data" volume.
            - name: KAFKA_LOG_DIRS
              value: "/var/lib/kafka/data"
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: {{ .Values.autoCreateTopicsEnable | quote }}
            - name: KAFKA_DELETE_TOPIC_ENABLE
              value: {{ .Values.deleteTopicEnable | quote }}
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: {{ .Values.offsetsTopicReplicationFactor | int | quote }}
            - name: KAFKA_NUM_PARTITIONS
              value: {{ .Values.numPartitions | int | quote }}
            - name: KAFKA_DEFAULT_REPLICATION_FACTOR
              value: {{ .Values.defaultReplicationFactor | int | quote }}
            - name: KAFKA_MIN_INSYNC_REPLICAS
              value: {{ .Values.minInsyncReplicas | int | quote }}
            # Underscore-separated like every other KAFKA_* variable; a "." is
            # not a valid character in a shell variable name.
            - name: KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE
              value: {{ .Values.uncleanLeaderElectionEnable | quote }}
            - name: KAFKA_LOG_FLUSH_INTERVAL_MESSAGES
              value: {{ .Values.logFlushIntervalMessages | int | quote }}
            - name: KAFKA_LOG_FLUSH_INTERVAL_MS
              value: {{ .Values.logFlushIntervalMs | int | quote }}
            - name: KAFKA_LOG_RETENTION_BYTES
              value: {{ .Values.logRetentionBytes | int | quote }}
            - name: KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS
              value: {{ .Values.logRetentionCheckIntervalMs | int | quote }}
            - name: KAFKA_LOG_RETENTION_HOURS
              value: {{ .Values.logRetentionHours | int | quote }}
            - name: KAFKA_LOG_SEGMENT_BYTES
              value: {{ .Values.logSegmentBytes | int | quote }}
            - name: KAFKA_MESSAGE_MAX_BYTES
              value: {{ .Values.messageMaxBytes | int | quote }}
            - name: KAFKA_LOG4J_ROOT_LOGLEVEL
              value: {{ .Values.log4jRootLoglevel | quote }}
            - name: KAFKA_LOG4J_LOGGERS
              value: {{ .Values.log4jLoggers | quote }}
            # Listener set: SASL_PLAINTEXT when Kerberos is enabled for the
            # broker, PLAINTEXT otherwise. Both variants expose an internal and
            # an EXTERNAL listener.
            {{- if .Values.kerberos.enabled }}
            - name: KAFKA_SECURITY_INTER_BROKER_PROTOCOL
              value: SASL_PLAINTEXT
            - name: KAFKA_SASL_KERBEROS_SERVICE_NAME
              value: "kafka"
            - name: KAFKA_LISTENERS
              value: "SASL_PLAINTEXT://:{{ .Values.port.kafkaInternal }},EXTERNAL://:{{ .Values.port.kafkaExternal }}"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "SASL_PLAINTEXT:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT"
            {{- else }}
            - name: KAFKA_LISTENERS
              value: "PLAINTEXT://:{{ .Values.port.kafkaInternal }},EXTERNAL://:{{ .Values.port.kafkaExternal }}"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT"
            {{- end }}
            - name: ZOOKEEPER_SASL_ENABLED
              value: {{ .Values.zookeeper.kerberos.enabled | quote }}
            {{- if .Values.acls.enabled }}
            - name: KAFKA_ZOOKEEPER_SET_ACL
              value: "true"
            - name: KAFKA_SUPER_USERS
              value: "User:kafka"
            - name: KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND
              value: "false"
            - name: KAFKA_AUTHORIZER_CLASS_NAME
              value: "kafka.security.authorizer.AclAuthorizer"
            {{- end }}
          # Start-up script:
          #   * KAFKA_BROKER_ID is the text after the last "-" in HOSTNAME
          #     (presumably the StatefulSet pod ordinal; confirm HOSTNAME equals
          #     the pod name in the target cluster).
          #   * The EXTERNAL advertised port is externalAccess.initNodePort plus
          #     the broker id.
          #   * With Kerberos enabled, KAFKA_OPTS points the JVM at
          #     /etc/krb5.conf and at the per-pod JAAS file under /jaas.
          # NOTE(review): the isDocker branch advertises the fixed address
          # 172.24.100.101 for the EXTERNAL listener; consider making it a value.
          # Do not add comments inside the block scalar: it is one shell command
          # joined by line continuations.
          command:
            - "sh"
            - "-exc"
            - |
              export KAFKA_BROKER_ID=${HOSTNAME##*-} && \
              {{- if .Values.isDocker }}
              export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{{ include "kafka.listener" . }}:{{ .Values.port.kafkaInternal }},EXTERNAL://172.24.100.101:$(({{ .Values.externalAccess.initNodePort }} + ${KAFKA_BROKER_ID})) && \
              {{- else if .Values.kerberos.enabled }}
              export KAFKA_ADVERTISED_LISTENERS=SASL_PLAINTEXT://{{ include "kafka.listener" . }}:{{ .Values.port.kafkaInternal }},EXTERNAL://{{ include "kafka.listener" . }}:$(({{ .Values.externalAccess.initNodePort }} + ${KAFKA_BROKER_ID})) && \
              export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/jaas/${POD_NAME}_jaas.conf -Dsun.security.krb5.debug=false" && \
              {{- else }}
              export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{{ include "kafka.listener" . }}:{{ .Values.port.kafkaInternal }},EXTERNAL://${HOST_IP}:$(({{ .Values.externalAccess.initNodePort }} + ${KAFKA_BROKER_ID})) && \
              {{- end }}
              exec /etc/confluent/docker/run
          ports:
            - name: tcp-kafka-int
              containerPort: {{ .Values.port.kafkaInternal }}
            - name: tcp-kafka-ext
              containerPort: {{ .Values.port.kafkaExternal }}
          # Both probes only check that the internal listener port accepts TCP
          # connections.
          {{- if .Values.livenessProbe.enabled }}
          livenessProbe:
            tcpSocket:
              port: tcp-kafka-int
            initialDelaySeconds: {{ .Values.livenessProbe.initialDelaySeconds }}
            periodSeconds: {{ .Values.livenessProbe.periodSeconds }}
            timeoutSeconds: {{ .Values.livenessProbe.timeoutSeconds }}
            successThreshold: {{ .Values.livenessProbe.successThreshold }}
            failureThreshold: {{ .Values.livenessProbe.failureThreshold }}
          {{- end }}
          {{- if .Values.readinessProbe.enabled }}
          readinessProbe:
            tcpSocket:
              port: tcp-kafka-int
            initialDelaySeconds: {{ .Values.readinessProbe.initialDelaySeconds }}
            periodSeconds: {{ .Values.readinessProbe.periodSeconds }}
            timeoutSeconds: {{ .Values.readinessProbe.timeoutSeconds }}
            successThreshold: {{ .Values.readinessProbe.successThreshold }}
            failureThreshold: {{ .Values.readinessProbe.failureThreshold }}
          {{- end }}
          volumeMounts:
            - name: data
              mountPath: /var/lib/kafka/data
            - name: config
              mountPath: /etc/kafka
            - name: logs
              mountPath: /var/log
            {{- if or .Values.kerberos.enabled .Values.zookeeper.kerberos.enabled }}
            - name: krb5
              mountPath: /etc/krb5.conf
              subPath: krb5.conf
            - name: keytabs
              mountPath: /keytabs
              readOnly: true
            - name: jaas
              mountPath: /jaas
              readOnly: true
            {{- end }}
          securityContext:
            {{- toYaml .Values.securityContext | nindent 12 }}
          resources:
            {{- toYaml .Values.resources | nindent 12 }}
      volumes:
        - name: config
          emptyDir: {}
        - name: logs
          emptyDir: {}
        # Same condition as the krb5/keytabs/jaas volumeMounts above, so a mount
        # can never reference a volume that was not rendered.
        # NOTE(review): KAFKA_OPTS is only exported when kerberos.enabled is
        # true; confirm how the JAAS file is picked up when only
        # zookeeper.kerberos.enabled is set.
        {{- if or .Values.kerberos.enabled .Values.zookeeper.kerberos.enabled }}
        - name: krb5
          configMap:
            name: {{ required "The .Values.kerberos.krb5Conf is required when kerberos enabled!" .Values.kerberos.krb5Conf }}
        - name: keytabs
          secret:
            secretName: {{ required "The .Values.kerberos.keyTabSecret is required when kerberos enabled!" .Values.kerberos.keyTabSecret }}
        - name: jaas
          configMap:
            name: {{ required "The .Values.kerberos.jaasConf is required when kerberos enabled!" .Values.kerberos.jaasConf }}
        {{- end }}
  # One persistent volume claim per broker, mounted as the Kafka log directory.
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        storageClassName: {{ .Values.data.storageClass | quote }}
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: {{ .Values.data.storageSize | quote }}