
Apache Spark: Streaming without HDFS checkpoint

I am implementing a Spark job that uses reduceByKeyAndWindow, so I need to enable checkpointing.
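
For context, the invertible form of reduceByKeyAndWindow is the variant that requires a checkpoint directory. A minimal sketch of that kind of windowed count, assuming a hypothetical JavaPairDStream<String, Integer> named pairs:

JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
    (a, b) -> a + b,      // add counts entering the window
    (a, b) -> a - b,      // subtract counts leaving the window
    new Duration(30000),  // window length: 30 s
    new Duration(10000)); // slide interval: 10 s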

From the Spark website I read:

Checkpointing can be enabled by setting a directory in a fault-tolerant, reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint information will be saved.

My application is for academic purposes only, so I don't want to set up HDFS just for checkpointing; a local file should do. Doing this on macOS works fine (setting a temporary directory as the checkpoint directory), but doing it on Windows throws a permission exception.

I have already tried launching Eclipse as administrator, and creating the directory manually and calling setWritable, setReadable, and setExecutable on it.
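
For reference, that attempt looked roughly like this (a reconstruction, not the original code; the directory path is hypothetical, and it did not help on Windows):

// Hypothetical checkpoint directory, created and opened up manually
File checkpointDir = new File("C:\\tmp\\spark-checkpoint");
checkpointDir.mkdirs();
checkpointDir.setReadable(true, false);   // readable by everyone
checkpointDir.setWritable(true, false);   // writable by everyone
checkpointDir.setExecutable(true, false); // traversable by everyone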

Any hints on how to solve this on Windows? Thanks!

Update Here are my code and the exception. To clarify once more: it runs fine on Mac but fails on Windows.

SparkConf conf = new SparkConf().setAppName("testApp").setMaster("local[2]");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaStreamingContext jsc = new JavaStreamingContext(ctx, new Duration(1000)); // 1 s batches
// Files.createTempDir() is Guava's; a local temp dir serves as the checkpoint directory
jsc.checkpoint(Files.createTempDir().getAbsolutePath());
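
As an aside, Guava's Files.createTempDir() is only used here to get a throwaway directory; the same setup works with the JDK alone (a sketch, not from the original post):

// java.nio equivalent of the Guava call above; note it throws IOException
jsc.checkpoint(java.nio.file.Files.createTempDirectory("spark-checkpoint").toString());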

The exception:

Exception in thread "pool-7-thread-3" java.lang.NullPointerException
at java.lang.ProcessBuilder.start(Unknown Source)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
at org.apache.hadoop.util.Shell.run(Shell.java:379)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:772)
at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:135)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

Solved by adding the latest Hadoop libraries to my project.

If you use Maven, the following set of dependencies does the trick.

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-twitter_2.10</artifactId>
        <version>1.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
</dependencies>

On Windows, you can resolve this issue as follows:

  • Download winutils.exe into a folder, say MY_UTILS/bin
  • Create an environment variable HADOOP_HOME and point it to MY_UTILS
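
If you would rather not touch the system environment, the same location can also be supplied programmatically before the Spark context is created; a minimal sketch, assuming winutils.exe sits under C:\MY_UTILS\bin:

// Tells Hadoop's Shell utilities where to find winutils.exe;
// equivalent to setting the HADOOP_HOME environment variable.
System.setProperty("hadoop.home.dir", "C:\\MY_UTILS");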