gRPC 客户端流式 rpc 管道错误。(结束后写入错误)
gRPC client streaming rpc pipeline error.(write after end ERROR)
我正在研究节点运行时的 gRPC 服务器-客户端编程。
我在客户端流式 rpc 中遇到错误。请看下面的rpc方法签名。
service RouteGuide{
rpc DataStreaming(stream File) returns (Stats) {}
}
message Stats{
string msg=1;
}
message File{
bytes chk=1;
}
我想从客户端上传文件到服务器。所以我定义了客户端流式rpc。
问题是文件上传只有第一次会成功。
当我尝试上传另一个文件时,出现错误。 写入结束后出错。
我觉得我处理流不是很好。任何人都可以帮助为什么会这样吗?谢谢!
// server.js
"use strict";
const grpc = require("grpc");
const protoLoader = require("@grpc/proto-loader");
const path = require("path");
const fs = require("fs");
const stream = require("stream");
const PROTO_PATH = path.join(__dirname, "proto", "route.proto"); // path.resolve("proto", "route.proto")
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: false,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const routeguide = grpc.loadPackageDefinition(packageDefinition).routeguide;
const myTransformStream = new stream.Transform({
objectMode: true,
transform(data, enc, cb) {
cb(null, data.chk.toString());
}
});
function dataStreaming(strm, cb) {
console.log("server : streaming function");
stream.pipeline(
strm,
myTransformStream,
fs.createWriteStream("output.txt"),
err => {
if (err) {
console.log(`server side error : ${err}`);
cb(err);
} else {
console.log("server side no error");
cb(null, "server side finish");
}
}
);
}
function getServer() {
const server = new grpc.Server();
server.addService(routeguide.RouteGuide.service, {
DataStreaming: dataStreaming
});
return server;
}
if (require.main === module) {
const routeServer = getServer();
routeServer.bind("localhost:3333", grpc.ServerCredentials.createInsecure());
routeServer.start();
}
=============
// client.js
"use strict";
const grpc = require("grpc");
const protoLoader = require("@grpc/proto-loader");
const path = require("path");
const fs = require("fs");
const stream = require("stream");
const PROTO_PATH = path.join(__dirname, "proto", "route.proto"); // path.resolve("proto", "route.proto")
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: false,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const routeguide = grpc.loadPackageDefinition(packageDefinition).routeguide;
const client = new routeguide.RouteGuide(
"localhost:3333",
grpc.credentials.createInsecure()
);
const MyTransform = new stream.Transform({
objectMode: true,
transform(chk, enc, cb) {
cb(null, { chk: chk });
}
});
function runDataStreaming() {
console.log("inside run-data-streaming()");
const strm = client.dataStreaming((err, ret) => {
if (err) {
console.log("client : file transfer failed.");
console.log(err);
} else {
console.log("client : file transfer succeeded.");
}
});
stream.pipeline(fs.createReadStream("test.txt"), MyTransform, strm, err => {
if (err) {
console.log(err.message);
} else {
console.log("pipeline succeeded");
}
});
}
if (require.main === module) {
runDataStreaming();
}
我解决了我的问题。有一些问题。我会解释的。
server.js 中的全局变量 myTransformStream
是问题所在。我将代码移到了 dataStreaming
函数中。
我也更改了关闭、销毁选项。
具体来说,我在fs.createReadStream()
、fs.createWriteStream()
中开启了autoclose
、emitclose
选项
并且...在转换流中打开 emitclose
、autodestroy
选项(myTransformStream
和 myTransform
)
我正在研究节点运行时的 gRPC 服务器-客户端编程。
我在客户端流式 rpc 中遇到错误。请看下面的rpc方法签名。
service RouteGuide{
rpc DataStreaming(stream File) returns (Stats) {}
}
message Stats{
string msg=1;
}
message File{
bytes chk=1;
}
我想从客户端上传文件到服务器。所以我定义了客户端流式rpc。
问题是文件上传只有第一次会成功。
当我尝试上传另一个文件时,出现错误。 写入结束后出错。
我觉得我处理流不是很好。任何人都可以帮助为什么会这样吗?谢谢!
// server.js
"use strict";
const grpc = require("grpc");
const protoLoader = require("@grpc/proto-loader");
const path = require("path");
const fs = require("fs");
const stream = require("stream");
const PROTO_PATH = path.join(__dirname, "proto", "route.proto"); // path.resolve("proto", "route.proto")
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: false,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const routeguide = grpc.loadPackageDefinition(packageDefinition).routeguide;
const myTransformStream = new stream.Transform({
objectMode: true,
transform(data, enc, cb) {
cb(null, data.chk.toString());
}
});
function dataStreaming(strm, cb) {
console.log("server : streaming function");
stream.pipeline(
strm,
myTransformStream,
fs.createWriteStream("output.txt"),
err => {
if (err) {
console.log(`server side error : ${err}`);
cb(err);
} else {
console.log("server side no error");
cb(null, "server side finish");
}
}
);
}
function getServer() {
const server = new grpc.Server();
server.addService(routeguide.RouteGuide.service, {
DataStreaming: dataStreaming
});
return server;
}
if (require.main === module) {
const routeServer = getServer();
routeServer.bind("localhost:3333", grpc.ServerCredentials.createInsecure());
routeServer.start();
}
=============
// client.js
"use strict";
const grpc = require("grpc");
const protoLoader = require("@grpc/proto-loader");
const path = require("path");
const fs = require("fs");
const stream = require("stream");
const PROTO_PATH = path.join(__dirname, "proto", "route.proto"); // path.resolve("proto", "route.proto")
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: false,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const routeguide = grpc.loadPackageDefinition(packageDefinition).routeguide;
const client = new routeguide.RouteGuide(
"localhost:3333",
grpc.credentials.createInsecure()
);
const MyTransform = new stream.Transform({
objectMode: true,
transform(chk, enc, cb) {
cb(null, { chk: chk });
}
});
function runDataStreaming() {
console.log("inside run-data-streaming()");
const strm = client.dataStreaming((err, ret) => {
if (err) {
console.log("client : file transfer failed.");
console.log(err);
} else {
console.log("client : file transfer succeeded.");
}
});
stream.pipeline(fs.createReadStream("test.txt"), MyTransform, strm, err => {
if (err) {
console.log(err.message);
} else {
console.log("pipeline succeeded");
}
});
}
if (require.main === module) {
runDataStreaming();
}
我解决了我的问题。有一些问题。我会解释的。
server.js 中的全局变量
myTransformStream
是问题所在。我将代码移到了dataStreaming
函数中。我也更改了关闭、销毁选项。 具体来说,我在
fs.createReadStream()
、fs.createWriteStream()
中开启了autoclose
、emitclose
选项 并且...在转换流中打开emitclose
、autodestroy
选项(myTransformStream
和myTransform
)