Cassandra 集群中的操作超时错误
Operation timed out error in Cassandra cluster
我的集群有 6 台机器，我经常收到下面这个错误消息，真的不知道该如何解决：
code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 1, 'consistency': 'LOCAL_ONE'}
下面是我的完整代码。报错出现在以下代码处：
batch.add(schedule_remove_stmt, (source, type, row['scheduled_for'],row['id']));session.execute(batch,30)
完整代码：
# Connect to the local node and work in the 'keyspace' keyspace.
cluster = Cluster(['localhost'])
session = cluster.connect('keyspace')

# One reference "now" for the whole run. Schedule rows are written with
# minute-truncated timestamps (see the insert below), so the rows that are due
# right now are stored under the current minute.
d = datetime.utcnow()
scheduled_for = d.replace(second=0, microsecond=0)

rowid = []  # ids of the schedule rows found for the current minute
stmt = session.prepare('SELECT * FROM schedules WHERE source=? AND type= ? AND scheduled_for = ?')
schedule_remove_stmt = session.prepare("DELETE FROM schedules WHERE source = ? AND type = ? AND scheduled_for = ? AND id = ?")
schedule_insert_stmt = session.prepare("INSERT INTO schedules(source, type, scheduled_for, id) VALUES (?, ?, ?, ?)")
schedules_to_delete = []  # {'id': ..., 'scheduled_for': ...} for every due schedule row
articles = {}  # article id -> created_at, for the due ids found in `articles`
source = ''
type = ''

# (maximum article age, delay until the article's next check), tried in order.
# An article older than the last bound gets no new schedule row.
reschedule_backoff = (
    (timedelta(hours=1), timedelta(minutes=6)),
    (timedelta(hours=3), timedelta(minutes=18)),
    (timedelta(hours=6), timedelta(minutes=36)),
    (timedelta(hours=12), timedelta(minutes=72)),
    (timedelta(days=1), timedelta(minutes=144)),
    (timedelta(days=3), timedelta(minutes=432)),
    (timedelta(days=30), timedelta(minutes=1440)),
)

# Step 1: load the schedule rows due this minute and the creation time of
# every article they reference.
try:
    rows = session.execute(stmt, [source, type, scheduled_for])
    for row in rows:
        schedules_to_delete.append({'id': row.id, 'scheduled_for': row.scheduled_for})
        rowid.append(row.id)
    # Bind the ids as one list parameter instead of splicing quoted ids into
    # the CQL text. Quotes inside an id can no longer break the query,
    # non-string ids (e.g. UUIDs) no longer raise TypeError, and no query is
    # sent at all when nothing is due.
    if rowid:
        articles_stmt = session.prepare('SELECT * FROM articles WHERE id IN ?')
        for row in session.execute(articles_stmt, [rowid]):
            articles[row.id] = row.created_at
except Exception as e:
    print(e)
    log.info('select error is:%s' % e)

# NOTE(review): if the articles lookup above failed, `articles` is empty or
# partial. The loop below still deletes every due schedule row without
# re-inserting it, which drops those articles from the schedule. Confirm that
# this is acceptable.

# Step 2: for every due schedule row, delete it and, if the article is still
# young enough, insert its next check time.
try:
    for row in schedules_to_delete:
        # NOTE(review): the delete and the insert use different scheduled_for
        # values. If scheduled_for is part of the partition key (the schema is
        # not shown), this logged batch spans two partitions. That is a likely
        # contributor to the coordinator write timeouts reported for this
        # code; two independent (e.g. asynchronous) writes would avoid it.
        batch = BatchStatement()
        batch.add(schedule_remove_stmt, (source, type, row['scheduled_for'], row['id']))
        next_schedule = d  # bound up front so the error log below can always use it
        try:
            if row['id'] in articles:  # dict lookup, not a scan over .keys()
                elapsed = datetime.utcnow() - articles[row['id']]
                for max_age, delay in reschedule_backoff:
                    if elapsed <= max_age:
                        next_schedule = d + delay
                        break
                if next_schedule != d:
                    batch.add(schedule_insert_stmt, (source, type, next_schedule.replace(second=0, microsecond=0), row['id']))
        except Exception as e:
            print('key error: %s' % e)
            log.info('HOW IT CHANGES %s %s %s %s ERROR:%s' % (source, type, next_schedule.replace(second=0, microsecond=0), row['id'], e))
        # The second positional parameter of Session.execute is `parameters`,
        # not `timeout`, so execute(batch, 30) never applied a 30 s timeout.
        # Pass the intended client-side timeout by keyword.
        session.execute(batch, timeout=30)
except Exception as e:
    print('schedules error is =======================> %s' % e)
    log.info('schedules error is:%s' % e)
非常感谢您的帮助，我真的不知道该如何解决这个问题！
我认为在这种情况下不应该使用批处理语句：您正在用批处理对不同的分区键执行大量操作，这会导致超时异常。批处理应当用于保持多张表之间的数据同步，而不是用于性能优化。
有关滥用批处理的更多信息，请参阅 this article。
对于您的场景，使用驱动程序的异步 API（asynchronous driver API）来执行大量删除查询更合适：例如对每条语句调用 `session.execute_async(...)`，再等待所有返回的 future 完成。这样既能保持代码的性能，又能避免协调器过载。
我的集群有 6 台机器，我经常收到下面这个错误消息，真的不知道该如何解决：
code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 1, 'consistency': 'LOCAL_ONE'}
下面是我的完整代码。报错出现在以下代码处：
batch.add(schedule_remove_stmt, (source, type, row['scheduled_for'],row['id']));session.execute(batch,30)
完整代码：
# Connect to the local node and work in the 'keyspace' keyspace.
cluster = Cluster(['localhost'])
session = cluster.connect('keyspace')

# One reference "now" for the whole run. Schedule rows are written with
# minute-truncated timestamps (see the insert below), so the rows that are due
# right now are stored under the current minute.
d = datetime.utcnow()
scheduled_for = d.replace(second=0, microsecond=0)

rowid = []  # ids of the schedule rows found for the current minute
stmt = session.prepare('SELECT * FROM schedules WHERE source=? AND type= ? AND scheduled_for = ?')
schedule_remove_stmt = session.prepare("DELETE FROM schedules WHERE source = ? AND type = ? AND scheduled_for = ? AND id = ?")
schedule_insert_stmt = session.prepare("INSERT INTO schedules(source, type, scheduled_for, id) VALUES (?, ?, ?, ?)")
schedules_to_delete = []  # {'id': ..., 'scheduled_for': ...} for every due schedule row
articles = {}  # article id -> created_at, for the due ids found in `articles`
source = ''
type = ''

# (maximum article age, delay until the article's next check), tried in order.
# An article older than the last bound gets no new schedule row.
reschedule_backoff = (
    (timedelta(hours=1), timedelta(minutes=6)),
    (timedelta(hours=3), timedelta(minutes=18)),
    (timedelta(hours=6), timedelta(minutes=36)),
    (timedelta(hours=12), timedelta(minutes=72)),
    (timedelta(days=1), timedelta(minutes=144)),
    (timedelta(days=3), timedelta(minutes=432)),
    (timedelta(days=30), timedelta(minutes=1440)),
)

# Step 1: load the schedule rows due this minute and the creation time of
# every article they reference.
try:
    rows = session.execute(stmt, [source, type, scheduled_for])
    for row in rows:
        schedules_to_delete.append({'id': row.id, 'scheduled_for': row.scheduled_for})
        rowid.append(row.id)
    # Bind the ids as one list parameter instead of splicing quoted ids into
    # the CQL text. Quotes inside an id can no longer break the query,
    # non-string ids (e.g. UUIDs) no longer raise TypeError, and no query is
    # sent at all when nothing is due.
    if rowid:
        articles_stmt = session.prepare('SELECT * FROM articles WHERE id IN ?')
        for row in session.execute(articles_stmt, [rowid]):
            articles[row.id] = row.created_at
except Exception as e:
    print(e)
    log.info('select error is:%s' % e)

# NOTE(review): if the articles lookup above failed, `articles` is empty or
# partial. The loop below still deletes every due schedule row without
# re-inserting it, which drops those articles from the schedule. Confirm that
# this is acceptable.

# Step 2: for every due schedule row, delete it and, if the article is still
# young enough, insert its next check time.
try:
    for row in schedules_to_delete:
        # NOTE(review): the delete and the insert use different scheduled_for
        # values. If scheduled_for is part of the partition key (the schema is
        # not shown), this logged batch spans two partitions. That is a likely
        # contributor to the coordinator write timeouts reported for this
        # code; two independent (e.g. asynchronous) writes would avoid it.
        batch = BatchStatement()
        batch.add(schedule_remove_stmt, (source, type, row['scheduled_for'], row['id']))
        next_schedule = d  # bound up front so the error log below can always use it
        try:
            if row['id'] in articles:  # dict lookup, not a scan over .keys()
                elapsed = datetime.utcnow() - articles[row['id']]
                for max_age, delay in reschedule_backoff:
                    if elapsed <= max_age:
                        next_schedule = d + delay
                        break
                if next_schedule != d:
                    batch.add(schedule_insert_stmt, (source, type, next_schedule.replace(second=0, microsecond=0), row['id']))
        except Exception as e:
            print('key error: %s' % e)
            log.info('HOW IT CHANGES %s %s %s %s ERROR:%s' % (source, type, next_schedule.replace(second=0, microsecond=0), row['id'], e))
        # The second positional parameter of Session.execute is `parameters`,
        # not `timeout`, so execute(batch, 30) never applied a 30 s timeout.
        # Pass the intended client-side timeout by keyword.
        session.execute(batch, timeout=30)
except Exception as e:
    print('schedules error is =======================> %s' % e)
    log.info('schedules error is:%s' % e)
非常感谢您的帮助，我真的不知道该如何解决这个问题！
我认为在这种情况下不应该使用批处理语句：您正在用批处理对不同的分区键执行大量操作，这会导致超时异常。批处理应当用于保持多张表之间的数据同步，而不是用于性能优化。有关滥用批处理的更多信息，请参阅 this article。
对于您的场景，使用驱动程序的异步 API（asynchronous driver API）来执行大量删除查询更合适：例如对每条语句调用 `session.execute_async(...)`，再等待所有返回的 future 完成。这样既能保持代码的性能，又能避免协调器过载。