由于 Mac m1 中的 RocksDB,Kafka Streams groupByKey 无法正常工作
Kafka Streams groupByKey not working due to RocksDB in Mac m1
当我尝试在函数中使用 kafka 流的 groupByKey
功能时,出现以下 rocksdb 错误。而一个简单的消费者函数就可以正常工作。
环境:confluent v1.30.0
(在单节点开发环境上融合运行)
OS: Apple m1 mac - Big Sur v11.5.1 with rosetta
Java: openjdk 11.0.12 2021-07-20 LTS
错误跟踪:
Exception in thread "streamApp-c9d33475-adca-4567-8ec1-8db1fe4bc4a9-StreamThread-1" java.lang.UnsatisfiedLinkError: /private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: dlopen(/private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib, 1): no suitable image found. Did find:
/private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: mach-o, but wrong architecture
/private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: mach-o, but wrong architecture
at java.base/java.lang.ClassLoader$NativeLibrary.load0(Native Method)
at java.base/java.lang.ClassLoader$NativeLibrary.load(ClassLoader.java:2442)
at java.base/java.lang.ClassLoader$NativeLibrary.loadLibrary(ClassLoader.java:2498)
at java.base/java.lang.ClassLoader.loadLibrary0(ClassLoader.java:2694)
at java.base/java.lang.ClassLoader.loadLibrary(ClassLoader.java:2627)
at java.base/java.lang.Runtime.load0(Runtime.java:768)
at java.base/java.lang.System.load(System.java:1837)
at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
at org.rocksdb.DBOptions.<clinit>(DBOptions.java:21)
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:128)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:253)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:54)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:74)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init(MeteredKeyValueStore.java:120)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:120)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201)
at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:210)
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:473)
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:754)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:636)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523)
有问题的代码
@Bean
public Function<KStream<String, String>, KStream<String, String>> streamApp() {
return kstream -> kstream
.peek((key, value) -> logger.info("streamApp triggered with key {}, value {}", key, value))
.groupByKey()
.reduce((s, v1) -> s + ", " + v1)
.toStream()
.flatMap((ignoredWindowedKey, value) -> getChanges(value));
}
正常工作代码:
@Bean
public Function<KStream<String, String>, KStream<String, String>> streamApp() {
return kstream -> kstream
.peek((key, value) -> logger.info("streamApp triggered with key {}, value {}", key, value))
.flatMap((ignoredWindowedKey, value) -> getChanges(value));
}
rocksDB 本身还不支持 Apple Silicon。
正在为 Apple Silicon 开发 rocksDB 版本,但截至今天 (2021-08-07) 尚未完成:https://github.com/facebook/rocksdb/issues/7720. In this GitHub issue, you can find a link to the code - also included here - of a working version of rocksDB on Apple Silicon, which you can use to build rocksDB yourself: https://github.com/adamretter/rocksdb/tree/macos-multi-arch
您还可以 运行 您的代码使用 x86 JDK 和 Rosetta 仿真,其中 rocksDB 应该 运行 正确。这个 Whosebug 答案可能会帮助您:
当我尝试在函数中使用 kafka 流的 groupByKey
功能时,出现以下 rocksdb 错误。而一个简单的消费者函数就可以正常工作。
环境:confluent v1.30.0
(在单节点开发环境上融合运行)
OS: Apple m1 mac - Big Sur v11.5.1 with rosetta
Java: openjdk 11.0.12 2021-07-20 LTS
错误跟踪:
Exception in thread "streamApp-c9d33475-adca-4567-8ec1-8db1fe4bc4a9-StreamThread-1" java.lang.UnsatisfiedLinkError: /private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: dlopen(/private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib, 1): no suitable image found. Did find:
/private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: mach-o, but wrong architecture
/private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: mach-o, but wrong architecture
at java.base/java.lang.ClassLoader$NativeLibrary.load0(Native Method)
at java.base/java.lang.ClassLoader$NativeLibrary.load(ClassLoader.java:2442)
at java.base/java.lang.ClassLoader$NativeLibrary.loadLibrary(ClassLoader.java:2498)
at java.base/java.lang.ClassLoader.loadLibrary0(ClassLoader.java:2694)
at java.base/java.lang.ClassLoader.loadLibrary(ClassLoader.java:2627)
at java.base/java.lang.Runtime.load0(Runtime.java:768)
at java.base/java.lang.System.load(System.java:1837)
at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
at org.rocksdb.DBOptions.<clinit>(DBOptions.java:21)
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:128)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:253)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:54)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:74)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init(MeteredKeyValueStore.java:120)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:120)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201)
at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:210)
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:473)
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:754)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:636)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523)
有问题的代码
@Bean
public Function<KStream<String, String>, KStream<String, String>> streamApp() {
return kstream -> kstream
.peek((key, value) -> logger.info("streamApp triggered with key {}, value {}", key, value))
.groupByKey()
.reduce((s, v1) -> s + ", " + v1)
.toStream()
.flatMap((ignoredWindowedKey, value) -> getChanges(value));
}
正常工作代码:
@Bean
public Function<KStream<String, String>, KStream<String, String>> streamApp() {
return kstream -> kstream
.peek((key, value) -> logger.info("streamApp triggered with key {}, value {}", key, value))
.flatMap((ignoredWindowedKey, value) -> getChanges(value));
}
rocksDB 本身还不支持 Apple Silicon。
正在为 Apple Silicon 开发 rocksDB 版本,但截至今天 (2021-08-07) 尚未完成:https://github.com/facebook/rocksdb/issues/7720. In this GitHub issue, you can find a link to the code - also included here - of a working version of rocksDB on Apple Silicon, which you can use to build rocksDB yourself: https://github.com/adamretter/rocksdb/tree/macos-multi-arch
您还可以 运行 您的代码使用 x86 JDK 和 Rosetta 仿真,其中 rocksDB 应该 运行 正确。这个 Whosebug 答案可能会帮助您: