taskmanager.sh 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. #!/usr/bin/env bash
  2. ################################################################################
  3. # Licensed to the Apache Software Foundation (ASF) under one
  4. # or more contributor license agreements. See the NOTICE file
  5. # distributed with this work for additional information
  6. # regarding copyright ownership. The ASF licenses this file
  7. # to you under the Apache License, Version 2.0 (the
  8. # "License"); you may not use this file except in compliance
  9. # with the License. You may obtain a copy of the License at
  10. #
  11. # http://www.apache.org/licenses/LICENSE-2.0
  12. #
  13. # Unless required by applicable law or agreed to in writing, software
  14. # distributed under the License is distributed on an "AS IS" BASIS,
  15. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  16. # See the License for the specific language governing permissions and
  17. # limitations under the License.
  18. ################################################################################
  19. # Start/stop a Flink TaskManager.
  20. USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"
  21. STARTSTOP=$1
  22. ARGS=("${@:2}")
  23. if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  24. echo $USAGE
  25. exit 1
  26. fi
  27. bin=`dirname "$0"`
  28. bin=`cd "$bin"; pwd`
  29. . "$bin"/config.sh
  30. ENTRYPOINT=taskexecutor
  31. if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
  32. # if no other JVM options are set, set the GC to G1
  33. if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
  34. export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
  35. fi
  36. # Add TaskManager-specific JVM options
  37. export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
  38. # Startup parameters
  39. parseTmArgsAndExportLogs "${ARGS[@]}"
  40. if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
  41. ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
  42. fi
  43. ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")
  44. fi
  45. if [[ $STARTSTOP == "start-foreground" ]]; then
  46. exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
  47. else
  48. if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
  49. # Start a single TaskManager
  50. "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
  51. else
  52. # Example output from `numactl --show` on an AWS c4.8xlarge:
  53. # policy: default
  54. # preferred node: current
  55. # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
  56. # cpubind: 0 1
  57. # nodebind: 0 1
  58. # membind: 0 1
  59. read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
  60. for NODE_ID in "${NODE_LIST[@]:1}"; do
  61. # Start a TaskManager for each NUMA node
  62. numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
  63. done
  64. fi
  65. fi