通过 AbstractCloudBigtableTableDoFn 重新使用 Bigtable 连接
Re-using Bigtable connection with AbstractCloudBigtableTableDoFn
我有一个扩展 AbstractCloudBigtableTableDoFn<>
的 DoFn
,以便向 Bigtable 发送频繁的 Buffered Mutation 请求。
当我 运行 在云端工作时,我在数据流管道的这一步看到重复的日志条目,如下所示:
Opening connection for projectId XXX, instanceId XXX, on data host batch-bigtable.googleapis.com, table admin host bigtableadmin.googleapis.com...
和
Bigtable options: BigtableOptions{XXXXX (lots of option entries here}
DoFn
中的代码如下所示:
@ProcessElement
public void processElement(ProcessContext c)
{
try
{
BufferedMutator mPutUnit = getConnection().getBufferedMutator(TableName.valueOf(TABLE_NAME));
for (CONDITION)
{
// create lots of different rowsIDs
Put p = new Put(newRowID).addColumn(COL_FAMILY, COL_NAME, COL_VALUE);
mPutUnit.mutate(p);
}
mPutUnit.close();
} catch (IOException e){e.printStackTrace();}
c.output(0);
}
此 DoFn
被频繁调用。
我是否应该担心 Dataflow 会在每次调用此 DoFn
时尝试重新建立与 Bigtable 的连接?我的印象是继承此 class 应该确保在所有调用中重复使用到 Bigtable 的单个连接?
"Opening connection for projectId ..." 应该在每个 Worker 每个 AbstractCloudBigtableTableDoFn 实例中出现一次。您能否仔细检查每个呼叫而不是每个工作人员是否正在打开连接?
- 将工人数量限制在少数人
- 在堆栈驱动程序中,展开 "Opening connection for projectId" 消息并检查 jsonPayload.worker 是否在不同的日志消息中重复。
另外,请问您使用的是什么版本的客户端,是什么版本的beam?
谢谢!
回答您的问题...
是的,您应该担心 Dataflow 会在每次调用 DoFn
时尝试重新建立与 Bigtable 的连接。 AbstractCloudBigtableDoFn
的预期行为是每个工作人员维护一个 Connection
实例。
不,从 AbstractCloudBigtableDoFn
继承并不能确保对 DoFn
的每次调用都重复使用单个 Connection
实例。这是不可能的,因为 DoFn
是根据为 Dataflow 作业分配的工作人员数量跨多个物理机器序列化的。
首先,确保 Bigtable 没有 connection/authentication 问题。有时,Dataflow 需要重新建立与 Bigtable 的连接。但是,预计每次调用 DoFn
时都这样做。
我有一个扩展 AbstractCloudBigtableTableDoFn<>
的 DoFn
,以便向 Bigtable 发送频繁的 Buffered Mutation 请求。
当我 运行 在云端工作时,我在数据流管道的这一步看到重复的日志条目,如下所示:
Opening connection for projectId XXX, instanceId XXX, on data host batch-bigtable.googleapis.com, table admin host bigtableadmin.googleapis.com...
和
Bigtable options: BigtableOptions{XXXXX (lots of option entries here}
DoFn
中的代码如下所示:
@ProcessElement
public void processElement(ProcessContext c)
{
try
{
BufferedMutator mPutUnit = getConnection().getBufferedMutator(TableName.valueOf(TABLE_NAME));
for (CONDITION)
{
// create lots of different rowsIDs
Put p = new Put(newRowID).addColumn(COL_FAMILY, COL_NAME, COL_VALUE);
mPutUnit.mutate(p);
}
mPutUnit.close();
} catch (IOException e){e.printStackTrace();}
c.output(0);
}
此 DoFn
被频繁调用。
我是否应该担心 Dataflow 会在每次调用此 DoFn
时尝试重新建立与 Bigtable 的连接?我的印象是继承此 class 应该确保在所有调用中重复使用到 Bigtable 的单个连接?
"Opening connection for projectId ..." 应该在每个 Worker 每个 AbstractCloudBigtableTableDoFn 实例中出现一次。您能否仔细检查每个呼叫而不是每个工作人员是否正在打开连接?
- 将工人数量限制在少数人
- 在堆栈驱动程序中,展开 "Opening connection for projectId" 消息并检查 jsonPayload.worker 是否在不同的日志消息中重复。
另外,请问您使用的是什么版本的客户端,是什么版本的beam?
谢谢!
回答您的问题...
是的,您应该担心 Dataflow 会在每次调用 DoFn
时尝试重新建立与 Bigtable 的连接。 AbstractCloudBigtableDoFn
的预期行为是每个工作人员维护一个 Connection
实例。
不,从 AbstractCloudBigtableDoFn
继承并不能确保对 DoFn
的每次调用都重复使用单个 Connection
实例。这是不可能的,因为 DoFn
是根据为 Dataflow 作业分配的工作人员数量跨多个物理机器序列化的。
首先,确保 Bigtable 没有 connection/authentication 问题。有时,Dataflow 需要重新建立与 Bigtable 的连接。但是,预计每次调用 DoFn
时都这样做。