如何使用 Java 正确地从套接字流式传输数据

How to properly stream data from a socket with Java

我正在尝试使用 Java 通过套接字流式传输数据,以尝试编写 Kafka 生产者。我写了一个 class 来提取数据,但我没有得到我期望的结果。我已经设置好了,所以数据是从 Linux 框流式传输的。数据源是一个 csv 文件,我正在使用 nc 实用程序进行流式处理。 class 在 Eclipse 的 Windows 10 机器上 运行ning。当我 运行 class 我看到两件奇怪的事情。

  1. 列 headers 未传输。
  2. 我只能运行 class一次。如果我想再次 运行 它,我必须停止 nc 并重新启动它。

下面是我的代码。我错过了什么吗?此时我只是试图连接到套接字并将数据拉过来。

I 运行 nc 使用以下命令: $ nc -kl 9999 < uber_data.csv

下面是我的class

import java.net.*;
import java.io.*;

public class Client 
{
static String userInput;

public static void main(String [] args)
{

    try
    {
        InetAddress serverAddress = InetAddress.getByName("servername");
        Socket socket = new Socket(serverAddress, 9999);
        BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));

        while ((userInput = input.readLine()) != null) {
            System.out.println(input.readLine());
        }

        input.close();
        socket.close();

    }
    catch(UnknownHostException e1)
    {
        System.out.println("Unknown host exception " + e1.toString());
    }
    catch(IOException e2)
    {
        System.out.println("IOException " + e2.toString());
    }
    catch(IllegalArgumentException e3)
    {
        System.out.println("Illegal Argument Exception " + e3.toString());
    }
    catch(Exception e4)
    {
        System.out.println("Other exceptions " + e4.toString());
    }
}
}

首先,每次调用 readLine() 都会尝试从输入流中读取行。 在 userInput = input.readLine() 中,您阅读 header,但 println(input.readLine()) 阅读 body 并在控制台中打印。

while ((userInput = input.readLine()) != null) { 
     System.out.println(userInput); //instead input.readLine()
}

其次,我没有使用 nc,但我认为如果您在 finally 语句中关闭套接字(和 reader),问题就会解决。

希望对您有所帮助。

对于第一个问题:您正在尝试打印 userInput 字符串。但它正在打印另一个 readline() 调用的结果。

第二种:文件传输完成后,您必须停止并重新启动nc;无论您从身边做什么。它来自 nc 侧。

参见nc documentation

您正在丢弃所有奇数行。应该是:

while ((userInput = input.readLine()) != null) {
    System.out.println(userInput);
}

其次,您没有关闭套接字。使用 try-with-resources:

try
{
    InetAddress serverAddress = InetAddress.getByName("servername");
    try (
        Socket socket = new Socket(serverAddress, 9999);
        BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    ) {
        while ((userInput = input.readLine()) != null) {
            System.out.println(input.readLine());
        }
    }
}
catch (...)