Spark streaming DataFrames and accumulators in Java
I'm processing a Kafka JSON stream in Spark Structured Streaming. Since it runs as micro-batches, can I use accumulators with a streaming DataFrame?
LongAccumulator longAccum = spark.sparkContext().longAccumulator("my accum");
Dataset<Row> df2 = df.filter(df.col("Called number").equalTo("0860"))
        .groupBy("Calling number").count();
// put a row counter into the accumulator, for example
df2.javaRDD().foreach(row -> longAccum.add(1));
This throws:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
I'm also not sure this is the right way to use an accumulator: dropping from the DataFrame down to an RDD looks odd and unnecessary. Can I do it without the RDD and foreach()?
Following the exception, I removed the foreach on the source DataFrame and moved the work into writeStream().foreachBatch():
StreamingQuery ds = df2
        .writeStream()
        .foreachBatch((rowDataset, batchId) -> {
            longAccum.add(1);
            log.info("accum : " + longAccum.value());
        })
        .outputMode("complete")
        .format("console")
        .start();
It runs, but I see no value in the log and the accumulator does not show up in the GUI.
You don't need the RDD; you can access the accumulator directly from the Dataset, as shown below:
LongAccumulator longAccum = spark.sparkContext().longAccumulator("my accum");
Dataset<Row> df = spark.range(100).withColumn("x", lit("x"));

// access inside map()
df.map((MapFunction<Row, Row>) row -> {
    longAccum.add(1);
    return row;
}, RowEncoder.apply(df.schema()))
        .count();
// accumulator value
System.out.println(longAccum.value()); // 100

longAccum.reset();

// access inside foreach()
df.foreach((ForeachFunction<Row>) row -> longAccum.add(1));
// accumulator value
System.out.println(longAccum.value()); // 100
Please note that the accumulator value gets updated only when an action is performed.
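For instance, a minimal sketch of that laziness, reusing df and longAccum from the snippet above:
longAccum.reset();
// map() is a lazy transformation: declaring it does not run the lambda
Dataset<Row> mapped = df.map((MapFunction<Row, Row>) row -> {
    longAccum.add(1);
    return row;
}, RowEncoder.apply(df.schema()));
System.out.println(longAccum.value()); // 0 -- nothing has executed yet
mapped.count();                        // the action triggers the map
System.out.println(longAccum.value()); // 100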
With a streaming DataFrame:
longAccum.reset();
/**
 * streaming DataFrame from a csv directory
 * csv/test.csv
 * ------------
 * id,name
 * 1,bob
 * 2,smith
 * 3,jam
 * 4,dwayne
 * 5,mike
 */
String fileDir = getClass().getResource("/" + "csv").getPath();
StructType schema = new StructType()
        .add(new StructField("id", DataTypes.LongType, true, Metadata.empty()))
        .add(new StructField("name", DataTypes.StringType, true, Metadata.empty()));
Dataset<Row> csv = spark.readStream().schema(schema).option("header", true).csv(fileDir);
StreamingQuery streamingQuery = csv
        .map((MapFunction<Row, Row>) row -> {
            longAccum.add(1);
            return row;
        }, RowEncoder.apply(schema)) // encode with the csv schema, not df.schema()
        .writeStream()
        .format("console")
        .start();
// block until everything currently available in the source has been processed
streamingQuery.processAllAvailable();
// accumulator value
System.out.println(longAccum.value()); // 5
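To stay closer to the question's foreachBatch() approach: the function passed to foreachBatch() receives the whole micro-batch as a Dataset, so longAccum.add(1) counts batches rather than rows. A sketch of a per-row variant, reusing df2 and longAccum from the question (the explicit cast is an assumption to sidestep lambda-overload ambiguity on some Spark versions):
StreamingQuery ds = df2
        .writeStream()
        .outputMode("complete")
        .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (rowDataset, batchId) -> {
            // count() is an action on the micro-batch Dataset, so it runs eagerly
            longAccum.add(rowDataset.count());
            log.info("batch " + batchId + ", accum so far: " + longAccum.value());
        })
        .format("console")
        .start();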