Cassandra 多处理无法 pickle _thread.lock 个对象
Cassandra multiprocessing can't pickle _thread.lock objects
我尝试参照 DataStax 博客文章中的示例,使用 Cassandra 和 multiprocessing 同时插入行(虚拟数据)。
这是我的代码
class QueryManager(object):
    """Fans prepared INSERT statements out over a multiprocessing Pool.

    NOTE(review): this is the code under discussion -- handing a live Session
    to the Pool via ``initargs`` is what raises
    ``TypeError: can't pickle _thread.lock objects``, since Session objects
    contain locks and cannot cross the process boundary.
    """

    # Batch size handed to each worker call; chosen to match the default in
    # execute_concurrent_with_args.
    concurrency = 100

    def __init__(self, session, process_count=None):
        # The session is pickled here to reach the workers -- this is the
        # line the traceback points at.
        self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(session,))

    @classmethod
    def _setup(cls, session):
        # Worker-side initializer: stash the session and a prepared statement
        # on the class so the module-level map function can reach them.
        cls.session = session
        cls.prepared = cls.session.prepare("""
        INSERT INTO test_table (key1, key2, key3, key4, key5) VALUES (?, ?, ?, ?, ?)
        """)

    def close_pool(self):
        # Stop accepting work, then wait for the workers to exit.
        self.pool.close()
        self.pool.join()

    def get_results(self, params):
        # Slice params into `concurrency`-sized chunks and map them across
        # the pool, then flatten the per-chunk result lists.
        results = self.pool.map(_multiprocess_write, (params[n:n+self.concurrency] for n in range(0, len(params), self.concurrency)))
        return list(itertools.chain(*results))

    @classmethod
    def _results_from_concurrent(cls, params):
        # results[1] is the rows element of each (success, result) pair
        # yielded by execute_concurrent_with_args.
        return [results[1] for results in execute_concurrent_with_args(cls.session, cls.prepared, params)]
def _multiprocess_write(params):
    """Module-level shim for Pool.map; delegates one chunk to QueryManager."""
    run_chunk = QueryManager._results_from_concurrent
    return run_chunk(params)
if __name__ == '__main__':
    processes = 2

    # Connect to the local cluster.
    cluster = Cluster(contact_points=['127.0.0.1'], port=9042)
    session = cluster.connect()

    # Database name is a concatenation of client_id and system_id.
    keyspace_name = 'unit_test_0'

    # Drop the keyspace if it already exists; IF EXISTS makes this a no-op
    # when it is absent, so the former bare `except: pass` (which would have
    # hidden real connection errors) is unnecessary.
    session.execute("DROP KEYSPACE IF EXISTS " + keyspace_name)

    create_keyspace_query = "CREATE KEYSPACE " + keyspace_name \
        + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};"
    session.execute(create_keyspace_query)

    # Use the new keyspace for this session.
    session.set_keyspace(keyspace_name)

    # Drop the table if it already exists in the keyspace (IF EXISTS -> no-op).
    session.execute("DROP TABLE IF EXISTS test_table")

    # Create a table for the dummy rows.
    # BUG FIX: the original appended the column list to the undefined name
    # `create_invoice_table_query` (NameError) and then executed the still
    # incomplete `create_test_table` string; build one statement instead.
    create_test_table = "CREATE TABLE test_table("
    create_test_table += "key1 text,\n" \
                         "key2 text,\n" \
                         "key3 text,\n" \
                         "key4 text,\n" \
                         "key5 text,\n"
    create_test_table += "PRIMARY KEY (key1))"
    session.execute(create_test_table)

    qm = QueryManager(session, processes)

    # 100k dummy rows keyed by a unique first column.
    params = [['test' + str(row), 'test', 'test', 'test', 'test']
              for row in range(100000)]

    start = time.time()
    rows = qm.get_results(params)
    delta = time.time() - start
    log.info(fm('Cassandra inserts 100k dummy rows for ', delta, ' secs'))
当我执行代码时,出现以下错误
TypeError: can't pickle _thread.lock objects
指向
self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(session,))
这表明您正在尝试序列化 IPC 边界上的锁。我认为这可能是因为您提供了一个 Session 对象作为 worker 初始化函数的参数。使 init 函数在每个工作进程中创建一个新会话(请参阅您引用的 blog post 中的 "Session per Process" 部分)。
我知道这已经有了答案,但我想强调 cassandra-driver 包中的一些变化,这些变化使得这段代码在 Python 3.7 搭配 cassandra-driver 3.18.0 时仍然无法正常工作。
如果您查看链接的博客文章,会发现 __init__ 函数并没有传入 session,而是传入了一个 cluster 对象。但如今连 cluster 也不能再作为 initarg 发送,因为它同样包含锁。您需要在 def _setup(cls): 类方法内部创建它。
其次,execute_concurrent_with_args
returns 现在有一个 ResultSet,也不能序列化。 cassandra-driver 包的旧版本只返回一个对象列表。
要修复上述代码,请更改以下 2 部分:
首先,__init__
和_setup
方法
def __init__(self, process_count=None):
    """Spin up the worker pool; each worker builds its own session in _setup."""
    self.pool = Pool(
        initializer=self._setup,
        processes=process_count,
    )
@classmethod
def _setup(cls):
    """Per-process initializer: build a fresh session and prepared statement.

    Cluster/Session objects hold locks and cannot be pickled, so every worker
    must create its own connection here instead of receiving one via initargs.
    """
    cls.session = Cluster().connect()
    cls.prepared = cls.session.prepare("""
    INSERT INTO test_table (key1, key2, key3, key4, key5) VALUES (?, ?, ?, ?, ?)
    """)
二、_results_from_concurrent
方法
@classmethod
def _results_from_concurrent(cls, params):
    """Run the prepared statement for every parameter tuple and materialize
    each ResultSet into a plain list so it can be pickled back to the parent."""
    materialized = []
    for outcome in execute_concurrent_with_args(cls.session, cls.prepared, params):
        materialized.append(list(outcome[1]))
    return materialized
最后,如果您对与 python3 和 cassandra-driver 3.18.0 一起使用的原始 DataStax 博客 post 中 multiprocess_execute.py 的要点感兴趣,你可以在这里找到:https://gist.github.com/jWolo/6127b2e57c7e24740afd7a4254cc00a3
我尝试参照 DataStax 博客文章中的示例,使用 Cassandra 和 multiprocessing 同时插入行(虚拟数据)。
这是我的代码
class QueryManager(object):
    """Fans prepared INSERT statements out over a multiprocessing Pool.

    NOTE(review): this is the code under discussion -- handing a live Session
    to the Pool via ``initargs`` is what raises
    ``TypeError: can't pickle _thread.lock objects``, since Session objects
    contain locks and cannot cross the process boundary.
    """

    # Batch size handed to each worker call; chosen to match the default in
    # execute_concurrent_with_args.
    concurrency = 100

    def __init__(self, session, process_count=None):
        # The session is pickled here to reach the workers -- this is the
        # line the traceback points at.
        self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(session,))

    @classmethod
    def _setup(cls, session):
        # Worker-side initializer: stash the session and a prepared statement
        # on the class so the module-level map function can reach them.
        cls.session = session
        cls.prepared = cls.session.prepare("""
        INSERT INTO test_table (key1, key2, key3, key4, key5) VALUES (?, ?, ?, ?, ?)
        """)

    def close_pool(self):
        # Stop accepting work, then wait for the workers to exit.
        self.pool.close()
        self.pool.join()

    def get_results(self, params):
        # Slice params into `concurrency`-sized chunks and map them across
        # the pool, then flatten the per-chunk result lists.
        results = self.pool.map(_multiprocess_write, (params[n:n+self.concurrency] for n in range(0, len(params), self.concurrency)))
        return list(itertools.chain(*results))

    @classmethod
    def _results_from_concurrent(cls, params):
        # results[1] is the rows element of each (success, result) pair
        # yielded by execute_concurrent_with_args.
        return [results[1] for results in execute_concurrent_with_args(cls.session, cls.prepared, params)]
def _multiprocess_write(params):
    """Module-level shim for Pool.map; delegates one chunk to QueryManager."""
    run_chunk = QueryManager._results_from_concurrent
    return run_chunk(params)
if __name__ == '__main__':
    processes = 2

    # Connect to the local cluster.
    cluster = Cluster(contact_points=['127.0.0.1'], port=9042)
    session = cluster.connect()

    # Database name is a concatenation of client_id and system_id.
    keyspace_name = 'unit_test_0'

    # Drop the keyspace if it already exists; IF EXISTS makes this a no-op
    # when it is absent, so the former bare `except: pass` (which would have
    # hidden real connection errors) is unnecessary.
    session.execute("DROP KEYSPACE IF EXISTS " + keyspace_name)

    create_keyspace_query = "CREATE KEYSPACE " + keyspace_name \
        + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};"
    session.execute(create_keyspace_query)

    # Use the new keyspace for this session.
    session.set_keyspace(keyspace_name)

    # Drop the table if it already exists in the keyspace (IF EXISTS -> no-op).
    session.execute("DROP TABLE IF EXISTS test_table")

    # Create a table for the dummy rows.
    # BUG FIX: the original appended the column list to the undefined name
    # `create_invoice_table_query` (NameError) and then executed the still
    # incomplete `create_test_table` string; build one statement instead.
    create_test_table = "CREATE TABLE test_table("
    create_test_table += "key1 text,\n" \
                         "key2 text,\n" \
                         "key3 text,\n" \
                         "key4 text,\n" \
                         "key5 text,\n"
    create_test_table += "PRIMARY KEY (key1))"
    session.execute(create_test_table)

    qm = QueryManager(session, processes)

    # 100k dummy rows keyed by a unique first column.
    params = [['test' + str(row), 'test', 'test', 'test', 'test']
              for row in range(100000)]

    start = time.time()
    rows = qm.get_results(params)
    delta = time.time() - start
    log.info(fm('Cassandra inserts 100k dummy rows for ', delta, ' secs'))
当我执行代码时,出现以下错误
TypeError: can't pickle _thread.lock objects
指向
self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(session,))
这表明您正在尝试序列化 IPC 边界上的锁。我认为这可能是因为您提供了一个 Session 对象作为 worker 初始化函数的参数。使 init 函数在每个工作进程中创建一个新会话(请参阅您引用的 blog post 中的 "Session per Process" 部分)。
我知道这已经有了答案,但我想强调 cassandra-driver 包中的一些变化,这些变化使得这段代码在 Python 3.7 搭配 cassandra-driver 3.18.0 时仍然无法正常工作。
如果您查看链接的博客文章,会发现 __init__ 函数并没有传入 session,而是传入了一个 cluster 对象。但如今连 cluster 也不能再作为 initarg 发送,因为它同样包含锁。您需要在 def _setup(cls): 类方法内部创建它。
其次,execute_concurrent_with_args
returns 现在有一个 ResultSet,也不能序列化。 cassandra-driver 包的旧版本只返回一个对象列表。
要修复上述代码,请更改以下 2 部分:
首先,__init__
和_setup
方法
def __init__(self, process_count=None):
    """Spin up the worker pool; each worker builds its own session in _setup."""
    self.pool = Pool(
        initializer=self._setup,
        processes=process_count,
    )
@classmethod
def _setup(cls):
    """Per-process initializer: build a fresh session and prepared statement.

    Cluster/Session objects hold locks and cannot be pickled, so every worker
    must create its own connection here instead of receiving one via initargs.
    """
    cls.session = Cluster().connect()
    cls.prepared = cls.session.prepare("""
    INSERT INTO test_table (key1, key2, key3, key4, key5) VALUES (?, ?, ?, ?, ?)
    """)
二、_results_from_concurrent
方法
@classmethod
def _results_from_concurrent(cls, params):
    """Run the prepared statement for every parameter tuple and materialize
    each ResultSet into a plain list so it can be pickled back to the parent."""
    materialized = []
    for outcome in execute_concurrent_with_args(cls.session, cls.prepared, params):
        materialized.append(list(outcome[1]))
    return materialized
最后,如果您对与 python3 和 cassandra-driver 3.18.0 一起使用的原始 DataStax 博客 post 中 multiprocess_execute.py 的要点感兴趣,你可以在这里找到:https://gist.github.com/jWolo/6127b2e57c7e24740afd7a4254cc00a3