使用 RxJava2 从 BLE 通知中缓冲图像

Buffer image from BLE notification with RxJava2

我想缓冲来自 BLE 通知的图像。我在每个通知中得到 20 个字节长的数组。 (通知之间有 15-20 毫秒的停顿。)它们按顺序出现,我可以识别图像的第一个和最后一个包。图片陆续上来。

我的问题是我不知道如何在 RxJava2 中实现它。正如您在下面看到的,我有一个非常糟糕的解决方案。可以用,但是有时图片发送中断,所以我没有收到最后一个包,新旧图片混在一起。

我想缓冲依赖于字节数组的值,从第一个包到最后一个包。

Observable<byte[]> obs = notificationObservable()
.map(notification -> {
    return notification.getBytes();
});

disposables.add(notificationObservable()
.map(notification -> {
    return notification.getBytes();
})
.buffer(obs.delay(10, TimeUnit.MILLISECONDS)
    .filter(bytes -> {
        return bytes[2] < 0; //last package
    })
)
.map(bytes -> new MyImage(bytes))
.subscribe(
    imageSubject::onNext,
    imageSubject::onError,
    imageSubject::onComplete));

在这种情况下,我通常做的是准备一个处理零件的构建器class。

参见以下示例:

class ImageBuilder {

    private final List<byte[]> parts = new ArrayList<>();

    public ImageBuilder append(byte[] part) {
        if (!isReady()) {
            parts.add(part);
            return this;
        } else {
            ImageBuilder builder = new ImageBuilder();
            return builder.append(part);
        }
    }

    public boolean isReady() {
        byte[] part = parts.get(parts.size() - 1);
        return part[2] < 0;
    }

    public MyImage build() {
        // take the parts and build your image
        return MyImage.buildFrom(parts);
    }

}

那么你可以这样使用它:

Observable<byte[]> bytePartObservable = ...;

Observable<MyImage> images = bytePartObservable
    .scan(new ImageBuilder(), ImageBuilder::append)
    .filter(ImageBuilder::isReady)
    .map(ImageBuilder::build);

images.subscribe( ... )