RxJava 2 - 观察列表中的添加和删除

RxJava 2 - observing additions and removals from a list

我对 RxJava 2 有点陌生。我可以订阅从第 3 方库给我的或从 ranges/lists 创建的可观察对象。

现在我想提供我自己的 RxJava 2 flowables。这是一些上下文:

我有一个需要发现蓝牙设备的应用程序。某些服务在检测到扫描记录时向我提供扫描记录(设备发送有关其状态、RSSI 等的更新)。

在 DeviceRegistry 中,我维护了一个设备列表(mac 地址、平均 RSSI 等),我想在添加设备时从我的 DeviceRegistry 外部观察(新 MAC地址检测到)或删除(设备关闭)。

我想我应该提供类似的东西:

class DeviceRegistry {

    // All devices currently active
    val devices: MutableMap<String, Device> 

    // Gives a way to subscribe to newly detected devices (to update the UI for instance)
    fun newlyDetectedDevices(): Flowable<Device> 

    // Gives a way to subscribe to devices which get turned off (to update the UI for instance)
    fun newlyDetectedDevices(): Flowable<Device>

    fun onNewScanRecord(scanRecord) { 
        // Check if the device is new, if it is, emit something on the 
        // newlyDetectedDevices flowable
    }
}

我不明白如何从什么创建可流动的。然后还有如何在其上发出新事件,以便订阅者在那里获得事件。

我认为您的案例不太适合 Flowable 用法。查看 Observable 和 Flowable 之间的差异 here。根据官方文档,你应该使用 Flowable,当:

Dealing with 10k+ of elements that are generated in some fashion somewhere and thus the chain can tell the source to limit the amount it generates.

Reading (parsing) files from disk is inherently blocking and pull-based which works well with backpressure as you control, for example, how many lines you read from this for a specified request amount).

Reading from a database through JDBC is also blocking and pull-based and is controlled by you by calling ResultSet.next() for likely each downstream request.

Network (Streaming) IO where either the network helps or the protocol used supports requesting some logical amount.

Many blocking and/or pull-based data sources which may eventually get a non-blocking reactive API/driver in the future.

在您的情况下,最好使用 Observable 而不是 Flowable。顺便说一句,您可以轻松地从 Subjects 创建它们。查看它们 here