如何在多线程应用程序中使用aiopg池?

How to use aiopg pool in multi-threaded application?

我有一个 python 3.4.3,postgreSQL 9.4,aiopg-0.7.0。多线程应用程序的示例取自该站点。如何使用泳池? select.

操作时线程挂起
import time
import asyncio
import aiopg
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future

class B(Thread):
   def __init__(self, start_event):
       Thread.__init__(self)
       self.loop = None
       self.tid = None
       self.event = start_event

   def run(self):
       self.loop = asyncio.new_event_loop()
       asyncio.set_event_loop(self.loop)
       self.tid = current_thread()
       self.loop.call_soon(self.event.set)
       self.loop.run_forever()

   def stop(self):
       self.loop.call_soon_threadsafe(self.loop.stop)

   def add_task(self, coro):
       """this method should return a task object, that I
         can cancel, not a handle"""
      def _async_add(func, fut):
          try:
              ret = func()
              fut.set_result(ret)
          except Exception as e:
              fut.set_exception(e)

       f = functools.partial(asyncio.async, coro, loop=self.loop)
       if current_thread() == self.tid:
           return f() # We can call directly if we're not going between threads.
       else:
           # We're in a non-event loop thread so we use a Future
           # to get the task from the event loop thread once
           # it's ready.
           fut = Future()
           self.loop.call_soon_threadsafe(_async_add, f, fut)
           return fut.result()

   def cancel_task(self, task):
       self.loop.call_soon_threadsafe(task.cancel)


@asyncio.coroutine
def test(pool, name_task):
    while True:
        print(name_task, 'running')
        with (yield from pool.cursor()) as cur:
            print(name_task, " select. ")
            yield from cur.execute("SELECT count(*) FROM test")
            count = yield from cur.fetchone()
            print(name_task, ' Result: ', count)
        yield from asyncio.sleep(3)

@asyncio.coroutine
def connect_db():
    dsn = 'dbname=%s user=%s password=%s host=%s' % ('testdb', 'user', 'passw', '127.0.0.1')
    pool = yield from aiopg.create_pool(dsn)
    print('create pool type =', type(pool))
    # future.set_result(pool)
    return (pool)

event = Event()
b = B(event)
b.start()
event.wait() # Let the loop's thread signal us, rather than sleeping
loop_db = asyncio.get_event_loop()
pool = loop_db.run_until_complete(connect_db())
time.sleep(2)
t = b.add_task(test(pool, 'Task1'))  # This is a real task
t = b.add_task(test(pool, 'Task2'))

while True:
    time.sleep(10)

b.stop()

不是return导致'yield from cur.execute("SELECT count(*) FROM test")'

长话短说:您不能共享来自不同事件循环的 aiopg 池对象。

每个aiopg.Pool都耦合到事件循环。如果您未明确指定 loop 参数,则它取自 asyncio.get_event_loop() 调用。

所以在你的例子中,你有一个池耦合到主线程的事件循环。

当您从单独的线程执行数据库查询时,您试图通过执行线程的循环而不是主循环来完成它。没用。