Flume Sink to Spark using Scala
My Flume configuration:
source_agent.sources = tail
source_agent.sources.tail.type = exec
source_agent.sources.tail.command = python loggen.py
source_agent.sources.tail.batchSize = 1
source_agent.sources.tail.channels = memoryChannel
#memory-channel
source_agent.channels = memoryChannel
source_agent.channels.memoryChannel.type = memory
source_agent.channels.memoryChannel.capacity = 10000
source_agent.channels.memoryChannel.transactionCapacity=10000
source_agent.channels.memoryChannel.byteCapacityBufferPercentage = 20
source_agent.channels.memoryChannel.byteCapacity = 800000
# Send to Flume Collector on Spark sink
source_agent.sinks = spark
source_agent.sinks.spark.type=org.apache.spark.streaming.flume.sink.SparkSink
source_agent.sinks.spark.batchSize=100
source_agent.sinks.spark.channel = memoryChannel
source_agent.sinks.spark.hostname=localhost
source_agent.sinks.spark.port=1234
My Spark Scala code:
package com.thanga.twtsteam
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
object SampleStream {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SampleStream")
    val ssc = new StreamingContext(conf, Seconds(1))
    val flumeStream = FlumeUtils.createPollingStream(ssc, "localhost", 1234)
    ssc.stop()
  }
}
I am building the jar with SBT. My SBT configuration is as follows:
name := "Flume"
version := "1.0"
scalaVersion := "2.10.4"
publishMavenStyle := true
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.4.1"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.4.1"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.4.1"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.10" % "1.4.1"
libraryDependencies += "org.scala-lang" % "scala-library" % "2.10.4"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
The problem is that the jar now builds without any errors, but when I run it I get the following error:
16/04/11 19:52:56 INFO BlockManagerMaster: Registered BlockManager
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/flume/FlumeUtils$
at com.thagna.twtsteam.SampleStream$.main(SampleStream.scala:10)
at com.thanga.twtsteam.SampleStream.main(SampleStream.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.flume.FlumeUtils$
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 11 more
16/04/11 19:52:56 INFO SparkContext: Invoking stop() from shutdown hook
Can anyone help me resolve this?
Add this to your build to get rid of this error:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.10</artifactId>
<version>2.0.0</version>
</dependency>
If you run the job with spark-submit, you can use the --jars option, for example:
spark-submit --jars ....../lib/spark-streaming_2.10-1.2.1.2.2.6.0-2800.jar
Or add this to your SBT configuration:
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "2.1.0"
https://spark.apache.org/docs/latest/streaming-flume-integration.html
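Note that the build.sbt in the question already declares spark-streaming-flume, which is why compilation succeeds. A plain `sbt package` does not bundle dependencies into the jar, so the class is still missing at run time unless it is shipped with the job (via --jars, via spark-submit's --packages option, or in an assembly jar). The following is only a sketch of how the build could be arranged for that, assuming Spark 1.4.1 on Scala 2.10 as in the question. The `sparkVersion` value is a name introduced here for illustration, and building a fat jar additionally assumes the sbt-assembly plugin is configured in the project, which is not shown.

```scala
// build.sbt: a sketch, assuming Spark 1.4.1 and Scala 2.10 as in the question
name := "Flume"

version := "1.0"

scalaVersion := "2.10.4"

// Illustrative helper value (not in the original build); it should match
// the version of the Spark installation that runs spark-submit.
val sparkVersion = "1.4.1"

libraryDependencies ++= Seq(
  // These are already on the classpath when the job is launched with spark-submit,
  // so they are marked "provided" and left out of any assembly jar.
  "org.apache.spark" %% "spark-core"      % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  // The Flume integration is not part of the Spark distribution's classpath,
  // so it has to travel with the application.
  "org.apache.spark" %% "spark-streaming-flume" % sparkVersion
)
```

Keeping the Flume artifact on the same version as the Spark installation is likely safer than mixing a 2.x artifact with a 1.4.1 runtime. If you prefer not to build an assembly jar, passing `--packages org.apache.spark:spark-streaming-flume_2.10:1.4.1` to spark-submit should have a similar effect, assuming the machine can reach a Maven repository.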