火花流式传输多个套接字源
spark streaming multiple sockets sources
我是 Spark 的新手。对于我的项目,我需要合并来自不同端口上不同流的数据。为了测试我做了一个练习,目的是打印来自不同端口的流的数据。下面你可以看到代码:
object hello {
def main(args: Array[String]) {
val ssc = new StreamingContext(new SparkConf(), Seconds(2))
val lines9 = ssc.socketTextStream("localhost", 9999)
val lines8 = ssc.socketTextStream("localhost", 9998)
lines9.print()
lines8.print()
ssc.start()
ssc.awaitTermination()
}
}
比起我 运行 那些代码并启动 nc -lk 9999 和 nc -lk 9998。当我在端口 9999 上放置任何东西时,我看到 Spark 上的输出 - 工作正常。当我在 9998 上输入任何内容时,我看不到任何输出。
你能解释一下为什么 9998 上没有输出吗?我应该如何实现它来合并这两个流?
您可以使用这种(Dstream1.union(Dstream2)
)方法来组合您的 Dstream。它将 return 一个新的 Dstream。
好的,我找到了问题的答案。太简单了,我在这里 post 觉得很傻。
问题出在执行应用程序中。我是这样做的:
./spark-submit --class 你好
而正确的方法是:
./spark-submit --class hello --master local[2]
无论如何,感谢大家的贡献。
// 具有单一流上下文的完美工作的多套接字火花流示例。
// 在本地目录上有检查点
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.{Seconds, StreamingContext}
var dir_path_opp="file:///Users/keeratjohar2305/Downloads/59e8f3c42ef0ee849a77-ef2360e85067356b16ebd3af2689db720a47963d/SIKANDRABAD_CP_TEST"
// 第一次创建批处理间隔为 3 秒的本地 StreamingContext
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(3)) // new context
val sc = ssc.sparkContext // created just to reduce logging
sc.setLogLevel("ERROR")
ssc.checkpoint(dir_path_opp)
ssc
}
// 如果检查点中不存在则创建流上下文
val sscc = StreamingContext.getOrCreate(dir_path_opp, ()=>functionToCreateContext())
//UpdateByKey保持状态
def updatefunc(v: Seq[Int], rc: Option[Int]) = {
val nc = v.sum + rc.getOrElse(0)
new Some(nc)
}
// 第一个流打开
val lines = sscc.socketTextStream("localhost",9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
var tolcnt= wordCounts.updateStateByKey(updatefunc _)
tolcnt.print()
// 第二个流打开
val lines1 = sscc.socketTextStream("localhost",6666)
val words1 = lines1.flatMap(_.split(" "))
val wordCounts1 = words1.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts1.print()
sscc.start()
sscc.awaitTermination() // add it if you want to program to run infinitaly.
我是 Spark 的新手。对于我的项目,我需要合并来自不同端口上不同流的数据。为了测试我做了一个练习,目的是打印来自不同端口的流的数据。下面你可以看到代码:
object hello {
def main(args: Array[String]) {
val ssc = new StreamingContext(new SparkConf(), Seconds(2))
val lines9 = ssc.socketTextStream("localhost", 9999)
val lines8 = ssc.socketTextStream("localhost", 9998)
lines9.print()
lines8.print()
ssc.start()
ssc.awaitTermination()
}
}
比起我 运行 那些代码并启动 nc -lk 9999 和 nc -lk 9998。当我在端口 9999 上放置任何东西时,我看到 Spark 上的输出 - 工作正常。当我在 9998 上输入任何内容时,我看不到任何输出。
你能解释一下为什么 9998 上没有输出吗?我应该如何实现它来合并这两个流?
您可以使用这种(Dstream1.union(Dstream2)
)方法来组合您的 Dstream。它将 return 一个新的 Dstream。
好的,我找到了问题的答案。太简单了,我在这里 post 觉得很傻。
问题出在执行应用程序中。我是这样做的: ./spark-submit --class 你好 而正确的方法是: ./spark-submit --class hello --master local[2]
无论如何,感谢大家的贡献。
// 具有单一流上下文的完美工作的多套接字火花流示例。
// 在本地目录上有检查点
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.{Seconds, StreamingContext}
var dir_path_opp="file:///Users/keeratjohar2305/Downloads/59e8f3c42ef0ee849a77-ef2360e85067356b16ebd3af2689db720a47963d/SIKANDRABAD_CP_TEST"
// 第一次创建批处理间隔为 3 秒的本地 StreamingContext
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(3)) // new context
val sc = ssc.sparkContext // created just to reduce logging
sc.setLogLevel("ERROR")
ssc.checkpoint(dir_path_opp)
ssc
}
// 如果检查点中不存在则创建流上下文
val sscc = StreamingContext.getOrCreate(dir_path_opp, ()=>functionToCreateContext())
//UpdateByKey保持状态
def updatefunc(v: Seq[Int], rc: Option[Int]) = {
val nc = v.sum + rc.getOrElse(0)
new Some(nc)
}
// 第一个流打开
val lines = sscc.socketTextStream("localhost",9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
var tolcnt= wordCounts.updateStateByKey(updatefunc _)
tolcnt.print()
// 第二个流打开
val lines1 = sscc.socketTextStream("localhost",6666)
val words1 = lines1.flatMap(_.split(" "))
val wordCounts1 = words1.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts1.print()
sscc.start()
sscc.awaitTermination() // add it if you want to program to run infinitaly.