pandasUDF 和 pyarrow 0.15.0

pandasUDF and pyarrow 0.15.0

我最近开始在 EMR 集群上的一些 pyspark 作业 运行 上遇到一堆错误。错误是

java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
    at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon.read(ArrowPythonRunner.scala:162)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon.read(ArrowPythonRunner.scala:122)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon.<init>(ArrowEvalPythonExec.scala:98)
    at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
    at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute.apply(EvalPythonExec.scala:127)...

它们似乎都发生在 pandas 系列的 apply 函数中。我发现的唯一变化是 pyarrow 已于星期六 (05/10/2019) 更新。测试似乎适用于 0.14.1

所以我的问题是,是否有人知道这是否是新更新的 pyarrow 中的错误,或者是否有一些重大变化会使 pandasUDF 将来难以使用?

这不是错误。我们在 0.15.0 中进行了一项重要的协议更改,使 pyarrow 的默认行为与 Java 中旧版本的 Arrow 不兼容——您的 Spark 环境似乎使用的是旧版本。

你的选择是

  • 从您使用的位置设置环境变量 ARROW_PRE_0_15_IPC_FORMAT=1 Python
  • 暂时降级到 pyarrow < 0.15.0。

希望 Spark 社区能够在 Java 中尽快升级到 0.15.0,这样这个问题就会消失。

这在 http://arrow.apache.org/blog/2019/10/06/0.15.0-release/

中讨论

在 Spark 中尝试以下附录:

spark-submit --deploy-mode cluster --conf spark.yarn.appExecutorEnv.ARROW_PRE_0_15_IPC_FORMAT=1 --conf spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT=1 --conf spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1