Python3: cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1']) 当使用 execute_async 未来时
Python3: cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1']) when using execute_async future
我正在尝试从特定的 table 从 Cassandra 中获取数据,并在进行一些更改后尝试将其插入到 Cassandra 中的另一个 table 中。 table 都位于键空间“test”中。当我尝试从第一个 table 获取数据时,一切正常并且能够获取数据。但是,在处理第一个查询输出的未来处理程序中,我试图将数据插入到同一 Cassandra 实例下的另一个 table 中,但它失败了。我从应用程序中收到一条错误消息,指出 "cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])" 。我不确定我哪里错了
import threading
from threading import Event
from cassandra.query import SimpleStatement
from cassandra.cluster import Cluster
hosts=['127.0.0.1']
keyspace="test"
thread_local = threading.local()
cluster_ = Cluster(hosts)
def get_session():
if hasattr(thread_local, "cassandra_session"):
print("got session from threadlocal")
return thread_local.cassandra_session
print(" Connecting to Cassandra Host " + str(hosts))
session_ = cluster_.connect(keyspace)
print(" Connecting and creating session to Cassandra KeySpace " + keyspace)
thread_local.cassandra_session = session_
return session_
class PagedResultHandler(object):
def __init__(self, future):
self.error = None
self.finished_event = Event()
self.future = future
self.future.add_callbacks(
callback=self.handle_page,
errback=self.handle_error)
def handle_page(self, rows):
for row in rows:
process_row(row)
if self.future.has_more_pages:
self.future.start_fetching_next_page()
else:
self.finished_event.set()
def handle_error(self, exc):
self.error = exc
self.finished_event.set()
def process_row(row):
print(row)
session_ = get_session()
stmt = session_.prepare(
"INSERT INTO test.data(customer,snr,rttt, event_time) VALUES (?,?,?,?)")
results = session_.execute(stmt,
[row.customer, row.snr, row.rttt,row.created_time])
print("Done")
session = get_session()
query = "select * from test.data_log"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
handler.finished_event.wait()
if handler.error:
raise handler.error
cluster_.shutdown()
但是,当我尝试执行 python 文件时,应用程序抛出错误“cassandra.cluster.NoHostAvailable:(“无法使用密钥空间 'test' 连接到任何服务器”,[ '127.0.0.1'])”来自“process_row”方法的 getSession() 调用。显然,对 Cassandra 的第一次调用成功了,没有任何问题。没有连接问题,Cassandra 实例在本地 运行 正常。我可以使用 cqlsh 查询数据。如果我在 Future 处理程序之外调用 process_row 方法,一切正常,我不确定需要做什么才能从 Future Handler 中实现它。
Connecting to Cassandra Host ['127.0.0.1']
Connecting and creating session to Cassandra KeySpace test
Row(customer='abcd', snr=100, rttt=121, created_time=datetime.datetime(2020, 8, 8, 2, 26, 51))
Connecting to Cassandra Host ['127.0.0.1']
Traceback (most recent call last):
File "test/check.py", , in <module>
raise handler.error
File "cassandra/cluster.py", line 4579, in cassandra.cluster.ResponseFuture._set_result
File "cassandra/cluster.py", line 4777, in cassandra.cluster.ResponseFuture._set_final_result
File "test/check.py"", in handle_page
process_row(row)
File "test/check.py"", in process_row
session_ = get_session()
File "/test/check.py"", in get_session
session_ = cluster_.connect(keyspace)
File "cassandra/cluster.py", line 1715, in cassandra.cluster.Cluster.connect
File "cassandra/cluster.py", line 1772, in cassandra.cluster.Cluster._new_session
File "cassandra/cluster.py", line 2553, in cassandra.cluster.Session.__init__
cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])
Process finished with exit code 1
好的,所以 Cassandra 建议如下:
- 每个键空间最多使用一个会话,或者使用单个会话并在查询中明确指定键空间
https://www.datastax.com/blog/4-simple-rules-when-using-datastax-drivers-cassandra
在您的代码中,您尝试在每次读取查询检索到一些行时创建一个会话。
为了强制代码最多使用一个会话,我们可以创建一个队列,子线程将行发送到主线程,主线程通过执行插入查询进一步处理它。我们在主线程中执行此操作,因为我在子线程中执行查询时遇到过问题。
callback_queue = Queue()
session = cluster_.connect(keyspace)
session.row_factory = dict_factory # because queue doesn't accept a Row instance
class PagedResultHandler(object):
...
def handle_page(self, rows):
for row in rows:
callback_queue.put(row) # here we pass the row as a dict to the queue
...
def process_rows():
while True:
try:
row = callback_queue.get() # here we retrieve the row as a dict from the child thread
stmt = session.prepare(
"INSERT INTO test.data(customer,snr,rttt, event_time) VALUES (?,?,?,?,?)")
results = session.execute(stmt,
[row['customer'], row['snr'], row['rttt'], row['created_time']])
print("Done")
except Empty:
pass
query = "select * from test.data_log"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
process_rows() # for now the code will hang here because we have an infinite loop in this function
handler.finished_event.wait()
if handler.error:
raise handler.error
cluster_.shutdown()
这将使它工作,但我会替换 while True 否则你将进入无限循环。
好的,在那种情况下我们做两件事,我们可以使用多线程和批量插入。我认为如果我们不需要批量插入并行性,因为这将足够快地从客户端加快速度。多线程不会增加更多的速度,因为它不是 cpu 密集型任务。
session = cluster_.connect(keyspace)
session.row_factory = dict_factory
class Fetcher:
def __init__(self, session):
self.session = session
query = "select * from test.data_log"
self.statement = SimpleStatement(query, fetch_size=1000)
def run(self):
rows = self.session.execute(self.statement)
temp_rows = []
total = 0
for row in rows:
temp_rows.append(row)
if len(temp_rows) == 1000:
handler = PagedResultHandler(self.session, temp_rows)
handler.start()
temp_rows = []
handler = PagedResultHandler(self.session, temp_rows)
handler.start()
def handle_error(self, err=None):
print(err)
class PagedResultHandler(threading.Thread):
def __init__(self, session, rows):
super().__init__()
self.session = session
self.error = None
self.rows = rows
self.finished_event = Event()
def run(self):
batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
stmt = session.prepare("INSERT INTO test.data(id, customer,snr,rttt, event_time) VALUES (?,?,?,?,?)")
for row in self.rows:
batch.add(stmt, [1, row['customer'], row['snr'], row['rttt'], row['created_time']])
results = session.execute(batch)
print(results)
Fetcher(session).run()
这个脚本同时执行批量插入和多线程处理,但多线程似乎也没有必要。
我正在尝试从特定的 table 从 Cassandra 中获取数据,并在进行一些更改后尝试将其插入到 Cassandra 中的另一个 table 中。 table 都位于键空间“test”中。当我尝试从第一个 table 获取数据时,一切正常并且能够获取数据。但是,在处理第一个查询输出的未来处理程序中,我试图将数据插入到同一 Cassandra 实例下的另一个 table 中,但它失败了。我从应用程序中收到一条错误消息,指出 "cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])" 。我不确定我哪里错了
import threading
from threading import Event
from cassandra.query import SimpleStatement
from cassandra.cluster import Cluster
hosts=['127.0.0.1']
keyspace="test"
thread_local = threading.local()
cluster_ = Cluster(hosts)
def get_session():
if hasattr(thread_local, "cassandra_session"):
print("got session from threadlocal")
return thread_local.cassandra_session
print(" Connecting to Cassandra Host " + str(hosts))
session_ = cluster_.connect(keyspace)
print(" Connecting and creating session to Cassandra KeySpace " + keyspace)
thread_local.cassandra_session = session_
return session_
class PagedResultHandler(object):
def __init__(self, future):
self.error = None
self.finished_event = Event()
self.future = future
self.future.add_callbacks(
callback=self.handle_page,
errback=self.handle_error)
def handle_page(self, rows):
for row in rows:
process_row(row)
if self.future.has_more_pages:
self.future.start_fetching_next_page()
else:
self.finished_event.set()
def handle_error(self, exc):
self.error = exc
self.finished_event.set()
def process_row(row):
print(row)
session_ = get_session()
stmt = session_.prepare(
"INSERT INTO test.data(customer,snr,rttt, event_time) VALUES (?,?,?,?)")
results = session_.execute(stmt,
[row.customer, row.snr, row.rttt,row.created_time])
print("Done")
session = get_session()
query = "select * from test.data_log"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
handler.finished_event.wait()
if handler.error:
raise handler.error
cluster_.shutdown()
但是,当我尝试执行 python 文件时,应用程序抛出错误“cassandra.cluster.NoHostAvailable:(“无法使用密钥空间 'test' 连接到任何服务器”,[ '127.0.0.1'])”来自“process_row”方法的 getSession() 调用。显然,对 Cassandra 的第一次调用成功了,没有任何问题。没有连接问题,Cassandra 实例在本地 运行 正常。我可以使用 cqlsh 查询数据。如果我在 Future 处理程序之外调用 process_row 方法,一切正常,我不确定需要做什么才能从 Future Handler 中实现它。
Connecting to Cassandra Host ['127.0.0.1']
Connecting and creating session to Cassandra KeySpace test
Row(customer='abcd', snr=100, rttt=121, created_time=datetime.datetime(2020, 8, 8, 2, 26, 51))
Connecting to Cassandra Host ['127.0.0.1']
Traceback (most recent call last):
File "test/check.py", , in <module>
raise handler.error
File "cassandra/cluster.py", line 4579, in cassandra.cluster.ResponseFuture._set_result
File "cassandra/cluster.py", line 4777, in cassandra.cluster.ResponseFuture._set_final_result
File "test/check.py"", in handle_page
process_row(row)
File "test/check.py"", in process_row
session_ = get_session()
File "/test/check.py"", in get_session
session_ = cluster_.connect(keyspace)
File "cassandra/cluster.py", line 1715, in cassandra.cluster.Cluster.connect
File "cassandra/cluster.py", line 1772, in cassandra.cluster.Cluster._new_session
File "cassandra/cluster.py", line 2553, in cassandra.cluster.Session.__init__
cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])
Process finished with exit code 1
好的,所以 Cassandra 建议如下:
- 每个键空间最多使用一个会话,或者使用单个会话并在查询中明确指定键空间
https://www.datastax.com/blog/4-simple-rules-when-using-datastax-drivers-cassandra
在您的代码中,您尝试在每次读取查询检索到一些行时创建一个会话。
为了强制代码最多使用一个会话,我们可以创建一个队列,子线程将行发送到主线程,主线程通过执行插入查询进一步处理它。我们在主线程中执行此操作,因为我在子线程中执行查询时遇到过问题。
callback_queue = Queue()
session = cluster_.connect(keyspace)
session.row_factory = dict_factory # because queue doesn't accept a Row instance
class PagedResultHandler(object):
...
def handle_page(self, rows):
for row in rows:
callback_queue.put(row) # here we pass the row as a dict to the queue
...
def process_rows():
while True:
try:
row = callback_queue.get() # here we retrieve the row as a dict from the child thread
stmt = session.prepare(
"INSERT INTO test.data(customer,snr,rttt, event_time) VALUES (?,?,?,?,?)")
results = session.execute(stmt,
[row['customer'], row['snr'], row['rttt'], row['created_time']])
print("Done")
except Empty:
pass
query = "select * from test.data_log"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
process_rows() # for now the code will hang here because we have an infinite loop in this function
handler.finished_event.wait()
if handler.error:
raise handler.error
cluster_.shutdown()
这将使它工作,但我会替换 while True 否则你将进入无限循环。
好的,在那种情况下我们做两件事,我们可以使用多线程和批量插入。我认为如果我们不需要批量插入并行性,因为这将足够快地从客户端加快速度。多线程不会增加更多的速度,因为它不是 cpu 密集型任务。
session = cluster_.connect(keyspace)
session.row_factory = dict_factory
class Fetcher:
def __init__(self, session):
self.session = session
query = "select * from test.data_log"
self.statement = SimpleStatement(query, fetch_size=1000)
def run(self):
rows = self.session.execute(self.statement)
temp_rows = []
total = 0
for row in rows:
temp_rows.append(row)
if len(temp_rows) == 1000:
handler = PagedResultHandler(self.session, temp_rows)
handler.start()
temp_rows = []
handler = PagedResultHandler(self.session, temp_rows)
handler.start()
def handle_error(self, err=None):
print(err)
class PagedResultHandler(threading.Thread):
def __init__(self, session, rows):
super().__init__()
self.session = session
self.error = None
self.rows = rows
self.finished_event = Event()
def run(self):
batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
stmt = session.prepare("INSERT INTO test.data(id, customer,snr,rttt, event_time) VALUES (?,?,?,?,?)")
for row in self.rows:
batch.add(stmt, [1, row['customer'], row['snr'], row['rttt'], row['created_time']])
results = session.execute(batch)
print(results)
Fetcher(session).run()
这个脚本同时执行批量插入和多线程处理,但多线程似乎也没有必要。