如何使用spark streaming读取MQ中的消息,即ZeroMQ,RabbitMQ?
How to read messages in MQs using spark streaming,i.e ZeroMQ,RabbitMQ?
正如 spark 文档所说,它支持 kafka 作为数据流 source.but 我使用 ZeroMQ,但没有 ZeroMQUtils.so 我该如何使用它?一般来说,其他 MQ 怎么样。我是 spark 和 spark streaming 的新手,所以如果问题是 stupid.Could,我很抱歉有人给我一个 solution.Thanks
顺便说一句,我使用 python.
更新,我终于在 java 中用自定义接收器做到了。以下是我的解决方案
public class ZeroMQReceiver extends Receiver<T> {
private static final ObjectMapper mapper = new ObjectMapper();
public ZeroMQReceiver() {
super(StorageLevel.MEMORY_AND_DISK_2());
}
@Override
public void onStart() {
// Start the thread that receives data over a connection
new Thread(this::receive).start();
}
@Override
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private void receive() {
String message = null;
try {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("tcp://ip:port");
subscriber.subscribe("".getBytes());
// Until stopped or connection broken continue reading
while (!isStopped() && (message = subscriber.recvStr()) != null) {
List<T> results = mapper.readValue(message,
new TypeReference<List<T>>(){} );
for (T item : results) {
store(item);
}
}
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again");
} catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
}
我假设你在谈论结构化流。
我不熟悉 ZeroMQ,但 Spark Structured Streaming 源中的一个重要点是可重放性(为了确保容错),如果我理解正确的话,ZeroMQ 不会提供现成的 -盒子.
一种实用的方法是在 Kafka 中缓冲数据并使用 KafkaSource 或作为(本地 FS/NFS、HDFS、S3)目录中的文件并使用 FileSource 进行读取。比照。 Spark Docs。如果您使用 FileSource,请确保不要将任何内容附加到 FileSource 的输入目录中的现有文件,而是将它们自动移动到目录中。
正如 spark 文档所说,它支持 kafka 作为数据流 source.but 我使用 ZeroMQ,但没有 ZeroMQUtils.so 我该如何使用它?一般来说,其他 MQ 怎么样。我是 spark 和 spark streaming 的新手,所以如果问题是 stupid.Could,我很抱歉有人给我一个 solution.Thanks 顺便说一句,我使用 python.
更新,我终于在 java 中用自定义接收器做到了。以下是我的解决方案
public class ZeroMQReceiver extends Receiver<T> {
private static final ObjectMapper mapper = new ObjectMapper();
public ZeroMQReceiver() {
super(StorageLevel.MEMORY_AND_DISK_2());
}
@Override
public void onStart() {
// Start the thread that receives data over a connection
new Thread(this::receive).start();
}
@Override
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private void receive() {
String message = null;
try {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("tcp://ip:port");
subscriber.subscribe("".getBytes());
// Until stopped or connection broken continue reading
while (!isStopped() && (message = subscriber.recvStr()) != null) {
List<T> results = mapper.readValue(message,
new TypeReference<List<T>>(){} );
for (T item : results) {
store(item);
}
}
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again");
} catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
}
我假设你在谈论结构化流。
我不熟悉 ZeroMQ,但 Spark Structured Streaming 源中的一个重要点是可重放性(为了确保容错),如果我理解正确的话,ZeroMQ 不会提供现成的 -盒子.
一种实用的方法是在 Kafka 中缓冲数据并使用 KafkaSource 或作为(本地 FS/NFS、HDFS、S3)目录中的文件并使用 FileSource 进行读取。比照。 Spark Docs。如果您使用 FileSource,请确保不要将任何内容附加到 FileSource 的输入目录中的现有文件,而是将它们自动移动到目录中。