将 pickle 缓冲区从 Node 传递到 Python
Pass a pickle buffer from Node to Python
我有一个订阅 JSON 数据流的节点应用程序。我想扩展它以订阅 Python pickle 数据流(我愿意删除或转换非原始类型)。 node-pickle 和 jpickle 包让我失望了。我现在想编写自己的 Python 脚本来将泡菜转换为 JSON。
我摆弄了 node-pickle 源代码以使其部分工作(可以将 JSON 从 Node 传递到 Python 并取回 pickle 字符串,也可以使用预定义的 Python dict 并作为 JSON) 传递给 Node。我的问题是让 Python 将来自 Node 的数据识别为腌制数据。我正在将数据流缓冲区从 Node 传递到 Python 并拼命地尝试将字符串缓冲区参数转换为我可以 pickle.loads
的格式。
经过反复试验,我得到了这个:
main.js
const pickle = require('node-pickle');
const amqp = require('amqplib/callback_api');
amqp.connect(`amqp://${usr}:${pwd}@${url}`, (err, conn) => {
if (err) {
console.error(err);
}
conn.createChannel((err, ch) => {
if (err) {
console.error(err);
}
ch.assertExchange(ex, 'fanout', { durable: false });
ch.assertQueue('', {}, (err, q) => {
ch.bindQueue(q.queue, ex, '');
console.log('consuming');
ch.consume(q.queue, msg => {
console.log('Received [x]');
const p = msg.content.toString('base64');
pickle.loads(p).then(r => console.log('Res:', r));
// conn.close();
});
});
});
});
index.js(节点泡菜)
const spawn = require('child_process').spawn,
Bluebird = require('bluebird');
module.exports.loads = function loads(pickle) {
return new Bluebird((resolve, reject) => {
const convert = spawn('python', [__dirname + '/convert.py', '--loads']),
stdout_buffer = [];
convert.stdout.on('data', function(data) {
stdout_buffer.push(data);
});
convert.on('exit', function(code) {
const data = stdout_buffer.join('');
// console.log('buffer toString', stdout_buffer[0] ? stdout_buffer[0].toString() : null);
if (data == -1) {
resolve(false);
} else {
let result;
try {
result = JSON.parse(data);
} catch (err) {
console.log('failed parse');
result = false;
}
resolve(result);
}
});
convert.stdin.write(pickle);
convert.stdin.end();
});
};
convert.py(节点泡菜)
import sys
try:
import simplejson as json
except ImportError:
import json
try:
import cPickle as pickle
except ImportError:
import pickle
import codecs
import jsonpickle
def main(argv):
try:
if argv[0] == '--loads':
buffer = sys.stdin.buffer.read()
decoded = codecs.decode(buffer, 'base64')
d = pickle.loads(decoded, encoding='latin1')
j = jsonpickle.encode(d,False)
sys.stdout.write(j)
elif argv[0] == '--dumps':
d = json.loads(argv[1])
p = pickle.dumps(d)
sys.stdout.write(str(p))
except Exception as e:
print('Error: ' + str(e))
sys.stdout.write('-1')
if __name__ == '__main__':
main(sys.argv[1:])
我目前遇到的错误是:
invalid load key, '\xef'
编辑 1:
我现在将缓冲区字符串表示而不是缓冲区发送到 Python。然后我使用 stdin 以字节形式读取它。我开始将 bytes 对象写入一个文件,以便与从 Node 接收到的数据以及当我从 Python 脚本订阅数据流时接收到的缓冲区进行比较。我发现它们似乎是相同的,除了从 Python 订阅时发现的某些 \x..
序列,当从 Node 订阅时被表示为 \xef\xbf\xbd
。我认为这与字符串编码有关??错误表述序列的一些示例是:\x80
(这是 b'
之后的第一个序列;但是 \x80
确实出现在其他地方)、\xe3
和 \x85
.
编辑 2:
我现在已经将要发送到 Python 的字符串编码为 base64,然后在 Python 中使用 codecs.decode
解码标准输入缓冲区。我正在写入文件的缓冲区现在看起来与 Python only 流更加一致,没有更多的 \xef\xbf\xbd
替换。但是,我现在遇到了这个错误:
'ascii' codec can't decode byte 0xe3 in position 1: ordinal not in range(128)
此外,我在尝试匹配每个流的最后 1000 个字符时发现了细微差别。这是 Python 流 (\x0c,'\x023
) 中的一个部分,在来自 Node 的流中看起来像这样 (\x0c,\'\x023
)。不确定这会对事情产生多大影响。
编辑 3(成功!):
搜索我的新错误后,我找到了这个编码难题的最后一块。由于我在 Python 3 工作,而泡菜来自 Python 2.x,我需要将 pickle.loads
的编码指定为 bytes
或 latin1
(我需要的那个)。然后,我能够利用出色的 jsonpickle 包来完成 JSON 序列化字典、将日期时间对象更改为日期字符串的工作。
所以我能够让 node-pickle npm 包工作。我从 Node 获取腌制数据缓冲区到 Python 以返回 JSON 的流程是:
在节点中
- 将缓冲区编码为 base64 字符串
- 将字符串作为标准输入而不是参数发送到 Python 子进程
在Python
- 以字节形式从标准输入中读取缓冲区
- 使用
codecs
从base64解码
- 如果使用 Python 3,请为
pickle.loads
指定 bytes
或 latin1
编码
- 使用
jsonpickle
在JSON 中序列化python个对象
在节点中
- 从 stdout 收集缓冲区并
JSON.parse
它
我有一个订阅 JSON 数据流的节点应用程序。我想扩展它以订阅 Python pickle 数据流(我愿意删除或转换非原始类型)。 node-pickle 和 jpickle 包让我失望了。我现在想编写自己的 Python 脚本来将泡菜转换为 JSON。
我摆弄了 node-pickle 源代码以使其部分工作(可以将 JSON 从 Node 传递到 Python 并取回 pickle 字符串,也可以使用预定义的 Python dict 并作为 JSON) 传递给 Node。我的问题是让 Python 将来自 Node 的数据识别为腌制数据。我正在将数据流缓冲区从 Node 传递到 Python 并拼命地尝试将字符串缓冲区参数转换为我可以 pickle.loads
的格式。
经过反复试验,我得到了这个:
main.js
const pickle = require('node-pickle');
const amqp = require('amqplib/callback_api');
amqp.connect(`amqp://${usr}:${pwd}@${url}`, (err, conn) => {
if (err) {
console.error(err);
}
conn.createChannel((err, ch) => {
if (err) {
console.error(err);
}
ch.assertExchange(ex, 'fanout', { durable: false });
ch.assertQueue('', {}, (err, q) => {
ch.bindQueue(q.queue, ex, '');
console.log('consuming');
ch.consume(q.queue, msg => {
console.log('Received [x]');
const p = msg.content.toString('base64');
pickle.loads(p).then(r => console.log('Res:', r));
// conn.close();
});
});
});
});
index.js(节点泡菜)
const spawn = require('child_process').spawn,
Bluebird = require('bluebird');
module.exports.loads = function loads(pickle) {
return new Bluebird((resolve, reject) => {
const convert = spawn('python', [__dirname + '/convert.py', '--loads']),
stdout_buffer = [];
convert.stdout.on('data', function(data) {
stdout_buffer.push(data);
});
convert.on('exit', function(code) {
const data = stdout_buffer.join('');
// console.log('buffer toString', stdout_buffer[0] ? stdout_buffer[0].toString() : null);
if (data == -1) {
resolve(false);
} else {
let result;
try {
result = JSON.parse(data);
} catch (err) {
console.log('failed parse');
result = false;
}
resolve(result);
}
});
convert.stdin.write(pickle);
convert.stdin.end();
});
};
convert.py(节点泡菜)
import sys
try:
import simplejson as json
except ImportError:
import json
try:
import cPickle as pickle
except ImportError:
import pickle
import codecs
import jsonpickle
def main(argv):
try:
if argv[0] == '--loads':
buffer = sys.stdin.buffer.read()
decoded = codecs.decode(buffer, 'base64')
d = pickle.loads(decoded, encoding='latin1')
j = jsonpickle.encode(d,False)
sys.stdout.write(j)
elif argv[0] == '--dumps':
d = json.loads(argv[1])
p = pickle.dumps(d)
sys.stdout.write(str(p))
except Exception as e:
print('Error: ' + str(e))
sys.stdout.write('-1')
if __name__ == '__main__':
main(sys.argv[1:])
我目前遇到的错误是:
invalid load key, '\xef'
编辑 1:
我现在将缓冲区字符串表示而不是缓冲区发送到 Python。然后我使用 stdin 以字节形式读取它。我开始将 bytes 对象写入一个文件,以便与从 Node 接收到的数据以及当我从 Python 脚本订阅数据流时接收到的缓冲区进行比较。我发现它们似乎是相同的,除了从 Python 订阅时发现的某些 \x..
序列,当从 Node 订阅时被表示为 \xef\xbf\xbd
。我认为这与字符串编码有关??错误表述序列的一些示例是:\x80
(这是 b'
之后的第一个序列;但是 \x80
确实出现在其他地方)、\xe3
和 \x85
.
编辑 2:
我现在已经将要发送到 Python 的字符串编码为 base64,然后在 Python 中使用 codecs.decode
解码标准输入缓冲区。我正在写入文件的缓冲区现在看起来与 Python only 流更加一致,没有更多的 \xef\xbf\xbd
替换。但是,我现在遇到了这个错误:
'ascii' codec can't decode byte 0xe3 in position 1: ordinal not in range(128)
此外,我在尝试匹配每个流的最后 1000 个字符时发现了细微差别。这是 Python 流 (\x0c,'\x023
) 中的一个部分,在来自 Node 的流中看起来像这样 (\x0c,\'\x023
)。不确定这会对事情产生多大影响。
编辑 3(成功!):
搜索我的新错误后,我找到了这个编码难题的最后一块。由于我在 Python 3 工作,而泡菜来自 Python 2.x,我需要将 pickle.loads
的编码指定为 bytes
或 latin1
(我需要的那个)。然后,我能够利用出色的 jsonpickle 包来完成 JSON 序列化字典、将日期时间对象更改为日期字符串的工作。
所以我能够让 node-pickle npm 包工作。我从 Node 获取腌制数据缓冲区到 Python 以返回 JSON 的流程是:
在节点中
- 将缓冲区编码为 base64 字符串
- 将字符串作为标准输入而不是参数发送到 Python 子进程
在Python
- 以字节形式从标准输入中读取缓冲区
- 使用
codecs
从base64解码 - 如果使用 Python 3,请为
pickle.loads
指定 - 使用
jsonpickle
在JSON 中序列化python个对象
bytes
或 latin1
编码
在节点中
- 从 stdout 收集缓冲区并
JSON.parse
它