PostgreSQL CopyManager copyIn appears stuck doing nothing when using it with COPY FROM STDIN

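I'm trying to use the PostgreSQL CopyManager copyIn functionality with COPY FROM STDIN, as suggested in the docs, for very fast copying from an InputStream into a database table. I'm thinking of using this to continuously stream rows into a table as and when I receive/process each one. However, the quick-and-dirty sample code below seems to get stuck in copyIn and never writes to the table.

Does anyone know what I'm missing here, or whether my understanding is wrong?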

import java.sql.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.postgresql.core.BaseConnection;
import org.postgresql.copy.CopyManager;

public class PGConnectTest {

    public static void main(String[] args) {

        try {
                try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
                    BaseConnection pgcon = (BaseConnection)connection;
                    PipedInputStream is = new PipedInputStream();
                    BufferedReader br = new BufferedReader(new InputStreamReader(is));
                    PipedOutputStream os = new PipedOutputStream(is);
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));) {
                        ExecutorService executorService = Executors.newSingleThreadExecutor();
                        Callable callable = () -> {
                            Thread.sleep(3000);
                            String frmtStr = "%s\t{\"id\":%s, \"somefield\":\"%s\"}\n";
                            String row = null;
                            for(int i=1; i<10; i++) {
                                row = String.format(frmtStr, i, i, ("row"+i));
                                System.out.print(row);
                                bw.write(row);
                            }
                            bw.write("\n");
                            bw.flush();
                            System.out.println("WRITTEN!");
                            return true;
                        };
                        executorService.submit(callable);
                        System.out.println(connection);
                        CopyManager copyManager = new CopyManager(pgcon);
                        String copySql = "COPY dcm.testtbl FROM STDIN";
                        executorService.submit(() -> copyManager.copyIn(copySql, br));
                        Thread.sleep(10000);
                        System.out.println("QUITTING");
                } catch (Exception e) {
                    throw e;
                }
        } catch(Exception ex) {
            System.out.println(ex);
        }

    }

}

The schema of the table testtbl is as follows:

create table testtbl (
id  integer primary key,
jsnclm  jsonb
)

The console output is below (the program does not return and has to be terminated with CTRL+C):

C:\Users\ml410408\Documents\Useful Lookups\POSTGRESQL>java -cp ".;postgresql-42.2.18.jar" PGConnectTest
org.postgresql.jdbc.PgConnection@41975e01
1       {"id":1, "somefield":"row1"}
2       {"id":2, "somefield":"row2"}
3       {"id":3, "somefield":"row3"}
4       {"id":4, "somefield":"row4"}
5       {"id":5, "somefield":"row5"}
6       {"id":6, "somefield":"row6"}
7       {"id":7, "somefield":"row7"}
8       {"id":8, "somefield":"row8"}
9       {"id":9, "somefield":"row9"}
WRITTEN!
QUITTING

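UPDATE:

Once I changed the format of the COPY SQL command from the default TEXT to CSV and passed in CSV records, it no longer gets stuck, but it still does nothing (meaning there are no records in the table), although this time it does return, unlike before.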

import java.sql.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.postgresql.core.BaseConnection;
import org.postgresql.copy.CopyManager;

public class PGConnectTest {

    public static void main(String[] args) {

        try {
                try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
                    BaseConnection pgcon = (BaseConnection)connection;
                    PipedInputStream is = new PipedInputStream();
                    BufferedReader br = new BufferedReader(new InputStreamReader(is));
                    PipedOutputStream os = new PipedOutputStream(is);
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));) {
                        ExecutorService executorService = Executors.newSingleThreadExecutor();
                        Callable callable = () -> {
                            Thread.sleep(3000);
                            String frmtStr = "%s,'{\"id\":%s,\"somefield\":\"%s\"}'\n";
                            String row = null;
                            for(int i=1; i<10; i++) {
                                row = String.format(frmtStr, i, i, ("row"+i));
                                System.out.print(row);
                                bw.write(row);
                            }
                            bw.write("\n");
                            bw.write("'\.'\n");
                            System.out.println("'\.'\n");
                            bw.flush();
                            os.flush();
                            System.out.println("WRITTEN!");
                            return true;
                        };
                        executorService.submit(callable);
                        System.out.println(connection);
                        CopyManager copyManager = new CopyManager(pgcon);
                        String copySql = "COPY dcm.testtbl FROM STDIN FORMAT CSV DELIMITER ','";
                        executorService.submit(() -> copyManager.copyIn(copySql, br));
                        Thread.sleep(5000);
                        System.out.println(br.ready());
                        while (br.ready()) {
                            System.out.println("LINE : " + br.readLine());
                        }
                        executorService.shutdown();
                        System.out.println("QUITTING");
                } catch (Exception e) {
                    throw e;
                }
                System.out.println("QUITTING FINALLY");
        } catch(Exception ex) {
            System.out.println(ex);
        }

    }

}

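Thanks

There seem to be a couple of different issues here.

  • The program hangs because the threads in the ExecutorService keep it alive; calling shutdown() after submitting the tasks lets it terminate as expected.
  • The main reason nothing gets written is that copyIn() throws an exception: the trailing newline in the stream (bw.write("\n")) is read as an empty row and triggers ERROR: invalid input syntax for integer: "", because no value can be found for the id column.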

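Even so, it looks like this is still subject to a race condition because of the timing of the resource cleanup. The copyIn() call blocks until it reaches the end of its input, and in the case of a PipedInputStream, "the end" is the point at which the PipedOutputStream is closed. But once the stream is closed and the copyIn() call unblocks, the input stream and the database connection are closed in quick succession, potentially before the copy has had a chance to finish. At best, it seems to commit to the table successfully, but then fails with a "Database connection failed when canceling copy operation" error.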
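
The end-of-stream behaviour described above can be seen without a database. The following is only a minimal sketch: the class name PipeEofDemo and the method countLinesUntilEof are made up for illustration, and a plain line-counting consumer stands in for copyIn(). It shows that the reading side only sees end-of-stream once the writing side of the pipe has been closed.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PipeEofDemo {

    // Hypothetical helper: writes `rows` lines into a pipe and counts them on the other side.
    // The consumer (a stand-in for copyIn()) returns only after the writing end is closed.
    static int countLinesUntilEof(int rows) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try (PipedInputStream in = new PipedInputStream();
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(in, StandardCharsets.UTF_8))) {
            Future<Integer> consumer;
            try (PipedOutputStream out = new PipedOutputStream(in);
                 BufferedWriter writer = new BufferedWriter(
                         new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
                // Consumer: readLine() returns null only at end-of-stream.
                consumer = pool.submit(() -> {
                    int n = 0;
                    while (reader.readLine() != null) {
                        n++;
                    }
                    return n;
                });
                // Producer: writes the rows, then flushes (flushing alone does not signal EOF).
                Future<?> producer = pool.submit(() -> {
                    for (int i = 1; i <= rows; i++) {
                        writer.write(i + "\trow" + i + "\n");
                    }
                    writer.flush();
                    return null;
                });
                producer.get();
            } // leaving this block closes the PipedOutputStream, which is what ends the stream
            return consumer.get(); // the input side is still open while the consumer finishes
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(countLinesUntilEof(5));
    }
}
```

Note that the reader and its underlying PipedInputStream are closed only after consumer.get() has returned, which is the ordering the real copy needs as well.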

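To make sure these resources aren't released while they're still in use:

  • Wait for the writer to finish
  • Close the OutputStream
  • Wait for the copier to finish
  • Close the InputStream / Connection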

Waiting for the tasks to complete has the added benefit of propagating any exceptions to the main thread.
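
As a rough illustration of that propagation, here is a small sketch that needs no database. The class FuturePropagationDemo, the method failingCopy (a stand-in for a copyIn() call that fails) and its message text are all hypothetical. An exception thrown inside a submitted task stays hidden until Future.get() is called, at which point it arrives wrapped in an ExecutionException.

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FuturePropagationDemo {

    // Hypothetical stand-in for a copyIn() call that fails on bad input.
    static Long failingCopy() throws SQLException {
        throw new SQLException("simulated COPY failure");
    }

    // Submits the failing task and returns the message of the exception that get() surfaces.
    static String causeMessage() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Long> task = FuturePropagationDemo::failingCopy;
            Future<Long> copy = pool.submit(task);
            try {
                copy.get(); // without this call the failure would go unnoticed
                return null;
            } catch (ExecutionException e) {
                // The original exception is available as the cause.
                return e.getCause().getMessage();
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(causeMessage());
    }
}
```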

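There is also a potential deadlock caused by newSingleThreadExecutor(): if the writer thread fills the pipe's buffer, it blocks until the reader starts consuming the data, which never happens if the two tasks are executed sequentially on the same thread. Using newFixedThreadPool(2) should fix this.

With all of that in mind: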
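
This deadlock can be reproduced in isolation. The sketch below is illustrative only: PipeDeadlockDemo and completes are made-up names, and the two-second timeout is an arbitrary choice. It pushes more bytes through a pipe than its default 1024-byte buffer holds, once with a single-threaded executor and once with two threads.

```java
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PipeDeadlockDemo {

    // Hypothetical helper: returns true if writer and reader both finish within the timeout.
    static boolean completes(ExecutorService pool, int bytes) throws Exception {
        PipedInputStream in = new PipedInputStream(); // default buffer size is 1024 bytes
        PipedOutputStream out = new PipedOutputStream(in);

        // Writer: blocks once the pipe buffer is full and nobody is reading.
        Future<?> writer = pool.submit(() -> {
            out.write(new byte[bytes]);
            out.close(); // closing signals end-of-stream to the reader
            return null;
        });
        // Reader: on a single-threaded executor this task cannot start until the writer ends.
        Future<Integer> reader = pool.submit(() -> {
            int n = 0;
            while (in.read() != -1) {
                n++;
            }
            return n;
        });

        try {
            writer.get(2, TimeUnit.SECONDS);
            return reader.get(2, TimeUnit.SECONDS) == bytes;
        } catch (TimeoutException e) {
            return false; // the writer is stuck waiting for space in the pipe
        } finally {
            pool.shutdownNow(); // interrupts a blocked writer so the JVM can exit
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("2 threads: " + completes(Executors.newFixedThreadPool(2), 4096));
        System.out.println("1 thread:  " + completes(Executors.newSingleThreadExecutor(), 4096));
    }
}
```

With 4096 bytes, the two-thread run should complete and the single-thread run should time out. With a payload that fits in the buffer, the single-threaded executor happens to work, which is likely why the nine short rows in the question did not hit this problem.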

  public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    try {
      try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
          BaseConnection pgcon = (BaseConnection) connection;
          PipedInputStream is = new PipedInputStream();
          BufferedReader br = new BufferedReader(new InputStreamReader(is));
      ) {
        // Typed futures (java.util.concurrent.Future) so get() results need no casts
        Future<Boolean> write;
        Future<Long> copy;
        try (
            PipedOutputStream os = new PipedOutputStream(is);
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os))) {
          write = executorService.submit(() -> {
            String frmtStr = "%s\t{\"id\":%s, \"somefield\":\"%s\"}\n";
            String row = null;
            for (int i = 1; i < 1000; i++) {
              row = String.format(frmtStr, i, i, ("row" + i));
              System.out.print(row);
              bw.write(row);
            }
            bw.flush();
            System.out.println("WRITTEN!");
            return true;
          });
          System.out.println(connection);
          CopyManager copyManager = new CopyManager(pgcon);
          String copySql = "COPY dcm.testtbl FROM STDIN";
          copy = executorService.submit(() -> copyManager.copyIn(copySql, br));
          System.out.println("QUITTING");
          write.get();
        }
        copy.get();
      }
    } catch (Exception ex) {
      System.out.println(ex);
    } finally {
      executorService.shutdown();
    }
  }