如何使用 Promises 在 fs.createReadStream/csv-parser 中使用 await/async?

How do I use await/async in fs.createReadStream/csv-parser using Promises?

我尝试实施 this code from Human Who Codes, along with a 以读取其中包含创意 URL 的 CSV,从媒体服务器下载该创意,然后上传创意 Facebook 的 Node.js SDK。但是,我无法让 Promise 链在 Node.js 文件流中工作。

这是我的命令行脚本的参数:

Usage: creative-upload.js --inputFile --outputFile --adAccountId --uploadType --accessToken --creativeColumn --creativeIdColumn --creativeStatusColumn --maxRetries

这是我的脚本:

const csv = require('csv-parser');
const fs = require('fs');
const path = require('path');
const http = require('http');

const options = []; // removed yargs code, not important

// 
async function readStream(stream, encoding = "utf8") {
    stream.setEncoding(encoding);

    return new Promise((resolve, reject) => {
        let data = [];
        
        stream.on("data", chunk => data.push(chunk));
        stream.on("end", () => resolve(data));
        stream.on("error", error => reject(error));
    });
}

// Here we wait for the myfunction to finish
// and then returns a promise that'll be waited for aswell
// It's useless to wait the myfunction to finish before to return
// we can simply returns a promise that will be resolved later

// Also point that we don't use async keyword on the function because
// we can simply returns the promise returned by myfunction
async function start() {
  return await readStream(fs.createReadStream("test.csv").pipe(csv()));
}

// Call start
(async() => {
    console.log('before start');

    startPromise = start()
    .then(data => {
        for (var row of data) {
            console.log(row);
            
            const creative_url = row.creative_url;
            const fileBasename = path.basename(creative_url);
            const file = fs.createWriteStream("/tmp/" + fileBasename);
            const request = http.get(creative_url, function(response) {
              response.pipe(file);
            });

            // https://developers.facebook.com/docs/marketing-api/reference/ad-account/adimages/
            if (uploadType == "image") {
                let content = fs.readFileSync("/tmp/" + fileBasename).toString('base64');
                const adimage = await account.createAdImage([], {
                    bytes: content
                })
                .then(() => {
                    console.log('Uploaded ' + fileBasename + " successfully.");
                })
                .catch((e) => {
                    throw e;
                })
            }
        }
    })
    .catch(e => {
        console.log(e);
    });

  console.log('after start ' + startPromise);
})();

process.exit(1);

这是我得到的错误:

creative-upload.js:109
                const adimage = await account.createAdImage([], {
                                ^^^^^

SyntaxError: await is only valid in async function
    at new Script (vm.js:80:7)
    at createScript (vm.js:274:10)
    at Object.runInThisContext (vm.js:326:10)
    at Module._compile (internal/modules/cjs/loader.js:664:28)
    at Object.Module._extensions..js (internal/modules/cjs/loader.js:712:10)
    at Module.load (internal/modules/cjs/loader.js:600:32)
    at tryModuleLoad (internal/modules/cjs/loader.js:539:12)
    at Function.Module._load (internal/modules/cjs/loader.js:531:3)
    at Function.Module.runMain (internal/modules/cjs/loader.js:754:12)
    at startup (internal/bootstrap/node.js:283:19)

我去掉了 async/await,这是我的新代码:

var 承诺 = [];

fs.createReadStream(inputFile)
.pipe(csv())
.on('data', (row) => {
    console.log(row);
    const creative_url = row['Banner URL'];
    console.log('Creative URL: ' + creative_url);
    
    const fileBasename = path.basename(creative_url);
    console.log('File Basename: ' + fileBasename);

    const file = fs.createWriteStream("/tmp/" + fileBasename);
    const request = http.get(creative_url, function(response) {
        response.pipe(file);
    });
    console.log('Request: ' + request);
    
    // https://developers.facebook.com/docs/marketing-api/reference/ad-account/adimages/
    if (uploadType == "image") {
        let content = fs.readFileSync("/tmp/" + fileBasename);
        if (content == null || content.toString('base64') == '') {
            console.log('ERROR! Could not get base64 content of tmp file.');
            return;
        }
        
        /*{encoding: 'utf8'}, function(err, data) {
            if (err) {
                console.log('Error: ' + error);
                return null;
            } else {
                return data.toString('base64');
            }
        });*/
        
        content = content.toString('base64');
        promises.push(account.createAdImage([], {
            bytes: content
        })
        .then(() => {
            console.log('Uploaded ' + fileBasename + " successfully.");
        })
        .catch((e) => {
            console.log(e);
        }));
    }

    // https://developers.facebook.com/docs/marketing-api/reference/ad-account/advideos/
    else if (uploadType == "video") {
        let content = fs.readFileSync("/tmp/" + fileBasename).toString('base64');
        if (content == null || content.toString('base64') == '') {
            console.log('ERROR! Could not get base64 content of tmp file.');
            return;
        }
        
        content = content.toString('base64');
        promises.push(account.createAdVideo([], {
            bytes: content
        })
        .then(() => {
            console.log('Uploaded ' + fileBasename + " successfully.");
        })
        .catch((e) => {
            console.log(e);
        }));
    }
})
.on('end', () => {
    Promise.all(promises);
    
    console.log('CSV file successfully processed');
});

此代码成功上传了我的 CSV 中的最后两个广告素材。但是,所有其他行都无法维护内容变量,当它到达时该变量为空脸书代码。这是我的输出:

{ 'Banner Name': '...',
  'Banner Size': '728x90',
  'Banner URL':
   'http://s3.amazonaws.com/beta-adv-cdn/jpeg_ads/c62cf4b9-4d86-4613-be15-b6c3b58babba.jpeg' }
Creative URL: http://s3.amazonaws.com/beta-adv-cdn/jpeg_ads/c62cf4b9-4d86-4613-be15-b6c3b58babba.jpeg
File Basename: c62cf4b9-4d86-4613-be15-b6c3b58babba.jpeg
Request: [object Object]
ERROR! Could not get base64 content of tmp file.
{ 'Banner Name': '...,
  'Banner Size': '728x90',
  'Banner URL':
   'http://s3.amazonaws.com/beta-adv-cdn/jpeg_ads/95714da4-0085-4c0c-ba73-0346197c91db.jpeg' }
Creative URL: http://s3.amazonaws.com/beta-adv-cdn/jpeg_ads/95714da4-0085-4c0c-ba73-0346197c91db.jpeg
File Basename: 95714da4-0085-4c0c-ba73-0346197c91db.jpeg
Request: [object Object]
ERROR! Could not get base64 content of tmp file.
CSV file successfully processed
200 POST https://graph.facebook.com/v10.0/...
Uploaded 58748e44-83c7-4283-b090-36ce8dd8070b.jpeg successfully.
200 POST https://graph.facebook.com/v10.0/...
Uploaded dc5e9dcc-c6ab-4cbe-a334-814b4af4c4fa.jpeg successfully.

Facebook Node.JS SDK 上的文档似乎不多,Stack Exchange 上关于它的问题更少,因此我们将不胜感激。

我不确定您在这里期待什么。首先,console.log('after start ' + startPromise);是死代码;它在 return 语句之后。其次,您在不等待的情况下启动异步匿名函数,然后调用 process.exit(1);.

您应该等待自调用函数创建的 Promise 也解决并处理拒绝。沿线的东西:

console.log('before start');

const startPromise = start()
.then(data => {
    for (var row of data) {
        console.log(row);
        
        const creative_url = row.creative_url;
        const fileBasename = path.basename(creative_url);
        const file = fs.createWriteStream("/tmp/" + fileBasename);
        const request = http.get(creative_url, function(response) {
            response.pipe(file);
        });

        // https://developers.facebook.com/docs/marketing-api/reference/ad-account/adimages/
        if (uploadType == "image") {
            let content = fs.readFileSync("/tmp/" + fileBasename).toString('base64');
            // Upload image
        }
    }
})
.catch(e => {
    console.log(e);
});

编辑:正如@Tomalak 在评论中指出的那样,根本不需要异步包装函数。