JdbcIO.read is not returning results in Apache Beam
I am trying to read some data using JdbcIO.read in Apache Beam. With the following code it works fine:
Pipeline p = createPipeline(options);
p.apply(JdbcIO.<TestRow>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
        .withQuery("query ")
        .withCoder(SerializableCoder.of(TestRow.class))
        .withRowMapper(new JdbcIO.RowMapper<TestRow>() {
            @Override
            public TestRow mapRow(ResultSet resultSet) throws Exception {
                TestRow testRow = new TestRow();
                // setters
                return testRow;
            }
        }))
 .apply(MapElements.via(new SimpleFunction<TestRow, String>() {
            @Override
            public String apply(TestRow input) {
                return input.toString();
            }
        }));
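However, I get no results when I refactor it to remove the anonymous classes and move the read call into a separate class that extends DoFn. The row mapper block is never executed.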
PCollection<String> t = p
        .apply(Create.<Long>of(1L))
        .apply("Read Data", ParDo.of(readInput));

public abstract class ReadInput<S, T> extends DoFn<Long, TestRow> {
    @DoFn.ProcessElement
    public void processElement(@Element Long seq, final OutputReceiver<TestRow> receiver) {
        getInput(receiver);
    }

    protected abstract void getInput(OutputReceiver<TestRow> receiver);
}

public class ReadInputOtc extends ReadInput<Long, TestRow> {
    @Override
    protected void getInput(OutputReceiver<TestRow> receiver) {
        // This only builds a transform; it is never applied to a pipeline.
        JdbcIO.<TestRow>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(this.dataSource))
                .withCoder(SerializableCoder.of(TestRow.class))
                .withQuery("query ")
                .withRowMapper(new JdbcIO.RowMapper<TestRow>() {
                    public TestRow mapRow(ResultSet resultSet) throws Exception {
                        TestRow testRow = new TestRow();
                        // setters
                        while (resultSet.next()) {
                            System.out.println(resultSet.getString("id"));
                        }
                        receiver.output(testRow);
                        return testRow;
                    }
                });
    }
}
Thanks for your help.
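JdbcIO.<TestRow>read() only creates a read PTransform; on its own it does not read anything. To perform the read, it must be applied to the Pipeline object, which then produces a PCollection of records (as in your first example). PTransforms are not meant to be used inside a DoFn: a DoFn acts on individual elements, not on PCollections of elements.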
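To make the "building a transform does not execute it" point concrete, here is a minimal sketch in plain Java. It is a toy model only: `ToyRead`, `ToyPipeline`, and the `TABLE` data are hypothetical stand-ins invented for illustration and are not part of the Apache Beam API. The sketch assumes nothing beyond the JDK and just counts how often the row mapper is invoked in each situation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Toy model only: these classes mimic the shape of deferred execution
// and are NOT Apache Beam classes.
class DeferredReadDemo {

    /** Stand-in for a read transform: stores configuration, does no I/O when built. */
    static final class ToyRead<T> {
        private final List<String> sourceRows;       // pretend database table
        private final Function<String, T> rowMapper; // pretend row mapper

        ToyRead(List<String> sourceRows, Function<String, T> rowMapper) {
            this.sourceRows = sourceRows;
            this.rowMapper = rowMapper;
        }

        /** Called only by the pipeline when it runs; rows are mapped here. */
        List<T> expand() {
            List<T> out = new ArrayList<>();
            for (String row : sourceRows) {
                out.add(rowMapper.apply(row));
            }
            return out;
        }
    }

    /** Stand-in for a pipeline: records applied transforms, executes them on run(). */
    static final class ToyPipeline {
        private final List<ToyRead<?>> applied = new ArrayList<>();

        <T> void apply(ToyRead<T> read) {
            applied.add(read);
        }

        List<Object> run() {
            List<Object> results = new ArrayList<>();
            for (ToyRead<?> read : applied) {
                results.addAll(read.expand());
            }
            return results;
        }
    }

    static final List<String> TABLE = Arrays.asList("1", "2", "3");

    /** Mirrors the refactored code: the read is built but never applied. */
    public static int mappedWhenOnlyConstructed() {
        int[] calls = {0};
        ToyRead<String> read = new ToyRead<>(TABLE, row -> {
            calls[0]++;
            return "row-" + row;
        });
        return calls[0]; // the mapper was never invoked
    }

    /** Mirrors the working code: the read is applied to the pipeline, which is run. */
    public static int mappedWhenApplied() {
        int[] calls = {0};
        ToyPipeline p = new ToyPipeline();
        p.apply(new ToyRead<String>(TABLE, row -> {
            calls[0]++;
            return "row-" + row;
        }));
        p.run();
        return calls[0]; // the mapper ran once per row
    }

    public static void main(String[] args) {
        System.out.println("constructed only: " + mappedWhenOnlyConstructed()); // 0
        System.out.println("applied and run:  " + mappedWhenApplied());         // 3
    }
}
```

In the toy model, as in the refactored question code, the mapper callback only fires when something actually executes the transform; constructing it inside another function and discarding it has no effect.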
If you are trying to get rid of the anonymous classes, you can write the code as follows:
// Declare these as (public) static nested classes or as top-level classes.
static class MyRowMapper implements JdbcIO.RowMapper<TestRow> {
    @Override
    public TestRow mapRow(ResultSet resultSet) throws Exception {
        TestRow testRow = new TestRow();
        // ...
        return testRow;
    }
}

static class MyDoFn extends DoFn<TestRow, String> {
    @ProcessElement
    public void processElement(@Element TestRow testRow,
                               OutputReceiver<String> receiver) {
        // processElement returns void; just emit the converted element.
        receiver.output(testRow.toString());
    }
}

Pipeline p = createPipeline(options);
p
    .apply(JdbcIO.<TestRow>read()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(dataSource))
        .withQuery("query ")
        .withCoder(SerializableCoder.of(TestRow.class))
        .withRowMapper(new MyRowMapper()))
    .apply(ParDo.of(new MyDoFn()));