套接字通道问题
SocketChannel issue
我写了一个应用程序,它通过 TCP 和 SocketChannel 连接到服务器
但我有两个问题:
- 第一个是次要的 - 有时出于某种未知原因我会发送串联的消息并且
- 第二点至关重要 - 应用会定期停止 sends/receives 消息
知道哪里出了问题吗?
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SocketSelectorWorker extends Thread {
private static final transient Logger log = LoggerFactory.getLogger(SocketSelectorWorker.class);
private ExecutorService executorService = Executors.newFixedThreadPool(3);
private final Queue<byte[]> messages;
private Selector selector;
public SocketSelectorWorker(Queue messages, Selector selector) {
super();
this.selector = selector;
this.session = session;
this.messages = messages;
}
@Override
public void run() {
super.run();
while (isConnectionAlive()) {
try {
// Wait for an event
selector.select();
} catch (IOException e) {
log.error("Selector error: {}", e.toString());
log.debug("Stacktrace: ", e);
session.closeConnection();
break;
}
handleSelectorkeys(selector.selectedKeys());
}
executorService.shutdown();
log.debug("worker stopped");
}
private void handleSelectorkeys(Set<SelectionKey> selectedKeys) {
for (SelectionKey selKey : selector.selectedKeys()) {
selector.selectedKeys().remove(selKey);
try {
processSelectionKey(selKey);
} catch (IOException e) {
// Handle error with channel and unregister
selKey.cancel();
log.error("Selector error: {}", e.toString());
log.debug("Stacktrace: ", e);
}
}
}
public void processSelectionKey(SelectionKey selKey) throws IOException {
// Since the ready operations are cumulative,
// need to check readiness for each operation
if (selKey.isValid() && selKey.isConnectable()) {
log.debug("connectable");
// Get channel with connection request
SocketChannel sChannel = (SocketChannel) selKey.channel();
boolean success = sChannel.finishConnect();
if (!success) {
// An error occurred; handle it
log.error("Error on finish");
// Unregister the channel with this selector
selKey.cancel();
}
}
if (selKey.isValid() && selKey.isReadable()) {
log.debug("readable");
readMessage(selKey);
}
if (selKey.isValid() && selKey.isWritable()) {
log.debug("writable");
writeMessage(selKey);
}
if (selKey.isValid() && selKey.isAcceptable()) {
log.debug("Acceptable");
}
}
private void writeMessage(SelectionKey selKey) throws IOException {
byte[] message = messages.poll();
if (message == null) {
return;
}
// Get channel that's ready for more bytes
SocketChannel socketChannel = (SocketChannel) selKey.channel();
// See Writing to a SocketChannel
// Create a direct buffer to get bytes from socket.
// Direct buffers should be long-lived and be reused as much as
// possible.
ByteBuffer buf = ByteBuffer.allocateDirect(1024);// .allocateDirect(toSend.getBytes().length);
// try {
// Fill the buffer with the bytes to write;
// see Putting Bytes into a ByteBuffer
// buf.put((byte)0xFF);
buf.clear();
buf.put(new byte[] { 0x02 });
buf.put(message);
buf.put(new byte[] { 0x03 });
// Prepare the buffer for reading by the socket
buf.flip();
// Write bytes
int numBytesWritten = socketChannel.write(buf);
log.debug("Written: {}", numBytesWritten);
while (buf.hasRemaining()) {
numBytesWritten = socketChannel.write(buf);
log.debug("Written remining: {}", numBytesWritten);
}
}
private void readMessage(SelectionKey selKey) throws IOException {
// Get channel with bytes to read
SocketChannel socketChannel = (SocketChannel) selKey.channel();
// See Reading from a SocketChannel
// Create a direct buffer to get bytes from socket.
// Direct buffers should be long-lived and be reused as much as
// possible.
ByteBuffer buf = ByteBuffer.allocateDirect(2048);
Charset charset = Charset.forName("UTF-8");// Charset.forName("ISO-8859-1");
CharsetDecoder decoder = charset.newDecoder();
// try {
// Clear the buffer and read bytes from socket
buf.clear();
int numBytesRead = socketChannel.read(buf);
if (numBytesRead == -1) {
// No more bytes can be read from the channel
// socketChannel.close();
return;
}
log.debug("Read bytes: {}", numBytesRead);
// To read the bytes, flip the buffer
buf.flip();
String result = decoder.decode(buf).toString();
log.debug("Read string: {}", result);
//processMessage(result.getBytes());
}
}
- TCP 没有消息边界。它是一个字节流协议。任何消息边界由您决定。
您没有正确处理选择键。您必须在迭代时通过迭代器删除,而不是通过集合。这意味着您不能使用增强的 for 循环。可能你在跳过键。
当你从read()
得到-1时你必须关闭频道。
当您获得 IOException
时,仅取消密钥是不够的。你应该关闭频道,NB会自动取消他们的密钥。
我写了一个应用程序,它通过 TCP 和 SocketChannel 连接到服务器 但我有两个问题:
- 第一个是次要的 - 有时出于某种未知原因我会发送串联的消息并且
- 第二点至关重要 - 应用会定期停止 sends/receives 消息
知道哪里出了问题吗?
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SocketSelectorWorker extends Thread {
private static final transient Logger log = LoggerFactory.getLogger(SocketSelectorWorker.class);
private ExecutorService executorService = Executors.newFixedThreadPool(3);
private final Queue<byte[]> messages;
private Selector selector;
public SocketSelectorWorker(Queue messages, Selector selector) {
super();
this.selector = selector;
this.session = session;
this.messages = messages;
}
@Override
public void run() {
super.run();
while (isConnectionAlive()) {
try {
// Wait for an event
selector.select();
} catch (IOException e) {
log.error("Selector error: {}", e.toString());
log.debug("Stacktrace: ", e);
session.closeConnection();
break;
}
handleSelectorkeys(selector.selectedKeys());
}
executorService.shutdown();
log.debug("worker stopped");
}
private void handleSelectorkeys(Set<SelectionKey> selectedKeys) {
for (SelectionKey selKey : selector.selectedKeys()) {
selector.selectedKeys().remove(selKey);
try {
processSelectionKey(selKey);
} catch (IOException e) {
// Handle error with channel and unregister
selKey.cancel();
log.error("Selector error: {}", e.toString());
log.debug("Stacktrace: ", e);
}
}
}
public void processSelectionKey(SelectionKey selKey) throws IOException {
// Since the ready operations are cumulative,
// need to check readiness for each operation
if (selKey.isValid() && selKey.isConnectable()) {
log.debug("connectable");
// Get channel with connection request
SocketChannel sChannel = (SocketChannel) selKey.channel();
boolean success = sChannel.finishConnect();
if (!success) {
// An error occurred; handle it
log.error("Error on finish");
// Unregister the channel with this selector
selKey.cancel();
}
}
if (selKey.isValid() && selKey.isReadable()) {
log.debug("readable");
readMessage(selKey);
}
if (selKey.isValid() && selKey.isWritable()) {
log.debug("writable");
writeMessage(selKey);
}
if (selKey.isValid() && selKey.isAcceptable()) {
log.debug("Acceptable");
}
}
private void writeMessage(SelectionKey selKey) throws IOException {
byte[] message = messages.poll();
if (message == null) {
return;
}
// Get channel that's ready for more bytes
SocketChannel socketChannel = (SocketChannel) selKey.channel();
// See Writing to a SocketChannel
// Create a direct buffer to get bytes from socket.
// Direct buffers should be long-lived and be reused as much as
// possible.
ByteBuffer buf = ByteBuffer.allocateDirect(1024);// .allocateDirect(toSend.getBytes().length);
// try {
// Fill the buffer with the bytes to write;
// see Putting Bytes into a ByteBuffer
// buf.put((byte)0xFF);
buf.clear();
buf.put(new byte[] { 0x02 });
buf.put(message);
buf.put(new byte[] { 0x03 });
// Prepare the buffer for reading by the socket
buf.flip();
// Write bytes
int numBytesWritten = socketChannel.write(buf);
log.debug("Written: {}", numBytesWritten);
while (buf.hasRemaining()) {
numBytesWritten = socketChannel.write(buf);
log.debug("Written remining: {}", numBytesWritten);
}
}
private void readMessage(SelectionKey selKey) throws IOException {
// Get channel with bytes to read
SocketChannel socketChannel = (SocketChannel) selKey.channel();
// See Reading from a SocketChannel
// Create a direct buffer to get bytes from socket.
// Direct buffers should be long-lived and be reused as much as
// possible.
ByteBuffer buf = ByteBuffer.allocateDirect(2048);
Charset charset = Charset.forName("UTF-8");// Charset.forName("ISO-8859-1");
CharsetDecoder decoder = charset.newDecoder();
// try {
// Clear the buffer and read bytes from socket
buf.clear();
int numBytesRead = socketChannel.read(buf);
if (numBytesRead == -1) {
// No more bytes can be read from the channel
// socketChannel.close();
return;
}
log.debug("Read bytes: {}", numBytesRead);
// To read the bytes, flip the buffer
buf.flip();
String result = decoder.decode(buf).toString();
log.debug("Read string: {}", result);
//processMessage(result.getBytes());
}
}
- TCP 没有消息边界。它是一个字节流协议。任何消息边界由您决定。
您没有正确处理选择键。您必须在迭代时通过迭代器删除,而不是通过集合。这意味着您不能使用增强的 for 循环。可能你在跳过键。
当你从
read()
得到-1时你必须关闭频道。当您获得
IOException
时,仅取消密钥是不够的。你应该关闭频道,NB会自动取消他们的密钥。