多线程 NpgsqlConnections 和读取器产生重复结果。 C#
Multi-threading NpgsqlConnections and readers producing duplicate results. C#
我有数千个查询需要 运行 对超过 10 个数据库的数千个不同模式进行查询。我正在尝试将这些查询线程化并使用 BlockingCollection 将结果写入其中,同时还使用另一个线程从该集合中读取并将其写入磁盘,因为这些查询的结果集太大而无法存储在内存中。
这是我的代码中的问题区域:
public class Node {
public string ConnectionString;
public string Query;
public Node(string databaseDetails, string query) {
//Cannot put in actual logic, but this part is fine
ConnectionString = {logic for connection string}
Query = "set search_path to {schema from databaseDetails};" + query
}
}
public void runQuery(string query, BlockingCollection<Dictionary<string, object>> producer) {
List<Node> nodes = getNodes(query);
Parallel.ForEach(nodes, node => {
NpgsqlConnection conn = new NpgsqlConnection(node.ConnectionString);
conn.Open();
NpgsqlCommand npgQuery = new NpgsqlCommand(node.Query, conn);
NpgsqlDataReader reader = npgQuery.ExecuteReader();
while (reader.Read()) {
Dictionary<string, object> row = new Dictionary<string, object>();
for (int i = 0; i < reader.FieldCount; i++) {
row[reader.GetName(i)] = reader.GetValue(i);
}
producer.Add(row);
}
conn.Close();
});
producer.CompleteAdding();
}
此代码 运行s,并检索所有结果,但它也重复了很多结果,因此阻塞集合的记录数比应有的多 5-10 倍。任何帮助将不胜感激。
所以我只是个白痴,将我生成的结果集与我运行所有查询的 UNION 进行比较,而不是 UNION ALL,所以我的 "true" 结果集在这是因为工会要删除它们:/
我有数千个查询需要 运行 对超过 10 个数据库的数千个不同模式进行查询。我正在尝试将这些查询线程化并使用 BlockingCollection 将结果写入其中,同时还使用另一个线程从该集合中读取并将其写入磁盘,因为这些查询的结果集太大而无法存储在内存中。
这是我的代码中的问题区域:
public class Node {
public string ConnectionString;
public string Query;
public Node(string databaseDetails, string query) {
//Cannot put in actual logic, but this part is fine
ConnectionString = {logic for connection string}
Query = "set search_path to {schema from databaseDetails};" + query
}
}
public void runQuery(string query, BlockingCollection<Dictionary<string, object>> producer) {
List<Node> nodes = getNodes(query);
Parallel.ForEach(nodes, node => {
NpgsqlConnection conn = new NpgsqlConnection(node.ConnectionString);
conn.Open();
NpgsqlCommand npgQuery = new NpgsqlCommand(node.Query, conn);
NpgsqlDataReader reader = npgQuery.ExecuteReader();
while (reader.Read()) {
Dictionary<string, object> row = new Dictionary<string, object>();
for (int i = 0; i < reader.FieldCount; i++) {
row[reader.GetName(i)] = reader.GetValue(i);
}
producer.Add(row);
}
conn.Close();
});
producer.CompleteAdding();
}
此代码 运行s,并检索所有结果,但它也重复了很多结果,因此阻塞集合的记录数比应有的多 5-10 倍。任何帮助将不胜感激。
所以我只是个白痴,将我生成的结果集与我运行所有查询的 UNION 进行比较,而不是 UNION ALL,所以我的 "true" 结果集在这是因为工会要删除它们:/