Failed to write statements

I am using Spark with Cassandra, and I want to write data into my Cassandra table:

CREATE TABLE IF NOT EXISTS MyTable (
    user TEXT,
    date TIMESTAMP,
    event TEXT,
    PRIMARY KEY ((user), date, event)
);
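
For context, the write itself is a plain saveToCassandra call from the Spark Cassandra connector; a minimal sketch of this kind of job (the keyspace name, connection host, and sample row are placeholders, not the actual code):

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cassandra-write")
  .set("spark.cassandra.connection.host", "127.0.0.1") // placeholder host
val sc = new SparkContext(conf)

// (user, date, event) tuples matching the MyTable schema
val rows = sc.parallelize(Seq(
  ("alice", new java.util.Date(), "login")
))

// Column names must match the table definition above
rows.saveToCassandra("keyspace", "mytable", SomeColumns("user", "date", "event"))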

But I get this error:

java.io.IOException: Failed to write statements to KeySpace.MyTable.
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write.apply(TableWriter.scala:145)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write.apply(TableWriter.scala:120)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:100)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:99)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/04/28 17:57:47 WARN TaskSetManager: Lost task 13.2 in stage 1.0 (TID 43, dev2-cim.aid.fr): TaskKilled (killed intentionally)

And this warning in my Cassandra log file:

WARN  [SharedPool-Worker-2] 2015-04-28 16:45:21,219 BatchStatement.java:243 - Batch of prepared statements for [*********] is of size 8158, exceeding specified threshold of 5120 by 3038

After some searching on the Internet, I found a link where someone explains how he solved the same problem: http://progexc.blogspot.fr/2015/03/write-batch-size-error-spark-cassandra.html

So I modified my Spark program to add:

conf.set("spark.cassandra.output.batch.grouping.key", "None")
conf.set("spark.cassandra.output.batch.size.rows", "10")
conf.set("spark.cassandra.output.batch.size.bytes", "2048")

These values made the warning messages in the Cassandra logs go away, but I still get the same error: Failed to write statements.

In the Spark logs for the failure, I found this error:

Failed to execute:
    com.datastax.spark.connector.writer.RichBatchStatement@67827d57
    com.datastax.driver.core.exceptions.InvalidQueryException: Key may not be empty
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:103)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)

I solved this problem by restarting the cluster along with its nodes. Here is what I tried: I faced the same issue and tried all of the options you mentioned in the blog above, but without success. My data size is 174 GB in total, and my cluster has 3 nodes, each with 16 cores and 48 GB of RAM. I first tried to load all 174 GB at once and ran into this problem. I then split the 174 GB into 109 files of 1.6 GB each and tried to load them; this time the same problem appeared again after 100 of the files (1.6 GB each) had loaded. I thought it might be a problem with the data in the 101st file, so I tried loading that file alone, loading it into a new table, and loading new data into a new table, but the problem showed up in all of these cases. I then concluded it was a problem with the Cassandra cluster itself and restarted the cluster and its nodes. After that, the problem was gone.

I had the same problem and found the solution in the comments above (by Amine CHERIFI and maasg).

The column corresponding to the primary key was not always filled with a proper value (in my case, an empty string "").

This triggered the error:

ERROR QueryExecutor: Failed to execute: \
com.datastax.spark.connector.writer.RichBatchStatement@26ad2668 \
com.datastax.driver.core.exceptions.InvalidQueryException: Key may not be empty

The solution was to provide a default, non-empty string instead.
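
A minimal sketch of that kind of guard before the write, assuming the data arrives as (user, date, event) tuples as in the earlier sketch (the "unknown" default is illustrative):

import com.datastax.spark.connector._

// Cassandra rejects empty partition key values ("Key may not be empty"),
// so replace null/empty user values with a non-empty default.
val cleaned = rows.map { case (user, date, event) =>
  val safeUser = if (user == null || user.isEmpty) "unknown" else user
  (safeUser, date, event)
}

cleaned.saveToCassandra("keyspace", "mytable", SomeColumns("user", "date", "event"))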

在"com/datastax/spark/connector/writer/AsyncExecutor.scala:45 "中添加断点,即可得到真正的异常。

In my case, the replication_factor of my keyspace was 2, but only one node was alive.
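
One way out of that situation, beyond bringing the dead node back, is to lower the keyspace's replication factor to match the number of live nodes; a sketch of that option (not from the original answer) via the connector's session, assuming a SimpleStrategy keyspace named KeySpace and the SparkContext sc from the earlier sketch:

import com.datastax.spark.connector.cql.CassandraConnector

// Lower the replication factor to the number of live nodes (here: 1).
CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute(
    "ALTER KEYSPACE \"KeySpace\" WITH replication = " +
      "{'class': 'SimpleStrategy', 'replication_factor': 1}")
}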

If you run in yarn-cluster mode, don't forget to check the complete application logs on YARN with yarn logs -applicationId <appId> --appOwner <appOwner>. This gave me more detail about the cause of the failure than the logs in the YARN web UI:
Caused by: com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive)
    at com.datastax.driver.core.Responses$Error.decode(Responses.java:50)
    at com.datastax.driver.core.Responses$Error.decode(Responses.java:37)
    at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:266)
    at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:246)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
    ... 11 more

The solution was to set spark.cassandra.output.consistency.level=ANY in my spark-defaults.conf.
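
The same level can also be set per write; a sketch, again assuming the rows RDD from the earlier sketch, with the caveat that ANY lets a write be acknowledged by a hint alone, before any replica has actually stored it:

import com.datastax.driver.core.ConsistencyLevel
import com.datastax.spark.connector._
import com.datastax.spark.connector.writer.WriteConf

// Equivalent to spark.cassandra.output.consistency.level=ANY,
// scoped to a single saveToCassandra call.
rows.saveToCassandra("keyspace", "mytable",
  writeConf = WriteConf(consistencyLevel = ConsistencyLevel.ANY))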