Trying to get Spark Streaming to read a data stream from a website: what is the socket?
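I am trying to get this data, http://stream.meetup.com/2/rsvps, into a Spark stream.
The records are JSON objects. I know the lines will arrive as strings; I just want this to work before I attempt any JSON parsing.
I'm not sure what to put as the port, and I suspect that is the problem.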
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Spark Streaming");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("http://stream.meetup.com/2/rsvps", 80);
lines.print();
jssc.start();
jssc.awaitTermination();
Here is my error:
java.net.UnknownHostException: http://stream.meetup.com/2/rsvps
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at java.net.Socket.<init>(Socket.java:425)
at java.net.Socket.<init>(Socket.java:208)
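socketTextStream is not designed to be used as an HTTP client. It opens a plain TCP socket to a host name and port, which is why passing a full URL as the host raises UnknownHostException. As you noted, you will need to create a custom receiver. One possible starting point is the receiver created as part of the Meetup stream data source (see https://github.com/actions/meetup-stream/blob/master/src/main/scala/receiver/MeetupReceiver.scala).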
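For illustration, here is a minimal Java sketch, using only the JDK, of how the URL from the question breaks down into the host and port that a socket-based API expects. The class and method names (HostVersusUrl, hostOf, portOf) are made up for this example and are not part of Spark.

```java
import java.net.URI;

class HostVersusUrl {
    // Returns only the host part of a URL string, which is what a socket needs.
    static String hostOf(String url) {
        return URI.create(url).getHost();
    }

    // Returns the explicit port, or the scheme default (80 for http, 443 for https).
    static int portOf(String url) {
        URI uri = URI.create(url);
        if (uri.getPort() != -1) {
            return uri.getPort();
        }
        return "https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
    }

    public static void main(String[] args) {
        String url = "http://stream.meetup.com/2/rsvps";
        System.out.println(hostOf(url));                // stream.meetup.com
        System.out.println(portOf(url));                // 80
        System.out.println(URI.create(url).getPath());  // /2/rsvps
    }
}
```

Passing "stream.meetup.com" and 80 to socketTextStream would likely get past the UnknownHostException, but the socket receiver only reads from the connection and never sends an HTTP request for /2/rsvps, so no data should be expected to arrive that way.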
Here is a custom UrlReceiver that follows the Spark documentation on custom receivers:
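import java.io.{BufferedReader, InputStreamReader}
import java.net.URL
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Receives newline-delimited text from a URL and stores each non-empty line.
class UrlReceiver(urlStr: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  override def onStart(): Unit = {
    // onStart must not block, so the reading happens on its own thread.
    new Thread("Url Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to do: the read loop checks isStopped() after each line.
  }

  private def receive(): Unit = {
    try {
      val reader = new BufferedReader(
        new InputStreamReader(new URL(urlStr).openConnection().getInputStream))
      try {
        var msg = reader.readLine
        while (!isStopped() && msg != null) {
          if (!msg.isEmpty) store(msg)
          msg = reader.readLine
        }
      } finally {
        reader.close()
      }
      // The stream ended; ask Spark to restart the receiver and reconnect.
      restart("Stream ended, trying to connect again")
    } catch {
      case e: Exception => restart("Error receiving data from " + urlStr, e)
    }
  }
}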
Then use it like this, where sc is the StreamingContext:
val lines = sc.receiverStream(new UrlReceiver("http://stream.meetup.com/2/rsvps"))
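Since the question is written in Java, the read loop at the heart of the receiver can be sketched with the JDK alone. This is only an illustration of the technique, not a Spark receiver: LineForwarder and forwardLines are hypothetical names, the sink stands in for the receiver's store method, and the sample JSON lines are invented.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class LineForwarder {
    // Reads lines until end of stream and hands each non-empty one to the sink.
    // Returns how many lines were forwarded.
    static int forwardLines(Reader source, Consumer<String> sink) throws IOException {
        int forwarded = 0;
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.isEmpty()) {
                    sink.accept(line);
                    forwarded++;
                }
            }
        }
        return forwarded;
    }

    public static void main(String[] args) throws IOException {
        // Invented stand-in for the HTTP response body: two JSON lines and a blank line.
        String body = "{\"rsvp_id\":1}\n\n{\"rsvp_id\":2}\n";
        List<String> received = new ArrayList<>();
        int count = forwardLines(new StringReader(body), received::add);
        System.out.println(count + " lines: " + received);
    }
}
```

In a Java subclass of Receiver<String>, the source would presumably be an InputStreamReader over the URL connection's input stream and the sink would be the receiver's own store method, with the loop running on a thread started from onStart.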