使用 RxJava 的多个 POST 请求

Multiple POST requests with RxJava

我有一份图书清单。

open class Book(
    @PrimaryKey
    var id: String? = null,
    var title: String? = null,
    var author: String? = null
): RealmObject()

在进行循环时,我过滤了一些书籍并创建了一个包含过滤后书籍的 Observable。我将每个 observable 添加到一个数组中。

val listInserts = ArrayList<Observable<Book>>()
for loop(..) {
  if (localBook condition) {
    val postObservable = networkApiAdapter.insert(localBook)
    listInserts.add(postObservable)
  }
}

我合并(或连接)可观察对象,希望得到一些顺序的 POST 请求。

        Observable.concat(listInserts)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                { s ->
                    println(s)
                },
                { err ->
                    println(err)
                },
                { println("onComplete") }
            )

我的 MongoDb+Flask 服务器总是 只收到一个 POST 请求 ,我也收到此错误:

com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was NUMBER at line 1 column 4 path $

networkApiAdapter 函数,Retrofit:

class NetworkAPIAdapter private constructor() {
    fun insert(dto: Book): Observable<Book> {
        println(dto.toString())
        return bookService.insert(dto.title!!, dto.author!!)
    }

    interface BooksService {
        @FormUrlEncoded
        @POST(URL_ORDERS_ALL)
        fun insert(
            @Field("title") title: String,
            @Field("author") author: String
        ): Observable<Book>
    }
}

欢迎任何帮助。我不知道如何执行多个请求。我用 zip、repeatUntil、flatMap 尝试了很多解决方案,但 none 有效。

解决这个问题后,我必须删除所有本地书籍并进行GET请求。全部使用 RxJava 以某种方式工作。

您的 BooksService.insert 被定义为一个 POST 请求,它期望服务器以 [​​=17=] JSON 字符串的形式(即 Observable<Book>)作出响应.我认为在 POST 请求通过后,这不是您的服务器 returns。检查你的 POST 请求 returns,我认为它 returns 一些 Int (可能是响应状态)而不是 Book,这可以解释你的错误:

com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was NUMBER at line 1 column 4 path $

如果是这种情况,您的串联 Observable 将在抛出错误后立即终止,即在您的第一个 POST 请求之后立即终止,因此将不会执行其余请求。

如果我的假设是正确的,请将您的服务更改为:

interface BooksService {
        @FormUrlEncoded
        @POST(URL_ORDERS_ALL)
        fun insert(
            @Field("title") title: String,
            @Field("author") author: String
        ): Observable<Int>
    }

此外,如果您希望其他请求即使在前一个抛出错误时也能执行,请执行:

val postObservable = networkApiAdapter.insert(localBook).onErrorReturn(_ -> SOME_INT_FLAG)

更新

要在发送完所有 POST 请求后执行所需的逻辑,请尝试:

    Observable.concat(listInserts)
        // complete this observable after all POST requests have been sent
        // not that it does not necesarily mean that all responses were 200, you should implement yourself what happens to items that were not successfully `POST`ed
        .take(listInserts.size)
        .doOnComplete { 
            // called when this observable completes, i.e. when all POST requests have been sent
            executeFinalActions()
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            { s->
                println(s)
            },
            { err ->
                println(err)
            },
            { println("onComplete") }
        )

并实现executeFinalActions()函数:

private fun executeFinalActions() {
    realm.executeTransaction(Realm::deleteAll)
    networkApiAdapter.fetchAll()
    .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            { books ->
                // todo: store books
            },
            { err ->
                println(err)
            }
        )
}

PS:您可能想看看 Observable.take() 做了什么:http://reactivex.io/documentation/operators/take.html