将 React useEffect 钩子与 rxjs mergeMap 运算符一起使用
Using React useEffect hook with rxjs mergeMap operator
我正在尝试实现一个必须使用 inner observables 的数据流,其中我使用了来自 mergeMap
、concatMap
等的数据流。
例如:
const output$$ = input$$.pipe(
mergeMap(str => of(str).pipe(delay(10))),
share()
);
output$$.subscribe(console.log);
这在登录控制台时工作正常。
但是当我尝试像下面那样在 React 中使用它时,利用 useEffect
和 useState
钩子来更新一些文本:
function App() {
const input$ = new Subject<string>();
const input$$ = input$.pipe(share());
const output$$ = input$$.pipe(
mergeMap(str => of(str).pipe(delay(10))),
share()
);
output$$.subscribe(console.log);
// This works
const [input, setInput] = useState("");
const [output, setOutput] = useState("");
useEffect(() => {
const subscription = input$$.subscribe(setInput);
return () => {
subscription.unsubscribe();
};
}, [input$$]);
useEffect(() => {
const subscription = output$$.subscribe(setOutput);
// This doesn't
return () => {
subscription.unsubscribe();
};
}, [output$$]);
return (
<div className="App">
<input
onChange={event => input$.next(event.target.value)}
value={input}
/>
<p>{output}</p>
</div>
);
}
它开始行动 weird/unpredictable(例如:有时文本会在键入过程中更新,有时根本不会更新)。
我注意到的事情:
- 如果内部可观察对象完成 immediately/is 一个承诺
立即解决,它工作正常。
- 如果我们打印到控制台而不是
useEffect
,它工作正常。
我相信这与 useEffect
的内部工作原理以及它如何捕获和注意到外部变化有关,但无法使其正常工作。
任何帮助深表感谢。
案例的最小复制:
https://codesandbox.io/s/hooks-and-observables-1-7ygd8
我不太确定你想要达到什么目的,但我发现了一些问题,希望下面的代码可以解决这些问题:
function App() {
// Create these observables only once.
const [input$] = useState(() => new Subject<string>());
const [input$$] = useState(() => input$.pipe(share()));
const [output$$] = useState(() => input$$.pipe(
mergeMap(str => of(str).pipe(delay(10))),
share()
));
const [input, setInput] = useState("");
const [output, setOutput] = useState("");
// Create the subscription to input$$ on component mount, not on every render.
useEffect(() => {
const subscription = input$$.subscribe(setInput);
return () => {
subscription.unsubscribe();
};
}, []);
// Create the subscription to output$$ on component mount, not on every render.
useEffect(() => {
const subscription = output$$.subscribe(setOutput);
return () => {
subscription.unsubscribe();
};
}, []);
return (
<div className="App">
<input
onChange={event => input$.next(event.target.value)}
value={input}
/>
<p>{output}</p>
</div>
);
}
我有一个类似的任务,但目标是对输入测试进行管道和去抖并执行 ajax 调用。
简单的答案是你应该在反应钩子中用箭头函数初始化 RxJS 主题 'useState' 以便每次初始化一次主题。
然后你应该使用 Effect 和空数组 [] 以便在组件初始化时创建一个管道。
import React, { useEffect, useState } from "react";
import { ajax } from "rxjs/ajax";
import { debounceTime, delay, takeUntil } from "rxjs/operators";
import { Subject } from "rxjs/internal/Subject";
const App = () => {
const [items, setItems] = useState([]);
const [loading, setLoading] = useState(true);
const [filterChangedSubject] = useState(() => {
// Arrow function is used to init Singleton Subject. (in a scope of a current component)
return new Subject<string>();
});
useEffect(() => {
// Effect that will be initialized once on a react component init.
// Define your pipe here.
const subscription = filterChangedSubject
.pipe(debounceTime(200))
.subscribe((filter) => {
if (!filter) {
setLoading(false);
setItems([]);
return;
}
ajax(`https://swapi.dev/api/people?search=${filter}`)
.pipe(
// current running ajax is canceled on filter change.
takeUntil(filterChangedSubject)
)
.subscribe(
(results) => {
// Set items will cause render:
setItems(results.response.results);
},
() => {
setLoading(false);
},
() => {
setLoading(false);
}
);
});
return () => {
// On Component destroy. notify takeUntil to unsubscribe from current running ajax request
filterChangedSubject.next("");
// unsubscribe filter change listener
subscription.unsubscribe();
};
}, []);
const onFilterChange = (e) => {
// Notify subject about the filter change
filterChangedSubject.next(e.target.value);
};
return (
<div>
Cards
{loading && <div>Loading...</div>}
<input onChange={onFilterChange}></input>
{items && items.map((item, index) => <div key={index}>{item.name}</div>)}
</div>
);
};
export default App;
我正在尝试实现一个必须使用 inner observables 的数据流,其中我使用了来自 mergeMap
、concatMap
等的数据流。
例如:
const output$$ = input$$.pipe(
mergeMap(str => of(str).pipe(delay(10))),
share()
);
output$$.subscribe(console.log);
这在登录控制台时工作正常。
但是当我尝试像下面那样在 React 中使用它时,利用 useEffect
和 useState
钩子来更新一些文本:
function App() {
const input$ = new Subject<string>();
const input$$ = input$.pipe(share());
const output$$ = input$$.pipe(
mergeMap(str => of(str).pipe(delay(10))),
share()
);
output$$.subscribe(console.log);
// This works
const [input, setInput] = useState("");
const [output, setOutput] = useState("");
useEffect(() => {
const subscription = input$$.subscribe(setInput);
return () => {
subscription.unsubscribe();
};
}, [input$$]);
useEffect(() => {
const subscription = output$$.subscribe(setOutput);
// This doesn't
return () => {
subscription.unsubscribe();
};
}, [output$$]);
return (
<div className="App">
<input
onChange={event => input$.next(event.target.value)}
value={input}
/>
<p>{output}</p>
</div>
);
}
它开始行动 weird/unpredictable(例如:有时文本会在键入过程中更新,有时根本不会更新)。
我注意到的事情:
- 如果内部可观察对象完成 immediately/is 一个承诺 立即解决,它工作正常。
- 如果我们打印到控制台而不是
useEffect
,它工作正常。
我相信这与 useEffect
的内部工作原理以及它如何捕获和注意到外部变化有关,但无法使其正常工作。
任何帮助深表感谢。
案例的最小复制:
https://codesandbox.io/s/hooks-and-observables-1-7ygd8
我不太确定你想要达到什么目的,但我发现了一些问题,希望下面的代码可以解决这些问题:
function App() {
// Create these observables only once.
const [input$] = useState(() => new Subject<string>());
const [input$$] = useState(() => input$.pipe(share()));
const [output$$] = useState(() => input$$.pipe(
mergeMap(str => of(str).pipe(delay(10))),
share()
));
const [input, setInput] = useState("");
const [output, setOutput] = useState("");
// Create the subscription to input$$ on component mount, not on every render.
useEffect(() => {
const subscription = input$$.subscribe(setInput);
return () => {
subscription.unsubscribe();
};
}, []);
// Create the subscription to output$$ on component mount, not on every render.
useEffect(() => {
const subscription = output$$.subscribe(setOutput);
return () => {
subscription.unsubscribe();
};
}, []);
return (
<div className="App">
<input
onChange={event => input$.next(event.target.value)}
value={input}
/>
<p>{output}</p>
</div>
);
}
我有一个类似的任务,但目标是对输入测试进行管道和去抖并执行 ajax 调用。 简单的答案是你应该在反应钩子中用箭头函数初始化 RxJS 主题 'useState' 以便每次初始化一次主题。
然后你应该使用 Effect 和空数组 [] 以便在组件初始化时创建一个管道。
import React, { useEffect, useState } from "react";
import { ajax } from "rxjs/ajax";
import { debounceTime, delay, takeUntil } from "rxjs/operators";
import { Subject } from "rxjs/internal/Subject";
const App = () => {
const [items, setItems] = useState([]);
const [loading, setLoading] = useState(true);
const [filterChangedSubject] = useState(() => {
// Arrow function is used to init Singleton Subject. (in a scope of a current component)
return new Subject<string>();
});
useEffect(() => {
// Effect that will be initialized once on a react component init.
// Define your pipe here.
const subscription = filterChangedSubject
.pipe(debounceTime(200))
.subscribe((filter) => {
if (!filter) {
setLoading(false);
setItems([]);
return;
}
ajax(`https://swapi.dev/api/people?search=${filter}`)
.pipe(
// current running ajax is canceled on filter change.
takeUntil(filterChangedSubject)
)
.subscribe(
(results) => {
// Set items will cause render:
setItems(results.response.results);
},
() => {
setLoading(false);
},
() => {
setLoading(false);
}
);
});
return () => {
// On Component destroy. notify takeUntil to unsubscribe from current running ajax request
filterChangedSubject.next("");
// unsubscribe filter change listener
subscription.unsubscribe();
};
}, []);
const onFilterChange = (e) => {
// Notify subject about the filter change
filterChangedSubject.next(e.target.value);
};
return (
<div>
Cards
{loading && <div>Loading...</div>}
<input onChange={onFilterChange}></input>
{items && items.map((item, index) => <div key={index}>{item.name}</div>)}
</div>
);
};
export default App;