asyncio 中的坏锁优化

Bad lock optimization in asyncio

更新:编辑标题以关注主要问题。查看我的完整更新答案。

在下面的代码中,a()b() 是相同的。他们每个人同时从 0 计数到 9,同时每 2 个计数获取和产生一个锁。

import asyncio

lock = asyncio.Lock()

def a ():
 yield from lock.acquire()
 for i in range(10):
  print('a: ' + str(i))
  if i % 2 == 0:
   lock.release()
   yield from lock.acquire()
 lock.release()

def b ():
 yield from lock.acquire()
 for i in range(10):
  print('b: ' + str(i))
  if i % 2 == 0:
   lock.release()
   yield from lock.acquire()
 lock.release()

asyncio.get_event_loop().run_until_complete(asyncio.gather(a(), b()))

print('done')

我期望交错输出,但我却得到:

b: 0
b: 1
b: 2
b: 3
b: 4
b: 5
b: 6
b: 7
b: 8
b: 9
a: 0
a: 1
a: 2
a: 3
a: 4
a: 5
a: 6
a: 7
a: 8
a: 9
done

似乎第二个yield实际上并没有让出而是立即重新获取锁并继续。

这对我来说似乎是个错误。我对吗?还是有其他解释?

以下代码,使用额外的首字母 "noop" yield 进行了修改,按预期工作正常。这让我相信锁确实是公平的,而且可能是正确的。

import asyncio

lock = asyncio.Lock()

def a ():
 yield from lock.acquire()
 yield from asyncio.sleep(0)
 for i in range(10):
  print('a: ' + str(i))
  if i % 2 == 0:
   lock.release()
   yield from lock.acquire()
 lock.release()

def b ():
 yield from lock.acquire()
 yield from asyncio.sleep(0)
 for i in range(10):
  print('b: ' + str(i))
  if i % 2 == 0:
   lock.release()
   yield from lock.acquire()
 lock.release()

asyncio.get_event_loop().run_until_complete(asyncio.gather(a(), b()))

print('done')

输出:

a: 0
b: 0
a: 1
a: 2
b: 1
b: 2
a: 3
a: 4
b: 3
b: 4
a: 5
a: 6
b: 5
b: 6
a: 7
a: 8
b: 7
b: 8
a: 9
b: 9
done

请注意,我在开始时只执行一次 no-op yield,而不是每 2 次计算一次。然而,这样做会导致如第一段代码中预期的那样每 2 个计数交错一次。

调度程序中只有一些优化(在我看来是一个错误)在获取没有其他人正在等待的锁时并没有真正yield

如何解释第一个输出?

更新: 根据我对 github 问题的评论 (link),以下内容已过时。注释观察到你可以使用 Lock.locked() 来预测 Lock.acquire() 是否会屈服。它还观察到许多其他协程在快速情况下不会屈服,因此即使考虑修复所有协程也是徒劳的。最后,它与这个不同的问题是如何解决的有关,并建议它可以更好地解决。那是对 asyncio.nop() 方法的请求,该方法只会屈服于调度程序而不做任何其他事情。他们没有添加该方法,而是决定重载 asyncio.sleep(0) 和 "de-optimize" 它(在 lock.acquire() 的讨论的上下文中)以在参数为 0.[=21 时屈服于调度程序=]

下面的原始答案但被上面的段落取代:

asyncio.lock 的实现试图在其 first three lines 中过于智能并且在没有服务员的情况下不将控制权交还给调度程序的根本原因是:

if not self._locked and all(w.cancelled() for w in self._waiters):
    self._locked = True
    return True

但是,正如我的第一个示例所示,这甚至会阻止其他协程成为服务员。他们只是没有机会 运行 他们试图获取锁的点。

低效 解决方法是始终yield from asyncio.sleep(0) 在 获取锁之前立即

这是低效的,因为在通常情况下会有其他等待者并且获取锁也会将控制权交还给调度程序。因此,在大多数情况下,您会将控制权交还给调度程序两次,这很糟糕。

另请注意,锁的文档含糊地说:"This method blocks until the lock is unlocked, then sets it to locked and returns True."当然给人的印象是它会在获取锁之前将控制权交给调度程序。

在我看来,正确的做法是让锁实现始终让步并且不要太聪明。 或者,锁的实现应该有一个方法告诉你它是否会在获得锁时产生,这样你的代码就可以在没有获得锁的情况下手动产生。 另一种选择 是让 acquire() 调用 return 一个值,告诉您它是否实际产生。这不太可取,但仍然比现状好。

有人可能认为更好的解决方法是在 release() 时手动让步。然而,如果你看一下在完成工作后释放和重新获取的紧密循环,那么它相当于同一件事——在常见情况下,它仍然会产生两次,一次在释放时,再次在获取时,增加了效率。

目前尚不清楚您要实现的目标,但 Lock 似乎不是您需要的工具。要交错 python 代码,你可以像这样简单地做:

def foo(tag, n):
    for i in range(n):
        print("inside", tag, i)
        yield (tag, i)


print('start')

for x in zip(foo('A', 10), foo('B', 10)):
    print(x)

print('done')

不需要 asynciothreading。无论如何,没有 IO 的 asyncio 没有多大意义。

threading.Lock 用于同步程序的关键部分,否则 运行 在独立线程中。 asyncio.Lock 将允许其他协程在一个协程等待时继续执行 IO:

import asyncio
import random

lock = asyncio.Lock()


async def foo(tag):
    print(tag, "Start")
    for i in range(10):
        print(tag, '>', i)
        await asyncio.sleep(random.uniform(0.1, 1))
        print(tag, '<', i)

    async with lock:
        # only one coroutine can execute the critical section at once.
        # other coroutines can still use IO.
        print(tag, "CRITICAL START")
        await asyncio.sleep(1)
        print(tag, "STILL IN CRITICAL")
        await asyncio.sleep(1)
        print(tag, "CRITICAL END")

    for i in range(10, 20):
        print(tag, '>', i)
        await asyncio.sleep(random.uniform(0.1, 1))
        print(tag, '<', i)

    print(tag, "Done")


print('start')

loop = asyncio.get_event_loop()
tasks = asyncio.gather(foo('A'), foo('B'), foo('C'))
loop.run_until_complete(tasks)
loop.close()

print('done')

请记住关键字 yield 并不总是符合 yield 的英文含义 :-) 。

你可以看到 async with lock 会立即获取锁更有意义,而不需要等待其他协程做更多的工作:第一个到达关键部分的协程应该开始 运行宁它。 (即,在 async with lock: 之前添加 await asyncio.sleep(0) 没有任何意义。)