"Piping" 使用 Python 中缀语法从一个函数输出到另一个函数

"Piping" output from one function to another using Python infix syntax

我正在尝试使用 Python/Pandas 从 R 中大致复制 dplyr 包(作为学习练习)。我坚持的是 "piping" 功能。

在 R/dplyr 中,这是使用管道运算符 %>% 完成的,其中 x %>% f(y) 等同于 f(x, y)。如果可能的话,我想使用中缀语法复制它(参见 here)。

为了说明,请考虑以下两个函数。

import pandas as pd

def select(df, *args):
    cols = [x for x in args]
    df = df[cols]
    return df

def rename(df, **kwargs):
    for name, value in kwargs.items():
        df = df.rename(columns={'%s' % name: '%s' % value})
    return df

第一个函数采用数据框,returns 仅采用给定的列。第二个采用数据框,并重命名给定的列。例如:

d = {'one' : [1., 2., 3., 4., 4.],
     'two' : [4., 3., 2., 1., 3.]}

df = pd.DataFrame(d)

# Keep only the 'one' column.
df = select(df, 'one')

# Rename the 'one' column to 'new_one'.
df = rename(df, one = 'new_one')

要使用 pipe/infix 语法实现相同的效果,代码为:

df = df | select('one') \
        | rename(one = 'new_one')

因此 | 左侧的输出作为第一个参数传递给右侧的函数。每当我看到这样的事情(例如 here)时,它都涉及 lambda 函数。是否可以以相同的方式在函数之间传输 Pandas' 数据帧?

我知道 Pandas 有 .pipe 方法,但对我来说重要的是我提供的示例的语法。任何帮助,将不胜感激。

很难使用按位 or 运算符实现它,因为 pandas.DataFrame 实现了它。如果你不介意用 >> 替换 |,你可以试试这个:

import pandas as pd

def select(df, *args):
    cols = [x for x in args]
    return df[cols]


def rename(df, **kwargs):
    for name, value in kwargs.items():
        df = df.rename(columns={'%s' % name: '%s' % value})
    return df


class SinkInto(object):
    def __init__(self, function, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
        self.function = function

    def __rrshift__(self, other):
        return self.function(other, *self.args, **self.kwargs)

    def __repr__(self):
        return "<SinkInto {} args={} kwargs={}>".format(
            self.function, 
            self.args, 
            self.kwargs
        )

df = pd.DataFrame({'one' : [1., 2., 3., 4., 4.],
                   'two' : [4., 3., 2., 1., 3.]})

那么你可以这样做:

>>> df
   one  two
0    1    4
1    2    3
2    3    2
3    4    1
4    4    3

>>> df = df >> SinkInto(select, 'one') \
            >> SinkInto(rename, one='new_one')
>>> df
   new_one
0        1
1        2
2        3
3        4
4        4

在Python3中你可以滥用unicode:

>>> print('\u01c1')
ǁ
>>> ǁ = SinkInto
>>> df >> ǁ(select, 'one') >> ǁ(rename, one='new_one')
   new_one
0        1
1        2
2        3
3        4
4        4

[更新]

Thanks for your response. Would it be possible to make a separate class (like SinkInto) for each function to avoid having to pass the functions as an argument?

装饰师怎么样?

def pipe(original):
    class PipeInto(object):
        data = {'function': original}

        def __init__(self, *args, **kwargs):
            self.data['args'] = args
            self.data['kwargs'] = kwargs

        def __rrshift__(self, other):
            return self.data['function'](
                other, 
                *self.data['args'], 
                **self.data['kwargs']
            )

    return PipeInto


@pipe
def select(df, *args):
    cols = [x for x in args]
    return df[cols]


@pipe
def rename(df, **kwargs):
    for name, value in kwargs.items():
        df = df.rename(columns={'%s' % name: '%s' % value})
    return df

现在您可以修饰任何以 DataFrame 作为第一个参数的函数:

>>> df >> select('one') >> rename(one='first')
   first
0      1
1      2
2      3
3      4
4      4

Python太棒了!

我知道像 Ruby 这样的语言是 "so expressive",它鼓励人们将每个程序都写成新的 DSL,但这在 Python 中有点不受欢迎。许多 Python 主义者认为出于不同目的而重载运算符是一种有罪的亵渎。

[更新]

用户 OHLÁLÁ 没有留下深刻印象:

The problem with this solution is when you are trying to call the function instead of piping. – OHLÁLÁ

你可以实现dunder-call方法:

def __call__(self, df):
    return df >> self

然后:

>>> select('one')(df)
   one
0  1.0
1  2.0
2  3.0
3  4.0
4  4.0

看来要取悦 OHLÁLÁ 并不容易:

In that case you need to call the object explicitly:
select('one')(df) Is there a way to avoid that? – OHLÁLÁ

好吧,我可以想到一个解决方案,但有一个警告:您的原始函数不得采用第二个位置参数,即 pandas 数据帧(关键字参数可以)。让我们在 docorator 中向我们的 PipeInto class 添加一个 __new__ 方法来测试第一个参数是否是数据帧,如果是,那么我们只需使用参数调用原始函数:

def __new__(cls, *args, **kwargs):
    if args and isinstance(args[0], pd.DataFrame):
        return cls.data['function'](*args, **kwargs)
    return super().__new__(cls)

它似乎有效,但可能存在一些我无法发现的缺点。

>>> select(df, 'one')
   one
0  1.0
1  2.0
2  3.0
3  4.0
4  4.0

>>> df >> select('one')
   one
0  1.0
1  2.0
2  3.0
3  4.0
4  4.0

虽然我不禁要提到使用 dplyr in Python might the closest thing to having in dplyr in Python (it has the rshift operator, but as a gimmick), I'd like to also point out that the pipe operator might only be necessary in R because of its use of generic functions rather than methods as object attributes. Method chaining 可以让您在本质上相同而无需覆盖运算符:

dataf = (DataFrame(mtcars).
         filter('gear>=3').
         mutate(powertoweight='hp*36/wt').
         group_by('gear').
         summarize(mean_ptw='mean(powertoweight)'))

请注意,将链条包裹在一对括号之间可以让您将其分成多行,而无需在每行上添加尾随 \

我找不到执行此操作的内置方法,因此我创建了一个使用 __call__ 运算符的 class,因为它支持 *args/**kwargs:

class Pipe:
    def __init__(self, value):
        """
        Creates a new pipe with a given value.
        """
        self.value = value
    def __call__(self, func, *args, **kwargs):
        """
        Creates a new pipe with the value returned from `func` called with
        `args` and `kwargs` and it's easy to save your intermedi.
        """
        value = func(self.value, *args, **kwargs)
        return Pipe(value)

语法需要一些时间来适应,但它允许管道。

def get(dictionary, key):
    assert isinstance(dictionary, dict)
    assert isinstance(key, str)
    return dictionary.get(key)

def keys(dictionary):
    assert isinstance(dictionary, dict)
    return dictionary.keys()

def filter_by(iterable, check):
    assert hasattr(iterable, '__iter__')
    assert callable(check)
    return [item for item in iterable if check(item)]

def update(dictionary, **kwargs):
    assert isinstance(dictionary, dict)
    dictionary.update(kwargs)
    return dictionary


x = Pipe({'a': 3, 'b': 4})(update, a=5, c=7, d=8, e=1)
y = (x
    (keys)
    (filter_by, lambda key: key in ('a', 'c', 'e', 'g'))
    (set)
    ).value
z = x(lambda dictionary: dictionary['a']).value

assert x.value == {'a': 5, 'b': 4, 'c': 7, 'd': 8, 'e': 1}
assert y == {'a', 'c', 'e'}
assert z == 5

您可以使用 sspipe 库,并使用以下语法:

from sspipe import p
df = df | p(select, 'one') \
        | p(rename, one = 'new_one')

我强烈反对这样做或这里建议的任何答案,而只是在标准 python 代码中实现 pipe 函数,没有运算符欺骗、装饰器或其他东西:

def pipe(first, *args):
  for fn in args:
    first = fn(first)
  return first

在这里查看我的回答以了解更多背景信息:

重载运算符,涉及外部库以及不利于降低代码可读性、可维护性、可测试性和 pythonic 的东西。 如果我想在 python 中做某种管道,我不想做超过 pipe(input, fn1, fn2, fn3) 的事情。那是我能想到的最具可读性和最强大的解决方案。 如果我们公司中的某个人对生产 只是 承诺操作员超载或新的依赖关系来做一个管道,它会立即恢复并且他们将被判处在本周余下的时间进行质量检查 :D 如果你真的真的真的必须为管道使用某种运算符,那么也许你有更大的问题并且 Python 不是适合你的用例的语言......

一个老问题,但我仍然感兴趣(来自 R)。因此,尽管纯粹主义者反对,但这里的矮个子灵感来自 http://tomerfiliba.com/blog/Infix-Operators/

class FuncPipe:
    class Arg:
        def __init__(self, arg):
            self.arg = arg
        def __or__(self, func):
            return func(self.arg)

    def __ror__(self, arg):
        return self.Arg(arg)
pipe = FuncPipe()

然后

1 |pipe| \
  (lambda x: return x+1) |pipe| \
  (lambda x: return 2*x)

returns

4 

我一直在 python 中从 R 移植数据包(dplyr、tidyr、tibble 等):

https://github.com/pwwang/datar

如果您熟悉 R 中的那些包,并想在 python 中应用它,那么这里适合您:

from datar.all import *

d = {'one' : [1., 2., 3., 4., 4.],
     'two' : [4., 3., 2., 1., 3.]}
df = tibble(one=d['one'], two=d['two'])

df = df >> select(f.one) >> rename(new_one=f.one)
print(df)

输出:

   new_one
0      1.0
1      2.0
2      3.0
3      4.0
4      4.0