使用 RxJava2 从 BLE 通知中缓冲图像
Buffer image from BLE notification with RxJava2
我想缓冲来自 BLE 通知的图像。我在每个通知中得到 20 个字节长的数组。 (通知之间有 15-20 毫秒的停顿。)它们按顺序出现,我可以识别图像的第一个和最后一个包。图片陆续上来。
我的问题是我不知道如何在 RxJava2 中实现它。正如您在下面看到的,我有一个非常糟糕的解决方案。可以用,但是有时图片发送中断,所以我没有收到最后一个包,新旧图片混在一起。
我想缓冲依赖于字节数组的值,从第一个包到最后一个包。
Observable<byte[]> obs = notificationObservable()
.map(notification -> {
return notification.getBytes();
});
disposables.add(notificationObservable()
.map(notification -> {
return notification.getBytes();
})
.buffer(obs.delay(10, TimeUnit.MILLISECONDS)
.filter(bytes -> {
return bytes[2] < 0; //last package
})
)
.map(bytes -> new MyImage(bytes))
.subscribe(
imageSubject::onNext,
imageSubject::onError,
imageSubject::onComplete));
在这种情况下,我通常做的是准备一个处理零件的构建器class。
参见以下示例:
class ImageBuilder {
private final List<byte[]> parts = new ArrayList<>();
public ImageBuilder append(byte[] part) {
if (!isReady()) {
parts.add(part);
return this;
} else {
ImageBuilder builder = new ImageBuilder();
return builder.append(part);
}
}
public boolean isReady() {
byte[] part = parts.get(parts.size() - 1);
return part[2] < 0;
}
public MyImage build() {
// take the parts and build your image
return MyImage.buildFrom(parts);
}
}
那么你可以这样使用它:
Observable<byte[]> bytePartObservable = ...;
Observable<MyImage> images = bytePartObservable
.scan(new ImageBuilder(), ImageBuilder::append)
.filter(ImageBuilder::isReady)
.map(ImageBuilder::build);
images.subscribe( ... )
我想缓冲来自 BLE 通知的图像。我在每个通知中得到 20 个字节长的数组。 (通知之间有 15-20 毫秒的停顿。)它们按顺序出现,我可以识别图像的第一个和最后一个包。图片陆续上来。
我的问题是我不知道如何在 RxJava2 中实现它。正如您在下面看到的,我有一个非常糟糕的解决方案。可以用,但是有时图片发送中断,所以我没有收到最后一个包,新旧图片混在一起。
我想缓冲依赖于字节数组的值,从第一个包到最后一个包。
Observable<byte[]> obs = notificationObservable()
.map(notification -> {
return notification.getBytes();
});
disposables.add(notificationObservable()
.map(notification -> {
return notification.getBytes();
})
.buffer(obs.delay(10, TimeUnit.MILLISECONDS)
.filter(bytes -> {
return bytes[2] < 0; //last package
})
)
.map(bytes -> new MyImage(bytes))
.subscribe(
imageSubject::onNext,
imageSubject::onError,
imageSubject::onComplete));
在这种情况下,我通常做的是准备一个处理零件的构建器class。
参见以下示例:
class ImageBuilder {
private final List<byte[]> parts = new ArrayList<>();
public ImageBuilder append(byte[] part) {
if (!isReady()) {
parts.add(part);
return this;
} else {
ImageBuilder builder = new ImageBuilder();
return builder.append(part);
}
}
public boolean isReady() {
byte[] part = parts.get(parts.size() - 1);
return part[2] < 0;
}
public MyImage build() {
// take the parts and build your image
return MyImage.buildFrom(parts);
}
}
那么你可以这样使用它:
Observable<byte[]> bytePartObservable = ...;
Observable<MyImage> images = bytePartObservable
.scan(new ImageBuilder(), ImageBuilder::append)
.filter(ImageBuilder::isReady)
.map(ImageBuilder::build);
images.subscribe( ... )