Spark Dataset Foreach函数不迭代
Spark Dataset Foreach function does not iterate
上下文
我想遍历 Spark Dataset 并为每一行更新一个 HashMap。
这是我的代码:
// At this point, I have a my_dataset variable containing 300 000 rows and 10 columns
// - my_dataset.count() == 300 000
// - my_dataset.columns().length == 10
// Declare my HashMap
HashMap<String, Vector<String>> my_map = new HashMap<String, Vector<String>>();
// Initialize the map
for(String col : my_dataset.columns())
{
my_map.put(col, new Vector<String>());
}
// Iterate over the dataset and update the map
my_dataset.foreach( (ForeachFunction<Row>) row -> {
for(String col : my_map.KeySet())
{
my_map.get(col).add(row.get(row.fieldIndex(col)).toString());
}
});
问题
我的问题是 foreach 根本不迭代,lambda 从未执行,我不知道为什么。
我按照此处的指示实施了它:
最后,尽管数据集不是,但所有内部向量都保持为空(因为它们已初始化)(查看给定代码示例中的第一条评论)。
我知道 foreach 从不迭代,因为我做了两个测试:
- 添加一个
AtomicInteger
来计算迭代次数,使用 incrementAndGet()
方法在 lambda 的开头递增它。 => 在过程结束时计数器值保持 0
。
- 在 lambda 的开头打印调试消息。 => 该消息永远不会显示。
我没用过 Java(更不用说 Java lambdas)所以也许我错过了一个重要的点,但我找不到什么。
我可能有点老派,但我不太喜欢 lambda,因为它会变得非常复杂。
这里是 foreach()
:
的完整示例
package net.jgp.labs.spark.l240_foreach.l000;
import java.io.Serializable;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class ForEachBookApp implements Serializable {
private static final long serialVersionUID = -4250231621481140775L;
private final class BookPrinter implements ForeachFunction<Row> {
private static final long serialVersionUID = -3680381094052442862L;
@Override
public void call(Row r) throws Exception {
System.out.println(r.getString(2) + " can be bought at " + r.getString(
4));
}
}
public static void main(String[] args) {
ForEachBookApp app = new ForEachBookApp();
app.start();
}
private void start() {
SparkSession spark = SparkSession.builder().appName("For Each Book").master(
"local").getOrCreate();
String filename = "data/books.csv";
Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
.option("header", "true")
.load(filename);
df.show();
df.foreach(new BookPrinter());
}
}
如您所见,此示例读取 CSV 文件并从数据中打印一条消息。相当简单。
foreach()
实例化一个新的 class,工作已完成。
df.foreach(new BookPrinter());
工作在class的call()
方法中完成:
private final class BookPrinter implements ForeachFunction<Row> {
@Override
public void call(Row r) throws Exception {
...
}
}
由于您是 Java 的新手,请确保您具有正确的签名(对于 classes 和方法)和正确的导入。
您也可以从 https://github.com/jgperrin/net.jgp.labs.spark/tree/master/src/main/java/net/jgp/labs/spark/l240_foreach/l000 复制示例。这应该可以帮助您 foreach()
.
上下文
我想遍历 Spark Dataset 并为每一行更新一个 HashMap。
这是我的代码:
// At this point, I have a my_dataset variable containing 300 000 rows and 10 columns
// - my_dataset.count() == 300 000
// - my_dataset.columns().length == 10
// Declare my HashMap
HashMap<String, Vector<String>> my_map = new HashMap<String, Vector<String>>();
// Initialize the map
for(String col : my_dataset.columns())
{
my_map.put(col, new Vector<String>());
}
// Iterate over the dataset and update the map
my_dataset.foreach( (ForeachFunction<Row>) row -> {
for(String col : my_map.KeySet())
{
my_map.get(col).add(row.get(row.fieldIndex(col)).toString());
}
});
问题
我的问题是 foreach 根本不迭代,lambda 从未执行,我不知道为什么。
我按照此处的指示实施了它:
最后,尽管数据集不是,但所有内部向量都保持为空(因为它们已初始化)(查看给定代码示例中的第一条评论)。
我知道 foreach 从不迭代,因为我做了两个测试:
- 添加一个
AtomicInteger
来计算迭代次数,使用incrementAndGet()
方法在 lambda 的开头递增它。 => 在过程结束时计数器值保持0
。 - 在 lambda 的开头打印调试消息。 => 该消息永远不会显示。
我没用过 Java(更不用说 Java lambdas)所以也许我错过了一个重要的点,但我找不到什么。
我可能有点老派,但我不太喜欢 lambda,因为它会变得非常复杂。
这里是 foreach()
:
package net.jgp.labs.spark.l240_foreach.l000;
import java.io.Serializable;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class ForEachBookApp implements Serializable {
private static final long serialVersionUID = -4250231621481140775L;
private final class BookPrinter implements ForeachFunction<Row> {
private static final long serialVersionUID = -3680381094052442862L;
@Override
public void call(Row r) throws Exception {
System.out.println(r.getString(2) + " can be bought at " + r.getString(
4));
}
}
public static void main(String[] args) {
ForEachBookApp app = new ForEachBookApp();
app.start();
}
private void start() {
SparkSession spark = SparkSession.builder().appName("For Each Book").master(
"local").getOrCreate();
String filename = "data/books.csv";
Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
.option("header", "true")
.load(filename);
df.show();
df.foreach(new BookPrinter());
}
}
如您所见,此示例读取 CSV 文件并从数据中打印一条消息。相当简单。
foreach()
实例化一个新的 class,工作已完成。
df.foreach(new BookPrinter());
工作在class的call()
方法中完成:
private final class BookPrinter implements ForeachFunction<Row> {
@Override
public void call(Row r) throws Exception {
...
}
}
由于您是 Java 的新手,请确保您具有正确的签名(对于 classes 和方法)和正确的导入。
您也可以从 https://github.com/jgperrin/net.jgp.labs.spark/tree/master/src/main/java/net/jgp/labs/spark/l240_foreach/l000 复制示例。这应该可以帮助您 foreach()
.