python 中的功能管道,例如来自 R 的 magrittr 的 %>%

在 R 中(感谢 magrittr),您现在可以通过 %>% 使用功能更强大的管道语法执行操作。这意味着不是编码这个:

> as.Date("2014-01-01")
> as.character((sqrt(12)^2)


> "2014-01-01" %>% as.Date 
> 12 %>% sqrt %>% .^2 %>% as.character

对我来说,这更具可读性,并且扩展到数据框之外的用例。 python 语言是否支持类似的东西?

Does the python language have support for something similar?

"more functional piping syntax" 这真的是更 "functional" 的语法吗?我会说它向 R 添加了 "infix" 语法。

也就是说,Python's grammar 不直接支持标准运算符之外的中缀表示法。

如果你真的需要这样的东西,你应该以 that code from Tomer Filiba 作为起点来实现你自己的中缀表示法:

Code sample and comments by Tomer Filiba ( :

from functools import partial

class Infix(object):
    def __init__(self, func):
        self.func = func
    def __or__(self, other):
        return self.func(other)
    def __ror__(self, other):
        return Infix(partial(self.func, other))
    def __call__(self, v1, v2):
        return self.func(v1, v2)

Using instances of this peculiar class, we can now use a new "syntax" for calling functions as infix operators:

>>> @Infix
... def add(x, y):
...     return x + y
>>> 5 |add| 6

一种可能的方法是使用名为 macropy 的模块。 Macropy 允许您将转换应用于您编写的代码。因此a | b可以转化为b(a)。这有很多优点和缺点。

与 Sylvain Leroux 提到的解决方案相比,主要优点是您不需要为您有兴趣使用的函数创建中缀对象——只需标记您打算使用的代码区域转型。其次,由于转换是在编译时而不是运行时应用的,因此转换后的代码在运行时不会产生任何开销——所有工作都在字节代码首次从源代码生成时完成。

主要缺点是 macropy 需要某种方式来激活它才能工作(稍后提到)。与更快的运行时相比,源代码的解析在计算上更加复杂,因此程序将花费更长的时间来启动。最后,它添加了一种句法风格,这意味着不熟悉 macropy 的程序员可能会发现您的代码更难理解。


import macropy.activate 
# Activates macropy, modules using macropy cannot be imported before this statement
# in the program.
import target
# import the module using macropy

from fpipe import macros, fpipe
from macropy.quick_lambda import macros, f
# The `from module import macros, ...` must be used for macropy to know which 
# macros it should apply to your code.
# Here two macros have been imported `fpipe`, which does what you want
# and `f` which provides a quicker way to write lambdas.

from math import sqrt

# Using the fpipe macro in a single expression.
# The code between the square braces is interpreted as - str(sqrt(12))
print fpipe[12 | sqrt | str] # prints 3.46410161514

# using a decorator
# All code within the function is examined for `x | y` constructs.
x = 1 # global variable
def sum_range_then_square():
    "expected value (1 + 2 + 3)**2 -> 36"
    y = 4 # local variable
    return range(x, y) | sum | f[_**2]
    # `f[_**2]` is macropy syntax for -- `lambda x: x**2`, which would also work here

print sum_range_then_square() # prints 36

# using a with block.
# same as a decorator, but for limited blocks.
with fpipe:
    print range(4) | sum # prints 6
    print 'a b c' | f[_.split()] # prints ['a', 'b', 'c']

最后是完成艰苦工作的模块。我将它称为 fpipe for functional pipe,因为它模拟 shell 语法,用于将输出从一个进程传递到另一个进程。

from macropy.core.macros import *
from macropy.core.quotes import macros, q, ast

macros = Macros()

def fpipe(tree, **kw):

    def pipe_search(tree, stop, **kw):
        """Search code for bitwise or operators and transform `a | b` to `b(a)`."""
        if isinstance(tree, BinOp) and isinstance(tree.op, BitOr):
            operand = tree.left
            function = tree.right
            newtree = q[ast[function](ast[operand])]
            return newtree

    return pipe_search.recurse(tree)

PyToolz [doc] 允许任意可组合的管道,只是它们不是用管道运算符语法定义的。

按照上述 link 进行快速入门。这是一个视频教程:

In [1]: from toolz import pipe

In [2]: from math import sqrt

In [3]: pipe(12, sqrt, str)
Out[3]: '3.4641016151377544'

管道是 Pandas 0.16.2 中的一项新功能。


import pandas as pd
from sklearn.datasets import load_iris

x = load_iris()
x = pd.DataFrame(, columns=x.feature_names)

def remove_units(df):
    df.columns = pd.Index(map(lambda x: x.replace(" (cm)", ""), df.columns))
    return df

def length_times_width(df):
    df['sepal length*width'] = df['sepal length'] * df['sepal width']
    df['petal length*width'] = df['petal length'] * df['petal width']


注意:Pandas 版本保留了 Python 的引用语义。这就是 length_times_width 不需要 return 值的原因;它修改 x 到位。



正如 所提示的,我们可以使用Infix 运算符来构造一个中缀pipe。让我们看看这是如何完成的。

首先,这是来自Tomer Filiba


Using instances of this peculiar class, we can now use a new "syntax" for calling functions as infix operators:

>>> @Infix
... def add(x, y):
...     return x + y
>>> 5 |add| 6

管道运算符将前面的对象作为参数传递给管道后面的对象,因此x %>% f可以转换为f(x)。因此,可以使用 Infix 定义 pipe 运算符,如下所示:

In [1]: @Infix
   ...: def pipe(x, f):
   ...:     return f(x)

In [2]: from math import sqrt

In [3]: 12 |pipe| sqrt |pipe| str
Out[3]: '3.4641016151377544'


dpylr 中的 %>% 运算符将参数压入函数中的第一个参数,因此

df %>% 
filter(x >= 2) %>%
mutate(y = 2*x)


df1 <- filter(df, x >= 2)
df2 <- mutate(df1, y = 2*x)

在 Python 中实现类似功能的最简单方法是使用 curryingtoolz 库提供了一个 curry 装饰器函数,使构造柯里化函数变得容易。

In [2]: from toolz import curry

In [3]: from datetime import datetime

In [4]: @curry
    def asDate(format, date_string):
        return datetime.strptime(date_string, format)

In [5]: "2014-01-01" |pipe| asDate("%Y-%m-%d")
Out[5]: datetime.datetime(2014, 1, 1, 0, 0)


x |pipe| f(2)


f(2, x)


请注意,toolz 包含许多预柯里化函数,包括来自 operator 模块的各种函数。

In [11]: from toolz.curried import map

In [12]: from toolz.curried.operator import add

In [13]: range(5) |pipe| map(add(2)) |pipe| list
Out[13]: [2, 3, 4, 5, 6]


> library(dplyr)
> add2 <- function(x) {x + 2}
> 0:4 %>% sapply(add2)
[1] 2 3 4 5 6


您可以通过覆盖其他 Python 运算符方法来更改中缀调用周围的符号。例如,将 __or____ror__ 切换为 __mod____rmod__ 会将 | 运算符更改为 mod 运算符。

In [5]: 12 %pipe% sqrt %pipe% str
Out[5]: '3.4641016151377544'

如果您只想将此用于个人脚本,您可能需要考虑使用 Coconut 而不是 Python。

椰子是 Python 的超集。因此,您可以使用 Coconut 的管道运算符 |>,同时完全忽略 Coconut 语言的其余部分。


def addone(x):
    x + 1

3 |> addone


# lots of auto-generated header junk

# Compiled Coconut: -----------------------------------------------------------

def addone(x):
    return x + 1


添加我的 2c。我个人使用包 fn 进行函数式编程。你的例子翻译成

from fn import F, _
from math import sqrt

(F(sqrt) >> _**2 >> str)(12)

F 是一个包装器 class,带有用于部分应用和组合的函数式语法糖。 _ 是匿名函数的 Scala 风格的构造函数(类似于 Python 的 lambda);它表示一个变量,因此您可以在一个表达式中组合多个 _ 对象以获得具有更多参数的函数(例如 _ + _ 等同于 lambda a, b: a + b)。 F(sqrt) >> _**2 >> str 生成一个 Callable 对象,可以根据需要多次使用。

我错过了 Elixir 的 |> 管道运算符,所以我创建了一个简单的函数装饰器(约 50 行代码),它将 >> Python 右移运算符重新解释为一个非常Elixir-like 在编译时使用 ast 库和 compile/exec:

from pipeop import pipes

def add3(a, b, c):
    return a + b + c

def times(a, b):
    return a * b

def calc()
    print 1 >> add3(2, 3) >> times(4)  # prints 24

它所做的只是将 a >> b(...) 重写为 b(a, ...)

另一种解决方案是使用工作流工具 dask。虽然它在语法上不如...

| do this
| then do that

...它仍然允许您的变量沿着链向下流动,并且使用 dask 可以在可能的情况下提供并行化的额外好处。

以下是我如何使用 dask 完成管链模式:

import dask

def a(foo):
    return foo + 1
def b(foo):
    return foo / 2
def c(foo,bar):
    return foo + bar

# pattern = 'name_of_behavior': (method_to_call, variables_to_pass_in, variables_can_be_task_names)
workflow = {'a_task':(a,1),

#dask.visualize(workflow) #visualization available. 


# returns 100

在使用 elixir 之后,我想使用 Python 中的管道模式。这不是完全相同的模式,但它很相似,就像我说的那样,具有并行化的额外好处;如果你告诉 dask 在你的工作流程中获得一个不依赖于其他人的任务首先 运行,他们将 运行 并行。


def dask_pipe(initial_var, functions_args):
    call the dask_pipe with an init_var, and a list of functions
    workflow, last_task = dask_pipe(initial_var, {function_1:[], function_2:[arg1, arg2]})
    workflow, last_task = dask_pipe(initial_var, [function_1, function_2])
    dask.get(workflow, last_task)
    workflow = {}
    if isinstance(functions_args, list):
        for ix, function in enumerate(functions_args):
            if ix == 0:
                workflow['task_' + str(ix)] = (function, initial_var)
                workflow['task_' + str(ix)] = (function, 'task_' + str(ix - 1))
        return workflow, 'task_' + str(ix)
    elif isinstance(functions_args, dict):
        for ix, (function, args) in enumerate(functions_args.items()):
            if ix == 0:
                workflow['task_' + str(ix)] = (function, initial_var)
                workflow['task_' + str(ix)] = (function, 'task_' + str(ix - 1), *args )
        return workflow, 'task_' + str(ix)

# piped functions
def foo(df):
    return df[['a','b']]
def bar(df, s1, s2):
    return df.columns.tolist() + [s1, s2]
def baz(df):
    return df.columns.tolist()

# setup 
import dask
import pandas as pd
df = pd.DataFrame({'a':[1,2,3],'b':[1,2,3],'c':[1,2,3]})


# wf, lt = dask_pipe(initial_var, [function_1, function_2])
# wf, lt = dask_pipe(initial_var, {function_1:[], function_2:[arg1, arg2]})


# test 1 - lists for functions only:
workflow, last_task =  dask_pipe(df, [foo, baz])
print(dask.get(workflow, last_task)) # returns ['a','b']

# test 2 - dictionary for args:
workflow, last_task = dask_pipe(df, {foo:[], bar:['string1', 'string2']})
print(dask.get(workflow, last_task)) # returns ['a','b','string1','string2']

您可以使用 sspipe 库。它公开了两个对象 ppx。类似于x %>% f(y,z),可以写x | p(f, y, z),类似于x %>% .^2,可以写x | px**2

from sspipe import p, px
from math import sqrt

12 | p(sqrt) | px ** 2 | p(str)




from dfply import *
diamonds >> group_by('cut') >> row_slice(5)
diamonds >> distinct(X.color)
diamonds >> filter_by(X.cut == 'Ideal', X.color == 'E', X.table < 55, X.price < 500)
diamonds >> mutate(x_plus_y=X.x + X.y, y_div_z=(X.y / X.z)) >> select(columns_from('x')) >> head(3)

这里有非常好的 pipe 模块 它超载了 |运算符并提供许多管道函数,如 add, first, where, tail

>>> [1, 2, 3, 4] | where(lambda x: x % 2 == 0) | add

>>> sum([1, [2, 3], 4] | traverse)


def p_sqrt(x):
    return sqrt(x)

def p_pr(x):

9 | p_sqrt | p_pr

无需第 3 方库或令人困惑的运算符技巧即可实现管道功能 - 您可以自己轻松掌握基础知识。

让我们从定义什么是管道函数开始。从本质上讲,它只是一种按逻辑顺序表达一系列函数调用的方式,而不是标准的 'inside out' 顺序。


def one(value):
  return value

def two(value):
  return 2*value

def three(value):
  return 3*value

不是很有趣,但假设 value 正在发生有趣的事情。我们想按顺序调用它们,将每个的输出传递给下一个。在原版 python 中是:

result = three(two(one(1)))


def pipe(first, *args):
  for fn in args:
    first = fn(first)
  return first


result = pipe(1, one, two, three)

对我来说,'pipe' 语法看起来非常易读 :)。我看不出它比重载运算符或类似的东西有什么可读性。事实上,我认为它更具可读性 python code

这是解决 OP 示例的简陋管道:

from math import sqrt
from datetime import datetime

def as_date(s):
  return datetime.strptime(s, '%Y-%m-%d')

def as_character(value):
  # Do whatever as.character does
  return value

pipe("2014-01-01", as_date)
pipe(12, sqrt, lambda x: x**2, as_character)

管道功能可以通过用点组合 pandas 方法来实现。下面是一个例子。


import seaborn    
iris = seaborn.load_dataset("iris")
# <class 'pandas.core.frame.DataFrame'>

用点说明 pandas 方法的组成:

(iris.query("species == 'setosa'")

如果需要,您可以向熊猫数据框添加新方法(例如 here):

pandas.DataFrame.new_method  = new_method


class FuncPipe:
  class Arg:
    def __init__(self, arg):
      self.arg = arg
    def __or__(self, func):
      return func(self.arg)

  def __ror__(self, arg):
    return self.Arg(arg)
pipe = FuncPipe()


1 |pipe| \
  (lambda x: return x+1) |pipe| \
  (lambda x: return 2*x)




首先,运行python -m pip install cool。 然后,运行 python.

from cool import F

range(10) | F(filter, lambda x: x % 2) | F(sum) == 25

您可以阅读 以获得更多用法。