Tornado 后台任务使用 cx_Oracle

Tornado background task using cx_Oracle

我正在运行一个 Bokeh server, using the underlying Tornado 框架。

我需要服务器在某个时候刷新一些数据。这是通过使用 Cx_Oracle.

从 Oracle 数据库中获取行来完成的

感谢 Tornado 的 PeriodicCallback,该程序每 30 秒检查一次是否应加载新数据:

server.start() 
from tornado.ioloop import PeriodicCallback
pcallback = PeriodicCallback(db_obj.reload_data_async, 10 * 1e3)
pcallback.start()
server.io_loop.start()

其中 db_obj 是一个 class 的实例,它负责与数据库相关的功能(连接、获取等)。

基本上,reload_data_async 函数是这样的:

executor = concurrent.futures.ThreadPoolExecutor(4)

# methods of the db_obj class ...

@gen.coroutine
def reload_data_async(self):
  # ... first, some code to check if the data should be reloaded ...
  # ...
  if data_should_be_reloaded:
    new_data = yield executor.submit(self.fetch_data)

def fetch_data(self):
   """ fetch new data in the DB """
   cursor = cx.Cursor(self.db_connection) 
   cursor.execute("some SQL select request that takes time (select * from ...)")

   rows = cursor.fetchall()
   # some more processing thereafter 
   # ...

基本上,这行得通。但是,当我尝试在 fetch_data 中加载数据时读取数据(通过单击以在 GUI 中显示),程序由于竞争条件而崩溃(我猜?):它在获取数据时正在访问数据同时。

我刚刚发现 tornado.concurrent.futures 不是线程安全的:

tornado.concurrent.Future is similar to concurrent.futures.Future, but not thread-safe (and therefore faster for use with single-threaded event loops).

总而言之,我认为我应该创建一个新线程来处理 CX_Oracle 操作。我可以使用 Tornado 并继续使用 PerodicCallback 函数吗?如何将我的异步操作转换为线程安全的?有什么方法可以做到这一点?

PS:我正在使用 Python 2.7

谢谢

解决了!

@Sraw 是对的:它应该不会导致崩溃。

说明fetch_data() 正在使用 cx Oracle Connection 对象 (self.db_connection),默认情况下它不是线程安全的。将 threaded 参数设置为 True 用互斥体包装共享连接,如 Cx Oracle 文档中所述:

The threaded parameter is expected to be a boolean expression which indicates whether or not Oracle should wrap accesses to connections with a mutex. Doing so in single threaded applications imposes a performance penalty of about 10-15% which is why the default is False.

所以我在我的代码中,我只是修改了以下内容,现在当用户尝试访问正在刷新的数据时它可以正常工作而不会崩溃:

# inside the connect method of the db_obj class
self.db_connection = cx.connect('connection string', threaded=True) # False by default