Monet DB JDBC 并行 executeBatch 失败
Monet DB JDBC parallel executeBatch failing
在我的应用程序中,我想在不同的表中并行插入多行。为此,我在每个线程中创建预编译语句(PreparedStatement),并以 10K 作为批量大小调用 executeBatch。我已将自动提交(auto-commit)设为 false。在每次 executeBatch 之后,我使用 connection.commit 提交事务。在单线程中,这段代码工作正常;但在多线程中,当各线程开始向不同的表(每个线程使用的表明显不同)插入数据时,会出现提交失败异常。
请指导如何进行并行插入(请注意,所有线程在不同的 table 中工作,彼此之间没有 link)。
谢谢,
维卡斯
问题
这是我为从 monetDB 驱动程序获取并发冲突错误而创建的代码。
/**
 * Reproducer for the MonetDB "transaction is aborted because of concurrency conflicts" error.
 *
 * <p>Each runner owns one JDBC connection. When run, it creates its own table, inserts
 * {@code batchCount} batches of {@code batchSize} rows through a prepared statement, drops the
 * table again and closes the connection.
 *
 * <p>The DDL is deliberately NOT synchronized between runners: running several runners on
 * separate threads is what provokes the reported exception at the create/commit step.
 */
public static class TestRunner implements Runnable {
    private final static String problemHere = "jdbc:monetdb://localhost/test-db";

    static {
        try {
            // Load the MonetDB JDBC driver class before any connection is requested.
            Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /** Opens a new connection; called once per runner, from the constructor. */
    private static Connection createConnection() throws SQLException {
        return DriverManager.getConnection(problemHere, "myID", "myPass");
    }

    private static final int fieldCount = 10;
    private static final int[] colType = new int[fieldCount];
    private static final int[] types = { Types.VARCHAR, Types.TIMESTAMP, Types.DECIMAL };

    static {
        // Pick a random column type per column; the layout is shared by all runners.
        Random random = new Random();
        for (int i = 0; i < fieldCount; i++) {
            colType[i] = types[random.nextInt(types.length)];
        }
    }

    private final String name;
    private Connection con;
    private int batchCount;
    private final int batchSize;
    private final String create;
    private final String drop;
    private final String insert;

    /**
     * @param string name of the table this runner creates, fills and drops
     * @param bs     number of rows per batch
     * @param bc     number of batches to submit
     * @throws SQLException if the connection cannot be opened
     */
    public TestRunner(String string, int bs, int bc) throws SQLException {
        this.name = string;
        this.con = createConnection();
        this.batchCount = bc;
        this.batchSize = bs;

        StringBuilder createSql = new StringBuilder("create table ").append(name).append(" (");
        StringBuilder insertSql = new StringBuilder("insert into ").append(name).append(" values (");
        for (int i = 0; i < fieldCount; i++) {
            if (i > 0) {
                createSql.append(',');
                insertSql.append(',');
            }
            createSql.append("col").append(i).append(' ').append(getType(colType[i]));
            insertSql.append('?');
        }
        this.create = createSql.append(')').toString();
        this.insert = insertSql.append(')').toString();
        this.drop = "drop table " + name;
    }

    /**
     * Maps a {@link Types} constant to the SQL column type used in the create statement.
     *
     * @throws IllegalArgumentException for a type other than DECIMAL, VARCHAR or TIMESTAMP
     */
    private static String getType(int i) {
        switch (i) {
        case Types.DECIMAL:
            return "decimal(18,9)";
        case Types.VARCHAR:
            return "varchar(30000)";
        case Types.TIMESTAMP:
            return "timestamp";
        default:
            // Previously returned null, which would have been concatenated into the DDL.
            throw new IllegalArgumentException("unsupported column type: " + i);
        }
    }

    /** Safety net that closes the connection if {@link #run()} never got to do it. */
    @Override
    protected void finalize() throws Throwable {
        try {
            if (con != null) {
                con.close();
            }
        } finally {
            super.finalize();
        }
    }

    /** Creates the table, inserts all batches, drops the table and closes the connection. */
    @Override
    public void run() {
        System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n", name, create, insert, drop);
        try (Statement stmt = con.createStatement()) {
            // this will throw the exception of concurrency conflict
            System.out.format("%d created %s%n", stmt.executeUpdate(create), name);
            commitIfNeeded();
        } catch (SQLException e1) {
            e1.printStackTrace();
            // Release the connection here; the early return used to leak it.
            closeConnection();
            return;
        }
        try (PreparedStatement stmt = con.prepareStatement(insert)) {
            while (batchCount-- > 0) {
                for (int i = batchSize; i > 0; i--) {
                    setBatchInput(stmt);
                    stmt.addBatch();
                }
                System.out.format("%s - submitting batch ...%n", name);
                stmt.executeBatch();
                commitIfNeeded();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (con != null) {
                try (Statement stmt = con.createStatement()) {
                    System.out.format("%d deleted - %s%n", stmt.executeUpdate(drop), name);
                    commitIfNeeded();
                } catch (SQLException e1) {
                    e1.printStackTrace();
                }
                closeConnection();
            }
        }
        // Terminate the line so the messages of concurrent runners do not run together.
        System.out.format("%s finished.%n", name);
    }

    /** Commits the current transaction unless the connection is in auto-commit mode. */
    private void commitIfNeeded() throws SQLException {
        if (!con.getAutoCommit()) {
            con.commit();
        }
    }

    /** Closes the connection (if still open) and clears the field on success. */
    private void closeConnection() {
        if (con == null) {
            return;
        }
        try {
            con.close();
            con = null;
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Binds one generated value per column (JDBC parameters are 1-based). */
    private static void setBatchInput(PreparedStatement stmt) throws SQLException {
        for (int i = 1; i <= fieldCount; i++) {
            stmt.setObject(i, getRandomFieldValue(colType[i - 1]));
        }
    }

    /** Returns a fixed sample value for the given column type, or null for other types. */
    private static Object getRandomFieldValue(int type) {
        switch (type) {
        case Types.DECIMAL:
            return 0;
        case Types.VARCHAR:
            return "null";
        case Types.TIMESTAMP:
            return new Timestamp(System.currentTimeMillis());
        default:
            return null;
        }
    }
}
/**
 * Starts two {@code TestRunner} threads ("runner_1" and "runner_0"), each submitting one batch
 * of 10000 rows, and waits for them to finish.
 *
 * @param args unused
 */
public static void main(String[] args) {
    int num = 2;
    List<Thread> ts = new ArrayList<>();
    while (num-- > 0) {
        try {
            Thread t = new Thread(new TestRunner("runner_" + num, 10000, 1));
            t.start();
            // Only track threads that were actually created; a failed constructor used to
            // leave a null entry in the list and crash the join loop below.
            ts.add(t);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    for (Thread t : ts) {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag and stop waiting; further joins would fail at once.
            Thread.currentThread().interrupt();
            break;
        }
    }
}
这是我得到的异常
java.sql.SQLException: COMMIT: transaction is aborted because of concurrency conflicts, will ROLLBACK instead
at nl.cwi.monetdb.jdbc.MonetConnection$ResponseList.executeQuery(MonetConnection.java:2597)
at nl.cwi.monetdb.jdbc.MonetConnection$ResponseList.processQuery(MonetConnection.java:2345)
at nl.cwi.monetdb.jdbc.MonetStatement.internalExecute(MonetStatement.java:507)
at nl.cwi.monetdb.jdbc.MonetStatement.execute(MonetStatement.java:345)
at nl.cwi.monetdb.jdbc.MonetStatement.executeUpdate(MonetStatement.java:545)
at monet.test.BatchTest$TestRunner.run(BatchTest.java:92)
at java.lang.Thread.run(Thread.java:745)
如果我同步创建,那么在一个线程中会出现批处理失败 -
java.sql.SQLException: EXEC: no prepared statement with id: 2no prepared statement with id: 2
根据这个讨论帖(this thread)中的说明:
If you perform a schema update/change, MonetDB
releases all prepared handles, because they possibly are no longer
correct.
You need to re-execute your prepare command.
解决方案
使用 MonetDB 的批量数据加载(bulk data load,即 COPY INTO)功能!您可以不通过 JDBC 的批量插入,而是直接把数据写入服务器(即使并行打开多个连接也可以)。
以下是可以让您直接测试它的代码
/**
 * Parallel loader that avoids JDBC batch inserts: the table is created over JDBC, while the rows
 * are streamed to the server with {@code COPY INTO ... FROM STDIN} over a separate
 * {@code MapiSocket} connection.
 *
 * <p>Each runner owns its JDBC connection and its MAPI socket. The create statement and the
 * final commit are serialized across runners through a shared lock. The table is not dropped at
 * the end; the drop statement is only printed at start-up.
 */
public static class TestRunner implements Runnable {
    private static final String HOST = "localhost";
    private static final int PORT = 50000;
    private static final String DATABASE = "test-db";
    private static final String USER = "monetdb";
    private static final String PASSWORD = "monetdb";

    private final static String problemHere = "jdbc:monetdb://" + HOST + ":" + PORT + "/" + DATABASE;

    /** Dedicated monitor for schema changes (replaces locking on an interned String literal). */
    private static final Object DDL_LOCK = new Object();

    static {
        try {
            // Load the MonetDB JDBC driver class before any connection is requested.
            Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /** Opens a new JDBC connection; called once per runner, from the constructor. */
    private static Connection createConnection() throws SQLException {
        return DriverManager.getConnection(problemHere, USER, PASSWORD);
    }

    private static final int fieldCount = 10;
    private static final int[] colType = new int[fieldCount];
    private static final int[] types = { Types.VARCHAR, Types.TIMESTAMP, Types.DECIMAL };

    static {
        // Pick a random column type per column; the layout is shared by all runners.
        Random random = new Random();
        for (int i = 0; i < fieldCount; i++) {
            colType[i] = types[random.nextInt(types.length)];
        }
    }

    private final String name;
    private Connection con;
    private int batchCount;
    private final int batchSize;
    private final String create;
    private final String drop;
    private final String insert;

    /**
     * @param string name of the table this runner creates and fills
     * @param bs     number of rows per COPY INTO batch
     * @param bc     number of batches to submit
     * @throws SQLException if the JDBC connection cannot be opened
     */
    public TestRunner(String string, int bs, int bc) throws SQLException {
        this.name = string;
        this.con = createConnection();
        this.batchCount = bc;
        this.batchSize = bs;

        StringBuilder createSql = new StringBuilder("create table ").append(name).append(" (");
        StringBuilder insertSql = new StringBuilder("insert into ").append(name).append(" values (");
        for (int i = 0; i < fieldCount; i++) {
            if (i > 0) {
                createSql.append(',');
                insertSql.append(',');
            }
            createSql.append("col").append(i).append(' ').append(getType(colType[i]));
            insertSql.append('?');
        }
        this.create = createSql.append(')').toString();
        this.insert = insertSql.append(')').toString();
        this.drop = "drop table " + name;
    }

    /**
     * Maps a {@link Types} constant to the SQL column type used in the create statement.
     *
     * @throws IllegalArgumentException for a type other than DECIMAL, VARCHAR or TIMESTAMP
     */
    private static String getType(int i) {
        switch (i) {
        case Types.DECIMAL:
            return "decimal(18,9)";
        case Types.VARCHAR:
            return "varchar(30000)";
        case Types.TIMESTAMP:
            return "timestamp";
        default:
            // Previously returned null, which would have been concatenated into the DDL.
            throw new IllegalArgumentException("unsupported column type: " + i);
        }
    }

    /** Safety net that closes the JDBC connection if {@link #run()} never got to do it. */
    @Override
    protected void finalize() throws Throwable {
        try {
            if (con != null) {
                con.close();
            }
        } finally {
            super.finalize();
        }
    }

    /** Creates the table over JDBC, bulk-loads all batches over MAPI, then releases resources. */
    @Override
    public void run() {
        System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n", name, create, insert, drop);
        try (Statement stmt = con.createStatement()) {
            // Unsynchronized DDL is where the concurrency-conflict exception was raised,
            // so only one runner at a time creates and commits its table.
            synchronized (DDL_LOCK) {
                System.out.format("%d created %s%n", stmt.executeUpdate(create), name);
                commitIfNeeded();
            }
        } catch (SQLException e1) {
            e1.printStackTrace();
            // Release the connection here; the early return used to leak it.
            closeConnection();
            return;
        }
        MapiSocket server = null;
        try {
            server = new MapiSocket();
            server.setDatabase(DATABASE);
            server.setLanguage("sql");
            List<String> warning = server.connect(HOST, PORT, USER, PASSWORD);
            if (warning != null) {
                for (String w : warning) {
                    System.out.println(w);
                }
            }
            // The reader and writer must exist before the first prompt is awaited; the
            // original used "in" before declaring it and re-fetched both for every batch.
            BufferedMCLReader in = server.getReader();
            BufferedMCLWriter out = server.getWriter();
            String error = in.waitForPrompt();
            if (error != null) {
                throw new SQLException(error);
            }
            // NOTE(review): the string below contains a real newline character inside the
            // quoted record delimiter. The stock MonetDB COPY INTO example is believed to
            // send the two-character escape instead (Java "'\\n'") - confirm which form the
            // server expects before changing it.
            String query = "COPY INTO " + name + " FROM STDIN USING DELIMITERS ',','\n';";
            while (batchCount-- > 0) {
                // the leading 's' is essential, since it is a protocol
                // marker that should not be omitted, likewise the
                // trailing semicolon
                out.write('s');
                out.write(query);
                out.newLine();
                for (int i = batchSize; i > 0; i--) {
                    insertBatchElement(out);
                    out.newLine();
                }
                System.out.format("%s - submitting batch ...%n", name);
                out.writeLine(""); // need this one for synchronisation over flush()
                out.flush();
                error = in.waitForPrompt();
                if (error != null) {
                    throw new SQLException(error);
                }
                commitIfNeeded();
            }
            // Close the streams only after the last batch; closing them inside the loop
            // would break every batch after the first one.
            out.close();
            in.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (con != null) {
                synchronized (DDL_LOCK) {
                    try {
                        commitIfNeeded();
                    } catch (SQLException e1) {
                        e1.printStackTrace();
                    }
                }
                closeConnection();
            }
            if (server != null) {
                server.close();
            }
        }
        // Terminate the line so the messages of concurrent runners do not run together.
        System.out.format("%s finished.%n", name);
    }

    /** Commits the JDBC transaction unless the connection is in auto-commit mode. */
    private void commitIfNeeded() throws SQLException {
        if (!con.getAutoCommit()) {
            con.commit();
        }
    }

    /** Closes the JDBC connection (if still open) and clears the field on success. */
    private void closeConnection() {
        if (con == null) {
            return;
        }
        try {
            con.close();
            con = null;
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Writes one comma-separated row (without the record terminator) to the COPY stream. */
    private void insertBatchElement(BufferedMCLWriter out) throws IOException {
        StringBuilder data = new StringBuilder();
        for (int i = 0; i < fieldCount; i++) {
            if (i > 0) {
                data.append(',');
            }
            data.append(getRandomFieldValue(colType[i]));
        }
        out.write(data.toString());
    }

    /** Returns a fixed sample value for the given column type, or null for other types. */
    private static Object getRandomFieldValue(int type) {
        switch (type) {
        case Types.DECIMAL:
            return 0;
        case Types.VARCHAR:
            return "null";
        case Types.TIMESTAMP:
            return new Timestamp(System.currentTimeMillis());
        default:
            return null;
        }
    }
}
/**
 * Starts two {@code TestRunner} threads ("runner_1" and "runner_0"), each submitting one batch
 * of 10000 rows, and waits for them to finish.
 *
 * @param args unused
 */
public static void main(String[] args) {
    int num = 2;
    List<Thread> ts = new ArrayList<>();
    while (num-- > 0) {
        try {
            Thread t = new Thread(new TestRunner("runner_" + num, 10000, 1));
            t.start();
            // Only track threads that were actually created; a failed constructor used to
            // leave a null entry in the list and crash the join loop below.
            ts.add(t);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    for (Thread t : ts) {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag and stop waiting; further joins would fail at once.
            Thread.currentThread().interrupt();
            break;
        }
    }
}
在我的应用程序中,我想在不同的表中并行插入多行。为此,我在每个线程中创建预编译语句(PreparedStatement),并以 10K 作为批量大小调用 executeBatch。我已将自动提交(auto-commit)设为 false。在每次 executeBatch 之后,我使用 connection.commit 提交事务。在单线程中,这段代码工作正常;但在多线程中,当各线程开始向不同的表(每个线程使用的表明显不同)插入数据时,会出现提交失败异常。 请指导如何进行并行插入(请注意,所有线程都在不同的表上工作,彼此之间没有任何关联)。
谢谢, 维卡斯
问题
这是我为从 monetDB 驱动程序获取并发冲突错误而创建的代码。
/**
 * Reproducer for the MonetDB "transaction is aborted because of concurrency conflicts" error.
 *
 * <p>Each runner owns one JDBC connection. When run, it creates its own table, inserts
 * {@code batchCount} batches of {@code batchSize} rows through a prepared statement, drops the
 * table again and closes the connection.
 *
 * <p>The DDL is deliberately NOT synchronized between runners: running several runners on
 * separate threads is what provokes the reported exception at the create/commit step.
 */
public static class TestRunner implements Runnable {
    private final static String problemHere = "jdbc:monetdb://localhost/test-db";

    static {
        try {
            // Load the MonetDB JDBC driver class before any connection is requested.
            Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /** Opens a new connection; called once per runner, from the constructor. */
    private static Connection createConnection() throws SQLException {
        return DriverManager.getConnection(problemHere, "myID", "myPass");
    }

    private static final int fieldCount = 10;
    private static final int[] colType = new int[fieldCount];
    private static final int[] types = { Types.VARCHAR, Types.TIMESTAMP, Types.DECIMAL };

    static {
        // Pick a random column type per column; the layout is shared by all runners.
        Random random = new Random();
        for (int i = 0; i < fieldCount; i++) {
            colType[i] = types[random.nextInt(types.length)];
        }
    }

    private final String name;
    private Connection con;
    private int batchCount;
    private final int batchSize;
    private final String create;
    private final String drop;
    private final String insert;

    /**
     * @param string name of the table this runner creates, fills and drops
     * @param bs     number of rows per batch
     * @param bc     number of batches to submit
     * @throws SQLException if the connection cannot be opened
     */
    public TestRunner(String string, int bs, int bc) throws SQLException {
        this.name = string;
        this.con = createConnection();
        this.batchCount = bc;
        this.batchSize = bs;

        StringBuilder createSql = new StringBuilder("create table ").append(name).append(" (");
        StringBuilder insertSql = new StringBuilder("insert into ").append(name).append(" values (");
        for (int i = 0; i < fieldCount; i++) {
            if (i > 0) {
                createSql.append(',');
                insertSql.append(',');
            }
            createSql.append("col").append(i).append(' ').append(getType(colType[i]));
            insertSql.append('?');
        }
        this.create = createSql.append(')').toString();
        this.insert = insertSql.append(')').toString();
        this.drop = "drop table " + name;
    }

    /**
     * Maps a {@link Types} constant to the SQL column type used in the create statement.
     *
     * @throws IllegalArgumentException for a type other than DECIMAL, VARCHAR or TIMESTAMP
     */
    private static String getType(int i) {
        switch (i) {
        case Types.DECIMAL:
            return "decimal(18,9)";
        case Types.VARCHAR:
            return "varchar(30000)";
        case Types.TIMESTAMP:
            return "timestamp";
        default:
            // Previously returned null, which would have been concatenated into the DDL.
            throw new IllegalArgumentException("unsupported column type: " + i);
        }
    }

    /** Safety net that closes the connection if {@link #run()} never got to do it. */
    @Override
    protected void finalize() throws Throwable {
        try {
            if (con != null) {
                con.close();
            }
        } finally {
            super.finalize();
        }
    }

    /** Creates the table, inserts all batches, drops the table and closes the connection. */
    @Override
    public void run() {
        System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n", name, create, insert, drop);
        try (Statement stmt = con.createStatement()) {
            // this will throw the exception of concurrency conflict
            System.out.format("%d created %s%n", stmt.executeUpdate(create), name);
            commitIfNeeded();
        } catch (SQLException e1) {
            e1.printStackTrace();
            // Release the connection here; the early return used to leak it.
            closeConnection();
            return;
        }
        try (PreparedStatement stmt = con.prepareStatement(insert)) {
            while (batchCount-- > 0) {
                for (int i = batchSize; i > 0; i--) {
                    setBatchInput(stmt);
                    stmt.addBatch();
                }
                System.out.format("%s - submitting batch ...%n", name);
                stmt.executeBatch();
                commitIfNeeded();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (con != null) {
                try (Statement stmt = con.createStatement()) {
                    System.out.format("%d deleted - %s%n", stmt.executeUpdate(drop), name);
                    commitIfNeeded();
                } catch (SQLException e1) {
                    e1.printStackTrace();
                }
                closeConnection();
            }
        }
        // Terminate the line so the messages of concurrent runners do not run together.
        System.out.format("%s finished.%n", name);
    }

    /** Commits the current transaction unless the connection is in auto-commit mode. */
    private void commitIfNeeded() throws SQLException {
        if (!con.getAutoCommit()) {
            con.commit();
        }
    }

    /** Closes the connection (if still open) and clears the field on success. */
    private void closeConnection() {
        if (con == null) {
            return;
        }
        try {
            con.close();
            con = null;
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Binds one generated value per column (JDBC parameters are 1-based). */
    private static void setBatchInput(PreparedStatement stmt) throws SQLException {
        for (int i = 1; i <= fieldCount; i++) {
            stmt.setObject(i, getRandomFieldValue(colType[i - 1]));
        }
    }

    /** Returns a fixed sample value for the given column type, or null for other types. */
    private static Object getRandomFieldValue(int type) {
        switch (type) {
        case Types.DECIMAL:
            return 0;
        case Types.VARCHAR:
            return "null";
        case Types.TIMESTAMP:
            return new Timestamp(System.currentTimeMillis());
        default:
            return null;
        }
    }
}
/**
 * Starts two {@code TestRunner} threads ("runner_1" and "runner_0"), each submitting one batch
 * of 10000 rows, and waits for them to finish.
 *
 * @param args unused
 */
public static void main(String[] args) {
    int num = 2;
    List<Thread> ts = new ArrayList<>();
    while (num-- > 0) {
        try {
            Thread t = new Thread(new TestRunner("runner_" + num, 10000, 1));
            t.start();
            // Only track threads that were actually created; a failed constructor used to
            // leave a null entry in the list and crash the join loop below.
            ts.add(t);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    for (Thread t : ts) {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag and stop waiting; further joins would fail at once.
            Thread.currentThread().interrupt();
            break;
        }
    }
}
这是我得到的异常
java.sql.SQLException: COMMIT: transaction is aborted because of concurrency conflicts, will ROLLBACK instead
at nl.cwi.monetdb.jdbc.MonetConnection$ResponseList.executeQuery(MonetConnection.java:2597)
at nl.cwi.monetdb.jdbc.MonetConnection$ResponseList.processQuery(MonetConnection.java:2345)
at nl.cwi.monetdb.jdbc.MonetStatement.internalExecute(MonetStatement.java:507)
at nl.cwi.monetdb.jdbc.MonetStatement.execute(MonetStatement.java:345)
at nl.cwi.monetdb.jdbc.MonetStatement.executeUpdate(MonetStatement.java:545)
at monet.test.BatchTest$TestRunner.run(BatchTest.java:92)
at java.lang.Thread.run(Thread.java:745)
如果我同步创建,那么在一个线程中会出现批处理失败 -
java.sql.SQLException: EXEC: no prepared statement with id: 2no prepared statement with id: 2
根据这个讨论帖(this thread)中的说明:
If you perform a schema update/change, MonetDB releases all prepared handles, because they possibly are no longer correct. You need to re-execute your prepare command.
解决方案
使用 MonetDB 的批量数据加载(bulk data load,即 COPY INTO)功能!您可以不通过 JDBC 的批量插入,而是直接把数据写入服务器(即使并行打开多个连接也可以)。 以下是可以让您直接测试它的代码
/**
 * Parallel loader that avoids JDBC batch inserts: the table is created over JDBC, while the rows
 * are streamed to the server with {@code COPY INTO ... FROM STDIN} over a separate
 * {@code MapiSocket} connection.
 *
 * <p>Each runner owns its JDBC connection and its MAPI socket. The create statement and the
 * final commit are serialized across runners through a shared lock. The table is not dropped at
 * the end; the drop statement is only printed at start-up.
 */
public static class TestRunner implements Runnable {
    private static final String HOST = "localhost";
    private static final int PORT = 50000;
    private static final String DATABASE = "test-db";
    private static final String USER = "monetdb";
    private static final String PASSWORD = "monetdb";

    private final static String problemHere = "jdbc:monetdb://" + HOST + ":" + PORT + "/" + DATABASE;

    /** Dedicated monitor for schema changes (replaces locking on an interned String literal). */
    private static final Object DDL_LOCK = new Object();

    static {
        try {
            // Load the MonetDB JDBC driver class before any connection is requested.
            Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /** Opens a new JDBC connection; called once per runner, from the constructor. */
    private static Connection createConnection() throws SQLException {
        return DriverManager.getConnection(problemHere, USER, PASSWORD);
    }

    private static final int fieldCount = 10;
    private static final int[] colType = new int[fieldCount];
    private static final int[] types = { Types.VARCHAR, Types.TIMESTAMP, Types.DECIMAL };

    static {
        // Pick a random column type per column; the layout is shared by all runners.
        Random random = new Random();
        for (int i = 0; i < fieldCount; i++) {
            colType[i] = types[random.nextInt(types.length)];
        }
    }

    private final String name;
    private Connection con;
    private int batchCount;
    private final int batchSize;
    private final String create;
    private final String drop;
    private final String insert;

    /**
     * @param string name of the table this runner creates and fills
     * @param bs     number of rows per COPY INTO batch
     * @param bc     number of batches to submit
     * @throws SQLException if the JDBC connection cannot be opened
     */
    public TestRunner(String string, int bs, int bc) throws SQLException {
        this.name = string;
        this.con = createConnection();
        this.batchCount = bc;
        this.batchSize = bs;

        StringBuilder createSql = new StringBuilder("create table ").append(name).append(" (");
        StringBuilder insertSql = new StringBuilder("insert into ").append(name).append(" values (");
        for (int i = 0; i < fieldCount; i++) {
            if (i > 0) {
                createSql.append(',');
                insertSql.append(',');
            }
            createSql.append("col").append(i).append(' ').append(getType(colType[i]));
            insertSql.append('?');
        }
        this.create = createSql.append(')').toString();
        this.insert = insertSql.append(')').toString();
        this.drop = "drop table " + name;
    }

    /**
     * Maps a {@link Types} constant to the SQL column type used in the create statement.
     *
     * @throws IllegalArgumentException for a type other than DECIMAL, VARCHAR or TIMESTAMP
     */
    private static String getType(int i) {
        switch (i) {
        case Types.DECIMAL:
            return "decimal(18,9)";
        case Types.VARCHAR:
            return "varchar(30000)";
        case Types.TIMESTAMP:
            return "timestamp";
        default:
            // Previously returned null, which would have been concatenated into the DDL.
            throw new IllegalArgumentException("unsupported column type: " + i);
        }
    }

    /** Safety net that closes the JDBC connection if {@link #run()} never got to do it. */
    @Override
    protected void finalize() throws Throwable {
        try {
            if (con != null) {
                con.close();
            }
        } finally {
            super.finalize();
        }
    }

    /** Creates the table over JDBC, bulk-loads all batches over MAPI, then releases resources. */
    @Override
    public void run() {
        System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n", name, create, insert, drop);
        try (Statement stmt = con.createStatement()) {
            // Unsynchronized DDL is where the concurrency-conflict exception was raised,
            // so only one runner at a time creates and commits its table.
            synchronized (DDL_LOCK) {
                System.out.format("%d created %s%n", stmt.executeUpdate(create), name);
                commitIfNeeded();
            }
        } catch (SQLException e1) {
            e1.printStackTrace();
            // Release the connection here; the early return used to leak it.
            closeConnection();
            return;
        }
        MapiSocket server = null;
        try {
            server = new MapiSocket();
            server.setDatabase(DATABASE);
            server.setLanguage("sql");
            List<String> warning = server.connect(HOST, PORT, USER, PASSWORD);
            if (warning != null) {
                for (String w : warning) {
                    System.out.println(w);
                }
            }
            // The reader and writer must exist before the first prompt is awaited; the
            // original used "in" before declaring it and re-fetched both for every batch.
            BufferedMCLReader in = server.getReader();
            BufferedMCLWriter out = server.getWriter();
            String error = in.waitForPrompt();
            if (error != null) {
                throw new SQLException(error);
            }
            // NOTE(review): the string below contains a real newline character inside the
            // quoted record delimiter. The stock MonetDB COPY INTO example is believed to
            // send the two-character escape instead (Java "'\\n'") - confirm which form the
            // server expects before changing it.
            String query = "COPY INTO " + name + " FROM STDIN USING DELIMITERS ',','\n';";
            while (batchCount-- > 0) {
                // the leading 's' is essential, since it is a protocol
                // marker that should not be omitted, likewise the
                // trailing semicolon
                out.write('s');
                out.write(query);
                out.newLine();
                for (int i = batchSize; i > 0; i--) {
                    insertBatchElement(out);
                    out.newLine();
                }
                System.out.format("%s - submitting batch ...%n", name);
                out.writeLine(""); // need this one for synchronisation over flush()
                out.flush();
                error = in.waitForPrompt();
                if (error != null) {
                    throw new SQLException(error);
                }
                commitIfNeeded();
            }
            // Close the streams only after the last batch; closing them inside the loop
            // would break every batch after the first one.
            out.close();
            in.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (con != null) {
                synchronized (DDL_LOCK) {
                    try {
                        commitIfNeeded();
                    } catch (SQLException e1) {
                        e1.printStackTrace();
                    }
                }
                closeConnection();
            }
            if (server != null) {
                server.close();
            }
        }
        // Terminate the line so the messages of concurrent runners do not run together.
        System.out.format("%s finished.%n", name);
    }

    /** Commits the JDBC transaction unless the connection is in auto-commit mode. */
    private void commitIfNeeded() throws SQLException {
        if (!con.getAutoCommit()) {
            con.commit();
        }
    }

    /** Closes the JDBC connection (if still open) and clears the field on success. */
    private void closeConnection() {
        if (con == null) {
            return;
        }
        try {
            con.close();
            con = null;
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Writes one comma-separated row (without the record terminator) to the COPY stream. */
    private void insertBatchElement(BufferedMCLWriter out) throws IOException {
        StringBuilder data = new StringBuilder();
        for (int i = 0; i < fieldCount; i++) {
            if (i > 0) {
                data.append(',');
            }
            data.append(getRandomFieldValue(colType[i]));
        }
        out.write(data.toString());
    }

    /** Returns a fixed sample value for the given column type, or null for other types. */
    private static Object getRandomFieldValue(int type) {
        switch (type) {
        case Types.DECIMAL:
            return 0;
        case Types.VARCHAR:
            return "null";
        case Types.TIMESTAMP:
            return new Timestamp(System.currentTimeMillis());
        default:
            return null;
        }
    }
}
/**
 * Starts two {@code TestRunner} threads ("runner_1" and "runner_0"), each submitting one batch
 * of 10000 rows, and waits for them to finish.
 *
 * @param args unused
 */
public static void main(String[] args) {
    int num = 2;
    List<Thread> ts = new ArrayList<>();
    while (num-- > 0) {
        try {
            Thread t = new Thread(new TestRunner("runner_" + num, 10000, 1));
            t.start();
            // Only track threads that were actually created; a failed constructor used to
            // leave a null entry in the list and crash the join loop below.
            ts.add(t);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    for (Thread t : ts) {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag and stop waiting; further joins would fail at once.
            Thread.currentThread().interrupt();
            break;
        }
    }
}