如何从 List 中创建多个 GenServer 进程并映射存储在其中的数据?
How to create many GenServer processes from List and map data stored in them?
在这两种方法中,我坚持如何通过给定的一组 ID 或组映射进程,然后将存储的结构映射到过滤数据。
%{group => [users]}
实施。
我意识到与用户相反,组将受到限制,因此我创建了一个使用组名称作为键的流程模块。
恐怕将来会有很多用户分几个组,所以我的问题是如何拆分当前的 UserGroupServer
模块以保留许多由组名标识的独立进程?我想保留当前模块的功能,在 init processes by groups 列表中,另外我不知道如何映射每个进程以通过 user_id?
获取组
目前我在 Phoenix lib/myapp.ex
中只启动了一个进程,通过在子树列表中包含模块,所以我可以直接在 Channels 中调用 UserGroupServer
。
defmodule UserGroupServer do
use GenServer
## Client API
def start_link(opts \ []) do
GenServer.start_link(__MODULE__, :ok, opts)
end
def update_user_groups_state(server, data) do
{groups, user_id} = data
GenServer.call(server, {:clean_groups, user_id}, :infinity)
users = Enum.map(groups, fn(group) ->
GenServer.call(server, {:add_user_group, group, user_id}, :infinity)
end)
Enum.count(Enum.uniq(List.flatten(users)))
end
def get_user_groups(server, user_id) do
GenServer.call(server, {:get_user_groups, user_id})
end
def users_count_in_gorup(server, group) do
GenServer.call(server, {:users_count_in_gorup, group})
end
## Callbacks (Server API)
def init(_) do
{:ok, Map.new}
end
def handle_call({:clean_groups, user_id}, _from, user_group_dict) do
user_group_dict = user_group_dict
|> Enum.map(fn({gr, users}) -> {gr, List.delete(users, user_id)} end)
|> Enum.into(%{})
{:reply, user_group_dict, user_group_dict}
end
def handle_call({:add_user_group, group, user_id}, _from, user_group_dict) do
user_group_dict = if Map.has_key?(user_group_dict, group) do
Map.update!(user_group_dict, group, fn(users) -> [user_id | users] end)
else
Map.put(user_group_dict, group, [user_id])
end
{:reply, Map.fetch(user_group_dict, group), user_group_dict}
end
end
测试:
defmodule MyappUserGroupServerTest do
use ExUnit.Case, async: false
setup do
{:ok, server_pid} = UserGroupServer.start_link
{:ok, server_pid: server_pid}
end
test "add users", context do
c1 = UserGroupServer.update_user_groups_state(context[:server_pid], {[:a, :b, :c], 1})
assert(1 == c1)
c2 = UserGroupServer.update_user_groups_state(context[:server_pid], {[:c, :d], 2})
assert(2 == c2)
c3 = UserGroupServer.update_user_groups_state(context[:server_pid], {[:x], 2})
assert(1 == c3)
c4 = UserGroupServer.update_user_groups_state(context[:server_pid], {[:d], 1})
assert(1 == c4)
c5 = UserGroupServer.update_user_groups_state(context[:server_pid], {[:d, :c], 2})
assert(2 == c5)
end
end
旧方法%{user => [groups]}
监控存储分配给 user_id 的组列表。如何查找给定组中的用户?我是否必须创建单独的进程来处理组和用户 ID 之间的 m..n 关系?我应该更改什么以获取每个用户组然后映射它们?
服务器实现:
defmodule Myapp.Monitor do
use GenServer
def create(user_id) do
case GenServer.whereis(ref(user_id)) do
nil -> Myapp.Supervisor.start_child(user_id)
end
end
def start_link(user_id) do
GenServer.start_link(__MODULE__, [], name: ref(user_id))
end
def set_groups(user_pid, groups) do
try_call user_pid, {:set_groups, groups}
end
def handle_call({:set_groups, groups}, _from, state) do
{ :reply, groups, groups } # reset user groups on each set_group call.
end
defp ref(user_id) do
{:global, {:user, user_id}}
end
defp try_call(user_id, call_function) do
case GenServer.whereis(ref(user_id)) do
nil -> {:error, :invalid_user}
user_pid -> GenServer.call(user_pid, call_function)
end
end
end
主管:
defmodule Myapp.Supervisor do
use Supervisor
def start_link do
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end
def start_child(user_id) do
Supervisor.start_child(__MODULE__, [user_id])
end
def init(:ok) do
supervise([worker(Myapp.Monitor, [], restart: :temporary)], strategy: :simple_one_for_one)
end
end
示例:
Monitor.create(5)
Monitor.set_groups(5, ['a', 'b', 'c'])
Monitor.create(6)
Monitor.set_groups(6, ['a', 'b'])
Monitor.set_groups(6, ['a', 'c'])
# Monitor.users_in_gorup('a') # -> 2
# Monitor.users_in_gorup('b') # -> 1
# Monitor.users_in_gorup('c') # -> 2
# or eventually more desired:
# Monitor.unique_users_in_groups(['a', 'b', 'c']) # -> 2
# or return in set_groups unique_users_in_groups result
在跳转到进程和 gen_servers 之前,您始终需要考虑数据结构。
您打算如何添加数据?多常?你打算如何查询它?多久一次?
在您的示例中,您提到了三个操作:
- 为用户设置组(重置所有以前设置的组)
- return组中的所有用户
- return 一组组中的唯一用户
使用 Elixir 中最基本的类型(列表和地图),您可以通过两种方式排列数据:
- 映射,其中键是用户,值是组列表 (
%{user => [groups]}
)
- 或反过来 (
%{group => [users]}
)
对于这两个实现,您可以评估操作的速度。对于 %{user => [groups]}
:
- 为用户设置的组是
O(1)
(只需更新映射中的键)
- return 组
O(n*m)
中的所有用户,其中 n
是用户数,m
是组数(对于所有 n
用户,您需要通过扫描 m
组名来检查它是否在组中)
- 组内唯一用户同上+排序去重
对于 %{group => [users]}
的实施:
- set groups for user is
O(n*m)
(如果用户在,需要扫描所有组,删除它,然后只为新组设置)如果set groups only added user to new groups without首先删除它,它只会及时添加与输入中的组数成比例的用户(不是所有组)
- return组中的所有用户
O(1)
- 只需查询地图
- 与查询中的组数成正比+排序和去重
这表明,如果您的监视器更新迅速且查询频率较低,则首次实施效果会好得多。如果您不那么频繁地更新它,那么第二个更好,但一直查询它。
在没有任何参与者或 gen_server 实施这些解决方案之一并且可以说它有效之后,您可能希望将 pid 视为映射键并重写算法。您也可以考虑只使用一个进程来存储所有数据。这也取决于您的确切问题。祝你好运!
在这两种方法中,我坚持如何通过给定的一组 ID 或组映射进程,然后将存储的结构映射到过滤数据。
%{group => [users]}
实施。
我意识到与用户相反,组将受到限制,因此我创建了一个使用组名称作为键的流程模块。
恐怕将来会有很多用户分几个组,所以我的问题是如何拆分当前的 UserGroupServer
模块以保留许多由组名标识的独立进程?我想保留当前模块的功能,在 init processes by groups 列表中,另外我不知道如何映射每个进程以通过 user_id?
目前我在 Phoenix lib/myapp.ex
中只启动了一个进程,通过在子树列表中包含模块,所以我可以直接在 Channels 中调用 UserGroupServer
。
defmodule UserGroupServer do
use GenServer
## Client API
def start_link(opts \ []) do
GenServer.start_link(__MODULE__, :ok, opts)
end
def update_user_groups_state(server, data) do
{groups, user_id} = data
GenServer.call(server, {:clean_groups, user_id}, :infinity)
users = Enum.map(groups, fn(group) ->
GenServer.call(server, {:add_user_group, group, user_id}, :infinity)
end)
Enum.count(Enum.uniq(List.flatten(users)))
end
def get_user_groups(server, user_id) do
GenServer.call(server, {:get_user_groups, user_id})
end
def users_count_in_gorup(server, group) do
GenServer.call(server, {:users_count_in_gorup, group})
end
## Callbacks (Server API)
def init(_) do
{:ok, Map.new}
end
def handle_call({:clean_groups, user_id}, _from, user_group_dict) do
user_group_dict = user_group_dict
|> Enum.map(fn({gr, users}) -> {gr, List.delete(users, user_id)} end)
|> Enum.into(%{})
{:reply, user_group_dict, user_group_dict}
end
def handle_call({:add_user_group, group, user_id}, _from, user_group_dict) do
user_group_dict = if Map.has_key?(user_group_dict, group) do
Map.update!(user_group_dict, group, fn(users) -> [user_id | users] end)
else
Map.put(user_group_dict, group, [user_id])
end
{:reply, Map.fetch(user_group_dict, group), user_group_dict}
end
end
测试:
defmodule MyappUserGroupServerTest do
use ExUnit.Case, async: false
setup do
{:ok, server_pid} = UserGroupServer.start_link
{:ok, server_pid: server_pid}
end
test "add users", context do
c1 = UserGroupServer.update_user_groups_state(context[:server_pid], {[:a, :b, :c], 1})
assert(1 == c1)
c2 = UserGroupServer.update_user_groups_state(context[:server_pid], {[:c, :d], 2})
assert(2 == c2)
c3 = UserGroupServer.update_user_groups_state(context[:server_pid], {[:x], 2})
assert(1 == c3)
c4 = UserGroupServer.update_user_groups_state(context[:server_pid], {[:d], 1})
assert(1 == c4)
c5 = UserGroupServer.update_user_groups_state(context[:server_pid], {[:d, :c], 2})
assert(2 == c5)
end
end
旧方法%{user => [groups]}
监控存储分配给 user_id 的组列表。如何查找给定组中的用户?我是否必须创建单独的进程来处理组和用户 ID 之间的 m..n 关系?我应该更改什么以获取每个用户组然后映射它们?
服务器实现:
defmodule Myapp.Monitor do
use GenServer
def create(user_id) do
case GenServer.whereis(ref(user_id)) do
nil -> Myapp.Supervisor.start_child(user_id)
end
end
def start_link(user_id) do
GenServer.start_link(__MODULE__, [], name: ref(user_id))
end
def set_groups(user_pid, groups) do
try_call user_pid, {:set_groups, groups}
end
def handle_call({:set_groups, groups}, _from, state) do
{ :reply, groups, groups } # reset user groups on each set_group call.
end
defp ref(user_id) do
{:global, {:user, user_id}}
end
defp try_call(user_id, call_function) do
case GenServer.whereis(ref(user_id)) do
nil -> {:error, :invalid_user}
user_pid -> GenServer.call(user_pid, call_function)
end
end
end
主管:
defmodule Myapp.Supervisor do
use Supervisor
def start_link do
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end
def start_child(user_id) do
Supervisor.start_child(__MODULE__, [user_id])
end
def init(:ok) do
supervise([worker(Myapp.Monitor, [], restart: :temporary)], strategy: :simple_one_for_one)
end
end
示例:
Monitor.create(5)
Monitor.set_groups(5, ['a', 'b', 'c'])
Monitor.create(6)
Monitor.set_groups(6, ['a', 'b'])
Monitor.set_groups(6, ['a', 'c'])
# Monitor.users_in_gorup('a') # -> 2
# Monitor.users_in_gorup('b') # -> 1
# Monitor.users_in_gorup('c') # -> 2
# or eventually more desired:
# Monitor.unique_users_in_groups(['a', 'b', 'c']) # -> 2
# or return in set_groups unique_users_in_groups result
在跳转到进程和 gen_servers 之前,您始终需要考虑数据结构。
您打算如何添加数据?多常?你打算如何查询它?多久一次?
在您的示例中,您提到了三个操作:
- 为用户设置组(重置所有以前设置的组)
- return组中的所有用户
- return 一组组中的唯一用户
使用 Elixir 中最基本的类型(列表和地图),您可以通过两种方式排列数据:
- 映射,其中键是用户,值是组列表 (
%{user => [groups]}
) - 或反过来 (
%{group => [users]}
)
对于这两个实现,您可以评估操作的速度。对于 %{user => [groups]}
:
- 为用户设置的组是
O(1)
(只需更新映射中的键) - return 组
O(n*m)
中的所有用户,其中n
是用户数,m
是组数(对于所有n
用户,您需要通过扫描m
组名来检查它是否在组中) - 组内唯一用户同上+排序去重
对于 %{group => [users]}
的实施:
- set groups for user is
O(n*m)
(如果用户在,需要扫描所有组,删除它,然后只为新组设置)如果set groups only added user to new groups without首先删除它,它只会及时添加与输入中的组数成比例的用户(不是所有组) - return组中的所有用户
O(1)
- 只需查询地图 - 与查询中的组数成正比+排序和去重
这表明,如果您的监视器更新迅速且查询频率较低,则首次实施效果会好得多。如果您不那么频繁地更新它,那么第二个更好,但一直查询它。
在没有任何参与者或 gen_server 实施这些解决方案之一并且可以说它有效之后,您可能希望将 pid 视为映射键并重写算法。您也可以考虑只使用一个进程来存储所有数据。这也取决于您的确切问题。祝你好运!