Phoenix Channels - 每个插座有多个频道
Phoenix Channels - Multiple channels per socket
我正在使用 Elixir Channels 编写一个应用程序来处理实时事件。我知道每个客户端将打开 1 个套接字,并且可以在其上多路复用多个通道。所以我的应用程序是一个聊天应用程序,其中用户是多个群聊的一部分。我有 1 个名为 MessageChannel 的 Phoenix Channel,其中 join 方法将处理动态主题。
def join("groups:" <> group_id, payload, socket) do
....
假设 John 加入 groups/topics A 和 B,而 Bob 仅加入 group/topic B。当 john 向 group/topic A 发送消息时,broadcast!/3
也会发送该消息给 Bob 的信息是否正确?因为 handle_in
没有将消息发送到 topic/group 的上下文。
我该如何处理才能让 Bob 不会收到发送到组 A 的事件。我的设计是否正确?
免责声明:我没有查看通道的内部工作原理,此信息完全来自我在应用程序中使用通道的第一次体验。
当有人加入不同的组时(基于您 join/3
中的模式匹配),将通过单独的通道(套接字)建立连接。因此,向 A 广播不会向 B 的成员发送消息,只会向 A 发送消息。
在我看来,Channel 模块类似于 GenServer
,连接有点像 start_link
,其中启动了一个新的服务器(进程)(但是,只有当它启动时尚不存在)。
您完全可以忽略该模块的内部工作原理,只需了解如果您加入的频道名称与现有频道名称不同,那么您将加入一个独特的频道。您也可以相信,如果您向某个频道广播,只有该频道的成员才能收到消息。
例如,在我的应用程序中,我有一个用户通道,我只希望连接到一个用户。加入看起来像 def join("agent:" <> _agent, payload, socket)
,其中代理只是一个电子邮件地址。当我向这个频道广播消息时,只有单个代理会收到消息。我还有一个所有代理都加入的办公室频道,当我希望所有代理都收到消息时,我会向它广播。
希望这对您有所帮助。
Because handle_in
doesn't have a context of which topic/group the message was sent to.
当调用 Phoenix.Channel.broadcast/3
时,显然它 具有与消息关联的主题(从签名中看不出来)。可以看到代码以on this line of channel.ex:
开头
def broadcast(socket, event, message) do
%{pubsub_server: pubsub_server, topic: topic} = assert_joined!(socket)
Server.broadcast pubsub_server, topic, event, message
end
所以当使用套接字调用 broadcast/3
时,它模式匹配当前主题,然后调用底层 Server.broadcast/4
.
(如果你像我一样好奇,这反过来会调用底层 PubSub.broadcast/3
,它会执行一些分发魔法将调用路由到你配置的 pubsub 实现服务器,很可能使用 pg2 但我跑题了...)
因此,通过阅读 Phoenix.Channel
docs, but they do state it explicitly in the phoenixframework channels page in Incoming Events:
,我发现这种行为并不明显
broadcast!/3
will notify all joined clients on this socket's topic and invoke their handle_out/3
callbacks.
所以它只是在广播"on this socket's topic"。他们在同一页面上将主题定义为:
topic - The string topic or topic:subtopic pair namespace, for example “messages”, “messages:123”
因此在您的示例中,"topics" 实际上是 topic:subtopic 对命名空间字符串:"groups:A"
和 "groups:B"
。 John 必须在客户端分别订阅这两个主题,因此您实际上会引用两个不同的频道,即使它们使用相同的套接字。因此,假设您使用的是 javascript 客户端,频道创建看起来像这样:
let channelA = this.socket.channel("groups:A", {});
let channelB = this.socket.channel("groups:B", {});
然后,当您从客户端在频道上发送消息时,您只使用了主题在服务器上匹配模式的频道,如我们上面所见。
channelA.push(msgName, msgBody);
实际上,套接字路由是根据如何使用 channel
API 在项目套接字模块中定义主题来完成的。对于我的 Slack 克隆,我使用三个通道。我有一个系统级通道来处理状态更新、一个用户通道和一个房间通道。
任何给定用户订阅了 0 或 1 个频道。然而,用户可能订阅了多个频道。
对于发送到特定房间的消息,我通过房间频道广播。
当我检测到特定房间的未读消息、通知或徽章时,我会使用用户频道。每个用户频道也存储用户订阅的房间列表(它们列在客户端栏上)。
所有这一切的诀窍是使用一对通道 APIs,主要是 intercept
、handle_out
、My.Endpoint.subscribe
和 handle_info(%Broadcast{},socket)
。
- 我使用
intercept
来捕获我想忽略或在发送之前对其进行处理的广播消息。
- 在用户频道,我订阅了房间频道广播的事件
- 当您订阅时,您会收到一个
handle_info
调用,其中 %Broadcast{}
结构包括广播消息的主题、事件和负载。
这是我的几段代码:
defmodule UcxChat.UserSocket do
use Phoenix.Socket
alias UcxChat.{User, Repo, MessageService, SideNavService}
require UcxChat.ChatConstants, as: CC
## Channels
channel CC.chan_room <> "*", UcxChat.RoomChannel # "ucxchat:"
channel CC.chan_user <> "*", UcxChat.UserChannel # "user:"
channel CC.chan_system <> "*", UcxChat.SystemChannel # "system:"
# ...
end
# user_channel.ex
# ...
intercept ["room:join", "room:leave", "room:mention", "user:state", "direct:new"]
#...
def handle_out("room:join", msg, socket) do
%{room: room} = msg
UserSocket.push_message_box(socket, socket.assigns.channel_id, socket.assigns.user_id)
update_rooms_list(socket)
clear_unreads(room, socket)
{:noreply, subscribe([room], socket)}
end
def handle_out("room:leave" = ev, msg, socket) do
%{room: room} = msg
debug ev, msg, "assigns: #{inspect socket.assigns}"
socket.endpoint.unsubscribe(CC.chan_room <> room)
update_rooms_list(socket)
{:noreply, assign(socket, :subscribed, List.delete(socket.assigns[:subscribed], room))}
end
# ...
defp subscribe(channels, socket) do
# debug inspect(channels), ""
Enum.reduce channels, socket, fn channel, acc ->
subscribed = acc.assigns[:subscribed]
if channel in subscribed do
acc
else
socket.endpoint.subscribe(CC.chan_room <> channel)
assign(acc, :subscribed, [channel | subscribed])
end
end
end
# ...
end
我还使用 user_channel 处理与特定用户相关的所有事件,如客户端状态、错误消息等。
我正在使用 Elixir Channels 编写一个应用程序来处理实时事件。我知道每个客户端将打开 1 个套接字,并且可以在其上多路复用多个通道。所以我的应用程序是一个聊天应用程序,其中用户是多个群聊的一部分。我有 1 个名为 MessageChannel 的 Phoenix Channel,其中 join 方法将处理动态主题。
def join("groups:" <> group_id, payload, socket) do
....
假设 John 加入 groups/topics A 和 B,而 Bob 仅加入 group/topic B。当 john 向 group/topic A 发送消息时,broadcast!/3
也会发送该消息给 Bob 的信息是否正确?因为 handle_in
没有将消息发送到 topic/group 的上下文。
我该如何处理才能让 Bob 不会收到发送到组 A 的事件。我的设计是否正确?
免责声明:我没有查看通道的内部工作原理,此信息完全来自我在应用程序中使用通道的第一次体验。
当有人加入不同的组时(基于您 join/3
中的模式匹配),将通过单独的通道(套接字)建立连接。因此,向 A 广播不会向 B 的成员发送消息,只会向 A 发送消息。
在我看来,Channel 模块类似于 GenServer
,连接有点像 start_link
,其中启动了一个新的服务器(进程)(但是,只有当它启动时尚不存在)。
您完全可以忽略该模块的内部工作原理,只需了解如果您加入的频道名称与现有频道名称不同,那么您将加入一个独特的频道。您也可以相信,如果您向某个频道广播,只有该频道的成员才能收到消息。
例如,在我的应用程序中,我有一个用户通道,我只希望连接到一个用户。加入看起来像 def join("agent:" <> _agent, payload, socket)
,其中代理只是一个电子邮件地址。当我向这个频道广播消息时,只有单个代理会收到消息。我还有一个所有代理都加入的办公室频道,当我希望所有代理都收到消息时,我会向它广播。
希望这对您有所帮助。
Because
handle_in
doesn't have a context of which topic/group the message was sent to.
当调用 Phoenix.Channel.broadcast/3
时,显然它 具有与消息关联的主题(从签名中看不出来)。可以看到代码以on this line of channel.ex:
def broadcast(socket, event, message) do
%{pubsub_server: pubsub_server, topic: topic} = assert_joined!(socket)
Server.broadcast pubsub_server, topic, event, message
end
所以当使用套接字调用 broadcast/3
时,它模式匹配当前主题,然后调用底层 Server.broadcast/4
.
(如果你像我一样好奇,这反过来会调用底层 PubSub.broadcast/3
,它会执行一些分发魔法将调用路由到你配置的 pubsub 实现服务器,很可能使用 pg2 但我跑题了...)
因此,通过阅读 Phoenix.Channel
docs, but they do state it explicitly in the phoenixframework channels page in Incoming Events:
broadcast!/3
will notify all joined clients on this socket's topic and invoke theirhandle_out/3
callbacks.
所以它只是在广播"on this socket's topic"。他们在同一页面上将主题定义为:
topic - The string topic or topic:subtopic pair namespace, for example “messages”, “messages:123”
因此在您的示例中,"topics" 实际上是 topic:subtopic 对命名空间字符串:"groups:A"
和 "groups:B"
。 John 必须在客户端分别订阅这两个主题,因此您实际上会引用两个不同的频道,即使它们使用相同的套接字。因此,假设您使用的是 javascript 客户端,频道创建看起来像这样:
let channelA = this.socket.channel("groups:A", {});
let channelB = this.socket.channel("groups:B", {});
然后,当您从客户端在频道上发送消息时,您只使用了主题在服务器上匹配模式的频道,如我们上面所见。
channelA.push(msgName, msgBody);
实际上,套接字路由是根据如何使用 channel
API 在项目套接字模块中定义主题来完成的。对于我的 Slack 克隆,我使用三个通道。我有一个系统级通道来处理状态更新、一个用户通道和一个房间通道。
任何给定用户订阅了 0 或 1 个频道。然而,用户可能订阅了多个频道。
对于发送到特定房间的消息,我通过房间频道广播。
当我检测到特定房间的未读消息、通知或徽章时,我会使用用户频道。每个用户频道也存储用户订阅的房间列表(它们列在客户端栏上)。
所有这一切的诀窍是使用一对通道 APIs,主要是 intercept
、handle_out
、My.Endpoint.subscribe
和 handle_info(%Broadcast{},socket)
。
- 我使用
intercept
来捕获我想忽略或在发送之前对其进行处理的广播消息。 - 在用户频道,我订阅了房间频道广播的事件
- 当您订阅时,您会收到一个
handle_info
调用,其中%Broadcast{}
结构包括广播消息的主题、事件和负载。
这是我的几段代码:
defmodule UcxChat.UserSocket do
use Phoenix.Socket
alias UcxChat.{User, Repo, MessageService, SideNavService}
require UcxChat.ChatConstants, as: CC
## Channels
channel CC.chan_room <> "*", UcxChat.RoomChannel # "ucxchat:"
channel CC.chan_user <> "*", UcxChat.UserChannel # "user:"
channel CC.chan_system <> "*", UcxChat.SystemChannel # "system:"
# ...
end
# user_channel.ex
# ...
intercept ["room:join", "room:leave", "room:mention", "user:state", "direct:new"]
#...
def handle_out("room:join", msg, socket) do
%{room: room} = msg
UserSocket.push_message_box(socket, socket.assigns.channel_id, socket.assigns.user_id)
update_rooms_list(socket)
clear_unreads(room, socket)
{:noreply, subscribe([room], socket)}
end
def handle_out("room:leave" = ev, msg, socket) do
%{room: room} = msg
debug ev, msg, "assigns: #{inspect socket.assigns}"
socket.endpoint.unsubscribe(CC.chan_room <> room)
update_rooms_list(socket)
{:noreply, assign(socket, :subscribed, List.delete(socket.assigns[:subscribed], room))}
end
# ...
defp subscribe(channels, socket) do
# debug inspect(channels), ""
Enum.reduce channels, socket, fn channel, acc ->
subscribed = acc.assigns[:subscribed]
if channel in subscribed do
acc
else
socket.endpoint.subscribe(CC.chan_room <> channel)
assign(acc, :subscribed, [channel | subscribed])
end
end
end
# ...
end
我还使用 user_channel 处理与特定用户相关的所有事件,如客户端状态、错误消息等。