使用 cycle-http 发出任意数量的顺序依赖请求
Making an arbitrary number of sequential, dependent requests with `cycle-http`
是否有使用 cycle-http
发出任意数量的顺序依赖 http 请求的示例?
我想从 API 中获取每个页面,其中下一个请求只能使用当前页面中的数据进行。
我尝试改编使用 Observable.merge() 的 ,但我不确定如何将其连接到 cycle-http
个源和接收器。
参考文献
如果能提供一些代码示例就更好了。然而基本逻辑可能是这样的:
- 将响应流映射到请求流
- 使用初始请求启动请求流
代码应该是这样的:
function main (sources){
const initialRequest = {
url: 'http://www.google.com'
};
const request$ = sources.HTTP
.filter(response$ => /*FILTER LOGIC GOES HERE */)
.switch()//or you can use flatMap
.map(response =>/* MAP RESPONSE TO A NEW REQUEST */)
.startWith(initialRequest);
return {
HTTP: request$
};
}
所以这可能过于复杂了,我应该把它废弃到
正确尝试
但这是我想出的...
用法
export default function app({HTTP}) {
const {
allPagesRequest$: staffPagesReq$,
latestData$: staff$,
} = getAllPages({url: '/staff', HTTP});
// staff$ is converted to vdom...
return /* sinks */ {
DOM: staffPagesReq$,
HTTP: staffVdom$,
}
}
实施
const fetchNthPage = (optsIn) => {
const opts = merge(
{
url: '',
page: 0,
HTTP: undefined,
}, optsIn
);
const u = new URI(opts.url)
.setQuery({'_page': opts.page.toString()});
const pageNResponse$ = opts.HTTP
.filter(
res$ => res$.request.url === urlForEndpoint(u)
)
.flatMap(
res$ => res$.catch(
err => Observable.of(
{
body: {'error in response:': err.toString()}
}
)
)
)
.map(res => res.body)
.take(1)
.shareReplay(1);
return Observable.of({
pageNRequest$: Observable.of(basicRequestObject(u)),
pageNResponse$: pageNResponse$,
opts: opts
});
};
const encapsulateAs = typeName => data => {
return {type: typeName, data}
};
const fetchAllPagesIndividually = (optsIn) => {
const opts = merge(
{
url: '',
page: 0,
HTTP: undefined,
},
optsIn
);
return Observable.defer(
() => fetchNthPage(opts)
.flatMap(x => {
const mergedItems$ = Observable
.merge(
x.pageNRequest$.map(encapsulateAs('request')),
x.pageNResponse$.map(encapsulateAs('response'))
);
const optsForNextPage = merge(opts, {
page: opts.page + 1
});
const next$ = Observable
.never() // `next$` shouldn't end when `pageNResponse$` does
.merge(x.pageNResponse$)
.shareReplay(1)
.takeWhile(res => {
//noinspection UnnecessaryLocalVariableJS
let isFullPage = path(['response', 'length'], res) === apiPageSize;
return isFullPage;
})
.flatMap(() => {
return fetchAllPagesIndividually(optsForNextPage)
});
//noinspection UnnecessaryLocalVariableJS
const concattedItem$ = Observable
.concat(
mergedItems$,
next$
);
return concattedItem$
})
.shareReplay(1)
);
};
const concatPages = (acc, currentVal, index, source) => acc.concat(currentVal);
const typeIs = typeStr => compose(
equals(typeStr),
prop('type')
);
const typeNotIn = typesArray => compose(
not,
unary(flip(contains)(typesArray)),
prop('type')
);
const getAllPages = (optsIn) => {
const f$ = fetchAllPagesIndividually(optsIn)
.shareReplay(1);
const allPagesRequest$ = f$
.filter(typeIs('request'))
.map(prop('data'));
const allPagesResponse$ = f$
.filter(typeIs('response'))
.map(prop('data'));
const theRest$ = f$
.filter(typeNotIn(['request', 'response', 'data']));
const latestData$ = allPagesResponse$
.map(prop('response'))
.scan(concatPages);
return {
allPagesRequest$,
allPagesResponse$,
latestData$,
theRest$,
}
};
compose()
、not()
、merge()
、unary()
等来自Ramda.
这是使用 Cycle.js 和 @cycle/fetch
驱动程序处理任意数量的顺序依赖请求的另一种方式。
(使用GitHub用户API。users查询returns每页30个用户,since
URL参数为用户id号和在 next 用户 ID 处开始查询。)
首先是 main
函数的主要部分,带有注释:
const listResponse$ = sources.FETCH // response returned from FETCH driver
.mergeAll()
.flatMap(res => res.json())
.scan(
((userstotals, users) =>
[
userstotals[0] + 1, // page count
users[29] && users[29].id, // last id on full page
userstotals[2].concat(users) // collect all users
]
),
[0, undefined, []] // default accumulator
)
.share(); // allows stream split
// Branch #1 - cycle again for more pages
const listRequest$ = listResponse$
.filter(users =>
0 < users[1] && // full page id exists
maxpages > users[0]; // less than maxpages
)
.startWith('initial')
.map(users =>
`https:\/\/api.github.com/users?since=${
(!isNaN(parseInt(users[1], 10)) && users[1]) || // last id full page
idstart // default id start
}`
);
// Branch #2 - display
const dom$ = listResponse$
.map(userstotals => div(JSON.stringify(userstotals[2])));
(这是一个更新的答案。我意识到 scan
可以合并为一个。)
说明:首先从sources
属性FETCH
中拉出响应,将其拉平并拉出JSON,然后scan
来计算如何到目前为止已经查询了很多页面。查询的页数稍后与maxpages
进行比较,以免超过预定数量。接下来获取完整页面的最后一个 id
(如果存在),最后一个 concat
当前用户页面,其中包含迄今为止积累的用户页面集合。在累积响应信息后 share
流,因此它可以分成两个分支。
第一个分支用于通过FETCH
驱动程序重新循环查询以查询更多页面。但首先 filter
检查最后一页和查询的页数。如果 id 不是数字,则已到达最后一页。如果已经到达最后一页并且因此没有更多页面可查询,请不要继续。如果查询的页数超过maxpages
.
的值也不要继续
第二个分支简单地进入累积响应以获得完整的用户列表,然后 JSON.stringify
s 对象并将其转换为虚拟 dom 对象(div
方法) 发送到 DOM 驱动程序进行显示。
这是完整的脚本:
import Cycle from '@cycle/rx-run';
import {div, makeDOMDriver} from '@cycle/dom';
import {makeFetchDriver} from '@cycle/fetch';
function main(sources) { // provides properties DOM and FETCH (evt. streams)
const acctok = ''; // put your token here, if necessary
const idstart = 19473200; // where do you want to start?
const maxpages = 10;
const listResponse$ = sources.FETCH
.mergeAll()
.flatMap(res => res.json())
.scan(
((userstotals, users) =>
[
userstotals[0] + 1, // page count
users[29] && users[29].id, // last id on full page
userstotals[2].concat(users) // collect all users
]
),
[0, undefined, []]
)
.share();
const listRequest$ = listResponse$
.filter(function (users) {
return 0 < users[1] && maxpages > users[0];
})
.startWith('initial')
.map(users =>
`https:\/\/api.github.com/users?since=${
(!isNaN(parseInt(users[1], 10)) && users[1]) || // last id full page
idstart // default id start
}` //&access_token=${acctok}`
);
const dom$ = listResponse$
.map(userstotals => div(JSON.stringify(userstotals[2])));
return {
DOM: dom$,
FETCH: listRequest$
};
}
Cycle.run(main, {
DOM: makeDOMDriver('#main-container'),
FETCH: makeFetchDriver()
});
(我的第一个答案,留给后人。注意两个 scan
。)
const listResponse$ = sources.FETCH
.mergeAll()
.flatMap(res => res.json())
.scan(((userscount, users) => // <-- scan #1
[userscount[0] + 1, users]), [0, []]
)
.share();
const listRequest$ = listResponse$
.filter(function (users) {
return users[1][29] && users[1][29].id &&
maxpages > users[0];
})
.startWith('initial')
.map(users =>
`https://api.github.com/users?since=${
(users[1][users[1].length-1] && users[1][users[1].length-1].id) ||
idstart
}`
);
const dom$ = listResponse$
.scan(function (usersall, users) { // <-- scan #2
usersall.push(users);
return usersall;
}, [])
.map(res => div(JSON.stringify(res)));
通过 scan
ing 一次,在前面,然后我需要获取完整页面的最后一个 ID(如果存在),并将其存储在累加器中。
是否有使用 cycle-http
发出任意数量的顺序依赖 http 请求的示例?
我想从 API 中获取每个页面,其中下一个请求只能使用当前页面中的数据进行。
我尝试改编使用 Observable.merge() 的 cycle-http
个源和接收器。
参考文献
如果能提供一些代码示例就更好了。然而基本逻辑可能是这样的:
- 将响应流映射到请求流
- 使用初始请求启动请求流
代码应该是这样的:
function main (sources){
const initialRequest = {
url: 'http://www.google.com'
};
const request$ = sources.HTTP
.filter(response$ => /*FILTER LOGIC GOES HERE */)
.switch()//or you can use flatMap
.map(response =>/* MAP RESPONSE TO A NEW REQUEST */)
.startWith(initialRequest);
return {
HTTP: request$
};
}
所以这可能过于复杂了,我应该把它废弃到
正确尝试
用法
export default function app({HTTP}) {
const {
allPagesRequest$: staffPagesReq$,
latestData$: staff$,
} = getAllPages({url: '/staff', HTTP});
// staff$ is converted to vdom...
return /* sinks */ {
DOM: staffPagesReq$,
HTTP: staffVdom$,
}
}
实施
const fetchNthPage = (optsIn) => {
const opts = merge(
{
url: '',
page: 0,
HTTP: undefined,
}, optsIn
);
const u = new URI(opts.url)
.setQuery({'_page': opts.page.toString()});
const pageNResponse$ = opts.HTTP
.filter(
res$ => res$.request.url === urlForEndpoint(u)
)
.flatMap(
res$ => res$.catch(
err => Observable.of(
{
body: {'error in response:': err.toString()}
}
)
)
)
.map(res => res.body)
.take(1)
.shareReplay(1);
return Observable.of({
pageNRequest$: Observable.of(basicRequestObject(u)),
pageNResponse$: pageNResponse$,
opts: opts
});
};
const encapsulateAs = typeName => data => {
return {type: typeName, data}
};
const fetchAllPagesIndividually = (optsIn) => {
const opts = merge(
{
url: '',
page: 0,
HTTP: undefined,
},
optsIn
);
return Observable.defer(
() => fetchNthPage(opts)
.flatMap(x => {
const mergedItems$ = Observable
.merge(
x.pageNRequest$.map(encapsulateAs('request')),
x.pageNResponse$.map(encapsulateAs('response'))
);
const optsForNextPage = merge(opts, {
page: opts.page + 1
});
const next$ = Observable
.never() // `next$` shouldn't end when `pageNResponse$` does
.merge(x.pageNResponse$)
.shareReplay(1)
.takeWhile(res => {
//noinspection UnnecessaryLocalVariableJS
let isFullPage = path(['response', 'length'], res) === apiPageSize;
return isFullPage;
})
.flatMap(() => {
return fetchAllPagesIndividually(optsForNextPage)
});
//noinspection UnnecessaryLocalVariableJS
const concattedItem$ = Observable
.concat(
mergedItems$,
next$
);
return concattedItem$
})
.shareReplay(1)
);
};
const concatPages = (acc, currentVal, index, source) => acc.concat(currentVal);
const typeIs = typeStr => compose(
equals(typeStr),
prop('type')
);
const typeNotIn = typesArray => compose(
not,
unary(flip(contains)(typesArray)),
prop('type')
);
const getAllPages = (optsIn) => {
const f$ = fetchAllPagesIndividually(optsIn)
.shareReplay(1);
const allPagesRequest$ = f$
.filter(typeIs('request'))
.map(prop('data'));
const allPagesResponse$ = f$
.filter(typeIs('response'))
.map(prop('data'));
const theRest$ = f$
.filter(typeNotIn(['request', 'response', 'data']));
const latestData$ = allPagesResponse$
.map(prop('response'))
.scan(concatPages);
return {
allPagesRequest$,
allPagesResponse$,
latestData$,
theRest$,
}
};
compose()
、not()
、merge()
、unary()
等来自Ramda.
这是使用 Cycle.js 和 @cycle/fetch
驱动程序处理任意数量的顺序依赖请求的另一种方式。
(使用GitHub用户API。users查询returns每页30个用户,since
URL参数为用户id号和在 next 用户 ID 处开始查询。)
首先是 main
函数的主要部分,带有注释:
const listResponse$ = sources.FETCH // response returned from FETCH driver
.mergeAll()
.flatMap(res => res.json())
.scan(
((userstotals, users) =>
[
userstotals[0] + 1, // page count
users[29] && users[29].id, // last id on full page
userstotals[2].concat(users) // collect all users
]
),
[0, undefined, []] // default accumulator
)
.share(); // allows stream split
// Branch #1 - cycle again for more pages
const listRequest$ = listResponse$
.filter(users =>
0 < users[1] && // full page id exists
maxpages > users[0]; // less than maxpages
)
.startWith('initial')
.map(users =>
`https:\/\/api.github.com/users?since=${
(!isNaN(parseInt(users[1], 10)) && users[1]) || // last id full page
idstart // default id start
}`
);
// Branch #2 - display
const dom$ = listResponse$
.map(userstotals => div(JSON.stringify(userstotals[2])));
(这是一个更新的答案。我意识到 scan
可以合并为一个。)
说明:首先从sources
属性FETCH
中拉出响应,将其拉平并拉出JSON,然后scan
来计算如何到目前为止已经查询了很多页面。查询的页数稍后与maxpages
进行比较,以免超过预定数量。接下来获取完整页面的最后一个 id
(如果存在),最后一个 concat
当前用户页面,其中包含迄今为止积累的用户页面集合。在累积响应信息后 share
流,因此它可以分成两个分支。
第一个分支用于通过FETCH
驱动程序重新循环查询以查询更多页面。但首先 filter
检查最后一页和查询的页数。如果 id 不是数字,则已到达最后一页。如果已经到达最后一页并且因此没有更多页面可查询,请不要继续。如果查询的页数超过maxpages
.
第二个分支简单地进入累积响应以获得完整的用户列表,然后 JSON.stringify
s 对象并将其转换为虚拟 dom 对象(div
方法) 发送到 DOM 驱动程序进行显示。
这是完整的脚本:
import Cycle from '@cycle/rx-run';
import {div, makeDOMDriver} from '@cycle/dom';
import {makeFetchDriver} from '@cycle/fetch';
function main(sources) { // provides properties DOM and FETCH (evt. streams)
const acctok = ''; // put your token here, if necessary
const idstart = 19473200; // where do you want to start?
const maxpages = 10;
const listResponse$ = sources.FETCH
.mergeAll()
.flatMap(res => res.json())
.scan(
((userstotals, users) =>
[
userstotals[0] + 1, // page count
users[29] && users[29].id, // last id on full page
userstotals[2].concat(users) // collect all users
]
),
[0, undefined, []]
)
.share();
const listRequest$ = listResponse$
.filter(function (users) {
return 0 < users[1] && maxpages > users[0];
})
.startWith('initial')
.map(users =>
`https:\/\/api.github.com/users?since=${
(!isNaN(parseInt(users[1], 10)) && users[1]) || // last id full page
idstart // default id start
}` //&access_token=${acctok}`
);
const dom$ = listResponse$
.map(userstotals => div(JSON.stringify(userstotals[2])));
return {
DOM: dom$,
FETCH: listRequest$
};
}
Cycle.run(main, {
DOM: makeDOMDriver('#main-container'),
FETCH: makeFetchDriver()
});
(我的第一个答案,留给后人。注意两个 scan
。)
const listResponse$ = sources.FETCH
.mergeAll()
.flatMap(res => res.json())
.scan(((userscount, users) => // <-- scan #1
[userscount[0] + 1, users]), [0, []]
)
.share();
const listRequest$ = listResponse$
.filter(function (users) {
return users[1][29] && users[1][29].id &&
maxpages > users[0];
})
.startWith('initial')
.map(users =>
`https://api.github.com/users?since=${
(users[1][users[1].length-1] && users[1][users[1].length-1].id) ||
idstart
}`
);
const dom$ = listResponse$
.scan(function (usersall, users) { // <-- scan #2
usersall.push(users);
return usersall;
}, [])
.map(res => div(JSON.stringify(res)));
通过 scan
ing 一次,在前面,然后我需要获取完整页面的最后一个 ID(如果存在),并将其存储在累加器中。