如果客户端订阅了两个主题,如何区分来自 Mercure hub 的一个主题或另一个主题的数据

How to differentiate the data of one topic or another coming from Mercure hub if the client is suscribed to two topics

我在 Symony 应用程序上有一个仪表板,其中包含两个 ChartJS 图表,一个用于温度数据,另一个用于压力数据。我需要实时更新;为此,我尝试将 MercureBundle 与两个主题一起使用:['realtime-notif/temperature/{sensorId}', 'realtime-notif/pressure/{sensorId}']。尽管主题听起来很相似,但符合数据的逻辑是不同的,因为这两个 ChartJS 是不同的,为此我有两个带有 AMQP 队列的 Messenger 消息处理程序,一个在主题 'realtime-notif/temperature/{sensorId}' 中发布 mercure 更新,另一个消息处理程序class 发表于 'realtime-notif/pressure/{sensorId}'。我会尽量把代码总结的简洁一些。

#mercure.yaml:
mercure:
    hubs:
        default:
            url: '%env(MERCURE_URL)%'
            public_url: '%env(MERCURE_PUBLIC_URL)%'
            jwt:
                secret: '%env(MERCURE_JWT_SECRET)%'
                publish: ['realtime-notif/temperature/{sensorId}', 'realtime-notif/pressure/{sensorId}']
                subscribe: ['realtime-notif/temperature/{sensorId}', 'realtime-notif/pressure/{sensorId}']

#The TemperatureMessageHandler class: 
class TemperatureMessageHandler implements MessageHandlerInterface
{

    private $mercureHub;
    private $managerRegistry;

    public function __construct(HubInterface $mercureHub, ManagerRegistry $managerRegistry)
    {
        $this->mercureHub = $mercureHub;
        $this->managerRegistry = managerRegistry;
    }

    public function __invoke(TemperatureMessage $message)
    {
        try {
            $graphId=$message->getGraphId();
            $lastElapsedTime=$message->getLastElapsedTime();
            
            $em=$this->managerRegistry->getManager();

            $storedData = $em->getRepository(Temperature::class)->findLastRecordsForGraph($graphId, $lastElapsedTime);
            
            /**
             Set the data source for the temperature graph to a specific format from $toredData
            **/
            $formatedChartData = [];
                **....**

            $update = new Update(
                    sprintf('realtime-notif/temperature/%s', $graphId),
                    \json_encode($$formatedChartData),
                    true
            );

            $this->mercureHub->publish($update);
        } catch (\Exception $exc) {
            
        }
    }
}

而且,

#The PressureMessageHandler class:
 class PressureMessageHandler implements MessageHandlerInterface
{

    private $mercureHub;
    private $managerRegistry;

    public function __construct(HubInterface $mercureHub, ManagerRegistry $managerRegistry)
    {
        $this->mercureHub = $mercureHub;
        $this->managerRegistry = managerRegistry;
    }

    public function __invoke(PressureMessage $message)
    {
        try {
            $graphId = $message->getGraphId();
            $lastElapsedTime = $message->getLastElapsedTime();
            
            $em = $this->managerRegistry->getManager();

            $storedData = $em->getRepository(Pressure::class)->findLastRecordsForGraph($graphId, $lastElapsedTime);
            
            /**
             Set the data source for the pressure graph to a specific format from $toredData
            **/
            $formatedChartData = [];
                **....**
            

            $update = new Update(
                    sprintf('realtime-notif/pressure/%s', $graphId),
                    \json_encode($$formatedChartData),
                    true
            );

            $this->mercureHub->publish($update);
        } catch (\Exception $exc) {
            
        }
    }
}

我的问题是,如果从 Mercure hub 接收的数据来自 EventSource 对象的消息事件中的温度主题或压力主题,我不知道如何在客户端区分。

<script type="text/javascript">
                $(document).ready(function () {
                    /**Create two graph on page ready **/
                    let temperatureGraphObject = createTemperatureGraph(canvasTemperaturaGraph);
                    let pressureGRaphObject = createPressureGraph(canvasPressureGraph);
                    
                    /** 
                    I have two function updateTemperatureGraph(temperatureGraphObject, newTemperaturaData) and updatePressureGraph(pressureGraphObject, newPresureData)
                    **/
                    
                    /**Subscribe client to topics for data updates **/
                    {% set topics = ['realtime-notif/temperature/'~temperatureSensorId, 'realtime-notif/pressure/'~pressureSensorId] %}

                    const eventSource = new EventSource("{{ mercure(topics, { subscribe:topics})|escape('js')}}", {withCredentials: true});

                    eventSource.onopen = function () {
                        console.log('New socket connection!');
                    };

                    eventSource.onmessage = function (e) {
                        console.log('New data received');
                        var data = JSON.parse(e.data);
                        
                        /** 
                        The problem is here, how differentiate the topics data to call updateTemperaturaGraph(temperatureGraphObject, data) or  updatePressureGraph(pressureGraphObject, data)
                        **/                     
                    };

                    eventSource.onerror = function () {
                        console.log('Socket connection lost!');
                    };
                });
            </script>

那么,如何区分主题数据调用 updateTemperaturaGraph(temperatureGraphObject, data) 或 updatePressureGraph(pressureGraphObject, data) 到 onmessage 事件中?

如果我只为客户端订阅一个主题,所有接收到的数据都将是主题图的一种,当然图会正确更新。

解决方案是为更新 class 构造函数设置 type 属性,您希望从 MessageHandler 相关 class 发布到 Mercure 中心。因此,对于与温度通知相关的消息,我们将设置 type 属性 = 'temperatureUpdate',对于与压力通知相关的消息,我们将设置 type 属性 = 'pressureUpdate' .

TemperatureMessageHandler 上的 __invoke 函数:

public function __invoke(TemperatureMessage $message)
    {
        try {
            $graphId=$message->getGraphId();
            $lastElapsedTime=$message->getLastElapsedTime();
            
            $em=$this->managerRegistry->getManager();

            $storedData = $em->getRepository(Temperature::class)->findLastRecordsForGraph($graphId, $lastElapsedTime);
            
            /**
             Set the data source for the temperature graph to a specific format from $toredData
            **/
            $formatedChartData = [];
                **....**

            $update = new Update(
                    sprintf('realtime-notif/temperature/%s', $graphId),
                    \json_encode($$formatedChartData),
                    true,
                    null,
                    'temperatureUpdate'
            );

            $this->mercureHub->publish($update);
        } catch (\Exception $exc) {
            
        }
    }

PressureMessageHandler 上的 __invoke 函数:

public function __invoke(PressureMessage $message)
    {
        try {
            $graphId = $message->getGraphId();
            $lastElapsedTime = $message->getLastElapsedTime();
            
            $em = $this->managerRegistry->getManager();

            $storedData = $em->getRepository(Pressure::class)->findLastRecordsForGraph($graphId, $lastElapsedTime);
            
            /**
             Set the data source for the pressure graph to a specific format from $toredData
            **/
            $formatedChartData = [];
                **....**
            

            $update = new Update(
                    sprintf('realtime-notif/pressure/%s', $graphId),
                    \json_encode($$formatedChartData),
                    true,
                    null,
                    'pressureUpdate'
            );

            $this->mercureHub->publish($update);
        } catch (\Exception $exc) {
            
        }
    }

在客户端,必须为 EventSource 对象创建两个新的 EventListener,名称与创建的新类型相同。每一位新听众都会对待在 Mercure 中心发布的相关消息类型:

<script type="text/javascript">
                $(document).ready(function () {
                    /**Create two graph on page ready **/
                    let temperatureGraphObject = createTemperatureGraph(canvasTemperaturaGraph);
                    let pressureGRaphObject = createPressureGraph(canvasPressureGraph);
                    
                    /** 
                    I have two function updateTemperatureGraph(temperatureGraphObject, newTemperaturaData) and updatePressureGraph(pressureGraphObject, newPresureData)
                    **/
                    
                    /**Subscribe client to topics for data updates **/
                    {% set topics = ['realtime-notif/temperature/'~temperatureSensorId, 'realtime-notif/pressure/'~pressureSensorId] %}

                    const eventSource = new EventSource("{{ mercure(topics, { subscribe:topics})|escape('js')}}", {withCredentials: true});

                    eventSource.onopen = function () {
                        console.log('New socket connection!');
                    };

                    eventSource.addEventListener("temperaturaUpdate", function (e) {
                        let parsedData = null;
                                try {
                                    parsedData = JSON.parse(e.data);
                                } catch (error) {
                                    console.log(error);
                                }
                                
                                if (parsedData) {
                                    updateTemperatureGraph(temperatureGraphObject, parsedData);
                                }
                    }, false);
                    
                    eventSource.addEventListener("pressureUpdate", function (e) {
                        let parsedData = null;
                                try {
                                    parsedData = JSON.parse(e.data);
                                } catch (error) {
                                    console.log(error);
                                }
                                
                                if (parsedData) {
                                    updatePressureGraph(pressureGraphObject, parsedData);
                                }
                    }, false);

                    eventSource.onerror = function () {
                        console.log('Socket connection lost!');
                    };
                });
            </script>

这样Mercure hub就发布了class化的消息,每个EventListener都会按照消息到达订阅客户端的顺序负责处理相应的消息,与主题无关它已订阅。