在 DataFlow DoFn 子 类 之间共享 BigTable 连接对象
Sharing BigTable Connection object among DataFlow DoFn sub-classes
我正在 DataFlow 中设置一个 Java 管道来读取 .csv
文件并根据文件的内容创建一堆 BigTable 行。我在 BigTable 文档中看到连接到 BigTable 是一个 'expensive' 操作,最好只执行一次并在需要它的函数之间共享连接。
但是,如果我在 main class 中将 Connection 对象声明为 public static
变量,并首先在 main 函数中连接到 BigTable,我随后会得到 NullPointerException
尝试在 DoFn
sub-classes' processElement()
函数的实例中引用连接作为我的 DataFlow 管道的一部分。
相反,如果我在实际DoFn
class中将Connection声明为静态变量,则操作成功。
执行此操作的最佳做法或最佳方法是什么?
我担心如果大规模实施第二个选项,我会浪费大量时间和资源。如果我在 DoFn
class 中将变量保持为静态,是否足以确保 API 不会每次都尝试重新建立连接?
我知道有一个特殊的 BigTable I/O 调用来将 DataFlow 管道对象与 BigTable 同步,但我认为我需要自己编写一个以将一些特殊逻辑内置到 DoFn
processElement()
函数...
这就是 "working" 代码的样子:
class DigitizeBT extends DoFn<String, String>{
private static Connection m_locConn;
@Override
public void processElement(ProcessContext c)
{
try
{
m_locConn = BigtableConfiguration.connect("projectID", "instanceID");
Table tbl = m_locConn.getTable(TableName.valueOf("TableName"));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(
Bytes.toBytes("CF1"),
Bytes.toBytes("SomeName"),
Bytes.toBytes("SomeValue"));
tbl.put(put);
}
catch (IOException e)
{
e.printStackTrace();
System.exit(1);
}
}
}
这是更新后的代码,仅供参考:
public void SmallKVJob()
{
CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder()
.withProjectId(DEF.ID_PROJ)
.withInstanceId(DEF.ID_INST)
.withTableId(DEF.ID_TBL_UNITS)
.build();
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject(DEF.ID_PROJ);
options.setStagingLocation(DEF.ID_STG_LOC);
// options.setNumWorkers(3);
// options.setMaxNumWorkers(5);
// options.setRunner(BlockingDataflowPipelineRunner.class);
options.setRunner(DirectPipelineRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from(DEF.ID_BAL))
.apply(ParDo.of(new DoFn1()))
.apply(ParDo.of(new DoFn2()))
.apply(ParDo.of(new DoFn3(config)));
m_log.info("starting to run the job");
p.run();
m_log.info("finished running the job");
}
}
class DoFn1 extends DoFn<String, KV<String, Integer>>
{
@Override
public void processElement(ProcessContext c)
{
c.output(KV.of(c.element().split("\,")[0],Integer.valueOf(c.element().split("\,")[1])));
}
}
class DoFn2 extends DoFn<KV<String, Integer>, KV<String, Integer>>
{
@Override
public void processElement(ProcessContext c)
{
int max = c.element().getValue();
String name = c.element().getKey();
for(int i = 0; i<max;i++)
c.output(KV.of(name, 1));
}
}
class DoFn3 extends AbstractCloudBigtableTableDoFn<KV<String, Integer>, String>
{
public DoFn3(CloudBigtableConfiguration config)
{
super(config);
}
@Override
public void processElement(ProcessContext c)
{
try
{
Integer max = c.element().getValue();
for(int i = 0; i<max; i++)
{
String owner = c.element().getKey();
String rnd = UUID.randomUUID().toString();
Put p = new Put(Bytes.toBytes(owner+"*"+rnd));
p.addColumn(Bytes.toBytes(DEF.ID_CF1), Bytes.toBytes("Owner"), Bytes.toBytes(owner));
getConnection().getTable(TableName.valueOf(DEF.ID_TBL_UNITS)).put(p);
c.output("Success");
}
} catch (IOException e)
{
c.output(e.toString());
e.printStackTrace();
}
}
}
输入的 .csv 文件看起来像这样:
玛丽,3000
约翰,5000
彼得,2000
因此,对于 .csv 文件中的每一行,我必须将 x 行数放入 BigTable,其中 x 是 .csv 文件中的第二个单元格...
我正在 DataFlow 中设置一个 Java 管道来读取 .csv
文件并根据文件的内容创建一堆 BigTable 行。我在 BigTable 文档中看到连接到 BigTable 是一个 'expensive' 操作,最好只执行一次并在需要它的函数之间共享连接。
但是,如果我在 main class 中将 Connection 对象声明为 public static
变量,并首先在 main 函数中连接到 BigTable,我随后会得到 NullPointerException
尝试在 DoFn
sub-classes' processElement()
函数的实例中引用连接作为我的 DataFlow 管道的一部分。
相反,如果我在实际DoFn
class中将Connection声明为静态变量,则操作成功。
执行此操作的最佳做法或最佳方法是什么?
我担心如果大规模实施第二个选项,我会浪费大量时间和资源。如果我在 DoFn
class 中将变量保持为静态,是否足以确保 API 不会每次都尝试重新建立连接?
我知道有一个特殊的 BigTable I/O 调用来将 DataFlow 管道对象与 BigTable 同步,但我认为我需要自己编写一个以将一些特殊逻辑内置到 DoFn
processElement()
函数...
这就是 "working" 代码的样子:
class DigitizeBT extends DoFn<String, String>{
private static Connection m_locConn;
@Override
public void processElement(ProcessContext c)
{
try
{
m_locConn = BigtableConfiguration.connect("projectID", "instanceID");
Table tbl = m_locConn.getTable(TableName.valueOf("TableName"));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(
Bytes.toBytes("CF1"),
Bytes.toBytes("SomeName"),
Bytes.toBytes("SomeValue"));
tbl.put(put);
}
catch (IOException e)
{
e.printStackTrace();
System.exit(1);
}
}
}
这是更新后的代码,仅供参考:
public void SmallKVJob()
{
CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder()
.withProjectId(DEF.ID_PROJ)
.withInstanceId(DEF.ID_INST)
.withTableId(DEF.ID_TBL_UNITS)
.build();
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject(DEF.ID_PROJ);
options.setStagingLocation(DEF.ID_STG_LOC);
// options.setNumWorkers(3);
// options.setMaxNumWorkers(5);
// options.setRunner(BlockingDataflowPipelineRunner.class);
options.setRunner(DirectPipelineRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from(DEF.ID_BAL))
.apply(ParDo.of(new DoFn1()))
.apply(ParDo.of(new DoFn2()))
.apply(ParDo.of(new DoFn3(config)));
m_log.info("starting to run the job");
p.run();
m_log.info("finished running the job");
}
}
class DoFn1 extends DoFn<String, KV<String, Integer>>
{
@Override
public void processElement(ProcessContext c)
{
c.output(KV.of(c.element().split("\,")[0],Integer.valueOf(c.element().split("\,")[1])));
}
}
class DoFn2 extends DoFn<KV<String, Integer>, KV<String, Integer>>
{
@Override
public void processElement(ProcessContext c)
{
int max = c.element().getValue();
String name = c.element().getKey();
for(int i = 0; i<max;i++)
c.output(KV.of(name, 1));
}
}
class DoFn3 extends AbstractCloudBigtableTableDoFn<KV<String, Integer>, String>
{
public DoFn3(CloudBigtableConfiguration config)
{
super(config);
}
@Override
public void processElement(ProcessContext c)
{
try
{
Integer max = c.element().getValue();
for(int i = 0; i<max; i++)
{
String owner = c.element().getKey();
String rnd = UUID.randomUUID().toString();
Put p = new Put(Bytes.toBytes(owner+"*"+rnd));
p.addColumn(Bytes.toBytes(DEF.ID_CF1), Bytes.toBytes("Owner"), Bytes.toBytes(owner));
getConnection().getTable(TableName.valueOf(DEF.ID_TBL_UNITS)).put(p);
c.output("Success");
}
} catch (IOException e)
{
c.output(e.toString());
e.printStackTrace();
}
}
}
输入的 .csv 文件看起来像这样:
玛丽,3000
约翰,5000
彼得,2000
因此,对于 .csv 文件中的每一行,我必须将 x 行数放入 BigTable,其中 x 是 .csv 文件中的第二个单元格...