拆分一个进程以等待 NodeJS 上的事件

Split a process to wait for event on NodeJS

我正在编写一个方法,使用 web3js 通过智能合约传输令牌。

当你启动转账事件时,你会得到 txHash 作为结果,如果你想获得与转账相关的所有其他值,你必须订阅一个事件并等待它发生值。

我必须 return 客户的价值,所以我订阅了传输事件并等待它广播到 return 数据。

问题是这可能需要很长时间(从 10 秒到几小时不等),有时会超时,所以前端团队建议通知我一个 webhook 端点,我将事件信息转发给它当它发生时。

所以我必须把这个过程分成两个:

  1. 进行转账并通知 txHash,并启动一个单独的进程 (2) 来侦听事件。

  2. 订阅事件,并在事件发生时将其转发到提供的 webhook。

我的代码现在看起来像这样:

function transfer(req, res, next) {

    try {

            contractRouter.transfer(from, to, tokenId).then(function(result){
                transferToWebhook(whHostname, whPath, result);
                next();
            }).fail(function(err){
                return res.status(500).json({status: 'error', name: err.name, message: err.message});
    }
    catch (ex) {
        return res.status(500).json({status: 'error', name: ex.name, message: ex.message});
    }
}

传输到 webhook 的函数如下所示:

function transferToWebhook(whHostname, whPath, txHash){
    contractRouter.getTransferEvent(txHash).then(function(result){

        var postData = JSON.stringify(result);
        var options = {
            hostname: whHostname,
            port: 80,
            path: whPath,
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
            }
        }
            var req = https.request(options, (res) => {
              console.log(`STATUS: ${res.statusCode}`);
              console.log(`HEADERS: ${JSON.stringify(res.headers)}`);
              res.setEncoding('utf8');
              res.on('data', (chunk) => {
                console.log(`BODY: ${chunk}`);
              });
              res.on('end', () => {
                console.log('No more data in response.');
              });
            });

            req.on('error', (e) => {
              console.log(`problem with request: ${e.message}`);
            });

            req.write(postData);
            req.end();
    });
}

订阅转账事件的函数如下所示:

function getTransferEvent(txHash){
    var deferred = q.defer();

    ethereumHandler.setContract(config.get(cABIAddress), cAddress).then(function(abiContract){

        Promise.resolve(txHash).then(function resolver(txHash){

            abiContract.getPastEvents('Transfer',{filter: {transactionHash: txHash}}, function(error, events){})
            .then(function(event){
                var returnValues = {
                    from: event.returnValues.from,
                    to: event.returnValues.to,
                    tokenId: event.returnValues.tokenId
                }
                deferred.resolve(returnValues);
            });
        });
    
    return deferred.promise;
    });
}

最后一个函数的代码如果我直接放在传输函数上就可以工作,但是如果我尝试通过 transferToWebhook 函数调用它就不会被调用。

如何在响应第一个请求后启动 transferToWebhook 函数?

您可以使用 child_process 模块中的 spawn() 方法生成您的进程,然后监听事件 (process.on('data')) 并使用承诺使用 returned 数据。我不确定它是否会解决你的问题,因为你的函数是 contractRouter contractRouter.getTransferEvent(txHash) 的对象,但你应该能够以某种方式对其进行调整。请参阅我的意思的示例。

在你的 file.js

const { spawn } = require('child_process')

function transfertToWebHook(data) {
    getTransfertEvent(data)
        .then((result) => {
            const dts = JSON.parse(result)
            console.log('the res: ', dts)
            // send the res back
        })
        .catch(e => console.log('handle the err: ', e))
   
       console.log('carry on mother process')
}


function getTransfertEvent(data) {
    return new Promise((resolve, reject) => {
        const sp = spawn(process.execPath, ['childProcess.js'])
        // pass the arguments, here it will be the txHash
        sp.stdin.write(data)
        sp.stdin.end()
        sp.stdout.on('data', (d) => {
            // when datas get proceed you get it back.
            resolve(d.toString())
        })
        sp.stdout.on('error', (e) => {
            reject(e)
        })


        console.log('run what ever need to be proceed on the mother process')
    })
}
transfertToWebHook('test')

创建另一个文件名 childProcess.js。

使用转换流处理 process.sdtin 数据,然后通过 process.stdout

return 它们
const { Transform, pipeline } = require('stream')

const createT = () => {
    return new Transform({
        transform(chunk, enc, next) {
            // here run the code of you getTransfertEventFunction()
            
            // simulate some async process
            setTimeout(() => {
                // chunk is your passed arguments
                // !! chunk is a buffer so encode it as utf8 using 'toString()'
                // make it upperCase to simulate some changement
                const dt = chunk.toString().toUpperCase()

                // return an object as it's what your func return
                const proceededDatas = {
                                name: dt,
                                from: "from datas",
                                to: "to datas",
                                tokenId: "the token"
                        }
                const dataString = JSON.stringify(proceededDatas)
                next(null, dataString)
            }, 1000)
        }
    })
}
pipeline(process.stdin, createT(), process.stdout, e => console.log(e))

运行代码:节点file.js