在 rxjs 中实现 fromSubscriber
Implementing fromSubscriber in rxjs
我 运行 今天遇到了一个有趣的问题。我正在开发一个可以上传文件的应用程序,我们想实现一个进度条。该应用程序是使用 React/Redux/Redux-Observable 编写的。我想为上传进度发送操作。这是我为实现它所做的工作:
withProgress(method, url, body = {}, headers = {}) {
const progressSubscriber = Subscriber.create();
return {
Subscriber: progressSubscriber,
Request: this.ajax({ url, method, body, headers, progressSubscriber }),
};
}
我有一个 class 用于发出所有 ajax 请求。 this.ajax
使用传入的参数调用 Observable.ajax
。
export const blobStorageUploadEpic = (action$) => {
return action$.ofType(a.BLOB_STORAGE_UPLOAD)
.mergeMap(({ payload }) => {
const { url, valetKey, blobId, blobData, contentType } = payload;
const { Subscriber, Request } = RxAjax.withProgress('PUT', `${url}?${valetKey}`, blobData, {
'x-ms-blob-type': 'BlockBlob',
'Content-Type': contentType,
});
const requestObservable = Request
.map(() => ({ type: a.BLOB_STORAGE_UPLOAD_SUCCESS, payload: { blobId } }))
.catch((err) => Observable.of({ type: a.BLOB_STORAGE_UPLOAD_FAILURE, err }));
return Observable.fromSubscriber(Subscriber)
.map((e) => ({ percentage: (e.loaded / e.total) * 100 }))
.map((percentage) => ({ type: a.BLOB_STORAGE_UPLOAD_PROGRESS, payload: { percentage} }))
.merge(requestObservable);
});
};
这是我的史诗。我找回订阅者,并编写了 Observable
的自定义静态方法来接收订阅者。然后我将其与 Request
(这是一个 Observable
)合并。
Observable.fromSubscriber = function fromSubscriber(externalSubscriber) {
return Observable.create((subscriber) => {
externalSubscriber.next = (val) => subscriber.next(val);
externalSubscriber.error = (err) => subscriber.error(err);
externalSubscriber.complete = () => subscriber.complete();
});
};
最后,这是我在Observable
上写的自定义静态方法。我写这篇文章有两个原因。 1.作为其他处理类似问题的人的例子(我花了很多时间试图弄清楚如何在编写自己的 Subscriber
之前弄清楚如何从 Observable
制作一个 Observable
)和 2. 问这是否是实现此目标的最佳方法。 rxjs
很深,我认为有一种现有的方法可以做到这一点,但我找不到它。
这基本上就是 Subject
的用途,以下内容也应该有效:
export const blobStorageUploadEpic = (action$) => {
return action$.ofType(a.BLOB_STORAGE_UPLOAD)
.mergeMap(({ payload }) => {
const { url, valetKey, blobId, blobData, contentType } = payload;
const progressSubscriber = new Rx.Subject();
const request = Rx.Observable.ajax({
method: 'PUT',
url: `${url}?${valetKey}`,
body: blobData,
headers: {
'x-ms-blob-type': 'BlockBlob',
'Content-Type': contentType,
},
progressSubscriber
});
const requestObservable = request
.map(() => ({ type: a.BLOB_STORAGE_UPLOAD_SUCCESS, payload: { blobId } }))
.catch((err) => Observable.of({ type: a.BLOB_STORAGE_UPLOAD_FAILURE, err }));
return progressSubscriber
.map((e) => ({ percentage: (e.loaded / e.total) * 100 }))
.map((percentage) => ({ type: a.BLOB_STORAGE_UPLOAD_PROGRESS, payload: { percentage} }))
.merge(requestObservable);
});
};
这是一个更通用的示例(实时 @jsfiddle):
let data = "";
for (let c = 0; c < 100000; ++c) {
data += "" + Math.random();
}
const progressSubscriber = new Rx.Subject();
const request = Rx.Observable.ajax({
method: 'POST',
url: "/echo/json/",
body: JSON.stringify({ data }),
progressSubscriber
});
progressSubscriber
.merge(request)
.subscribe(console.log);
我 运行 今天遇到了一个有趣的问题。我正在开发一个可以上传文件的应用程序,我们想实现一个进度条。该应用程序是使用 React/Redux/Redux-Observable 编写的。我想为上传进度发送操作。这是我为实现它所做的工作:
withProgress(method, url, body = {}, headers = {}) {
const progressSubscriber = Subscriber.create();
return {
Subscriber: progressSubscriber,
Request: this.ajax({ url, method, body, headers, progressSubscriber }),
};
}
我有一个 class 用于发出所有 ajax 请求。 this.ajax
使用传入的参数调用 Observable.ajax
。
export const blobStorageUploadEpic = (action$) => {
return action$.ofType(a.BLOB_STORAGE_UPLOAD)
.mergeMap(({ payload }) => {
const { url, valetKey, blobId, blobData, contentType } = payload;
const { Subscriber, Request } = RxAjax.withProgress('PUT', `${url}?${valetKey}`, blobData, {
'x-ms-blob-type': 'BlockBlob',
'Content-Type': contentType,
});
const requestObservable = Request
.map(() => ({ type: a.BLOB_STORAGE_UPLOAD_SUCCESS, payload: { blobId } }))
.catch((err) => Observable.of({ type: a.BLOB_STORAGE_UPLOAD_FAILURE, err }));
return Observable.fromSubscriber(Subscriber)
.map((e) => ({ percentage: (e.loaded / e.total) * 100 }))
.map((percentage) => ({ type: a.BLOB_STORAGE_UPLOAD_PROGRESS, payload: { percentage} }))
.merge(requestObservable);
});
};
这是我的史诗。我找回订阅者,并编写了 Observable
的自定义静态方法来接收订阅者。然后我将其与 Request
(这是一个 Observable
)合并。
Observable.fromSubscriber = function fromSubscriber(externalSubscriber) {
return Observable.create((subscriber) => {
externalSubscriber.next = (val) => subscriber.next(val);
externalSubscriber.error = (err) => subscriber.error(err);
externalSubscriber.complete = () => subscriber.complete();
});
};
最后,这是我在Observable
上写的自定义静态方法。我写这篇文章有两个原因。 1.作为其他处理类似问题的人的例子(我花了很多时间试图弄清楚如何在编写自己的 Subscriber
之前弄清楚如何从 Observable
制作一个 Observable
)和 2. 问这是否是实现此目标的最佳方法。 rxjs
很深,我认为有一种现有的方法可以做到这一点,但我找不到它。
这基本上就是 Subject
的用途,以下内容也应该有效:
export const blobStorageUploadEpic = (action$) => {
return action$.ofType(a.BLOB_STORAGE_UPLOAD)
.mergeMap(({ payload }) => {
const { url, valetKey, blobId, blobData, contentType } = payload;
const progressSubscriber = new Rx.Subject();
const request = Rx.Observable.ajax({
method: 'PUT',
url: `${url}?${valetKey}`,
body: blobData,
headers: {
'x-ms-blob-type': 'BlockBlob',
'Content-Type': contentType,
},
progressSubscriber
});
const requestObservable = request
.map(() => ({ type: a.BLOB_STORAGE_UPLOAD_SUCCESS, payload: { blobId } }))
.catch((err) => Observable.of({ type: a.BLOB_STORAGE_UPLOAD_FAILURE, err }));
return progressSubscriber
.map((e) => ({ percentage: (e.loaded / e.total) * 100 }))
.map((percentage) => ({ type: a.BLOB_STORAGE_UPLOAD_PROGRESS, payload: { percentage} }))
.merge(requestObservable);
});
};
这是一个更通用的示例(实时 @jsfiddle):
let data = "";
for (let c = 0; c < 100000; ++c) {
data += "" + Math.random();
}
const progressSubscriber = new Rx.Subject();
const request = Rx.Observable.ajax({
method: 'POST',
url: "/echo/json/",
body: JSON.stringify({ data }),
progressSubscriber
});
progressSubscriber
.merge(request)
.subscribe(console.log);