spark-streaming and connection pool implementation
The spark-streaming site at https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams mentions the following code:
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}
I tried to implement this using org.apache.commons.pool2, but running the application fails with the expected java.io.NotSerializableException:
15/05/26 08:06:21 ERROR OneForOneStrategy: org.apache.commons.pool2.impl.GenericObjectPool
java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
...
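For reference, this minimal standalone sketch (hypothetical host and port) reproduces the same exception outside Spark, since GenericObjectPool itself is what fails Java serialization:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.net.Socket
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}

object SerializationCheck extends App {
  class SocketFactory(host: String, port: Int) extends BasePooledObjectFactory[Socket] {
    def create(): Socket = new Socket(host, port)
    def wrap(socket: Socket): PooledObject[Socket] = new DefaultPooledObject[Socket](socket)
  }

  // No connection is opened yet; create() only runs on borrowObject().
  val pool = new GenericObjectPool[Socket](new SocketFactory("localhost", 9999))

  // Throws java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool,
  // just as Spark does when a closure capturing the pool is shipped to executors.
  new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(pool)
}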
I'm wondering how realistic it would be to implement a serializable connection pool. Has anyone succeeded in doing so?
Thanks.
The answer below is wrong!
I'm leaving the answer here for reference, but it is wrong for the following reason: socketPool is declared as a lazy val, so it gets instantiated on each first access request. Since the SocketPool case class is not Serializable, this means it will be instantiated within each partition. That renders the connection pool useless, because we want to keep connections alive across partitions and RDDs. It makes no difference whether it is implemented as a companion object or as a case class. Bottom line: the connection pool must be Serializable, and Apache Commons Pool is not.
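A minimal standalone sketch (the Holder class here is hypothetical) shows the failure mode: each deserialized copy of a Serializable class re-runs its lazy val initializer, so every task ends up with its own pool:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

class Holder extends Serializable {
  lazy val pool = new Object() // stands in for SocketPool(host, port)
}

object LazyValDemo extends App {
  val original = new Holder // 'pool' is not initialized yet, so serialization succeeds

  val buffer = new ByteArrayOutputStream()
  new ObjectOutputStream(buffer).writeObject(original)
  val copy = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    .readObject().asInstanceOf[Holder]

  // Each copy runs the lazy initializer on first access: prints false.
  println(original.pool eq copy.pool)
}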
import java.io.PrintStream
import java.net.Socket

import org.apache.commons.pool2.{PooledObject, BasePooledObjectFactory}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.spark.streaming.dstream.DStream

/**
 * Publish a Spark stream to a socket.
 */
class PooledSocketStreamPublisher[T](host: String, port: Int)
  extends Serializable {

  lazy val socketPool = SocketPool(host, port)

  /**
   * Publish the stream to a socket.
   */
  def publishStream(stream: DStream[T], callback: (T) => String) = {
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val socket = socketPool.getSocket
        val out = new PrintStream(socket.getOutputStream)
        partition.foreach { event =>
          val text: String = callback(event)
          out.println(text)
          out.flush()
        }
        out.close()
        socketPool.returnSocket(socket)
      }
    }
  }
}
class SocketFactory(host: String, port: Int) extends BasePooledObjectFactory[Socket] {

  def create(): Socket = {
    new Socket(host, port)
  }

  def wrap(socket: Socket): PooledObject[Socket] = {
    new DefaultPooledObject[Socket](socket)
  }
}
case class SocketPool(host: String, port: Int) {

  val socketPool = new GenericObjectPool[Socket](new SocketFactory(host, port))

  def getSocket: Socket = {
    socketPool.borrowObject
  }

  def returnSocket(socket: Socket) = {
    socketPool.returnObject(socket)
  }
}
You can invoke it as follows:
val socketStreamPublisher = new PooledSocketStreamPublisher[MyEvent](host = "10.10.30.101", port = 29009)
socketStreamPublisher.publishStream(myEventStream, (e: MyEvent) => Json.stringify(Json.toJson(e)))
To address this "local resource" problem, what's needed is a singleton object, i.e. an object guaranteed to be instantiated once and only once per JVM. Luckily, a Scala object provides that functionality out of the box.
The second thing to consider is that this singleton will serve all tasks running on the same JVM that hosts it, so it must take care of concurrency and resource management.
Let's try to sketch(*) such a service:
import java.net.Socket
import org.apache.commons.pool2.ObjectPool

class ManagedSocket(private val pool: ObjectPool[Socket], val socket: Socket) {
  def release() = pool.returnObject(socket)
}

// singleton object
object SocketPool {
  var hostPortPool: Map[(String, Int), ObjectPool[Socket]] = Map()

  sys.addShutdownHook {
    hostPortPool.values.foreach { pool =>
      // terminate each pool
    }
  }

  // factory method
  def apply(host: String, port: Int): ManagedSocket = {
    val pool = hostPortPool.getOrElse((host, port), {
      val p = ??? // create new pool for (host, port)
      hostPortPool += (host, port) -> p
      p
    })
    new ManagedSocket(pool, pool.borrowObject())
  }
}
Then usage becomes:
val host = ???
val port = ???

stream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val mSocket = SocketPool(host, port)
    partition.foreach { elem =>
      val os = mSocket.socket.getOutputStream()
      // do stuff with os + elem
    }
    mSocket.release()
  }
}
I'm assuming that the GenericObjectPool used in the question takes care of concurrency. Otherwise, access to each pool instance would need to be protected with some form of synchronization.
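For example, here is a minimal sketch (the ConcurrentSocketPool name and createPool helper are hypothetical, reusing SocketFactory and ManagedSocket from above) of how the registry itself could be made thread-safe, while borrow/return relies on GenericObjectPool's internal synchronization:

import java.net.Socket
import java.util.concurrent.ConcurrentHashMap
import org.apache.commons.pool2.ObjectPool
import org.apache.commons.pool2.impl.GenericObjectPool

object ConcurrentSocketPool {
  private val pools = new ConcurrentHashMap[(String, Int), ObjectPool[Socket]]()

  def apply(host: String, port: Int): ManagedSocket = {
    // computeIfAbsent creates the pool for (host, port) at most once, even when
    // several tasks on the same executor race on first access.
    val pool = pools.computeIfAbsent((host, port), _ => createPool(host, port))
    new ManagedSocket(pool, pool.borrowObject())
  }

  private def createPool(host: String, port: Int): ObjectPool[Socket] =
    new GenericObjectPool[Socket](new SocketFactory(host, port))
}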
(*) The code provided is meant to illustrate the idea of how such an object could be designed - additional effort is needed to turn it into a working version.