Pandas 使用多列滚动应用

Pandas rolling apply using multiple columns

我正尝试在多个列上使用 pandas.DataFrame.rolling.apply() 滚动函数。 Python版本是3.7,pandas是1.0.2。

import pandas as pd

#function to calculate
def masscenter(x):
    print(x); # for debug purposes
    return 0;

#simple DF creation routine
df = pd.DataFrame( [['02:59:47.000282', 87.60, 739],
                    ['03:00:01.042391', 87.51, 10],
                    ['03:00:01.630182', 87.51, 10],
                    ['03:00:01.635150', 88.00, 792],
                    ['03:00:01.914104', 88.00, 10]], 
                   columns=['stamp', 'price','nQty'])
df['stamp'] = pd.to_datetime(df2['stamp'], format='%H:%M:%S.%f')
df.set_index('stamp', inplace=True, drop=True)

'stamp' 是单调且唯一的,'price' 是双精度且不包含 NaN,'nQty' 是整数且也不包含 NaN。

所以,我需要计算滚动'center of mass',即sum(price*nQty)/sum(nQty)

到目前为止我尝试了什么:

df.apply(masscenter, axis = 1)

masscenter 被单行调用 5 次,输出类似于

price     87.6
nQty     739.0
Name: 1900-01-01 02:59:47.000282, dtype: float64

需要输入 masscenter,因为我可以使用 x[0], x[1] 轻松访问 pricenQty。但是,我坚持 rolling.apply() 阅读文档 DataFrame.rolling() and rolling.apply() 我认为在 rolling() 中使用 'axis' 和在 apply 中使用 'raw' 可以实现类似的行为。天真的方法

rol = df.rolling(window=2)
rol.apply(masscenter)

逐行打印(行数增加到 window 大小)

stamp
1900-01-01 02:59:47.000282    87.60
1900-01-01 03:00:01.042391    87.51
dtype: float64

然后

stamp
1900-01-01 02:59:47.000282    739.0
1900-01-01 03:00:01.042391     10.0
dtype: float64

因此,列分别传递给 masscenter(预期)。

遗憾的是,在文档中几乎没有任何关于 'axis' 的信息。然而,下一个变体显然是

rol = df.rolling(window=2, axis = 1)
rol.apply(masscenter)

从不跟注 masscenter 并加注 ValueError in rol.apply(..)

> Length of passed values is 1, index implies 5

我承认由于缺少文档,我不确定 'axis' 参数及其工作原理。这是问题的第一部分: 这是怎么回事?如何正确使用'axis'?它的用途是什么?

当然之前也有答案,即:

How-to-apply-a-function-to-two-columns-of-pandas-dataframe
它适用于整个 DataFrame,而不适用于 Rolling。

How-to-invoke-pandas-rolling-apply-with-parameters-from-multiple-column
答案建议编写我自己的 roll 函数,但对我来说,罪魁祸首与 comments 中的问题相同:如果一个人需要使用偏移量 window 大小(例如 '1T') -统一时间戳?
我不喜欢从头开始重新发明轮子的想法。此外,我想对所有内容使用 pandas,以防止从 pandas 和 'self-made roll' 获得的集合之间的不一致。 这个问题还有另一个答案,建议单独填充数据框并计算我需要的任何东西,但这行不通:存储数据的大小将是巨大的。 这里提出了相同的想法:

此处发布了另一个问答

它很好并且最接近我的问题,但是同样,不可能使用偏移 window 大小 (window = '1T').

一些答案是在 pandas 1.0 出来之前问的,鉴于文档可能会更好,我希望现在可以同时滚动多个列。

问题的第二部分是: 是否有可能使用 pandas 1.0.x 偏移量 window 大小同时翻转多个列?

非常感谢。

您可以使用 numpy_ext 模块中的 rolling_apply 函数:

import numpy as np
import pandas as pd
from numpy_ext import rolling_apply


def masscenter(price, nQty):
    return np.sum(price * nQty) / np.sum(nQty)


df = pd.DataFrame( [['02:59:47.000282', 87.60, 739],
                    ['03:00:01.042391', 87.51, 10],
                    ['03:00:01.630182', 87.51, 10],
                    ['03:00:01.635150', 88.00, 792],
                    ['03:00:01.914104', 88.00, 10]], 
                   columns=['stamp', 'price','nQty'])
df['stamp'] = pd.to_datetime(df['stamp'], format='%H:%M:%S.%f')
df.set_index('stamp', inplace=True, drop=True)

window = 2
df['y'] = rolling_apply(masscenter, window, df.price.values, df.nQty.values)
print(df)

                            price  nQty          y
stamp                                             
1900-01-01 02:59:47.000282  87.60   739        NaN
1900-01-01 03:00:01.042391  87.51    10  87.598798
1900-01-01 03:00:01.630182  87.51    10  87.510000
1900-01-01 03:00:01.635150  88.00   792  87.993890
1900-01-01 03:00:01.914104  88.00    10  88.000000

所以我发现没有办法翻转两列,但是没有内置的 pandas 函数。 代码如下。

# function to find an index corresponding
# to current value minus offset value
def prevInd(series, offset, date):
    offset = to_offset(offset)
    end_date = date - offset
    end = series.index.searchsorted(end_date, side="left")
    return end

# function to find an index corresponding
# to the first value greater than current
# it is useful when one has timeseries with non-unique
# but monotonically increasing values
def nextInd(series, date):
    end = series.index.searchsorted(date, side="right")
    return end

def twoColumnsRoll(dFrame, offset, usecols, fn, columnName = 'twoColRol'):
    # find all unique indices
    uniqueIndices = dFrame.index.unique()
    numOfPoints = len(uniqueIndices)
    # prepare an output array
    moving = np.zeros(numOfPoints)
    # nameholders
    price = dFrame[usecols[0]]
    qty   = dFrame[usecols[1]]

    # iterate over unique indices
    for ii in range(numOfPoints):
        # nameholder
        pp = uniqueIndices[ii]
        # right index - value greater than current
        rInd = afta.nextInd(dFrame,pp)
        # left index - the least value that 
        # is bigger or equal than (pp - offset)
        lInd = afta.prevInd(dFrame,offset,pp)
        # call the actual calcuating function over two arrays
        moving[ii] = fn(price[lInd:rInd], qty[lInd:rInd])
    # construct and return DataFrame
    return pd.DataFrame(data=moving,index=uniqueIndices,columns=[columnName])

此代码有效,但速度相对较慢且效率低下。我想可以使用 How to invoke pandas.rolling.apply with parameters from multiple column? 中的 numpy.lib.stride_tricks 来加快速度。 然而,要么做大要么回家——我结束了用 C++ 编写一个函数和它的包装器。
我不想 post 它作为答案,因为它是一种解决方法,我没有回答我的问题的任何部分,但评论太长了。

这个怎么样:

def masscenter(ser):
    print(df.loc[ser.index])
    return 0

rol = df.price.rolling(window=2)
rol.apply(masscenter, raw=False)

它使用滚动逻辑从任意列中获取子集。 raw=False 选项为您提供这些子集的索引值(作为系列提供给您),然后您使用这些索引值从原始 DataFrame 中获取多列切片。

参考@saninstein 的精彩回答。

安装 numpy_ext 来自:https://pypi.org/project/numpy-ext/

import numpy as np
import pandas as pd
from numpy_ext import rolling_apply as rolling_apply_ext

def box_sum(a,b):
    return np.sum(a) + np.sum(b)

df = pd.DataFrame({"x": [1,2,3,4], "y": [1,2,3,4]})

window = 2
df["sum"] = rolling_apply_ext(box_sum, window , df.x.values, df.y.values)

输出:

print(df.to_string(index=False))
 x  y  sum
 1  1  NaN
 2  2  6.0
 3  3 10.0
 4  4 14.0

备注

  • 滚动函数对时间序列友好。它默认总是向后看,所以 6 是数组中当前值和过去值的总和。
  • 在上面的示例中,将 rolling_apply 导入为 rolling_apply_ext,因此它不可能干扰对 Pandas rolling_apply 的任何现有调用(感谢@LudoSchmidt 的评论) .

附带说明一下,我放弃了使用 Pandas 的尝试。它从根本上被打破了:它处理单列聚合并应用时几乎没有问题,但是当试图让它与更多两列或更多列一起工作时,它是一个过于复杂的 rube-goldberg 机器。