Apache Spark:没有 HDFS 检查点的流式传输
Apache Spark: Streaming without HDFS checkpoint
我正在实施一个使用 reduceByKeyAndWindow 的 Spark 作业,因此我需要添加检查点。
我从 Spark 的网站上看到:
Checkpointing can be enabled by setting a directory in a fault-tolerant, reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint information will be saved.
我的申请只是为了学术目的,因此我不想设置一个HDFS来做检查点,而只是一个本地文件。在 MacOS 中这样做工作正常(将临时目录设置为检查点目录),在 Windows 中这样做时会出现问题,这会引发权限异常。
我已经尝试以管理员身份启动 eclipse 并手动创建目录设置 setWritable、setReadable 和 setExecutable 到 真 。关于如何解决 Windows 中的问题的任何提示?
谢谢!
更新 这是我的代码和异常。再次澄清一下,它在 Mac 中运行良好,但在 Windows.
中运行良好
SparkConf conf = new SparkConf().setAppName("testApp").setMaster("local[2]");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaStreamingContext jsc = new JavaStreamingContext(ctx, new Duration(1000));
jsc.checkpoint(Files.createTempDir().getAbsolutePath());
异常:
Exception in thread "pool-7-thread-3" java.lang.NullPointerException
at java.lang.ProcessBuilder.start(Unknown Source)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
at org.apache.hadoop.util.Shell.run(Shell.java:379)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:772)
at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:135)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
通过将最新的 Hadoop 库添加到我的项目来解决。
如果使用 Maven,以下一组依赖项可以解决问题。
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-twitter_2.11</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>
在Windows上,您可以按以下方式解决此问题
- 下载 winutils.exe 到文件夹说 MY_UTILS/bin
- 创建环境变量HADOOP_HOME并将其指向MY_UTILS
我正在实施一个使用 reduceByKeyAndWindow 的 Spark 作业,因此我需要添加检查点。
我从 Spark 的网站上看到:
Checkpointing can be enabled by setting a directory in a fault-tolerant, reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint information will be saved.
我的申请只是为了学术目的,因此我不想设置一个HDFS来做检查点,而只是一个本地文件。在 MacOS 中这样做工作正常(将临时目录设置为检查点目录),在 Windows 中这样做时会出现问题,这会引发权限异常。
我已经尝试以管理员身份启动 eclipse 并手动创建目录设置 setWritable、setReadable 和 setExecutable 到 真 。关于如何解决 Windows 中的问题的任何提示?
谢谢!
更新 这是我的代码和异常。再次澄清一下,它在 Mac 中运行良好,但在 Windows.
中运行良好SparkConf conf = new SparkConf().setAppName("testApp").setMaster("local[2]");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaStreamingContext jsc = new JavaStreamingContext(ctx, new Duration(1000));
jsc.checkpoint(Files.createTempDir().getAbsolutePath());
异常:
Exception in thread "pool-7-thread-3" java.lang.NullPointerException
at java.lang.ProcessBuilder.start(Unknown Source)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
at org.apache.hadoop.util.Shell.run(Shell.java:379)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:772)
at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:135)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
通过将最新的 Hadoop 库添加到我的项目来解决。
如果使用 Maven,以下一组依赖项可以解决问题。
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-twitter_2.11</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>
在Windows上,您可以按以下方式解决此问题
- 下载 winutils.exe 到文件夹说 MY_UTILS/bin
- 创建环境变量HADOOP_HOME并将其指向MY_UTILS