芭蕾舞女演员:回调时出现多个 websocket 错误
Ballerina: Multiple websocket error on callback
我使用 ballerina 编写了一个代理服务器。该代理通过 websocket 接收来自客户的请求。目标是 Ballerina 服务器收集此请求并将其发送到名为 "EXTRACTOR" 的服务器。此服务器处理请求并 return 对 Ballerina 服务器作出响应。那时 Ballerina 服务器不会 return 对发出请求的客户端的响应,而是将 "EXTRACTOR" 的响应发送到另一个名为 "EXTRACTOR2" 的服务器,该服务器将处理 "EXTRACTOR2" 的响应=21=]。稍后 "EXTRACTOR2" return 将处理后的信息发送到 Ballerina 的服务器,而这个 return 将其发送到第一个发出请求的客户端。 Ballerina、客户端、EXTRACTOR 和 EXTRACTOR2 之间的所有通信都是通过 websockets 完成的。问题在于,当 EXTRACTOR2 试图 return 将信息发送到 Ballerina 的服务器时,它发现它不能,因为通信已关闭。
这是我的代码:
import ballerina/http;
import ballerina/log;
final string ASSOCIATED_CONNECTION = "EXTRACTOR CONNECTION";
final string EXTRACTOR = "ws://localhost:9090/basic";
final string EXTRACTOR2 = "ws://localhost:9091/basic";
@http:WebSocketServiceConfig {
path: "/api/ws"
}
service RequestService on new http:Listener(9092) {
resource function onOpen(http:WebSocketCaller caller) {
http:WebSocketClient wsClientEp = new(
EXTRACTOR,
{callbackService: ClientService1,
readyOnConnect: false,
maxFrameSize: 2147483648
});
http:WebSocketClient wsClientEp2 = new(
EXTRACTOR2,
{callbackService: ClientService2,
readyOnConnect: false,
maxFrameSize: 2147483648
});
caller.setAttribute(ASSOCIATED_CONNECTION, wsClientEp);
wsClientEp.setAttribute(ASSOCIATED_CONNECTION, wsClientEp2);
wsClientEp2.setAttribute(ASSOCIATED_CONNECTION, caller);
var err = wsClientEp->ready();
if (err is http:WebSocketError) {
log:printError("Error calling ready on client 1", err);
}
err = wsClientEp2->ready();
if (err is http:WebSocketError) {
log:printError("Error calling ready on client 2", err);
}
}
resource function onText(http:WebSocketCaller caller, string text, boolean finalFrame) {
http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
var err = clientEp->pushText(text, finalFrame);
if (err is http:WebSocketError) {
log:printError("Error occurred when sending text message", err);
}
}
resource function onError(http:WebSocketCaller caller, error err) {
http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
var e = clientEp->close(statusCode = 1011, reason = "Unexpected condition");
if (e is http:WebSocketError) {
log:printError("Error occurred when closing the connection", e);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
log:printError("Unexpected error hence closing the connection", err);
}
resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {
http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
var err = clientEp->close(statusCode = statusCode, reason = reason);
if (err is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
}
}
service ClientService1 = @http:WebSocketServiceConfig {} service {
resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
var err = clientEp2->pushText(text, finalFrame);
if (err is http:WebSocketError) {
log:printError("Error occurred when sending text message", err);
}
}
resource function onError(http:WebSocketClient caller, error err) {
http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
var e = clientEp2->close(statusCode = 1011, reason = "Unexpected condition");
if (e is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err = e);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
log:printError("Unexpected error hense closing the connection", err);
}
resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
var err = clientEp2->close(statusCode = statusCode, reason = reason);
if (err is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
}
};
service ClientService2 = @http:WebSocketServiceConfig {} service {
resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
var err = serverEp->pushText(text, finalFrame);
if (err is http:WebSocketError) {
log:printError("Error occurred when sending text message", err);
}
}
resource function onError(http:WebSocketClient caller, error err) {
http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
var e = serverEp->close(statusCode = 1011, reason = "Unexpected condition");
if (e is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err = e);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
log:printError("Unexpected error hense closing the connection", err);
}
resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
var err = serverEp->close(statusCode = statusCode, reason = reason);
if (err is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
}
};
function getAssociatedClientEndpoint(http:WebSocketCaller ep) returns (http:WebSocketClient) {
http:WebSocketClient wsClient = <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
return wsClient;
}
function getAssociatedServerEndpoint(http:WebSocketClient ep) returns (http:WebSocketCaller) {
http:WebSocketCaller wsEndpoint = <http:WebSocketCaller>ep.getAttribute(ASSOCIATED_CONNECTION);
return wsEndpoint;
}
function getAssociatedClientEndpointFromClient(http:WebSocketClient ep) returns (http:WebSocketClient) {
http:WebSocketClient wsEndpoint = <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
return wsEndpoint;
}
当 EXTRACTOR2 尝试 return 将信息发送到 Ballerina 的服务器时,弹出此错误:
Error in connection handler
Traceback (most recent call last):
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 795, in transfer_data
message = await self.read_message()
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 863, in read_message
frame = await self.read_data_frame(max_size=self.max_size)
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 938, in read_data_frame
frame = await self.read_frame(max_size)
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 1018, in read_frame
extensions=self.extensions,
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/framing.py", line 121, in read
data = await reader(2)
File "/usr/lib/python3.7/asyncio/streams.py", line 677, in readexactly
raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 2 expected bytes
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/server.py", line 195, in handler
await self.ws_handler(self, path)
File "analyzer_main.py", line 39, in server
await websocket.send(json.dumps(response))
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 530, in send
await self.ensure_open()
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 771, in ensure_open
raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedError: code = 1006 (connection closed abnormally [internal]), no reason
Exception found
{"ok": 0, "data": "[ANALYZER] <class 'websockets.exceptions.ConnectionClosedError'>code = 1006 (connection closed abnormally [internal]), no reason"}
这是 python EXTRACTOR2 代码:
import json
import asyncio
import websockets
from analyzer import Analyzer
analyzer = Analyzer()
print("Ready")
async def server(websocket, path):
try:
request_json = await websocket.recv()
#await websocket.send(":)")
request = json.loads(request_json)
print("Peticion recibida")
print(request_json)
with open("log.txt","w+") as f:
f.write(request_json)
# Parsear request
data = []
for topic in request['data']:
sentiment = {}
sentiment['word'] = topic['word']
sentiment['sentiment'] = analyzer.api_get_sentiment(topic['context'])
data.append(sentiment)
print("Analizado " + topic['word'])
# Enviar respuesta
response = {"ok": 1, "data": data}
print(response)
await websocket.send(json.dumps(response))
except BaseException as exception:
print("Exception found")
response = {"ok": 0, "data": "[ANALYZER] " + str(type(exception)) + str(exception)}
print(json.dumps(response))
await websocket.send(json.dumps(response))
finally:
print("Finished")
start_server = websockets.serve(server, "0.0.0.0", 8082)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
我使用了以下 Ballerina 代理:
import ballerina/http;
import ballerina/log;
final string ASSOCIATED_CONNECTION = "EXTRACTOR CONNECTION";
final string EXTRACTOR = "ws://localhost:9091/basic";
final string EXTRACTOR2 = "ws://localhost:8082";
@http:WebSocketServiceConfig {
path: "/api/ws"
}
service RequestService on new http:Listener(9092) {
resource function onOpen(http:WebSocketCaller caller) {
http:WebSocketClient wsClientEp = new(
EXTRACTOR,
{callbackService: ClientService1,
readyOnConnect: false,
maxFrameSize: 2147483648
});
http:WebSocketClient wsClientEp2 = new(
EXTRACTOR2,
{callbackService: ClientService2,
readyOnConnect: false,
maxFrameSize: 2147483648
});
caller.setAttribute(ASSOCIATED_CONNECTION, wsClientEp);
wsClientEp.setAttribute(ASSOCIATED_CONNECTION, wsClientEp2);
wsClientEp2.setAttribute(ASSOCIATED_CONNECTION, caller);
var err = wsClientEp->ready();
if (err is http:WebSocketError) {
log:printError("Error calling ready on client 1", err);
}
err = wsClientEp2->ready();
if (err is http:WebSocketError) {
log:printError("Error calling ready on client 2", err);
}
}
resource function onText(http:WebSocketCaller caller, string text, boolean finalFrame) {
http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
var err = clientEp->pushText(text, finalFrame);
if (err is http:WebSocketError) {
log:printError("Error occurred when sending text message", err);
}
}
resource function onError(http:WebSocketCaller caller, error err) {
http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
var e = clientEp->close(statusCode = 1011, reason = "Unexpected condition");
if (e is http:WebSocketError) {
log:printError("Error occurred when closing the connection", e);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
log:printError("Unexpected error hence closing the connection", err);
}
resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {
http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
var err = clientEp->close(statusCode = statusCode, reason = reason);
if (err is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
}
}
service ClientService1 = @http:WebSocketServiceConfig {} service {
resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
var err = clientEp2->pushText(text, finalFrame);
if (err is http:WebSocketError) {
log:printError("Error occurred when sending text message", err);
}
}
resource function onError(http:WebSocketClient caller, error err) {
http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
var e = clientEp2->close(statusCode = 1011, reason = "Unexpected condition");
if (e is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err = e);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
log:printError("Unexpected error hense closing the connection", err);
}
resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
var err = clientEp2->close(statusCode = statusCode, reason = reason);
if (err is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
}
};
service ClientService2 = @http:WebSocketServiceConfig {} service {
resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
var err = serverEp->pushText(text, finalFrame);
if (err is http:WebSocketError) {
log:printError("Error occurred when sending text message", err);
}
}
resource function onError(http:WebSocketClient caller, error err) {
http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
var e = serverEp->close(statusCode = 1011, reason = "Unexpected condition");
if (e is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err = e);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
log:printError("Unexpected error hense closing the connection", err);
}
resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
var err = serverEp->close(statusCode = statusCode, reason = reason);
if (err is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
}
};
function getAssociatedClientEndpoint(http:WebSocketCaller ep) returns (http:WebSocketClient) {
http:WebSocketClient wsClient = <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
return wsClient;
}
function getAssociatedServerEndpoint(http:WebSocketClient ep) returns (http:WebSocketCaller) {
http:WebSocketCaller wsEndpoint = <http:WebSocketCaller>ep.getAttribute(ASSOCIATED_CONNECTION);
return wsEndpoint;
}
function getAssociatedClientEndpointFromClient(http:WebSocketClient ep) returns (http:WebSocketClient) {
http:WebSocketClient wsEndpoint = <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
return wsEndpoint;
}
和这个 python 服务器,为了我的测试稍作修改。
import json
import asyncio
import websockets
print("Ready")
async def server(websocket, path):
try:
request_json = await websocket.recv()
print("Peticion recibida")
data = ["hello"]
response = {"ok": 1, "data": data}
print(response)
await websocket.send(json.dumps(response))
except BaseException as exception:
print("Exception found")
response = {"ok": 0, "data": "[ANALYZER] " + str(type(exception)) + str(exception)}
print(json.dumps(response))
await websocket.send(json.dumps(response))
finally:
print("Finished")
start_server = websockets.serve(server, "0.0.0.0", 8082)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
这些对我来说很好。
我使用 ballerina 编写了一个代理服务器。该代理通过 websocket 接收来自客户的请求。目标是 Ballerina 服务器收集此请求并将其发送到名为 "EXTRACTOR" 的服务器。此服务器处理请求并 return 对 Ballerina 服务器作出响应。那时 Ballerina 服务器不会 return 对发出请求的客户端的响应,而是将 "EXTRACTOR" 的响应发送到另一个名为 "EXTRACTOR2" 的服务器,该服务器将处理 "EXTRACTOR2" 的响应=21=]。稍后 "EXTRACTOR2" return 将处理后的信息发送到 Ballerina 的服务器,而这个 return 将其发送到第一个发出请求的客户端。 Ballerina、客户端、EXTRACTOR 和 EXTRACTOR2 之间的所有通信都是通过 websockets 完成的。问题在于,当 EXTRACTOR2 试图 return 将信息发送到 Ballerina 的服务器时,它发现它不能,因为通信已关闭。
这是我的代码:
import ballerina/http;
import ballerina/log;
final string ASSOCIATED_CONNECTION = "EXTRACTOR CONNECTION";
final string EXTRACTOR = "ws://localhost:9090/basic";
final string EXTRACTOR2 = "ws://localhost:9091/basic";
@http:WebSocketServiceConfig {
path: "/api/ws"
}
service RequestService on new http:Listener(9092) {
resource function onOpen(http:WebSocketCaller caller) {
http:WebSocketClient wsClientEp = new(
EXTRACTOR,
{callbackService: ClientService1,
readyOnConnect: false,
maxFrameSize: 2147483648
});
http:WebSocketClient wsClientEp2 = new(
EXTRACTOR2,
{callbackService: ClientService2,
readyOnConnect: false,
maxFrameSize: 2147483648
});
caller.setAttribute(ASSOCIATED_CONNECTION, wsClientEp);
wsClientEp.setAttribute(ASSOCIATED_CONNECTION, wsClientEp2);
wsClientEp2.setAttribute(ASSOCIATED_CONNECTION, caller);
var err = wsClientEp->ready();
if (err is http:WebSocketError) {
log:printError("Error calling ready on client 1", err);
}
err = wsClientEp2->ready();
if (err is http:WebSocketError) {
log:printError("Error calling ready on client 2", err);
}
}
resource function onText(http:WebSocketCaller caller, string text, boolean finalFrame) {
http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
var err = clientEp->pushText(text, finalFrame);
if (err is http:WebSocketError) {
log:printError("Error occurred when sending text message", err);
}
}
resource function onError(http:WebSocketCaller caller, error err) {
http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
var e = clientEp->close(statusCode = 1011, reason = "Unexpected condition");
if (e is http:WebSocketError) {
log:printError("Error occurred when closing the connection", e);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
log:printError("Unexpected error hence closing the connection", err);
}
resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {
http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
var err = clientEp->close(statusCode = statusCode, reason = reason);
if (err is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
}
}
service ClientService1 = @http:WebSocketServiceConfig {} service {
resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
var err = clientEp2->pushText(text, finalFrame);
if (err is http:WebSocketError) {
log:printError("Error occurred when sending text message", err);
}
}
resource function onError(http:WebSocketClient caller, error err) {
http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
var e = clientEp2->close(statusCode = 1011, reason = "Unexpected condition");
if (e is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err = e);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
log:printError("Unexpected error hense closing the connection", err);
}
resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
var err = clientEp2->close(statusCode = statusCode, reason = reason);
if (err is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
}
};
service ClientService2 = @http:WebSocketServiceConfig {} service {
resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
var err = serverEp->pushText(text, finalFrame);
if (err is http:WebSocketError) {
log:printError("Error occurred when sending text message", err);
}
}
resource function onError(http:WebSocketClient caller, error err) {
http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
var e = serverEp->close(statusCode = 1011, reason = "Unexpected condition");
if (e is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err = e);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
log:printError("Unexpected error hense closing the connection", err);
}
resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
var err = serverEp->close(statusCode = statusCode, reason = reason);
if (err is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
}
};
function getAssociatedClientEndpoint(http:WebSocketCaller ep) returns (http:WebSocketClient) {
http:WebSocketClient wsClient = <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
return wsClient;
}
function getAssociatedServerEndpoint(http:WebSocketClient ep) returns (http:WebSocketCaller) {
http:WebSocketCaller wsEndpoint = <http:WebSocketCaller>ep.getAttribute(ASSOCIATED_CONNECTION);
return wsEndpoint;
}
function getAssociatedClientEndpointFromClient(http:WebSocketClient ep) returns (http:WebSocketClient) {
http:WebSocketClient wsEndpoint = <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
return wsEndpoint;
}
当 EXTRACTOR2 尝试 return 将信息发送到 Ballerina 的服务器时,弹出此错误:
Error in connection handler
Traceback (most recent call last):
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 795, in transfer_data
message = await self.read_message()
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 863, in read_message
frame = await self.read_data_frame(max_size=self.max_size)
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 938, in read_data_frame
frame = await self.read_frame(max_size)
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 1018, in read_frame
extensions=self.extensions,
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/framing.py", line 121, in read
data = await reader(2)
File "/usr/lib/python3.7/asyncio/streams.py", line 677, in readexactly
raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 2 expected bytes
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/server.py", line 195, in handler
await self.ws_handler(self, path)
File "analyzer_main.py", line 39, in server
await websocket.send(json.dumps(response))
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 530, in send
await self.ensure_open()
File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 771, in ensure_open
raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedError: code = 1006 (connection closed abnormally [internal]), no reason
Exception found
{"ok": 0, "data": "[ANALYZER] <class 'websockets.exceptions.ConnectionClosedError'>code = 1006 (connection closed abnormally [internal]), no reason"}
这是 python EXTRACTOR2 代码:
import json
import asyncio
import websockets
from analyzer import Analyzer
analyzer = Analyzer()
print("Ready")
async def server(websocket, path):
try:
request_json = await websocket.recv()
#await websocket.send(":)")
request = json.loads(request_json)
print("Peticion recibida")
print(request_json)
with open("log.txt","w+") as f:
f.write(request_json)
# Parsear request
data = []
for topic in request['data']:
sentiment = {}
sentiment['word'] = topic['word']
sentiment['sentiment'] = analyzer.api_get_sentiment(topic['context'])
data.append(sentiment)
print("Analizado " + topic['word'])
# Enviar respuesta
response = {"ok": 1, "data": data}
print(response)
await websocket.send(json.dumps(response))
except BaseException as exception:
print("Exception found")
response = {"ok": 0, "data": "[ANALYZER] " + str(type(exception)) + str(exception)}
print(json.dumps(response))
await websocket.send(json.dumps(response))
finally:
print("Finished")
start_server = websockets.serve(server, "0.0.0.0", 8082)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
我使用了以下 Ballerina 代理:
import ballerina/http;
import ballerina/log;
final string ASSOCIATED_CONNECTION = "EXTRACTOR CONNECTION";
final string EXTRACTOR = "ws://localhost:9091/basic";
final string EXTRACTOR2 = "ws://localhost:8082";
@http:WebSocketServiceConfig {
path: "/api/ws"
}
service RequestService on new http:Listener(9092) {
resource function onOpen(http:WebSocketCaller caller) {
http:WebSocketClient wsClientEp = new(
EXTRACTOR,
{callbackService: ClientService1,
readyOnConnect: false,
maxFrameSize: 2147483648
});
http:WebSocketClient wsClientEp2 = new(
EXTRACTOR2,
{callbackService: ClientService2,
readyOnConnect: false,
maxFrameSize: 2147483648
});
caller.setAttribute(ASSOCIATED_CONNECTION, wsClientEp);
wsClientEp.setAttribute(ASSOCIATED_CONNECTION, wsClientEp2);
wsClientEp2.setAttribute(ASSOCIATED_CONNECTION, caller);
var err = wsClientEp->ready();
if (err is http:WebSocketError) {
log:printError("Error calling ready on client 1", err);
}
err = wsClientEp2->ready();
if (err is http:WebSocketError) {
log:printError("Error calling ready on client 2", err);
}
}
resource function onText(http:WebSocketCaller caller, string text, boolean finalFrame) {
http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
var err = clientEp->pushText(text, finalFrame);
if (err is http:WebSocketError) {
log:printError("Error occurred when sending text message", err);
}
}
resource function onError(http:WebSocketCaller caller, error err) {
http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
var e = clientEp->close(statusCode = 1011, reason = "Unexpected condition");
if (e is http:WebSocketError) {
log:printError("Error occurred when closing the connection", e);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
log:printError("Unexpected error hence closing the connection", err);
}
resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {
http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
var err = clientEp->close(statusCode = statusCode, reason = reason);
if (err is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
}
}
service ClientService1 = @http:WebSocketServiceConfig {} service {
resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
var err = clientEp2->pushText(text, finalFrame);
if (err is http:WebSocketError) {
log:printError("Error occurred when sending text message", err);
}
}
resource function onError(http:WebSocketClient caller, error err) {
http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
var e = clientEp2->close(statusCode = 1011, reason = "Unexpected condition");
if (e is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err = e);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
log:printError("Unexpected error hense closing the connection", err);
}
resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
var err = clientEp2->close(statusCode = statusCode, reason = reason);
if (err is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
}
};
service ClientService2 = @http:WebSocketServiceConfig {} service {
resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
var err = serverEp->pushText(text, finalFrame);
if (err is http:WebSocketError) {
log:printError("Error occurred when sending text message", err);
}
}
resource function onError(http:WebSocketClient caller, error err) {
http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
var e = serverEp->close(statusCode = 1011, reason = "Unexpected condition");
if (e is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err = e);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
log:printError("Unexpected error hense closing the connection", err);
}
resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
var err = serverEp->close(statusCode = statusCode, reason = reason);
if (err is http:WebSocketError) {
log:printError("Error occurred when closing the connection", err);
}
_ = caller.removeAttribute(ASSOCIATED_CONNECTION);
}
};
function getAssociatedClientEndpoint(http:WebSocketCaller ep) returns (http:WebSocketClient) {
http:WebSocketClient wsClient = <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
return wsClient;
}
function getAssociatedServerEndpoint(http:WebSocketClient ep) returns (http:WebSocketCaller) {
http:WebSocketCaller wsEndpoint = <http:WebSocketCaller>ep.getAttribute(ASSOCIATED_CONNECTION);
return wsEndpoint;
}
function getAssociatedClientEndpointFromClient(http:WebSocketClient ep) returns (http:WebSocketClient) {
http:WebSocketClient wsEndpoint = <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
return wsEndpoint;
}
和这个 python 服务器,为了我的测试稍作修改。
import json
import asyncio
import websockets
print("Ready")
async def server(websocket, path):
try:
request_json = await websocket.recv()
print("Peticion recibida")
data = ["hello"]
response = {"ok": 1, "data": data}
print(response)
await websocket.send(json.dumps(response))
except BaseException as exception:
print("Exception found")
response = {"ok": 0, "data": "[ANALYZER] " + str(type(exception)) + str(exception)}
print(json.dumps(response))
await websocket.send(json.dumps(response))
finally:
print("Finished")
start_server = websockets.serve(server, "0.0.0.0", 8082)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
这些对我来说很好。