Search and replace in Apache Spark
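We have created two datasets: sentenceDataFrame, in which the search and replace should happen, and sentenceDataFrame2, which stores the search terms and their replacements.
We also tried all 11 join types ('inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', 'rightouter', 'right', 'leftsemi', 'leftanti', 'cross'), and none of them gave us the result we want.
Can you tell us where we went wrong and point us in the right direction?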
List<Row> data = Arrays.asList(
RowFactory.create(0, "Allen jeevi pramod Allen"),
RowFactory.create(1,"sandesh Armstrong jeevi"),
RowFactory.create(2,"harsha Nischay DeWALT"));
StructType schema = new StructType(new StructField[] {
new StructField("label", DataTypes.IntegerType, false,
Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false,
Metadata.empty()) });
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
List<Row> data2 = Arrays.asList(
RowFactory.create("Allen", "Apex Tool Group"),
RowFactory.create("Armstrong","Apex Tool Group"),
RowFactory.create("DeWALT","StanleyBlack"));
StructType schema2 = new StructType(new StructField[] {
new StructField("label2", DataTypes.StringType, false,
Metadata.empty()),
new StructField("sentence2", DataTypes.StringType, false,
Metadata.empty()) });
Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);
Dataset<Row> remainingElements=sentenceDataFrame.join(sentenceDataFrame2,sentenceDataFrame.col("label").equalTo(sentenceDataFrame2.col("label2")),"cross");
System.out.println("Left anti join count :"+remainingElements.count());
Input
Allen jeevi pramod Allen
sandesh Armstrong jeevi
harsha Nischay DeWALT
Expected output
Apex Tool Group jeevi pramod Apex Tool Group
sandesh Apex Tool Group jeevi
harsha Nischay StanleyBlack
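For join conditions that do not involve a simple equality like this, you will need to use a Spark user-defined function (UDF).
Here is a JUnit code snippet. It will not compile as-is, but it shows the relevant imports and logic. The Java API is quite verbose, though, so I will leave doing this in Scala as an exercise for the reader. It will be far more concise.
The callUDF() and col() methods require the static import.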
import static org.apache.spark.sql.functions.*;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.Test;
@Test
public void testSomething() {
List<Row> data = Arrays.asList(
RowFactory.create(0, "Allen jeevi pramod Allen"),
RowFactory.create(1, "sandesh Armstrong jeevi"),
RowFactory.create(2, "harsha Nischay DeWALT")
);
StructType schema = new StructType(new StructField[] {
new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
List<Row> data2 = Arrays.asList(
RowFactory.create("Allen", "Apex Tool Group"),
RowFactory.create("Armstrong","Apex Tool Group"),
RowFactory.create("DeWALT","StanleyBlack")
);
StructType schema2 = new StructType(new StructField[] {
new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
new StructField("sentence2", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);
UDF2<String, String, Boolean> contains = new UDF2<String, String, Boolean>() {
private static final long serialVersionUID = -5239951370238629896L;
@Override
public Boolean call(String t1, String t2) throws Exception {
return t1.contains(t2);
}
};
spark.udf().register("contains", contains, DataTypes.BooleanType);
UDF3<String, String, String, String> replaceWithTerm = new UDF3<String, String, String, String>() {
private static final long serialVersionUID = -2882956931420910207L;
@Override
public String call(String t1, String t2, String t3) throws Exception {
return t1.replaceAll(t2, t3);
}
};
spark.udf().register("replaceWithTerm", replaceWithTerm, DataTypes.StringType);
Dataset<Row> joined = sentenceDataFrame.join(sentenceDataFrame2, callUDF("contains", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2")))
.withColumn("sentence_replaced", callUDF("replaceWithTerm", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2"), sentenceDataFrame2.col("sentence2")))
.select(col("sentence_replaced"));
joined.show(false);
}
Output:
+--------------------------------------------+
|sentence_replaced |
+--------------------------------------------+
|Apex Tool Group jeevi pramod Apex Tool Group|
|sandesh Apex Tool Group jeevi |
|harsha Nischay StanleyBlack |
+--------------------------------------------+
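To make it easier to see what the UDF-based join computes, here is a Spark-free sketch in plain Java. It assumes the join behaves like a filtered cross product: every (sentence, term) pair for which the contains check is true yields one output row, with only that one term replaced. The class name JoinSketch and the method joinAndReplace are made up for illustration and are not part of Spark. The sketch uses String.replace (literal matching) rather than replaceAll (regex), which gives the same result for these terms.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JoinSketch {
    // Emulates join(contains) followed by withColumn(replaceWithTerm):
    // one output row per matching (sentence, term) pair.
    static List<String> joinAndReplace(List<String> sentences, List<String[]> terms) {
        List<String> rows = new ArrayList<>();
        for (String sentence : sentences) {
            for (String[] term : terms) {
                // term[0] is the search term (label2), term[1] the replacement (sentence2)
                if (sentence.contains(term[0])) {
                    rows.add(sentence.replace(term[0], term[1]));
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList(
                "Allen jeevi pramod Allen",
                "sandesh Armstrong jeevi",
                "harsha Nischay DeWALT");
        List<String[]> terms = Arrays.asList(
                new String[] {"Allen", "Apex Tool Group"},
                new String[] {"Armstrong", "Apex Tool Group"},
                new String[] {"DeWALT", "StanleyBlack"});
        for (String row : joinAndReplace(sentences, terms)) {
            System.out.println(row);
        }
    }
}
```

With this data each sentence contains exactly one term, so three rows come out. A sentence that contains two different terms would produce two rows, each with only one of the terms replaced.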
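Still facing a similar problem
Input
Allen Armstrong jeevi pramod Allen
sandesh Armstrong jeevi
harsha nischay DeWALT
Output
Apex Tool Group Armstrong jeevi pramod Apex Tool Group
Allen Apex Tool Group jeevi pramod Allen
sandesh Apex Tool Group jeevi
harsha nischay StanleyBlack
Expected output
Apex Tool Group Apex Tool Group jeevi pramod Apex Tool Group
sandesh Apex Tool Group jeevi
harsha nischay StanleyBlack
We get this output when several replacements have to be made in a single row.
Is there another approach we must follow to get the correct output, or is this a limitation of UDFs?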
We can use replaceAll inside a UDF to achieve the expected output.
public class Test {
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
SQLContext sqlContext = new SQLContext(sc);
SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();
List<Row> data = Arrays.asList(
RowFactory.create(0, "Allen jeevi pramod Allen"),
RowFactory.create(1, "sandesh Armstrong jeevi"),
RowFactory.create(2, "harsha Nischay DeWALT")
);
StructType schema = new StructType(new StructField[] {
new StructField("label", DataTypes.IntegerType, false,
Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false,
Metadata.empty()) });
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
UDF1<String, String> mode = new UDF1<String, String>() {
public String call(final String types) throws Exception {
return types.replaceAll("Allen", "Apex Tool Group")
.replaceAll("Armstrong","Apex Tool Group")
.replaceAll("DeWALT", "StanleyBlack");
}
};
sqlContext.udf().register("mode", mode, DataTypes.StringType);
sentenceDataFrame.createOrReplaceTempView("people");
// Alias the UDF result in SQL so the column name does not depend on the name Spark generates for it.
Dataset<Row> newDF = sqlContext.sql("SELECT mode(sentence) AS sentence, label FROM people");
newDF.show(false);
}
}
Output
+--------------------------------------------+------+
|sentence |label |
+--------------------------------------------+------+
|Apex Tool Group jeevi pramod Apex Tool Group| 0 |
|sandesh Apex Tool Group jeevi | 1 |
|harsha Nischay StanleyBlack | 2 |
+--------------------------------------------+------+
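The hard-coded replaceAll chain can be generalised so that the search terms come from a lookup table, as in sentenceDataFrame2. The following is a minimal sketch in plain Java of logic that could sit inside the call method of a UDF1. It assumes the lookup table is small enough to hold in a map, and the names ReplaceAllTerms and replaceTerms are hypothetical. It builds one regex alternation from the quoted terms and replaces every match in a single pass, so replacement text is never rewritten again by a later term.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class ReplaceAllTerms {
    // Replaces every occurrence of every key in 'terms' with its mapped value, in one pass.
    static String replaceTerms(String sentence, Map<String, String> terms) {
        if (terms.isEmpty()) {
            return sentence;
        }
        // Quote each term so regex metacharacters in it are matched literally.
        String alternation = terms.keySet().stream()
                .map(Pattern::quote)
                .collect(Collectors.joining("|"));
        Matcher m = Pattern.compile(alternation).matcher(sentence);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            // m.group() is exactly one of the keys; quoteReplacement keeps '$' and '\' literal.
            m.appendReplacement(out, Matcher.quoteReplacement(terms.get(m.group())));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> terms = new LinkedHashMap<>();
        terms.put("Allen", "Apex Tool Group");
        terms.put("Armstrong", "Apex Tool Group");
        terms.put("DeWALT", "StanleyBlack");
        // prints "Apex Tool Group Apex Tool Group jeevi pramod Apex Tool Group"
        System.out.println(replaceTerms("Allen Armstrong jeevi pramod Allen", terms));
    }
}
```

If one term is a prefix of another, the term that comes first in the map wins at that position, so a LinkedHashMap with longer terms inserted first is the safer choice.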