Q: Dataset<Row> flatMap to empty dataset Spark Java
I want to remove the duplicate values from my dataset.
For example:
+----------+---------------+--------------------+--------------------+---------+----+-------------+
| e_key|f_timestamp_day| key| value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
| Tryout| 2020-04-01| item_guid_list| a^a^a^b | FR| iOS| 2020-04-01|
| Tryout| 2020-04-01| sku_list| c^c^d^e^f^f| FR| iOS| 2020-04-01|
to
+----------+---------------+--------------------+--------------------+---------+----+-------------+
| e_key|f_timestamp_day| key| value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
| Tryout| 2020-04-01| item_guid_list| a | FR| iOS| 2020-04-01|
| Tryout| 2020-04-01| item_guid_list| b | FR| iOS| 2020-04-01|
| Tryout| 2020-04-01| sku_list| c | FR| iOS| 2020-04-01|
| Tryout| 2020-04-01| sku_list| d | FR| iOS| 2020-04-01|
| Tryout| 2020-04-01| sku_list| e | FR| iOS| 2020-04-01|
| Tryout| 2020-04-01| sku_list| f | FR| iOS| 2020-04-01|
But when I use flatMap, the result is:
++
||
++
||
||
||
||
||
My code is:
StructType structType = new StructType();
structType.add("e_key", DataTypes.StringType);
structType.add("f_timestamp_day", DataTypes.StringType);
structType.add("key", DataTypes.StringType);
structType.add("value", DataTypes.StringType);
structType.add("f_country", DataTypes.StringType);
structType.add("f_os", DataTypes.StringType);
structType.add("received_date", DataTypes.StringType);
Dataset<Row> drop_duplicate_feature =
    explode_events.flatMap(
        (FlatMapFunction<Row, Row>) row -> {
            List<Row> list = new ArrayList<Row>();
            String value = row.getString(3);
            String[] array_of_value = value.split("\\^");
            array_of_value = new HashSet<String>(Arrays.asList(array_of_value)).toArray(new String[0]);
            for (int index = 0; index < array_of_value.length; index++) {
                list.add(
                    RowFactory.create(row.get(0), row.get(1), row.get(2), array_of_value[index], row.get(4), row.get(5), row.get(6))
                );
            }
            return list.iterator();
        },
        RowEncoder.apply(structType)
    );
I use flatMap to generate the distinct rows and add them to a list.
Why doesn't RowEncoder.apply() work?
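A likely cause, worth checking first: Spark's StructType is immutable, so structType.add(...) returns a new StructType instead of modifying the one it is called on. Every add result above is discarded, structType stays empty, and RowEncoder.apply() therefore encodes rows with zero columns, which matches the empty table you see. A minimal sketch of the fix is to chain the calls (or reassign after each add):

StructType structType = new StructType()
        .add("e_key", DataTypes.StringType)         // each add returns a NEW StructType
        .add("f_timestamp_day", DataTypes.StringType)
        .add("key", DataTypes.StringType)
        .add("value", DataTypes.StringType)
        .add("f_country", DataTypes.StringType)
        .add("f_os", DataTypes.StringType)
        .add("received_date", DataTypes.StringType); // keep the final result, not the original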
You can get the same result using Spark SQL and org.apache.spark.sql.functions:
explode_events.select(
  $"e_key",
  $"f_timestamp_day",
  $"key",
  explode(array_distinct(split($"value", "\\^"))).as("value"),
  $"f_country",
  $"f_os",
  $"received_date"
).show()
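Since the question uses the Java API, here is a rough Java equivalent of the snippet above (a sketch; the variable name distinct_values is just for illustration, and array_distinct requires Spark 2.4+):

import static org.apache.spark.sql.functions.*;

// split the caret-delimited string, drop duplicates, then emit one row per value
Dataset<Row> distinct_values = explode_events.select(
        col("e_key"),
        col("f_timestamp_day"),
        col("key"),
        explode(array_distinct(split(col("value"), "\\^"))).as("value"),
        col("f_country"),
        col("f_os"),
        col("received_date"));
distinct_values.show();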
Try this -
1. Load the provided test data
String data = " e_key|f_timestamp_day| key| value|f_country|f_os|received_date\n" +
        " Tryout| 2020-04-01| item_guid_list| a^a^a^b | FR| iOS| 2020-04-01\n" +
        " Tryout| 2020-04-01| sku_list| c^c^d^e^f^f| FR| iOS| 2020-04-01";
// the rows above are joined with "\n", so split on "\n" rather than System.lineSeparator()
List<String> list1 = Arrays.stream(data.split("\n"))
        .map(s -> Arrays.stream(s.split("\\|"))
                .map(s1 -> s1.replaceAll("^[ \t]+|[ \t]+$", ""))
                .collect(Collectors.joining(",")))
        .collect(Collectors.toList());
Dataset<Row> dataset = spark.read()
        .option("header", true)
        .option("inferSchema", true)
        .option("sep", ",")
        .option("nullValue", "null")
        .csv(spark.createDataset(list1, Encoders.STRING()));
dataset.show(false);
dataset.printSchema();
/**
* +------+-------------------+--------------+-----------+---------+----+-------------------+
* |e_key |f_timestamp_day |key |value |f_country|f_os|received_date |
* +------+-------------------+--------------+-----------+---------+----+-------------------+
* |Tryout|2020-04-01 00:00:00|item_guid_list|a^a^a^b |FR |iOS |2020-04-01 00:00:00|
* |Tryout|2020-04-01 00:00:00|sku_list |c^c^d^e^f^f|FR |iOS |2020-04-01 00:00:00|
* +------+-------------------+--------------+-----------+---------+----+-------------------+
*
* root
* |-- e_key: string (nullable = true)
* |-- f_timestamp_day: timestamp (nullable = true)
* |-- key: string (nullable = true)
* |-- value: string (nullable = true)
* |-- f_country: string (nullable = true)
* |-- f_os: string (nullable = true)
* |-- received_date: timestamp (nullable = true)
*/
2. De-duplicate the array with array_distinct and explode it
dataset.withColumn("value", explode(array_distinct(split(col("value"), "\\^"))))
        .show(false);
/**
* +------+-------------------+--------------+-----+---------+----+-------------------+
* |e_key |f_timestamp_day |key |value|f_country|f_os|received_date |
* +------+-------------------+--------------+-----+---------+----+-------------------+
* |Tryout|2020-04-01 00:00:00|item_guid_list|a |FR |iOS |2020-04-01 00:00:00|
* |Tryout|2020-04-01 00:00:00|item_guid_list|b |FR |iOS |2020-04-01 00:00:00|
* |Tryout|2020-04-01 00:00:00|sku_list |c |FR |iOS |2020-04-01 00:00:00|
* |Tryout|2020-04-01 00:00:00|sku_list |d |FR |iOS |2020-04-01 00:00:00|
* |Tryout|2020-04-01 00:00:00|sku_list |e |FR |iOS |2020-04-01 00:00:00|
* |Tryout|2020-04-01 00:00:00|sku_list |f |FR |iOS |2020-04-01 00:00:00|
* +------+-------------------+--------------+-----+---------+----+-------------------+
*/
Split the value column using the split function, then use the array_distinct and explode functions to get the result.
from pyspark.sql.functions import *
# create df1
df1 = df.withColumn("value", explode(array_distinct(split("value", "\\^"))))