无法在 Azure Databricks 上使用 UDF 解压缩流数据 - Python
Failing to decompress streaming data by using UDF on Azure Databricks - Python
我正在尝试使用 Azure DataBricks 和 python (PySpark) 读取 Azure EventHub GZIP 压缩消息,但使用 UDF 无法处理 BinaryType 数据。
好吧,这是我检查 body
中的内容的部分
df = eventHubStream.withColumn("body", eventHubStream["body"]).select("body")
display(df, truncate=False)
并且显示压缩良好的数据,如下所示:H4sIAKeM0FwC/3VS22rbQBB9z1cIQ6ElWN37JW8baeMKZEmRNk4LhcXUppg2cYncy...
但是,当我尝试将数据发送到我的 UDF 时,它的行为并不像预期的那样。该函数实际上什么都不做,但输出看起来已被转换:
import zlib
from pyspark.sql.types import StringType
def streamDecompress(val: BinaryType()):
#return zlib.decompress(val)
return val
func_udf = udf(lambda x: streamDecompress(x), StringType())
df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select("body")
display(df, truncate=False)
这是输出:
[B@49d3f786
所以,正如预期的那样,当我尝试使用 zlib 解压缩时它失败了。
有人知道我是怎么做到的吗?
好吧,这比我想象的要简单得多。我基本上是在尝试显示 byte-like 数据哈哈。
下面的代码解决了问题:
import zlib
def streamDecompress(val):
return str(zlib.decompress(val, 15+32))
func_udf = udf(lambda x: streamDecompress(x))
df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select('body')
display(df, truncate=False)
非常感谢。上周我一直在努力对 zStandard 做同样的事情,我想我会添加我的代码片段以防其他人正在寻找类似的解决方案(我在任何地方都找不到):
import zstandard as zstd
def streamDecompress(val):
return str(zstd.ZstdDecompressor().decompress(val))
my_udf=udf(lambda x: streamDecompress(x))
decompressedStream= pyStreamIn.withColumn("body",my_udf(pyStreamIn["body"]))
我正在尝试使用 Azure DataBricks 和 python (PySpark) 读取 Azure EventHub GZIP 压缩消息,但使用 UDF 无法处理 BinaryType 数据。
好吧,这是我检查 body
中的内容的部分df = eventHubStream.withColumn("body", eventHubStream["body"]).select("body")
display(df, truncate=False)
并且显示压缩良好的数据,如下所示:H4sIAKeM0FwC/3VS22rbQBB9z1cIQ6ElWN37JW8baeMKZEmRNk4LhcXUppg2cYncy...
但是,当我尝试将数据发送到我的 UDF 时,它的行为并不像预期的那样。该函数实际上什么都不做,但输出看起来已被转换:
import zlib
from pyspark.sql.types import StringType
def streamDecompress(val: BinaryType()):
#return zlib.decompress(val)
return val
func_udf = udf(lambda x: streamDecompress(x), StringType())
df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select("body")
display(df, truncate=False)
这是输出:
[B@49d3f786
所以,正如预期的那样,当我尝试使用 zlib 解压缩时它失败了。
有人知道我是怎么做到的吗?
好吧,这比我想象的要简单得多。我基本上是在尝试显示 byte-like 数据哈哈。
下面的代码解决了问题:
import zlib
def streamDecompress(val):
return str(zlib.decompress(val, 15+32))
func_udf = udf(lambda x: streamDecompress(x))
df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select('body')
display(df, truncate=False)
非常感谢。上周我一直在努力对 zStandard 做同样的事情,我想我会添加我的代码片段以防其他人正在寻找类似的解决方案(我在任何地方都找不到):
import zstandard as zstd
def streamDecompress(val):
return str(zstd.ZstdDecompressor().decompress(val))
my_udf=udf(lambda x: streamDecompress(x))
decompressedStream= pyStreamIn.withColumn("body",my_udf(pyStreamIn["body"]))