spark ssc.textFileStream 没有从目录中流式传输任何文件

spark ssc.textFileStream is not streamining any files from directory

我正在尝试使用 eclipse(使用 maven conf)和 2 个工作人员执行以下代码,每个工作人员都有 2 个核心,或者也尝试使用 spark-submit。

public class StreamingWorkCount implements Serializable {

    public static void main(String[] args) {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        JavaStreamingContext jssc = new JavaStreamingContext(
                "spark://192.168.1.19:7077", "JavaWordCount",
                new Duration(1000));
        JavaDStream<String> trainingData = jssc.textFileStream(
                "/home/bdi-user/kaushal-drive/spark/data/training").cache();
        trainingData.foreach(new Function<JavaRDD<String>, Void>() {

            public Void call(JavaRDD<String> rdd) throws Exception {
                List<String> output = rdd.collect();
                System.out.println("Sentences Collected from files " + output);
                return null;
            }
        });

        trainingData.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

以及该代码的日志

15/01/22 21:57:13 INFO FileInputDStream: New files at time 1421944033000 ms:

15/01/22 21:57:13 INFO JobScheduler: Added jobs for time 1421944033000 ms
15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms
15/01/22 21:57:13 INFO SparkContext: Starting job: foreach at StreamingKMean.java:33
15/01/22 21:57:13 INFO DAGScheduler: Job 3 finished: foreach at StreamingKMean.java:33, took 0.000094 s
Sentences Collected from files []
-------------------------------------------
15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms
Time: 1421944033000 ms
-------------------------------------------15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms


15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms
15/01/22 21:57:13 INFO JobScheduler: Total delay: 0.028 s for time 1421944033000 ms (execution: 0.013 s)
15/01/22 21:57:13 INFO MappedRDD: Removing RDD 5 from persistence list
15/01/22 21:57:13 INFO BlockManager: Removing RDD 5
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

问题是,我没有从目录中的文件中获取数据。请帮助我。

用另一个目录试试,然后将这些文件复制到那个目录,而作业是运行。

我认为你需要添加方案,即在路径前面添加 file://hdfs://


撤消对我的评论的编辑,因为: 事实上 file://hdfs:// 需要添加 "in front of" 路径,因此总路径变为 file:///tmp/file.txthdfs:///user/data。如果配置中没有设置NameNode,则后者需要hdfs://host:port/user/data.

有同样的问题。 这是我的代码:

lines = jssc.textFileStream("file:///Users/projects/spark/test/data');

TextFileSTream 非常 敏感;我最后做的是:

1. Run Spark program
2. touch datafile
3. mv datafile datafile2
4. mv datafile2  /Users/projects/spark/test/data

就是这样。

JavaDoc 建议函数仅流式传输新文件。

参考: https://spark.apache.org/docs/1.0.1/api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html#textFileStream(java.lang.String)

创建一个输入流来监视 Hadoop 兼容文件系统的新文件并将它们作为文本文件读取(使用 LongWritable 键,Text 值和 TextInputFormat 输入格式)。文件必须由 "moving" 从同一文件系统中的另一个位置写入受监视目录。开头的文件名。被忽略。

textFileStream 只能在文件夹中的文件正在添加更新 时监控该文件夹。

如果你只想读取文件,你可以使用SparkContext.textFile

你必须考虑到 Spark Streaming 只会读取目录中的新文件,不会读取更新的文件(一旦它们在目录中)并且它们都必须具有相同的格式。

Source

我已经挠头好几个小时了,对我有用的是

  • 来自
  • 的回答
  • 我忘了开始流式处理,所以你需要ssc.start()