除非 Heroku dyno 计数按比例增加,否则消息不会到达消费者

Messages not getting to consumer unless Heroku dyno count is scaled way up

我们用 React 构建了一个前端,用 Django Rest Frameworks 和通道构建了一个后端。我们使用 Heroku Redis 作为我们的 Redis 供应商。我们的用户通过 ReconnectingWebSocket.

连接到频道

我们正在使用 Python 3.6 和频道 2.4

问题是我们的 API 调用试图将信息传递给套接字,但它们并不总是传递给消费者。我通过打印记录了呼叫的步骤,打印了 channel_name 它将尝试将其发送到并确认它是连接时返回给用户的内容,但消费者中的打印件没有被称为含义消息永远不会发送给用户。

如果我将 dynos 的数量增加到或多或少的 1-1,并且用户连接到套接字,那么它似乎可以解决问题(或者至少使它更可靠)。据我了解,1 dyno 应该能够处理许多套接字连接。我的消费者没有收到信号是否有原因?是否有理由扩大测功机的数量来解决问题?

连接时,我让用户加入一个名为“u_{their id}”的组,以允许潜在地将信号发送到以同一用户身份登录的多台计算机。我已经尝试通过他们的 channel_name 直接和通过该组发送消息,当消息没有通过时,似乎也没有通过。 prints 验证 channel_names 是否正确,消费者仍然没有收到消息。似乎没有发生任何错误。它可能不起作用,然后我会刷新收件人并且它会起作用,然后我会再次刷新收件人并且它恢复不工作。

套接字连接肯定是活动的——我在前端做了一个简单的函数来 ping 套接字,当我这样做时(即使消费者没有从 API 调用中获得信号),它回应。

我还注意到,如果我重新启动我的测功机,当它们加载并且套接字重新连接时,第一个用户有信号通过 API 调用工作一小段时间然后他们开始不再通过。另外,如果我有一段时间不使用套接字然后刷新它们似乎也会再次开始短暂工作。

Procfile

web: daphne doctalk.asgi:application --port $PORT --bind 0.0.0.0

consumers.py

import json
from asgiref.sync import async_to_sync
from channels.db import database_sync_to_async

from channels.generic.websocket import AsyncWebsocketConsumer
from messages.models import Thread
from profile.models import OnlineStatus, DailyOnlineUserActivity
from rest_framework.authtoken.models import Token
from django.contrib.auth.models import AnonymousUser
from .exceptions import ClientError
import datetime
from django.utils import timezone

class HeaderConsumer(AsyncWebsocketConsumer):
    async def connect(self):   
        await self.accept()
        await self.send("request_for_token")


    async def continue_connect(self):
        print(self.channel_name)
        print(self.channel_layer)
        await self.send(json.dumps({'your_channel_name': self.channel_name}))

        await self.get_user_from_token(self.scope['token'])

        await self.channel_layer.group_send(
            "online_users",
            {
                "type": "new_user_online",
                "user": self.user,
                "channel_layer": str(self.channel_layer),
                "channel_name": self.channel_name,
            }
        )

        await self.channel_layer.group_add(
            "online_users",
            self.channel_name,
        )

        print("adding to personal group u_%d" % self.user['id'])
        await self.channel_layer.group_add(
            "u_%d" % self.user['id'],
            self.channel_name,
        )


        self.message_threads = set()

        self.message_threads = await self.get_message_ids()

        for thread in self.message_threads:
            await self.monitor_thread(thread)

        self.doa = await self.check_for_or_establish_dailyonlineactivity()
        self.online_status = await self.establish_onlinestatus()
        await self.add_to_online_status_list()

        self.user_id_list = await self.get_online_user_list()
        await self.send_online_user_list()

    async def disconnect(self, code):
        # Leave all the rooms we are still in
        if hasattr(self, 'user'):
            await self.remove_from_dailyonlineactivity()

            try:
                await self.channel_layer.group_discard(
                    "u_%d" % self.user['id'],
                    self.channel_name,
                )
            except Exception as e:
                print("issue with self channel")
                print(e)

            try:
                await self.channel_layer.group_send(
                    "online_users",
                    {
                        "type": "user_went_offline",
                        "message": self.user['id'],
                    }
                )

            except Exception as e:
                print("issue with online_users")
                print(e)

            await self.channel_layer.group_discard(
                "online_users",
                self.channel_name,
            )
            try:
                for thread_id in list(self.message_threads):
                    print("leaving " + str(thread_id))
                    try:
                        self.message_threads.discard(thread_id)
                        await self.channel_layer.group_discard(
                            "m_%d" % thread_id,
                            self.channel_name,
                        )
                    except ClientError:
                        pass
            except Exception as e:
                print("issue with threads")
                print(e)

    async def receive(self, text_data):
        print(text_data)
        text_data_json = json.loads(text_data)
        if 'token' in text_data_json:
            self.scope['token'] = text_data_json['token']
            await self.continue_connect()

        #self.send(text_data=json.dumps({
        #    'message': message
        #}))

    async def new_message(self, event):
        # Send a message down to the client
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "thread": event['thread'],
                "message": event["message"],
            },
        ))

    async def user_went_offline(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def send_call_ring(self, event):
        print("SENDING CALL RING")
        print(event["message"])
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def rejoin_call(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def popup_notification(self, event):
        print("sending popup_notification")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def new_call_participant(self, event):
        print("new_call_participant received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def new_participants_invited(self, event):
        print("new_participants_invited received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def share_document_via_videocall(self, event):    
        print("share_document received")
        print(event)
        print(self.channel_name)
        print(self.user['id'])
        
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def event_video_share_link(self, event):   

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_video_hand_up(self, event):    

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_video_address_hand_up(self, event):    

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def you_are_dominant_speaker(self, event):

        # Send a message down to the client
        print("SENDING DOMINANT SPEAKER")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def you_are_no_longer_dominant_speaker(self, event):

        print("SENDING NO LONGER DOMINANT SPEAKER")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def event_video_screenshare(self, event):    

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def event_video_reaction(self, event):    

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def video_call_thread(self, event):

        print("sending video call thread")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def video_call_chat_message(self, event):

        print("sending video call chat message")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_chat_message(self, event):

        print("sending event chat message")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def to_next_agenda_item(self, event):

        print("sending video call chat message")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def mute_all_event_participants(self, event):

        print("sending mute all participants")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def event_started(self, event):

        print("event started consumer")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_ended(self, event):

        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))
        
    async def video_call_reaction(self, event):

        print("sending video call reaction")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def new_user_online(self, event):

        print("user_online received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["user"],
                "channel_layer": event["channel_layer"],
                "channel_name": event["channel_name"],
            },
        ))

    @database_sync_to_async
    def get_message_ids(self):
        return set(Thread.objects.filter(participants__id=self.user['id'], subject="").values_list('id', flat=True))

    async def monitor_thread(self, thread_id):
        print("monitoring thread %d" % thread_id)
        print("on channel %s" % self.channel_name)
        await self.channel_layer.group_add(
            "m_%d" % thread_id,
            self.channel_name,
        )

    @database_sync_to_async
    def get_user_from_token(self, t):
        try:
            print("trying token" + t)
            token = Token.objects.get(key=t)
            self.user = token.user.get_profile.json()
        except Token.DoesNotExist:
            print("failed")
            self.user = AnonymousUser()

    @database_sync_to_async
    def check_for_or_establish_dailyonlineactivity(self):
        doa, created = DailyOnlineUserActivity.objects.get_or_create(date=datetime.date.today())
        if created:
            print("created DOA %d" %doa.id)
        else:
            print("found existing DOA %d" %doa.id)
        return doa

    @database_sync_to_async
    def establish_onlinestatus(self):
        old_os = OnlineStatus.objects.filter(user_id=self.user['id'], online_to=None)
        if old_os.exists():
            for os in old_os:
                print("found unclosed OS %d" % old_os[0].id)
                os.online_to = timezone.now()
                os.save()
        new_os = OnlineStatus(
            user_id=self.user['id'],
            channel_name=self.channel_name
        )
        new_os.save()
        return new_os

    @database_sync_to_async
    def add_to_online_status_list(self):
        self.doa.currently_active_users.add(self.user['id'])
        self.doa.all_daily_users.add(self.user['id'])
        self.doa.online_log.add(self.online_status)
        self.doa.save()

    @database_sync_to_async
    def remove_from_dailyonlineactivity(self):
        if hasattr(self, 'doa') and self.doa is not None:
            self.doa.currently_active_users.remove(self.user['id'])
        if hasattr(self, 'onine_status') and self.online_status is not None:
            self.online_status.online_to = timezone.now()
            self.online_status.save()

    @database_sync_to_async
    def get_online_user_list(self):   
        user_id_list = list(self.doa.currently_active_users.all().values_list('id', flat=True))
        user_id_list.remove(self.user['id'])
        return user_id_list

    async def send_online_user_list(self):
        print("sending online_users")
        await self.send(text_data=json.dumps(
            {
                "type": "online_users",
                "message": self.user_id_list,
            },
        ))

    async def participant_ignored(self, event):
        print("irgnored call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def participant_left(self, event):
        print("left call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def participant_joined(self, event):
        print("left call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def video_screenshare(self, event):

        print("sending screenshare")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

通过向 VideoRoom 添加配置文件触发的 django 信号:

@receiver(m2m_changed, sender=VideoRoom.invitees.through)
def invitee_added(sender, **kwargs):
    instance = kwargs.pop('instance', None)
    action = kwargs.pop('action', None)
    pk = kwargs.pop('pk_set', None)

    if action == 'post_add':    

        if len(pk) > 0:
            user = Profile.objects.get(id=list(pk)[0])
            if instance.initiator.id == user.id:
                return

            identity = "u_%d" % user.id

            # Create access token with credentials
            token = AccessToken(settings.TWILIO_ACCOUNT_SID, settings.TWILIO_API_KEY, settings.TWILIO_API_SECRET,
                                identity=identity, ttl=86399)

            # Create a Video grant and add to token
            video_grant = VideoGrant(room=instance.room_name)
            token.add_grant(video_grant)

            invitee_access_token = VideoAccessToken(user=user, token=token.to_jwt())
            invitee_access_token.save()

            instance.invitee_access_tokens.add(invitee_access_token)

            channel_layer = get_channel_layer()
            print(channel_layer)

            profiles = {"u_%d" % instance.initiator.id: instance.initiator.json()}

            for u in instance.current_participants.all():
                profiles["u_%d" % u.id] = u.json()
            print("instance.type")
            print(instance.type)
            if instance.type != 'event':
                print("sending to existing users")
                for key, value in profiles.items():
                    if value['id'] != user.id:
                        async_to_sync(channel_layer.group_send)(
                            key,
                            {'type': 'new_call_participant',
                             'message': {
                                 'key': "u_%d" % user.id,
                                 'value': user.json()
                             }
                             }
                        )

                ons = OnlineStatus.objects.get(user=user, online_to=None)
                print("in signal, sending to %s on channel %s" % (user.full_name, ons.channel_name))

                async_to_sync(channel_layer.send)(
                    ons.channel_name,
                    {'type': 'send_call_ring',
                     'message': {
                         'id': instance.id,
                         'room_name': instance.room_name,
                         'identity': "u_%d" % user.id,
                         'profiles': profiles,
                         'token': invitee_access_token.token.decode(),
                         'answered': False,
                         'initiated': False,
                         'caller': instance.initiator.json()
                     }
                     }
                )

在不成功的套接字信号期间记录:

2021-03-11T15:16:14.489596+00:00 app[web.1]: pk
2021-03-11T15:16:14.489655+00:00 app[web.1]: {113}
2021-03-11T15:16:14.518051+00:00 app[web.1]: pk
2021-03-11T15:16:14.518058+00:00 app[web.1]: {68}
2021-03-11T15:16:14.786357+00:00 app[web.1]: sending to existing users
2021-03-11T15:16:14.786377+00:00 app[web.1]: u_113
2021-03-11T15:16:14.911441+00:00 app[web.1]: u_68
2021-03-11T15:16:14.915900+00:00 app[web.1]: in signal, sending to John Doe on channel u_68
2021-03-11T15:16:15.228644+00:00 app[web.1]: 10.63.249.212:12999 - - [11/Mar/2021:10:16:15] "POST /api/start-video-chat/" 200 3523
2021-03-11T15:16:15.231562+00:00 heroku[router]: at=info method=POST path="/api/start-video-chat/" host=project-name.herokuapp.com request_id=7ec75a21-c6bd-452b-9517-cd500064d7ee fwd="12.34.56.78" dyno=web.1 connect=3ms service=955ms status=200 bytes=3714 protocol=http

调用成功:

2021-03-11T15:20:50.253243+00:00 app[web.4]: pk
2021-03-11T15:20:50.253248+00:00 app[web.4]: {113}
2021-03-11T15:20:50.280925+00:00 app[web.4]: pk
2021-03-11T15:20:50.280926+00:00 app[web.4]: {68}
2021-03-11T15:20:50.614504+00:00 app[web.4]: sending to existing users
2021-03-11T15:20:50.614527+00:00 app[web.4]: u_113
2021-03-11T15:20:50.713880+00:00 app[web.4]: u_68
2021-03-11T15:20:50.718141+00:00 app[web.4]: in signal, sending to John Doe on channel u_68
2021-03-11T15:20:50.799546+00:00 app[web.2]: CALLING
2021-03-11T15:20:50.801670+00:00 app[web.2]: {'type': 'send_call_ring', 'message': "some payload data"}
2021-03-11T15:20:50.965602+00:00 app[web.4]: 10.11.225.205:25635 - - [11/Mar/2021:10:20:50] "POST /api/start-video-chat/" 200 3533
2021-03-11T15:20:50.964378+00:00 heroku[router]: at=info method=POST path="/api/start-video-chat/" host=project-name.herokuapp.com request_id=2da9918b-b587-4db9-a3c2-9d6dfd55ef42 fwd="12.34.56.78" dyno=web.4 connect=1ms service=888ms status=200 bytes=3724 protocol=http

问题最终出在了 Redis 上。我从 channels-redis 转换为 channels-rabbitmq,我所有的问题都消失了。我不知道是我的 Redis 提供商还是 channels-redis,但只需更改后端即可解决所有问题。