123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- #!/usr/bin/env bash
- ################################################################################
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- ################################################################################
# Reads the value of a flat "key: value" entry from a config file.
# Arguments:
#   $1 - key to look up (matched literally)
#   $2 - default value, printed when the key is absent or its value is empty
#   $3 - path of the config file
# Outputs:
#   The value on stdout, with surrounding spaces trimmed and everything from the
#   first '#' onwards dropped. If the key occurs several times, the last one wins.
readFromConfigFile() {
    local key=$1
    local defaultValue=$2
    local configFile=$3
    local escapedKey
    local value

    # Keys such as "env.java.home" contain '.', which sed would otherwise treat as
    # "any character". Escape the BRE metacharacters and the '/' delimiter so that
    # the key only ever matches itself.
    escapedKey=$(printf '%s\n' "${key}" | sed 's|[][\.*^$/]|\\&|g')

    # first extract the value with the given key (1st sed), then trim the result (2nd sed)
    # if a key exists multiple times, take the "last" one (tail)
    value=$(sed -n "s/^[ ]*${escapedKey}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" \
        | sed "s/^ *//;s/ *$//" \
        | tail -n 1)

    if [ -z "${value}" ]; then
        echo "${defaultValue}"
    else
        echo "${value}"
    fi
}
# Resolves and exports JAVA_HOME.
# Arguments:
#   $1 - Flink configuration directory
# Globals written: CONF_FILE, KEY_ENV_JAVA_HOME, MY_JAVA_HOME, JAVA_HOME (exported)
# Exits with status 1 when neither the config nor the environment provides a
# JAVA_HOME and no `java` command can be found.
setJavaHome() {
    # NOTE: JAVA_HOME has to be known before BashJavaUtils can be used, so the value
    # of env.java.home must be written in flattened (non-nested) form in the config
    # file; only then can it be read by plain shell tooling.
    # Use flink-conf.yaml when it exists, config.yaml otherwise.
    if [ -e "$1/flink-conf.yaml" ]; then
        CONF_FILE="$1/flink-conf.yaml"
    else
        CONF_FILE="$1/config.yaml"
    fi

    KEY_ENV_JAVA_HOME="env.java.home"

    # Value from the config (no default); fall back to the system-wide JAVA_HOME
    # when the config does not specify one.
    MY_JAVA_HOME=$(readFromConfigFile ${KEY_ENV_JAVA_HOME} "" "${CONF_FILE}")
    MY_JAVA_HOME="${MY_JAVA_HOME:-${JAVA_HOME}}"

    # Accept either a non-empty JAVA_HOME or a `java` that can be resolved directly.
    if [ -n "${MY_JAVA_HOME}" ] || type java > /dev/null 2> /dev/null; then
        export JAVA_HOME="${MY_JAVA_HOME}"
    else
        echo "Please specify JAVA_HOME. Either in Flink config ./conf/config.yaml or as system-wide JAVA_HOME."
        exit 1
    fi
}
-
# Determines the java executable to use and exports it as JAVA_RUN.
# Arguments:
#   $1 - Flink configuration directory (forwarded to setJavaHome)
# Globals written: UNAME, JAVA_RUN (exported); JAVA_HOME via setJavaHome
setJavaRun() {
    setJavaHome "$1"
    UNAME=$(uname -s)

    # Default to whatever `java` is on the PATH. Only outside of Cygwin, and only
    # when JAVA_HOME points to an existing directory, use the binary below it.
    JAVA_RUN=java
    if [ "${UNAME:0:6}" != "CYGWIN" ] && [[ -d "$JAVA_HOME" ]]; then
        JAVA_RUN="$JAVA_HOME"/bin/java
    fi

    export JAVA_RUN
}
# Converts a path list (for example a java classpath) into the form expected by
# the platform's java: on Cygwin it is passed through `cygpath -wp`, everywhere
# else it is printed unchanged.
# Arguments:
#   $1 - the path list
# Globals written: UNAME
# Outputs: the (possibly converted) path list on stdout
manglePathList() {
    UNAME=$(uname -s)
    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
        cygpath -wp "$1"
    else
        # Print the argument verbatim. An unquoted expansion would undergo word
        # splitting and pathname expansion, which corrupts entries containing
        # spaces or classpath wildcards such as "lib/*".
        printf '%s\n' "$1"
    fi
}
# Locates the single flink-dist*.jar below a lib directory.
# Arguments:
#   $1 - (optional) directory to search; defaults to $FLINK_LIB_DIR
# Outputs: the path of the jar on stdout
# Exits with status 1 (after an error message on stderr) when no jar or more than
# one jar is found.
findFlinkDistJar() {
    local FLINK_DIST
    local FLINK_DIST_COUNT
    local LIB_DIR
    if [[ -n "$1" ]]; then
        LIB_DIR="$1"
    else
        LIB_DIR="$FLINK_LIB_DIR"
    fi

    FLINK_DIST="$(find "$LIB_DIR" -name 'flink-dist*.jar')"
    FLINK_DIST_COUNT="$(echo "$FLINK_DIST" | wc -l)"

    # If flink-dist*.jar cannot be resolved write error messages to stderr since stdout is stored
    # as the classpath and exit function with empty classpath to force process failure.
    # The messages name the directory that was actually searched, which differs
    # from $FLINK_LIB_DIR when an explicit directory was passed in.
    if [[ "$FLINK_DIST" == "" ]]; then
        (>&2 echo "[ERROR] Flink distribution jar not found in $LIB_DIR.")
        exit 1
    elif [[ "$FLINK_DIST_COUNT" -gt 1 ]]; then
        (>&2 echo "[ERROR] Multiple flink-dist*.jar found in $LIB_DIR. Please resolve.")
        exit 1
    fi

    echo "$FLINK_DIST"
}
# Runs a BashJavaUtils command and prints (at most the last 1000 lines of) its
# combined stdout/stderr.
# Arguments:
#   $1  - BashJavaUtils command
#   $2  - Flink configuration directory (passed as --configDir)
#   $3  - classpath to run BashJavaUtils with
#   $4+ - additional dynamic arguments forwarded to BashJavaUtils
# Globals read: JAVA_RUN
# Exits with status 1 (after printing the captured output to stderr) when the
# java process returns a non-zero exit code.
runBashJavaUtilsCmd() {
    local cmd=$1
    local conf_dir=$2
    local class_path=$3
    # Flattened into one string and expanded unquoted below, as before, so callers
    # that pass several options inside a single argument keep working.
    local dynamic_args=${@:4}
    local output
    local exit_code

    class_path=$(manglePathList "${class_path}")

    # Declaration and assignment are kept separate: `local output=$(...)` would
    # report the status of `local` (always 0). The pipe into tail would likewise
    # hide the java exit code, so the substitution exits with PIPESTATUS[0].
    output=$("${JAVA_RUN}" -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000; exit "${PIPESTATUS[0]}")
    exit_code=$?

    if [[ ${exit_code} -ne 0 ]]; then
        echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
        # Print the output in case the user redirect the log to console.
        echo "$output" 1>&2
        exit 1
    fi

    echo "$output"
}
# Runs the UPDATE_AND_GET_FLINK_CONFIGURATION command through
# parseConfigurationAndExportLogs and prints whatever that call wrote to stdout.
# Arguments:
#   $1  - Flink configuration directory
#   $2  - Flink bin directory
#   $3  - Flink lib directory
#   $4+ - additional arguments forwarded unchanged
updateAndGetFlinkConfiguration() {
    local conf_dir="$1"
    local bin_dir="$2"
    local lib_dir="$3"
    local updated_configuration

    # Captured in a command substitution, so an `exit` inside the callee only
    # terminates the subshell and not the calling script.
    updated_configuration=$(parseConfigurationAndExportLogs \
        "${conf_dir}" "${bin_dir}" "${lib_dir}" \
        "UPDATE_AND_GET_FLINK_CONFIGURATION" "${@:4}")

    echo "${updated_configuration}"
}
# Runs the MIGRATE_LEGACY_FLINK_CONFIGURATION_TO_STANDARD_YAML command through
# parseConfigurationAndExportLogs and prints whatever that call wrote to stdout.
# Arguments:
#   $1 - Flink configuration directory
#   $2 - Flink bin directory
#   $3 - Flink lib directory
migrateLegacyFlinkConfigToStandardYaml() {
    local conf_dir="$1"
    local bin_dir="$2"
    local lib_dir="$3"
    local migration_output

    # Captured in a command substitution, so an `exit` inside the callee only
    # terminates the subshell and not the calling script.
    migration_output=$(parseConfigurationAndExportLogs \
        "${conf_dir}" "${bin_dir}" "${lib_dir}" \
        "MIGRATE_LEGACY_FLINK_CONFIGURATION_TO_STANDARD_YAML")

    echo "${migration_output}"
}
# Runs a BashJavaUtils command and prints its execution result lines, i.e. the
# output lines carrying the "BASH_JAVA_UTILS_EXEC_RESULT:" prefix, with that
# prefix removed.
# Arguments:
#   $1  - Flink configuration directory
#   $2  - Flink bin directory (must contain bash-java-utils.jar)
#   $3  - Flink lib directory (searched for the flink-dist jar)
#   $4  - BashJavaUtils command
#   $5+ - additional arguments forwarded to BashJavaUtils
# Globals written: java_utils_output, logging_output, execution_results
# Exits with status 1 (after printing the raw output to stdout) when the output
# contains no execution result line.
parseConfigurationAndExportLogs() {
    local FLINK_CONF_DIR="$1"
    local FLINK_BIN_DIR="$2"
    local FLINK_LIB_DIR="$3"
    local COMMAND="$4"
    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"

    # The lib directory is quoted so that a path containing whitespace reaches
    # findFlinkDistJar as a single argument instead of being split into several.
    java_utils_output=$(runBashJavaUtilsCmd "${COMMAND}" "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar "${FLINK_LIB_DIR}")" "${@:5}")
    logging_output=$(extractLoggingOutputs "${java_utils_output}")
    execution_results=$(echo "${java_utils_output}" | grep "${EXECUTION_PREFIX}")

    # $? is the status of grep here: non-zero means no result line was found.
    if [[ $? -ne 0 ]]; then
        echo "[ERROR] Could not parse configurations properly."
        echo "[ERROR] Raw output from BashJavaUtils:"
        echo "$java_utils_output"
        exit 1
    fi

    echo "${execution_results//${EXECUTION_PREFIX}/}"
}
# Prints every line of the given BashJavaUtils output that does NOT contain the
# "BASH_JAVA_UTILS_EXEC_RESULT:" marker, i.e. the plain logging lines.
# Arguments:
#   $1 - raw BashJavaUtils output
# Returns: the exit status of grep (non-zero when no line is left over)
extractLoggingOutputs() {
    local raw_output="$1"
    local result_marker="BASH_JAVA_UTILS_EXEC_RESULT:"

    echo "${raw_output}" \
        | grep -v "${result_marker}"
}
# Extracts the execution result lines (those containing the
# "BASH_JAVA_UTILS_EXEC_RESULT:" marker) from BashJavaUtils output and prints them
# with the marker removed.
# Arguments:
#   $1 - raw BashJavaUtils output
#   $2 - number of result lines the caller expects
# Exits with status 1 (after a message on stderr) when there is no result line or
# when the number of result lines differs from $2.
extractExecutionResults() {
    local raw_output="$1"
    local expected_count="$2"
    local result_marker="BASH_JAVA_UTILS_EXEC_RESULT:"
    local result_lines
    local actual_count

    result_lines=$(echo "${raw_output}" | grep "${result_marker}")

    # The empty case needs its own check: piping an empty string through
    # `echo | wc -l` still reports one line.
    if [[ -z ${result_lines} ]]; then
        echo "[ERROR] The execution result is empty." 1>&2
        exit 1
    fi

    actual_count=$(echo "${result_lines}" | wc -l)
    if [[ ${actual_count} -ne ${expected_count} ]]; then
        echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_count}, actual: ${actual_count}." 1>&2
        echo "[ERROR] An execution result line is expected following the prefix '${result_marker}'" 1>&2
        echo "$raw_output" 1>&2
        exit 1
    fi

    echo "${result_lines//${result_marker}/}"
}
# Runs a BashJavaUtils command that yields two result lines (JVM parameters first,
# dynamic configurations second) and exports the outcome.
# Arguments:
#   $1  - BashJavaUtils command
#   $2+ - additional arguments forwarded to BashJavaUtils
# Globals read:    FLINK_CONF_DIR, FLINK_BIN_DIR
# Globals written: java_utils_output, logging_output, params_output, jvm_params,
#                  JVM_ARGS (exported, appended to), DYNAMIC_PARAMETERS (exported),
#                  FLINK_INHERITED_LOGS (exported, appended to)
# Exits with status 1 (after printing the raw output to stdout) when the two result
# lines cannot be extracted.
parseResourceParamsAndExportLogs() {
    local cmd=$1

    java_utils_output=$(runBashJavaUtilsCmd ${cmd} "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "${@:2}")
    logging_output=$(extractLoggingOutputs "${java_utils_output}")

    # Exactly two result lines are required; the `if !` tests the exit status of
    # the command substitution.
    if ! params_output=$(extractExecutionResults "${java_utils_output}" 2); then
        echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
        echo "[ERROR] Raw output from BashJavaUtils:"
        echo "$java_utils_output"
        exit 1
    fi

    # First result line: JVM parameters, appended to any JVM_ARGS already set.
    jvm_params=$(echo "${params_output}" | head -n1)
    JVM_ARGS="${JVM_ARGS} ${jvm_params}"
    export JVM_ARGS

    # Last result line: dynamic configurations.
    DYNAMIC_PARAMETERS=$(echo "${params_output}" | tail -n1)
    export DYNAMIC_PARAMETERS

    # Append a summary to the inherited logs. The lines of this string start at
    # column 0 on purpose: any indentation would become part of the value.
    FLINK_INHERITED_LOGS="
$FLINK_INHERITED_LOGS
RESOURCE_PARAMS extraction logs:
jvm_params: $jvm_params
dynamic_configs: $DYNAMIC_PARAMETERS
logs: $logging_output
"
    export FLINK_INHERITED_LOGS
}
|