flink-daemon.sh 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. #!/usr/bin/env bash
  2. ################################################################################
  3. # Licensed to the Apache Software Foundation (ASF) under one
  4. # or more contributor license agreements. See the NOTICE file
  5. # distributed with this work for additional information
  6. # regarding copyright ownership. The ASF licenses this file
  7. # to you under the Apache License, Version 2.0 (the
  8. # "License"); you may not use this file except in compliance
  9. # with the License. You may obtain a copy of the License at
  10. #
  11. # http://www.apache.org/licenses/LICENSE-2.0
  12. #
  13. # Unless required by applicable law or agreed to in writing, software
  14. # distributed under the License is distributed on an "AS IS" BASIS,
  15. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  16. # See the License for the specific language governing permissions and
  17. # limitations under the License.
  18. ################################################################################
  # Start/stop a Flink daemon.
  USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sql-gateway) [args]"

  STARTSTOP=$1
  DAEMON=$2
  ARGS=("${@:3}") # get remaining arguments as array

  # Resolve the absolute path of the directory holding this script so that
  # config.sh is found regardless of the caller's working directory.
  #  - CDPATH is cleared so `cd` can neither jump to a CDPATH match nor echo
  #    the target directory into the command substitution.
  #  - `&&` keeps a failed cd from silently yielding the caller's cwd (and
  #    then sourcing an unrelated ./config.sh); abort instead. cd itself has
  #    already printed the reason to stderr.
  bin=$(dirname "$0")
  bin=$(CDPATH='' cd -- "$bin" && pwd) || exit 1

  . "$bin"/config.sh
  # Map the daemon name from the command line to the Java entry-point class.
  # The SQL gateway additionally puts its own jar and the flink-python jar on
  # the classpath; any other name is a usage error.
  case "$DAEMON" in
    taskexecutor)      CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner ;;
    zookeeper)         CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer ;;
    historyserver)     CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer ;;
    standalonesession) CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint ;;
    standalonejob)     CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint ;;
    sql-gateway)
      CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
      SQL_GATEWAY_CLASSPATH="$(findSqlGatewayJar)":"$(findFlinkPythonJar)"
      ;;
    *)
      echo "Unknown daemon '${DAEMON}'. $USAGE."
      exit 1
      ;;
  esac
  # Fall back to the invoking user when no identity string was configured.
  : "${FLINK_IDENT_STRING:=$USER}"

  FLINK_TM_CLASSPATH=$(constructFlinkClassPath)

  pid="$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$DAEMON.pid"
  mkdir -p "$FLINK_PID_DIR"

  # Each daemon's log file name is derived from its position in the PID file.
  # Several daemons starting at once would otherwise race while reading,
  # indexing and appending to that file, so we serialize them with a lock.
  # The lock is taken on the PID directory itself because a dedicated lock
  # file could not be removed safely. The daemon is launched with this
  # descriptor closed; the lock is held by this script until it exits.
  if command -v flock >/dev/null 2>&1; then
    exec 200<"$FLINK_PID_DIR"
    flock 200
  fi

  # Every started instance appends one line to the pid file, so the current
  # line count is the index of the instance about to start (0 if no file).
  # This lets several daemons of the same type run side by side.
  if [[ -f "$pid" ]]; then
    id=$(wc -l < "$pid")
    id=${id//[[:space:]]/}   # some wc implementations left-pad the count
  else
    id=0
  fi

  FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${DAEMON}-${id}-${HOSTNAME}"
  log="${FLINK_LOG_PREFIX}.log"
  out="${FLINK_LOG_PREFIX}.out"

  # JVM system properties that point log4j / logback at the log file above and
  # at the configuration files in FLINK_CONF_DIR.
  log_setting=(
    "-Dlog.file=${log}"
    "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties"
    "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties"
    "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml"
  )
  #######################################
  # Stop a process: send SIGTERM, wait for it to exit, and escalate to SIGKILL
  # if it is still alive once the grace period has elapsed.
  # Arguments:
  #   $1 - PID of the process to stop
  #   $2 - daemon name, used only in the log message
  #   $3 - optional grace period in whole seconds (default 10)
  # Outputs:
  #   Writes a message to stdout when the process has to be killed forcefully.
  #######################################
  function guaranteed_kill {
    local to_stop_pid=$1
    local daemon=$2
    local grace_secs=${3:-10}
    local polls=0

    # send sigterm for graceful shutdown
    kill "$to_stop_pid"

    # Poll with `kill -0` rather than `timeout` + `tail --pid`, so that the
    # SIGKILL escalation also happens where `timeout` is not installed or
    # `tail` lacks the GNU-only --pid option. By default, Flink kills the JVM
    # 5 seconds after sigterm, so the default 10 second grace period is ample.
    # Polls every 0.1s; requires a sleep(1) accepting fractional seconds
    # (GNU coreutils, BSD/macOS).
    while kill -0 "$to_stop_pid" 2>/dev/null; do
      if (( polls >= grace_secs * 10 )); then
        echo "Daemon $daemon didn't stop within ${grace_secs} seconds. Killing it."
        # send sigkill
        kill -9 "$to_stop_pid"
        return
      fi
      sleep 0.1
      polls=$(( polls + 1 ))
    done
  }
  # Dispatch on the requested action:
  #   start    - launch one more instance of $DAEMON and append its pid to $pid
  #   stop     - stop the most recently started instance (last line of $pid)
  #   stop-all - stop every instance listed in $pid and remove the file
  case "$STARTSTOP" in
    (start)
      # Print a warning if daemons are already running on host
      if [ -f "$pid" ]; then
        active=()
        while IFS='' read -r p || [[ -n "$p" ]]; do
          if kill -0 "$p" >/dev/null 2>&1; then
            active+=("$p")
          fi
        done < "${pid}"
        count="${#active[@]}"
        if [ "${count}" -gt 0 ]; then
          echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
        fi
      fi

      # Evaluate user options for local variable expansion.
      # NOTE(review): eval executes any shell syntax contained in
      # FLINK_ENV_JAVA_OPTS; this relies on that value coming from trusted
      # configuration.
      FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

      echo "Starting $DAEMON daemon on host $HOSTNAME."
      # $JVM_ARGS, $FLINK_ENV_JAVA_OPTS and $CLASS_TO_RUN are deliberately left
      # unquoted so that multiple JVM options split into separate words.
      # `200<&-` closes the PID-directory lock descriptor for the daemon so the
      # long-running JVM does not inherit the lock.
      "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" \
        -classpath "$(manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS")" \
        ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &
      mypid=$!

      # Add to pid file if successful start
      if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 "$mypid" > /dev/null 2>&1; then
        echo "$mypid" >> "$pid"
      else
        echo "Error starting $DAEMON daemon."
        exit 1
      fi
      ;;

    (stop)
      if [ -f "$pid" ]; then
        # Remove last in pid file (the most recently started instance)
        to_stop=$(tail -n 1 "$pid")

        if [ -z "$to_stop" ]; then
          rm "$pid" # If all stopped, clean up pid file
          echo "No $DAEMON daemon to stop on host $HOSTNAME."
        else
          sed \$d "$pid" > "$pid.tmp" # all but last line

          # If all stopped, clean up pid file; otherwise keep the remainder.
          if [ "$(wc -l < "$pid.tmp")" -eq 0 ]; then
            rm "$pid" "$pid.tmp"
          else
            mv "$pid.tmp" "$pid"
          fi

          if kill -0 "$to_stop" > /dev/null 2>&1; then
            echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
            guaranteed_kill "$to_stop" "$DAEMON"
          else
            echo "No $DAEMON daemon (pid: $to_stop) is running anymore on $HOSTNAME."
          fi
        fi
      else
        echo "No $DAEMON daemon to stop on host $HOSTNAME."
      fi
      ;;

    (stop-all)
      if [ -f "$pid" ]; then
        mv "$pid" "${pid}.tmp"
        # `read -r` keeps backslashes literal; the `|| [[ -n ... ]]` guard also
        # handles a final line without a trailing newline (as in `start`).
        while read -r to_stop || [[ -n "$to_stop" ]]; do
          if kill -0 "$to_stop" > /dev/null 2>&1; then
            echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
            # stdin is redirected so nothing inside can consume the pid list.
            guaranteed_kill "$to_stop" "$DAEMON" < /dev/null
          else
            echo "Skipping $DAEMON daemon (pid: $to_stop), because it is not running anymore on $HOSTNAME."
          fi
        done < "${pid}.tmp"
        rm "${pid}.tmp"
      fi
      ;;

    (*)
      echo "Unexpected argument '$STARTSTOP'. $USAGE."
      exit 1
      ;;
  esac