无法在 Azure Databricks 上使用 UDF 解压缩流数据 - Python

Failing to decompress streaming data by using UDF on Azure Databricks - Python

我正在尝试使用 Azure DataBricks 和 python (PySpark) 读取 Azure EventHub GZIP 压缩消息,但使用 UDF 无法处理 BinaryType 数据。

好吧,这是我检查 body

中的内容的部分
df = eventHubStream.withColumn("body", eventHubStream["body"]).select("body")
display(df, truncate=False)

并且显示压缩良好的数据,如下所示:H4sIAKeM0FwC/3VS22rbQBB9z1cIQ6ElWN37JW8baeMKZEmRNk4LhcXUppg2cYncy...

但是,当我尝试将数据发送到我的 UDF 时,它的行为并不像预期的那样。该函数实际上什么都不做,但输出看起来已被转换:

import zlib
from pyspark.sql.types import StringType

def streamDecompress(val: BinaryType()):
  #return zlib.decompress(val)
  return val

func_udf = udf(lambda x: streamDecompress(x), StringType())

df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select("body")
display(df, truncate=False)

这是输出:

[B@49d3f786

所以,正如预期的那样,当我尝试使用 zlib 解压缩时它失败了。

有人知道我是怎么做到的吗?

好吧,这比我想象的要简单得多。我基本上是在尝试显示 byte-like 数据哈哈。

下面的代码解决了问题:

import zlib

def streamDecompress(val):   
  return str(zlib.decompress(val, 15+32))

func_udf = udf(lambda x: streamDecompress(x))

df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select('body')

display(df, truncate=False)

非常感谢。上周我一直在努力对 zStandard 做同样的事情,我想我会添加我的代码片段以防其他人正在寻找类似的解决方案(我在任何地方都找不到):

import zstandard as zstd
def streamDecompress(val):
    return str(zstd.ZstdDecompressor().decompress(val))

my_udf=udf(lambda x: streamDecompress(x))
decompressedStream= pyStreamIn.withColumn("body",my_udf(pyStreamIn["body"]))