设置客户端套接字和服务器套接字侦听器 (Java)
Setting up a Client socket and ServerSocket listener (Java)
我正在尝试在 java 中建立点对点连接。
我正在尝试设置我的程序以侦听传入连接,同时在外部能够连接到不同的客户端。
如何实例化我的套接字连接:socketConnection
作为连接到程序的任何内容。理想情况下是这样的:
if(socketConnection.isConnectedToExternalPeer()){
//do stuff
} else if (socketConnection.hasAnIncomingConnection()){
//do stuff
}
在咨询了@L.Spillner的解决方案后,我在下面整理了以下代码,唯一的问题是我不太明白如何接受连接,这从事实上,当我尝试设置流时,程序在等待对等方回复时陷入循环:
public class Client implements AutoCloseable {
// Any other ThreadPool can be used as well
private ExecutorService cachedExecutor = null;
private ExecutorService singleThreadExecutor = null;
// port this client shall listen on
private int port = 0;
// Name of the client
private String name = null;
// indicates that a connection is ongoing
private boolean isConnected = false;
// the socket the Client is currently connected with
private Socket activeConenctionSocket = null;
// The ServerSocket which will be listening for any incoming connection
private ServerSocket listener = null;
// The socket which has been accepted by the ServerSocket
private Future<Socket> acceptedSocket;
private ObjectInputStream inputStream = null;
private ObjectOutputStream outputStream = null;
private BloomChain bloomChain = null;
/**
* @param port Port number by which this client shall be accessed.
* @param name The name of this Client.
*/
public Client( int port, String name )
{
this.port = port;
this.name = name;
this.bloomChain = new BloomChain();
this.cachedExecutor = Executors.newCachedThreadPool();
this.singleThreadExecutor = Executors.newSingleThreadExecutor();
this.listener = createListeningSocket();
startListening();
}
private ServerSocket createListeningSocket()
{
ServerSocket temp = null;
try
{
temp = new ServerSocket( this.port );
}
catch ( IOException e )
{
e.printStackTrace();
}
return temp;
}
private void startListening()
{
if ( !this.isConnected )
{
this.listener = createListeningSocket();
this.acceptedSocket = this.cachedExecutor.submit( new ServAccept( this.listener ) );
}
}
/**
* Attempts to connect to any other socket specified by the hostname and the targetport.
*
* @param host The hostname of the target to connect.
* @param targetport The port of the target.
*/
public void connect( String host, int targetport )
{
try
{ System.out.println(host);
System.out.println(targetport);
this.activeConenctionSocket = new Socket( InetAddress.getByName( host ), targetport );
setUpStreams(this.activeConenctionSocket);
this.isConnected = true;
System.out.println(InetAddress.getAllByName(host));
}
catch ( IOException e )
{
e.printStackTrace();
}
try
{
this.listener.close();
}
catch ( IOException e )
{
// this will almost certainly throw an exception but it is intended.
}
}
public void setUpStreams(Socket socket) throws IOException {
this.outputStream = new ObjectOutputStream(socket.getOutputStream());
this.outputStream.flush();
this.inputStream = new ObjectInputStream(socket.getInputStream());
}
@Override
public void close() throws Exception
{
// close logic (can be rather nasty)
}
public void sendMessage(String message){
if(bloomChain.size()<1){
bloomChain.addBlock(new Block(message, "0"));
} else {
bloomChain.addBlock(new Block(message, bloomChain.get(bloomChain.size()-1).getPreviousHash()));
}
try {
this.outputStream.writeObject(bloomChain);
this.outputStream.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
public String mineMessage(){
final String[] receivedMessage = {null};
final Block tempBlock = this.bloomChain.get(this.bloomChain.size()-1);
this.singleThreadExecutor.submit(()->{
tempBlock.mineBlock(bloomChain.getDifficulty());
receivedMessage[0] = tempBlock.getData();
});
return receivedMessage[0];
}
public String dataListener(){
if(isConnected) {
try {
BloomChain tempChain = (BloomChain) this.inputStream.readObject();
if (tempChain.isChainValid()) {
this.bloomChain = tempChain;
return mineMessage();
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
return null;
}
public ServerSocket getListener() {
return this.listener;
}
public boolean isConnected(){
return isConnected;
}
public ObjectOutputStream getOutputStream(){
return this.outputStream;
}
public ObjectInputStream getInputStream(){
return this.inputStream;
}
}
编辑 2:
我试图在单独的线程中等待 acceptedSocket.get()
到 return 套接字,如下所示:
new Thread(()->{
setupStreams(this.acceptedSocket.get());
//try-catch blocks omitted
}).start();
这成功等待 acceptedSocket
到 return 连接的套接字但是当我尝试连接到另一个本地 运行 客户端时,我收到以下错误:java.net.SocketException: socket closed
好吧,经过一些修补,我终于想出了一个巧妙的小解决方案:
我们希望能够同时收听和连接,因此我们需要一个 ServerSocket
并发出一个 ServerSocket#accept
调用来接受传入的连接。
然而,这个方法阻塞了线程,所以为了能够继续我们的程序,我们必须将这个调用外包到另一个线程,幸运的是默认的 Java API 确实提供了一个简单的方法来做到这一点。
以下代码示例未完成但提供了核心功能:
Client.java:
public class Client
implements AutoCloseable
{
// Any other ThreadPool can be used as well
private ExecutorService es = Executors.newCachedThreadPool();
// port this client shall listen on
private int port;
// Name of the client
private String name;
// indicates that a connection is ongoing
private boolean isConnected = false;
// the socket the Client is currently connected with
private Socket activeConenctionSocket;
// The ServerSocket which will be listening for any incoming connection
private ServerSocket listener;
// The socket which has been accepted by the ServerSocket
private Future<Socket> acceptedSocket;
/**
* @param port Port number by which this client shall be accessed.
* @param name The name of this Client.
*/
public Client( int port, String name )
{
this.port = port;
this.name = name;
this.listener = createListeningSocket();
startListening();
}
private ServerSocket createListeningSocket()
{
ServerSocket temp = null;
try
{
temp = new ServerSocket( port );
}
catch ( IOException e )
{
e.printStackTrace();
}
return temp;
}
private void startListening()
{
if ( !isConnected )
{
listener = createListeningSocket();
acceptedSocket = es.submit( new ServAccept( listener ) );
}
}
/**
* Attempts to connect to any other socket specified by the hostname and the targetport.
*
* @param host The hostname of the target to connect.
* @param targetport The port of the target.
*/
public void connect( String host, int targetport )
{
isConnected = true;
try
{
activeConenctionSocket = new Socket( InetAddress.getByName( host ), targetport );
}
catch ( IOException e )
{
e.printStackTrace();
}
try
{
listener.close();
}
catch ( IOException e )
{
// this will almost certainly throw an exception but it is intended.
}
}
@Override
public void close() throws Exception
{
// close logic (can be rather nasty)
}
}
让我们逐步了解如何实例化新的 Client 对象:
- 当我们实例化我们的对象时,我们创建了一个新的 ServerSocket
- 我们通过创建一个
Callable<V>
对象的新线程来开始监听,出于示例目的,我将其命名为 ServAccept
。
- 现在我们有一个
Future<T>
对象,如果任何连接被接受,它将包含一个套接字。
startListening()
方法的一个积极的副作用是,您可以使它成为 public 并在连接断开时再次调用它。
conenct(...)
方法几乎与您的 setupConnection()
方法相同,但有一点不同。仍在另一个线程中侦听的 ServerSocket 将关闭。这样做的原因是,没有其他方法可以退出另一个线程卡在其中的 accept()
方法。
最后一件事(您必须弄清楚)是何时检查 Future 对象是否已经完成。
ServAccept.java
public class ServAccept
implements Callable<Socket>
{
ServerSocket serv;
public ServAccept( ServerSocket sock )
{
this.serv = sock;
}
@Override
public Socket call() throws Exception
{
return serv.accept();
}
}
编辑:
事实上,我不得不承认我的方法可能不是一个非常全面的方法来完成任务,所以我决定改变一些东西。这次我没有使用 Future 对象,而是决定使用 Events / 自定义 EventListener,它只是坐在那里并监听要接收的连接。我测试了连接功能,它工作得很好,但我还没有实施解决方案来确定客户端是否真的连接到对等方。我只是确保客户端一次只能保持一个连接。
变化:
ServerAccept.java
import java.io.IOException;
import java.net.ServerSocket;
public class ServAccept implements Runnable
{
private ServerSocket serv;
private ConnectionReceivedListener listener;
public ServAccept( ServerSocket sock,ConnectionReceivedListener con )
{
this.serv = sock;
this.listener = con;
}
@Override
public void run()
{
try
{
listener.onConnectionReceived( new ConnectionReceivedEvent( serv.accept() ) );
} catch (IOException e)
{
// planned exception here.
}
}
}
不再实现 Callable<V>
但 Runnable
进行该更改的唯一原因是我们不再等待任何 return 因为我们将使用侦听器和一些有趣的事件.无论如何,为了做到这一点,我们需要创建一个侦听器并将其传递给该对象。但首先我们应该看一下监听器/事件结构:
ConnectionReceivedListener.java
import java.util.EventListener;
@FunctionalInterface
public interface ConnectionReceivedListener extends EventListener
{
public void onConnectionReceived(ConnectionReceivedEvent event);
}
只是我们构建一些匿名 classes 或 lambda 表达式的简单接口。没什么好想的。它甚至不需要扩展 EventListener
接口,但我喜欢这样做来提醒我 class 的目的是什么。
ConnectionReceivedEvent.java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
public class ConnectionReceivedEvent
{
private Socket accepted;
public ConnectionReceivedEvent( Socket sock )
{
this.accepted = sock;
}
public Socket getSocket()
{
return accepted;
}
public OutputStream getOutput() throws IOException
{
return accepted.getOutputStream();
}
public InputStream getInput() throws IOException
{
return accepted.getInputStream();
}
public int getPort()
{
return accepted.getPort();
}
}
也没什么好想的,只是将 Socket 作为构造函数参数传递并定义一些 getter,其中大多数不会在此示例中使用。
但是我们现在如何使用它呢?
private void startListening()
{
if (!isConnected)
{
closeIfNotNull();
listener = createListeningSocket();
es.execute( new ServAccept( listener, event -> setAccepted( event.getSocket() ) ) );
}
}
private void setAccepted( Socket socket )
{
if (!isConnected)
{
this.activeConenctionSocket = socket;
setUpStreams( socket );
} else
{
sendError( socket );
}
}
我们仍然使用我们的 ExecutorService
并使用 ServAccept
class 创建一个新线程。然而,由于我们不期望任何 return,我从 ExecutorService#submit
更改为 ExecutorService#execute
(只是意见和品味问题)。
但是 ServAccept
现在需要两个参数。要使用的 ServerSocket 和 Listener。幸运的是,我们可以使用匿名 classes,而且由于我们的 Listener 只有一种方法,我们甚至可以使用 lambda 表达式。 event -> setAccepted(event.getSocket())
.
作为对您第二次编辑的回答:我犯了一个逻辑错误。不是 ServerSocket#close
方法在中断 ServerSocket#accept
调用时抛出异常,而是 accept()
调用本身抛出异常。换句话说,你得到的异常是有意的,我错误地抑制了另一个。
我正在尝试在 java 中建立点对点连接。
我正在尝试设置我的程序以侦听传入连接,同时在外部能够连接到不同的客户端。
如何实例化我的套接字连接:socketConnection
作为连接到程序的任何内容。理想情况下是这样的:
if(socketConnection.isConnectedToExternalPeer()){
//do stuff
} else if (socketConnection.hasAnIncomingConnection()){
//do stuff
}
在咨询了@L.Spillner的解决方案后,我在下面整理了以下代码,唯一的问题是我不太明白如何接受连接,这从事实上,当我尝试设置流时,程序在等待对等方回复时陷入循环:
public class Client implements AutoCloseable {
// Any other ThreadPool can be used as well
private ExecutorService cachedExecutor = null;
private ExecutorService singleThreadExecutor = null;
// port this client shall listen on
private int port = 0;
// Name of the client
private String name = null;
// indicates that a connection is ongoing
private boolean isConnected = false;
// the socket the Client is currently connected with
private Socket activeConenctionSocket = null;
// The ServerSocket which will be listening for any incoming connection
private ServerSocket listener = null;
// The socket which has been accepted by the ServerSocket
private Future<Socket> acceptedSocket;
private ObjectInputStream inputStream = null;
private ObjectOutputStream outputStream = null;
private BloomChain bloomChain = null;
/**
* @param port Port number by which this client shall be accessed.
* @param name The name of this Client.
*/
public Client( int port, String name )
{
this.port = port;
this.name = name;
this.bloomChain = new BloomChain();
this.cachedExecutor = Executors.newCachedThreadPool();
this.singleThreadExecutor = Executors.newSingleThreadExecutor();
this.listener = createListeningSocket();
startListening();
}
private ServerSocket createListeningSocket()
{
ServerSocket temp = null;
try
{
temp = new ServerSocket( this.port );
}
catch ( IOException e )
{
e.printStackTrace();
}
return temp;
}
private void startListening()
{
if ( !this.isConnected )
{
this.listener = createListeningSocket();
this.acceptedSocket = this.cachedExecutor.submit( new ServAccept( this.listener ) );
}
}
/**
* Attempts to connect to any other socket specified by the hostname and the targetport.
*
* @param host The hostname of the target to connect.
* @param targetport The port of the target.
*/
public void connect( String host, int targetport )
{
try
{ System.out.println(host);
System.out.println(targetport);
this.activeConenctionSocket = new Socket( InetAddress.getByName( host ), targetport );
setUpStreams(this.activeConenctionSocket);
this.isConnected = true;
System.out.println(InetAddress.getAllByName(host));
}
catch ( IOException e )
{
e.printStackTrace();
}
try
{
this.listener.close();
}
catch ( IOException e )
{
// this will almost certainly throw an exception but it is intended.
}
}
public void setUpStreams(Socket socket) throws IOException {
this.outputStream = new ObjectOutputStream(socket.getOutputStream());
this.outputStream.flush();
this.inputStream = new ObjectInputStream(socket.getInputStream());
}
@Override
public void close() throws Exception
{
// close logic (can be rather nasty)
}
public void sendMessage(String message){
if(bloomChain.size()<1){
bloomChain.addBlock(new Block(message, "0"));
} else {
bloomChain.addBlock(new Block(message, bloomChain.get(bloomChain.size()-1).getPreviousHash()));
}
try {
this.outputStream.writeObject(bloomChain);
this.outputStream.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
public String mineMessage(){
final String[] receivedMessage = {null};
final Block tempBlock = this.bloomChain.get(this.bloomChain.size()-1);
this.singleThreadExecutor.submit(()->{
tempBlock.mineBlock(bloomChain.getDifficulty());
receivedMessage[0] = tempBlock.getData();
});
return receivedMessage[0];
}
public String dataListener(){
if(isConnected) {
try {
BloomChain tempChain = (BloomChain) this.inputStream.readObject();
if (tempChain.isChainValid()) {
this.bloomChain = tempChain;
return mineMessage();
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
return null;
}
public ServerSocket getListener() {
return this.listener;
}
public boolean isConnected(){
return isConnected;
}
public ObjectOutputStream getOutputStream(){
return this.outputStream;
}
public ObjectInputStream getInputStream(){
return this.inputStream;
}
}
编辑 2:
我试图在单独的线程中等待 acceptedSocket.get()
到 return 套接字,如下所示:
new Thread(()->{
setupStreams(this.acceptedSocket.get());
//try-catch blocks omitted
}).start();
这成功等待 acceptedSocket
到 return 连接的套接字但是当我尝试连接到另一个本地 运行 客户端时,我收到以下错误:java.net.SocketException: socket closed
好吧,经过一些修补,我终于想出了一个巧妙的小解决方案:
我们希望能够同时收听和连接,因此我们需要一个 ServerSocket
并发出一个 ServerSocket#accept
调用来接受传入的连接。
然而,这个方法阻塞了线程,所以为了能够继续我们的程序,我们必须将这个调用外包到另一个线程,幸运的是默认的 Java API 确实提供了一个简单的方法来做到这一点。
以下代码示例未完成但提供了核心功能:
Client.java:
public class Client
implements AutoCloseable
{
// Any other ThreadPool can be used as well
private ExecutorService es = Executors.newCachedThreadPool();
// port this client shall listen on
private int port;
// Name of the client
private String name;
// indicates that a connection is ongoing
private boolean isConnected = false;
// the socket the Client is currently connected with
private Socket activeConenctionSocket;
// The ServerSocket which will be listening for any incoming connection
private ServerSocket listener;
// The socket which has been accepted by the ServerSocket
private Future<Socket> acceptedSocket;
/**
* @param port Port number by which this client shall be accessed.
* @param name The name of this Client.
*/
public Client( int port, String name )
{
this.port = port;
this.name = name;
this.listener = createListeningSocket();
startListening();
}
private ServerSocket createListeningSocket()
{
ServerSocket temp = null;
try
{
temp = new ServerSocket( port );
}
catch ( IOException e )
{
e.printStackTrace();
}
return temp;
}
private void startListening()
{
if ( !isConnected )
{
listener = createListeningSocket();
acceptedSocket = es.submit( new ServAccept( listener ) );
}
}
/**
* Attempts to connect to any other socket specified by the hostname and the targetport.
*
* @param host The hostname of the target to connect.
* @param targetport The port of the target.
*/
public void connect( String host, int targetport )
{
isConnected = true;
try
{
activeConenctionSocket = new Socket( InetAddress.getByName( host ), targetport );
}
catch ( IOException e )
{
e.printStackTrace();
}
try
{
listener.close();
}
catch ( IOException e )
{
// this will almost certainly throw an exception but it is intended.
}
}
@Override
public void close() throws Exception
{
// close logic (can be rather nasty)
}
}
让我们逐步了解如何实例化新的 Client 对象:
- 当我们实例化我们的对象时,我们创建了一个新的 ServerSocket
- 我们通过创建一个
Callable<V>
对象的新线程来开始监听,出于示例目的,我将其命名为ServAccept
。 - 现在我们有一个
Future<T>
对象,如果任何连接被接受,它将包含一个套接字。
startListening()
方法的一个积极的副作用是,您可以使它成为 public 并在连接断开时再次调用它。
conenct(...)
方法几乎与您的 setupConnection()
方法相同,但有一点不同。仍在另一个线程中侦听的 ServerSocket 将关闭。这样做的原因是,没有其他方法可以退出另一个线程卡在其中的 accept()
方法。
最后一件事(您必须弄清楚)是何时检查 Future 对象是否已经完成。
ServAccept.java
public class ServAccept
implements Callable<Socket>
{
ServerSocket serv;
public ServAccept( ServerSocket sock )
{
this.serv = sock;
}
@Override
public Socket call() throws Exception
{
return serv.accept();
}
}
编辑:
事实上,我不得不承认我的方法可能不是一个非常全面的方法来完成任务,所以我决定改变一些东西。这次我没有使用 Future 对象,而是决定使用 Events / 自定义 EventListener,它只是坐在那里并监听要接收的连接。我测试了连接功能,它工作得很好,但我还没有实施解决方案来确定客户端是否真的连接到对等方。我只是确保客户端一次只能保持一个连接。
变化:
ServerAccept.java
import java.io.IOException;
import java.net.ServerSocket;
public class ServAccept implements Runnable
{
private ServerSocket serv;
private ConnectionReceivedListener listener;
public ServAccept( ServerSocket sock,ConnectionReceivedListener con )
{
this.serv = sock;
this.listener = con;
}
@Override
public void run()
{
try
{
listener.onConnectionReceived( new ConnectionReceivedEvent( serv.accept() ) );
} catch (IOException e)
{
// planned exception here.
}
}
}
不再实现 Callable<V>
但 Runnable
进行该更改的唯一原因是我们不再等待任何 return 因为我们将使用侦听器和一些有趣的事件.无论如何,为了做到这一点,我们需要创建一个侦听器并将其传递给该对象。但首先我们应该看一下监听器/事件结构:
ConnectionReceivedListener.java
import java.util.EventListener;
@FunctionalInterface
public interface ConnectionReceivedListener extends EventListener
{
public void onConnectionReceived(ConnectionReceivedEvent event);
}
只是我们构建一些匿名 classes 或 lambda 表达式的简单接口。没什么好想的。它甚至不需要扩展 EventListener
接口,但我喜欢这样做来提醒我 class 的目的是什么。
ConnectionReceivedEvent.java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
public class ConnectionReceivedEvent
{
private Socket accepted;
public ConnectionReceivedEvent( Socket sock )
{
this.accepted = sock;
}
public Socket getSocket()
{
return accepted;
}
public OutputStream getOutput() throws IOException
{
return accepted.getOutputStream();
}
public InputStream getInput() throws IOException
{
return accepted.getInputStream();
}
public int getPort()
{
return accepted.getPort();
}
}
也没什么好想的,只是将 Socket 作为构造函数参数传递并定义一些 getter,其中大多数不会在此示例中使用。
但是我们现在如何使用它呢?
private void startListening()
{
if (!isConnected)
{
closeIfNotNull();
listener = createListeningSocket();
es.execute( new ServAccept( listener, event -> setAccepted( event.getSocket() ) ) );
}
}
private void setAccepted( Socket socket )
{
if (!isConnected)
{
this.activeConenctionSocket = socket;
setUpStreams( socket );
} else
{
sendError( socket );
}
}
我们仍然使用我们的 ExecutorService
并使用 ServAccept
class 创建一个新线程。然而,由于我们不期望任何 return,我从 ExecutorService#submit
更改为 ExecutorService#execute
(只是意见和品味问题)。
但是 ServAccept
现在需要两个参数。要使用的 ServerSocket 和 Listener。幸运的是,我们可以使用匿名 classes,而且由于我们的 Listener 只有一种方法,我们甚至可以使用 lambda 表达式。 event -> setAccepted(event.getSocket())
.
作为对您第二次编辑的回答:我犯了一个逻辑错误。不是 ServerSocket#close
方法在中断 ServerSocket#accept
调用时抛出异常,而是 accept()
调用本身抛出异常。换句话说,你得到的异常是有意的,我错误地抑制了另一个。