Apply custom functions in pandas groupby on panel data

I have a panel data set that looks like this:

                     volume   VWAP      open    close     high         low   n    ticker      date
time                                    
2021-09-02 09:30:00 597866  110.2781    110.32  110.37  110.4900    110.041 3719.0  AMD 2021-09-02
2021-09-02 09:31:00 512287  109.9928    110.36  109.85  110.4000    109.725 3732.0  AMD 2021-09-02
2021-09-02 09:32:00 359379  109.7271    109.81  109.89  109.9600    109.510 2455.0  AMD 2021-09-02
2021-09-02 09:33:00 368225  109.5740    109.89  109.66  109.8900    109.420 2555.0  AMD 2021-09-02
2021-09-02 09:34:00 320260  109.5616    109.67  109.45  109.8299    109.390 2339.0  AMD 2021-09-02
... ... ... ... ... ... ... ... ... ...
2021-12-31 15:56:00 62680   3334.8825   3332.24 3337.60 3337.8500   3331.890    2334.0  AMZN    2021-12-31
2021-12-31 15:57:00 26046   3336.0700   3337.70 3335.72 3338.6000   3334.990    1292.0  AMZN    2021-12-31
2021-12-31 15:58:00 47989   3336.3885   3334.65 3337.23 3338.0650   3334.650    1651.0  AMZN    2021-12-31
2021-12-31 15:59:00 63865   3335.5288   3336.70 3334.72 3337.3700   3334.180    2172.0  AMZN    2021-12-31
2021-12-31 16:00:00 1974    3334.8869   3334.34 3334.34 3334.3400   3334.340    108.0   AMZN    2021-12-31
153700 rows × 9 columns

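I want to compute a series of features from the panel data. The functions are already written and published on GitHub at https://github.com/twopirllc/pandas-ta/blob/main/pandas_ta/overlap/ema.py. In Dr. Jansen's example, he used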

import pandas_ta as ta
import pandas as pd

df["feature"] = df.groupby("ticker", group_keys = False).apply(lambda x: ta.ema(x.close))

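This worked for me on a Google Cloud Compute Engine instance running Python 3.7. However, on my school's cluster with Python 3.8 it does not work, even with the same pandas version. I also tried matching the Python version; unfortunately, that did not work either.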

df.groupby("ticker").apply(lambda x: ta.ema(x.close, 200))

output:
ticker  time               
AAPL    2021-09-02 09:30:00            NaN
        2021-09-02 09:31:00            NaN
        2021-09-02 09:32:00            NaN
        2021-09-02 09:33:00            NaN
        2021-09-02 09:34:00            NaN
                                  ...     
TSLA    2021-12-31 15:56:00    1064.446659
        2021-12-31 15:57:00    1064.358135
        2021-12-31 15:58:00    1064.278452
        2021-12-31 15:59:00    1064.207621
        2021-12-31 16:00:00    1064.135904
Name: EMA_200, Length: 153700, dtype: float64
df["alpha_01"] = df.groupby("ticker").apply(lambda x: ta.ema(x.close))

output:
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File ~/quant/lib/python3.8/site-packages/pandas/core/frame.py:10772, in _reindex_for_setitem(value, index)
  10771 try:
> 10772     reindexed_value = value.reindex(index)._values
  10773 except ValueError as err:
  10774     # raised in MultiIndex.from_tuples, see test_insert_error_msmgs

File ~/quant/lib/python3.8/site-packages/pandas/core/series.py:4579, in Series.reindex(self, index, **kwargs)
   4571 @doc(
   4572     NDFrame.reindex,  # type: ignore[has-type]
   4573     klass=_shared_doc_kwargs["klass"],
   (...)
   4577 )
   4578 def reindex(self, index=None, **kwargs):
-> 4579     return super().reindex(index=index, **kwargs)

File ~/quant/lib/python3.8/site-packages/pandas/core/generic.py:4809, in NDFrame.reindex(self, *args, **kwargs)
   4808 # perform the reindex on the axes
-> 4809 return self._reindex_axes(
   4810     axes, level, limit, tolerance, method, fill_value, copy
   4811 ).__finalize__(self, method="reindex")

File ~/quant/lib/python3.8/site-packages/pandas/core/generic.py:4825, in NDFrame._reindex_axes(self, axes, level, limit, tolerance, method, fill_value, copy)
   4824 ax = self._get_axis(a)
-> 4825 new_index, indexer = ax.reindex(
   4826     labels, level=level, limit=limit, tolerance=tolerance, method=method
   4827 )
   4829 axis = self._get_axis_number(a)

File ~/quant/lib/python3.8/site-packages/pandas/core/indexes/multi.py:2533, in MultiIndex.reindex(self, target, method, level, limit, tolerance)
   2532 try:
-> 2533     target = MultiIndex.from_tuples(target)
   2534 except TypeError:
   2535     # not all tuples, see test_constructor_dict_multiindex_reindex_flat

File ~/quant/lib/python3.8/site-packages/pandas/core/indexes/multi.py:202, in names_compat.<locals>.new_meth(self_or_cls, *args, **kwargs)
    200     kwargs["names"] = kwargs.pop("name")
--> 202 return meth(self_or_cls, *args, **kwargs)

File ~/quant/lib/python3.8/site-packages/pandas/core/indexes/multi.py:553, in MultiIndex.from_tuples(cls, tuples, sortorder, names)
    551         tuples = np.asarray(tuples._values)
--> 553     arrays = list(lib.tuples_to_object_array(tuples).T)
    554 elif isinstance(tuples, list):

File ~/quant/lib/python3.8/site-packages/pandas/_libs/lib.pyx:2919, in pandas._libs.lib.tuples_to_object_array()

ValueError: cannot include dtype 'M' in a buffer

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
Input In [19], in <cell line: 1>()
----> 1 df_features["alpha_01"] = df.groupby("ticker").apply(lambda x: ta.ema(x.close))

File ~/quant/lib/python3.8/site-packages/pandas/core/frame.py:3607, in DataFrame.__setitem__(self, key, value)
   3604     self._setitem_array([key], value)
   3605 else:
   3606     # set column
-> 3607     self._set_item(key, value)

File ~/quant/lib/python3.8/site-packages/pandas/core/frame.py:3779, in DataFrame._set_item(self, key, value)
   3769 def _set_item(self, key, value) -> None:
   3770     """
   3771     Add series to DataFrame in specified column.
   3772 
   (...)
   3777     ensure homogeneity.
   3778     """
-> 3779     value = self._sanitize_column(value)
   3781     if (
   3782         key in self.columns
   3783         and value.ndim == 1
   3784         and not is_extension_array_dtype(value)
   3785     ):
   3786         # broadcast across multiple columns if necessary
   3787         if not self.columns.is_unique or isinstance(self.columns, MultiIndex):

File ~/quant/lib/python3.8/site-packages/pandas/core/frame.py:4501, in DataFrame._sanitize_column(self, value)
   4499 # We should never get here with DataFrame value
   4500 if isinstance(value, Series):
-> 4501     return _reindex_for_setitem(value, self.index)
   4503 if is_list_like(value):
   4504     com.require_length_match(value, self.index)

File ~/quant/lib/python3.8/site-packages/pandas/core/frame.py:10779, in _reindex_for_setitem(value, index)
  10775     if not value.index.is_unique:
  10776         # duplicate axis
  10777         raise err
> 10779     raise TypeError(
  10780         "incompatible index of inserted column with frame index"
  10781     ) from err
  10782 return reindexed_value

TypeError: incompatible index of inserted column with frame index
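
A minimal sketch of why this first assignment may fail, using a tiny made-up frame rather than the real data: when the group keys are kept, the per-ticker result is indexed by (ticker, time), while the frame is indexed by time alone. Here `pd.concat` over the groups stands in for what `groupby.apply` returned above, and pandas' `ewm` stands in for `ta.ema` (the two are not guaranteed to give identical values); both are assumptions made for illustration.

```python
import pandas as pd

# Toy panel (made-up numbers): two tickers sharing the same timestamps.
times = pd.to_datetime(["2021-09-02 09:30", "2021-09-02 09:31", "2021-09-02 09:32"])
toy = pd.DataFrame(
    {
        "close": [110.37, 109.85, 109.89, 154.43, 154.06, 154.23],
        "ticker": ["AMD"] * 3 + ["AAPL"] * 3,
    },
    index=pd.Index(times.append(times), name="time"),
)

# Stand-in for the groupby.apply result: one EMA series per ticker,
# stacked under a "ticker" level, giving a (ticker, time) MultiIndex.
stacked = pd.concat(
    {
        name: grp["close"].ewm(span=2, adjust=False).mean()
        for name, grp in toy.groupby("ticker")
    },
    names=["ticker"],
)

print(stacked.index.nlevels)  # number of index levels in the stacked result
print(toy.index.nlevels)      # number of index levels in the frame
```

The level counts differ, so pandas cannot line the result up against the frame's single-level index during column assignment.
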
df_features["alpha_01"] = df.groupby("ticker", group_keys = False).apply(lambda x: ta.ema(x.close))

output:
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Input In [20], in <cell line: 1>()
----> 1 df_features["alpha_01"] = df.groupby("ticker", group_keys = False).apply(lambda x: ta.ema(x.close))

File ~/quant/lib/python3.8/site-packages/pandas/core/frame.py:3607, in DataFrame.__setitem__(self, key, value)
   3604     self._setitem_array([key], value)
   3605 else:
   3606     # set column
-> 3607     self._set_item(key, value)

File ~/quant/lib/python3.8/site-packages/pandas/core/frame.py:3779, in DataFrame._set_item(self, key, value)
   3769 def _set_item(self, key, value) -> None:
   3770     """
   3771     Add series to DataFrame in specified column.
   3772 
   (...)
   3777     ensure homogeneity.
   3778     """
-> 3779     value = self._sanitize_column(value)
   3781     if (
   3782         key in self.columns
   3783         and value.ndim == 1
   3784         and not is_extension_array_dtype(value)
   3785     ):
   3786         # broadcast across multiple columns if necessary
   3787         if not self.columns.is_unique or isinstance(self.columns, MultiIndex):

File ~/quant/lib/python3.8/site-packages/pandas/core/frame.py:4501, in DataFrame._sanitize_column(self, value)
   4499 # We should never get here with DataFrame value
   4500 if isinstance(value, Series):
-> 4501     return _reindex_for_setitem(value, self.index)
   4503 if is_list_like(value):
   4504     com.require_length_match(value, self.index)

File ~/quant/lib/python3.8/site-packages/pandas/core/frame.py:10777, in _reindex_for_setitem(value, index)
  10773 except ValueError as err:
  10774     # raised in MultiIndex.from_tuples, see test_insert_error_msmgs
  10775     if not value.index.is_unique:
  10776         # duplicate axis
> 10777         raise err
  10779     raise TypeError(
  10780         "incompatible index of inserted column with frame index"
  10781     ) from err
  10782 return reindexed_value

File ~/quant/lib/python3.8/site-packages/pandas/core/frame.py:10772, in _reindex_for_setitem(value, index)
  10770 # GH#4107
  10771 try:
> 10772     reindexed_value = value.reindex(index)._values
  10773 except ValueError as err:
  10774     # raised in MultiIndex.from_tuples, see test_insert_error_msmgs
  10775     if not value.index.is_unique:
  10776         # duplicate axis

File ~/quant/lib/python3.8/site-packages/pandas/core/series.py:4579, in Series.reindex(self, index, **kwargs)
   4571 @doc(
   4572     NDFrame.reindex,  # type: ignore[has-type]
   4573     klass=_shared_doc_kwargs["klass"],
   (...)
   4577 )
   4578 def reindex(self, index=None, **kwargs):
-> 4579     return super().reindex(index=index, **kwargs)

File ~/quant/lib/python3.8/site-packages/pandas/core/generic.py:4809, in NDFrame.reindex(self, *args, **kwargs)
   4806     return self._reindex_multi(axes, copy, fill_value)
   4808 # perform the reindex on the axes
-> 4809 return self._reindex_axes(
   4810     axes, level, limit, tolerance, method, fill_value, copy
   4811 ).__finalize__(self, method="reindex")

File ~/quant/lib/python3.8/site-packages/pandas/core/generic.py:4830, in NDFrame._reindex_axes(self, axes, level, limit, tolerance, method, fill_value, copy)
   4825     new_index, indexer = ax.reindex(
   4826         labels, level=level, limit=limit, tolerance=tolerance, method=method
   4827     )
   4829     axis = self._get_axis_number(a)
-> 4830     obj = obj._reindex_with_indexers(
   4831         {axis: [new_index, indexer]},
   4832         fill_value=fill_value,
   4833         copy=copy,
   4834         allow_dups=False,
   4835     )
   4837 return obj

File ~/quant/lib/python3.8/site-packages/pandas/core/generic.py:4874, in NDFrame._reindex_with_indexers(self, reindexers, fill_value, copy, allow_dups)
   4871     indexer = ensure_platform_int(indexer)
   4873 # TODO: speed up on homogeneous DataFrame objects
-> 4874 new_data = new_data.reindex_indexer(
   4875     index,
   4876     indexer,
   4877     axis=baxis,
   4878     fill_value=fill_value,
   4879     allow_dups=allow_dups,
   4880     copy=copy,
   4881 )
   4882 # If we've made a copy once, no need to make another one
   4883 copy = False

File ~/quant/lib/python3.8/site-packages/pandas/core/internals/managers.py:663, in BaseBlockManager.reindex_indexer(self, new_axis, indexer, axis, fill_value, allow_dups, copy, consolidate, only_slice)
    661 # some axes don't allow reindexing with dups
    662 if not allow_dups:
--> 663     self.axes[axis]._validate_can_reindex(indexer)
    665 if axis >= self.ndim:
    666     raise IndexError("Requested axis not found in manager")

File ~/quant/lib/python3.8/site-packages/pandas/core/indexes/base.py:3785, in Index._validate_can_reindex(self, indexer)
   3783 # trying to reindex on an axis with duplicates
   3784 if not self._index_as_unique and len(indexer):
-> 3785     raise ValueError("cannot reindex from a duplicate axis")

ValueError: cannot reindex from a duplicate axis
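
The second error looks consistent with the same timestamps appearing once per ticker, so the `time` index on its own is not unique. A minimal sketch on a made-up two-ticker frame (values invented for illustration) that checks this:

```python
import pandas as pd

# Toy panel: the same two timestamps appear for both tickers.
times = pd.to_datetime(["2021-09-02 09:30", "2021-09-02 09:31"])
toy = pd.DataFrame(
    {
        "close": [110.37, 109.85, 154.43, 154.06],
        "ticker": ["AMD", "AMD", "AAPL", "AAPL"],
    },
    index=pd.Index(times.append(times), name="time"),
)

# Each timestamp occurs once per ticker, so the index alone is ambiguous.
print(toy.index.is_unique)           # False
print(toy.index.duplicated().sum())  # 2

# Together with the ticker, every row is identified uniquely.
keyed = toy.set_index("ticker", append=True)
print(keyed.index.is_unique)         # True
```
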

The data and the notebook (ipynb) are available via this link: https://drive.google.com/drive/folders/1QnIdYnDFs8XNk7L8KFzCHC_YJPDo618t?usp=sharing

Desired usage:

df["new_col"] = df.groupby().apply() # without writing any additional helper function
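
One pattern that might fit this shape is `groupby(...)[col].transform(...)`, which returns a result aligned row for row with the original frame. A minimal sketch on made-up data; `ema` here is a hypothetical stand-in built on pandas' `ewm`, not `ta.ema`, and the two may differ in how the first values are seeded:

```python
import pandas as pd

# Toy panel (invented values) with a time index that repeats across tickers.
times = pd.to_datetime(["2021-09-02 09:30", "2021-09-02 09:31", "2021-09-02 09:32"])
df = pd.DataFrame(
    {
        "close": [110.0, 112.0, 111.0, 150.0, 154.0, 152.0],
        "ticker": ["AMD"] * 3 + ["AAPL"] * 3,
    },
    index=pd.Index(times.append(times), name="time"),
)


def ema(close: pd.Series, length: int = 2) -> pd.Series:
    # Hypothetical stand-in for ta.ema: recursive EMA with alpha = 2 / (length + 1).
    return close.ewm(span=length, adjust=False).mean()


# transform keeps the original row order and index, so plain assignment works
# even though the time index repeats across tickers.
df["new_col"] = df.groupby("ticker")["close"].transform(ema)
print(df)
```

Whether this matches the `ta.ema` values exactly would need to be checked against the library's own seeding options.
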

Calling apply on the grouped data frame gives the following output:

df.groupby("ticker").apply(lambda x: ta.ema(x.close, 200))

output:
ticker  time               
AAPL    2021-09-02 09:30:00            NaN
        2021-09-02 09:31:00            NaN
        2021-09-02 09:32:00            NaN
        2021-09-02 09:33:00            NaN
        2021-09-02 09:34:00            NaN
                                  ...     
TSLA    2021-12-31 15:56:00    1064.446659
        2021-12-31 15:57:00    1064.358135
        2021-12-31 15:58:00    1064.278452
        2021-12-31 15:59:00    1064.207621
        2021-12-31 16:00:00    1064.135904
Name: EMA_200, Length: 153700, dtype: float64

We want the data frame that receives the new column to have the same multi-index.

df_features = df.reset_index().groupby([pd.Grouper(key = "ticker"), "time"]).sum()
df_features

out:
        volume  VWAP    open    close   high    low n
ticker  time                            
AAPL    2021-09-02 09:30:00 1844930 154.0857    153.8700    154.4300    154.4402    153.8600    9899.0
2021-09-02 09:31:00 565141  154.2679    154.4299    154.0600    154.4600    154.0600    5132.0
2021-09-02 09:32:00 524794  154.1198    154.0600    154.2339    154.3700    153.8500    4036.0
2021-09-02 09:33:00 504479  154.3071    154.2305    154.4750    154.4800    154.1600    4171.0
2021-09-02 09:34:00 794989  154.5478    154.4800    154.4906    154.7100    154.4206    5019.0
... ... ... ... ... ... ... ... ...
TSLA    2021-12-31 15:56:00 91296   1055.9030   1055.4900   1055.9400   1056.3200   1055.3200   2360.0
2021-12-31 15:57:00 104648  1056.0563   1055.9850   1055.5500   1056.4300   1055.5500   2988.0
2021-12-31 15:58:00 149130  1055.6994   1055.5500   1056.3500   1056.8000   1054.5900   3603.0
2021-12-31 15:59:00 189018  1056.4131   1056.2900   1057.1600   1057.2400   1056.0700   4214.0
2021-12-31 16:00:00 37983   1056.3289   1057.0100   1057.0000   1057.1000   1056.0000   319.0
153700 rows × 7 columns

Then append the computed series to this data frame.

df_features["alpha_01"] = df.groupby("ticker").parallel_apply(lambda x: ta.ema(x.close, 200))
df_features

out:
        volume  VWAP    open    close   high    low n   alpha_01
ticker  time                                
AAPL    2021-09-02 09:30:00 1844930 154.0857    153.8700    154.4300    154.4402    153.8600    9899.0  NaN
2021-09-02 09:31:00 565141  154.2679    154.4299    154.0600    154.4600    154.0600    5132.0  NaN
2021-09-02 09:32:00 524794  154.1198    154.0600    154.2339    154.3700    153.8500    4036.0  NaN
2021-09-02 09:33:00 504479  154.3071    154.2305    154.4750    154.4800    154.1600    4171.0  NaN
2021-09-02 09:34:00 794989  154.5478    154.4800    154.4906    154.7100    154.4206    5019.0  NaN
... ... ... ... ... ... ... ... ... ...
TSLA    2021-12-31 15:56:00 91296   1055.9030   1055.4900   1055.9400   1056.3200   1055.3200   2360.0  1064.446659
2021-12-31 15:57:00 104648  1056.0563   1055.9850   1055.5500   1056.4300   1055.5500   2988.0  1064.358135
2021-12-31 15:58:00 149130  1055.6994   1055.5500   1056.3500   1056.8000   1054.5900   3603.0  1064.278452
2021-12-31 15:59:00 189018  1056.4131   1056.2900   1057.1600   1057.2400   1056.0700   4214.0  1064.207621
2021-12-31 16:00:00 37983   1056.3289   1057.0100   1057.0000   1057.1000   1056.0000   319.0   1064.135904
153700 rows × 8 columns
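
A minimal sketch of the same workflow on a frame keyed by (ticker, time), again with invented values and a pandas `ewm` stand-in rather than `ta.ema`. It builds the multi-index with `set_index` instead of `groupby(...).sum()`, which is an assumption about what is wanted here:

```python
import pandas as pd

# Toy panel (invented values) indexed by time, with ticker as a column.
times = pd.to_datetime(["2021-09-02 09:30", "2021-09-02 09:31", "2021-09-02 09:32"])
df = pd.DataFrame(
    {
        "close": [110.0, 112.0, 111.0, 150.0, 154.0, 152.0],
        "ticker": ["AMD"] * 3 + ["AAPL"] * 3,
    },
    index=pd.Index(times.append(times), name="time"),
)

# Move ticker into the index so each row is keyed by (ticker, time).
df_features = (
    df.set_index("ticker", append=True)
    .reorder_levels(["ticker", "time"])
    .sort_index()
)

# Group on the index level; the result shares df_features' multi-index,
# so the assignment lines up without any reindexing.
df_features["alpha_01"] = (
    df_features.groupby(level="ticker")["close"]
    .transform(lambda s: s.ewm(span=2, adjust=False).mean())
)
print(df_features)
```
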