使用 Rx 扫描运算符将流减少为值

Reduce stream into value with Rx scan operator

我有来自实时数据源的位置数据流,我正在尝试使用 rx scan 运算符来减少数据流并计算发出新位置变化时行进的距离。

发出的值采用这种格式

{
  lat: 3.4646343,
  lng: 6.4343234,
  speed: 1.3353,
  heading: 279
}

这是处理器

function distanceBetweenPoints (point, point2, unit: string = 'M') {
    const radlat1 = Math.PI * (point.lat / 180);
    const radlat2 = Math.PI * (point2.lat / 180);
    const theta = point.lng - point2.lng;
    const radtheta = Math.PI * theta / 180;
    let dist = Math.sin(radlat1) * Math.sin(radlat2) + Math.cos(radlat1) * Math.cos(radlat2) * Math.cos(radtheta);
    dist = Math.acos(dist);
    dist = dist * 180 / Math.PI;
    dist = dist * 60 * 1.1515;

    if (unit == 'KM') { dist = dist * 1.609344; }
    if (unit == 'M') { dist = dist * 1.609344 * 1000; }
    return dist;
}

const location$: Subject<LocationUpdate> = new Subject();

location$
    .pipe(
        map(location => {
            return { lat: location.lat, lng: location.lng };
        }),
        scan((x, y) => {
            return distanceBetweenPoints(x, y);
        }),
        takeUntil(locationUpdateEnd$)
    )
    .subscribe(distance => console.info('distance', distance));

我很确定我使用的 scan 运算符有误,因为我在输出中看到 Nan 和一个数值。任何帮助将不胜感激。

使用 pairwise() 而不是 scan()

pairwise() 将发出值对:[last_value, current_value],在您的情况下,[last_location, current_location].

然后 map() 那就是你的距离函数。

我使用了 pairwise 运算符

location$
        .pipe(
            map(coords => {
                return { lat: coords.lat, lng: coords.lng };
            }),
            pairwise(),
            scan((acc, coords) => {
                return acc + distanceBetweenPoints(coords[0], coords[1]);
            }, 0),
            takeUntil(endtrip$)
        )
        .subscribe(console.log);