Apache Flink - Mount Volume to Job Pod
I'm using the WordCountProg from this tutorial: https://www.tutorialspoint.com/apache_flink/apache_flink_creating_application.htm. The code is as follows:
WordCountProg.java
package main.java.spendreport;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCountProg {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = env.readTextFile(params.get("input"));

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
This example takes a text file as input, counts how many times each word occurs in the document, and writes the result to an output file.
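For a quick sanity check outside Kubernetes, the same jar can be run directly with the Flink CLI; a rough sketch (the paths here are placeholders):

flink run -c main.java.spendreport.WordCountProg target/wordcount-0.0.1-SNAPSHOT.jar --input /path/to/READ.txt --output /path/to/results.txt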
I'm building my job image with the following Dockerfile:
Dockerfile
FROM flink:1.13.0-scala_2.11
WORKDIR /opt/flink/usrlib
# Create Directory for Input/Output
RUN mkdir /opt/flink/resources
COPY target/wordcount-0.0.1-SNAPSHOT.jar /opt/flink/usrlib/wordcount.jar
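Since the Job manifest below sets imagePullPolicy: Never, the image must already exist on the cluster node rather than be pulled from a registry. With minikube's Docker driver, the build would look roughly like this (the docker-env step is an assumption about the setup; the tag matches the manifest):

# point the local docker CLI at minikube's Docker daemon
eval $(minikube docker-env)
# build the job image so the node can run it without pulling
docker build -t docker/wordcount:latest .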
My Job YAML then looks like this:
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: docker/wordcount:latest
          imagePullPolicy: Never
          env:
          #command: ["ls"]
          args: ["standalone-job", "--job-classname", "main.java.spendreport.WordCountProg", "-input", "/opt/flink/resources/READ.txt", "-output", "/opt/flink/resources/results.txt"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          #args: ["standalone-job", "--job-classname", "org.sense.flink.examples.stream.tpch.TPCHQuery03"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: job-artifacts-volume
              mountPath: /opt/flink/resources
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 9999 # refers to user _flink_ from the official flink image; change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            # directory location on host
            path: /Users/my-user/Documents/semafor/apache_flink/PV
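I create the Job the usual way (the manifest filename here is just illustrative):

kubectl apply -f flink-jobmanager-job.yaml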
The goal is to mount /Users/my-user/Documents/semafor/apache_flink/PV, which contains the READ.txt file, into the pod as the job's input. But when the job tries to execute, I get the following error:
java.io.FileNotFoundException: File /opt/flink/resources/READ.txt does not exist or the user running Flink ('flink') has insufficient permissions to access it.
I've tried running:
sudo chown -R 9999:9999 /Users/my-user/Documents/semafor/apache_flink/PV
and also chmod 777... but I get the same error.
I also tried copying the jar into the directory where the READ.txt file lives (/Users/my-user/Documents/semafor/apache_flink/PV on my local machine) and mounting that to /opt/flink/usrlib instead, but then I got:
org.apache.flink.util.FlinkException: Could not find the provided job class (main.java.spendreport.WordCountProg) in the user lib directory (/opt/flink/usrlib).
I don't have much experience with Kubernetes or Flink, so I'm not sure whether I'm mounting things incorrectly or doing something else wrong. Any suggestions are appreciated. Thanks in advance.
If you're using minikube, you first need to mount the volume into the minikube VM:

minikube mount /Users/my-user/Documents/semafor/apache_flink/PV:/tmp/PV

and then use /tmp/PV as the hostPath in the volumes section. A hostPath volume refers to a path on the Kubernetes node, and with minikube that node is the minikube VM, not your Mac, so /Users/my-user/... does not exist there until you mount it in. That also explains both errors you saw: the pod was mounting a directory that is empty on the node, so neither READ.txt nor the jar was visible inside the container (and chown/chmod on the macOS side could not help).
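A minimal sketch of the adjusted volume definition (only the hostPath changes; /tmp/PV is just a convention, any path inside the VM works):

volumes:
  - name: job-artifacts-volume
    hostPath:
      # directory location on the minikube node (the VM), not on macOS
      path: /tmp/PV

Note that minikube mount runs in the foreground, so keep that terminal open while the job executes. You can verify the mount from inside the pod, e.g.:

kubectl exec <pod-name> -- ls /opt/flink/resources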