Flask-socketio 实例不从外部进程发出 (Django)
Flask-socketio instance not Emitting from an External Process (Django)
您好,我正在尝试从外部 python 应用程序 (django) 发送到 flask-socketIO,但不幸的是它没有发送。
我将我的 flask-socket 应用程序定义如下:
app.register_blueprint(views)
socketio.init_app(app, message_queue='redis://127.0.0.1:6380')
我的看法如下:
from flask import render_template
from .. import socketio
from App import socketio
from App.views import views
from flask_socketio import join_room, leave_room, emit
import cgi
def ack():
print("cron_failedMsg was delivered")
@socketio.on('connect', namespace='/node/cron')
def ws_conn():
print("Connected")
@socketio.on('join', namespace='/node/cron')
def on_join(data):
print("Joined room " + data['room'])
join_room("room_" + data['room'])
emit('msg', {'msg': "joined room room_" +
data['room']}, room="room_" + data['room'])
@socketio.on('leave', namespace='/node/cron')
def on_leave(data):
print("leaved")
leave_room("room_" + data['room'])
@socketio.on('cron_failed', namespace='/node/cron')
def ws_cron_failed(message):
print("Failed")
print("sending to room " + message['room'])
"""
socketio.emit('cron_failed', {
'cron_failed': cgi.escape(message['cron_failed'])}, namespace='/node/cron')
"""
emit('cron_failedMsg', {
'cron_failed': cgi.escape(message['cron_failed'])}, room="room_" + cgi.escape(message['room']))
正在从外部 python 应用程序 (Django) 发送到消息队列:
from flask_socketio import SocketIO
socketio = SocketIO(message_queue='redis://' +
redis6380_conf.redis6380['host'] + ":" + redis6380_conf.redis6380['port'])
socketio.emit(
'cron_' + status, {'cron' + status: redisConnection6381.get(str(userID) + '_' + status), 'room': str(userID)}, namespace='/node/cron')
不幸的是,它不是从外部 python 进程发出的。
我已经使用 javascript 测试了来自网页的发射,它可以工作,但不是来自外部 python 应用程序(Django):
var url = "http://" + document.domain + ":" + location.port;
var socket = io.connect(url + "/node/cron");
socket.emit('join',{room:""+$rootScope.Data.ID});
socket.emit('cron_failed',{cron_failed:"ddd",room:"3"})
如何从外部 python 应用程序 (Django) 发出?从外部进程发出的方式是否有任何问题?
有关信息,我遵循了说明:https://flask-socketio.readthedocs.io/en/latest/#emitting-from-an-external-process
您想从 Django 进程发送到 Flask-SocketIO 服务器吗?如果那是您想要做的,那么在 Django 端您需要使用 Socket.IO 客户端。例如这个:https://pypi.python.org/pypi/socketIO-client.
Flask-SocketIO 的"emit from an external process" 特性不同。这个想法是您希望多个服务器进程发送给客户端。此功能允许 Django 服务器通过 Flask-SocketIO 服务器发送给客户端。
从你的例子来看,当你从 Django 发出时,你似乎期待你的 Flask-SocketIO 服务器中的 'cron_failed'
事件到 运行,但实际上你正在做的是发送事件这个名字给客户。也许您想要的是在 Django 发出时将事件名称更改为 'cron_failedMsg'
?看来这就是客户所期望的。
您好,我正在尝试从外部 python 应用程序 (django) 发送到 flask-socketIO,但不幸的是它没有发送。
我将我的 flask-socket 应用程序定义如下:
app.register_blueprint(views)
socketio.init_app(app, message_queue='redis://127.0.0.1:6380')
我的看法如下:
from flask import render_template
from .. import socketio
from App import socketio
from App.views import views
from flask_socketio import join_room, leave_room, emit
import cgi
def ack():
print("cron_failedMsg was delivered")
@socketio.on('connect', namespace='/node/cron')
def ws_conn():
print("Connected")
@socketio.on('join', namespace='/node/cron')
def on_join(data):
print("Joined room " + data['room'])
join_room("room_" + data['room'])
emit('msg', {'msg': "joined room room_" +
data['room']}, room="room_" + data['room'])
@socketio.on('leave', namespace='/node/cron')
def on_leave(data):
print("leaved")
leave_room("room_" + data['room'])
@socketio.on('cron_failed', namespace='/node/cron')
def ws_cron_failed(message):
print("Failed")
print("sending to room " + message['room'])
"""
socketio.emit('cron_failed', {
'cron_failed': cgi.escape(message['cron_failed'])}, namespace='/node/cron')
"""
emit('cron_failedMsg', {
'cron_failed': cgi.escape(message['cron_failed'])}, room="room_" + cgi.escape(message['room']))
正在从外部 python 应用程序 (Django) 发送到消息队列:
from flask_socketio import SocketIO
socketio = SocketIO(message_queue='redis://' +
redis6380_conf.redis6380['host'] + ":" + redis6380_conf.redis6380['port'])
socketio.emit(
'cron_' + status, {'cron' + status: redisConnection6381.get(str(userID) + '_' + status), 'room': str(userID)}, namespace='/node/cron')
不幸的是,它不是从外部 python 进程发出的。
我已经使用 javascript 测试了来自网页的发射,它可以工作,但不是来自外部 python 应用程序(Django):
var url = "http://" + document.domain + ":" + location.port;
var socket = io.connect(url + "/node/cron");
socket.emit('join',{room:""+$rootScope.Data.ID});
socket.emit('cron_failed',{cron_failed:"ddd",room:"3"})
如何从外部 python 应用程序 (Django) 发出?从外部进程发出的方式是否有任何问题?
有关信息,我遵循了说明:https://flask-socketio.readthedocs.io/en/latest/#emitting-from-an-external-process
您想从 Django 进程发送到 Flask-SocketIO 服务器吗?如果那是您想要做的,那么在 Django 端您需要使用 Socket.IO 客户端。例如这个:https://pypi.python.org/pypi/socketIO-client.
Flask-SocketIO 的"emit from an external process" 特性不同。这个想法是您希望多个服务器进程发送给客户端。此功能允许 Django 服务器通过 Flask-SocketIO 服务器发送给客户端。
从你的例子来看,当你从 Django 发出时,你似乎期待你的 Flask-SocketIO 服务器中的 'cron_failed'
事件到 运行,但实际上你正在做的是发送事件这个名字给客户。也许您想要的是在 Django 发出时将事件名称更改为 'cron_failedMsg'
?看来这就是客户所期望的。