火花流式传输多个套接字源

spark streaming multiple sockets sources

我是 Spark 的新手。对于我的项目,我需要合并来自不同端口上不同流的数据。为了测试我做了一个练习,目的是打印来自不同端口的流的数据。下面你可以看到代码:

object hello {
  def main(args: Array[String]) {

    val ssc = new StreamingContext(new SparkConf(), Seconds(2))
    val lines9 = ssc.socketTextStream("localhost", 9999)
    val lines8 = ssc.socketTextStream("localhost", 9998)

    lines9.print()
    lines8.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

比起我 运行 那些代码并启动 nc -lk 9999 和 nc -lk 9998。当我在端口 9999 上放置任何东西时,我看到 Spark 上的输出 - 工作正常。当我在 9998 上输入任何内容时,我看不到任何输出。

你能解释一下为什么 9998 上没有输出吗?我应该如何实现它来合并这两个流?

您可以使用这种(Dstream1.union(Dstream2))方法来组合您的 Dstream。它将 return 一个新的 Dstream。

好的,我找到了问题的答案。太简单了,我在这里 post 觉得很傻。

问题出在执行应用程序中。我是这样做的: ./spark-submit --class 你好 而正确的方法是: ./spark-submit --class hello --master local[2]

无论如何,感谢大家的贡献。

// 具有单一流上下文的完美工作的多套接字火花流示例。
// 在本地目录上有检查点

import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.streaming.{Seconds, StreamingContext}

var dir_path_opp="file:///Users/keeratjohar2305/Downloads/59e8f3c42ef0ee849a77-ef2360e85067356b16ebd3af2689db720a47963d/SIKANDRABAD_CP_TEST"

// 第一次创建批处理间隔为 3 秒的本地 StreamingContext

def functionToCreateContext(): StreamingContext = {


  val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[*]")

  val ssc = new StreamingContext(conf, Seconds(3))   // new context

  val sc = ssc.sparkContext    // created just to reduce logging

  sc.setLogLevel("ERROR")

  ssc.checkpoint(dir_path_opp) 

  ssc

  }

// 如果检查点中不存在则创建流上下文

val sscc = StreamingContext.getOrCreate(dir_path_opp, ()=>functionToCreateContext())

//UpdateByKey保持状态

 def updatefunc(v: Seq[Int], rc: Option[Int]) = {

     val nc = v.sum + rc.getOrElse(0)

     new Some(nc)

  }

// 第一个流打开

val lines = sscc.socketTextStream("localhost",9999)

val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

var tolcnt= wordCounts.updateStateByKey(updatefunc _)

tolcnt.print()

// 第二个流打开

val lines1 = sscc.socketTextStream("localhost",6666)

val words1 = lines1.flatMap(_.split(" "))

val wordCounts1 = words1.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts1.print()


sscc.start()

sscc.awaitTermination()    // add it if you want to program to run infinitaly.