从几个主题阅读
reading from several topics
我正在尝试开发一个应用程序,它从 kafka 服务器获取四个不同的主题,并对每个主题采取特定的操作。
我已经创建了一个 class 来接收 DStream 并且有一个应该转换 DStream 的方法。
例如,处理程序 class:
class StreamHandler(stream:DStream[String]) {
val stream:DStream[String] = stream
def doActions():DStream[String] = {
//Do smth. to DStream
}
}
现在,假设我从主程序 class 为我想要的每个处理程序调用 doActions() class,它会随着每个到达的 DStream 重复还是只重复一次?
val topicHandler1 = new StreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic1"->1)).map(_._2)
val topicHandler2 = new OtherStreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic2"->1)).map(_._2)
topicHandler1.doActions()
topicHandler2 .doActions()
ssc.start()
有没有更好的方法?
在 StreamHandler
上声明的转换将应用于每个批次的 DStream。目前的代码还很不完整,无法给你一个确定的答案。在 DStream 转换管道中,您将需要一个 action that materializes the DStream,否则什么也不会发生。
关于该方法,一个接受 DStream 并对其应用转换的函数就足够了并且易于测试:
val pipeline:DStream[Data] => () = dstream =>
dstream.map(...).filter(...).print()
目前看来,class 建筑的购买量并不大。
我正在尝试开发一个应用程序,它从 kafka 服务器获取四个不同的主题,并对每个主题采取特定的操作。
我已经创建了一个 class 来接收 DStream 并且有一个应该转换 DStream 的方法。
例如,处理程序 class:
class StreamHandler(stream:DStream[String]) {
val stream:DStream[String] = stream
def doActions():DStream[String] = {
//Do smth. to DStream
}
}
现在,假设我从主程序 class 为我想要的每个处理程序调用 doActions() class,它会随着每个到达的 DStream 重复还是只重复一次?
val topicHandler1 = new StreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic1"->1)).map(_._2)
val topicHandler2 = new OtherStreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic2"->1)).map(_._2)
topicHandler1.doActions()
topicHandler2 .doActions()
ssc.start()
有没有更好的方法?
在 StreamHandler
上声明的转换将应用于每个批次的 DStream。目前的代码还很不完整,无法给你一个确定的答案。在 DStream 转换管道中,您将需要一个 action that materializes the DStream,否则什么也不会发生。
关于该方法,一个接受 DStream 并对其应用转换的函数就足够了并且易于测试:
val pipeline:DStream[Data] => () = dstream =>
dstream.map(...).filter(...).print()
目前看来,class 建筑的购买量并不大。