zip_longest 始终用于左侧列表

zip_longest for the left list always

我知道 zip 函数(将根据最短列表压缩)和 zip_longest(将根据最长列表压缩),但我将如何根据第一个列表,不管它是否最长?

例如:

Input:  ['a', 'b', 'c'], [1, 2]
Output: [('a', 1), ('b', 2), ('c', None)]

还有:

Input:  ['a', 'b'], [1, 2, 3]
Output: [('a', 1), ('b', 2)]

这两种功能是否存在于一个函数中?

Return 只有 len(a) 个元素来自 zip_longest:

from itertools import zip_longest

def zip_first(a, b):
    z = zip_longest(a, b)
    for i, r in zip(range(len(a)), z):
        yield r

有点难看,但我会选择这个。这个想法是将第二个列表缩短到第一个列表的大小(如果它更长)。然后我们使用 zip_longest 保证结果至少与 zip.

的第一个参数一样长
import itertools

input1 = [['a', 'b', 'c'], [1, 2]]
input2 = [['a', 'b'], [1, 2, 3]]

zip1 = itertools.zip_longest(input1[0], input1[1][:len(input1[0])])
zip2 = itertools.zip_longest(input2[0], input2[1][:len(input2[0])])

print(list(zip1))
print(list(zip2))

输出:

[('a', 1), ('b', 2), ('c', None)]
[('a', 1), ('b', 2)]

要压缩多个列表,可以使用:

import itertools

def zip_first(lists):
    equal_lists = [l[:len(lists[0])] for l in lists]
    return itertools.zip_longest(*equal_lists)

我不知道有现成的,但您可以定义自己的。

使用 object() 作为标记确保它始终测试为唯一,并且永远不会与 None 或任何其他填充值混淆。因此,即使您的任何一个 iterables 包含 None.

,这也应该正常运行

zip_longest一样,它接受任意数量的迭代器(不一定是两个),你可以指定fillvalue.

from itertools import zip_longest

def zip_left(*iterables, fillvalue=None):
    SENTINEL = object()
    
    for first, *others in zip_longest(*iterables, fillvalue=SENTINEL):
        if first is SENTINEL:
            return
        others = [i if i is not SENTINEL else fillvalue for i in others]
        yield (first, *others)


print(list(zip_left(['a', 'b', 'c'], [1, 2])))
print(list(zip_left(['a', 'b'], [1, 2, 3])))

输出:

[('a', 1), ('b', 2), ('c', None)]
[('a', 1), ('b', 2)]

对于通用迭代器(或列表),您可以使用它。我们产生对,直到我们在 a 上命中 StopIteration。如果我们首先在 b 上点击 StopIteration,我们使用 None 作为第二个值。

def zip_first(a, b):
    ai, bi = iter(a), iter(b)
    while True:
        try:
            aa = next(ai)
        except StopIteration:
            return           
        try:
            bb = next(bi)
        except StopIteration:
            bb = None
        yield aa, bb

您可以重新调整 itertools.zip_longest 文档中显示的“大致等效”python 代码的用途,以制作根据第一个参数的长度压缩的通用版本:

from itertools import repeat

def zip_by_first(*args, fillvalue=None):
    # zip_by_first('ABCD', 'xy', fillvalue='-') --> Ax By C- D-
    # zip_by_first('ABC', 'xyzw', fillvalue='-') --> Ax By Cz
    if not args:
        return
    iterators = [iter(it) for it in args]
    while True:
        values = []
        for i, it in enumerate(iterators):
            try:
                value = next(it)
            except StopIteration:
                if i == 0:
                    return
                iterators[i] = repeat(fillvalue)
                value = fillvalue
            values.append(value)
        yield tuple(values)

您或许可以进行一些小改进,例如缓存 repeat(fillvalue) 左右。此实现的问题在于它是用 Python 编写的,而 itertools 中的大部分使用更快的 C 实现。您可以通过与 .

进行比较来了解其效果

如果输入是列表(或其他可以与 len 一起使用的集合),您可以使用 zip_longest 并将结果延迟限制为第一个列表的长度1,通过使用 islice:

from itertools import islice, zip_longest

def zip_first(a, b):
    return islice(zip_longest(a, b), len(a))

1这个基本思想取自

解决方案

将重复的填充值链接到第一个以外的可迭代对象后面:

from itertools import chain, repeat

def zip_first(first, *rest, fillvalue=None):
    return zip(first, *map(chain, rest, repeat(repeat(fillvalue))))

或者使用 zip_longest 和 trim 它与 compresszip 技巧:

def zip_first(first, *rest, fillvalue=None):
    a, b = tee(first)
    return compress(zip_longest(b, *rest, fillvalue=fillvalue), zip(a))

就像 zipzip_longest 一样,它们接受任何数量(嗯,至少一个)任何类型的可迭代对象(包括无限的)和 return 一个迭代器(转换需要时列出)。

基准测试结果

与其他同样通用的解决方案的基准(所有代码都在答案的末尾):

10 iterables of 10,000 to 90,000 elements, first has 50,000:
────────────────────────────────────────────────────────────
 2.2 ms   2.2 ms   2.3 ms  limit_cheat
 2.6 ms   2.6 ms   2.6 ms  Kelly_Bundy_chain
 3.3 ms   3.3 ms   3.3 ms  Kelly_Bundy_compress
50.2 ms  50.6 ms  50.7 ms  CrazyChucky
54.7 ms  55.0 ms  55.0 ms  Sven_Marnach
74.8 ms  74.9 ms  75.0 ms  Mad_Physicist
 5.4 ms   5.4 ms   5.4 ms  Kelly_Bundy_3
 5.9 ms   6.0 ms   6.0 ms  Kelly_Bundy_4
 4.6 ms   4.7 ms   4.7 ms  Kelly_Bundy_5

10,000 iterables of 0 to 100 elements, first has 50:
────────────────────────────────────────────────────
 4.6 ms   4.7 ms   4.8 ms  limit_cheat
 4.8 ms   4.8 ms   4.8 ms  Kelly_Bundy_compress
 8.4 ms   8.4 ms   8.4 ms  Kelly_Bundy_chain
27.1 ms  27.3 ms  27.5 ms  CrazyChucky
38.3 ms  38.5 ms  38.7 ms  Sven_Marnach
73.0 ms  73.0 ms  73.1 ms  Mad_Physicist
 4.9 ms   4.9 ms   5.0 ms  Kelly_Bundy_3
 4.9 ms   4.9 ms   5.0 ms  Kelly_Bundy_4
 5.0 ms   5.0 ms   5.0 ms  Kelly_Bundy_5

第一个是知道长度的作弊程序,用于显示我们可以达到多快的可能限制。

解释

以上两种解决方案的一点解释:

第一个解决方案,如果与例如三个可迭代对象一起使用,则等效于:

def zip_first(first, second, third, fillvalue=None):
    filler = repeat(fillvalue)
    return zip(first,
               chain(second, filler),
               chain(third, filler))

第二种解决方案基本上让 zip_longest 完成工作。唯一的问题是当第一个迭代完成时它不会停止。因此,我复制了第一个可迭代对象(使用 tee),然后将一个用于它的元素,另一个用于它的长度。 zip(a) 将每个元素包装在一个元组中,non-empty 元组是 true. So compress 给我所有由 zip_longest 生成的元组,与第一个可迭代对象中的元素一样多.

基准代码(Try it online!

def limit_cheat(*iterables, fillvalue=None):
    return islice(zip_longest(*iterables, fillvalue=fillvalue), cheat_length)

def Kelly_Bundy_chain(first, *rest, fillvalue=None):
    return zip(first, *map(chain, rest, repeat(repeat(fillvalue))))

def Kelly_Bundy_compress(first, *rest, fillvalue=None):
    a, b = tee(first)
    return compress(zip_longest(b, *rest, fillvalue=fillvalue), zip(a))

def CrazyChucky(*iterables, fillvalue=None):
    SENTINEL = object()
    
    for first, *others in zip_longest(*iterables, fillvalue=SENTINEL):
        if first is SENTINEL:
            return
        others = [i if i is not SENTINEL else fillvalue for i in others]
        yield (first, *others)

def Sven_Marnach(first, *rest, fillvalue=None):
    rest = [iter(r) for r in rest]
    for x in first:
        yield x, *(next(r, fillvalue) for r in rest)

def Mad_Physicist(*args, fillvalue=None):
    # zip_by_first('ABCD', 'xy', fillvalue='-') --> Ax By C- D-
    # zip_by_first('ABC', 'xyzw', fillvalue='-') --> Ax By Cz
    if not args:
        return
    iterators = [iter(it) for it in args]
    while True:
        values = []
        for i, it in enumerate(iterators):
            try:
                value = next(it)
            except StopIteration:
                if i == 0:
                    return
                iterators[i] = repeat(fillvalue)
                value = fillvalue
            values.append(value)
        yield tuple(values)

def Kelly_Bundy_3(first, *rest, fillvalue=None):
    a, b = tee(first)
    return map(itemgetter(1), zip(a, zip_longest(b, *rest, fillvalue=fillvalue)))

def Kelly_Bundy_4(first, *rest, fillvalue=None):
    sentinel = object()
    for z in zip_longest(chain(first, [sentinel]), *rest, fillvalue=fillvalue):
        if z[0] is sentinel:
            break
        yield z

def Kelly_Bundy_5(first, *rest, fillvalue=None):
    stopped = False
    def stop():
        nonlocal stopped
        stopped = True
        return
        yield
    for z in zip_longest(chain(first, stop()), *rest, fillvalue=fillvalue):
        if stopped:
            break
        yield z


import timeit
from itertools import chain, repeat, zip_longest, islice, tee, compress
from operator import itemgetter
from collections import deque

funcs = [
    limit_cheat,
    Kelly_Bundy_chain,
    Kelly_Bundy_compress,
    CrazyChucky,
    Sven_Marnach,
    Mad_Physicist,
    Kelly_Bundy_3,
    Kelly_Bundy_4,
    Kelly_Bundy_5,
]

def test(args_creator):

    # Correctness
    expect = list(funcs[0](*args_creator()))
    for func in funcs:
        result = list(func(*args_creator()))
        print(result == expect, func.__name__)
    
    # Speed
    tss = [[] for _ in funcs]
    for _ in range(5):
        print()
        print(args_creator.__name__)
        for func, ts in zip(funcs, tss):
            t = min(timeit.repeat(lambda: deque(func(*args_creator()), 0), number=1))
            ts.append(t)
            print(*('%4.1f ms ' % (t * 1e3) for t in sorted(ts)[:3]), func.__name__)

def args_few_but_long_iterables():
    global cheat_length
    cheat_length = 50_000
    first = repeat(0, 50_000)
    rest = [repeat(i, 10_000 * i) for i in range(1, 10)]
    return first, *rest

def args_many_but_short_iterables():
    global cheat_length
    cheat_length = 50
    first = repeat(0, 50)
    rest = [repeat(i, i % 101) for i in range(1, 10_000)]
    return first, *rest

test(args_few_but_long_iterables)
funcs[1:3] = funcs[1:3][::-1]
test(args_many_but_short_iterables)

我知道但是

first = ['a', 'b', 'c']
last = [1, 2, 3, 4]
if len(first) < len(last):
    b = list(zip(first, last))
else:
    b = list(zip_longest(first, last))
print(b)

使第二个无限大,然后使用普通 zip:

from itertools import chain, repeat

a = ['a', 'b', 'c']
b = [1, 2]

b = chain(b, repeat(None))

print(*zip(a, b))

这是另一种做法,如果目标是可读的、易于理解的代码:

def zip_first(first, *rest, fillvalue=None):
    rest = [iter(r) for r in rest]
    for x in first:
        yield x, *(next(r, fillvalue) for r in rest)

这使用 next() 的 two-argument 形式到 return 已耗尽的所有迭代的填充值。

对于两个可迭代对象,这可以简化为

def zip_first(first, second, fillvalue=None):
    second = iter(second)
    for x in first:
        yield x, next(second, fillvalue)