Update a property from multiple threads with an ETag in Azure Table Storage
Overview:
When I upload a blob to Blob Storage under container/productID(folder)/blobName, an Event Subscription saves the event in a Storage Queue. An Azure Function then polls this event and does the following:
1- read from the corresponding table the current count property (how
many blobs are stored under productID(folder))
2- increase the count + 1
3- write it back in the corresponding table
4- return
The problem is a race condition. I tried wrapping the read-increment-write in a Lock(), as shown in the code. This works when I upload 1000 files at the same time. But if I upload 10000 files at the same time and then read the count property, it returns more than 10000, which is wrong; it must return exactly 10000. I also restricted scale-out so that only one instance is created.
Is the problem still a race condition (I don't think so, but it could be), or does the Azure runtime run more than one event on the function at a time? I'm not sure what is going on. Any ideas would be great.
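The read-increment-write sequence above is a textbook lost update. A minimal standalone sketch (no Azure involved, all names illustrative) that forces the bad interleaving with a barrier shows how two concurrent increments can produce a count of 1 instead of 2:

```python
import threading

class Counter:
    """Stands in for the Table Storage entity holding the count."""
    def __init__(self):
        self.count = 0

counter = Counter()
barrier = threading.Barrier(2)

def worker():
    value = counter.count      # 1) read the current count
    barrier.wait()             # force both threads to read before either writes
    counter.count = value + 1  # 2) increment, 3) write back (clobbers the other write)

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.count)  # 1, not 2: one increment was lost
```

A lock around the read-modify-write only helps when every writer shares that lock, which is not the case across function-host workers or scaled-out instances.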
import json
import logging
import threading
import urllib.parse as urlparse

import azure.functions as func
from azure.cosmosdb.table.tableservice import TableService
from azure.cosmosdb.table.models import Entity

class _tableStorage:
    def __init__(self, account, key):
        self.table_service = TableService(account, key)

    def create_table(self, table_name):
        self.table_service.create_table(table_name)

    def insert_entity_table(self, table_name, entity):
        self.table_service.insert_or_replace_entity(table_name, entity)

    def exist_table(self, table_name):
        return self.table_service.exists(table_name)

    def get_entity_table(self, table_name, entity):
        return self.table_service.get_entity(
            table_name, entity.PartitionKey, entity.RowKey)

    def get_all_entities_table(self, table_name):
        entities = None
        try:
            entities = self.table_service.query_entities(table_name)
        except Exception:
            logging.info('unknown error while listing entities')
        return entities
def get_blob_meta(url):
    parsed_url = urlparse.urlparse(url)
    return {
        "storage": parsed_url.netloc.split('.')[0],
        "container": parsed_url.path.split('/')[1],
        "folder": parsed_url.path.split('/')[2]
    }

threadLock = threading.Lock()

def main(msg: func.QueueMessage) -> None:
    url = json.loads(msg.get_body().decode(
        'utf-8').replace("'", "\""))['data']['url']
    logging.info(url)
    blob_meta = get_blob_meta(url)
    logging.info(blob_meta)
    table_service = _tableStorage(
        blob_meta['storage'],
        "xxxxxxxxxx")
    with threadLock:
        entity = Entity()
        # same partition key so all rows are stored on one node
        entity.PartitionKey = blob_meta['container']
        entity.RowKey = blob_meta['folder']
        if not table_service.exist_table(blob_meta['container']):
            table_service.create_table(blob_meta['container'])
            entity.count = 1
        else:
            entity.count = table_service.get_entity_table(
                blob_meta['container'], entity).count + 1
        table_service.insert_entity_table(blob_meta['container'], entity)
Two solutions:
First, for multi-threading:
The idea is the ETag flag in the header, which guarantees atomic processing. First, I read the count property together with the ETag flag. Then I increment the count. Before I write the incremented count back to the table, if_match compares my ETag with the one currently stored in the table: if the ETags match, the count is updated; otherwise an error is thrown. I catch that error and read and increment again, retrying until the update succeeds.
To understand more, read the documentation.
import logging
import random
from time import sleep

header_etag = "random-etag"
response_etag = "random-response"
while True:
    sleep(random.random())  # sleep between 0 and 1 second
    header = table_service1.get_entity(
        client_table, client_table, client_product)
    header_etag = header['etag']
    new_count = header['Count'] + 1
    entity_product = create_product_entity(
        client_table, client_product, new_count, client_image_table)
    try:
        response_etag = table_service1.merge_entity(
            client_table, entity_product, if_match=header_etag)
        break
    except Exception:
        logging.info("race condition detected")
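A variant of the retry loop above, sketched with a capped retry count and exponential backoff instead of an unbounded `while True`. The `get_entity`/`merge_entity` callables here stand in for the `table_service1` calls and are assumptions for illustration, not the real SDK:

```python
import logging
import random
import time

def increment_with_etag(get_entity, merge_entity, max_retries=10):
    """Optimistic-concurrency increment with capped retries.

    get_entity() must return a dict with 'etag' and 'Count';
    merge_entity(entity, if_match=...) must raise when the ETag no
    longer matches (a 412 Precondition Failed in Table Storage).
    """
    for attempt in range(max_retries):
        current = get_entity()
        updated = {"Count": current["Count"] + 1}
        try:
            merge_entity(updated, if_match=current["etag"])
            return updated["Count"]
        except Exception:
            # Another writer won the race: back off, then re-read and retry.
            logging.info("race condition detected, retry %d", attempt + 1)
            time.sleep(random.random() * (2 ** attempt) * 0.01)
    raise RuntimeError("gave up after %d ETag conflicts" % max_retries)

# Demo with an in-memory fake store that loses the race exactly once.
store = {"etag": "v1", "Count": 5}
calls = {"n": 0}

def fake_get():
    return dict(store)

def fake_merge(entity, if_match):
    calls["n"] += 1
    if calls["n"] == 1:  # simulate a concurrent writer committing first
        store.update(etag="v2", Count=store["Count"] + 1)
        raise ValueError("412 Precondition Failed")
    store.update(entity)

print(increment_with_etag(fake_get, fake_merge))  # 7
```

The cap turns a potential infinite loop under heavy contention into a visible failure, and the jittered backoff spreads retries out so competing writers stop colliding on every attempt.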
Second, solving the multi-thread problem by blocking concurrency:
For local debugging, in local.settings.json:
{
    "IsEncrypted": false,
    "Values": {
        "AzureFunctionsJobHost__extensions__queues__batchSize": "1",
        "AzureFunctionsJobHost__extensions__queues__newBatchThreshold": "0"
    }
}
**In production**, in host.json:
{
    "version": "2.0",
    "extensions": {
        "queues": {
            "batchSize": 1,
            "newBatchThreshold": 0
        }
    }
}
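With these settings the queue trigger processes one message at a time per instance: the host fetches up to batchSize messages and only fetches a new batch once the number of in-flight messages drops to newBatchThreshold. Assuming the documented defaults (batchSize 16, newBatchThreshold batchSize/2), the per-instance bound can be sketched as:

```python
def max_concurrent_messages(batch_size, new_batch_threshold):
    # The host runs up to batch_size messages concurrently and grabs
    # another batch once in-flight messages fall to new_batch_threshold,
    # so per-instance concurrency is bounded by their sum.
    return batch_size + new_batch_threshold

print(max_concurrent_messages(1, 0))   # 1: fully serialized
print(max_concurrent_messages(16, 8))  # 24: the defaults
```

Note this only serializes messages within one instance; combining it with restricting scale-out to a single instance, as the question does, is what makes processing fully sequential.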