JNI - 在本机 cpp 回调函数中使用 RxJava 的奇怪行为

JNI - Weird behavior using RxJava in native cpp callback function

我有一个要从我的 Kotlin 代码中调用的 C++ 函数。 该 c++ 函数获取回调函数作为参数,执行一些工作并在完成时调用回调。

我之前已经做过几次了,一切正常。但是,我想以某种方式包装它,而不是传递回调,而是 return 一个 Observable,它会在调用回调时发出一个值。

我用更简单的代码创建了一个示例。 到目前为止我做了什么:

科特林代码:

fun someFunc(str: String): Observable<String> {
    val subject = PublishSubject.create<String>()
    nativeFunc(object: TestCallback {
        override fun invoke(event: String) {
            println("Callback invoked. subject = $subject")
            subject.onNext("$event - $str")
        }
    })
    return subject
}

private external fun nativeFunc(callback: TestCallback)

Kotlin 回调函数接口:


interface TestCallback {
    fun invoke(event: String)
}

本机 JNI 代码:

extern "C"
JNIEXPORT void JNICALL
Java_com_myProject_TestClass_nativeFunc(JNIEnv *env, jobject thiz, jobject callback) {
    env->GetJavaVM(&g_vm);
    auto g_callback = env->NewGlobalRef(callback);

    std::function<void()> * pCompletion = new std::function<void()>([g_callback]() {
        JNIEnv *newEnv = GetJniEnv();
        jclass callbackClazz = newEnv->FindClass("com/myproject/TestCallback");
        jmethodID invokeMethod = newEnv->GetMethodID(callbackClazz, "invoke", "(Ljava/lang/String;)V");
        string callbackStr = "Callback called";
        newEnv->CallVoidMethod(g_callback, invokeMethod, newEnv->NewStringUTF(callbackStr.c_str()));
        newEnv->DeleteGlobalRef(g_callback);
    });
    pCompletion->operator()(); // <--Similar function is passed to the c++ function. Lets skip that
}

一个测试函数运行把它全部放在一起

@Test
fun testSubject() {
    val testClass = TestClass()
    val someList = listOf("a", "b", "c")
    var done = false
    Observable.concat(someList.map { testClass.someFunc(it) })
        .take(3)
        .doOnNext { println("got next: $it") }
        .doOnComplete { done = true }
        .subscribe()
    while (!done);
}

测试函数 运行s someFunc 函数的 3 倍(return 一个 Observable 实例,完成时发出一个字符串)并将所有 Observables 连接在一起。

我希望打印的内容:

Callback invoked. subject = io.reactivex.subjects.PublishSubject@1f7acc8
got next: Callback called - a
Callback invoked. subject = io.reactivex.subjects.PublishSubject@7c9b161
got next: Callback called - b
Callback invoked. subject = io.reactivex.subjects.PublishSubject@6f24486
got next: Callback called - c

然而实际结果是:

Callback invoked. subject = io.reactivex.subjects.PublishSubject@1f7acc8
Callback invoked. subject = io.reactivex.subjects.PublishSubject@7c9b161
Callback invoked. subject = io.reactivex.subjects.PublishSubject@6f24486

似乎一切都按预期工作,但是,尽管该行 println("Callback invoked. subject = $subject") 打印(带有正确的主题地址)时,onNext 不工作并且由于某种原因不发出任何内容。 我在没有本机回调的情况下检查了相同的功能,一切正常。

有什么建议吗???

所以经过一些研究我发现:

  1. 当我从 Java 调用 C/C++ 函数时,JNI 不会在后台创建任何新线程。 。因此,
  2. 代码 运行 是同步的,意思是 - 主题发出一个项目,然后函数 returns 主题和被订阅。所以在订阅之后,它错过了发出的项目并丢失了它。
  3. 我说错了“我在没有本机回调的情况下检查了相同的功能并且一切正常。”。我可能在那里犯了一个错误,使非本机代码异步,这让我“准时”返回了主题并按预期打印了日志。

解决方案是将 PublishSubject 更改为 BehaviorSubject 或 ReplaySubject 以缓存发出的项目并在订阅后获取它。 另一种解决方案是将对本机函数的调用切换到另一个线程中的 运行,因此当函数 运行 时,主题已经返回并被订阅。