如何知道你是否将两个套接字连接到 django 通道中的同一组 channel-layer

How to know if you are connecting two sockets to same group of channel-layer in django channels

我实际上正在尝试构建一个系统,其中两个实体(医生和患者)可以使用 django 通过 websockets 共享相同的数据。
我设置频道的方式是在 websocket 协议中通过 query_string 发送 auth-token
这些模型按以下方式配置


患者模型还有一个名为“grp_id”的属性 grp_id = models.UUIDField(default=uuid.uuid4, editable=False)


consumers.py文件是
• 为患者工作

  1. 通过 query-string 发送身份验证令牌来发出连接请求,这将对用户进行身份验证
  2. 由于用户是有耐心的,因此提取了患者的grp_id
  3. 使用 grp_id 值创建频道
  4. 患者触发start_sepsis函数接收一组sepsis-attribute,然后序列化存入DB
  5. 相同的序列化数据通过通道广播

• 为医生工作

  1. 如上认证
  2. 医生使用 _get_patient_of_doctor 辅助函数
  3. 获取与其关联的患者
  4. 医生会尝试连接所有患者的 grp_id 相关联
  5. 连接后广播一条名为“doc is connected”的消息

class SepsisDynamicConsumer(AsyncJsonWebsocketConsumer):

    groups = ['test']

    @database_sync_to_async
    def _get_user_group(self, user):
        return user.user_type

    @database_sync_to_async
    def _get_user_grp_id(self, user):
        #************** THE grp function initiated **************
        print("THE USER obj", user)#output below
        print("email", user.email)#output below
        if "PATIENT" == user.user_type:
            return str(user.patient_set.all()[0].grp_id)
        elif "DOCTOR" == user.user_type:
            return map(str, user.doctor_set.all()[0].patient_set.all().values_list('grp_id', flat=True))
        else:
            print("THE USER IS SOMETHING ELSE")

    @database_sync_to_async
    def _get_patient_of_doctor(self, user):
        x = user.doctor_set.all()
        doc_obj = x[0]
        pat_ids = doc_obj.patient_set.all().values_list('grp_id', flat=True)
        return pat_ids

    @database_sync_to_async
    def _created_sepsis_data(self, data):
        """[summary]
            This helper function would generate and return the 
        """
        x = get_user_model().objects.get(id=data['patient'])
        x = x.patient_set.values('id')[0]['id']
        data.update({'patient': x})
        serializer = SepsisPatientSerializer(data=data)
        serializer.is_valid(raise_exception=True)
        x = serializer.create(serializer.validated_data)
        return SepsisPatientSerializer(x).data

    async def connect(self):
        user = self.scope['user']
        print("THE USER IS ------>", user)#output below
        if user.is_anonymous:
            print("user was unknown")
            await self.close()
        else:
            if user.user_type == 'PATIENT':
                pat_grp_id = await self._get_user_grp_id(user)
                await self.channel_layer.group_add(
                    group=pat_grp_id,
                    channel=self.channel_name
                )
                print("CONNECT TO ---------> ", pat_grp_id)
            elif user.user_type == 'DOCTOR':
                for doc_pat_grp_id in await self._get_user_grp_id(user):
                    print("Doc connected --------->", doc_pat_grp_id)
                    await self.channel_layer.group_add(
                        group=doc_pat_grp_id,
                        channel=self.channel_name
                    )
                print("Doc connected ", doc_pat_grp_id)
            await self.accept()

    async def start_sepsis(self, message):
        data = message.get('data')
        sepsis_generated_and_saved_data = await self._created_sepsis_data(data)
        await self.send_json({
            'type': 'echo.message',
            'data': sepsis_generated_and_saved_data
        })

    async def disconnect(self, code):
        user = self.scope['user']
        if user.is_anonymous:
            await self.close()
        else:
            if user.user_type == 'PATIENT':
                pat_grp_id = await self._get_user_grp_id(user)
                await self.channel_layer.group_discard(
                    group=pat_grp_id,
                    channel=self.channel_name
                )
                await super().disconnect(code)

    async def echo_message(self, message):
        await self.send_json(message)

    async def receive_json(self, content, **kwargs):
        message_type = content.get('type')
        if message_type == 'start.sepsis':
            await self.start_sepsis(content)
        if message_type == 'echo.message':
            await self.send_json({
                'type': message_type,
                'data': content.get('data'),
            })

问题是: 当我 运行 一个按以下顺序进行的测试时

  1. 创建患者
  2. 向服务器发送一个 websocket 连接请求
  3. 在患者的数据库中,我添加了一个“grp_id”属性,以便为 django-channel-layer 组提供唯一的 ASCII unicode,患者将连接到该组
  4. 联系医生
  5. 向服务器发送一个websocket连接请求
  6. 通过 websocket 从患者发送“echo.message”数据:“患者已连接”
  7. 通过 websocket 从 doctor 发送一个“echo.message”数据:“doctor got connected”

问题是,每当测试 运行s 时,即使我使用 django-channel 广播功能,患者也只会收到患者发送的数据,而医生会收到医生发送的相同数据这会将数据发送给所有连接的用户。
还要解决类似的问题(如果有多个患者与医生相关联,并且由于“for 循环”,医生正在 query_set 中找到最后一个患者怎么办)好吧,现在只有一名医生与每个患者相关联。

我的测试

async def test_patient_doctor_on_same_channel(self, settings):
        settings.CHANNEL_LAYERS = TEST_CHANNEL_LAYERS
        # create doctor
        doctor_user, doctor_access = await create_user(
            'test_Doctor.user@example.com', 'pAssw0rd', 'DOCTOR', 'testDoctor_username'
        )
        # create patient
        user, access = await create_user(
            'test.user@example.com', 'pAssw0rd', 'PATIENT', 'test_username'
        )
        # connect patient
        communicator = WebsocketCommunicator(
            application=application,
            path=f'/sepsisDynamic/?token={access}'
        )
        connected, _ = await communicator.connect()

        # connect doctor
        doctor_communicator = WebsocketCommunicator(
            application=application,
            path=f'/sepsisDynamic/?token={doctor_access}'
        )
        # the doctor connects to the online patient
        doctor_connected, _ = await doctor_communicator.connect()

        # Simply echo an message
        message = {
            'type': 'echo.message',
            'data': 'This is a test message.'
        }

        await communicator.send_json_to(message)

        # checking if patient received the data on their end
        response = await communicator.receive_json_from()
        assert response == message

        # checking if doctor received the patient's data on their end
        response_doc = await doctor_communicator.receive_json_from()
        response_doc == message

        await communicator.disconnect()
        await doctor_communicator.disconnect()

我的直觉说它没有连接到同一个 websocket,但我不知道为什么。
测试的输出

THE USER IS ------> test_username
************** THE grp function initiated **************
THE USER obj test_username
email test.user@example.com
CONNECT TO --------->  1b2a455c-28b0-4c4d-9d26-40f6a4634fa9
**********************************************************
THE USER IS ------> testDoctor_username
THE DOC CONDITION RAN
************** THE grp function initiated **************
THE USER obj testDoctor_username
email test_Doctor.user@example.com
THE patient's unicode  <QuerySet [UUID('1b2a455c-28b0-4c4d-9d26-40f6a4634fa9')]>
Doc connected ---------> 1b2a455c-28b0-4c4d-9d26-40f6a4634fa9
Doc connected  1b2a455c-28b0-4c4d-9d26-40f6a4634fa9
THE RECEIVE FUNCTION RAN
THE MESSAGE TYPE echo.message
******The error******
TestWebSocket::test_patient_doctor_on_same_channel - asyncio.exceptions.TimeoutError
response_doc = await doctor_communicator.receive_json_from()

请帮忙。

谢谢。

您正在使用发送到同一频道的 send_json。要发送到群组,您必须使用文档 chat example 中记录的 channel_layer.group_send 方法。您在组中的每个通道中指定处理程序,然后将每条消息向下推送到客户端。您的体系结构的一个问题是医生的频道可以连接到许多患者组,并且无法知道他正在向哪个特定患者发送消息。解决这个问题的一种方法是将患者组编号转发到前端,以便它可以在发送消息时指定患者组名称。然后你可以这样发送消息

async def receive_json(self, content, **kwargs):
        message_type = content.get('type')
        if message_type == 'start.sepsis':
            await self.start_sepsis(content)
        if message_type == 'echo.message':
            group_name = await self._get_user_grp_id(user) 
            if "DOCTOR" == user.user_type:
                group_name = content['group_name']
            await self.channel_layer.group_send(
                'type': 'chat.message',
                'data': content.get('data'),
            })

async def chat_message(self, event):
    """
    Called when someone has messaged our chat.
    """
    # Send a message down to the client
    await self.send_json(
        {
            "message": event["data"],
        }
    )