在 Java 中从 Spark 数据帧中提取二进制数据
Extract binary data from Spark dataframe in Java
我有一个具有以下架构的数据框
root
|-- blob: binary (nullable = true)
数据如下所示
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|blob |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[1F 8B 08 00 00 00 00 00 00 00 7B C0 C8 F0 EA 1D E3 FC C6 E6 7B 8C B3 DA F7 31 4E EB 3B 94 9C 9F AB 97 98 9B 58 95 9F A7 97 59 92 9A 1B 9F 9B 58 5C 92 5A A4 17 94 9A 93 58 92 99 9F 57 9C 91 59 10 1F 90 58 94 9A 57 12 1F 92 1F EF 9C 91 99 93 12 1F 9E 59 92 11 EF 92 9A 53 92 E8 60 A8 67 D0 99 92 9F 9B 98 99 17 9F 99 D2 0E 36 21 33 A5 A7 2C B5 A8 18 A8 39 BE 24 33 37 B5 AF 2F 37 B1 28 3B B5 A4 20 27 31 39 15 28 D9 5B 5C 9A 94 9B 59 0C 96 CF 4C E9 9B 9C 0C 36 B2 08 C9 BE E2 77 9B 1A BB EE AD EB 56 52 DF D5 D3 E5 64 60 69 66 EC E5 11 10 16 D4 AA C8 D4 9B DD C0 FF B4 75 4E C7 94 BE C3 4C 8B FA 94 BA B3 FB D5 98 98 8B 9F DE D1 9A 70 01 00 F6 9B E3 17 DA 00 00 00]|
我想在 dataframe 上使用 map 函数来读取此列值并执行一些操作。
d.map(relationshipMapFunction, encoder)
在 relationshipMapFunction
我试图提取上面的 blob。
public class RelationshipMapFunction implements MapFunction<Row, String> {
private static final long serialVersionUID = 6766320395808127072L;
private static Logger LOG = Logger.getLogger(JobRunner.class);
@Override
public String call(Row row) throws Exception {
// Code to read binary data and perform some actions
}
}
如何从调用方法中的 row
变量中提取字节数组?
您有多种方法可以做到这一点,但哪种方法最好取决于您的要求。让我们看看其中的一些。
带 Dataframe 的地图函数
要按照您显示的代码,然后直接从 Row 类型的对象中提取字节数组,您可以使用 Row
class 的 getAs(int)
方法:
public class RelationshipMapFunction implements MapFunction<Row, String> {
@Override
public String call(Row row) throws Exception {
final byte[] blob = row.<byte[]>getAs(0); // 0 is the index of the blob column in the dataframe
return transformBlob(blob);
}
}
然后您可以将它与您的 Dataframe 一起使用:
Dataset<String> mappedDs = df.map(new RelationshipMapFunction(), Encoders.STRING());
具有类型化数据集的映射函数
您可以使用编码器将您的 Dataframe 转换为类型化数据集,例如,如果您的 Dataframe 仅包含 byte[]
类型的列,您可以直接使用 Encoders.BINARY()
。在这种情况下,map 函数将接收类型为 byte[]
的对象,而不是 Row
.
public class RelationshipMapFunction implements MapFunction<byte[], String> {
@Override
public String call(byte[] blob) {
return transformBlob(blob);
}
}
然后您可以在应用编码器后将它与您的 Dataframe 一起使用:
Dataset<String> mappedDs = df
.as(Encoders.BINARY())
.map(new RelationshipMapFunction(), Encoders.STRING());
在这两种情况下,如果 map 函数很简单,您可以将其替换为 lambda。
其他选项包括使用 UDF。
请注意,如果您需要应用的转换就像从字节数组中解码 UTF8 字符串一样简单,您可以直接使用 Spark SQL 函数(从二进制到字符串的转换就可以完成这项工作) .
我有一个具有以下架构的数据框
root
|-- blob: binary (nullable = true)
数据如下所示
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|blob |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[1F 8B 08 00 00 00 00 00 00 00 7B C0 C8 F0 EA 1D E3 FC C6 E6 7B 8C B3 DA F7 31 4E EB 3B 94 9C 9F AB 97 98 9B 58 95 9F A7 97 59 92 9A 1B 9F 9B 58 5C 92 5A A4 17 94 9A 93 58 92 99 9F 57 9C 91 59 10 1F 90 58 94 9A 57 12 1F 92 1F EF 9C 91 99 93 12 1F 9E 59 92 11 EF 92 9A 53 92 E8 60 A8 67 D0 99 92 9F 9B 98 99 17 9F 99 D2 0E 36 21 33 A5 A7 2C B5 A8 18 A8 39 BE 24 33 37 B5 AF 2F 37 B1 28 3B B5 A4 20 27 31 39 15 28 D9 5B 5C 9A 94 9B 59 0C 96 CF 4C E9 9B 9C 0C 36 B2 08 C9 BE E2 77 9B 1A BB EE AD EB 56 52 DF D5 D3 E5 64 60 69 66 EC E5 11 10 16 D4 AA C8 D4 9B DD C0 FF B4 75 4E C7 94 BE C3 4C 8B FA 94 BA B3 FB D5 98 98 8B 9F DE D1 9A 70 01 00 F6 9B E3 17 DA 00 00 00]|
我想在 dataframe 上使用 map 函数来读取此列值并执行一些操作。
d.map(relationshipMapFunction, encoder)
在 relationshipMapFunction
我试图提取上面的 blob。
public class RelationshipMapFunction implements MapFunction<Row, String> {
private static final long serialVersionUID = 6766320395808127072L;
private static Logger LOG = Logger.getLogger(JobRunner.class);
@Override
public String call(Row row) throws Exception {
// Code to read binary data and perform some actions
}
}
如何从调用方法中的 row
变量中提取字节数组?
您有多种方法可以做到这一点,但哪种方法最好取决于您的要求。让我们看看其中的一些。
带 Dataframe 的地图函数
要按照您显示的代码,然后直接从 Row 类型的对象中提取字节数组,您可以使用 Row
class 的 getAs(int)
方法:
public class RelationshipMapFunction implements MapFunction<Row, String> {
@Override
public String call(Row row) throws Exception {
final byte[] blob = row.<byte[]>getAs(0); // 0 is the index of the blob column in the dataframe
return transformBlob(blob);
}
}
然后您可以将它与您的 Dataframe 一起使用:
Dataset<String> mappedDs = df.map(new RelationshipMapFunction(), Encoders.STRING());
具有类型化数据集的映射函数
您可以使用编码器将您的 Dataframe 转换为类型化数据集,例如,如果您的 Dataframe 仅包含 byte[]
类型的列,您可以直接使用 Encoders.BINARY()
。在这种情况下,map 函数将接收类型为 byte[]
的对象,而不是 Row
.
public class RelationshipMapFunction implements MapFunction<byte[], String> {
@Override
public String call(byte[] blob) {
return transformBlob(blob);
}
}
然后您可以在应用编码器后将它与您的 Dataframe 一起使用:
Dataset<String> mappedDs = df
.as(Encoders.BINARY())
.map(new RelationshipMapFunction(), Encoders.STRING());
在这两种情况下,如果 map 函数很简单,您可以将其替换为 lambda。
其他选项包括使用 UDF。
请注意,如果您需要应用的转换就像从字节数组中解码 UTF8 字符串一样简单,您可以直接使用 Spark SQL 函数(从二进制到字符串的转换就可以完成这项工作) .