使用 sparklyr 时无法在本地 Spark 连接中加载 .csv 数据

Can't load .csv data in a local Spark connection when using sparklyr

背景:我对整个 Spark 平台和概念完全陌生,我正在尝试学习如何通过 R 和 [=13= 来操作它].我开始关注有关该主题的在线课程,并尝试将其用于我自己的数据分析,以此作为学习它的一种方式。

问题:我正在尝试加载一个 6.3gb 的 csv 数据集(~3000 万行,~20 列)但我收到以下错误(据我所知,相同的块不断重复,我在这里给出其中的前 3 个,否则我将达到 post 的字符限制)。代码运行但在 17 分钟后退出并出现以下错误(未加载数据):

Error: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
java.lang.reflect.Method.invoke(Unknown Source)
sparklyr.Invoke.invoke(invoke.scala:139)
sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
sparklyr.StreamHandler.read(stream.scala:66)
sparklyr.BackendHandler.channelRead0(handler.scala:51)
sparklyr.BackendHandler.channelRead0(handler.scala:4)
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)

The currently active SparkContext was created at:

org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
java.lang.reflect.Method.invoke(Unknown Source)
sparklyr.Invoke.invoke(invoke.scala:139)
sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
sparklyr.StreamHandler.read(stream.scala:66)
sparklyr.BackendHandler.channelRead0(handler.scala:51)
sparklyr.BackendHandler.channelRead0(handler.scala:4)
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)

    at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:100)
    at org.apache.spark.SparkContext$$anonfun$parallelize.apply(SparkContext.scala:716)
    at org.apache.spark.SparkContext$$anonfun$parallelize.apply(SparkContext.scala:715)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701)
    at org.apache.spark.SparkContext.parallelize(SparkContext.scala:715)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at sparklyr.Invoke.invoke(invoke.scala:139)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66)
    at sparklyr.BackendHandler.channelRead0(handler.scala:51)
    at sparklyr.BackendHandler.channelRead0(handler.scala:4)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Unknown Source)
Error: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

这是我的 R 代码:

library(sparklyr)
spark_install(version = "2.1.0")
sc <- spark_connect(master = "local")
testdata = spark_read_csv(sc, name = "testdata", path = ...)

在 Spark 之外,我可以加载文件,例如使用 read_csv。我用谷歌搜索了这个问题,有人提到潜在的原因是 OutOfMemory 问题 - 我不太确定这是否是问题所在以及如何解决它。

如果有人能指出如何调试和修复它,我将不胜感激!

干杯

几件事:

  • 除非你使用小数据,否则你应该忘记 local 模式。它主要用于测试和小规模实验,而不是用于处理中等规模的数据。

    因为它只对驱动程序和执行程序代码使用单个 JVM,所以可能会出现严重的故障恢复,如果处理擅离职守,您可能会丢失整个会话(这里似乎就是这种情况)。

    因此,如果您想在中等大小的数据上进行本地测试,请考虑使用 standalone mode,否则只需缩小数据集。

    旁注 local 模式仅使用一个处理线程。即使对于测试,使用 local[n](对于 n 线程)或 local[*](对于所有可用内核)也更有意义。

  • 准备好调整配置,因为默认值非常保守 - 例如 spark.driver.memory 默认为 1 GB - 你可能会在独立模式下摆脱它,但不是当所有组件都嵌入到单个 JVM 中时。

  • 不要相信 sparklyr 默认值。

    sparklyr 开发人员做出了一个非常不幸的选择,默认情况下急切地将数据缓存在内存中。它不仅违背了 Spark 默认设置(,使用 MEMORY_AND_DISK 代替 Dataset API),而且很少对实际大小的数据提供任何实际好处,而且还会干扰 Spark优化器以一些丑陋的方式(最显着的是防止投影和选择下推)。

    所以养成在适用时使用 memory = FALSE 的习惯:

    spark_read_csv(sc, name = "testdata", memory = FALSE, path = ...)
    
  • 为 reader 提供模式而不是使用模式推断。参见 SparklyR: Convert directly to parquet