如何序列化 apache arrow c++ table,通过套接字传输,并在 python 端反序列化
How to serialize apache arrow c++ table, trans through socket,and deserialize it at python side
我是 apache arrow 的新手,我的 C++ 项目使用 apache::table 来很好地存储数据。
现在,我需要使用套接字将 c++ table 转换为其他 python 客户端。为什么要尝试这个,因为 python 客户端需要将数据转换为数据帧,我注意到 python 中的箭头 table 可以使用 'to_pandas()' 来做到这一点。
我试图查找箭头 cython 代码,但我什么也没找到。
您可以通过基本套接字发送箭头table(如下例),但您最好还是使用飞行。 Flight 使用 grpc 来回发送箭头数据,它将消除使用套接字的一些单调乏味。 Here 就是一个很好的例子。
完整的套接字示例可以在 gist.
中找到
我会把相关的位放在这里:
发送中
void SendTable(int socket_fd) {
auto output_res = SocketOutputStream::Open(socket_fd);
if (!CheckErr(output_res.status(), "arrow::io::FileOutputStream")) {
return;
}
auto output = *output_res;
arrow::MemoryPool *pool = arrow::default_memory_pool();
auto table = MakeTable();
if (table == nullptr) {
return;
}
auto writer_res = arrow::ipc::MakeStreamWriter(output, table->schema());
if (!CheckErr(writer_res.status(), "arrow::ipc::MakeStreamWriter")) {
return;
}
auto writer = *writer_res;
if (!CheckErr(writer->WriteTable(*table), "RecordBatchWriter::WriteTable")) {
return;
}
CheckErr(writer->Close(), "RecordBatchWriter::Close");
}
正在接收
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind((listen, port))
sock.listen()
print(f"Listening on {listen} on port {port}")
conn, _ = sock.accept()
with conn:
conn_file = conn.makefile(mode="b")
reader = pyarrow.ipc.RecordBatchStreamReader(conn_file)
table = reader.read_all()
print(table)
print(table.to_pandas())
我是 apache arrow 的新手,我的 C++ 项目使用 apache::table 来很好地存储数据。 现在,我需要使用套接字将 c++ table 转换为其他 python 客户端。为什么要尝试这个,因为 python 客户端需要将数据转换为数据帧,我注意到 python 中的箭头 table 可以使用 'to_pandas()' 来做到这一点。 我试图查找箭头 cython 代码,但我什么也没找到。
您可以通过基本套接字发送箭头table(如下例),但您最好还是使用飞行。 Flight 使用 grpc 来回发送箭头数据,它将消除使用套接字的一些单调乏味。 Here 就是一个很好的例子。
完整的套接字示例可以在 gist.
中找到我会把相关的位放在这里:
发送中
void SendTable(int socket_fd) {
auto output_res = SocketOutputStream::Open(socket_fd);
if (!CheckErr(output_res.status(), "arrow::io::FileOutputStream")) {
return;
}
auto output = *output_res;
arrow::MemoryPool *pool = arrow::default_memory_pool();
auto table = MakeTable();
if (table == nullptr) {
return;
}
auto writer_res = arrow::ipc::MakeStreamWriter(output, table->schema());
if (!CheckErr(writer_res.status(), "arrow::ipc::MakeStreamWriter")) {
return;
}
auto writer = *writer_res;
if (!CheckErr(writer->WriteTable(*table), "RecordBatchWriter::WriteTable")) {
return;
}
CheckErr(writer->Close(), "RecordBatchWriter::Close");
}
正在接收
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind((listen, port))
sock.listen()
print(f"Listening on {listen} on port {port}")
conn, _ = sock.accept()
with conn:
conn_file = conn.makefile(mode="b")
reader = pyarrow.ipc.RecordBatchStreamReader(conn_file)
table = reader.read_all()
print(table)
print(table.to_pandas())