使用 cqlengine 在 cassandra 中插入和更新大量行的最快和最有效的方法
Fastest and most efficient way to insert and update a lot of rows in cassandra using cqlengine
我有一个包含 15000000 条记录的 csv 文件,我正在尝试将其处理成 cassandra table。这是列 headers 和数据的示例:
为了更好地理解它,这是我在 python 中的模型:
class DIDSummary(Model):
__keyspace__ = 'processor_api'
did = columns.Text(required=True, primary_key=True, partition_key=True)
month = columns.DateTime(required=True, primary_key=True, partition_key=True)
direction = columns.Text(required=True, primary_key=True)
duration = columns.Counter(required=True)
cost = columns.Counter(required=True)
现在我正在尝试处理 csv 文件的每一行中的数据,并将它们分批插入 500、1000、10000、250 等,但结果相同(大约 0.33 秒/ 1000,这意味着完成所有这些需要 90 分钟)。我还尝试采用多处理池并 apply_async()
'ing 每个 batch.execute()
调用,但没有更好的结果。有没有什么办法可以在 python 中使用 SSTableWriter,或者做一些其他的事情来更好地将它们插入到 cassandra 中?作为参考,这是我的 process_sheet_row()
方法:
def process_sheet_row(self, row, batch):
report_datetime = '{0}{1:02d}'.format(self.report.report_year, self.report.report_month)
duration = int(float(row[self.columns['DURATION']]) * 10)
cost = int(float(row[self.columns['COST']]) * 100000)
anisummary = DIDSummary.batch(batch).create(did='{}{}'.format(self.report.ani_country_code, row[self.columns['ANI']]),
direction='from',
month=datetime.datetime.strptime(report_datetime, '%Y%m'))
anisummary.duration += duration
anisummary.cost += cost
anisummary.batch(batch).save()
destsummary = DIDSummary.batch(batch).create(did='{}{}'.format(self.report.dest_country_code, row[self.columns['DEST']]),
direction='to',
month=datetime.datetime.strptime(report_datetime, '%Y%m'))
destsummary.duration += duration
destsummary.cost += cost
destsummary.batch(batch).save()
如有任何帮助,我们将不胜感激。谢谢!
编辑:这是我用于遍历文件并处理它的代码:
with open(self.path) as csvfile:
reader = csv.DictReader(csvfile)
if arr[0] == 'inventory':
self.parse_inventory(reader)
b = BatchQuery(batch_type=BatchType.Unlogged)
i = 1
for row in reader:
self.parse_sheet_row(row, b)
if not i % 1000:
connection.check_connection() # This just makes sure we're still connected to cassandra. Check code below
self.pool.apply_async(b.execute())
b = BatchQuery(batch_type=BatchType.Unlogged)
i += 1
print "Done processing: {}".format(self.path)
print "Time to Execute: {}".format(datetime.datetime.now() - start)
print "Batches: {}".format(i / 1000)
print "Records processed: {}".format(i - 1)
因为这可能有点帮助,这里是 connection.check_connection()
方法(和周围的方法):
def setup_defaults():
connection.setup(['127.0.0.1'], 'processor_api', lazy_connect=True)
def check_connection():
from cdr.models import DIDSummary
try:
DIDSummary.objects.all().count()
except CQLEngineException:
setup_defaults()
批处理通常不是执行插入的最快方法。在包含各种分区的未记录批次中尤其如此。分批阅读 here
如果你可以脱离 cqlengine 进行插入,你应该尝试 async callback chaining which is implemented in the Python driver under: cassandra.execute_concurrent。
在滥用各种大小的批次后,我在 inserts/sec 转向此方法方面有了重大改进,但是 YMMV。
我有一个包含 15000000 条记录的 csv 文件,我正在尝试将其处理成 cassandra table。这是列 headers 和数据的示例:
为了更好地理解它,这是我在 python 中的模型:
class DIDSummary(Model):
__keyspace__ = 'processor_api'
did = columns.Text(required=True, primary_key=True, partition_key=True)
month = columns.DateTime(required=True, primary_key=True, partition_key=True)
direction = columns.Text(required=True, primary_key=True)
duration = columns.Counter(required=True)
cost = columns.Counter(required=True)
现在我正在尝试处理 csv 文件的每一行中的数据,并将它们分批插入 500、1000、10000、250 等,但结果相同(大约 0.33 秒/ 1000,这意味着完成所有这些需要 90 分钟)。我还尝试采用多处理池并 apply_async()
'ing 每个 batch.execute()
调用,但没有更好的结果。有没有什么办法可以在 python 中使用 SSTableWriter,或者做一些其他的事情来更好地将它们插入到 cassandra 中?作为参考,这是我的 process_sheet_row()
方法:
def process_sheet_row(self, row, batch):
report_datetime = '{0}{1:02d}'.format(self.report.report_year, self.report.report_month)
duration = int(float(row[self.columns['DURATION']]) * 10)
cost = int(float(row[self.columns['COST']]) * 100000)
anisummary = DIDSummary.batch(batch).create(did='{}{}'.format(self.report.ani_country_code, row[self.columns['ANI']]),
direction='from',
month=datetime.datetime.strptime(report_datetime, '%Y%m'))
anisummary.duration += duration
anisummary.cost += cost
anisummary.batch(batch).save()
destsummary = DIDSummary.batch(batch).create(did='{}{}'.format(self.report.dest_country_code, row[self.columns['DEST']]),
direction='to',
month=datetime.datetime.strptime(report_datetime, '%Y%m'))
destsummary.duration += duration
destsummary.cost += cost
destsummary.batch(batch).save()
如有任何帮助,我们将不胜感激。谢谢!
编辑:这是我用于遍历文件并处理它的代码:
with open(self.path) as csvfile:
reader = csv.DictReader(csvfile)
if arr[0] == 'inventory':
self.parse_inventory(reader)
b = BatchQuery(batch_type=BatchType.Unlogged)
i = 1
for row in reader:
self.parse_sheet_row(row, b)
if not i % 1000:
connection.check_connection() # This just makes sure we're still connected to cassandra. Check code below
self.pool.apply_async(b.execute())
b = BatchQuery(batch_type=BatchType.Unlogged)
i += 1
print "Done processing: {}".format(self.path)
print "Time to Execute: {}".format(datetime.datetime.now() - start)
print "Batches: {}".format(i / 1000)
print "Records processed: {}".format(i - 1)
因为这可能有点帮助,这里是 connection.check_connection()
方法(和周围的方法):
def setup_defaults():
connection.setup(['127.0.0.1'], 'processor_api', lazy_connect=True)
def check_connection():
from cdr.models import DIDSummary
try:
DIDSummary.objects.all().count()
except CQLEngineException:
setup_defaults()
批处理通常不是执行插入的最快方法。在包含各种分区的未记录批次中尤其如此。分批阅读 here
如果你可以脱离 cqlengine 进行插入,你应该尝试 async callback chaining which is implemented in the Python driver under: cassandra.execute_concurrent。
在滥用各种大小的批次后,我在 inserts/sec 转向此方法方面有了重大改进,但是 YMMV。