我可以使用 Parallel.For 和 sql 命令吗?

Can I use Parallel.For with sql Commands?

我有一个class范围

public class avl_range
{
    public long start { get; set; }
    public long end { get; set; }
}

如果我使用正常 FOR 完美,但必须等待每个命令完成并且每个查询需要 8 秒,所以 10 个查询需要 80 秒。

在并行版本中,如果我只打印范围,则效果很好,但如果尝试执行命令,则表示已经在进行中。

{"An operation is already in progress."}

我该如何解决?

var numbers = new List<avl_range>();
using (var conn = new NpgsqlConnection(strConnection))
    {
        conn.Open();

        Action<avl_range> forEachLoop = number => //Begin definition of forLoop
        {
             // only the console write line works ok
            Console.WriteLine(number.start + " - " + number.end);

            using (var cmd = new NpgsqlCommand())
            {
                cmd.Connection = conn;                            
                cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link({0}, {1});"
                                                 , number.start
                                                 , number.end);
                // here cause the error.
                using (var reader = cmd.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        Console.WriteLine(reader.GetString(0));
                    }
                }
            }
        };

        Parallel.ForEach(numbers, forEachLoop);
    }
 );

仅供参考:我正在尝试解决这个问题 post before

不能同时使用 Npgsql 连接 - 在任何给定时间点只能有一个命令 运行(换句话说,不支持 MARS)。

打开多个连接以并行执行查询绝对有意义。尽管建立新的物理连接成本很高,但连接池非常轻量级,因此重用物理连接的开销非常小。不这样做的主要原因是如果您需要多个操作在同一个事务中。

即使您可以让它与 MARS 一起工作,连接对象也几乎永远不是线程安全的,您需要每个线程都有一个连接。 Parallel.ForEach has overloads to make this easy 在线程开始和结束时具有 运行 的函数。

var numbers = new List<avl_range>();

Func<NpgsqlConnection> localInit => () => 
{
    var conn = new NpgsqlConnection(strConnection);
    conn.Open();
};

Action<NpgsqlConnection> localFinally = (conn) => conn.Dispose();

Func<avl_range, ParallelLoopState, NpgsqlConnection, NpgsqlConnection> forEachLoop = (number, loopState, conn) => //Begin definition of forLoop
{
     // only the console write line works ok
    Console.WriteLine(number.start + " - " + number.end);

    using (var cmd = new NpgsqlCommand())
    {
        cmd.Connection = conn;                            
        cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link({0}, {1});"
                                         , number.start
                                         , number.end);
        // here cause the error.
        using (var reader = cmd.ExecuteReader())
        {
            while (reader.Read())
            {
                Console.WriteLine(reader.GetString(0));
            }
        }
    }
    return conn;
};

Parallel.ForEach(numbers, localInit, forEachLoop, localFinally);

话虽这么说,但大多数时候对数据库进行并发连接并不是正确的想法,瓶颈可能在其他地方,您应该使用探查器来查看真正减慢程序速度的原因,并将您的精力集中在那里.


评论示例代码:

var numbers = GetDataForNumbers();
List<string> results = new List<string>();

Func<List<string>> localInit => () => new List<string>();

Func<avl_range, ParallelLoopState, List<string>, List<string>> forEachLoop = (number, loopState, localList) => //Begin definition of forLoop
{
    using (var conn = new NpgsqlConnection(strConnection))
    {
        conn.Open();

        //This line is going to slow your program down a lot, so i commented it out.
        //Console.WriteLine(number.start + " - " + number.end);

        using (var cmd = new NpgsqlCommand())
        {
            cmd.Connection = conn;                            
            cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link({0}, {1});"
                                             , number.start
                                             , number.end);
            using (var reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    //Add a object to the thread local list, we don't need to lock here because we are the only thread with access to it.
                    localList.Add(reader.GetString(0));
                }
            }
        }
    }
    return localList;
};

Action<List<String>> localFinally = localList => 
{
    //Combine the local list to the main results, we need to lock here as more than one thread could be merging at once.
    lock(results)
    {
        results.AddRange(localList);
    }
};

Parallel.ForEach(numbers, localInit, forEachLoop, localFinally);

//results now contains strings from all the threads here.