有效地转换大 pandas DataFrame

Convert big pandas DataFrame efficiently

我有几个指标(每秒 2000 行 x 100 列数据帧,可能更大)我想将它们存储在 OpenTSDB 中。为此,我需要以数据库理解的方式格式化值,telnet style or json style.

问题是使用天真的 python 函数,我无法足够快地处理它们。这是我的第一种方法:

def ndarray_to_opentsdb_series_comprehension(frame, table, **tags):
    series = [{
        "metric": '{table}.{col}'.format(
            table=table, col=frame.columns[col].item()
         ),
         "timestamp": frame.index[idx].item(),
         "value": val.item(),
         "tags": tags
    } for col, serie in frame.iteritems() for idx, val in serie.iteritems()]
    return json.dumps(series)

在 2000x100 数据帧上使用 timeit,我得到:

In [1]: %timeit utilities.ndarray_to_opentsdb_series_comprehension(f, 't1', p1='p1')
1 loops, best of 3: 3.9 s per loop

然后我尝试使用 DataFrame.apply 函数更有效地迭代我的数据,但我必须这样做几次才能获得我需要的所有信息:

def ndarray_to_opentsdb_series_tmp_df(frame, table, **tags):
    tags_str = ' '.join('{k}={v}'.format(k=k, v=v) for k, v in tags.items())
    df = frame.apply(lambda s: s.apply(lambda e: '{ts} {v} {tags}'.format(ts=s.name, v=e, tags=tags_str)), axis=1)
    df = df.apply(lambda s: s.apply(lambda e: '{table}.{col} {v}'.format(table=table, col=s.name, v=e)), axis=0)
    flat = [e for l in df.values.tolist() for e in l]
    return '\n'.join(flat)

(我尝试了其他没有创建多个数据帧的实现,但它大致和这个一样快)。

在这里,timeit 说:

In[1]: %timeit utilities.ndarray_to_opentsdb_series_tmp_df(f, 't1', p1='p1')
1 loops, best of 3: 2.59 s per loop

我已经获得了超过一秒的时间,但这仍然不够,我需要能够在一秒内处理那么多数据。在测试期间,我意识到最耗时的是在我的 DataFrame 中检索给定值的索引列对,但我需要这些来构建我的 OpenTSDB 请求。

有没有办法只使用 python 来处理大数据帧,或者我应该尝试在 Cython 中实现这个逻辑?我知道我可以获得巨大的改进,但我想在尝试使用低级语言进行优化之前确保我拥有最佳 python 代码。

好吧,我设法在 ~.5 秒内处理了我的 2000 行 x 100 列 DataFrame。在 ipython 中使用 prun,我发现访问 frame.columnsframe.index 非常昂贵,使用 string.format.

也是如此

我选择首先使用他们的 tolist() 方法将我的 DataFrame 的列和索引转换为 Python 列表,然后对其进行索引。我停止使用 string.format 并改用 % 格式化程序(仅此一项就减少了我函数的执行时间!)。

此外,我使用 DataFrame.apply 方法的 raw 属性来获取 numpy.ndarray 作为我的 lambda 函数的参数,而不是 pandas.Series。然后我使用列表理解对其进行迭代。

这是我修改后的函数:

def ndarray_to_opentsdb_series(frame, table, **tags):
    tags_str = ' '.join('{k}={v}'.format(k=k, v=v) for k, v in tags.items())
    indices = frame.index.tolist()
    columns = frame.columns.tolist()
    df = frame.apply(lambda s: ['%d %d %s' % (indices[i], e, tags_str) for i, e in enumerate(s)], axis=0, raw=True)
    df = df.apply(lambda s: ['%s.%d %s' % (table, columns[i], e) for i, e in enumerate(s)], axis=1, raw=True)
    flat = [e for l in df.values.tolist() for e in l]
    return '\n'.join(flat)

简单地编译它作为 Cython 代码将 运行 时间再减少 100 毫秒,我现在将尝试在 Cython 中对其进行更多优化。