如何处理 spark 作业中的 cassandra 连接?
How to handle cassandra connections in spark job?
我正在对使用 spark cassandra 连接器和 cassandra 驱动程序的 spark 应用程序进行压力测试。
在我的应用程序中,我使用 cassandra 驱动程序 select 来自 C* table 的最新值。
只要通过 spark-job 服务器一个一个地提交 spark 作业,这就可以正常工作。
但是如果多个作业提交(请求数 = 80)同时发生,那么我会得到如下异常。
org.jboss.netty.channel.ChannelException: Failed to create a selector.
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:33) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:151) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:116) ~[netty-3.8.0.Final.jar:na]
at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:532) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Cluster$Manager.<init>(Cluster.java:1201) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Cluster$Manager.<init>(Cluster.java:1144) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Cluster.<init>(Cluster.java:121) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Cluster.<init>(Cluster.java:108) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:177) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1109) ~[cassandra-driver-core-2.1.5.jar:na]
...
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method) ~[na:1.7.0_55]
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65) ~[na:1.7.0_55]
at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36) ~[na:1.7.0_55]
at java.nio.channels.Selector.open(Selector.java:227) ~[na:1.7.0_55]
at org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341) ~[netty-3.8.0.Final.jar:na]
我是 运行 通过在 spark 作业服务器中创建单个上下文来完成作业。
我的代码
val dateQuery = "SELECT st_date FROM %s limit 1"
val queryString = dateQuery.format(tableName)
val cluster = Cluster.builder().addContactPoints(cassandraHosts: _*)
.withCredentials(username, password).build()
val session = cluster.connect(keyspace)
val queryResult = Try(session.execute(queryString).map(x => x.getDate("st_date")).head)
cluster.close()
问题
我在代码中做错了什么吗?
如何解决这个问题?
我应该为整个应用程序创建一个单例集群对象并共享它吗?
我应该使用 sc.cassandraTable
方法而不是直接使用 java 驱动程序吗?
我不熟悉 spark jobserver,但代码片段看起来不正确。
首先您没有关闭会话,这应该在关闭集群之前完成。
其次,您应该为每个查询重新使用会话,而不是为每个单独的查询打开和关闭它。
所以是的,集群和会话应该创建为单例并重新使用,并且您通常只会在应用程序退出时关闭它们。
我正在对使用 spark cassandra 连接器和 cassandra 驱动程序的 spark 应用程序进行压力测试。 在我的应用程序中,我使用 cassandra 驱动程序 select 来自 C* table 的最新值。 只要通过 spark-job 服务器一个一个地提交 spark 作业,这就可以正常工作。 但是如果多个作业提交(请求数 = 80)同时发生,那么我会得到如下异常。
org.jboss.netty.channel.ChannelException: Failed to create a selector.
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:33) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:151) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:116) ~[netty-3.8.0.Final.jar:na]
at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:532) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Cluster$Manager.<init>(Cluster.java:1201) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Cluster$Manager.<init>(Cluster.java:1144) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Cluster.<init>(Cluster.java:121) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Cluster.<init>(Cluster.java:108) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:177) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1109) ~[cassandra-driver-core-2.1.5.jar:na]
...
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method) ~[na:1.7.0_55]
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65) ~[na:1.7.0_55]
at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36) ~[na:1.7.0_55]
at java.nio.channels.Selector.open(Selector.java:227) ~[na:1.7.0_55]
at org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63) ~[netty-3.8.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341) ~[netty-3.8.0.Final.jar:na]
我是 运行 通过在 spark 作业服务器中创建单个上下文来完成作业。
我的代码
val dateQuery = "SELECT st_date FROM %s limit 1"
val queryString = dateQuery.format(tableName)
val cluster = Cluster.builder().addContactPoints(cassandraHosts: _*)
.withCredentials(username, password).build()
val session = cluster.connect(keyspace)
val queryResult = Try(session.execute(queryString).map(x => x.getDate("st_date")).head)
cluster.close()
问题
我在代码中做错了什么吗?
如何解决这个问题?
我应该为整个应用程序创建一个单例集群对象并共享它吗?
我应该使用 sc.cassandraTable
方法而不是直接使用 java 驱动程序吗?
我不熟悉 spark jobserver,但代码片段看起来不正确。
首先您没有关闭会话,这应该在关闭集群之前完成。
其次,您应该为每个查询重新使用会话,而不是为每个单独的查询打开和关闭它。
所以是的,集群和会话应该创建为单例并重新使用,并且您通常只会在应用程序退出时关闭它们。