#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Start/stop a Flink daemon.
#
# Positional arguments:
#   $1   - action: start | stop | stop-all
#   $2   - daemon name (see USAGE for the accepted values)
#   $3.. - extra arguments, kept as an array and forwarded on start
USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sql-gateway) [args]"

STARTSTOP=$1
DAEMON=$2
ARGS=("${@:3}") # get remaining arguments as array

# Resolve the absolute directory that contains this script.
# `&&` keeps a failed cd from silently reporting the caller's working
# directory, and the redirect discards the path that cd prints when CDPATH
# is set, so only pwd's output ends up in $bin.
bin=$(dirname -- "$0")
bin=$(cd -- "$bin" > /dev/null && pwd)

# Load the shared configuration that lives next to this script.
. "$bin"/config.sh
# Map the requested daemon name to the Java class that is launched for it.
# An unrecognised name prints the usage string and aborts with status 1.
case "$DAEMON" in
  (taskexecutor)
    CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  ;;

  (zookeeper)
    CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
  ;;

  (historyserver)
    CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
  ;;

  (standalonesession)
    CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
  ;;

  (standalonejob)
    CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
  ;;

  (sql-gateway)
    CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
    # Extra classpath entries used only by this daemon. The two helpers are
    # not defined in this file (presumably provided by config.sh - confirm).
    SQL_GATEWAY_CLASSPATH="$(findSqlGatewayJar):$(findFlinkPythonJar)"
  ;;

  (*)
    echo "Unknown daemon '${DAEMON}'. $USAGE."
    exit 1
  ;;
esac
# Fall back to the invoking user when no identity string is configured.
if [[ -z "$FLINK_IDENT_STRING" ]]; then
  FLINK_IDENT_STRING="$USER"
fi

FLINK_TM_CLASSPATH=$(constructFlinkClassPath)

# One PID file per (identity, daemon type); each line is one started instance.
pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$DAEMON.pid

mkdir -p "$FLINK_PID_DIR"

# Log files for daemons are indexed from the process ID's position in the PID
# file. The following lock prevents a race condition during daemon startup
# when multiple daemons read, index, and write to the PID file concurrently.
# The lock is created on the PID directory since a lock file cannot be safely
# removed. The daemon is started with the lock closed and the lock remains
# active in this script until the script exits.
if command -v flock > /dev/null 2>&1; then
  exec 200<"$FLINK_PID_DIR"
  flock 200
fi

# Ascending ID depending on number of lines in pid file.
# This allows us to start multiple daemons of each type.
if [ -f "$pid" ]; then
  # Left unquoted on purpose: word splitting strips the padding that some
  # wc implementations put around the count.
  id=$(echo $(wc -l < "$pid"))
else
  id=0
fi

FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${DAEMON}-${id}-${HOSTNAME}"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"

# JVM system properties that point the logging frameworks at the log file
# and at the configuration files under FLINK_CONF_DIR.
log_setting=(
  "-Dlog.file=${log}"
  "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties"
  "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties"
  "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml"
)
#######################################
# Stop a process: SIGTERM first, then SIGKILL if it outlives the grace period.
# Arguments:
#   $1 - PID of the process to stop
#   $2 - daemon name, used only in the escalation message
# Outputs:
#   A notice on stdout when the process has to be killed forcibly.
# Notes:
#   The wait-and-escalate step only runs when `timeout` is available;
#   without it, SIGTERM is all that is sent.
#######################################
guaranteed_kill() {
  # Locals, so calling this never clobbers same-named variables of the caller.
  local to_stop_pid=$1
  local daemon=$2
  local wait_status=0

  # send sigterm for graceful shutdown
  kill "$to_stop_pid"

  # if timeout exists, use it
  if command -v timeout &> /dev/null; then
    # wait 10 seconds for process to stop. By default, Flink kills the JVM 5 seconds after sigterm.
    # `tail --pid` returns once the process is gone; `timeout` reports 124
    # when the 10 seconds elapsed first.
    timeout 10 tail --pid="$to_stop_pid" -f /dev/null &> /dev/null || wait_status=$?

    if [ "$wait_status" -eq 124 ]; then
      echo "Daemon $daemon didn't stop within 10 seconds. Killing it."
      # send sigkill
      kill -9 "$to_stop_pid"
    fi
  fi
}
# Dispatch on the requested action.
#   start    - launch one more instance of $DAEMON and append its PID to $pid
#   stop     - stop the instance recorded on the last line of $pid
#   stop-all - stop every instance recorded in $pid
# Any other action prints the usage string and exits with status 1.
case "$STARTSTOP" in

  (start)
    # Print a warning if daemons are already running on host
    if [ -f "$pid" ]; then
      active=()
      while IFS='' read -r p || [[ -n "$p" ]]; do
        if kill -0 "$p" > /dev/null 2>&1; then
          active+=("$p")
        fi
      done < "${pid}"

      count="${#active[@]}"

      if [ "${count}" -gt 0 ]; then
        echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
      fi
    fi

    # Evaluate user options for local variable expansion
    # NOTE(review): this evals the configured option string; it must only
    # ever come from trusted configuration.
    FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

    echo "Starting $DAEMON daemon on host $HOSTNAME."
    # JVM_ARGS and FLINK_ENV_JAVA_OPTS are left unquoted so that they split
    # into separate JVM arguments. `200<&-` closes fd 200 for the child, so
    # the daemon does not inherit the PID-directory lock that this script may
    # hold on that descriptor.
    "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" \
      -classpath "$(manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS")" \
      ${CLASS_TO_RUN} "${ARGS[@]}" \
      > "$out" 200<&- 2>&1 < /dev/null &

    mypid=$!

    # Add to pid file if successful start
    # (IS_NUMBER is not defined in this file; presumably it comes from
    # config.sh - confirm.)
    if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 "$mypid" > /dev/null 2>&1; then
      echo "$mypid" >> "$pid"
    else
      echo "Error starting $DAEMON daemon."
      exit 1
    fi
  ;;

  (stop)
    if [ -f "$pid" ]; then
      # Remove last in pid file
      to_stop=$(tail -n 1 "$pid")

      if [ -z "$to_stop" ]; then
        rm "$pid" # If all stopped, clean up pid file
        echo "No $DAEMON daemon to stop on host $HOSTNAME."
      else
        sed \$d "$pid" > "$pid.tmp" # all but last line

        # If all stopped, clean up pid file
        # Explicit if/else: a failing rm must not fall through to the mv.
        # The count is left unquoted so padded wc output still compares.
        if [ $(wc -l < "$pid.tmp") -eq 0 ]; then
          rm "$pid" "$pid.tmp"
        else
          mv "$pid.tmp" "$pid"
        fi

        if kill -0 "$to_stop" > /dev/null 2>&1; then
          echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
          guaranteed_kill "$to_stop" "$DAEMON"
        else
          echo "No $DAEMON daemon (pid: $to_stop) is running anymore on $HOSTNAME."
        fi
      fi
    else
      echo "No $DAEMON daemon to stop on host $HOSTNAME."
    fi
  ;;

  (stop-all)
    if [ -f "$pid" ]; then
      # Move the PID file aside first, then work through the moved copy.
      mv "$pid" "${pid}.tmp"

      # `|| [[ -n ... ]]` also processes a final line without a newline,
      # matching the loop used by `start`.
      while read -r to_stop || [[ -n "$to_stop" ]]; do
        if kill -0 "$to_stop" > /dev/null 2>&1; then
          echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
          guaranteed_kill "$to_stop" "$DAEMON"
        else
          echo "Skipping $DAEMON daemon (pid: $to_stop), because it is not running anymore on $HOSTNAME."
        fi
      done < "${pid}.tmp"

      rm "${pid}.tmp"
    fi
  ;;

  (*)
    echo "Unexpected argument '$STARTSTOP'. $USAGE."
    exit 1
  ;;
esac