JdbcIO.read is not returning results in Apache Beam

I am trying to read some data with JdbcIO.read in Apache Beam. With code like the following, it works fine.

Pipeline p = createPipeline(options);
p.apply(JdbcIO.<TestRow>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
        .withQuery("query  ")
        .withCoder(SerializableCoder.of(TestRow.class))
        .withRowMapper(new JdbcIO.RowMapper<TestRow>() {
            @Override
            public TestRow mapRow(ResultSet resultSet) throws Exception {
                TestRow testRow = new TestRow();
                // setters

                return testRow;
            }
        }))
 .apply(MapElements.via(new SimpleFunction<TestRow, String>() {
            @Override
            public String apply(TestRow input) {
                return input.toString();
            }
        }));

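But I get no results when I refactor it to remove the anonymous function, moving that call into a separate class that extends DoFn. The row mapper block is never executed at all.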

PCollection<String> t = p
                 .apply(Create.<Long>of(1L))
                 .apply("Read Data", ParDo.of(readInput))

public abstract class ReadInput<S, T> extends DoFn<Long, TestRow> {

    @DoFn.ProcessElement
    public void processElement(@Element Long seq, final OutputReceiver<TestRow> receiver) {
        getInput(receiver);
    }

    protected abstract void getInput(OutputReceiver<TestRow> receiver);
}

public class ReadInputOtc extends ReadInput<Long, TestRow> {

    @Override
    protected void getInput(OutputReceiver<TestRow> receiver) {
        JdbcIO.<TestRow>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(this.dataSource))
                .withCoder(SerializableCoder.of(TestRow.class))
                .withQuery("query ")
                .withRowMapper(new JdbcIO.RowMapper<TestRow>() {
                    public TestRow mapRow(ResultSet resultSet) throws Exception {
                        TestRow testRow = new TestRow();
                        // setters
                        while (resultSet.next()) {
                            System.out.println(resultSet.getString("id"));
                        }
                        receiver.output(testRow);
                        return testRow;
                    }
                });
    }
}

Thanks for your help.

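JdbcIO.<TestRow>read() only creates a read PTransform; it does not actually perform any reading. For the read to happen, the transform must be applied to a pipeline object (as in your first example), which produces a PCollection of records. PTransforms are not meant to be used inside a DoFn: a DoFn operates on individual elements, not on PCollections of elements. In your second version the transform is built inside getInput and then discarded, so the row mapper is never invoked.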
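
To make the difference between building a transform and running it concrete, here is a toy model in plain Java. It is not Beam code and says nothing about how Beam is implemented: ReadSpec, execute, buildSpec, and the in-memory list of rows are hypothetical stand-ins for JdbcIO.read(), the runner, and the database. It only shows that a row mapper stored in a configuration object does not run until something executes that object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a read transform: it only stores configuration.
final class ReadSpec<T> {
    private final Function<String, T> rowMapper;

    ReadSpec(Function<String, T> rowMapper) {
        this.rowMapper = rowMapper;
    }

    // Hypothetical stand-in for the runner executing an applied transform.
    List<T> execute(List<String> rows) {
        List<T> out = new ArrayList<>();
        for (String row : rows) {
            out.add(rowMapper.apply(row));
        }
        return out;
    }
}

final class DeferredReadDemo {
    // Counts how often the mapper runs, so the effect is observable.
    static int mapperCalls = 0;

    // Builds the spec; the lambda is stored, not called.
    static ReadSpec<String> buildSpec() {
        return new ReadSpec<>(row -> {
            mapperCalls++;
            return "row:" + row;
        });
    }

    public static void main(String[] args) {
        List<String> fakeTable = Arrays.asList("1", "2", "3");

        // Analogous to building JdbcIO.read() inside a DoFn and dropping it.
        buildSpec();
        System.out.println("after build only: " + mapperCalls);

        // Analogous to p.apply(JdbcIO.read()...) followed by running the pipeline.
        List<String> result = buildSpec().execute(fakeTable);
        System.out.println("after execute: " + mapperCalls + " " + result);
    }
}
```

Running main prints a mapper count of 0 after the build-only call and 3 after execute, which mirrors the symptom in the question: the mapper block is defined, but nothing ever asks it to run.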

If you are trying to get rid of the anonymous classes, you can write the code as follows:

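// "public static" assumes these are nested classes; drop "static" for top-level classes.
// JdbcIO.RowMapper is an interface, so it is implemented rather than extended.
public static class MyRowMapper implements JdbcIO.RowMapper<TestRow> {
    @Override
    public TestRow mapRow(ResultSet resultSet) throws Exception {
        TestRow testRow = new TestRow();
        ...
        return testRow;
    }
}

// The input type must match the output of the read, i.e. TestRow.
public static class MyDoFn extends DoFn<TestRow, String> {
    @DoFn.ProcessElement
    public void processElement(@Element TestRow testRow,
                               final OutputReceiver<String> receiver) {
        // processElement is void, so emit the element without returning a value.
        receiver.output(testRow.toString());
    }
}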

Pipeline p = createPipeline(options);
p
    .apply(JdbcIO.<TestRow>read()
                 .withDataSourceConfiguration(
                     JdbcIO.DataSourceConfiguration.create(dataSource))
                 .withQuery("query  ")
                 .withCoder(SerializableCoder.of(TestRow.class))
                 .withRowMapper(new MyRowMapper()))
    .apply(ParDo.of(new MyDoFn()));