Kafka Streams: UnsatisfiedLinkError on librocksdbjni
I am trying the word-count example with Kafka Streams. I am using Kafka 1.1.0 with Scala 2.11.12 and sbt 1.1.4. I get the following error:
Exception in thread "wordcount-application-d81ee069-9307-46f1-8e71-c9f777d2db64-StreamThread-1" java.lang.UnsatisfiedLinkError: C:\Users\user\AppData\Local\Temp\librocksdbjni5439068356048679315.dll: À¦¥Y
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
at org.rocksdb.Options.<clinit>(Options.java:25)
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:116)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:167)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.init(InnerMeteredKeyValueStore.java:160)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.init(MeteredKeyValueBytesStore.java:102)
at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:225)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:162)
at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:88)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:316)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
I have already tried the solution given here.
Here is the code I tried, in Scala:
import java.lang
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream._
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

import scala.collection.JavaConverters._

object WordCountApplication {

  def main(args: Array[String]): Unit = {
    val config: Properties = {
      val p = new Properties()
      p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      p
    }

    val builder: StreamsBuilder = new StreamsBuilder()
    val textLines: KStream[String, String] = builder.stream("streams-plaintext-input")

    // Split each line into words (note the escaped regex: "\\W+").
    val afterFlatMap: KStream[String, String] =
      textLines.flatMapValues(new ValueMapper[String, java.lang.Iterable[String]] {
        override def apply(value: String): lang.Iterable[String] =
          value.split("\\W+").toIterable.asJava
      })

    // Re-key each record by the word itself so counts group per word.
    val afterGroupBy: KGroupedStream[String, String] =
      afterFlatMap.groupBy(new KeyValueMapper[String, String, String] {
        override def apply(key: String, value: String): String = value
      })

    val wordCounts: KTable[String, Long] = afterGroupBy
      .count(Materialized.as("counts-store")
        .asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])

    wordCounts.toStream().to("streams-wordcount-output", Produced.`with`(Serdes.String(), Serdes.Long()))

    val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
    streams.start()

    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      override def run(): Unit = streams.close(10, TimeUnit.SECONDS)
    }))
  }
}
build.sbt
name := "KafkaStreamDemo"
version := "0.1"
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
"org.apache.kafka" %% "kafka" % "1.1.0",
"org.apache.kafka" % "kafka-clients" % "1.1.0",
"org.apache.kafka" % "kafka-streams" % "1.1.0",
"ch.qos.logback" % "logback-classic" % "1.2.3"
)
If anyone has run into a problem like this, please help.
Finally, I found an answer that worked. I followed Unable to load rocksdbjni.
I did two things that worked for me.
1) I installed the Visual C++ Redistributable for Visual Studio 2015.
2) Earlier, I was using rocksdb 5.7.3 with kafka-streams 1.1.0 (rocksdb 5.7.3 is what kafka-streams 1.1.0 pulls in by default). I excluded the rocksdb dependency from the kafka-streams dependency and added rocksdb 5.3.6 instead. For reference, below is my build.sbt as it is now.
name := "KafkaStreamDemo"
version := "0.1"
scalaVersion := "2.12.5"
libraryDependencies ++= Seq(
"org.apache.kafka" %% "kafka" % "1.1.0",
"org.apache.kafka" % "kafka-clients" % "1.1.0",
"org.apache.kafka" % "kafka-streams" % "1.1.0" exclude("org.rocksdb","rocksdbjni"),
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.rocksdb" % "rocksdbjni" % "5.3.6"
)
Hope this helps someone.
Thanks
In my case, I was using kafka-streams:1.0.2. Changing the base Docker image from alpine-jdk8:latest to openjdk:8-jre fixed it.
This link - https://github.com/docker-flink/docker-flink/pull/22 - helped me find the solution.
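The likely reason the base-image swap works: Alpine images use musl libc, while the librocksdbjni native library bundled in the rocksdbjni jar is linked against glibc, so loading it on Alpine fails with UnsatisfiedLinkError. A minimal sketch of the change (the jar path and image tag here are placeholders, not from the original post):

```dockerfile
# Before (fails on Alpine): musl-based image, no glibc for librocksdbjni.
# FROM alpine-jdk8:latest

# After (works): Debian-based image that ships glibc.
FROM openjdk:8-jre

# Placeholder artifact path - adjust to your build output.
COPY target/scala-2.11/kafka-stream-demo.jar /app/app.jar
CMD ["java", "-jar", "/app/app.jar"]
```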