将 pickle 缓冲区从 Node 传递到 Python

Pass a pickle buffer from Node to Python

我有一个订阅 JSON 数据流的节点应用程序。我想扩展它以订阅 Python pickle 数据流(我愿意删除或转换非原始类型)。 node-pickle 和 jpickle 包让我失望了。我现在想编写自己的 Python 脚本来将泡菜转换为 JSON。

我摆弄了 node-pickle 源代码以使其部分工作(可以将 JSON 从 Node 传递到 Python 并取回 pickle 字符串,也可以使用预定义的 Python dict 并作为 JSON) 传递给 Node。我的问题是让 Python 将来自 Node 的数据识别为腌制数据。我正在将数据流缓冲区从 Node 传递到 Python 并拼命地尝试将字符串缓冲区参数转换为我可以 pickle.loads 的格式。

经过反复试验,我得到了这个:

main.js

const pickle = require('node-pickle');
const amqp = require('amqplib/callback_api');

amqp.connect(`amqp://${usr}:${pwd}@${url}`, (err, conn) => {
  if (err) {
    console.error(err);
  }
  conn.createChannel((err, ch) => {
    if (err) {
      console.error(err);
    }
    ch.assertExchange(ex, 'fanout', { durable: false });
    ch.assertQueue('', {}, (err, q) => {
      ch.bindQueue(q.queue, ex, '');
      console.log('consuming');
      ch.consume(q.queue, msg => {
        console.log('Received [x]');
        const p = msg.content.toString('base64');
        pickle.loads(p).then(r => console.log('Res:', r));
        // conn.close();
      });
    });
  });
});

index.js(节点泡菜)

const spawn = require('child_process').spawn,
  Bluebird = require('bluebird');

module.exports.loads = function loads(pickle) {
  return new Bluebird((resolve, reject) => {
    const convert = spawn('python', [__dirname + '/convert.py', '--loads']),
      stdout_buffer = [];

    convert.stdout.on('data', function(data) {
      stdout_buffer.push(data);
    });

    convert.on('exit', function(code) {
      const data = stdout_buffer.join('');
      // console.log('buffer toString', stdout_buffer[0] ? stdout_buffer[0].toString() : null);
      if (data == -1) {
        resolve(false);
      } else {
        let result;
        try {
          result = JSON.parse(data);
        } catch (err) {
          console.log('failed parse');
          result = false;
        }
        resolve(result);
      }
    });
    convert.stdin.write(pickle);
    convert.stdin.end();
  });
};

convert.py(节点泡菜)

import sys
try:
    import simplejson as json
except ImportError:
    import json
try:
    import cPickle as pickle
except ImportError:
    import pickle

import codecs
import jsonpickle

def main(argv):
    try:
        if argv[0] == '--loads':
            buffer = sys.stdin.buffer.read()
            decoded = codecs.decode(buffer, 'base64')
            d = pickle.loads(decoded, encoding='latin1')
            j = jsonpickle.encode(d,False)
            sys.stdout.write(j)
        elif argv[0] == '--dumps':
            d = json.loads(argv[1])
            p = pickle.dumps(d)
            sys.stdout.write(str(p))
    except Exception as e:
        print('Error: ' + str(e))
        sys.stdout.write('-1')

if __name__ == '__main__':
    main(sys.argv[1:])

我目前遇到的错误是:

invalid load key, '\xef'

编辑 1: 我现在将缓冲区字符串表示而不是缓冲区发送到 Python。然后我使用 stdin 以字节形式读取它。我开始将 bytes 对象写入一个文件,以便与从 Node 接收到的数据以及当我从 Python 脚本订阅数据流时接收到的缓冲区进行比较。我发现它们似乎是相同的,除了从 Python 订阅时发现的某些 \x.. 序列,当从 Node 订阅时被表示为 \xef\xbf\xbd 。我认为这与字符串编码有关??错误表述序列的一些示例是:\x80(这是 b' 之后的第一个序列;但是 \x80 确实出现在其他地方)、\xe3\x85 .

编辑 2: 我现在已经将要发送到 Python 的字符串编码为 base64,然后在 Python 中使用 codecs.decode 解码标准输入缓冲区。我正在写入文件的缓冲区现在看起来与 Python only 流更加一致,没有更多的 \xef\xbf\xbd 替换。但是,我现在遇到了这个错误:

'ascii' codec can't decode byte 0xe3 in position 1: ordinal not in range(128)

此外,我在尝试匹配每个流的最后 1000 个字符时发现了细微差别。这是 Python 流 (\x0c,'\x023) 中的一个部分,在来自 Node 的流中看起来像这样 (\x0c,\'\x023)。不确定这会对事情产生多大影响。

编辑 3(成功!): 搜索我的新错误后,我找到了这个编码难题的最后一块。由于我在 Python 3 工作,而泡菜来自 Python 2.x,我需要将 pickle.loads 的编码指定为 byteslatin1(我需要的那个)。然后,我能够利用出色的 jsonpickle 包来完成 JSON 序列化字典、将日期时间对象更改为日期字符串的工作。

所以我能够让 node-pickle npm 包工作。我从 Node 获取腌制数据缓冲区到 Python 以返回 JSON 的流程是:

在节点中

  • 将缓冲区编码为 base64 字符串
  • 将字符串作为标准输入而不是参数发送到 Python 子进程

在Python

  • 以字节形式从标准输入中读取缓冲区
  • 使用codecs从base64解码
  • 如果使用 Python 3,请为 pickle.loads
  • 指定 byteslatin1 编码
  • 使用jsonpickle在JSON
  • 中序列化python个对象

在节点中

  • 从 stdout 收集缓冲区并JSON.parse