我可以将 Parallel.For 与 SQL 命令一起使用吗?
Can I use Parallel.For with sql Commands?
我有一个表示范围的类:
/// <summary>
/// Holds the two bounds of a range; both values are substituted into the
/// query text by the snippets that consume this type.
/// </summary>
public class avl_range
{
    /// <summary>First bound of the range.</summary>
    public long start { get; set; }

    /// <summary>Second bound of the range.</summary>
    public long end { get; set; }
}
如果我使用普通的 for 循环,一切正常,但必须等待每条命令执行完毕;每个查询需要 8 秒,所以 10 个查询需要 80 秒。
在并行版本中,如果只打印范围,一切正常;但一旦尝试执行命令,就会报错,提示已有操作正在进行:
{"An operation is already in progress."}
我该如何解决?
// Question's repro: runs one query per range in parallel while every iteration shares a single connection.
// The list is created empty here; how it gets populated is not shown in this snippet.
var numbers = new List<avl_range>();
using (var conn = new NpgsqlConnection(strConnection))
{
conn.Open();
Action<avl_range> forEachLoop = number => // body executed by Parallel.ForEach for each range
{
// Printing the range is the only part that works when run in parallel.
Console.WriteLine(number.start + " - " + number.end);
using (var cmd = new NpgsqlCommand())
{
// Every iteration attaches its command to the same shared `conn`.
cmd.Connection = conn;
cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link({0}, {1});"
, number.start
, number.end);
// This call is where the reported "An operation is already in progress." error is raised.
using (var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine(reader.GetString(0));
}
}
}
};
Parallel.ForEach(numbers, forEachLoop);
}
// NOTE(review): the `);` below has no matching opening parenthesis in this snippet — looks like a paste leftover; confirm.
);
仅供参考:我正在尝试解决之前帖子(post before)中提出的这个问题。
同一个 Npgsql 连接不能被并发使用——在任何给定时间点,一个连接上只能运行一个命令(换句话说,不支持 MARS)。
打开多个连接以并行执行查询是完全合理的。尽管建立新的物理连接成本很高,但连接池非常轻量,因此重用物理连接的开销非常小。不这样做的主要理由是:您需要让多个操作处于同一个事务中。
即使您可以让它与 MARS 一起工作,连接对象也几乎从来不是线程安全的,因此每个线程都需要有自己的连接。Parallel.ForEach 提供了相应的重载来简化这一点:可以传入在每个线程开始和结束时运行的函数。
// Ranges to process; the list is created empty here and must be filled before the loop runs.
var numbers = new List<avl_range>();

// localInit: invoked by Parallel.ForEach once per worker task. It opens a
// dedicated connection and returns it so it is handed to that worker's
// iterations (a connection can only run one command at a time).
Func<NpgsqlConnection> localInit = () =>
{
    var conn = new NpgsqlConnection(strConnection);
    conn.Open();
    return conn;
};

// localFinally: invoked once per worker task when it finishes; releases that
// worker's connection.
Action<NpgsqlConnection> localFinally = conn => conn.Dispose();

// Loop body: runs the query for one range on the worker's own connection and
// returns the connection so the next iteration on the same worker reuses it.
Func<avl_range, ParallelLoopState, NpgsqlConnection, NpgsqlConnection> forEachLoop = (number, loopState, conn) =>
{
    Console.WriteLine(number.start + " - " + number.end);
    using (var cmd = new NpgsqlCommand())
    {
        cmd.Connection = conn;
        cmd.CommandText = String.Format("SELECT * FROM avl_db.process_near_link({0}, {1});"
            , number.start
            , number.end);

        // Each worker has its own connection, so commands no longer compete
        // for a single shared connection here.
        using (var reader = cmd.ExecuteReader())
        {
            while (reader.Read())
            {
                Console.WriteLine(reader.GetString(0));
            }
        }
    }

    return conn;
};

Parallel.ForEach(numbers, localInit, forEachLoop, localFinally);
话虽如此,大多数时候对数据库建立并发连接并不是正确的思路,瓶颈可能在别处。您应该使用性能分析器(profiler)找出真正拖慢程序的原因,并把精力集中在那里。
针对评论的示例代码:
var numbers = GetDataForNumbers();

// Shared list that receives the strings collected by every worker.
List<string> results = new List<string>();

// localInit: gives each worker task its own private list to collect into.
Func<List<string>> localInit = () => new List<string>();

// Loop body: opens a connection for this range, runs the query and appends
// every returned string to the worker-local list.
Func<avl_range, ParallelLoopState, List<string>, List<string>> forEachLoop = (number, loopState, localList) =>
{
    using (var conn = new NpgsqlConnection(strConnection))
    {
        conn.Open();

        // Console output here would slow the program down a lot, so it is left out:
        //Console.WriteLine(number.start + " - " + number.end);
        using (var cmd = new NpgsqlCommand())
        {
            cmd.Connection = conn;
            cmd.CommandText = String.Format("SELECT * FROM avl_db.process_near_link({0}, {1});"
                , number.start
                , number.end);

            using (var reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    // No lock needed: only the current worker has access to its local list.
                    localList.Add(reader.GetString(0));
                }
            }
        }
    }

    return localList;
};

// localFinally: merges a worker's local list into the shared results. The
// lock is required because more than one worker could be merging at once.
Action<List<string>> localFinally = localList =>
{
    lock (results)
    {
        results.AddRange(localList);
    }
};

Parallel.ForEach(numbers, localInit, forEachLoop, localFinally);
// results now contains the strings collected by all workers.
我有一个表示范围的类:
/// <summary>
/// Holds the two bounds of a range; both values are substituted into the
/// query text by the snippets that consume this type.
/// </summary>
public class avl_range
{
    /// <summary>First bound of the range.</summary>
    public long start { get; set; }

    /// <summary>Second bound of the range.</summary>
    public long end { get; set; }
}
如果我使用普通的 for 循环,一切正常,但必须等待每条命令执行完毕;每个查询需要 8 秒,所以 10 个查询需要 80 秒。
在并行版本中,如果只打印范围,一切正常;但一旦尝试执行命令,就会报错,提示已有操作正在进行:
{"An operation is already in progress."}
我该如何解决?
// Question's repro: runs one query per range in parallel while every iteration shares a single connection.
// The list is created empty here; how it gets populated is not shown in this snippet.
var numbers = new List<avl_range>();
using (var conn = new NpgsqlConnection(strConnection))
{
conn.Open();
Action<avl_range> forEachLoop = number => // body executed by Parallel.ForEach for each range
{
// Printing the range is the only part that works when run in parallel.
Console.WriteLine(number.start + " - " + number.end);
using (var cmd = new NpgsqlCommand())
{
// Every iteration attaches its command to the same shared `conn`.
cmd.Connection = conn;
cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link({0}, {1});"
, number.start
, number.end);
// This call is where the reported "An operation is already in progress." error is raised.
using (var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine(reader.GetString(0));
}
}
}
};
Parallel.ForEach(numbers, forEachLoop);
}
// NOTE(review): the `);` below has no matching opening parenthesis in this snippet — looks like a paste leftover; confirm.
);
仅供参考:我正在尝试解决之前帖子(post before)中提出的这个问题。
同一个 Npgsql 连接不能被并发使用——在任何给定时间点,一个连接上只能运行一个命令(换句话说,不支持 MARS)。
打开多个连接以并行执行查询是完全合理的。尽管建立新的物理连接成本很高,但连接池非常轻量,因此重用物理连接的开销非常小。不这样做的主要理由是:您需要让多个操作处于同一个事务中。
即使您可以让它与 MARS 一起工作,连接对象也几乎从来不是线程安全的,因此每个线程都需要有自己的连接。Parallel.ForEach 提供了相应的重载来简化这一点:可以传入在每个线程开始和结束时运行的函数。
// Ranges to process; the list is created empty here and must be filled before the loop runs.
var numbers = new List<avl_range>();

// localInit: invoked by Parallel.ForEach once per worker task. It opens a
// dedicated connection and returns it so it is handed to that worker's
// iterations (a connection can only run one command at a time).
Func<NpgsqlConnection> localInit = () =>
{
    var conn = new NpgsqlConnection(strConnection);
    conn.Open();
    return conn;
};

// localFinally: invoked once per worker task when it finishes; releases that
// worker's connection.
Action<NpgsqlConnection> localFinally = conn => conn.Dispose();

// Loop body: runs the query for one range on the worker's own connection and
// returns the connection so the next iteration on the same worker reuses it.
Func<avl_range, ParallelLoopState, NpgsqlConnection, NpgsqlConnection> forEachLoop = (number, loopState, conn) =>
{
    Console.WriteLine(number.start + " - " + number.end);
    using (var cmd = new NpgsqlCommand())
    {
        cmd.Connection = conn;
        cmd.CommandText = String.Format("SELECT * FROM avl_db.process_near_link({0}, {1});"
            , number.start
            , number.end);

        // Each worker has its own connection, so commands no longer compete
        // for a single shared connection here.
        using (var reader = cmd.ExecuteReader())
        {
            while (reader.Read())
            {
                Console.WriteLine(reader.GetString(0));
            }
        }
    }

    return conn;
};

Parallel.ForEach(numbers, localInit, forEachLoop, localFinally);
话虽如此,大多数时候对数据库建立并发连接并不是正确的思路,瓶颈可能在别处。您应该使用性能分析器(profiler)找出真正拖慢程序的原因,并把精力集中在那里。
针对评论的示例代码:
var numbers = GetDataForNumbers();

// Shared list that receives the strings collected by every worker.
List<string> results = new List<string>();

// localInit: gives each worker task its own private list to collect into.
Func<List<string>> localInit = () => new List<string>();

// Loop body: opens a connection for this range, runs the query and appends
// every returned string to the worker-local list.
Func<avl_range, ParallelLoopState, List<string>, List<string>> forEachLoop = (number, loopState, localList) =>
{
    using (var conn = new NpgsqlConnection(strConnection))
    {
        conn.Open();

        // Console output here would slow the program down a lot, so it is left out:
        //Console.WriteLine(number.start + " - " + number.end);
        using (var cmd = new NpgsqlCommand())
        {
            cmd.Connection = conn;
            cmd.CommandText = String.Format("SELECT * FROM avl_db.process_near_link({0}, {1});"
                , number.start
                , number.end);

            using (var reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    // No lock needed: only the current worker has access to its local list.
                    localList.Add(reader.GetString(0));
                }
            }
        }
    }

    return localList;
};

// localFinally: merges a worker's local list into the shared results. The
// lock is required because more than one worker could be merging at once.
Action<List<string>> localFinally = localList =>
{
    lock (results)
    {
        results.AddRange(localList);
    }
};

Parallel.ForEach(numbers, localInit, forEachLoop, localFinally);
// results now contains the strings collected by all workers.