如何强制 Observable 在闲置一段时间后始终发出

How to force an observable to always emit after some time idle

我有一个应扫描特定蓝牙设备的应用程序。我使用的扫描功能不断扫描所有设备,甚至应该报告重复项(同一设备可以扫描多次)。为了组织扫描的数据,我创建了一个 observable,它发出列表中找到的所有设备并删除在特定时间内未扫描的设备:

public findMyDevice() {
    let my_devices: any[] = [];

    return this.ble.startScan().pipe(
        filter(
            (device) => this.deviceIsMyDevice(device)
        ),
        map(
            (device) => {
                device = this.treatData(device);
                my_devices =  my_devices.filter((dev) => dev.id != device.id); // in case it is already in the list
                let now = new Date();
                device.time_scan = now;
                my_devices.push(device);
                my_devices.sort((a, b) => a.sn - b.sn);
                my_devices = this.removeOldDevices(my_devices); // remove scans older than 5 seconds
                return my_devices;
            }
        )
    )
}

如果附近至少有一台设备开启,则工作正常。

问题是:如果所有设备都关闭,我的扫描函数永远不会发出并且永远不会调用 removeOldDevices... 这样,一些旧设备就会保留在列表中...

我试图解决它添加到我的管道:

timeoutWith(3000,
    of().pipe(
        map(
            () => {my_devices = this.removeOldDevices(my_devices); return my_devices}
        )
    )
)

但是好像超时后订阅就结束了。解决这个问题的最佳方法是什么?在没有完成订阅的情况下,如何在空闲时间后强制发出 observable?有没有其他 rxjs 运算符可以帮助解决这种情况?

听起来你想要的是间歇流。

interval(3000),创建一个每 3 秒发出一个数字的流。

const intervalUpdate$ = interval(3000).pipe(
    map(_ => my_devices = this.removeOldDevices(my_devices))
)

这将每 3 秒删除一次旧设备。旁注:请记住,如果您从不退订,这将永远存在。然后你可以(例如)将它与合并一起使用:

const scanUpdate$ = this.ble.startScan().pipe(
    filter(
    [... more code here]
);
return merge(intervalUpdate$, scanUpdate$);
    

如果您想变得更复杂,可以使用 switchMap 将间隔与您的流混合,以便每次发射设备时启动一个新计时器:

// Make a fake object so we can trigger switchMap immediately and get our
// interval/timer stream engaged. There's probably a better way to do this.
const falseStart = {isFakeDevice: true};
// Compose the return stream
return this.ble.startScan().pipe(
    filter(device => this.deviceIsMyDevice(device)),
    // startWith() as a hack to make sure out interval stream gets 
    // switched into right away
    startWith(falseStart),
    // switchMap will create a new stream that emits the given value
    // (normally a device, but will be 'falseStart' to start) and then  
    // emits null every 3 seconds. That logic is restarted every time a  
    // new device arrives, effectively resetting the interval stream
    switchMap(device => 
        interval(3000).pipe(
            mapTo(null),
            startWith(device)
        )
    ),
    // Filtering out our hacked falseStart object; we don't want it
    filter(device => !device?.isFakeDevice),
    map(device => {
        // Your code here, remembering that device might be null if it
        // was triggered by the interval stream
        [...]
        return this.removeOldDevices(my_devices);
    })
);

更多 'sophisticated' 方法的缺点是传递的第一个值并不是真正可取的。我可能会将 falseStart 替换为 'null' 而不会过滤掉它。当您订阅此流时,您会立即得到响应,但这通常不会对性能造成太大影响,甚至可能是可取的。

“复杂”方法的好处是,只要您仍在寻找设备,interval(3000) 可能永远不需要发出值。这可能会为您节省很多性能,具体取决于 this.removeOldDevices() 的成本。