Spark streaming rawSocketStream
I'm experimenting with Spark Streaming and listening on a socket, using the rawSocketStream method to create the receiver and the DStream. But when I print the DStream, I get the exception below.
Code that creates the DStream:
JavaSparkContext jsc = new JavaSparkContext("Master", "app");
JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(3));
JavaReceiverInputDStream<Object> rawStream = jssc.rawSocketStream("localhost", 9999);
log.info(tracePrefix + "Created the stream ...");
rawStream.print();
jssc.start();
jssc.awaitTermination();
Code that sends the protobuf object over a TCP connection:
FileInputStream input = new FileInputStream("address_book");
AddressBook book = AddressBookProtos.AddressBook.parseFrom(input);
log.info(tracePrefix + "Size of contacts: " + book.getPersonList().size());
ServerSocket serverSocket = new ServerSocket(9999);
log.info(tracePrefix + "Waiting for connections ...");
Socket s1 = serverSocket.accept();
log.info(tracePrefix + "Accepted a connection ...");
while(true) {
Thread.sleep(3000);
ObjectOutputStream out = new ObjectOutputStream(s1.getOutputStream());
out.writeByte(book.getSerializedSize());
out.write(book.toByteArray());
out.flush();
log.info(tracePrefix + "Written to new socket");
}
The stack trace looks like this:
java.lang.IllegalArgumentException
at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun.apply(ReceiverTracker.scala:575)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun.apply(ReceiverTracker.scala:565)
at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1992)
at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1992)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2016-04-02 07:45:47,607 ERROR [Executor task launch worker-0] org.apache.spark.streaming.receiver.ReceiverSupervisorImpl
Stopped receiver with error: java.lang.IllegalArgumentException
2016-04-02 07:45:47,613 ERROR [Executor task launch worker-0] org.apache.spark.executor.Executor
Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException
at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun.apply(ReceiverTracker.scala:575)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun.apply(ReceiverTracker.scala:565)
at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1992)
at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1992)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2016-04-02 07:45:47,646 ERROR [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager
Task 0 in stage 0.0 failed 1 times; aborting job
2016-04-02 07:45:47,656 ERROR [submit-job-thread-pool-0] org.apache.spark.streaming.scheduler.ReceiverTracker
Receiver has been stopped. Try to restart it.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalArgumentException
at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun.apply(ReceiverTracker.scala:575)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun.apply(ReceiverTracker.scala:565)
at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1992)
at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1992)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
Caused by: java.lang.IllegalArgumentException
at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun.apply(ReceiverTracker.scala:575)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun.apply(ReceiverTracker.scala:565)
at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1992)
at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1992)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Working code
Code that sends the protobuf object over TCP:
ServerSocket serverSocket = new ServerSocket(9999);
log.info(tracePrefix + "Waiting for connections ...");
Socket s1 = serverSocket.accept();
log.info(tracePrefix + "Accepted a connection ...");
while(true) {
Thread.sleep(3000);
DataOutputStream out = new DataOutputStream(s1.getOutputStream());
byte[] bytes = book.toByteArray();
log.info(tracePrefix + "Serialized size: " + book.getSerializedSize());
out.writeInt(book.getSerializedSize());
log.info(tracePrefix + "Sending bytes: " + Arrays.toString(bytes));
out.write(bytes);
// out.write("hello world !".getBytes());
out.flush();
log.info(tracePrefix + "Written to new socket");
}
Code that creates the receiver and the DStream:
JavaReceiverInputDStream<GeneratedMessage> rawStream = jssc.receiverStream(new JavaSocketReceiver("localhost", 9999));
log.info(tracePrefix + "Created the stream ...");
rawStream.print();
private static class JavaSocketReceiver extends Receiver<GeneratedMessage> {
private static final long serialVersionUID = -958378677169958045L;
String host = null;
int port = -1;
JavaSocketReceiver(String host_, int port_) {
super(StorageLevel.MEMORY_AND_DISK());
host = host_;
port = port_;
}
@Override
public void onStart() {
new Thread() {
@Override
public void run() {
receive();
}
}.start();
}
@Override
public void onStop() {
}
private void receive() {
try {
Socket socket = null;
ObjectInputStream in = null;
try {
// Open a socket to the target address and keep reading from it
log.info(tracePrefix + "Connecting to " + host + ":" + port);
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(true);
channel.connect(new InetSocketAddress(host, port));
log.info(tracePrefix + "Connected to " + host + ":" + port);
ArrayBlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(2);
Thread blockPushingThread = new Thread(new Runnable() {
@Override
public void run() {
int nextBlockNumber = 0;
while (true) {
try {
ByteBuffer buffer = queue.take();
nextBlockNumber += 1;
AddressBook book = AddressBook.parseFrom(buffer.array());
// log.info(tracePrefix + "Got back the object: " + book);
store(book);
} catch (InterruptedException ie) {
log.error(tracePrefix + "Failed processing data", ie);
} catch (Throwable t) {
log.error(tracePrefix + "Failed processing data", t);
}
}
}
});
blockPushingThread.setDaemon(true);
blockPushingThread.start();
ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
while (true) {
lengthBuffer.clear();
readFully(channel, lengthBuffer);
lengthBuffer.flip();
int length = lengthBuffer.getInt();
// log.info(tracePrefix + "The length read: " + length);
ByteBuffer dataBuffer = ByteBuffer.allocate(length);
readFully(channel, dataBuffer);
dataBuffer.flip();
// log.info(tracePrefix + "Read a block with " + length + " bytes");
queue.put(dataBuffer);
}
} finally {
Closeables.close(in, /* swallowIOException = */ true);
Closeables.close(socket, /* swallowIOException = */ true);
}
} catch (ConnectException ce) {
ce.printStackTrace();
restart("Could not connect", ce);
} catch (Throwable t) {
t.printStackTrace();
restart("Error receiving data", t);
}
}
private void readFully(ReadableByteChannel channel, ByteBuffer dest) {
while (dest.position() < dest.limit()) {
try {
if (channel.read(dest) == -1) {
throw new EOFException("End of channel");
}
} catch (IOException e) {
log.error(tracePrefix + "Failed reading from channel: " + channel, e);
// rethrow so the loop does not spin forever on a failed channel
throw new RuntimeException("Failed reading from channel: " + channel, e);
}
}
}
}
The JavaSocketReceiver above is adapted from rawSocketStream in the Spark Streaming module. In my client code that sends the bytes, if I change the DataOutputStream to an ObjectOutputStream I get a corrupted-header exception, and on the streaming side, if I use the built-in rawSocketStream to listen for the incoming packets, I get the IllegalArgumentException from ByteBuffer.allocate (line 334).
If you look at the ByteBuffer documentation, that IllegalArgumentException can only be caused by attempting to allocate a buffer of negative size.
The RawInputDStream protocol expects an integer size field followed by the corresponding payload, and that size field is a 4-byte integer.
The sender program shown in the question:
out.writeByte(book.getSerializedSize());
writes the integer size as a single byte. When the reading side tries to decode the payload size, it therefore reads a corrupted field: the size byte gets combined with part of the payload, and the decoded result is a negative integer.
The solution is to write a 4-byte (32-bit) integer instead:
out.writeInt(book.getSerializedSize());
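For illustration, a minimal standalone sender that honors this framing could look like the sketch below (the "hello" payload is a placeholder, not the question's protobuf data):
import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

try (ServerSocket server = new ServerSocket(9999);
Socket client = server.accept();
DataOutputStream out = new DataOutputStream(client.getOutputStream())) {
byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
out.writeInt(payload.length); // 4-byte big-endian size field ...
out.write(payload); // ... followed by the payload itself
out.flush();
}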
I have been investigating a similar problem with the rawSocketStream method of the JavaStreamingContext class. In my case I wanted to send and receive byte[] data, and I eventually got it working.
Regarding the original question: you can achieve the objective of receiving data with rawSocketStream without writing a custom Receiver. The solution comes down to how you use the ObjectOutputStream on the sending side. As Lokesh found, constructing an ObjectOutputStream on the socket's output stream leads to the IllegalArgumentException. The exception does not occur, however, if the sender constructs the socket and streams like this:
ServerSocket serverSocket = new ServerSocket(9999);
Socket clientSocket = serverSocket.accept();
OutputStream outputStream = clientSocket.getOutputStream();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
Note that I passed a ByteArrayOutputStream object to the ObjectOutputStream constructor. The root problem with the original approach is that the ObjectOutputStream constructor writes the serialization stream header to the underlying stream (https://docs.oracle.com/javase/8/docs/api/java/io/ObjectOutputStream.html#ObjectOutputStream-java.io.OutputStream-). If the underlying stream is the clientSocket output stream, the raw socket stream on the Spark side receives that serialization header as soon as the connection is made and interprets it as the 4-byte buffer size for the receiving ByteBuffer object. Evidently the first 4 bytes of this header have a 1 in the most significant bit, so the receiver interprets them as a negative integer and the ByteBuffer allocation method throws the IllegalArgumentException.
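You can verify this with a quick sketch: the Java serialization stream header starts with the magic number 0xACED followed by version 0x0005, and those 4 bytes decode to a negative integer:
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;

ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos); // constructor writes the stream header
oos.flush(); // make sure the 4 header bytes reach bos
int asLength = ByteBuffer.wrap(bos.toByteArray(), 0, 4).getInt();
System.out.println(asLength); // -1393754107, i.e. 0xACED0005, which ByteBuffer.allocate rejects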
Next, use the ObjectOutputStream to write the serialized data into the ByteArrayOutputStream object. Even though I was transmitting byte[] data, the write(byte[]) method caused an exception; I had to use the writeObject() method instead, passing my byte[] data to writeObject().
Once the ByteArrayOutputStream is loaded, get its size and write that size to the socket output stream. Make sure to write it to the socket output stream, not the ObjectOutputStream, and to write the size before writing the ByteArrayOutputStream data. Since the size must be transmitted as 4 bytes but the outputStream.write() method only writes the 8 low-order bits of its argument, you need some bit shifting to send the right data.
The sender code looks like this:
try(ServerSocket serverSocket = new ServerSocket(9999);
Socket clientSocket = serverSocket.accept();
OutputStream outputStream = clientSocket.getOutputStream();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);)
{
byte[] bytes;
// Load the byte[] with data
...
oos.writeObject(bytes);
oos.flush();
oos.close();
outputStream.write(bos.size() >> 24);
outputStream.write(bos.size() >> 16);
outputStream.write(bos.size() >> 8);
outputStream.write(bos.size());
outputStream.write(bos.toByteArray());
// Keep socket connections open
}
catch (IOException e) {
e.printStackTrace();
}
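As an aside, the four shift-and-write calls are equivalent to a single writeInt() on a DataOutputStream, which also writes the size as 4 big-endian bytes (a sketch, not the code I tested):
DataOutputStream dos = new DataOutputStream(outputStream);
dos.writeInt(bos.size()); // same 4 big-endian bytes as the shifts above
dos.write(bos.toByteArray());
dos.flush();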
On the receiving side, the relevant code is this:
SparkConf conf = new SparkConf().setAppName("log jamming").setMaster("local[2]");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<byte[]> bytes = jsc.rawSocketStream("localhost", 9999);
// Have fun with the RDD
jsc.start();
jsc.awaitTermination();
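From there you can work with the byte[] elements directly. For example, assuming the sender's byte[] holds UTF-8 text, a minimal sketch (placed where the "// Have fun with the RDD" comment is, before jsc.start()) would be:
bytes.map(b -> new String(b, java.nio.charset.StandardCharsets.UTF_8)).print();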