Apache Storm - 从 SPOUT 访问数据库 - 连接池
Apache Storm - Accessing database from SPOUT - connection pooling
有一个 spout,每次 tick 都会进入 Postgre 数据库并读取额外的一行。 spout 代码如下所示:
class RawDataLevelSpout extends BaseRichSpout implements Serializable {
private int counter;
SpoutOutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("col1", "col2"));
}
@Override
public void open(Map map, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
collector = spoutOutputCollector;
}
private Connection initializeDatabaseConnection() {
try {
Class.forName("org.postgresql.Driver");
Connection connection = null;
connection = DriverManager.getConnection(
DATABASE_URI,"root", "root");
return connection;
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
@Override
public void close() {
}
@Override
public void nextTuple() {
List<String> values = new ArrayList<>();
PreparedStatement statement = null;
try {
Connection connection = initializeDatabaseConnection();
statement = connection.prepareStatement("SELECT * FROM table1 ORDER BY col1 LIMIT 1 OFFSET ?");
statement.setInt(1, counter++);
ResultSet resultSet = statement.executeQuery();
resultSet.next();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
int totalColumns = resultSetMetaData.getColumnCount();
for (int i = 1; i <= totalColumns; i++) {
String value = resultSet.getString(i);
values.add(value);
}
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
collector.emit(new Values(values.stream().toArray(String[]::new)));
}
}
在 apache storm 的 Spouts 中处理连接池的标准方法是什么?此外,是否有可能以某种方式在集群拓扑中跨多个 运行 实例同步 coutner 变量?
关于连接池,如果需要,您可以通过静态变量将连接池化,但是由于不能保证所有 spout 实例 运行ning 在同一个 JVM 中,我认为没有点.
不,无法同步计数器。 spout 实例可能 运行ning 在不同的 JVM 上,并且您不希望它们在 spout 同意计数器值时全部阻塞。不过,我认为您的 spout 实施没有意义。如果您只想一次读取一行,为什么不只是 运行 单个 spout 实例而不是尝试同步多个 spout?
您似乎试图将关系数据库用作队列系统,这可能不合适。考虑例如取而代之的是卡夫卡。我认为您应该能够使用 https://www.confluent.io/product/connectors/ or http://debezium.io/ 中的任何一个将数据从 Postgres 流式传输到 Kafka。
有一个 spout,每次 tick 都会进入 Postgre 数据库并读取额外的一行。 spout 代码如下所示:
class RawDataLevelSpout extends BaseRichSpout implements Serializable {
private int counter;
SpoutOutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("col1", "col2"));
}
@Override
public void open(Map map, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
collector = spoutOutputCollector;
}
private Connection initializeDatabaseConnection() {
try {
Class.forName("org.postgresql.Driver");
Connection connection = null;
connection = DriverManager.getConnection(
DATABASE_URI,"root", "root");
return connection;
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
@Override
public void close() {
}
@Override
public void nextTuple() {
List<String> values = new ArrayList<>();
PreparedStatement statement = null;
try {
Connection connection = initializeDatabaseConnection();
statement = connection.prepareStatement("SELECT * FROM table1 ORDER BY col1 LIMIT 1 OFFSET ?");
statement.setInt(1, counter++);
ResultSet resultSet = statement.executeQuery();
resultSet.next();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
int totalColumns = resultSetMetaData.getColumnCount();
for (int i = 1; i <= totalColumns; i++) {
String value = resultSet.getString(i);
values.add(value);
}
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
collector.emit(new Values(values.stream().toArray(String[]::new)));
}
}
在 apache storm 的 Spouts 中处理连接池的标准方法是什么?此外,是否有可能以某种方式在集群拓扑中跨多个 运行 实例同步 coutner 变量?
关于连接池,如果需要,您可以通过静态变量将连接池化,但是由于不能保证所有 spout 实例 运行ning 在同一个 JVM 中,我认为没有点.
不,无法同步计数器。 spout 实例可能 运行ning 在不同的 JVM 上,并且您不希望它们在 spout 同意计数器值时全部阻塞。不过,我认为您的 spout 实施没有意义。如果您只想一次读取一行,为什么不只是 运行 单个 spout 实例而不是尝试同步多个 spout?
您似乎试图将关系数据库用作队列系统,这可能不合适。考虑例如取而代之的是卡夫卡。我认为您应该能够使用 https://www.confluent.io/product/connectors/ or http://debezium.io/ 中的任何一个将数据从 Postgres 流式传输到 Kafka。