使用 Autobahn 的 Python websocket 中继
websocket relay with Autobahn python
我正在尝试使用 Autobahn Python 构建一个 websocket 服务器,让它充当 IBM Watson 语音转文本(speech-to-text)服务的中间人或中继。我已经通过一个队列从客户端接收流式音频并将其转发给 Watson,服务器也能收到 Watson 以 JSON 数据形式返回的转录假设,但我不确定如何把这些 JSON 数据转发给客户端。Watson 转录端的回调和 Autobahn 客户端的回调似乎是彼此独立的:我无法在一个回调中调用另一个回调里的例程,也无法在一个回调中访问另一个回调中的数据。
我需要设置某种共享的消息队列吗?我相信这应该很简单,但我认为问题可能出在我对 “self” 关键字缺乏理解——它似乎把这两个例程隔离开了。如果有任何有助于理解 “self” 的资料,也将不胜感激。
# For Watson
from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback, AudioSource
from threading import Thread
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
# For Autobahn
from autobahn.twisted.websocket import WebSocketServerProtocol, \
WebSocketServerFactory
from twisted.internet import reactor
try:
    from Queue import Queue, Full  # Python 2 module name
except ImportError:
    from queue import Queue, Full  # Python 3 module name
import os

################################################
#### Initialize queue to store the recordings ##
################################################
CHUNK = 1024
BUF_MAX_SIZE = CHUNK * 10
# Bounded buffer of incoming audio chunks: holds BUF_MAX_SIZE // CHUNK (= 10) items.
q = Queue(maxsize=BUF_MAX_SIZE // CHUNK)
# Create an instance of AudioSource that reads audio from the queue above.
# NOTE(review): the two positional True flags are presumably
# is_recording / is_buffer -- confirm against the ibm_watson AudioSource signature.
audio_source = AudioSource(q, True, True)

###############################################
#### Prepare Speech to Text Service ###########
###############################################
# Initialize the speech to text service. The API key is taken from the
# environment so it does not have to live in source control; the literal
# is only a fallback that preserves the previous behavior.
authenticator = IAMAuthenticator(
    os.environ.get('SPEECH_TO_TEXT_APIKEY', 'secretapikeycanttellyou'))
speech_to_text = SpeechToTextV1(authenticator=authenticator)
# define callback for the speech to text service
class MyRecognizeCallback(RecognizeCallback):
    """Log every Watson speech-to-text event to stdout.

    The ibm_watson SDK invokes these hooks while
    ``recognize_using_websocket`` is running. Nothing is forwarded to the
    websocket client yet: ``self`` here is this callback object, not a
    WebSocketServerProtocol, so it has no ``sendMessage``.
    """

    # Connection-lifecycle events.
    def on_connected(self):
        print('Connection was successful')

    def on_listening(self):
        print('Service is listening')

    def on_close(self):
        print("Connection closed")

    # Problems reported by the service.
    def on_error(self, error):
        print('Error received: {}'.format(error))

    def on_inactivity_timeout(self, error):
        print('Inactivity timeout: {}'.format(error))

    # Recognition results.
    def on_transcription(self, transcript):
        print(transcript)

    def on_hypothesis(self, hypothesis):
        # TODO: forward the hypothesis to the connected websocket client(s).
        print(hypothesis)

    def on_data(self, data):
        # TODO: forward the raw result data to the connected websocket client(s).
        print(data)
# define callback for client-side websocket in Autobahn
class MyServerProtocol(WebSocketServerProtocol):
    """Server side of one client websocket connection.

    Binary frames are treated as raw audio and pushed into the shared
    module-level queue ``q`` that feeds Watson; text frames are logged and
    echoed back to the sender.
    """

    def onConnect(self, request):
        print("Client connecting: {0}".format(request.peer))

    def onOpen(self):
        print("WebSocket connection open.")
        # Run the Watson recognize session on a daemon thread so it does not
        # execute on the Twisted reactor thread.
        recognize_thread = Thread(target=recognize_using_weboscket, args=())
        recognize_thread.daemon = True
        recognize_thread.start()

    def onMessage(self, payload, isBinary):
        if isBinary:
            # onMessage runs on the reactor thread. A plain q.put() on the
            # bounded queue would block the reactor (and every connection)
            # until the recognizer drains it, so drop the chunk instead.
            try:
                q.put_nowait(payload)
            except Full:
                pass  # buffer full: discard this audio chunk
        else:
            print("Text message received: {0}".format(payload.decode('utf8')))
            # echo back text messages verbatim (audio is not echoed)
            self.sendMessage(payload, isBinary)

    def onClose(self, wasClean, code, reason):
        print("WebSocket connection closed: {0}".format(reason))
## this function will initiate the recognize service and pass in the AudioSource
def recognize_using_weboscket(*args):
    """Start a Watson recognize session that reads from ``audio_source``.

    Any positional arguments are accepted and ignored, so the function can
    be used directly as a ``Thread`` target. Results are delivered to a new
    ``MyRecognizeCallback``; interim hypotheses are requested.
    """
    speech_to_text.recognize_using_websocket(
        audio=audio_source,
        content_type='audio/l16; rate=16000',
        recognize_callback=MyRecognizeCallback(),
        interim_results=True,
    )
if __name__ == '__main__':
    # Serve MyServerProtocol on 127.0.0.1:9001, then hand control to Twisted.
    ws_factory = WebSocketServerFactory("ws://127.0.0.1:9001")
    ws_factory.protocol = MyServerProtocol
    reactor.listenTCP(9001, ws_factory)
    # Runs the event loop until the reactor is stopped.
    reactor.run()
看来我需要打通 MyRecognizeCallback() 和 MyServerProtocol() 之间的联系。如果这对我想实现的目标来说是一种糟糕的实现方式,也请告诉我。我知道有更简单的方法来中继 websocket 数据,但我想熟悉 websocket API、音频流和文本消息,因为最终我想去掉 Watson,改用我自己的转录算法。
根据回答,我在 main 中调用 MyServerProtocol().sendMessage(u"this is a message2".encode('utf8')) 的尝试,实际上是创建了一个新的、与现有连接无关的 MyServerProtocol 实例,而不是把消息送入现有连接。我使用这里描述的方法,成功地向已打开的 websocket 连接发送了新消息。
这是我的最终代码,它仍需改进,但相关的定义是 broadcast_message。为使这种方法生效,还需要在 onConnect 中把自己的连接“订阅”(subscribe)到连接列表,并在 onClose 中“取消订阅”(unsubscribe):
from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback, AudioSource
from threading import Thread
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
# For autobahn
import json
from autobahn.twisted.websocket import WebSocketServerProtocol, \
WebSocketServerFactory
from twisted.internet import reactor
try:
    from Queue import Queue, Full  # Python 2 module name
except ImportError:
    from queue import Queue, Full  # Python 3 module name
import os

################################################
#### Initialize queue to store the recordings ##
################################################
CHUNK = 1024
# Note: the websocket handler (MyServerProtocol.onMessage) is meant to discard
# audio chunks when this queue is full, i.e. when the recognizer cannot
# consume them fast enough. Increase the max size as needed.
BUF_MAX_SIZE = CHUNK * 10
# Buffer to store audio: holds BUF_MAX_SIZE // CHUNK (= 10) chunks.
q = Queue(maxsize=BUF_MAX_SIZE // CHUNK)
# Create an instance of AudioSource that reads audio from the queue above.
# NOTE(review): the two positional True flags are presumably
# is_recording / is_buffer -- confirm against the ibm_watson AudioSource signature.
audio_source = AudioSource(q, True, True)

###############################################
#### Prepare Speech to Text Service ###########
###############################################
# Initialize the speech to text service. The API key is taken from the
# environment so it does not have to live in source control; the literal
# is only a fallback that preserves the previous behavior.
authenticator = IAMAuthenticator(
    os.environ.get('SPEECH_TO_TEXT_APIKEY', 'secretapikey'))
speech_to_text = SpeechToTextV1(authenticator=authenticator)
# define callback for the speech to text service
class MyRecognizeCallback(RecognizeCallback):
    """Forward Watson speech-to-text events to the websocket clients.

    Hooks that carry data for the client relay it through
    MyServerProtocol.broadcast_message; hooks that only report the
    Watson connection state log to stdout. These hooks run during
    ``recognize_using_websocket``, which this file starts on a background
    Thread.
    """

    @staticmethod
    def _relay(message):
        # MyServerProtocol is looked up at call time, so it may be defined
        # later in the module.
        MyServerProtocol.broadcast_message(message)

    def on_connected(self):
        print('Connection was successful')

    def on_listening(self):
        print('Service is listening')

    def on_transcription(self, transcript):
        self._relay(transcript)

    def on_hypothesis(self, hypothesis):
        self._relay(hypothesis)

    def on_data(self, data):
        self._relay(data)

    def on_error(self, error):
        self._relay('Error received: {}'.format(error))

    def on_inactivity_timeout(self, error):
        self._relay('Inactivity timeout: {}'.format(error))

    def on_close(self):
        print("Connection closed")
        self._relay("Connection closed")
class MyServerProtocol(WebSocketServerProtocol):
    """Server side of one client websocket connection.

    Incoming binary frames are audio chunks fed to Watson through the shared
    module-level queue ``q``; Watson results are pushed back to every open
    connection via ``broadcast_message``.
    """

    # Registry of open connections, shared by all instances (class attribute).
    # It is only read or mutated on the reactor thread.
    connections = list()

    def onConnect(self, request):
        print("Client connecting: {0}".format(request.peer))

    def onOpen(self):
        print("WebSocket connection open.")
        # Subscribe only after the handshake has completed; before that the
        # connection cannot carry messages.
        self.connections.append(self)
        # Start a recognizer on a daemon thread so the Watson call does not
        # run on the reactor thread.
        # NOTE(review): every connection starts its own recognizer, but all of
        # them read the same module-level audio_source/q, so with several
        # clients the audio streams interleave.
        recognize_thread = Thread(target=recognize_using_weboscket, args=())
        recognize_thread.daemon = True
        recognize_thread.start()

    def onMessage(self, payload, isBinary):
        if isBinary:
            # This runs on the reactor thread and must not block. Queue.put()
            # without block=False never raises Full -- it waits -- so use
            # put_nowait() to actually drop audio when the buffer is full.
            try:
                q.put_nowait(payload)
            except Full:
                pass  # discard
        else:
            print("Text message received: {0}".format(payload.decode('utf8')))

    @classmethod
    def broadcast_message(cls, data):
        """Send ``data`` as a UTF-8 JSON text message to every open connection.

        Intended to be called from non-reactor threads (the Watson callback):
        the data is serialized in the caller's thread, while iterating
        ``connections`` and sending are scheduled on the reactor thread.
        """
        payload = json.dumps(data, ensure_ascii=False).encode('utf8')
        reactor.callFromThread(cls._send_to_all, payload)

    @classmethod
    def _send_to_all(cls, payload):
        """Reactor-thread helper: send ``payload`` to each registered connection."""
        for conn in list(cls.connections):
            conn.sendMessage(payload, isBinary=False)

    def onClose(self, wasClean, code, reason):
        print("WebSocket connection closed: {0}".format(reason))
        # onClose may also fire for a connection whose handshake never
        # completed (and so was never registered); list.remove would raise.
        if self in self.connections:
            self.connections.remove(self)
## this function will initiate the recognize service and pass in the AudioSource
def recognize_using_weboscket(*args):
    """Start a Watson recognize session that reads from ``audio_source``.

    Any positional arguments are accepted and ignored, so the function can
    be used directly as a ``Thread`` target. Results are delivered to a new
    ``MyRecognizeCallback``; interim hypotheses are requested.
    """
    speech_to_text.recognize_using_websocket(
        audio=audio_source,
        content_type='audio/l16; rate=16000',
        recognize_callback=MyRecognizeCallback(),
        interim_results=True,
    )
if __name__ == '__main__':
    # Serve MyServerProtocol on 127.0.0.1:9001, then hand control to Twisted.
    ws_factory = WebSocketServerFactory("ws://127.0.0.1:9001")
    ws_factory.protocol = MyServerProtocol
    reactor.listenTCP(9001, ws_factory)
    # Runs the event loop until the reactor is stopped.
    reactor.run()
我正在尝试使用 Autobahn Python 构建一个 websocket 服务器,让它充当 IBM Watson 语音转文本(speech-to-text)服务的中间人或中继。我已经通过一个队列从客户端接收流式音频并将其转发给 Watson,服务器也能收到 Watson 以 JSON 数据形式返回的转录假设,但我不确定如何把这些 JSON 数据转发给客户端。Watson 转录端的回调和 Autobahn 客户端的回调似乎是彼此独立的:我无法在一个回调中调用另一个回调里的例程,也无法在一个回调中访问另一个回调中的数据。
我需要设置某种共享的消息队列吗?我相信这应该很简单,但我认为问题可能出在我对 “self” 关键字缺乏理解——它似乎把这两个例程隔离开了。如果有任何有助于理解 “self” 的资料,也将不胜感激。
# For Watson
from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback, AudioSource
from threading import Thread
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
# For Autobahn
from autobahn.twisted.websocket import WebSocketServerProtocol, \
WebSocketServerFactory
from twisted.internet import reactor
try:
    from Queue import Queue, Full  # Python 2 module name
except ImportError:
    from queue import Queue, Full  # Python 3 module name
import os

################################################
#### Initialize queue to store the recordings ##
################################################
CHUNK = 1024
BUF_MAX_SIZE = CHUNK * 10
# Bounded buffer of incoming audio chunks: holds BUF_MAX_SIZE // CHUNK (= 10) items.
q = Queue(maxsize=BUF_MAX_SIZE // CHUNK)
# Create an instance of AudioSource that reads audio from the queue above.
# NOTE(review): the two positional True flags are presumably
# is_recording / is_buffer -- confirm against the ibm_watson AudioSource signature.
audio_source = AudioSource(q, True, True)

###############################################
#### Prepare Speech to Text Service ###########
###############################################
# Initialize the speech to text service. The API key is taken from the
# environment so it does not have to live in source control; the literal
# is only a fallback that preserves the previous behavior.
authenticator = IAMAuthenticator(
    os.environ.get('SPEECH_TO_TEXT_APIKEY', 'secretapikeycanttellyou'))
speech_to_text = SpeechToTextV1(authenticator=authenticator)
# define callback for the speech to text service
class MyRecognizeCallback(RecognizeCallback):
    """Log every Watson speech-to-text event to stdout.

    The ibm_watson SDK invokes these hooks while
    ``recognize_using_websocket`` is running. Nothing is forwarded to the
    websocket client yet: ``self`` here is this callback object, not a
    WebSocketServerProtocol, so it has no ``sendMessage``.
    """

    # Connection-lifecycle events.
    def on_connected(self):
        print('Connection was successful')

    def on_listening(self):
        print('Service is listening')

    def on_close(self):
        print("Connection closed")

    # Problems reported by the service.
    def on_error(self, error):
        print('Error received: {}'.format(error))

    def on_inactivity_timeout(self, error):
        print('Inactivity timeout: {}'.format(error))

    # Recognition results.
    def on_transcription(self, transcript):
        print(transcript)

    def on_hypothesis(self, hypothesis):
        # TODO: forward the hypothesis to the connected websocket client(s).
        print(hypothesis)

    def on_data(self, data):
        # TODO: forward the raw result data to the connected websocket client(s).
        print(data)
# define callback for client-side websocket in Autobahn
class MyServerProtocol(WebSocketServerProtocol):
    """Server side of one client websocket connection.

    Binary frames are treated as raw audio and pushed into the shared
    module-level queue ``q`` that feeds Watson; text frames are logged and
    echoed back to the sender.
    """

    def onConnect(self, request):
        print("Client connecting: {0}".format(request.peer))

    def onOpen(self):
        print("WebSocket connection open.")
        # Run the Watson recognize session on a daemon thread so it does not
        # execute on the Twisted reactor thread.
        recognize_thread = Thread(target=recognize_using_weboscket, args=())
        recognize_thread.daemon = True
        recognize_thread.start()

    def onMessage(self, payload, isBinary):
        if isBinary:
            # onMessage runs on the reactor thread. A plain q.put() on the
            # bounded queue would block the reactor (and every connection)
            # until the recognizer drains it, so drop the chunk instead.
            try:
                q.put_nowait(payload)
            except Full:
                pass  # buffer full: discard this audio chunk
        else:
            print("Text message received: {0}".format(payload.decode('utf8')))
            # echo back text messages verbatim (audio is not echoed)
            self.sendMessage(payload, isBinary)

    def onClose(self, wasClean, code, reason):
        print("WebSocket connection closed: {0}".format(reason))
## this function will initiate the recognize service and pass in the AudioSource
def recognize_using_weboscket(*args):
    """Start a Watson recognize session that reads from ``audio_source``.

    Any positional arguments are accepted and ignored, so the function can
    be used directly as a ``Thread`` target. Results are delivered to a new
    ``MyRecognizeCallback``; interim hypotheses are requested.
    """
    speech_to_text.recognize_using_websocket(
        audio=audio_source,
        content_type='audio/l16; rate=16000',
        recognize_callback=MyRecognizeCallback(),
        interim_results=True,
    )
if __name__ == '__main__':
    # Serve MyServerProtocol on 127.0.0.1:9001, then hand control to Twisted.
    ws_factory = WebSocketServerFactory("ws://127.0.0.1:9001")
    ws_factory.protocol = MyServerProtocol
    reactor.listenTCP(9001, ws_factory)
    # Runs the event loop until the reactor is stopped.
    reactor.run()
看来我需要打通 MyRecognizeCallback() 和 MyServerProtocol() 之间的联系。如果这对我想实现的目标来说是一种糟糕的实现方式,也请告诉我。我知道有更简单的方法来中继 websocket 数据,但我想熟悉 websocket API、音频流和文本消息,因为最终我想去掉 Watson,改用我自己的转录算法。
根据回答,我在 main 中调用 MyServerProtocol().sendMessage(u"this is a message2".encode('utf8')) 的尝试,实际上是创建了一个新的、与现有连接无关的 MyServerProtocol 实例,而不是把消息送入现有连接。我使用这里描述的方法,成功地向已打开的 websocket 连接发送了新消息。
这是我的最终代码,它仍需改进,但相关的定义是 broadcast_message。为使这种方法生效,还需要在 onConnect 中把自己的连接“订阅”(subscribe)到连接列表,并在 onClose 中“取消订阅”(unsubscribe):
from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback, AudioSource
from threading import Thread
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
# For autobahn
import json
from autobahn.twisted.websocket import WebSocketServerProtocol, \
WebSocketServerFactory
from twisted.internet import reactor
try:
    from Queue import Queue, Full  # Python 2 module name
except ImportError:
    from queue import Queue, Full  # Python 3 module name
import os

################################################
#### Initialize queue to store the recordings ##
################################################
CHUNK = 1024
# Note: the websocket handler (MyServerProtocol.onMessage) is meant to discard
# audio chunks when this queue is full, i.e. when the recognizer cannot
# consume them fast enough. Increase the max size as needed.
BUF_MAX_SIZE = CHUNK * 10
# Buffer to store audio: holds BUF_MAX_SIZE // CHUNK (= 10) chunks.
q = Queue(maxsize=BUF_MAX_SIZE // CHUNK)
# Create an instance of AudioSource that reads audio from the queue above.
# NOTE(review): the two positional True flags are presumably
# is_recording / is_buffer -- confirm against the ibm_watson AudioSource signature.
audio_source = AudioSource(q, True, True)

###############################################
#### Prepare Speech to Text Service ###########
###############################################
# Initialize the speech to text service. The API key is taken from the
# environment so it does not have to live in source control; the literal
# is only a fallback that preserves the previous behavior.
authenticator = IAMAuthenticator(
    os.environ.get('SPEECH_TO_TEXT_APIKEY', 'secretapikey'))
speech_to_text = SpeechToTextV1(authenticator=authenticator)
# define callback for the speech to text service
class MyRecognizeCallback(RecognizeCallback):
    """Forward Watson speech-to-text events to the websocket clients.

    Hooks that carry data for the client relay it through
    MyServerProtocol.broadcast_message; hooks that only report the
    Watson connection state log to stdout. These hooks run during
    ``recognize_using_websocket``, which this file starts on a background
    Thread.
    """

    @staticmethod
    def _relay(message):
        # MyServerProtocol is looked up at call time, so it may be defined
        # later in the module.
        MyServerProtocol.broadcast_message(message)

    def on_connected(self):
        print('Connection was successful')

    def on_listening(self):
        print('Service is listening')

    def on_transcription(self, transcript):
        self._relay(transcript)

    def on_hypothesis(self, hypothesis):
        self._relay(hypothesis)

    def on_data(self, data):
        self._relay(data)

    def on_error(self, error):
        self._relay('Error received: {}'.format(error))

    def on_inactivity_timeout(self, error):
        self._relay('Inactivity timeout: {}'.format(error))

    def on_close(self):
        print("Connection closed")
        self._relay("Connection closed")
class MyServerProtocol(WebSocketServerProtocol):
    """Server side of one client websocket connection.

    Incoming binary frames are audio chunks fed to Watson through the shared
    module-level queue ``q``; Watson results are pushed back to every open
    connection via ``broadcast_message``.
    """

    # Registry of open connections, shared by all instances (class attribute).
    # It is only read or mutated on the reactor thread.
    connections = list()

    def onConnect(self, request):
        print("Client connecting: {0}".format(request.peer))

    def onOpen(self):
        print("WebSocket connection open.")
        # Subscribe only after the handshake has completed; before that the
        # connection cannot carry messages.
        self.connections.append(self)
        # Start a recognizer on a daemon thread so the Watson call does not
        # run on the reactor thread.
        # NOTE(review): every connection starts its own recognizer, but all of
        # them read the same module-level audio_source/q, so with several
        # clients the audio streams interleave.
        recognize_thread = Thread(target=recognize_using_weboscket, args=())
        recognize_thread.daemon = True
        recognize_thread.start()

    def onMessage(self, payload, isBinary):
        if isBinary:
            # This runs on the reactor thread and must not block. Queue.put()
            # without block=False never raises Full -- it waits -- so use
            # put_nowait() to actually drop audio when the buffer is full.
            try:
                q.put_nowait(payload)
            except Full:
                pass  # discard
        else:
            print("Text message received: {0}".format(payload.decode('utf8')))

    @classmethod
    def broadcast_message(cls, data):
        """Send ``data`` as a UTF-8 JSON text message to every open connection.

        Intended to be called from non-reactor threads (the Watson callback):
        the data is serialized in the caller's thread, while iterating
        ``connections`` and sending are scheduled on the reactor thread.
        """
        payload = json.dumps(data, ensure_ascii=False).encode('utf8')
        reactor.callFromThread(cls._send_to_all, payload)

    @classmethod
    def _send_to_all(cls, payload):
        """Reactor-thread helper: send ``payload`` to each registered connection."""
        for conn in list(cls.connections):
            conn.sendMessage(payload, isBinary=False)

    def onClose(self, wasClean, code, reason):
        print("WebSocket connection closed: {0}".format(reason))
        # onClose may also fire for a connection whose handshake never
        # completed (and so was never registered); list.remove would raise.
        if self in self.connections:
            self.connections.remove(self)
## this function will initiate the recognize service and pass in the AudioSource
def recognize_using_weboscket(*args):
    """Start a Watson recognize session that reads from ``audio_source``.

    Any positional arguments are accepted and ignored, so the function can
    be used directly as a ``Thread`` target. Results are delivered to a new
    ``MyRecognizeCallback``; interim hypotheses are requested.
    """
    speech_to_text.recognize_using_websocket(
        audio=audio_source,
        content_type='audio/l16; rate=16000',
        recognize_callback=MyRecognizeCallback(),
        interim_results=True,
    )
if __name__ == '__main__':
    # Serve MyServerProtocol on 127.0.0.1:9001, then hand control to Twisted.
    ws_factory = WebSocketServerFactory("ws://127.0.0.1:9001")
    ws_factory.protocol = MyServerProtocol
    reactor.listenTCP(9001, ws_factory)
    # Runs the event loop until the reactor is stopped.
    reactor.run()