Kafka Streams groupByKey not working due to RocksDB on Mac M1

When I try to use the Kafka Streams groupByKey function in my function, I get the RocksDB error below. A simple consumer function, by contrast, works fine.

Environment: confluent v1.30.0 (Confluent running on a single-node dev environment)

OS: Apple M1 Mac, macOS Big Sur v11.5.1 with Rosetta

Java: openjdk 11.0.12 2021-07-20 LTS

Error trace:

Exception in thread "streamApp-c9d33475-adca-4567-8ec1-8db1fe4bc4a9-StreamThread-1" java.lang.UnsatisfiedLinkError: /private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: dlopen(/private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib, 1): no suitable image found.  Did find:
    /private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: mach-o, but wrong architecture
    /private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: mach-o, but wrong architecture
    at java.base/java.lang.ClassLoader$NativeLibrary.load0(Native Method)
    at java.base/java.lang.ClassLoader$NativeLibrary.load(ClassLoader.java:2442)
    at java.base/java.lang.ClassLoader$NativeLibrary.loadLibrary(ClassLoader.java:2498)
    at java.base/java.lang.ClassLoader.loadLibrary0(ClassLoader.java:2694)
    at java.base/java.lang.ClassLoader.loadLibrary(ClassLoader.java:2627)
    at java.base/java.lang.Runtime.load0(Runtime.java:768)
    at java.base/java.lang.System.load(System.java:1837)
    at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
    at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
    at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
    at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
    at org.rocksdb.DBOptions.<clinit>(DBOptions.java:21)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:128)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:253)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:54)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:74)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init(MeteredKeyValueStore.java:120)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:120)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:210)
    at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:473)
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:754)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:636)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523)

Problematic code:

@Bean
public Function<KStream<String, String>, KStream<String, String>> streamApp() {
    return kstream -> kstream
            .peek((key, value) -> logger.info("streamApp triggered with key {}, value {}", key, value))
            .groupByKey()
            .reduce((s, v1) -> s + ", " + v1)
            .toStream()
            .flatMap((ignoredWindowedKey, value) -> getChanges(value));
}

Working code:

@Bean
public Function<KStream<String, String>, KStream<String, String>> streamApp() {
    return kstream -> kstream
            .peek((key, value) -> logger.info("streamApp triggered with key {}, value {}", key, value))
            .flatMap((ignoredWindowedKey, value) -> getChanges(value));
}

RocksDB itself does not yet support Apple Silicon.

A RocksDB build for Apple Silicon is in development, but as of today (2021-08-07) it is not finished: https://github.com/facebook/rocksdb/issues/7720 . That GitHub issue links to the code of a working version of RocksDB on Apple Silicon, which you can use to build RocksDB yourself: https://github.com/adamretter/rocksdb/tree/macos-multi-arch
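The "mach-o, but wrong architecture" message in the trace indicates that the extracted librocksdbjni library was built for a different CPU architecture than the running JVM. To see which architecture your JVM reports, a minimal sketch along these lines may help. The class name JvmArchCheck and the helper isNativeArmMac are hypothetical names chosen for illustration; only the standard os.name and os.arch system properties are used.

```java
import java.util.Locale;

public class JvmArchCheck {

    // Hypothetical helper: true when the given os.name / os.arch values
    // describe a JVM running natively on an ARM64 (Apple Silicon) Mac.
    public static boolean isNativeArmMac(String osName, String osArch) {
        String name = osName.toLowerCase(Locale.ROOT);
        String arch = osArch.toLowerCase(Locale.ROOT);
        return name.contains("mac")
                && (arch.equals("aarch64") || arch.equals("arm64"));
    }

    public static void main(String[] args) {
        // Standard system properties set by the JVM at startup.
        String osName = System.getProperty("os.name");
        String osArch = System.getProperty("os.arch");
        System.out.println("os.name=" + osName + ", os.arch=" + osArch);

        if (isNativeArmMac(osName, osArch)) {
            // Assumption: the RocksDB JNI library on the classpath has no ARM64 macOS binary.
            System.out.println("Native ARM64 JVM: an x86-only native library will not load.");
        } else {
            System.out.println("Not a native ARM64 macOS JVM.");
        }
    }
}
```

If this reports aarch64 on the M1, the JVM is running natively rather than as an x86 process, which would be consistent with the UnsatisfiedLinkError above.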

You can also run your code with an x86 JDK under Rosetta emulation, where RocksDB should run correctly. This Stack Overflow answer may help you: