Connecting Spark to Cassandra: java.lang.IllegalArgumentException: Frame length should be positive

I get the following error message when connecting Spark 2.4.4 to Cassandra:

spark-submit \
--packages anguenot/pyspark-cassandra:2.4.0 \
--master spark://MY_IP:9042 \
--conf spark.driver.host=MY_IP \
--conf spark.executor.memory=10GB \
--conf spark.executor.cores=4 \
--conf spark.cores.max=20 \
--conf spark.cassandra.connection.host=MY_HOST_IP \
--conf spark.cassandra.input.split.size_in_mb=256 \
my_python_Script.py

conf = SparkConf().setAppName("Data connector").setMaster("spark://MY_IP:9042").set("spark.cassandra.connection.host","MY_IP")
sc = SparkContext.getOrCreate(conf=conf)
20/03/26 16:33:37 INFO TransportClientFactory: Successfully created connection to /MY_IP:9042 after 48 ms (0 ms spent in bootstraps)
20/03/26 16:33:37 WARN TransportChannelHandler: Exception in connection from /MY_IP:9042
java.lang.IllegalArgumentException: Frame length should be positive: -8858580467037765640
        at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
        at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:134)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)
20/03/26 16:33:37 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /MY_IP:9042 is closed

20/03/26 16:34:37 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
20/03/26 16:34:37 WARN StandaloneSchedulerBackend: Application ID is not initialized yet.
20/03/26 16:34:37 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 36735.
20/03/26 16:34:37 INFO NettyBlockTransferService: Server created on MY_IP:36735
20/03/26 16:34:37 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/03/26 16:34:37 INFO SparkUI: Stopped Spark web UI at http://MY_IP:4040
20/03/26 16:34:37 INFO StandaloneSchedulerBackend: Shutting down all executors
20/03/26 16:34:37 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
20/03/26 16:34:37 WARN StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master
20/03/26 16:34:37 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/03/26 16:34:37 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, MY_IP, 36735, None)
20/03/26 16:34:37 INFO BlockManagerMasterEndpoint: Registering block manager MY_IP:36735 with 2.9 GB RAM, BlockManagerId(driver, MY_IP, 36735, None)
20/03/26 16:34:37 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, MY_IP, 36735, None)
20/03/26 16:34:37 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, MY_IP, 36735, None)
20/03/26 16:34:37 INFO MemoryStore: MemoryStore cleared
20/03/26 16:34:37 INFO BlockManager: BlockManager stopped
20/03/26 16:34:37 INFO BlockManagerMaster: BlockManagerMaster stopped
20/03/26 16:34:37 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/03/26 16:34:37 INFO SparkContext: Successfully stopped SparkContext
20/03/26 16:34:37 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:516)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:238)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
20/03/26 16:34:37 INFO SparkContext: SparkContext already stopped.
Traceback (most recent call last):
  File "/home/bd/analyze-data/Action.py", line 12, in <module>
    sc = SparkContext.getOrCreate(conf=conf)
  File "/home/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
  File "/home/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
  File "/home/spark/python/lib/pyspark.zip/pyspark/context.py", line 198, in _do_init
  File "/home/spark/python/lib/pyspark.zip/pyspark/context.py", line 306, in _initialize_context
  File "/home/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
  File "/home/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:516)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:238)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

20/03/26 16:34:37 INFO ShutdownHookManager: Shutdown hook called
20/03/26 16:34:37 INFO ShutdownHookManager: Deleting directory /tmp/spark-7434633b-9d1f-4af3-9021-bfa54a1b0c4a
20/03/26 16:34:37 INFO ShutdownHookManager: Deleting directory /tmp/spark-e2bc07a7-8b51-49d1-a18f-874941d8a7f0


I have searched in many places for a solution to this problem, but nothing has worked. I am very new to Spark and Cassandra. If anyone has run into this problem before, please advise.

Thanks a lot.

Your problem is that you set the master address to spark://MY_IP:9042, but port 9042 belongs to Cassandra itself. As a result, spark-submit tries to talk to the Spark master but reaches Cassandra instead, which does not understand that protocol.

If you are using a Spark cluster, the master address should be spark://spark_master_IP:7077 (7077 is the Spark standalone master's default port). The Cassandra address should be passed separately, as --conf spark.cassandra.connection.host=MY_HOST_IP.
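Putting that together, a corrected submission could look like the sketch below. Note that spark_master_IP, MY_IP, and MY_HOST_IP are placeholders: substitute the actual addresses of your Spark standalone master, driver host, and a Cassandra contact node respectively.

```shell
# --master must point at the Spark standalone master's RPC port
# (7077 by default), NOT at Cassandra's CQL port 9042.
# Cassandra is reached only through spark.cassandra.connection.host.
spark-submit \
  --packages anguenot/pyspark-cassandra:2.4.0 \
  --master spark://spark_master_IP:7077 \
  --conf spark.driver.host=MY_IP \
  --conf spark.executor.memory=10GB \
  --conf spark.executor.cores=4 \
  --conf spark.cores.max=20 \
  --conf spark.cassandra.connection.host=MY_HOST_IP \
  --conf spark.cassandra.input.split.size_in_mb=256 \
  my_python_Script.py
```

The same separation applies inside the script: `setMaster(...)` must name the Spark master URL, while the Cassandra host goes into the `spark.cassandra.connection.host` property.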