How to keep a connection to an external database in Cloud Dataflow
I have an unbounded Dataflow pipeline that reads from Pub/Sub, applies a ParDo, and writes to Cassandra. Since it only applies ParDo transforms, I use the default global window and the default trigger, even though the source is unbounded.
How should I keep the connection to Cassandra alive in such a pipeline?
Currently I hold it in startBundle, like this:
private class CassandraWriter<T> extends DoFn<T, Void> {
  // hosts, port and keyspace are fields of the enclosing transform.
  private transient Cluster cluster;
  private transient Session session;
  private transient MappingManager mappingManager;

  @Override
  public void startBundle(Context c) {
    this.cluster = Cluster.builder()
        .addContactPoints(hosts)
        .withPort(port)
        .withoutMetrics()
        .withoutJMXReporting()
        .build();
    this.session = cluster.connect(keyspace);
    this.mappingManager = new MappingManager(session);
  }

  @Override
  public void processElement(ProcessContext c) throws IOException {
    T element = c.element();
    Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(element.getClass());
    mapper.save(element);
  }

  @Override
  public void finishBundle(Context c) throws IOException {
    session.close();
    cluster.close();
  }
}
However, this approach creates a new connection for every element.
Another option is to pass the connection as a side input, as in https://github.com/benjumanji/cassandra-dataflow:
public PDone apply(PCollection<T> input) {
  Pipeline p = input.getPipeline();
  CassandraWriteOperation<T> op = new CassandraWriteOperation<T>(this);

  Coder<CassandraWriteOperation<T>> coder =
      (Coder<CassandraWriteOperation<T>>) SerializableCoder.of(op.getClass());
  PCollection<CassandraWriteOperation<T>> opSingleton =
      p.apply(Create.<CassandraWriteOperation<T>>of(op)).setCoder(coder);
  final PCollectionView<CassandraWriteOperation<T>> opSingletonView =
      opSingleton.apply(View.<CassandraWriteOperation<T>>asSingleton());

  PCollection<Void> results = input.apply(ParDo.of(new DoFn<T, Void>() {
    @Override
    public void processElement(ProcessContext c) throws Exception {
      // Use the side input here to obtain the shared write operation.
    }
  }).withSideInputs(opSingletonView));

  // Forces the finalizing ParDo below to wait until all writes are done.
  PCollectionView<Iterable<Void>> voidView = results.apply(View.<Void>asIterable());

  opSingleton.apply(ParDo.of(new DoFn<CassandraWriteOperation<T>, Void>() {
    private static final long serialVersionUID = 0;

    @Override
    public void processElement(ProcessContext c) {
      CassandraWriteOperation<T> op = c.element();
      op.finalize();
    }
  }).withSideInputs(voidView));

  return PDone.in(p);
}
But this way I have to use windowing, because the View.<Void>asIterable() step applies a GroupByKey.
More generally, how should a PTransform that writes to an external database from an unbounded PCollection keep its connection to that database?
You are observing correctly that typical bundle sizes in the streaming/unbounded case are smaller than in the batch/bounded case. The actual bundle size depends on many parameters, and sometimes a bundle may contain a single element.
One way to solve this is to use a per-worker connection pool, stored in the static state of the DoFn. You should be able to initialize it during the first call to startBundle and then use it across bundles. Alternatively, you can create connections on demand and release them back into the pool for reuse once they are no longer needed.
You should make sure that the static state is thread-safe and that you don't make any assumptions about how Dataflow manages bundles.
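For illustration, here is a minimal sketch of the pooled variant. The class name PooledCassandraWriter and the BlockingQueue-based pool are illustrative choices, not part of the Dataflow SDK; note also that the Datastax driver's Session is itself thread-safe, so an explicit pool matters mostly for client libraries without that guarantee.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

public class PooledCassandraWriter<T> extends DoFn<T, Void> {
  // Static state is shared by every instance of this DoFn in the worker JVM.
  private static final Object lock = new Object();
  private static final BlockingQueue<Session> pool = new LinkedBlockingQueue<>();
  private static Cluster cluster;

  private final String[] hosts;
  private final int port;
  private final String keyspace;

  public PooledCassandraWriter(String[] hosts, int port, String keyspace) {
    this.hosts = hosts;
    this.port = port;
    this.keyspace = keyspace;
  }

  private Session borrowSession() {
    // Reuse a pooled session if one is available; otherwise create one on demand.
    Session session = pool.poll();
    if (session != null) {
      return session;
    }
    synchronized (lock) {
      if (cluster == null) {
        cluster = Cluster.builder().addContactPoints(hosts).withPort(port).build();
      }
    }
    return cluster.connect(keyspace);
  }

  @Override
  public void processElement(ProcessContext c) {
    Session session = borrowSession();
    try {
      T element = c.element();
      @SuppressWarnings("unchecked")
      Mapper<T> mapper = (Mapper<T>) new MappingManager(session).mapper(element.getClass());
      mapper.save(element);
    } finally {
      // Release the session back into the pool for reuse by later bundles.
      pool.offer(session);
    }
  }
}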
As Davor Bonaci suggested, using static variables solved the problem:
public class CassandraWriter<T> extends DoFn<T, Void> {
  private static final Logger log = LoggerFactory.getLogger(CassandraWriter.class);

  // Prevents multiple threads from creating multiple cluster connections in parallel.
  private static final Object lock = new Object();
  private static Cluster cluster;
  private static Session session;
  private static MappingManager mappingManager;

  private final String[] hosts;
  private final int port;
  private final String keyspace;

  public CassandraWriter(String[] hosts, int port, String keyspace) {
    this.hosts = hosts;
    this.port = port;
    this.keyspace = keyspace;
  }

  @Override
  public void startBundle(Context c) {
    synchronized (lock) {
      if (cluster == null) {
        cluster = Cluster.builder()
            .addContactPoints(hosts)
            .withPort(port)
            .withoutMetrics()
            .withoutJMXReporting()
            .build();
        session = cluster.connect(keyspace);
        mappingManager = new MappingManager(session);
      }
    }
  }

  @Override
  public void processElement(ProcessContext c) throws IOException {
    T element = c.element();
    Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(element.getClass());
    mapper.save(element);
  }
}
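For completeness, a sketch of how the writer might be wired into the pipeline; the topic name, the EventParser DoFn and the Event type are placeholders, not part of the original code:

Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.Read.topic("projects/<project>/topics/<topic>"))
 .apply(ParDo.of(new EventParser()))  // hypothetical DoFn<String, Event>
 .apply(ParDo.of(new CassandraWriter<Event>(
     new String[] {"cassandra-host"}, 9042, "my_keyspace")));
p.run();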