Cassandra 更新失败
Cassandra update fails
已解决
我在 3 个节点上测试更新,其中一个节点的时间落后 1 秒,所以当更新一行时,写入时间总是落后于时间戳,cassandra 不会更新行。我同步了所有节点时间,问题解决了。
编辑:
我仔细检查了结果,所有插入都成功了,部分更新失败了。没有 error/exception 条消息
我有一个包含 5 个节点的 cassandra 集群 (Cassandra 2.0.13)。使用 python(2.6.6) cassandra driver(2.6.0c2)
将数据插入数据库。我的服务器系统是 Centos6.X
以下代码是我如何连接到 cassandra 并获取会话。我最多提供了 2 个节点的 ip 地址和 select 密钥空间。
def get_cassandra_session():
"""creates cluster and gets the session base on key space"""
# be aware that session cannot be shared between threads/processes
# or it will raise OperationTimedOut Exception
if CLUSTER_HOST2:
cluster = cassandra.cluster.Cluster([CLUSTER_HOST1, CLUSTER_HOST2])
else:
# if only one address is available, we have to use older protocol version
cluster = cassandra.cluster.Cluster([CLUSTER_HOST1], protocol_version=1)
session = cluster.connect(KEY_SPACE)
return session
对于每一行,我有 17 列,如果数据库中不存在该键,我将使用 session
插入键和其余列的默认值,然后更新特定列的值。
def insert_initial_row(session, key):
session.execute(INITIAL_INSERTION_STATEMENT, tuple(INITIAL_COLUMNS_VALUES))
def update_columnX(session, key, column):
session.execute("INSERT INTO " + TABLE + "(" + KEY + "," + COLUMN_X + ") VALUES(%s, %s)", (key, column))
def has_found(session, key):
"""checks key is in database or not"""
query = "SELECT " + "*" + " FROM " + KEY_SPACE + "." + TABLE \
+ " WHERE " + KEY + " = " + "'" + key + "'"
# returns a list
row = session.execute(query)
return True if row else False
以下是我调用它们的方式:
for a_key in keys_set:
"""keys_set contains 100 no duplicate keys"""
if has_found(session, a_key):
update_columnX(session, a_key, "column x value")
else:
"""the key is not in db, initialize it with all default values, then update column x"""
insert_initial_row(session, a_key)
if has_found(sessin, a_key):
update_columnX(session, a_key, "column x value")
else:
logger.error("not initialized correctly...")
我试图插入 100 行并更新每行的 columnX,但只能更新这 100 行中的一部分,其余行的 columnX 是默认值。insert_initial_row
已调用并初始化默认值对于所有 100 行,但 update_columnX
没有。事件我将一致性级别更改为 Quorum,它根本没有帮助。 "not initialized correctly..." 从来没有打印出来,我在 update_columnX
中添加了一个 print
行并且该行打印了 100 次,所以它被调用了 100 次,但并不是所有的都更新了。
有什么想法吗?请帮忙。
谢谢
如果您的 session.execute 写入不成功(它们不符合所需的一致性级别),则驱动程序将引发以下异常之一:
- Unavailable - 没有足够的实时副本来满足请求的一致性级别,因此协调节点立即使请求失败,而没有将其转发给任何副本。
- Timeout - 副本在 cassandra 超时之前未响应协调器。
- Write timeout - 副本在写入超时前未响应协调器。在 cassandra.yaml 中配置。读也有类似的超时,读写超时在yaml中单独配置。
- Operation timeout - 操作花费的时间比指定的客户端超时时间长。在您的应用程序代码中配置。
您可以尝试 tracing 您的查询并找出每次写入的确切情况。这将向您显示操作中涉及的协调器和副本节点以及请求在每个节点上花费了多少时间。
已解决 我在 3 个节点上测试更新,其中一个节点的时间落后 1 秒,所以当更新一行时,写入时间总是落后于时间戳,cassandra 不会更新行。我同步了所有节点时间,问题解决了。
编辑: 我仔细检查了结果,所有插入都成功了,部分更新失败了。没有 error/exception 条消息
我有一个包含 5 个节点的 cassandra 集群 (Cassandra 2.0.13)。使用 python(2.6.6) cassandra driver(2.6.0c2)
将数据插入数据库。我的服务器系统是 Centos6.X
以下代码是我如何连接到 cassandra 并获取会话。我最多提供了 2 个节点的 ip 地址和 select 密钥空间。
def get_cassandra_session():
"""creates cluster and gets the session base on key space"""
# be aware that session cannot be shared between threads/processes
# or it will raise OperationTimedOut Exception
if CLUSTER_HOST2:
cluster = cassandra.cluster.Cluster([CLUSTER_HOST1, CLUSTER_HOST2])
else:
# if only one address is available, we have to use older protocol version
cluster = cassandra.cluster.Cluster([CLUSTER_HOST1], protocol_version=1)
session = cluster.connect(KEY_SPACE)
return session
对于每一行,我有 17 列,如果数据库中不存在该键,我将使用 session
插入键和其余列的默认值,然后更新特定列的值。
def insert_initial_row(session, key):
session.execute(INITIAL_INSERTION_STATEMENT, tuple(INITIAL_COLUMNS_VALUES))
def update_columnX(session, key, column):
session.execute("INSERT INTO " + TABLE + "(" + KEY + "," + COLUMN_X + ") VALUES(%s, %s)", (key, column))
def has_found(session, key):
"""checks key is in database or not"""
query = "SELECT " + "*" + " FROM " + KEY_SPACE + "." + TABLE \
+ " WHERE " + KEY + " = " + "'" + key + "'"
# returns a list
row = session.execute(query)
return True if row else False
以下是我调用它们的方式:
for a_key in keys_set:
"""keys_set contains 100 no duplicate keys"""
if has_found(session, a_key):
update_columnX(session, a_key, "column x value")
else:
"""the key is not in db, initialize it with all default values, then update column x"""
insert_initial_row(session, a_key)
if has_found(sessin, a_key):
update_columnX(session, a_key, "column x value")
else:
logger.error("not initialized correctly...")
我试图插入 100 行并更新每行的 columnX,但只能更新这 100 行中的一部分,其余行的 columnX 是默认值。insert_initial_row
已调用并初始化默认值对于所有 100 行,但 update_columnX
没有。事件我将一致性级别更改为 Quorum,它根本没有帮助。 "not initialized correctly..." 从来没有打印出来,我在 update_columnX
中添加了一个 print
行并且该行打印了 100 次,所以它被调用了 100 次,但并不是所有的都更新了。
有什么想法吗?请帮忙。
谢谢
如果您的 session.execute 写入不成功(它们不符合所需的一致性级别),则驱动程序将引发以下异常之一:
- Unavailable - 没有足够的实时副本来满足请求的一致性级别,因此协调节点立即使请求失败,而没有将其转发给任何副本。
- Timeout - 副本在 cassandra 超时之前未响应协调器。
- Write timeout - 副本在写入超时前未响应协调器。在 cassandra.yaml 中配置。读也有类似的超时,读写超时在yaml中单独配置。
- Operation timeout - 操作花费的时间比指定的客户端超时时间长。在您的应用程序代码中配置。
您可以尝试 tracing 您的查询并找出每次写入的确切情况。这将向您显示操作中涉及的协调器和副本节点以及请求在每个节点上花费了多少时间。