Async/await 事件流 mapSync 不工作
Async/await with event-stream mapSync not working
我用 //******This await does not work */
评论的 await
命令似乎不起作用。不确定这是否与它们在事件流中或导入模块中的承诺问题有关。
当我从映射数组中调用 run
函数以插入来自多个源的数据时,run
函数 returns 立即而不是等到 knex
完成插入数据。
app.get("/api/pull/all_data", (req, res)=>{
const dataSources = [
{resource: 'DI_ORDER_TYPE', tableName: 'ln_order_type'},
{resource: 'DI_DATES', tableName: 'ln_dates'},
{resource: 'WHINR140_INVENTORY', tableName: 'ln_inventory'}
]
dataSources.map(async (ds)=>{
console.log(`Importing ${ds.resource} into table ${ds.tableName}`)
await get_all_data(ds.tableName, ds.resource)
})
console.log("Import complete")
})
这是我的 run
函数,它是从上面的代码调用的。
const request = require('request')
const JSONStream = require('JSONStream')
const es = require('event-stream')
const knex_ln = require('../knex_ln')
const insertData = require('../insert_data')
const create_table = require('../create_table_from_json.js')
const clean_fieldnames = require('../clean_fieldnames')
function run(tableName, resourceName) {
return new Promise(async (resolve, reject)=>{
let tableData = []
let recordCount = 0
let maxRecords = 10000
let totalRecords = 0
// let tableName = 'LN_di_order_type'
// let resourceName = 'DI_ORDER_TYPE'
let rowData = {}
//Delete Existing Data and wait for it to complete
await knex_ln.schema.hasTable(tableName).then(async (exists)=>{
if(exists){
try {
await knex_ln(tableName).delete().then(()=>{})
} catch (error) {
}
}
})
//Get LN replica data and pipe data into JSONStream
request(`${process.env.API_BASE_URL}/${process.env.SECURITY_NAME}/${resourceName}`,
{
auth: {
'user': process.env.API_USER,
'pass': process.env.API_PASS
}
}
)
.pipe(JSONStream.parse([true, {recurse: true}, `${process.env.SECURITY_NAME}.row`, true]))
.pipe(es.mapSync(async (row)=>{
rowData = row
let cleanData = await clean_fieldnames(row)
tableData.push(cleanData)
recordCount += 1
totalRecords += 1
if(recordCount >= maxRecords){
try {
//******This await does not work */
await create_table(tableName, row)
} catch (error) {
console.log("Unable to create table", error)
}
//Insert records
try {
//******This await does not work */
await insertData(tableName, tableData)
console.log(`inserting ${recordCount} records into table ${tableName}`)
} catch (error) {
console.log("Unable to insert data: ", error)
}
//Reset tracker variables
recordCount = 0
tableData = []
}
}))
.on('end', async ()=>{
await create_table(tableName, rowData)
await insertData(tableName, tableData)
console.log(`Inserted ${totalRecords} into table ${tableName}`)
resolve('OK')
})
.on('error',(err)=>{
reject(err)
})
})
}
module.exports = run
这是我的模块文件,其中 returns 一个承诺
//insert_data.js
const knex_ln = require('./knex_ln')
module.exports = async (tableName, tableData) =>
new Promise(async (resolve, reject) => {
try {
await knex_ln(tableName).insert(tableData)
console.log("Inserting Data: ", tableData.length)
resolve()
} catch (error) {
console.log("Error inserting data: ", err)
reject(err)
}
})
这是一个输出示例
Importing DI_ORDER_TYPE into table ln_order_type
Importing DI_DATES into table ln_dates
Importing WHINR140_INVENTORY into table ln_inventory
Importing WHWMD210_WAREHOUSE_ITEM_DATA into table ln_warehouse_item_data
Importing TDIPU010_ITEM_BUY_FROM_BP_INFORMATION into table ln_item_buy_from_bp_information
Importing TDIPU001_ITEM_PURCHASE_DATA into table ln_item_purchase_data
Importing TDPCG031_PRICE_BOOKS into table ln_price_books
Importing TDPUR300_PURCHASE_CONTRACTS into table ln_purchase_contracts
Importing TDPUR301_PURCHASE_CONTRACT_LINES into table ln_purchase_contract_lines
Inserted 72 records into table ln_order_type
Inserted 217 records into table ln_purchase_contracts
inserting 10000 records into table ln_inventory
Inserted 4694 records into table ln_purchase_contract_lines
inserting 10000 records into table ln_item_buy_from_bp_information
inserting 10000 records into table ln_dates
inserting 10000 records into table ln_inventory
inserting 10000 records into table ln_price_books
inserting 10000 records into table ln_item_purchase_data
inserting 10000 records into table ln_inventory
inserting 10000 records into table ln_price_books
inserting 10000 records into table ln_dates
inserting 10000 records into table ln_inventory
inserting 10000 records into table ln_price_books
inserting 10000 records into table ln_item_purchase_data
您只是在等待 map
中的内部函数,而不是顶级函数。
顶层函数需要加await:
await Promise.all(dataSources.map(async (ds)=> {
console.log(`Importing ${ds.resource} into table ${ds.tableName}`)
await get_all_data(ds.tableName, ds.resource)
}));
否则,您只是在内部函数中等待,而不是在路由处理程序本身中等待。
我的解决方案是使用 bluebird Promise.each
这将处理数组数据源中的每一项,并在处理列表中的下一项之前等待 return 的承诺。
Promise.each(dataSources, function(ds){
....
}).then(()=>{
....
})
我实现了自己的可写实现。我可以在回调函数的帮助下按顺序控制调用。只有当从上一次迭代收到回调时,才会处理下一次迭代。我无法使用 event-stream 地图回调
实现此目的
引用https://nodejs.org/api/stream.html#stream_simplified_construction
readStream.pipe(eventStream.split())
.pipe(
new Writable({
write : async (record, encoding,callback)=>{
await saveToDatabase(record);
callback();
}
})
)
我用 //******This await does not work */
评论的 await
命令似乎不起作用。不确定这是否与它们在事件流中或导入模块中的承诺问题有关。
当我从映射数组中调用 run
函数以插入来自多个源的数据时,run
函数 returns 立即而不是等到 knex
完成插入数据。
app.get("/api/pull/all_data", (req, res)=>{
const dataSources = [
{resource: 'DI_ORDER_TYPE', tableName: 'ln_order_type'},
{resource: 'DI_DATES', tableName: 'ln_dates'},
{resource: 'WHINR140_INVENTORY', tableName: 'ln_inventory'}
]
dataSources.map(async (ds)=>{
console.log(`Importing ${ds.resource} into table ${ds.tableName}`)
await get_all_data(ds.tableName, ds.resource)
})
console.log("Import complete")
})
这是我的 run
函数,它是从上面的代码调用的。
const request = require('request')
const JSONStream = require('JSONStream')
const es = require('event-stream')
const knex_ln = require('../knex_ln')
const insertData = require('../insert_data')
const create_table = require('../create_table_from_json.js')
const clean_fieldnames = require('../clean_fieldnames')
function run(tableName, resourceName) {
return new Promise(async (resolve, reject)=>{
let tableData = []
let recordCount = 0
let maxRecords = 10000
let totalRecords = 0
// let tableName = 'LN_di_order_type'
// let resourceName = 'DI_ORDER_TYPE'
let rowData = {}
//Delete Existing Data and wait for it to complete
await knex_ln.schema.hasTable(tableName).then(async (exists)=>{
if(exists){
try {
await knex_ln(tableName).delete().then(()=>{})
} catch (error) {
}
}
})
//Get LN replica data and pipe data into JSONStream
request(`${process.env.API_BASE_URL}/${process.env.SECURITY_NAME}/${resourceName}`,
{
auth: {
'user': process.env.API_USER,
'pass': process.env.API_PASS
}
}
)
.pipe(JSONStream.parse([true, {recurse: true}, `${process.env.SECURITY_NAME}.row`, true]))
.pipe(es.mapSync(async (row)=>{
rowData = row
let cleanData = await clean_fieldnames(row)
tableData.push(cleanData)
recordCount += 1
totalRecords += 1
if(recordCount >= maxRecords){
try {
//******This await does not work */
await create_table(tableName, row)
} catch (error) {
console.log("Unable to create table", error)
}
//Insert records
try {
//******This await does not work */
await insertData(tableName, tableData)
console.log(`inserting ${recordCount} records into table ${tableName}`)
} catch (error) {
console.log("Unable to insert data: ", error)
}
//Reset tracker variables
recordCount = 0
tableData = []
}
}))
.on('end', async ()=>{
await create_table(tableName, rowData)
await insertData(tableName, tableData)
console.log(`Inserted ${totalRecords} into table ${tableName}`)
resolve('OK')
})
.on('error',(err)=>{
reject(err)
})
})
}
module.exports = run
这是我的模块文件,其中 returns 一个承诺
//insert_data.js
const knex_ln = require('./knex_ln')
module.exports = async (tableName, tableData) =>
new Promise(async (resolve, reject) => {
try {
await knex_ln(tableName).insert(tableData)
console.log("Inserting Data: ", tableData.length)
resolve()
} catch (error) {
console.log("Error inserting data: ", err)
reject(err)
}
})
这是一个输出示例
Importing DI_ORDER_TYPE into table ln_order_type
Importing DI_DATES into table ln_dates
Importing WHINR140_INVENTORY into table ln_inventory
Importing WHWMD210_WAREHOUSE_ITEM_DATA into table ln_warehouse_item_data
Importing TDIPU010_ITEM_BUY_FROM_BP_INFORMATION into table ln_item_buy_from_bp_information
Importing TDIPU001_ITEM_PURCHASE_DATA into table ln_item_purchase_data
Importing TDPCG031_PRICE_BOOKS into table ln_price_books
Importing TDPUR300_PURCHASE_CONTRACTS into table ln_purchase_contracts
Importing TDPUR301_PURCHASE_CONTRACT_LINES into table ln_purchase_contract_lines
Inserted 72 records into table ln_order_type
Inserted 217 records into table ln_purchase_contracts
inserting 10000 records into table ln_inventory
Inserted 4694 records into table ln_purchase_contract_lines
inserting 10000 records into table ln_item_buy_from_bp_information
inserting 10000 records into table ln_dates
inserting 10000 records into table ln_inventory
inserting 10000 records into table ln_price_books
inserting 10000 records into table ln_item_purchase_data
inserting 10000 records into table ln_inventory
inserting 10000 records into table ln_price_books
inserting 10000 records into table ln_dates
inserting 10000 records into table ln_inventory
inserting 10000 records into table ln_price_books
inserting 10000 records into table ln_item_purchase_data
您只是在等待 map
中的内部函数,而不是顶级函数。
顶层函数需要加await:
await Promise.all(dataSources.map(async (ds)=> {
console.log(`Importing ${ds.resource} into table ${ds.tableName}`)
await get_all_data(ds.tableName, ds.resource)
}));
否则,您只是在内部函数中等待,而不是在路由处理程序本身中等待。
我的解决方案是使用 bluebird Promise.each
这将处理数组数据源中的每一项,并在处理列表中的下一项之前等待 return 的承诺。
Promise.each(dataSources, function(ds){
....
}).then(()=>{
....
})
我实现了自己的可写实现。我可以在回调函数的帮助下按顺序控制调用。只有当从上一次迭代收到回调时,才会处理下一次迭代。我无法使用 event-stream 地图回调
实现此目的引用https://nodejs.org/api/stream.html#stream_simplified_construction
readStream.pipe(eventStream.split())
.pipe(
new Writable({
write : async (record, encoding,callback)=>{
await saveToDatabase(record);
callback();
}
})
)