如何使用 Java 将 CSV 文件复制到 Snowflake DB
How do I copy a CSV file into Snowflake DB using Java
我正在尝试编写一个工具来将 CSV 文件加载到多个数据库中。
在尝试在线搜索如何在 Snowflake 中使用 COPY 命令时,我无法在 Java 中找到如何执行此操作的信息。这就是我使用 PostgreSQL
的方式
public void loadData(Message message) throws Exception {
try (Connection connection = DriverManager.getConnection(message.getJdbcUrl(),
message.getUser(), message.password)) {
loadDataWithMode(loadRequest, (BaseConnection) connection);
} catch (Throwable error){
throw error;
}
}
public void loadDataWithMode(Message message, BaseConnection connection) throws Exception {
CopyManager copyManager = new CopyManager(connection);
String fields = message.getFields();
final String targetTableWithFields = message.getTableName() + "(" + fields + ")";
try (InputStream input = fileService.load(loadRequest.getFilePath())) {
try (InputStreamReader reader = new InputStreamReader(input, "UTF-8")) {
copyManager.copyIn("COPY " + targetTableWithFields + " from STDIN
}
}
}
我不熟悉 Snowflake 的操作方法,如有任何帮助,我们将不胜感激。
public void loadDataWithMode(Message message, Connection connection) throws Exception {
String fields = message.getFields();
final String targetTableWithFields = message.getTableName() + "(" + fields + ")";
LOG.info("about to copy data into table: " + targetTableWithFields);
try (Statement statement = connection.createStatement()) {
final SnowflakeConnectionV1 snowflakeConnectionV1 = (SnowflakeConnectionV1) connection;
final File tempFile = fileSystemService.asLocalFile(message.getFilePath());
try (Statement stmt = snowflakeConnectionV1.createStatement(); InputStream inputStream = new FileInputStream(tempFile)) {
final String createStage = buildCreateStageStatement();
LOG.info("Executing sql:{}", createStage);
stmt.execute(createStage);
LOG.info("Create stage was successfully executed");
snowflakeConnectionV1.uploadStream("COPYIN_STAGE", "", inputStream, tempFile.getName(), false);
LOG.info("Upload stream was successfully executed");
stmt.execute("USE WAREHOUSE "+ message.getExportConnectionDetails().getWarehouse());
LOG.info("Warehouse was successfully set to: "+message.getExportConnectionDetails().getWarehouse());
final boolean purgeData = !(message.getLoadMode() == LoadMode.INCREMENTAL);
String sql = String.format("copy into %s(%s) from @COPYIN_STAGE/%s file_format = ( type='CSV', skip_header=1) purge=" + purgeData + " ", message.getTableName(), fields, tempFile.getName());
LOG.info("Executing sql:{}", sql);
stmt.execute(sql);
}
connection.commit();
LOG.info("data was successfully copied " + targetTableWithFields);
}
}
private String buildCreateStageStatement() {
return "CREATE OR REPLACE TEMPORARY STAGE COPYIN_STAGE " + "file_format = ( type ='CSV')";
}
我正在尝试编写一个工具来将 CSV 文件加载到多个数据库中。 在尝试在线搜索如何在 Snowflake 中使用 COPY 命令时,我无法在 Java 中找到如何执行此操作的信息。这就是我使用 PostgreSQL
的方式public void loadData(Message message) throws Exception {
try (Connection connection = DriverManager.getConnection(message.getJdbcUrl(),
message.getUser(), message.password)) {
loadDataWithMode(loadRequest, (BaseConnection) connection);
} catch (Throwable error){
throw error;
}
}
public void loadDataWithMode(Message message, BaseConnection connection) throws Exception {
CopyManager copyManager = new CopyManager(connection);
String fields = message.getFields();
final String targetTableWithFields = message.getTableName() + "(" + fields + ")";
try (InputStream input = fileService.load(loadRequest.getFilePath())) {
try (InputStreamReader reader = new InputStreamReader(input, "UTF-8")) {
copyManager.copyIn("COPY " + targetTableWithFields + " from STDIN
}
}
}
我不熟悉 Snowflake 的操作方法,如有任何帮助,我们将不胜感激。
public void loadDataWithMode(Message message, Connection connection) throws Exception {
String fields = message.getFields();
final String targetTableWithFields = message.getTableName() + "(" + fields + ")";
LOG.info("about to copy data into table: " + targetTableWithFields);
try (Statement statement = connection.createStatement()) {
final SnowflakeConnectionV1 snowflakeConnectionV1 = (SnowflakeConnectionV1) connection;
final File tempFile = fileSystemService.asLocalFile(message.getFilePath());
try (Statement stmt = snowflakeConnectionV1.createStatement(); InputStream inputStream = new FileInputStream(tempFile)) {
final String createStage = buildCreateStageStatement();
LOG.info("Executing sql:{}", createStage);
stmt.execute(createStage);
LOG.info("Create stage was successfully executed");
snowflakeConnectionV1.uploadStream("COPYIN_STAGE", "", inputStream, tempFile.getName(), false);
LOG.info("Upload stream was successfully executed");
stmt.execute("USE WAREHOUSE "+ message.getExportConnectionDetails().getWarehouse());
LOG.info("Warehouse was successfully set to: "+message.getExportConnectionDetails().getWarehouse());
final boolean purgeData = !(message.getLoadMode() == LoadMode.INCREMENTAL);
String sql = String.format("copy into %s(%s) from @COPYIN_STAGE/%s file_format = ( type='CSV', skip_header=1) purge=" + purgeData + " ", message.getTableName(), fields, tempFile.getName());
LOG.info("Executing sql:{}", sql);
stmt.execute(sql);
}
connection.commit();
LOG.info("data was successfully copied " + targetTableWithFields);
}
}
private String buildCreateStageStatement() {
return "CREATE OR REPLACE TEMPORARY STAGE COPYIN_STAGE " + "file_format = ( type ='CSV')";
}