如何使用 SparkR 取消嵌套数据?
How to unnest data with SparkR?
使用SparkR
嵌套数组怎么会是"exploded along"?我试过像这样使用 explode
:
dat <- nested_spark_df %>%
mutate(a=explode(metadata)) %>%
head()
但是上面虽然不会抛出异常,但是并没有将metadata
中的嵌套字段提升到顶层。本质上,我正在寻求类似于 Hive 的 LATERAL VIEW explode()
功能的行为,而不依赖于 HiveContext
.
请注意,在代码片段中,我使用的是通过 SparkRext
启用的 NSE。我认为等效的直接 SparkR
会像 ... %>% mutate(a=explode(nested_spark_df$metadata)) ...
或类似的东西。
编辑
我试过在 SparkR::sql
函数中使用 LATERAL VIEW explode(...)
。它似乎适用于 Parquet 和 ORC 数据。但是,在处理嵌套的 Avro 数据时,我尝试了:
dat <- collect(sql(HiveContext,
paste0("SELECT a.id, ax.arrival_airport, x.arrival_runway ",
"FROM avrodb.flight a ",
"LATERAL VIEW explode(a.metadata) a AS ax ",
"WHERE ax.arrival_airport='ATL'")))
只是得到以下错误,尽管当用包含等效数据的 parquetdb
换出 avrodb
时,它符合我的预期。
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 1345, dev-dn04.myorg.org): org.apache.avro.AvroTypeException: Found metadata, expecting union
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:219)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avr
Calls: <Anonymous> ... collect -> collect -> .local -> callJStatic -> invokeJava
尽管事实上我在启动 Spark 时包含了 DataBricks Avro 包。使用 SQLContext
(而不是 HiveContext
)用 spark 读取相同的数据工作正常,除了我还没有弄清楚如何有效地使用 explode()
函数。我还确认这不是数据本身的问题,方法是使用我尝试过的相同 HQL 语句通过 Hive 成功查询相同的文件 运行 SparkR::sql(HiveContext, hql)
此时,在 dplyr 中使用数组列是很棘手的,例如,请参阅 this issue. Probably best to use explode()
via Spark. Also note that there is overhead associated with using the DSL version of explode
(see this answer) 因此您可能希望通过 sql()
.[=14 使用 SQL 形式=]
非常感谢@Sim。不过,我终于想出了一个明智的方法。关键是在 explode
操作之后,当所有分解值仍然嵌套一层时,必须执行 select
。例如:
dat <- nested_spark_df %>%
mutate(a=explode(nested_spark_df$metadata)) %>%
select("id", "a.fld1", "a.fld2")
这将产生一个包含 3 列的 SparkR DataFrame
对象:id
、fld1
和 fld2
(没有 a.
前缀)。
我的心理障碍是我试图让 explode 表现得像 PIG 的 flatten
,它会在架构的顶层创建一堆新的字段名称。
使用SparkR
嵌套数组怎么会是"exploded along"?我试过像这样使用 explode
:
dat <- nested_spark_df %>%
mutate(a=explode(metadata)) %>%
head()
但是上面虽然不会抛出异常,但是并没有将metadata
中的嵌套字段提升到顶层。本质上,我正在寻求类似于 Hive 的 LATERAL VIEW explode()
功能的行为,而不依赖于 HiveContext
.
请注意,在代码片段中,我使用的是通过 SparkRext
启用的 NSE。我认为等效的直接 SparkR
会像 ... %>% mutate(a=explode(nested_spark_df$metadata)) ...
或类似的东西。
编辑
我试过在 SparkR::sql
函数中使用 LATERAL VIEW explode(...)
。它似乎适用于 Parquet 和 ORC 数据。但是,在处理嵌套的 Avro 数据时,我尝试了:
dat <- collect(sql(HiveContext,
paste0("SELECT a.id, ax.arrival_airport, x.arrival_runway ",
"FROM avrodb.flight a ",
"LATERAL VIEW explode(a.metadata) a AS ax ",
"WHERE ax.arrival_airport='ATL'")))
只是得到以下错误,尽管当用包含等效数据的 parquetdb
换出 avrodb
时,它符合我的预期。
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 1345, dev-dn04.myorg.org): org.apache.avro.AvroTypeException: Found metadata, expecting union
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:219)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avr
Calls: <Anonymous> ... collect -> collect -> .local -> callJStatic -> invokeJava
尽管事实上我在启动 Spark 时包含了 DataBricks Avro 包。使用 SQLContext
(而不是 HiveContext
)用 spark 读取相同的数据工作正常,除了我还没有弄清楚如何有效地使用 explode()
函数。我还确认这不是数据本身的问题,方法是使用我尝试过的相同 HQL 语句通过 Hive 成功查询相同的文件 运行 SparkR::sql(HiveContext, hql)
此时,在 dplyr 中使用数组列是很棘手的,例如,请参阅 this issue. Probably best to use explode()
via Spark. Also note that there is overhead associated with using the DSL version of explode
(see this answer) 因此您可能希望通过 sql()
.[=14 使用 SQL 形式=]
非常感谢@Sim。不过,我终于想出了一个明智的方法。关键是在 explode
操作之后,当所有分解值仍然嵌套一层时,必须执行 select
。例如:
dat <- nested_spark_df %>%
mutate(a=explode(nested_spark_df$metadata)) %>%
select("id", "a.fld1", "a.fld2")
这将产生一个包含 3 列的 SparkR DataFrame
对象:id
、fld1
和 fld2
(没有 a.
前缀)。
我的心理障碍是我试图让 explode 表现得像 PIG 的 flatten
,它会在架构的顶层创建一堆新的字段名称。