在 DataFlow DoFn 子 类 之间共享 BigTable 连接对象

Sharing BigTable Connection object among DataFlow DoFn sub-classes

我正在 DataFlow 中设置一个 Java 管道来读取 .csv 文件并根据文件的内容创建一堆 BigTable 行。我在 BigTable 文档中看到连接到 BigTable 是一个 'expensive' 操作,最好只执行一次并在需要它的函数之间共享连接。

但是,如果我在 main class 中将 Connection 对象声明为 public static 变量,并首先在 main 函数中连接到 BigTable,我随后会得到 NullPointerException尝试在 DoFn sub-classes' processElement() 函数的实例中引用连接作为我的 DataFlow 管道的一部分。

相反,如果我在实际DoFnclass中将Connection声明为静态变量,则操作成功。

执行此操作的最佳做​​法或最佳方法是什么?

我担心如果大规模实施第二个选项,我会浪费大量时间和资源。如果我在 DoFn class 中将变量保持为静态,是否足以确保 API 不会每次都尝试重新建立连接?

我知道有一个特殊的 BigTable I/O 调用来将 DataFlow 管道对象与 BigTable 同步,但我认为我需要自己编写一个以将一些特殊逻辑内置到 DoFn processElement()函数...

这就是 "working" 代码的样子:

class DigitizeBT extends DoFn<String, String>{
    private static Connection m_locConn;

    @Override
    public void processElement(ProcessContext c)
    {       
        try
        {
            m_locConn = BigtableConfiguration.connect("projectID", "instanceID");
            Table tbl = m_locConn.getTable(TableName.valueOf("TableName"));

            Put put = new Put(Bytes.toBytes(rowKey));

            put.addColumn(
                Bytes.toBytes("CF1"),
                Bytes.toBytes("SomeName"),
                Bytes.toBytes("SomeValue"));

            tbl.put(put);
        }
        catch (IOException e)
        {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

这是更新后的代码,仅供参考:

    public void SmallKVJob()
    {
        CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder()
                .withProjectId(DEF.ID_PROJ)
                .withInstanceId(DEF.ID_INST)
                .withTableId(DEF.ID_TBL_UNITS)
                .build();

        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject(DEF.ID_PROJ);
        options.setStagingLocation(DEF.ID_STG_LOC);
//      options.setNumWorkers(3);
//      options.setMaxNumWorkers(5);        
//      options.setRunner(BlockingDataflowPipelineRunner.class);
        options.setRunner(DirectPipelineRunner.class);
        Pipeline p = Pipeline.create(options);

        p.apply(TextIO.Read.from(DEF.ID_BAL))
        .apply(ParDo.of(new DoFn1()))
        .apply(ParDo.of(new DoFn2()))
        .apply(ParDo.of(new DoFn3(config)));

        m_log.info("starting to run the job");
        p.run();
        m_log.info("finished running the job");
    }
}

class DoFn1 extends DoFn<String, KV<String, Integer>>
{
    @Override
    public void processElement(ProcessContext c)
    {
        c.output(KV.of(c.element().split("\,")[0],Integer.valueOf(c.element().split("\,")[1])));
    }
}

class DoFn2 extends DoFn<KV<String, Integer>, KV<String, Integer>>
{
    @Override
    public void processElement(ProcessContext c)
    {
        int max = c.element().getValue();
        String name = c.element().getKey();
        for(int i = 0; i<max;i++)
            c.output(KV.of(name,  1));
    }
}

class DoFn3 extends AbstractCloudBigtableTableDoFn<KV<String, Integer>, String>
{   
    public DoFn3(CloudBigtableConfiguration config)
    {
        super(config);
    }

    @Override
    public void processElement(ProcessContext c) 
    {
        try
        {
            Integer max = c.element().getValue();
            for(int i = 0; i<max; i++)
            {
                String owner = c.element().getKey();
                String rnd = UUID.randomUUID().toString();  

                Put p = new Put(Bytes.toBytes(owner+"*"+rnd));
                p.addColumn(Bytes.toBytes(DEF.ID_CF1), Bytes.toBytes("Owner"), Bytes.toBytes(owner));
                getConnection().getTable(TableName.valueOf(DEF.ID_TBL_UNITS)).put(p);
                c.output("Success");
            }
        } catch (IOException e)
        {
            c.output(e.toString());
            e.printStackTrace();
        }
    }
}

输入的 .csv 文件看起来像这样:
玛丽,3000
约翰,5000
彼得,2000
因此,对于 .csv 文件中的每一行,我必须将 x 行数放入 BigTable,其中 x 是 .csv 文件中的第二个单元格...

我们为此构建了 AbstractCloudBigtableTableDoFn ( Source & Docs )。扩展 class 而不是 DoFn,并调用 getConnection() 而不是自己创建连接。

10,000 个小行实际工作需要一两秒。

编辑:根据评论,应使用 BufferedMutator 而不是 Table.put() 以获得最佳吞吐量。