来自 Actor 的 Spark-Streaming
Spark-Streaming from an Actor
我想让消费者参与者订阅 Kafka 主题和流数据,以便在消费者外部使用 Spark Streaming 进行进一步处理。为什么是演员?因为我读到它的主管策略将是处理 Kafka 故障的好方法(例如,在失败时重新启动)。
我找到了两个选项:
- Java
KafkaConsumer
class:其poll()
方法returns一Map[String, Object]
。我希望像 KafkaUtils.createDirectStream
一样返回 DStream
,但我不知道如何从 actor 外部获取流。
- 扩展
ActorHelper
特性并使用 actorStream()
,如 example 所示。后一个选项不显示与主题的连接,而是与套接字的连接。
谁能给我指出正确的方向?
为了处理 Kafka 故障,我使用了 Apache Curator 框架和以下解决方法:
val client: CuratorFramework = ... // see docs
val zk: CuratorZookeeperClient = client.getZookeeperClient
/**
* This method returns false if kafka or zookeeper is down.
*/
def isKafkaAvailable:Boolean =
Try {
if (zk.isConnected) {
val xs = client.getChildren.forPath("/brokers/ids")
xs.size() > 0
}
else false
}.getOrElse(false)
为了使用 Kafka 主题,我使用了 com.softwaremill.reactivekafka
库。例如:
class KafkaConsumerActor extends Actor {
val kafka = new ReactiveKafka()
val config: ConsumerProperties[Array[Byte], Any] = ... // see docs
override def preStart(): Unit = {
super.preStart()
val publisher = kafka.consume(config)
Source.fromPublisher(publisher)
.map(handleKafkaRecord)
.to(Sink.ignore).run()
}
/**
* This method will be invoked when any kafka records will happen.
*/
def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]) = {
// handle record
}
}
我想让消费者参与者订阅 Kafka 主题和流数据,以便在消费者外部使用 Spark Streaming 进行进一步处理。为什么是演员?因为我读到它的主管策略将是处理 Kafka 故障的好方法(例如,在失败时重新启动)。
我找到了两个选项:
- Java
KafkaConsumer
class:其poll()
方法returns一Map[String, Object]
。我希望像KafkaUtils.createDirectStream
一样返回DStream
,但我不知道如何从 actor 外部获取流。 - 扩展
ActorHelper
特性并使用actorStream()
,如 example 所示。后一个选项不显示与主题的连接,而是与套接字的连接。
谁能给我指出正确的方向?
为了处理 Kafka 故障,我使用了 Apache Curator 框架和以下解决方法:
val client: CuratorFramework = ... // see docs
val zk: CuratorZookeeperClient = client.getZookeeperClient
/**
* This method returns false if kafka or zookeeper is down.
*/
def isKafkaAvailable:Boolean =
Try {
if (zk.isConnected) {
val xs = client.getChildren.forPath("/brokers/ids")
xs.size() > 0
}
else false
}.getOrElse(false)
为了使用 Kafka 主题,我使用了 com.softwaremill.reactivekafka
库。例如:
class KafkaConsumerActor extends Actor {
val kafka = new ReactiveKafka()
val config: ConsumerProperties[Array[Byte], Any] = ... // see docs
override def preStart(): Unit = {
super.preStart()
val publisher = kafka.consume(config)
Source.fromPublisher(publisher)
.map(handleKafkaRecord)
.to(Sink.ignore).run()
}
/**
* This method will be invoked when any kafka records will happen.
*/
def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]) = {
// handle record
}
}