Python Bigtable 客户在 deepcopy 上花费了很长时间

Python Bigtable client spends ages on deepcopy

我有一个 Python Kafka 消费者 有时 从 Bigtable 读取数据非常慢。它从 Bigtable 中读取一行,执行一些计算并偶尔写回一些信息,然后继续。

问题是,从 GCE 中的 1 vCPU VM,它 reads/writes 非常快,消费者可以咀嚼 100​​-150 messages/s。没问题。

但是,当部署在多区域 (europe-west1-b/c/d) 的生产 Kubernetes 集群 (GKE) 上时,它会经历大约 0.5 messages/s。 是 - 每条消息 2 秒。

Bigtable 在 europe-west1-d - 但 pods 调度在同一区域 (d) 的节点上,与 pods 在其他区域的节点上具有相同的性能,这很奇怪。

pod 不断达到 CPU 限制 (1 vCPU)。分析程序表明,大部分时间 (95%) 都花在 PartialRowData.cells() 函数内部,在 copy.py:132(deepcopy)

它使用最新的 google-cloud-bigtable==0.29.0 包。

现在,我知道该软件包处于 alpha 阶段,但究竟是什么因素导致性能大幅降低 300 倍?

读取行数据的代码是这样的:

def _row_to_dict(cls, row):
    if row is None:
        return {}

    item_dict = {}

    if COLUMN_FAMILY in row.cells:
        structured_cells = {}
        for field_name in STRUCTURED_STATIC_FIELDS:
            if field_name.encode() in row.cells[COLUMN_FAMILY]:
                structured_cells[field_name] = row.cells[COLUMN_FAMILY][field_name.encode()][
                    0].value.decode()
        item_dict[COLUMN_FAMILY] = structured_cells

    return item_dict

传入的row来自

row = self.bt_table.read_row(row_key, filter_=filter_)

并且可能有大约 50 个 STRUCTURED_STATIC_FIELDS

deepcopy真的只是需要很长时间才能复制吗?还是在等待 Bigtable 的数据传输?我是否以某种方式滥用了图书馆? 关于如何提高性能的任何指示?

非常感谢。

事实证明,库将 row.cells 的 getter 定义为:

@property
def cells(self):
    """Property returning all the cells accumulated on this partial row.

    :rtype: dict
    :returns: Dictionary of the :class:`Cell` objects accumulated. This
              dictionary has two-levels of keys (first for column families
              and second for column names/qualifiers within a family). For
              a given column, a list of :class:`Cell` objects is stored.
    """
    return copy.deepcopy(self._cells)

因此,除了查找之外,对字典的每次调用都在执行 deepcopy

添加

row_cells = row.cells 

随后仅提及该问题已解决。

dev/prod 环境的性能差异还利用了这样一个事实,prod table 已经有更多 timestamps/versions 个单元,而 dev table只有一对。这使得必须进行深度复制的返回词典变得更大。

将现有的过滤器与 CellsColumnLimitFilter 链接起来有助于进一步:

filter_ = RowFilterChain(filters=[filter_, CellsColumnLimitFilter(num_cells=1)])