如何使用非 Lambda 函数定义 Spark RDD 转换
How to define Spark RDD transformation with non-Lambda Function
我最近开始使用 Spark 和 Java。我目前正在试验 RDD 转换和操作。目前我正在从包含一些 DateTime 字段的 csv 中读取数据,然后我应用过滤器以仅保留那些小于 2 天的行,最后我检查生成的 RDD 是否为空。我写了一个简单的片段,它在最低限度上完成了我想要的。
Function<List<String>, Boolean> filterPredicate = row -> new DateTime(row.get(1).isAfter(dtThreshold);
sc.textFile(inputFilePath)
.map(text -> Arrays.asList(text.split(",")))
.filter(filterPredicate)
.isEmpty();
在这个简单的例子中,我假设 DateTime 对象总是位于第一列。我现在想扩展它以使用多个列索引。
但要做到这一点,我需要能够定义一个多行的谓词函数。这就是我将谓词函数定义与转换代码分开的原因。
我该如何定义这样的函数?
使用花括号表示法...
Function<List<String>, Boolean> filterPredicate = row -> {
boolean isDateAfter = new DateTime(row.get(1)).isAfter(dtThreshold);
boolean hasName = row.get(2) != "";
return isDateAfter && hasName;
}
我最近开始使用 Spark 和 Java。我目前正在试验 RDD 转换和操作。目前我正在从包含一些 DateTime 字段的 csv 中读取数据,然后我应用过滤器以仅保留那些小于 2 天的行,最后我检查生成的 RDD 是否为空。我写了一个简单的片段,它在最低限度上完成了我想要的。
Function<List<String>, Boolean> filterPredicate = row -> new DateTime(row.get(1).isAfter(dtThreshold);
sc.textFile(inputFilePath)
.map(text -> Arrays.asList(text.split(",")))
.filter(filterPredicate)
.isEmpty();
在这个简单的例子中,我假设 DateTime 对象总是位于第一列。我现在想扩展它以使用多个列索引。 但要做到这一点,我需要能够定义一个多行的谓词函数。这就是我将谓词函数定义与转换代码分开的原因。
我该如何定义这样的函数?
使用花括号表示法...
Function<List<String>, Boolean> filterPredicate = row -> {
boolean isDateAfter = new DateTime(row.get(1)).isAfter(dtThreshold);
boolean hasName = row.get(2) != "";
return isDateAfter && hasName;
}