如何在 WebSocket 中代理到多个服务器
How to proxy to multiple servers in WebSocket
我有一个用 Ballerina 编写的代理,它通过 WebSocket 接收用户的请求,并通过另一个 WebSocket 将请求转发给负责处理的服务。目前,当该服务向我返回响应时,Ballerina 脚本会把响应直接返回给用户。我希望改成:一旦收到该服务的响应,先把它发送给另一个服务进行第二次处理;等第二个服务响应后,再把结果返回给用户。
我有以下代码:
import ballerina/http;
import ballerina/log;
// Attribute key under which each WebSocket endpoint stores the peer endpoint it is paired with.
final string ASSOCIATED_CONNECTION = "EXTRACTOR CONNECTION";
// URL of the backend WebSocket service the proxy opens a client connection to.
final string EXTRACTOR = "ws://192.168.10.248:8081";
// User-facing side of the WebSocket proxy, served at /api/ws on port 9091.
// Every user connection is paired with one client connection to EXTRACTOR; the pairing
// is stored on both endpoints under the ASSOCIATED_CONNECTION attribute.
@http:WebSocketServiceConfig {
    path: "/api/ws"
}
service RequestService on new http:Listener(9091) {

    // A user connected: open the backend connection, cross-link the two endpoints and
    // only then call ready() on the backend client (readyOnConnect is false).
    resource function onOpen(http:WebSocketCaller caller) {
        http:WebSocketClient backend = new(
            EXTRACTOR,
            {
                callbackService: ClientService,
                readyOnConnect: false,
                maxFrameSize: 2147483648
            }
        );
        caller.setAttribute(ASSOCIATED_CONNECTION, backend);
        backend.setAttribute(ASSOCIATED_CONNECTION, caller);
        var readyResult = backend->ready();
        if (readyResult is http:WebSocketError) {
            log:printError("Error calling ready on client", readyResult);
        }
    }

    // Forwards a text frame from the user to the paired backend connection.
    resource function onText(http:WebSocketCaller caller, string text, boolean finalFrame) {
        http:WebSocketClient backend = getAssociatedClientEndpoint(caller);
        var pushResult = backend->pushText(text, finalFrame);
        if (pushResult is http:WebSocketError) {
            log:printError("Error occurred when sending text message", pushResult);
        }
    }

    // Error on the user connection: close the backend with status 1011, drop the link
    // and log the original error.
    resource function onError(http:WebSocketCaller caller, error err) {
        http:WebSocketClient backend = getAssociatedClientEndpoint(caller);
        var closeResult = backend->close(statusCode = 1011, reason = "Unexpected condition");
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hence closing the connection", err);
    }

    // The user closed the connection: pass the same status code and reason on to the
    // backend and drop the link.
    resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {
        http:WebSocketClient backend = getAssociatedClientEndpoint(caller);
        var closeResult = backend->close(statusCode = statusCode, reason = reason);
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
}
// Callback service for the backend client connection. In these resources `caller` is the
// backend client; the user connection is looked up through ASSOCIATED_CONNECTION.
service ClientService = @http:WebSocketServiceConfig {} service {

    // Forwards a text frame from the backend to the paired user connection.
    resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
        http:WebSocketCaller user = getAssociatedServerEndpoint(caller);
        var pushResult = user->pushText(text, finalFrame);
        if (pushResult is http:WebSocketError) {
            log:printError("Error occurred when sending text message", pushResult);
        }
    }

    // Error on the backend connection: close the user connection with status 1011,
    // drop the link and log the original error.
    resource function onError(http:WebSocketClient caller, error err) {
        http:WebSocketCaller user = getAssociatedServerEndpoint(caller);
        var closeResult = user->close(statusCode = 1011, reason = "Unexpected condition");
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hense closing the connection", err);
    }

    // The backend closed the connection: pass the same status code and reason on to
    // the user and drop the link.
    resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
        http:WebSocketCaller user = getAssociatedServerEndpoint(caller);
        var closeResult = user->close(statusCode = statusCode, reason = reason);
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
};
// Returns the backend client stored on the given user connection under
// ASSOCIATED_CONNECTION. The stored value is cast to http:WebSocketClient.
function getAssociatedClientEndpoint(http:WebSocketCaller ep) returns http:WebSocketClient {
    return <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
}
// Returns the user connection stored on the given backend client under
// ASSOCIATED_CONNECTION. The stored value is cast to http:WebSocketCaller.
function getAssociatedServerEndpoint(http:WebSocketClient ep) returns http:WebSocketCaller {
    return <http:WebSocketCaller>ep.getAttribute(ASSOCIATED_CONNECTION);
}
我制作了以下管道:
客户端 --> Ballerina 代理 --> Service1 --> Ballerina 代理 --> 客户端
我想要:
客户端 --> Ballerina 代理 --> Service1 --> Ballerina 代理 --> Service2 --> Ballerina 代理 --> 客户端
我们可以链接关联的连接
caller has wsClientEp
wsClientEp has wsClientEp2
wsClientEp2 has caller
我写了一些粗略的代码,我认为下面的代码应该可以工作。请记得根据需要更改变量名称:
import ballerina/http;
import ballerina/log;
// Attribute key under which each WebSocket endpoint stores the next endpoint in the chain.
final string ASSOCIATED_CONNECTION = "EXTRACTOR CONNECTION";
// URL of the first backend WebSocket service (handled by ClientService1).
final string EXTRACTOR = "ws://localhost:9090/basic";
// URL of the second backend WebSocket service (handled by ClientService2).
final string EXTRACTOR2 = "ws://localhost:9091/basic";
// User-facing side of the two-stage WebSocket proxy, served at /api/ws on port 9092.
//
// For every user connection two backend clients are opened and linked in a ring through
// the ASSOCIATED_CONNECTION attribute:
//     caller -> wsClientEp (EXTRACTOR) -> wsClientEp2 (EXTRACTOR2) -> caller
// User text frames are pushed to the first backend. When the user connection closes or
// fails, BOTH backend connections are closed; closing only the first one would leave the
// EXTRACTOR2 connection open after the user has gone.
@http:WebSocketServiceConfig {
    path: "/api/ws"
}
service RequestService on new http:Listener(9092) {

    // A user connected: open both backend connections, link the ring, then call ready()
    // on each backend client (readyOnConnect is false for both).
    resource function onOpen(http:WebSocketCaller caller) {
        // NOTE(review): 2147483648 is one above the signed 32-bit maximum; confirm how the
        // runtime treats a maxFrameSize this large.
        http:WebSocketClient wsClientEp = new(
            EXTRACTOR,
            {
                callbackService: ClientService1,
                readyOnConnect: false,
                maxFrameSize: 2147483648
            }
        );
        http:WebSocketClient wsClientEp2 = new(
            EXTRACTOR2,
            {
                callbackService: ClientService2,
                readyOnConnect: false,
                maxFrameSize: 2147483648
            }
        );
        caller.setAttribute(ASSOCIATED_CONNECTION, wsClientEp);
        wsClientEp.setAttribute(ASSOCIATED_CONNECTION, wsClientEp2);
        wsClientEp2.setAttribute(ASSOCIATED_CONNECTION, caller);
        var err = wsClientEp->ready();
        if (err is http:WebSocketError) {
            log:printError("Error calling ready on client 1", err);
        }
        err = wsClientEp2->ready();
        if (err is http:WebSocketError) {
            log:printError("Error calling ready on client 2", err);
        }
    }

    // Forwards a text frame from the user to the first backend.
    resource function onText(http:WebSocketCaller caller, string text, boolean finalFrame) {
        http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
        var err = clientEp->pushText(text, finalFrame);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err);
        }
    }

    // Error on the user connection: close both backends with status 1011, drop the link
    // and log the original error.
    resource function onError(http:WebSocketCaller caller, error err) {
        http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
        // The first backend is linked to the second one. Use a type test instead of a
        // cast so that a link already removed by ClientService1 does not cause a panic.
        var linkedSecond = clientEp.getAttribute(ASSOCIATED_CONNECTION);
        var e = clientEp->close(statusCode = 1011, reason = "Unexpected condition");
        if (e is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", e);
        }
        if (linkedSecond is http:WebSocketClient) {
            http:WebSocketClient clientEp2 = linkedSecond;
            var e2 = clientEp2->close(statusCode = 1011, reason = "Unexpected condition");
            if (e2 is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", e2);
            }
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hence closing the connection", err);
    }

    // The user closed the connection: pass the same status code and reason on to both
    // backends and drop the link.
    resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {
        http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
        // See onError for why the second backend is looked up with a type test.
        var linkedSecond = clientEp.getAttribute(ASSOCIATED_CONNECTION);
        var err = clientEp->close(statusCode = statusCode, reason = reason);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err);
        }
        if (linkedSecond is http:WebSocketClient) {
            http:WebSocketClient clientEp2 = linkedSecond;
            var err2 = clientEp2->close(statusCode = statusCode, reason = reason);
            if (err2 is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", err2);
            }
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
}
// Callback service for the first backend connection (EXTRACTOR). In these resources
// `caller` is the first backend client; its ASSOCIATED_CONNECTION attribute holds the
// second backend client, whose own attribute holds the user connection.
//
// Replies from the first backend are pushed to the second backend. When the first backend
// closes or fails, the second backend AND the user connection are closed; closing only
// the second backend would leave the user connected to a proxy that can no longer
// process requests.
service ClientService1 = @http:WebSocketServiceConfig {} service {

    // Forwards a text frame from the first backend to the second backend.
    resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
        http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
        var err = clientEp2->pushText(text, finalFrame);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err);
        }
    }

    // Error on the first backend connection: close the second backend and the user
    // connection with status 1011, drop the link and log the original error.
    resource function onError(http:WebSocketClient caller, error err) {
        http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
        // The second backend is linked to the user connection. Use a type test instead
        // of a cast so that a link already removed by ClientService2 does not panic.
        var linkedUser = clientEp2.getAttribute(ASSOCIATED_CONNECTION);
        var e = clientEp2->close(statusCode = 1011, reason = "Unexpected condition");
        if (e is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = e);
        }
        if (linkedUser is http:WebSocketCaller) {
            http:WebSocketCaller serverEp = linkedUser;
            var e2 = serverEp->close(statusCode = 1011, reason = "Unexpected condition");
            if (e2 is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", err = e2);
            }
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hense closing the connection", err);
    }

    // The first backend closed the connection: pass the same status code and reason on
    // to the second backend and to the user, then drop the link.
    resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
        http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
        // See onError for why the user connection is looked up with a type test.
        var linkedUser = clientEp2.getAttribute(ASSOCIATED_CONNECTION);
        var err = clientEp2->close(statusCode = statusCode, reason = reason);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err);
        }
        if (linkedUser is http:WebSocketCaller) {
            http:WebSocketCaller serverEp = linkedUser;
            var err2 = serverEp->close(statusCode = statusCode, reason = reason);
            if (err2 is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", err2);
            }
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
};
// Callback service for the second backend connection (EXTRACTOR2). In these resources
// `caller` is the second backend client; its ASSOCIATED_CONNECTION attribute holds the
// user connection, whose own attribute holds the first backend client.
//
// Replies from the second backend are pushed to the user. When the second backend closes
// or fails, the user connection AND the first backend are closed; closing only the user
// connection would leave the EXTRACTOR connection open.
service ClientService2 = @http:WebSocketServiceConfig {} service {

    // Forwards a text frame from the second backend to the user.
    resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
        http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
        var err = serverEp->pushText(text, finalFrame);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err);
        }
    }

    // Error on the second backend connection: close the user connection and the first
    // backend with status 1011, drop the link and log the original error.
    resource function onError(http:WebSocketClient caller, error err) {
        http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
        // The user connection is linked to the first backend. Use a type test instead
        // of a cast so that a link already removed by RequestService does not panic.
        var linkedFirst = serverEp.getAttribute(ASSOCIATED_CONNECTION);
        var e = serverEp->close(statusCode = 1011, reason = "Unexpected condition");
        if (e is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = e);
        }
        if (linkedFirst is http:WebSocketClient) {
            http:WebSocketClient clientEp1 = linkedFirst;
            var e2 = clientEp1->close(statusCode = 1011, reason = "Unexpected condition");
            if (e2 is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", err = e2);
            }
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hense closing the connection", err);
    }

    // The second backend closed the connection: pass the same status code and reason on
    // to the user and to the first backend, then drop the link.
    resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
        http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
        // See onError for why the first backend is looked up with a type test.
        var linkedFirst = serverEp.getAttribute(ASSOCIATED_CONNECTION);
        var err = serverEp->close(statusCode = statusCode, reason = reason);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err);
        }
        if (linkedFirst is http:WebSocketClient) {
            http:WebSocketClient clientEp1 = linkedFirst;
            var err2 = clientEp1->close(statusCode = statusCode, reason = reason);
            if (err2 is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", err2);
            }
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
};
// Returns the backend client stored on the given user connection under
// ASSOCIATED_CONNECTION. The stored value is cast to http:WebSocketClient.
function getAssociatedClientEndpoint(http:WebSocketCaller ep) returns http:WebSocketClient {
    return <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
}
// Returns the user connection stored on the given backend client under
// ASSOCIATED_CONNECTION. The stored value is cast to http:WebSocketCaller.
function getAssociatedServerEndpoint(http:WebSocketClient ep) returns http:WebSocketCaller {
    return <http:WebSocketCaller>ep.getAttribute(ASSOCIATED_CONNECTION);
}
// Returns the backend client stored on another backend client under
// ASSOCIATED_CONNECTION. The stored value is cast to http:WebSocketClient.
function getAssociatedClientEndpointFromClient(http:WebSocketClient ep) returns http:WebSocketClient {
    return <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
}
请注意,我已经稍微更改了 url 以进行测试。
我有一个用 Ballerina 编写的代理,它通过 WebSocket 接收用户的请求,并通过另一个 WebSocket 将请求转发给负责处理的服务。目前,当该服务向我返回响应时,Ballerina 脚本会把响应直接返回给用户。我希望改成:一旦收到该服务的响应,先把它发送给另一个服务进行第二次处理;等第二个服务响应后,再把结果返回给用户。
我有以下代码:
import ballerina/http;
import ballerina/log;
// Attribute key under which each WebSocket endpoint stores the peer endpoint it is paired with.
final string ASSOCIATED_CONNECTION = "EXTRACTOR CONNECTION";
// URL of the backend WebSocket service the proxy opens a client connection to.
final string EXTRACTOR = "ws://192.168.10.248:8081";
// User-facing side of the WebSocket proxy, served at /api/ws on port 9091.
// Every user connection is paired with one client connection to EXTRACTOR; the pairing
// is stored on both endpoints under the ASSOCIATED_CONNECTION attribute.
@http:WebSocketServiceConfig {
    path: "/api/ws"
}
service RequestService on new http:Listener(9091) {

    // A user connected: open the backend connection, cross-link the two endpoints and
    // only then call ready() on the backend client (readyOnConnect is false).
    resource function onOpen(http:WebSocketCaller caller) {
        http:WebSocketClient backend = new(
            EXTRACTOR,
            {
                callbackService: ClientService,
                readyOnConnect: false,
                maxFrameSize: 2147483648
            }
        );
        caller.setAttribute(ASSOCIATED_CONNECTION, backend);
        backend.setAttribute(ASSOCIATED_CONNECTION, caller);
        var readyResult = backend->ready();
        if (readyResult is http:WebSocketError) {
            log:printError("Error calling ready on client", readyResult);
        }
    }

    // Forwards a text frame from the user to the paired backend connection.
    resource function onText(http:WebSocketCaller caller, string text, boolean finalFrame) {
        http:WebSocketClient backend = getAssociatedClientEndpoint(caller);
        var pushResult = backend->pushText(text, finalFrame);
        if (pushResult is http:WebSocketError) {
            log:printError("Error occurred when sending text message", pushResult);
        }
    }

    // Error on the user connection: close the backend with status 1011, drop the link
    // and log the original error.
    resource function onError(http:WebSocketCaller caller, error err) {
        http:WebSocketClient backend = getAssociatedClientEndpoint(caller);
        var closeResult = backend->close(statusCode = 1011, reason = "Unexpected condition");
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hence closing the connection", err);
    }

    // The user closed the connection: pass the same status code and reason on to the
    // backend and drop the link.
    resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {
        http:WebSocketClient backend = getAssociatedClientEndpoint(caller);
        var closeResult = backend->close(statusCode = statusCode, reason = reason);
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
}
// Callback service for the backend client connection. In these resources `caller` is the
// backend client; the user connection is looked up through ASSOCIATED_CONNECTION.
service ClientService = @http:WebSocketServiceConfig {} service {

    // Forwards a text frame from the backend to the paired user connection.
    resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
        http:WebSocketCaller user = getAssociatedServerEndpoint(caller);
        var pushResult = user->pushText(text, finalFrame);
        if (pushResult is http:WebSocketError) {
            log:printError("Error occurred when sending text message", pushResult);
        }
    }

    // Error on the backend connection: close the user connection with status 1011,
    // drop the link and log the original error.
    resource function onError(http:WebSocketClient caller, error err) {
        http:WebSocketCaller user = getAssociatedServerEndpoint(caller);
        var closeResult = user->close(statusCode = 1011, reason = "Unexpected condition");
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hense closing the connection", err);
    }

    // The backend closed the connection: pass the same status code and reason on to
    // the user and drop the link.
    resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
        http:WebSocketCaller user = getAssociatedServerEndpoint(caller);
        var closeResult = user->close(statusCode = statusCode, reason = reason);
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
};
// Returns the backend client stored on the given user connection under
// ASSOCIATED_CONNECTION. The stored value is cast to http:WebSocketClient.
function getAssociatedClientEndpoint(http:WebSocketCaller ep) returns http:WebSocketClient {
    return <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
}
// Returns the user connection stored on the given backend client under
// ASSOCIATED_CONNECTION. The stored value is cast to http:WebSocketCaller.
function getAssociatedServerEndpoint(http:WebSocketClient ep) returns http:WebSocketCaller {
    return <http:WebSocketCaller>ep.getAttribute(ASSOCIATED_CONNECTION);
}
我制作了以下管道: 客户端 --> Ballerina 代理 --> Service1 --> Ballerina 代理 --> 客户端
我想要: 客户端 --> Ballerina 代理 --> Service1 --> Ballerina 代理 --> Service2 --> Ballerina 代理 --> 客户端
我们可以链接关联的连接
caller has wsClientEp
wsClientEp has wsClientEp2
wsClientEp2 has caller
我写了一些粗略的代码,我认为下面的代码应该可以工作。请记得根据需要更改变量名称:
import ballerina/http;
import ballerina/log;
// Attribute key under which each WebSocket endpoint stores the next endpoint in the chain.
final string ASSOCIATED_CONNECTION = "EXTRACTOR CONNECTION";
// URL of the first backend WebSocket service (handled by ClientService1).
final string EXTRACTOR = "ws://localhost:9090/basic";
// URL of the second backend WebSocket service (handled by ClientService2).
final string EXTRACTOR2 = "ws://localhost:9091/basic";
// User-facing side of the two-stage WebSocket proxy, served at /api/ws on port 9092.
//
// For every user connection two backend clients are opened and linked in a ring through
// the ASSOCIATED_CONNECTION attribute:
//     caller -> wsClientEp (EXTRACTOR) -> wsClientEp2 (EXTRACTOR2) -> caller
// User text frames are pushed to the first backend. When the user connection closes or
// fails, BOTH backend connections are closed; closing only the first one would leave the
// EXTRACTOR2 connection open after the user has gone.
@http:WebSocketServiceConfig {
    path: "/api/ws"
}
service RequestService on new http:Listener(9092) {

    // A user connected: open both backend connections, link the ring, then call ready()
    // on each backend client (readyOnConnect is false for both).
    resource function onOpen(http:WebSocketCaller caller) {
        // NOTE(review): 2147483648 is one above the signed 32-bit maximum; confirm how the
        // runtime treats a maxFrameSize this large.
        http:WebSocketClient wsClientEp = new(
            EXTRACTOR,
            {
                callbackService: ClientService1,
                readyOnConnect: false,
                maxFrameSize: 2147483648
            }
        );
        http:WebSocketClient wsClientEp2 = new(
            EXTRACTOR2,
            {
                callbackService: ClientService2,
                readyOnConnect: false,
                maxFrameSize: 2147483648
            }
        );
        caller.setAttribute(ASSOCIATED_CONNECTION, wsClientEp);
        wsClientEp.setAttribute(ASSOCIATED_CONNECTION, wsClientEp2);
        wsClientEp2.setAttribute(ASSOCIATED_CONNECTION, caller);
        var err = wsClientEp->ready();
        if (err is http:WebSocketError) {
            log:printError("Error calling ready on client 1", err);
        }
        err = wsClientEp2->ready();
        if (err is http:WebSocketError) {
            log:printError("Error calling ready on client 2", err);
        }
    }

    // Forwards a text frame from the user to the first backend.
    resource function onText(http:WebSocketCaller caller, string text, boolean finalFrame) {
        http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
        var err = clientEp->pushText(text, finalFrame);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err);
        }
    }

    // Error on the user connection: close both backends with status 1011, drop the link
    // and log the original error.
    resource function onError(http:WebSocketCaller caller, error err) {
        http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
        // The first backend is linked to the second one. Use a type test instead of a
        // cast so that a link already removed by ClientService1 does not cause a panic.
        var linkedSecond = clientEp.getAttribute(ASSOCIATED_CONNECTION);
        var e = clientEp->close(statusCode = 1011, reason = "Unexpected condition");
        if (e is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", e);
        }
        if (linkedSecond is http:WebSocketClient) {
            http:WebSocketClient clientEp2 = linkedSecond;
            var e2 = clientEp2->close(statusCode = 1011, reason = "Unexpected condition");
            if (e2 is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", e2);
            }
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hence closing the connection", err);
    }

    // The user closed the connection: pass the same status code and reason on to both
    // backends and drop the link.
    resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {
        http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
        // See onError for why the second backend is looked up with a type test.
        var linkedSecond = clientEp.getAttribute(ASSOCIATED_CONNECTION);
        var err = clientEp->close(statusCode = statusCode, reason = reason);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err);
        }
        if (linkedSecond is http:WebSocketClient) {
            http:WebSocketClient clientEp2 = linkedSecond;
            var err2 = clientEp2->close(statusCode = statusCode, reason = reason);
            if (err2 is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", err2);
            }
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
}
// Callback service for the first backend connection (EXTRACTOR). In these resources
// `caller` is the first backend client; its ASSOCIATED_CONNECTION attribute holds the
// second backend client, whose own attribute holds the user connection.
//
// Replies from the first backend are pushed to the second backend. When the first backend
// closes or fails, the second backend AND the user connection are closed; closing only
// the second backend would leave the user connected to a proxy that can no longer
// process requests.
service ClientService1 = @http:WebSocketServiceConfig {} service {

    // Forwards a text frame from the first backend to the second backend.
    resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
        http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
        var err = clientEp2->pushText(text, finalFrame);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err);
        }
    }

    // Error on the first backend connection: close the second backend and the user
    // connection with status 1011, drop the link and log the original error.
    resource function onError(http:WebSocketClient caller, error err) {
        http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
        // The second backend is linked to the user connection. Use a type test instead
        // of a cast so that a link already removed by ClientService2 does not panic.
        var linkedUser = clientEp2.getAttribute(ASSOCIATED_CONNECTION);
        var e = clientEp2->close(statusCode = 1011, reason = "Unexpected condition");
        if (e is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = e);
        }
        if (linkedUser is http:WebSocketCaller) {
            http:WebSocketCaller serverEp = linkedUser;
            var e2 = serverEp->close(statusCode = 1011, reason = "Unexpected condition");
            if (e2 is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", err = e2);
            }
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hense closing the connection", err);
    }

    // The first backend closed the connection: pass the same status code and reason on
    // to the second backend and to the user, then drop the link.
    resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
        http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
        // See onError for why the user connection is looked up with a type test.
        var linkedUser = clientEp2.getAttribute(ASSOCIATED_CONNECTION);
        var err = clientEp2->close(statusCode = statusCode, reason = reason);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err);
        }
        if (linkedUser is http:WebSocketCaller) {
            http:WebSocketCaller serverEp = linkedUser;
            var err2 = serverEp->close(statusCode = statusCode, reason = reason);
            if (err2 is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", err2);
            }
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
};
// Callback service for the second backend connection (EXTRACTOR2). In these resources
// `caller` is the second backend client; its ASSOCIATED_CONNECTION attribute holds the
// user connection, whose own attribute holds the first backend client.
//
// Replies from the second backend are pushed to the user. When the second backend closes
// or fails, the user connection AND the first backend are closed; closing only the user
// connection would leave the EXTRACTOR connection open.
service ClientService2 = @http:WebSocketServiceConfig {} service {

    // Forwards a text frame from the second backend to the user.
    resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
        http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
        var err = serverEp->pushText(text, finalFrame);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err);
        }
    }

    // Error on the second backend connection: close the user connection and the first
    // backend with status 1011, drop the link and log the original error.
    resource function onError(http:WebSocketClient caller, error err) {
        http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
        // The user connection is linked to the first backend. Use a type test instead
        // of a cast so that a link already removed by RequestService does not panic.
        var linkedFirst = serverEp.getAttribute(ASSOCIATED_CONNECTION);
        var e = serverEp->close(statusCode = 1011, reason = "Unexpected condition");
        if (e is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = e);
        }
        if (linkedFirst is http:WebSocketClient) {
            http:WebSocketClient clientEp1 = linkedFirst;
            var e2 = clientEp1->close(statusCode = 1011, reason = "Unexpected condition");
            if (e2 is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", err = e2);
            }
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hense closing the connection", err);
    }

    // The second backend closed the connection: pass the same status code and reason on
    // to the user and to the first backend, then drop the link.
    resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
        http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
        // See onError for why the first backend is looked up with a type test.
        var linkedFirst = serverEp.getAttribute(ASSOCIATED_CONNECTION);
        var err = serverEp->close(statusCode = statusCode, reason = reason);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err);
        }
        if (linkedFirst is http:WebSocketClient) {
            http:WebSocketClient clientEp1 = linkedFirst;
            var err2 = clientEp1->close(statusCode = statusCode, reason = reason);
            if (err2 is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", err2);
            }
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
};
// Returns the backend client stored on the given user connection under
// ASSOCIATED_CONNECTION. The stored value is cast to http:WebSocketClient.
function getAssociatedClientEndpoint(http:WebSocketCaller ep) returns http:WebSocketClient {
    return <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
}
// Returns the user connection stored on the given backend client under
// ASSOCIATED_CONNECTION. The stored value is cast to http:WebSocketCaller.
function getAssociatedServerEndpoint(http:WebSocketClient ep) returns http:WebSocketCaller {
    return <http:WebSocketCaller>ep.getAttribute(ASSOCIATED_CONNECTION);
}
// Returns the backend client stored on another backend client under
// ASSOCIATED_CONNECTION. The stored value is cast to http:WebSocketClient.
function getAssociatedClientEndpointFromClient(http:WebSocketClient ep) returns http:WebSocketClient {
    return <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
}
请注意,我已经稍微更改了 url 以进行测试。