Python3: cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1']) when using execute_async future

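I am trying to fetch data from a specific table in Cassandra and, after making some changes, insert it into another table in Cassandra. Both tables are in the keyspace "test". Fetching the data from the first table works fine. However, in the future handler that processes the output of the first query, I try to insert the data into another table on the same Cassandra instance, and that fails. The application reports "cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])". I'm not sure where I'm going wrong.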

import threading
from threading import Event
from cassandra.query import SimpleStatement
from cassandra.cluster import Cluster


hosts=['127.0.0.1']
keyspace="test"
thread_local = threading.local()
cluster_ = Cluster(hosts)
def get_session():
    if hasattr(thread_local, "cassandra_session"):
        print("got session from threadlocal")
        return thread_local.cassandra_session
    print(" Connecting to Cassandra Host " + str(hosts))
    session_ = cluster_.connect(keyspace)
    print(" Connecting and creating session to Cassandra KeySpace " + keyspace)
    thread_local.cassandra_session = session_
    return session_


class PagedResultHandler(object):

    def __init__(self, future):
        self.error = None
        self.finished_event = Event()
        self.future = future
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        for row in rows:
            process_row(row)

        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()

    def handle_error(self, exc):
        self.error = exc
        self.finished_event.set()

def process_row(row):
    print(row)
    session_ = get_session()
    stmt = session_.prepare(
        "INSERT INTO test.data(customer,snr,rttt, event_time) VALUES (?,?,?,?)")
    results = session_.execute(stmt,
                               [row.customer, row.snr, row.rttt,row.created_time])
    print("Done")

session = get_session()
query = "select * from test.data_log"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
handler.finished_event.wait()
if handler.error:
    raise handler.error
cluster_.shutdown()

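When I run the Python file, the application throws "cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])" from the get_session() call in the process_row method. The first call to Cassandra clearly succeeds. There is no connectivity problem: the Cassandra instance is running locally and I can query the data with cqlsh. If I call process_row outside the future handler, everything works, so I'm not sure what I need to do to make it work from within the future handler.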

Connecting to Cassandra Host ['127.0.0.1']
Connecting and creating session to Cassandra KeySpace test
Row(customer='abcd', snr=100, rttt=121, created_time=datetime.datetime(2020, 8, 8, 2, 26, 51))
 Connecting to Cassandra Host ['127.0.0.1']
Traceback (most recent call last):
  File "test/check.py", in <module>
    raise handler.error
  File "cassandra/cluster.py", line 4579, in cassandra.cluster.ResponseFuture._set_result
  File "cassandra/cluster.py", line 4777, in cassandra.cluster.ResponseFuture._set_final_result
  File "test/check.py", in handle_page
    process_row(row)
  File "test/check.py", in process_row
    session_ = get_session()
  File "/test/check.py", in get_session
    session_ = cluster_.connect(keyspace)
  File "cassandra/cluster.py", line 1715, in cassandra.cluster.Cluster.connect
  File "cassandra/cluster.py", line 1772, in cassandra.cluster.Cluster._new_session
  File "cassandra/cluster.py", line 2553, in cassandra.cluster.Session.__init__
cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])

Process finished with exit code 1

OK, so the DataStax recommendation is as follows:

  • Use at most one Session per keyspace, or use a single Session and explicitly specify the keyspace in your queries

https://www.datastax.com/blog/4-simple-rules-when-using-datastax-drivers-cassandra

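In your code, you try to create a new session whenever the read query retrieves rows: the callback runs on a different thread from the one that created the first session, so the thread-local lookup in get_session() misses and cluster_.connect() is called again.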
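Why the thread-local cache misses: attributes on a threading.local object are visible only in the thread that set them. The traceback shows handle_page being called from ResponseFuture._set_final_result, i.e. on the driver's own thread, where thread_local.cassandra_session was never set. A minimal, Cassandra-free sketch of that behaviour; cached_value() and factory() are hypothetical stand-ins for get_session() and cluster_.connect():

```python
import threading

# Stand-in for the question's thread_local cache (no Cassandra needed here)
thread_local = threading.local()


def cached_value(factory):
    """Return this thread's cached value, creating it on first use in this thread."""
    if not hasattr(thread_local, "value"):
        thread_local.value = factory()
    return thread_local.value


created = []


def factory():
    # Records which thread had to build a new value (like a new connect() call)
    created.append(threading.current_thread().name)
    return object()


# Calling thread: the first call creates the value, the second call reuses it
main_value = cached_value(factory)
assert cached_value(factory) is main_value

# A different thread (like the driver thread running handle_page) does not
# see the attribute set above, so the factory runs a second time
result = {}
worker = threading.Thread(
    target=lambda: result.update(v=cached_value(factory)), name="io-thread")
worker.start()
worker.join()

print(created)                     # two entries: one per thread
print(result["v"] is main_value)   # False
```

In the real script the second "factory" call is cluster_.connect(keyspace), made from inside the driver's callback, which is the call that ends in NoHostAvailable.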

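To make the code use at most one session, we can create a queue: the callback (running in a child thread) sends the rows to the main thread, and the main thread processes them further by executing the insert query. We do the inserts in the main thread because I have run into problems executing queries from child threads. The driver runs these callbacks on its I/O event-loop thread, and its documentation advises against blocking or issuing synchronous requests from a callback.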

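import threading
from queue import Queue, Empty

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement, dict_factory

keyspace = "test"
cluster_ = Cluster(['127.0.0.1'])

callback_queue = Queue()
session = cluster_.connect(keyspace)  # the only session, created in the main thread
session.row_factory = dict_factory  # return rows as plain dicts: row['customer'], ...


class PagedResultHandler(object):

    def __init__(self, future):
        self.error = None
        self.finished_event = threading.Event()
        self.future = future
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        # Runs on the driver's callback thread: only hand rows off, no queries here
        for row in rows:
            callback_queue.put(row)  # here we pass the row as a dict to the queue

        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()

    def handle_error(self, exc):
        self.error = exc
        self.finished_event.set()


def process_rows():
    # Prepare the insert once instead of once per row
    stmt = session.prepare(
        "INSERT INTO test.data(customer, snr, rttt, event_time) VALUES (?, ?, ?, ?)")
    while True:
        try:
            row = callback_queue.get(timeout=1)  # row (as a dict) from the callback thread
            session.execute(stmt,
                            [row['customer'], row['snr'], row['rttt'], row['created_time']])
            print("Done")
        except Empty:
            pass  # nothing queued yet; keep waiting


query = "select * from test.data_log"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
process_rows()  # for now the code will hang here because we have an infinite loop in this function
handler.finished_event.wait()
if handler.error:
    raise handler.error
cluster_.shutdown()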

This makes it work, but I would replace the while True; otherwise you will end up in an infinite loop.
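One way to replace the while True is a sentinel value: the producer puts a marker on the queue after the last row, and the consumer stops when it sees it. In the real code the producer would be handle_page (once has_more_pages is False) and also handle_error, so the consumer cannot hang on a failed read. A Cassandra-free sketch under those assumptions; _DONE and producer() are hypothetical names, producer() simulates the paged callback, and insert stands in for session.execute:

```python
import threading
from queue import Queue

_DONE = object()  # sentinel meaning "no more rows will be produced"


def producer(q, pages):
    """Stands in for handle_page: pushes every row of every page, then the sentinel."""
    for page in pages:
        for row in page:
            q.put(row)
    q.put(_DONE)


def process_rows(q, insert):
    """Consume rows until the sentinel arrives, instead of looping forever."""
    count = 0
    # iter(q.get, _DONE) calls q.get() repeatedly and stops when it returns _DONE
    for row in iter(q.get, _DONE):
        insert(row)
        count += 1
    return count


q = Queue()
inserted = []
pages = [[{"customer": "abcd", "snr": 100}], [{"customer": "efgh", "snr": 90}]]
t = threading.Thread(target=producer, args=(q, pages))
t.start()
n = process_rows(q, inserted.append)
t.join()
print(n)  # 2
```

Because q.get() blocks until something arrives, no timeout or Empty handling is needed; the loop ends exactly once, after the sentinel.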

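OK, in that case we can do two things: use multithreading and batch inserts. I think that with batch inserts we don't need parallelism, because batching will speed things up enough on the client side. Multithreading won't add much more speed, because this is not a CPU-intensive task.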
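The grouping step in the script (collect rows until there are 1000, then hand them to a writer) can be factored into a small generator, where each chunk would become one BatchStatement. A sketch; chunked is a hypothetical helper name, not part of the driver:

```python
from itertools import islice


def chunked(rows, size):
    """Yield lists of at most `size` rows from any iterable (e.g. a paged ResultSet)."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return  # iterable exhausted; never yields an empty chunk
        yield chunk


sizes = [len(c) for c in chunked(range(2500), 1000)]
print(sizes)  # [1000, 1000, 500]
```

Unlike a manual temp_rows loop, this never produces an empty final chunk. Note that Cassandra's documentation generally cautions that batches spanning many partitions add load on the coordinator and are not primarily a throughput optimization; the driver's cassandra.concurrent.execute_concurrent_with_args is a different technique for sending many independent inserts concurrently.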

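import threading

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement, SimpleStatement, dict_factory

cluster_ = Cluster(['127.0.0.1'])
session = cluster_.connect("test")
session.row_factory = dict_factory

BATCH_SIZE = 1000


class Fetcher:

    def __init__(self, session):
        self.session = session
        query = "select * from test.data_log"
        self.statement = SimpleStatement(query, fetch_size=BATCH_SIZE)
        # Prepare the insert once and share it with every writer thread
        self.insert_stmt = session.prepare(
            "INSERT INTO test.data(customer, snr, rttt, event_time) VALUES (?, ?, ?, ?)")

    def run(self):
        # Synchronous execute: iterating the ResultSet fetches further pages as needed
        rows = self.session.execute(self.statement)

        handlers = []
        temp_rows = []
        for row in rows:
            temp_rows.append(row)
            if len(temp_rows) == BATCH_SIZE:
                handlers.append(self._start_handler(temp_rows))
                temp_rows = []

        if temp_rows:  # leftover rows that did not fill a whole batch
            handlers.append(self._start_handler(temp_rows))

        # Wait for every writer thread and report any failures
        for handler in handlers:
            handler.join()
            if handler.error:
                print(handler.error)

    def _start_handler(self, rows):
        handler = PagedResultHandler(self.session, self.insert_stmt, rows)
        handler.start()
        return handler


class PagedResultHandler(threading.Thread):
    """Writes one chunk of rows to test.data as a single batch."""

    def __init__(self, session, stmt, rows):
        super().__init__()
        self.session = session  # Session objects are safe to share between threads
        self.stmt = stmt
        self.rows = rows
        self.error = None

    def run(self):
        batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
        for row in self.rows:
            batch.add(self.stmt, [row['customer'], row['snr'], row['rttt'], row['created_time']])
        try:
            print(self.session.execute(batch))
        except Exception as exc:
            self.error = exc


Fetcher(session).run()
cluster_.shutdown()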

This script does both batch inserts and multithreading, but the multithreading does not seem to be necessary either.