合并延迟的 Observables Udp

Merging Deferred Observerables Udp

我正在尝试将一组延迟的可观察对象(源自 UDP.RecieveAsync)调用合并到一个我也可以订阅的可观察对象中。我是 Reactive Extensions 的新手,我确定我在延迟方面做错了。

Log.Information("InboundUdpListener is starting");

IObservable<UdpReceiveResult> receiveStream = null;

IList<IObservable<UdpReceiveResult>> receivingStreams = new List<IObservable<UdpReceiveResult>>();

foreach (var devicePortMapping in _deviceTypeMapper.GetDeviceTypes())
{
      Log.Information("InboundUdpListener is starting for DeviceType: {DeviceType}, Port: {Port}",
      devicePortMapping.DeviceType, devicePortMapping.Port);

      var client = new UdpClient(devicePortMapping.Port);

      receivingStreams.Add(Observable.Defer(() => client
          .ReceiveAsync()
          .ToObservable())
          .Repeat());

      _clients.Add(client);
}

receiveStream = receivingStreams.Merge();

_listener = receiveStream.Subscribe(async r =>
{
    Log.Information("InboundUdpListener received {BytesReceived} bytes from IPAddress : {IPAddress}, Port : {Port}", r.Buffer.Length, r.RemoteEndPoint.Address.MapToIPv4(),r.RemoteEndPoint.Port);

    var message = new IncomingMessage(r.RemoteEndPoint, r.Buffer);

    var deviceTypeMap = _deviceTypeMapper.GetDeviceType(message);

    message.DeviceType = deviceTypeMap?.DeviceType ?? DeviceTypeEnum.UnIdentified;

    Log.Information("InboundUdpListener is publishing message {@Message}", message);

    await _messagePublisher.Publish(message);
});

Log.Information("InboundUdpListener is started");

您实际上并没有通过在循环中调用 merge 来合并您的可观察对象。您只需在每次迭代中创建一个可观察对象。你想要做的是将你在循环中创建的所有可观察对象传递到最后合并以创建一个合并的可观察对象。

编辑:将我上面的评论变成了答案。

你肯定是在以程序的方式思考,而不是功能性的。

您需要尝试将您的处理保持在可观察范围内,并避免 foreach 和可观察范围的临时列表。

您还使用 UdpClient 来生成您的值 - 该对象是一次性的,因此您的 observable 应该为您管理它的生命周期。您可以使用 Observable.Using.

此外,异步方法可以与 Observable.FromAsync 一起使用,因此您也应该使用它。

所以,考虑到所有这些,您的 receiveStream 应该看起来像这样:

IObservable<UdpReceiveResult> receiveStream =
    from devicePortMapping in _deviceTypeMapper.GetDeviceTypes().ToObservable()
    from stream in
        Observable
            .Using(
                () => new UdpClient(devicePortMapping.Port),
                client =>
                    Observable
                        .FromAsync(() => client.ReceiveAsync())
                        .Repeat())
    select stream;

现在,鉴于我在您的订阅电话中看到的内容,您可能可以更进一步并执行此操作:

IObservable<UdpReceiveResult> receiveStream =
    from devicePortMapping in _deviceTypeMapper.GetDeviceTypes().ToObservable()
    from stream in
        Observable
            .Using(
                () => new UdpClient(devicePortMapping.Port),
                client =>
                    Observable
                        .FromAsync(() => client.ReceiveAsync())
                        .Repeat())
    select new IncomingMessage(stream.RemoteEndPoint, stream.Buffer)
    {
        DeviceType = devicePortMapping
    };

现在这意味着您可以在查询本身中访问原始设备类型,因此无需查找 - 如果我正确理解了您的代码在做什么。

如果您确实需要进行查找,那么您应该将其作为查询的一部分进行。尝试这样做:

IObservable<UdpReceiveResult> receiveStream =
    from devicePortMapping in _deviceTypeMapper.GetDeviceTypes().ToObservable()
    from stream in
        Observable
            .Using(
                () => new UdpClient(devicePortMapping.Port),
                client =>
                    Observable
                        .FromAsync(() => client.ReceiveAsync())
                        .Repeat())
    from message in Observable.Start(() =>
    {
        var message = new IncomingMessage(r.RemoteEndPoint, r.Buffer);
        var deviceTypeMap = _deviceTypeMapper.GetDeviceType(message);
        message.DeviceType = deviceTypeMap?.DeviceType ?? DeviceTypeEnum.UnIdentified;
    })
    select message;

如果可以,最好使用第二个查询。