在 Spark Streaming 作业中启动 JDBC 次连接
Initiate JDBC connection once in Spark Streaming job
环境
- 从 Kafka 读取 Spark Streaming 作业,微批量大小 30 秒(Durations.seconds (30))
- 具有参考状态的内存存储 (Hazelcast)。这是一个非静态状态,由 Spark Workers
实时更新
- 与 Hazelcast 连接的 Spark Workers
当前方法
- 使用foreachRDD
对数据进行操作,每一个micro-batch(RDD)都会建立一个连接。这每 30 秒发生一次 (Durations.seconds (30))。
kafkaStream.foreachRDD (new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> microBatch) throws Exception {
ClientConfig clientConfig = new ClientConfig();
clientConfig.addAddress("myHost:5701"); //Define connection
HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
//Do processing
}
}
The Ask:希望在每个 Spark Worker 上打开一次连接(提交作业时),而不是为每个微批次打开新连接。实现这一目标的正确方法是什么?
由于 Spark 序列化作业并将其分发到 Worker 中,覆盖反序列化方法来执行 init 任务(创建 JDBC 连接、初始化变量等)有助于在火花流中。
覆盖默认反序列化方法(Java)
@Override
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
//Init 1
ClientConfig clientConfig = new ClientConfig();
clientConfig.addAddress("myHost:5701"); //Define connection
HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
//Init 2
kafkaProducer=new KafkaProducer<>(kafkaProducerProps);
}
或者,可以使用 static hack for init tasks until Cloudera/Databricks guys add init support inherently in Spark。
您需要的内容在这里得到了很好的解释:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
在foreachPartition中,body是在executors中本地执行的。在那里你可以有静态客户端连接(每个工作人员都将使用自己的静态对象,例如)
希望对您有所帮助。
谢谢,
环境
- 从 Kafka 读取 Spark Streaming 作业,微批量大小 30 秒(Durations.seconds (30))
- 具有参考状态的内存存储 (Hazelcast)。这是一个非静态状态,由 Spark Workers 实时更新
- 与 Hazelcast 连接的 Spark Workers
当前方法
- 使用foreachRDD
对数据进行操作,每一个micro-batch(RDD)都会建立一个连接。这每 30 秒发生一次 (Durations.seconds (30))。
kafkaStream.foreachRDD (new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> microBatch) throws Exception {
ClientConfig clientConfig = new ClientConfig();
clientConfig.addAddress("myHost:5701"); //Define connection
HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
//Do processing
}
}
The Ask:希望在每个 Spark Worker 上打开一次连接(提交作业时),而不是为每个微批次打开新连接。实现这一目标的正确方法是什么?
由于 Spark 序列化作业并将其分发到 Worker 中,覆盖反序列化方法来执行 init 任务(创建 JDBC 连接、初始化变量等)有助于在火花流中。
覆盖默认反序列化方法(Java)
@Override
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
//Init 1
ClientConfig clientConfig = new ClientConfig();
clientConfig.addAddress("myHost:5701"); //Define connection
HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
//Init 2
kafkaProducer=new KafkaProducer<>(kafkaProducerProps);
}
或者,可以使用 static hack for init tasks until Cloudera/Databricks guys add init support inherently in Spark。
您需要的内容在这里得到了很好的解释: https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
在foreachPartition中,body是在executors中本地执行的。在那里你可以有静态客户端连接(每个工作人员都将使用自己的静态对象,例如)
希望对您有所帮助。
谢谢,