Fastest way to insert 100,000+ records into DocumentDB
As the title suggests, I need to programmatically insert 100,000+ records into a DocumentDb collection. The data will be used to build reports later. I am using the Azure Documents SDK and a stored procedure to bulk-insert documents (see the question Azure documentdb bulk insert using stored procedure).
The following console application shows how I insert the documents.
InsertDocuments generates 500 test documents to pass to the stored procedure. The main function calls InsertDocuments 10 times, inserting 5,000 documents in total. Running this application inserts 500 documents every few seconds. If I increase the number of documents per call, I start getting errors and losing documents.
Can anyone recommend a faster way to insert documents?
static void Main(string[] args)
{
Console.WriteLine("Starting...");
MainAsync().Wait();
}
static async Task MainAsync()
{
int campaignId = 1001,
count = 500;
for (int i = 0; i < 10; i++)
{
await InsertDocuments(campaignId, (count * i) + 1, (count * i) + count);
}
}
static async Task InsertDocuments(int campaignId, int startId, int endId)
{
using (DocumentClient client = new DocumentClient(new Uri(documentDbUrl), documentDbKey))
{
List<dynamic> items = new List<dynamic>();
// Create x number of documents to insert
for (int i = startId; i <= endId; i++)
{
var item = new
{
id = Guid.NewGuid(),
campaignId = campaignId,
userId = i,
status = "Pending"
};
items.Add(item);
}
var task = client.ExecuteStoredProcedureAsync<dynamic>("/dbs/default/colls/campaignusers/sprocs/bulkImport", new RequestOptions()
{
PartitionKey = new PartitionKey(campaignId)
},
new
{
items = items
});
try
{
await task;
int insertCount = (int)task.Result.Response;
Console.WriteLine("{0} documents inserted...", insertCount);
}
catch (Exception e)
{
Console.WriteLine("Error: {0}", e.Message);
}
}
}
The fastest way to insert documents into Azure DocumentDB is available as a sample on GitHub: https://github.com/Azure/azure-documentdb-dotnet/tree/master/samples/documentdb-benchmark
The following tips will help you achieve the best throughput with the .NET SDK:
- Initialize a singleton DocumentClient
- Use Direct connectivity and the TCP protocol (ConnectionMode.Direct and ConnectionProtocol.Tcp)
- Use ~100 tasks in parallel (depending on your hardware)
- Increase MaxConnectionLimit in the DocumentClient constructor to a high value, say 1000 connections
- Turn on gcServer
- Make sure your collection has the appropriate provisioned throughput (and a good partition key)
- Running in the same Azure region also helps
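The "singleton client, many parallel tasks" tip above can be sketched as follows. This is an illustrative skeleton, not SDK code: `insert_batch` is a hypothetical stand-in for the real write call (e.g. `ExecuteStoredProcedureAsync` or `CreateDocument`), and the batch/worker counts are arbitrary.

```python
# Sketch of one shared client fanned out over many concurrent tasks.
# `insert_batch` is a placeholder for a real SDK write call.
from concurrent.futures import ThreadPoolExecutor


def insert_batch(client, batch):
    # Stand-in for a real bulk write; returns how many docs it inserted.
    return len(batch)


def parallel_insert(client, docs, batch_size=100, workers=10):
    """Split docs into batches and insert them concurrently with one client."""
    batches = [docs[i:i + batch_size] for i in range(0, len(docs), batch_size)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Many in-flight requests share the single client instance.
        return sum(pool.map(lambda b: insert_batch(client, b), batches))


print(parallel_insert(None, list(range(1000))))  # 1000
```

The key point is that the client is created once and reused; only the requests are parallelized.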
With 10,000 RU/s you can insert 100,000 documents in about 50 seconds (at roughly 5 request units per write). With 100,000 RU/s you can insert them in about 5 seconds. You can make this as fast as you want by tuning the provisioned throughput (and, for very high insert counts, spreading inserts across multiple VMs/workers).
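The RU arithmetic above can be checked with a quick back-of-envelope helper. The ~5 RU/write figure is this answer's estimate for a small document; actual RU cost varies with document size and indexing policy.

```python
# Lower-bound wall time if provisioned throughput is fully saturated.
def estimated_seconds(doc_count, ru_per_write, provisioned_ru_per_sec):
    """Total RU to consume divided by RU replenished per second."""
    total_ru = doc_count * ru_per_write
    return total_ru / provisioned_ru_per_sec


# 100,000 docs at ~5 RU each against 10,000 RU/s -> ~50 seconds
print(estimated_seconds(100_000, 5, 10_000))   # 50.0
# ...and against 100,000 RU/s -> ~5 seconds
print(estimated_seconds(100_000, 5, 100_000))  # 5.0
```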
Edit (7/12/19): You can now use the bulk executor library, see https://docs.microsoft.com/en-us/azure/cosmos-db/bulk-executor-overview
Another approach is the stored procedure mentioned by others. A stored procedure requires a partition key. Also, per the documentation, a stored procedure must finish within 4 seconds, otherwise all records are rolled back. See the code below, which uses the Python Azure DocumentDB SDK and a JavaScript-based stored procedure. I have modified the script and fixed many errors; the code below works fine:
function bulkimport2(docObject) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();
    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docObject.items || !docObject.items.length) {
        getContext().getResponse().setBody(0);
        return;
    }

    // Round-trip through JSON to strip escaped carriage returns/newlines.
    // Use a global regex: a plain string argument only replaces the first match.
    var items = JSON.stringify(docObject.items);
    items = items.replace(/\\r/g, "").replace(/\\n/g, "");
    var docs = JSON.parse(items);
    var docsLength = docs.length; // count documents, not characters of the JSON string

    // Call the CRUD API to create the first document.
    tryCreate(docs[count], callback);

    // There are 2 exit conditions:
    // 1) The upsertDocument request was not accepted.
    //    The callback is not called; report the current count and stop.
    // 2) The callback was called docs.length times.
    //    All documents were created; set the final count and stop.
    function tryCreate(doc, callback) {
        var isAccepted = collection.upsertDocument(collectionLink, doc, callback);
        // If the request was accepted, the callback will be called.
        // Otherwise report the current count back to the client, which can
        // call the script again with the remaining set of docs. This happens
        // when the stored procedure has been running too long and is about to
        // be cancelled by the server; it lets the calling client resume the
        // batch from the point reached before isAccepted became false.
        if (!isAccepted) {
            getContext().getResponse().setBody(count);
        }
    }

    // Called when upsertDocument is done and the document has been persisted.
    function callback(err, doc, options) {
        if (err) throw err;
        // One more document has been inserted; increment the count.
        count++;
        if (count >= docsLength) {
            // All documents created; set the response and finish.
            getContext().getResponse().setBody(count);
        } else {
            // Create the next document.
            tryCreate(docs[count], callback);
        }
    }
}
Edit: getContext().getResponse().setBody(count); return; // once all records have been processed.
Python script to load the stored procedure and do the bulk import:
import base64
import json
import pydocumentdb.document_client as document_client

# Initialize the Python DocumentDB client
client = document_client.DocumentClient(config['ENDPOINT'], {'masterKey': config['MASTERKEY'], 'DisableSSLVerification': 'true'})
# Create a database
#db = client.CreateDatabase({ 'id': config['DOCUMENTDB_DATABASE'] })
db=client.ReadDatabases({ 'id': 'db2' })
print(db)
# Create collection options
options = {
'offerEnableRUPerMinuteThroughput': True,
'offerVersion': "V2",
'offerThroughput': 400
}
# Create a collection
#collection = client.CreateCollection('dbs/db2' , { 'id': 'coll2'}, options)
#collection = client.CreateCollection({ 'id':'db2'},{ 'id': 'coll2'}, options)
database_link = 'dbs/db2'
collection_link = database_link + '/colls/coll2'
"""
#List collections
collection = client.ReadCollection(collection_link)
print(collection)
print('Databases:')
databases = list(client.ReadDatabases())
if not databases:
print('No Databases:')
for database in databases:
print(database['id'])
"""
# Create some documents
"""
document1 = client.CreateDocument(collection['_self'],
{
'Web Site': 0,
'Cloud Service': 0,
'Virtual Machine': 0,
'name': 'some'
})
document2 = client.CreateDocument(collection['_self'],
{
'Web Site': 1,
'Cloud Service': 0,
'Virtual Machine': 0,
'name': 'some'
})
"""
# Query them in SQL
"""
query = { 'query': 'SELECT * FROM server s' }
options = {}
options['enableCrossPartitionQuery'] = True
options['maxItemCount'] = 20
#result_iterable = client.QueryDocuments(collection['_self'], query, options)
result_iterable = client.QueryDocuments(collection_link, query, options)
results = list(result_iterable);
print(results)
"""
## How to create a stored procedure and use it
"""
sproc3 = {
'id': 'storedProcedure2',
'body': (
'function (input) {' +
' getContext().getResponse().setBody(' +
' \'a\' + input.temp);' +
'}')
}
retrieved_sproc3 = client.CreateStoredProcedure(collection_link,sproc3)
result = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/storedProcedure2',{'temp': 'so'})
"""
## delete all records in collection
"""
result = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkDeleteSproc',"SELECT * FROM c ORDER BY c._ts DESC ")
print(result)
"""
multiplerecords="""[{
"Virtual Machine": 0,
"name": "some",
"Web Site": 0,
"Cloud Service": 0
},
{
"Virtual Machine": 0,
"name": "some",
"Web Site": 1,
"Cloud Service": 0
}]"""
multiplerecords=json.loads(multiplerecords)
print(multiplerecords)
print(json.dumps(multiplerecords))
#bulkloadresult = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkImport',json.dumps(multiplerecords).encode('utf8'))
#bulkloadresult = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkImport',json.dumps(json.loads(r'{"items": [{"name":"John","age":30,"city":"New York"},{"name":"John","age":30,"city":"New York"}]}')).encode('utf8'))
str1 = '{"name":"John","age":30,"city":"New York","PartitionKey": "Morisplane"}'
str2 = '{"name":"John","age":30,"city":"New York","partitionKey": "Morisplane"}'
# decode() so the ids are JSON-serializable strings, not bytes
key1 = base64.b64encode(str1.encode("utf-8")).decode("utf-8")
key2 = base64.b64encode(str2.encode("utf-8")).decode("utf-8")
data = {"items": [{"id": key1, "name": "John", "age": 30, "city": "Morisplane", "PartitionKey": "Morisplane"}, {"id": key2, "name": "John", "age": 30, "city": "Morisplane", "partitionKey": "Morisplane"}], "city": "Morisplane", "partitionKey": "Morisplane"}
print(repr(data))
#retrieved_sproc3 =client.DeleteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkimport2')
sproc3 = {
    'id': 'bulkimport2',
    'body': (
        # Raw string so the JavaScript regex escapes (\\r, \\n) survive
        # Python's own escape processing.
        r"""function bulkimport2(docObject) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();
    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docObject.items || !docObject.items.length) {
        getContext().getResponse().setBody(0);
        return;
    }

    // Round-trip through JSON to strip escaped carriage returns/newlines.
    // Use a global regex: a plain string argument only replaces the first match.
    var items = JSON.stringify(docObject.items);
    items = items.replace(/\\r/g, "").replace(/\\n/g, "");
    var docs = JSON.parse(items);
    var docsLength = docs.length; // count documents, not characters of the JSON string

    // Call the CRUD API to create the first document.
    tryCreate(docs[count], callback);

    // There are 2 exit conditions:
    // 1) The upsertDocument request was not accepted.
    //    The callback is not called; report the current count and stop.
    // 2) The callback was called docs.length times.
    //    All documents were created; set the final count and stop.
    function tryCreate(doc, callback) {
        var isAccepted = collection.upsertDocument(collectionLink, doc, callback);
        // If the request was accepted, the callback will be called.
        // Otherwise report the current count back to the client, which can
        // call the script again with the remaining set of docs. This happens
        // when the stored procedure has been running too long and is about to
        // be cancelled by the server; it lets the calling client resume the
        // batch from the point reached before isAccepted became false.
        if (!isAccepted) {
            getContext().getResponse().setBody(count);
        }
    }

    // Called when upsertDocument is done and the document has been persisted.
    function callback(err, doc, options) {
        if (err) throw err;
        // One more document has been inserted; increment the count.
        count++;
        if (count >= docsLength) {
            // All documents created; set the response and finish.
            getContext().getResponse().setBody(count);
        } else {
            // Create the next document.
            tryCreate(docs[count], callback);
        }
    }
}"""
    )
}
#retrieved_sproc3 = client.CreateStoredProcedure(collection_link,sproc3)
bulkloadresult = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkimport2', data , {"partitionKey" : "Morisplane"} )
print(repr(bulkloadresult))
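Because the stored procedure is bounded (it reports how many documents it managed to upsert before the server cut it off), the caller has to loop and re-send the remainder. A minimal sketch of that resume loop, with `execute_sproc` as a hypothetical stand-in for the real `client.ExecuteStoredProcedure` call:

```python
# Resume loop for a bounded bulk-import stored procedure.
# `execute_sproc(payload, partition_key)` stands in for the SDK call and
# must return the number of documents inserted in that invocation.
def upload_with_resume(execute_sproc, items, partition_key):
    inserted_total = 0
    while items:
        inserted = execute_sproc({"items": items}, partition_key)
        if inserted <= 0:
            # Avoid an infinite loop if the sproc makes no progress.
            raise RuntimeError("stored procedure made no progress")
        items = items[inserted:]  # re-send only the remainder
        inserted_total += inserted
    return inserted_total


# Simulate a sproc that only gets through 3 docs per invocation.
fake = lambda payload, pk: min(3, len(payload["items"]))
print(upload_with_resume(fake, list(range(10)), "Morisplane"))  # 10
```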
private async Task<T> ExecuteDataUpload<T>(IEnumerable<object> data, PartitionKey partitionKey)
{
    using (var client = new DocumentClient(m_endPointUrl, m_authKey, connPol))
    {
        while (true)
        {
            try
            {
                var result = await client.ExecuteStoredProcedureAsync<T>(m_spSelfLink, new RequestOptions { PartitionKey = partitionKey }, data);
                return result;
            }
            catch (DocumentClientException ex)
            {
                // Back off and retry on throttling (429) and request timeouts.
                if (429 == (int)ex.StatusCode || HttpStatusCode.RequestTimeout == ex.StatusCode)
                {
                    await Task.Delay(ex.RetryAfter);
                    continue;
                }
                throw; // rethrow without resetting the stack trace
            }
            catch (Exception)
            {
                await Task.Delay(TimeSpan.FromSeconds(1));
                continue;
            }
        }
    }
}
public async Task uploadData(IEnumerable<object> data, string partitionKey)
{
    int groupSize = 600;
    int dataSize = data.Count();
    int chunkSize = dataSize > groupSize ? groupSize : dataSize;
    List<Task> uploadTasks = new List<Task>();
    var partKey = new PartitionKey(partitionKey);
    while (dataSize > 0)
    {
        // Materialize the chunk so the captured variable is stable.
        IEnumerable<object> chunkData = data.Take(chunkSize).ToList();
        // Task.Run (not Task.Factory.StartNew) so the async lambda's inner
        // task is what Task.WhenAll awaits, rather than a wrapped Task<Task>.
        uploadTasks.Add(Task.Run(async () =>
        {
            IEnumerable<object> remaining = chunkData;
            int remainingCount = remaining.Count();
            while (remainingCount > 0)
            {
                // The sproc reports how many docs it inserted before being
                // cut off; resume with the rest.
                int insertedCount = await ExecuteDataUpload<int>(remaining, partKey);
                remaining = remaining.Skip(insertedCount);
                remainingCount -= insertedCount;
            }
        }));
        data = data.Skip(chunkSize);
        dataSize -= chunkSize;
        chunkSize = dataSize > groupSize ? groupSize : dataSize;
    }
    await Task.WhenAll(uploadTasks);
}
Now call uploadData in parallel with the list of objects you want to upload. Keep one thing in mind: each call should only send data for the same partition key.
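Since each upload call must carry documents for a single partition key, a mixed list can be bucketed first. A small sketch; the `partitionKey` field name is an assumption, substitute your collection's actual partition key path:

```python
# Bucket documents by partition key so each bucket can be sent in one call.
from collections import defaultdict


def group_by_partition(docs, key_field="partitionKey"):
    """Return {partition_key_value: [docs...]} for a mixed document list."""
    groups = defaultdict(list)
    for doc in docs:
        groups[doc[key_field]].append(doc)
    return dict(groups)


docs = [{"id": 1, "partitionKey": "A"},
        {"id": 2, "partitionKey": "B"},
        {"id": 3, "partitionKey": "A"}]
print({k: len(v) for k, v in group_by_partition(docs).items()})  # {'A': 2, 'B': 1}
```

Each resulting bucket can then be handed to the upload routine with the matching PartitionKey.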
The Cosmos DB team has just released a bulk import and update SDK. Unfortunately it is only available for .NET Framework 4.5.1, but it apparently does a lot of the heavy lifting for you and maximizes use of throughput. See
https://docs.microsoft.com/en-us/azure/cosmos-db/bulk-executor-overview
https://docs.microsoft.com/en-us/azure/cosmos-db/sql-api-sdk-bulk-executor-dot-net
The Cosmos DB SDK has since been updated to allow bulk insert via the AllowBulkExecution option: https://docs.microsoft.com/en-us/azure/cosmos-db/tutorial-sql-api-dotnet-bulk-import