将 React useEffect 钩子与 rxjs mergeMap 运算符一起使用

Using React useEffect hook with rxjs mergeMap operator

我正在尝试实现一个必须使用 inner observables 的数据流,其中我使用了来自 mergeMapconcatMap 等的数据流。

例如:

const output$$ = input$$.pipe(
    mergeMap(str => of(str).pipe(delay(10))),
    share()
  );

  output$$.subscribe(console.log);

这在登录控制台时工作正常。 但是当我尝试像下面那样在 React 中使用它时,利用 useEffectuseState 钩子来更新一些文本:

function App() {
  const input$ = new Subject<string>();
  const input$$ = input$.pipe(share());
  const output$$ = input$$.pipe(
    mergeMap(str => of(str).pipe(delay(10))),
    share()
  );

  output$$.subscribe(console.log);
  // This works

  const [input, setInput] = useState("");
  const [output, setOutput] = useState("");

  useEffect(() => {
    const subscription = input$$.subscribe(setInput);

    return () => {
      subscription.unsubscribe();
    };
  }, [input$$]);

  useEffect(() => {
    const subscription = output$$.subscribe(setOutput);
    // This doesn't

    return () => {
      subscription.unsubscribe();
    };
  }, [output$$]);

  return (
    <div className="App">
      <input
        onChange={event => input$.next(event.target.value)}
        value={input}
      />
      <p>{output}</p>
    </div>
  );
}

它开始行动 weird/unpredictable(例如:有时文本会在键入过程中更新,有时根本不会更新)。

我注意到的事情:

我相信这与 useEffect 的内部工作原理以及它如何捕获和注意到外部变化有关,但无法使其正常工作。
任何帮助深表感谢。

案例的最小复制:
https://codesandbox.io/s/hooks-and-observables-1-7ygd8

我不太确定你想要达到什么目的,但我发现了一些问题,希望下面的代码可以解决这些问题:

function App() {
    // Create these observables only once.
    const [input$] = useState(() => new Subject<string>());
    const [input$$] = useState(() => input$.pipe(share()));
    const [output$$] = useState(() => input$$.pipe(
        mergeMap(str => of(str).pipe(delay(10))),
        share()
    ));

    const [input, setInput] = useState("");
    const [output, setOutput] = useState("");

    // Create the subscription to input$$ on component mount, not on every render.
    useEffect(() => {
        const subscription = input$$.subscribe(setInput);

        return () => {
            subscription.unsubscribe();
        };
    }, []);

    // Create the subscription to output$$ on component mount, not on every render.
    useEffect(() => {
        const subscription = output$$.subscribe(setOutput);

        return () => {
            subscription.unsubscribe();
        };
    }, []);

    return (
        <div className="App">
            <input
                onChange={event => input$.next(event.target.value)}
                value={input}
            />
            <p>{output}</p>
        </div>
    );
}

我有一个类似的任务,但目标是对输入测试进行管道和去抖并执行 ajax 调用。 简单的答案是你应该在反应钩子中用箭头函数初始化 RxJS 主题 'useState' 以便每次初始化一次主题。

然后你应该使用 Effect 和空数组 [] 以便在组件初始化时创建一个管道。

import React, { useEffect, useState } from "react";
import { ajax } from "rxjs/ajax";
import { debounceTime, delay, takeUntil } from "rxjs/operators";
import { Subject } from "rxjs/internal/Subject";

const App = () => {
  const [items, setItems] = useState([]);
  const [loading, setLoading] = useState(true);
  const [filterChangedSubject] = useState(() => {
    // Arrow function is used to init Singleton Subject. (in a scope of a current component)
    return new Subject<string>();
  });

  useEffect(() => {
    // Effect that will be initialized once on a react component init. 
    // Define your pipe here.
    const subscription = filterChangedSubject
      .pipe(debounceTime(200))
      .subscribe((filter) => {
        if (!filter) {
          setLoading(false);
          setItems([]);
          return;
        }
        ajax(`https://swapi.dev/api/people?search=${filter}`)
          .pipe(
            // current running ajax is canceled on filter change.
            takeUntil(filterChangedSubject)
          )
          .subscribe(
            (results) => {
              // Set items will cause render:
              setItems(results.response.results);
            },
            () => {
              setLoading(false);
            },
            () => {
              setLoading(false);
            }
          );
      });

    return () => {
      // On Component destroy. notify takeUntil to unsubscribe from current running ajax request
      filterChangedSubject.next("");
      // unsubscribe filter change listener
      subscription.unsubscribe();
    };
  }, []);

  const onFilterChange = (e) => {
    // Notify subject about the filter change
    filterChangedSubject.next(e.target.value);
  };
  return (
    <div>
      Cards
      {loading && <div>Loading...</div>}
      <input onChange={onFilterChange}></input>
      {items && items.map((item, index) => <div key={index}>{item.name}</div>)}
    </div>
  );
};

export default App;