使用 RxJava 的多个 POST 请求
Multiple POST requests with RxJava
我有一份图书清单。
open class Book(
@PrimaryKey
var id: String? = null,
var title: String? = null,
var author: String? = null
): RealmObject()
在进行循环时,我过滤了一些书籍并创建了一个包含过滤后书籍的 Observable。我将每个 observable 添加到一个数组中。
val listInserts = ArrayList<Observable<Book>>()
for loop(..) {
if (localBook condition) {
val postObservable = networkApiAdapter.insert(localBook)
listInserts.add(postObservable)
}
}
我合并(或连接)可观察对象,希望得到一些顺序的 POST 请求。
Observable.concat(listInserts)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ s ->
println(s)
},
{ err ->
println(err)
},
{ println("onComplete") }
)
我的 MongoDb+Flask 服务器总是 只收到一个 POST 请求 ,我也收到此错误:
com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was NUMBER at line 1 column 4 path $
networkApiAdapter 函数,Retrofit:
class NetworkAPIAdapter private constructor() {
fun insert(dto: Book): Observable<Book> {
println(dto.toString())
return bookService.insert(dto.title!!, dto.author!!)
}
interface BooksService {
@FormUrlEncoded
@POST(URL_ORDERS_ALL)
fun insert(
@Field("title") title: String,
@Field("author") author: String
): Observable<Book>
}
}
欢迎任何帮助。我不知道如何执行多个请求。我用 zip、repeatUntil、flatMap 尝试了很多解决方案,但 none 有效。
解决这个问题后,我必须删除所有本地书籍并进行GET请求。全部使用 RxJava 以某种方式工作。
您的 BooksService.insert
被定义为一个 POST
请求,它期望服务器以 [=17=] JSON 字符串的形式(即 Observable<Book>
)作出响应.我认为在 POST
请求通过后,这不是您的服务器 returns。检查你的 POST
请求 returns,我认为它 returns 一些 Int
(可能是响应状态)而不是 Book
,这可以解释你的错误:
com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was NUMBER at line 1 column 4 path $
如果是这种情况,您的串联 Observable 将在抛出错误后立即终止,即在您的第一个 POST
请求之后立即终止,因此将不会执行其余请求。
如果我的假设是正确的,请将您的服务更改为:
interface BooksService {
@FormUrlEncoded
@POST(URL_ORDERS_ALL)
fun insert(
@Field("title") title: String,
@Field("author") author: String
): Observable<Int>
}
此外,如果您希望其他请求即使在前一个抛出错误时也能执行,请执行:
val postObservable = networkApiAdapter.insert(localBook).onErrorReturn(_ -> SOME_INT_FLAG)
更新
要在发送完所有 POST
请求后执行所需的逻辑,请尝试:
Observable.concat(listInserts)
// complete this observable after all POST requests have been sent
// not that it does not necesarily mean that all responses were 200, you should implement yourself what happens to items that were not successfully `POST`ed
.take(listInserts.size)
.doOnComplete {
// called when this observable completes, i.e. when all POST requests have been sent
executeFinalActions()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ s->
println(s)
},
{ err ->
println(err)
},
{ println("onComplete") }
)
并实现executeFinalActions()
函数:
private fun executeFinalActions() {
realm.executeTransaction(Realm::deleteAll)
networkApiAdapter.fetchAll()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ books ->
// todo: store books
},
{ err ->
println(err)
}
)
}
PS:您可能想看看 Observable.take()
做了什么:http://reactivex.io/documentation/operators/take.html
我有一份图书清单。
open class Book(
@PrimaryKey
var id: String? = null,
var title: String? = null,
var author: String? = null
): RealmObject()
在进行循环时,我过滤了一些书籍并创建了一个包含过滤后书籍的 Observable。我将每个 observable 添加到一个数组中。
val listInserts = ArrayList<Observable<Book>>()
for loop(..) {
if (localBook condition) {
val postObservable = networkApiAdapter.insert(localBook)
listInserts.add(postObservable)
}
}
我合并(或连接)可观察对象,希望得到一些顺序的 POST 请求。
Observable.concat(listInserts)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ s ->
println(s)
},
{ err ->
println(err)
},
{ println("onComplete") }
)
我的 MongoDb+Flask 服务器总是 只收到一个 POST 请求 ,我也收到此错误:
com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was NUMBER at line 1 column 4 path $
networkApiAdapter 函数,Retrofit:
class NetworkAPIAdapter private constructor() {
fun insert(dto: Book): Observable<Book> {
println(dto.toString())
return bookService.insert(dto.title!!, dto.author!!)
}
interface BooksService {
@FormUrlEncoded
@POST(URL_ORDERS_ALL)
fun insert(
@Field("title") title: String,
@Field("author") author: String
): Observable<Book>
}
}
欢迎任何帮助。我不知道如何执行多个请求。我用 zip、repeatUntil、flatMap 尝试了很多解决方案,但 none 有效。
解决这个问题后,我必须删除所有本地书籍并进行GET请求。全部使用 RxJava 以某种方式工作。
您的 BooksService.insert
被定义为一个 POST
请求,它期望服务器以 [=17=] JSON 字符串的形式(即 Observable<Book>
)作出响应.我认为在 POST
请求通过后,这不是您的服务器 returns。检查你的 POST
请求 returns,我认为它 returns 一些 Int
(可能是响应状态)而不是 Book
,这可以解释你的错误:
com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was NUMBER at line 1 column 4 path $
如果是这种情况,您的串联 Observable 将在抛出错误后立即终止,即在您的第一个 POST
请求之后立即终止,因此将不会执行其余请求。
如果我的假设是正确的,请将您的服务更改为:
interface BooksService {
@FormUrlEncoded
@POST(URL_ORDERS_ALL)
fun insert(
@Field("title") title: String,
@Field("author") author: String
): Observable<Int>
}
此外,如果您希望其他请求即使在前一个抛出错误时也能执行,请执行:
val postObservable = networkApiAdapter.insert(localBook).onErrorReturn(_ -> SOME_INT_FLAG)
更新
要在发送完所有 POST
请求后执行所需的逻辑,请尝试:
Observable.concat(listInserts)
// complete this observable after all POST requests have been sent
// not that it does not necesarily mean that all responses were 200, you should implement yourself what happens to items that were not successfully `POST`ed
.take(listInserts.size)
.doOnComplete {
// called when this observable completes, i.e. when all POST requests have been sent
executeFinalActions()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ s->
println(s)
},
{ err ->
println(err)
},
{ println("onComplete") }
)
并实现executeFinalActions()
函数:
private fun executeFinalActions() {
realm.executeTransaction(Realm::deleteAll)
networkApiAdapter.fetchAll()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ books ->
// todo: store books
},
{ err ->
println(err)
}
)
}
PS:您可能想看看 Observable.take()
做了什么:http://reactivex.io/documentation/operators/take.html