npgsql: How to select multiple rows (with multiple column values) in one command with a collection as a parameter?
I have two tables defined below, supplier_balances and supplier_balance_items (by the way, there is a 1[supplier_balance]:N[supplier_balance_items] relationship between the two):
CREATE TABLE IF NOT EXISTS sch_brand_payment_data_lake_proxy.supplier_balances (
/* id is here for joining purposes with items table, instead of joining with the 4 columns used for sake
of making sure a record is deemed as unique */
id bigserial NOT NULL,
accounting_document text NOT NULL,
accounting_document_type text NOT NULL,
company_code text NOT NULL,
document_date_year int4 NOT NULL,
accounting_doc_created_by_user text,
accounting_clerk text,
assignment_reference text,
document_reference_id text,
original_reference_document text,
payment_terms text,
supplier text,
supplier_name text,
document_date timestamp,
posting_date timestamp,
net_due_date timestamp,
created_on timestamp default NULL,
modified_on timestamp default NULL,
pushed_on timestamp default NULL,
is_modified bool GENERATED ALWAYS AS (modified_on IS NOT NULL AND modified_on > created_on) STORED,
is_pushed bool GENERATED ALWAYS AS (pushed_on IS NOT NULL AND pushed_on > modified_on) STORED,
CONSTRAINT supplier_balances_pkey PRIMARY KEY (id),
/* accounting_document being the field of the composite unique index -> faster querying */
CONSTRAINT supplier_balances_unique UNIQUE (
accounting_document,
accounting_document_type,
company_code,
document_date_year)
);
/* Creating other indexes for querying of those as well */
CREATE INDEX IF NOT EXISTS supplier_balances_accounting_document_type_idx
ON sch_brand_payment_data_lake_proxy.supplier_balances (accounting_document_type);
CREATE INDEX IF NOT EXISTS supplier_balances_company_code_idx
ON sch_brand_payment_data_lake_proxy.supplier_balances (company_code);
CREATE INDEX IF NOT EXISTS supplier_balances_document_date_year_idx
ON sch_brand_payment_data_lake_proxy.supplier_balances (document_date_year);
CREATE TABLE IF NOT EXISTS sch_brand_payment_data_lake_proxy.supplier_balance_items
(
supplier_balance_id bigserial NOT NULL,
posting_view_item text NOT NULL,
posting_key text,
amount_in_company_code_currency numeric,
amount_in_transaction_currency numeric,
cash_discount_1_percent numeric,
cash_discount_amount numeric,
clearing_accounting_document text,
document_item_text text,
gl_account text,
is_cleared bool,
clearing_date timestamp,
due_calculation_base_date timestamp,
/* uniqueness is basically the posting_view_item for a given supplier balance */
CONSTRAINT supplier_balance_items_pkey PRIMARY KEY (supplier_balance_id, posting_view_item),
/* 1(supplier balance):N(supplier balance items) */
CONSTRAINT supplier_balance_items_fkey FOREIGN KEY (supplier_balance_id)
REFERENCES sch_brand_payment_data_lake_proxy.supplier_balances (id)
ON DELETE CASCADE
ON UPDATE CASCADE
);
Note: for the sake of simplicity, I have only populated the columns that cannot be NULL.
INSERT INTO
sch_brand_payment_data_lake_proxy.supplier_balances
(accounting_document, accounting_document_type, company_code, document_date_year)
VALUES
('A', 'B', 'C', 0),
('A', 'B', 'C', 1),
('A', 'B', 'C', 2),
('A', 'B', 'C', 3),
('A', 'B', 'C', 4),
('A', 'B', 'C', 5)
RETURNING id;
Output:
id |
---|
1 |
2 |
3 |
4 |
5 |
6 |
INSERT INTO
sch_brand_payment_data_lake_proxy.supplier_balance_items
(supplier_balance_id, posting_view_item)
VALUES
(1, 'A'),
(1, 'B'),
(3, 'A'),
(3, 'B'),
(2, 'A'),
(1, 'C');
SELECT
id,
accounting_document,
accounting_document_type,
company_code,
document_date_year
FROM sch_brand_payment_data_lake_proxy.supplier_balances;
Output:
id | accounting_document | accounting_document_type | company_code | document_date_year |
---|---|---|---|---|
1 | A | B | C | 0 |
2 | A | B | C | 1 |
3 | A | B | C | 2 |
4 | A | B | C | 3 |
5 | A | B | C | 4 |
6 | A | B | C | 5 |
SELECT
supplier_balance_id,
posting_view_item
FROM sch_brand_payment_data_lake_proxy.supplier_balance_items;
Output:
supplier_balance_id | posting_view_item |
---|---|
1 | A |
1 | B |
3 | A |
3 | B |
2 | A |
1 | C |
Now, if we want to select the rows matching several value sets across the JOIN, we can do it in raw SQL:
SELECT
id,
accounting_document,
accounting_document_type,
company_code,
document_date_year,
posting_view_item
FROM sch_brand_payment_data_lake_proxy.supplier_balances
LEFT OUTER JOIN sch_brand_payment_data_lake_proxy.supplier_balance_items
ON supplier_balances.id = supplier_balance_items.supplier_balance_id
WHERE (accounting_document, accounting_document_type, company_code, document_date_year)
IN (('A', 'B', 'C', 1), ('A', 'B', 'C', 2))
Output:
id | accounting_document | accounting_document_type | company_code | document_date_year | posting_view_item |
---|---|---|---|---|---|
2 | A | B | C | 1 | A |
3 | A | B | C | 2 | A |
3 | A | B | C | 2 | B |
https://github.com/npgsql/npgsql/issues/1199
Now, when using npgsql in C#, reproducing the query above is easy:
using System.Data;
using Npgsql;
var connectionStringBuilder = new NpgsqlConnectionStringBuilder
{
Host = "localhost",
Port = 5432,
Username = "brand_payment_migration",
Password = "secret",
Database = "brand_payment"
};
using var connection = new NpgsqlConnection(connectionStringBuilder.ToString());
connection.Open();
using var command = connection.CreateCommand();
command.CommandText =
"SELECT id, accounting_document, accounting_document_type, company_code, document_date_year, posting_view_item " +
"FROM sch_brand_payment_data_lake_proxy.supplier_balances " +
"LEFT OUTER JOIN sch_brand_payment_data_lake_proxy.supplier_balance_items " +
"ON supplier_balances.id = supplier_balance_items.supplier_balance_id " +
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN (('A', 'B', 'C', 1), ('A', 'B', 'C', 2));";
using var reader = command.ExecuteReader();
using var dataTable = new DataTable();
dataTable.Load(reader);
var cols = dataTable.Columns.Cast<DataColumn>().ToArray();
Console.WriteLine(string.Join(Environment.NewLine, cols.Select((x, i) => $"Col{i} = {x}")));
Console.WriteLine(string.Join("\t", cols.Select((_, i) => $"Col{i}")));
foreach (var dataRow in dataTable.Rows.Cast<DataRow>())
{
Console.WriteLine(string.Join("\t", dataRow.ItemArray));
}
Output, as expected:
Col0 = id
Col1 = accounting_document
Col2 = accounting_document_type
Col3 = company_code
Col4 = document_date_year
Col5 = posting_view_item
Col0 Col1 Col2 Col3 Col4 Col5
2 A B C 1 A
3 A B C 2 A
3 A B C 2 B
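Now, what I would like to achieve is this: rather than passing (('A', 'B', 'C', 1), ('A', 'B', 'C', 2)) as a raw string, I would prefer to use an NpgsqlParameter holding a collection of value sets (i.e. one value for each column).
So I changed the C# snippet above and added a parameter: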
// ...
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN @values;";
var parameter = command.CreateParameter();
parameter.ParameterName = "@values";
parameter.NpgsqlDbType = NpgsqlDbType.Array;
parameter.NpgsqlValue = new object[,]
{
{ "A", "B", "C", 1 },
{ "A", "B", "C", 2 }
};
// Note: the same kind of issue arises when using tuples, i.e.
// ( "A", "B", "C", 1 )
// ( "A", "B", "C", 2 )
command.Parameters.Add(parameter);
using var reader = command.ExecuteReader();
// ...
Then I get this exception:
Unhandled exception. System.ArgumentOutOfRangeException: Cannot set NpgsqlDbType to just Array, Binary-Or with the element type (e.g. Array of Box is NpgsqlDbType.Array | NpgsqlDbType.Box). (Parameter 'value')
at Npgsql.NpgsqlParameter.set_NpgsqlDbType(NpgsqlDbType value)
at Program.<Main>$(String[] args) in C:\Users\natalie-perret\Desktop\Personal\playground\csharp\CSharpPlayground\Program.cs:line 25
I then tried to work around that error with:
parameter.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Unknown;
But then I get another exception:
Unhandled exception. System.ArgumentException: No array type could be found in the database for element .<unknown>
at Npgsql.TypeMapping.ConnectorTypeMapper.ResolveByNpgsqlDbType(NpgsqlDbType npgsqlDbType)
at Npgsql.NpgsqlParameter.ResolveHandler(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlParameterCollection.ValidateAndBind(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior)
at Program.<Main>$(String[] args) in C:\Users\natalie-perret\Desktop\Personal\playground\csharp\CSharpPlayground\Program.cs:line 32
It seems the type needs to be registered for some reason. Indeed, if I don't specify the type at all:
Unhandled exception. System.NotSupportedException: The CLR type System.Object isn't natively supported by Npgsql or your PostgreSQL. To use it with a PostgreSQL composite
you need to specify DataTypeName or to map it, please refer to the documentation.
at Npgsql.TypeMapping.ConnectorTypeMapper.ResolveByClrType(Type type)
at Npgsql.TypeMapping.ConnectorTypeMapper.ResolveByClrType(Type type)
at Npgsql.NpgsqlParameter.ResolveHandler(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlParameter.Bind(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlParameterCollection.ValidateAndBind(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior)
at Program.<Main>$(String[] args) in C:\Users\natalie-perret\Desktop\Personal\playground\csharp\CSharpPlayground\Program.cs:line 31
[EDIT]
The temporary solution I ended up with relies on the jsonb support, in particular the jsonb_to_recordset function (see the PostgreSQL documentation section about JSON functions):
using System.Data;
using System.Text.Json;
using Npgsql;
using NpgsqlTypes;
var connectionStringBuilder = new NpgsqlConnectionStringBuilder
{
Host = "localhost",
Port = 5432,
Username = "brand_payment_migration",
Password = "secret",
Database = "brand_payment"
};
using var connection = new NpgsqlConnection(connectionStringBuilder.ToString());
connection.Open();
using var command = connection.CreateCommand();
command.CommandText =
"SELECT id, accounting_document, accounting_document_type, company_code, document_date_year, posting_view_item " +
"FROM sch_brand_payment_data_lake_proxy.supplier_balances " +
"LEFT OUTER JOIN sch_brand_payment_data_lake_proxy.supplier_balance_items " +
"ON supplier_balances.id = supplier_balance_items.supplier_balance_id " +
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN (SELECT * FROM jsonb_to_recordset(@values) " +
"AS params (accounting_document text, accounting_document_type text, company_code text, document_date_year integer));";
var parameter = command.CreateParameter();
parameter.ParameterName = "@values";
parameter.NpgsqlDbType = NpgsqlDbType.Jsonb;
parameter.NpgsqlValue = JsonSerializer.Serialize(new []
{
new Params("A", "B", "C", 1),
new Params("A", "B", "C", 2)
});
command.Parameters.Add(parameter);
using var reader = command.ExecuteReader();
using var dataTable = new DataTable();
dataTable.Load(reader);
var cols = dataTable.Columns.Cast<DataColumn>().ToArray();
Console.WriteLine(string.Join(Environment.NewLine, cols.Select((x, i) => $"Col{i} = {x}")));
Console.WriteLine(string.Join("\t", cols.Select((_, i) => $"Col{i}")));
foreach (var dataRow in dataTable.Rows.Cast<DataRow>())
{
Console.WriteLine(string.Join("\t", dataRow.ItemArray));
}
public record Params(
string accounting_document,
string accounting_document_type,
string company_code,
int document_date_year);
Output:
Col0 = id
Col1 = accounting_document
Col2 = accounting_document_type
Col3 = company_code
Col4 = document_date_year
Col5 = posting_view_item
Col0 Col1 Col2 Col3 Col4 Col5
2 A B C 1 A
3 A B C 2 A
3 A B C 2 B
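But this comes at the cost of an additional JSON serialization step when passing the parameters. So, apart from that and from building an insanely long string, I am a bit baffled that there is no way to pass the actual values directly to the NpgsqlParameter.NpgsqlValue property.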
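To make that extra step concrete, here is a minimal sketch of what the serialized payload looks like, assuming the Params record from the snippet above and the default System.Text.Json settings. The record's member names are emitted verbatim as JSON keys, which is presumably why they are written in snake_case: jsonb_to_recordset matches keys against the column names declared in its AS params (...) clause.

```csharp
using System;
using System.Text.Json;

// Serialize two value sets exactly as in the snippet above.
var payload = JsonSerializer.Serialize(new[]
{
    new Params("A", "B", "C", 1),
    new Params("A", "B", "C", 2)
});

// Prints a single-line JSON array of objects keyed by the four column names.
Console.WriteLine(payload);

// Same shape as the Params record used above; the snake_case member names
// become the JSON keys that jsonb_to_recordset maps onto its column list.
public record Params(
    string accounting_document,
    string accounting_document_type,
    string company_code,
    int document_date_year);
```

If the record used PascalCase members instead, the keys would no longer match the column list unless a naming policy or per-property attributes were configured on the serializer.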
[EDIT 2]
Added a DbFiddle.
[EDIT 3]
The same jsonb "trick" can be used to insert data (albeit with the same issue I already mentioned above):
INSERT INTO sch_brand_payment_data_lake_proxy.supplier_balances
(accounting_document, accounting_document_type, company_code, document_date_year)
SELECT * FROM jsonb_to_recordset(
'[{"accounting_document":"E","accounting_document_type":"B","company_code":"C","document_date_year":1},
{"accounting_document":"E","accounting_document_type":"B","company_code":"C","document_date_year":2}]'::jsonb)
AS params (accounting_document text, accounting_document_type text, company_code text, document_date_year integer)
RETURNING id;
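[EDIT 4]
Another way is to use jsonb_populate_recordset, passing the relevant NULL::table-full-name as the first argument (which defines the columns) and the relevant jsonb as the second argument (similar to the first argument of jsonb_to_recordset).
There are basically 3 main ways to achieve what I want (DbFiddle updated accordingly).
Note: with PostgreSQL 15 and the json_table feature, things might get easier.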
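As an illustration of the jsonb_populate_recordset variant, here is a sketch that reuses the connection settings and the Params record from the snippets above (the alias p and the shortened connection string are my own choices, not from the original code). The NULL cast supplies the row type, so no explicit column list is needed after the function call; keys missing from the JSON simply come back as NULL columns.

```csharp
using System;
using System.Text.Json;
using Npgsql;
using NpgsqlTypes;

// Assumption: same local database and credentials as in the snippets above.
using var connection = new NpgsqlConnection(
    "Host=localhost;Port=5432;Username=brand_payment_migration;Password=secret;Database=brand_payment");
connection.Open();

using var command = connection.CreateCommand();
command.CommandText =
    "SELECT id, accounting_document, accounting_document_type, company_code, document_date_year " +
    "FROM sch_brand_payment_data_lake_proxy.supplier_balances " +
    "WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
    // The table's row type drives the output columns; only the four key columns are compared.
    "IN (SELECT p.accounting_document, p.accounting_document_type, p.company_code, p.document_date_year " +
    "FROM jsonb_populate_recordset(NULL::sch_brand_payment_data_lake_proxy.supplier_balances, @values) AS p);";

var parameter = command.CreateParameter();
parameter.ParameterName = "@values";
parameter.NpgsqlDbType = NpgsqlDbType.Jsonb;
parameter.NpgsqlValue = JsonSerializer.Serialize(new[]
{
    new Params("A", "B", "C", 1),
    new Params("A", "B", "C", 2)
});
command.Parameters.Add(parameter);

using var reader = command.ExecuteReader();
while (reader.Read())
{
    // id is a bigint, document_date_year an int4; the rest are text.
    Console.WriteLine(string.Join("\t",
        reader.GetInt64(0), reader.GetString(1), reader.GetString(2),
        reader.GetString(3), reader.GetInt32(4)));
}

public record Params(
    string accounting_document,
    string accounting_document_type,
    string company_code,
    int document_date_year);
```

The trade-off against jsonb_to_recordset is that the column definitions live in the table rather than in the query text, at the price of still paying for the JSON serialization step.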
[EDIT 3]
This article sums things up nicely:
https://dev.to/forbeslindesay/postgres-unnest-cheat-sheet-for-bulk-operations-1obg
[EDIT 2]
Following up on the issue I filed earlier today:
https://github.com/npgsql/npgsql/issues/4437#issuecomment-1113999994
I settled on the solution/workaround mentioned by @dhedey in another, somewhat related, issue:
If it helps anyone else, I have found quite a neat workaround for these types of queries using the UNNEST command, which can take multiple array parameters and zip them together into columns, which can be joined with the table to filter to the relevant columns.
The use of the join is also more performant than the ANY/IN pattern in some cases.
SELECT * FROM table WHERE (itemauthor, itemtitle) = ANY (('bob', 'hello'), ('frank', 'hi')...)
Can be represented with:
var authorsParameter = new NpgsqlParameter("@authors", NpgsqlDbType.Array | NpgsqlDbType.Varchar)
{ Value = authors.ToList() };
var titlesParameter = new NpgsqlParameter("@titles", NpgsqlDbType.Array | NpgsqlDbType.Varchar)
{ Value = titles.ToList() };
var results = dbContext.Set<MyRow>()
.FromSqlInterpolated($@"
SELECT
t.*
FROM UNNEST({authorsParameter}, {titlesParameter}) params (author, title)
INNER JOIN table t
ON t.author = params.author
AND t.title = params.title
");
NB - the Varchar can be replaced by other types for parameters which are arrays of other types (eg Bigint) - check out the NpgsqlDbType enum for more details.
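I then rewrote some of the code I originally posted, and the unnest PostgreSQL function solution works like a charm. This is the answer I am accepting for now: it looks neater than Json / JsonB, which requires further PostgreSQL-JSON-specific mapping shenanigans or extraction.
That said, I am not yet sure about the performance implications:
- unnest requires you to map the values differently (one collection per column)
- jsonb_to_recordset requires an additional .NET JSON serialization step and, in some cases, explicitly mapping the output of jsonb_to_recordset to the relevant columns.
Neither is free. But I like that unnest makes explicit, for each column (i.e. each value set / collection taken from a bigger .NET type such as a tuple, record, class, struct, etc.) passed to the NpgsqlParameter.NpgsqlValue property, which database type will be used, via the NpgsqlDbType enum: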
using System.Data;
using Npgsql;
using NpgsqlTypes;
var connectionStringBuilder = new NpgsqlConnectionStringBuilder
{
Host = "localhost",
Port = 5432,
Username = "brand_payment_migration",
Password = "secret",
Database = "brand_payment"
};
using var connection = new NpgsqlConnection(connectionStringBuilder.ToString());
connection.Open();
var selectStatement =
"SELECT * FROM sch_brand_payment_data_lake_proxy.supplier_balances " +
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN (SELECT * FROM unnest(" +
"@accounting_document_texts, " +
"@accounting_document_types, " +
"@company_codes, " +
"@document_date_years" +
"))";
var insertStatement =
"INSERT INTO sch_brand_payment_data_lake_proxy.supplier_balances " +
"(accounting_document, accounting_document_type, company_code, document_date_year) " +
"SELECT * FROM unnest(" +
"@accounting_document_texts, " +
"@accounting_document_types, " +
"@company_codes, " +
"@document_date_years" +
") RETURNING id;";
var parameters = new (string Name, NpgsqlDbType DbType, object Value)[]
{
("@accounting_document_texts", NpgsqlDbType.Array | NpgsqlDbType.Text, new[] {"G", "G", "G"}),
("@accounting_document_types", NpgsqlDbType.Array | NpgsqlDbType.Text, new[] {"Y", "Y", "Y"}),
("@company_codes", NpgsqlDbType.Array | NpgsqlDbType.Text, new[] {"Z", "Z", "Z"}),
("@document_date_years", NpgsqlDbType.Array | NpgsqlDbType.Integer, new[] {1, 2, 3})
};
connection.ExecuteNewCommandAndWriteResultToConsole(insertStatement, parameters);
connection.ExecuteNewCommandAndWriteResultToConsole(selectStatement, parameters);
public static class Extensions
{
public static void AddParameter(this NpgsqlCommand command, string name, NpgsqlDbType dbType, object value)
{
var parameter = command.CreateParameter();
parameter.ParameterName = name;
parameter.NpgsqlDbType = dbType;
parameter.NpgsqlValue = value;
command.Parameters.Add(parameter);
}
public static NpgsqlCommand CreateCommand(this NpgsqlConnection connection,
string text,
IEnumerable<(string Name, NpgsqlDbType DbType, object Value)> parameters)
{
var command = connection.CreateCommand();
command.CommandText = text;
foreach (var (name, dbType, value) in parameters)
{
command.AddParameter(name, dbType, value);
}
return command;
}
public static void ExecuteAndWriteResultToConsole(this NpgsqlCommand command)
{
Console.WriteLine($"Executing command... {command.CommandText}");
using var reader = command.ExecuteReader();
using var dataTable = new DataTable();
dataTable.Load(reader);
var cols = dataTable.Columns.Cast<DataColumn>().ToArray();
Console.WriteLine(string.Join(Environment.NewLine, cols.Select((x, i) => $"Col{i} = {x}")));
Console.WriteLine(string.Join("\t", cols.Select((_, i) => $"Col{i}")));
foreach (var dataRow in dataTable.Rows.Cast<DataRow>())
{
Console.WriteLine(string.Join("\t", dataRow.ItemArray));
}
}
public static void ExecuteNewCommandAndWriteResultToConsole(this NpgsqlConnection connection,
string text,
IEnumerable<(string Name, NpgsqlDbType DbType, object Value)> parameters)
{
using var command = connection.CreateCommand(text, parameters);
command.ExecuteAndWriteResultToConsole();
}
}
Output:
Executing command... INSERT INTO sch_brand_payment_data_lake_proxy.supplier_balances (accounting_document, accounting_document_type, company_code, document_date_year) SELECT * FROM unnest(@accounting_document_texts, @accounting_document_types, @company_codes, @document_date_years) RETURNING id;
Col0 = id
Col0
28
29
30
Executing command... SELECT * FROM sch_brand_payment_data_lake_proxy.supplier_balances WHERE (accounting_document, accounting_document_type, company_code, document_date_year) IN (SELECT * FROM unnest(@accounting_document_texts, @accounting_document_types, @company_codes, @document_date_years))
Col0 = id
Col1 = accounting_document
Col2 = accounting_document_type
Col3 = company_code
Col4 = document_date_year
Col5 = accounting_doc_created_by_user
Col6 = accounting_clerk
Col7 = assignment_reference
Col8 = document_reference_id
Col9 = original_reference_document
Col10 = payment_terms
Col11 = supplier
Col12 = supplier_name
Col13 = document_date
Col14 = posting_date
Col15 = net_due_date
Col16 = created_on
Col17 = modified_on
Col18 = pushed_on
Col19 = is_modified
Col20 = is_pushed
Col0 Col1 Col2 Col3 Col4 Col5 Col6 Col7 Col8 Col9 Col10 Col11 Col12 Col13 Col14 Col15 Col16 Col17 Col18 Col19 Col20
28 G Y Z 1 False False
29 G Y Z 2 False False
30 G Y Z 3 False False
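The per-column arrays in the snippet above are hard-coded. In practice the values usually start out as a collection of rows, so they have to be transposed into one array per column before being handed to unnest. Here is a minimal sketch of that mapping step, assuming a hypothetical BalanceKey record and BalanceKeyColumns holder (neither is part of Npgsql or of the code above):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var keys = new List<BalanceKey>
{
    new BalanceKey("G", "Y", "Z", 1),
    new BalanceKey("G", "Y", "Z", 2),
    new BalanceKey("G", "Y", "Z", 3)
};

var columns = BalanceKeyColumns.From(keys);

// Element i of every array belongs to row i, which is what unnest relies on.
Console.WriteLine(string.Join(", ", columns.AccountingDocuments)); // prints "G, G, G"
Console.WriteLine(string.Join(", ", columns.DocumentDateYears));   // prints "1, 2, 3"

// Hypothetical row type, for illustration only.
public record BalanceKey(
    string AccountingDocument,
    string AccountingDocumentType,
    string CompanyCode,
    int DocumentDateYear);

// Hypothetical holder for the four arrays that unnest(...) zips back into rows.
public record BalanceKeyColumns(
    string[] AccountingDocuments,
    string[] AccountingDocumentTypes,
    string[] CompanyCodes,
    int[] DocumentDateYears)
{
    // Transpose a collection of rows into one array per column.
    public static BalanceKeyColumns From(IReadOnlyCollection<BalanceKey> keys) => new(
        keys.Select(k => k.AccountingDocument).ToArray(),
        keys.Select(k => k.AccountingDocumentType).ToArray(),
        keys.Select(k => k.CompanyCode).ToArray(),
        keys.Select(k => k.DocumentDateYear).ToArray());
}
```

The resulting arrays can then be used as the values of the four parameters, with NpgsqlDbType.Array | NpgsqlDbType.Text for the text columns and NpgsqlDbType.Array | NpgsqlDbType.Integer for the year, as in the parameters array above. Since unnest pairs elements by position, the arrays should all have the same length; when they do not, PostgreSQL pads the shorter ones with NULLs rather than raising an error.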
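[EDIT 1]
Since @Charlieface pointed out that this is not an appropriate answer, I figured it would be better to get an answer/information from the npgsql maintainers/contributors.
Hence I filed an issue on their GitHub repository: https://github.com/npgsql/npgsql/issues/4437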
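Original answer:
As of today, there is no way to pass, among other things, tuples or collections as a composite "type" or via a positional/implicit "definition" (which could then be used in the collection passed to the parameter's value property). npgsql requires a prior PostgreSQL type definition, and even then tuples and nested collections cannot work, because the maintainers, or at least one of them, deem this not safe enough.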
https://github.com/npgsql/npgsql/issues/2154
As the exception says the corresponding composite is required in the database. This is because anonymous types are not mapped to records.
So, you should create a type and a struct which must be mapped to the type.
FYI, there is a similar issue #2097 to track mapping composites to value tuples.
But this would require some other related developments in npgsql, e.g. #2097, which has been dropped because the author / main contributor deemed it too brittle in https://github.com/dotnet/efcore/issues/14661#issuecomment-462440199
Note that after discussion in npgsql/npgsql#2097 we decided to drop this idea. C# value tuples don't have names, so any mapping to PostgreSQL composites would rely on field definition ordering, which seems quite dangerous/brittle.
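I have finally decided to settle for the jsonb alternative. I am not a huge fan, but at least it allows passing collections in a relatively safe way (as long as the serialization used to pass the jsonb is under control).
But the approach I initially envisioned is not possible as of today.
Also, a few things I learnt while writing this post:
- There is a pretty good Slack server dedicated to PostgreSQL: postgresteam.slack.com
- A pretty good (albeit opinionated) guide on how to properly format SQL when seeking PostgreSQL-related help: https://www.depesz.com/2010/05/28/what-mistakes-you-can-avoid-when-looking-for-help-on-irc/
- A paste bin that auto-formats SQL to the author's liking: https://paste.depesz.com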
我在下面定义了两个表,supplier_balances
和 supplier_balance_items
(顺便说一句,两者之间存在 1[supplier_balance]:N[supplier_balance_items]
关系):
CREATE TABLE IF NOT EXISTS sch_brand_payment_data_lake_proxy.supplier_balances (
/* id is here for joining purposes with items table, instead of joining with the 4 columns used for sake
of making sure a record is deemed as unique */
id bigserial NOT NULL,
accounting_document text NOT NULL,
accounting_document_type text NOT NULL,
company_code text NOT NULL,
document_date_year int4 NOT NULL,
accounting_doc_created_by_user text,
accounting_clerk text,
assignment_reference text,
document_reference_id text,
original_reference_document text,
payment_terms text,
supplier text,
supplier_name text,
document_date timestamp,
posting_date timestamp,
net_due_date timestamp,
created_on timestamp default NULL,
modified_on timestamp default NULL,
pushed_on timestamp default NULL,
is_modified bool GENERATED ALWAYS AS (modified_on IS NOT NULL AND modified_on > created_on) STORED,
is_pushed bool GENERATED ALWAYS AS (pushed_on IS NOT NULL AND pushed_on > modified_on) STORED,
CONSTRAINT supplier_balances_pkey PRIMARY KEY (id),
/* accounting_document being the field of the composite unique index -> faster querying */
CONSTRAINT supplier_balances_unique UNIQUE (
accounting_document,
accounting_document_type,
company_code,
document_date_year)
);
/* Creating other indexes for querying of those as well */
CREATE INDEX IF NOT EXISTS supplier_balances_accounting_document_type_idx
ON sch_brand_payment_data_lake_proxy.supplier_balances (accounting_document_type);
CREATE INDEX IF NOT EXISTS supplier_balances_company_code_idx
ON sch_brand_payment_data_lake_proxy.supplier_balances (company_code);
CREATE INDEX IF NOT EXISTS supplier_balances_document_date_year_idx
ON sch_brand_payment_data_lake_proxy.supplier_balances (document_date_year);
CREATE TABLE IF NOT EXISTS sch_brand_payment_data_lake_proxy.supplier_balance_items
(
supplier_balance_id bigserial NOT NULL,
posting_view_item text NOT NULL,
posting_key text,
amount_in_company_code_currency numeric,
amount_in_transaction_currency numeric,
cash_discount_1_percent numeric,
cash_discount_amount numeric,
clearing_accounting_document text,
document_item_text text,
gl_account text,
is_cleared bool,
clearing_date timestamp,
due_calculation_base_date timestamp,
/* uniqueness is basically the posting_view_item for a given supplier balance */
CONSTRAINT supplier_balance_items_pkey PRIMARY KEY (supplier_balance_id, posting_view_item),
/* 1(supplier balance):N(supplier balance items) */
CONSTRAINT supplier_balance_items_fkey FOREIGN KEY (supplier_balance_id)
REFERENCES sch_brand_payment_data_lake_proxy.supplier_balances (id)
ON DELETE CASCADE
ON UPDATE CASCADE
);
注意:为了简单起见,我只是填写了不能 NULL
的列。
INSERT INTO
sch_brand_payment_data_lake_proxy.supplier_balances
(accounting_document, accounting_document_type, company_code, document_date_year)
VALUES
('A', 'B', 'C', 0),
('A', 'B', 'C', 1),
('A', 'B', 'C', 2),
('A', 'B', 'C', 3),
('A', 'B', 'C', 4),
('A', 'B', 'C', 5)
RETURNING id;
输出:
id |
---|
1 |
2 |
3 |
4 |
5 |
6 |
INSERT INTO
sch_brand_payment_data_lake_proxy.supplier_balance_items
(supplier_balance_id, posting_view_item)
VALUES
(1, 'A'),
(1, 'B'),
(3, 'A'),
(3, 'B'),
(2, 'A'),
(1, 'C');
SELECT
accounting_document,
accounting_document_type,
company_code,
document_date_year
FROM sch_brand_payment_data_lake_proxy.supplier_balances;
输出:
id | accounting_document | accounting_document_type | company_code | document_date_year |
---|---|---|---|---|
1 | A | B | C | 0 |
2 | A | B | C | 1 |
3 | A | B | C | 2 |
4 | A | B | C | 3 |
5 | A | B | C | 4 |
6 | A | B | C | 5 |
SELECT
supplier_balance_id,
posting_view_item
FROM sch_brand_payment_data_lake_proxy.supplier_balance_items;
输出:
supplier_balance_id | posting_view_item |
---|---|
1 | A |
1 | B |
3 | A |
3 | B |
2 | A |
1 | C |
现在,如果我们想 select JOIN 中的多个值,我们可以在原始 SQL:
中执行SELECT
id,
accounting_document,
accounting_document_type,
company_code,
document_date_year,
posting_view_item
FROM sch_brand_payment_data_lake_proxy.supplier_balances
LEFT OUTER JOIN sch_brand_payment_data_lake_proxy.supplier_balance_items
ON supplier_balances.id = supplier_balance_items.supplier_balance_id
WHERE (accounting_document, accounting_document_type, company_code, document_date_year)
IN (('A', 'B', 'C', 1), ('A', 'B', 'C', 2))
输出:
id | accounting_document | accounting_document_type | company_code | document_date_year | posting_view_item |
---|---|---|---|---|---|
2 | A | B | C | 1 | A |
3 | A | B | C | 2 | A |
https://github.com/npgsql/npgsql/issues/1199
现在,在 C# 中使用 npgsql 时,重现上面的查询很容易:
using System.Data;
using Npgsql;
var connectionStringBuilder = new NpgsqlConnectionStringBuilder
{
Host = "localhost",
Port = 5432,
Username = "brand_payment_migration",
Password = "secret",
Database = "brand_payment"
};
using var connection = new NpgsqlConnection(connectionStringBuilder.ToString());
connection.Open();
using var command = connection.CreateCommand();
command.CommandText =
"SELECT id, accounting_document, accounting_document_type, company_code, document_date_year, posting_view_item " +
"FROM sch_brand_payment_data_lake_proxy.supplier_balances " +
"LEFT OUTER JOIN sch_brand_payment_data_lake_proxy.supplier_balance_items " +
"ON supplier_balances.id = supplier_balance_items.supplier_balance_id " +
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN (('A', 'B', 'C', 1), ('A', 'B', 'C', 2));";
using var reader = command.ExecuteReader();
using var dataTable = new DataTable();
dataTable.Load(reader);
var cols = dataTable.Columns.Cast<DataColumn>().ToArray();
Console.WriteLine(string.Join(Environment.NewLine, cols.Select((x, i) => $"Col{i} = {x}")));
Console.WriteLine(string.Join("\t", cols.Select((_, i) => $"Col{i}")));
foreach (var dataRow in dataTable.Rows.Cast<DataRow>())
{
Console.WriteLine(string.Join("\t", dataRow.ItemArray));
}
如预期输出:
Col0 = id
Col1 = accounting_document
Col2 = accounting_document_type
Col3 = company_code
Col4 = document_date_year
Col5 = posting_view_item
Col0 Col1 Col2 Col3 Col4 Col5
2 A B C 1 A
3 A B C 2 A
3 A B C 2 B
现在,我想要实现的是,与其为 (('A', 'B', 'C', 1), ('A', 'B', 'C', 2));
传递原始字符串,我更愿意使用带有值集集合的 NpgSqlParameter
(即对于每个列)).
所以我更改了上面的 C# 代码段并添加了参数
// ...
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN @values;";
var parameter = command.CreateParameter();
parameter.ParameterName = "@values";
parameter.NpgsqlDbType = NpgsqlDbType.Array;
parameter.NpgsqlValue = new object[,]
{
{ "A", "B", "C", 1 },
{ "A", "B", "C", 2 }
};
// Note: the same kind of issue arises when using tuples, i.e.
// ( "A", "B", "C", 1 )
// ( "A", "B", "C", 2 )
command.Parameters.Add(parameter);
using var reader = command.ExecuteReader();
// ...
然后我得到这个异常:
Unhandled exception. System.ArgumentOutOfRangeException: Cannot set NpgsqlDbType to just Array, Binary-Or with the element type (e.g. Array of Box is NpgsqlDbType.Array | Npg
sqlDbType.Box). (Parameter 'value')
at Npgsql.NpgsqlParameter.set_NpgsqlDbType(NpgsqlDbType value)
at Program.<Main>$(String[] args) in C:\Users\natalie-perret\Desktop\Personal\playground\csharp\CSharpPlayground\Program.cs:line 25
然后我尝试使用以下方法解决该错误:
parameter.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Unknown;
但随后出现另一个异常:
Unhandled exception. System.ArgumentException: No array type could be found in the database for element .<unknown>
at Npgsql.TypeMapping.ConnectorTypeMapper.ResolveByNpgsqlDbType(NpgsqlDbType npgsqlDbType)
at Npgsql.NpgsqlParameter.ResolveHandler(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlParameterCollection.ValidateAndBind(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior)
at Program.<Main>$(String[] args) in C:\Users\natalie-perret\Desktop\Personal\playground\csharp\CSharpPlayground\Program.cs:line 32
似乎出于某种原因需要注册类型,实际上如果我不指定类型:
Unhandled exception. System.NotSupportedException: The CLR type System.Object isn't natively supported by Npgsql or your PostgreSQL. To use it with a PostgreSQL composite
you need to specify DataTypeName or to map it, please refer to the documentation.
at Npgsql.TypeMapping.ConnectorTypeMapper.ResolveByClrType(Type type)
at Npgsql.TypeMapping.ConnectorTypeMapper.ResolveByClrType(Type type)
at Npgsql.NpgsqlParameter.ResolveHandler(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlParameter.Bind(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlParameterCollection.ValidateAndBind(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior)
at Program.<Main>$(String[] args) in C:\Users\natalie-perret\Desktop\Personal\playground\csharp\CSharpPlayground\Program.cs:line 31
[编辑]
我最终得到的临时解决方案是依靠 jsonb 支持,尤其是 jsonb_to_recordset
函数(参见 PostgreSQL documentation section about json functions):
using System.Data;
using System.Text.Json;
using Npgsql;
using NpgsqlTypes;
var connectionStringBuilder = new NpgsqlConnectionStringBuilder
{
Host = "localhost",
Port = 5432,
Username = "brand_payment_migration",
Password = "secret",
Database = "brand_payment"
};
using var connection = new NpgsqlConnection(connectionStringBuilder.ToString());
connection.Open();
using var command = connection.CreateCommand();
command.CommandText =
"SELECT id, accounting_document, accounting_document_type, company_code, document_date_year, posting_view_item " +
"FROM sch_brand_payment_data_lake_proxy.supplier_balances " +
"LEFT OUTER JOIN sch_brand_payment_data_lake_proxy.supplier_balance_items " +
"ON supplier_balances.id = supplier_balance_items.supplier_balance_id " +
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN (SELECT * FROM jsonb_to_recordset(@values) " +
"AS params (accounting_document text, accounting_document_type text, company_code text, document_date_year integer));";
var parameter = command.CreateParameter();
parameter.ParameterName = "@values";
parameter.NpgsqlDbType = NpgsqlDbType.Jsonb;
parameter.NpgsqlValue = JsonSerializer.Serialize(new []
{
new Params("A", "B", "C", 1),
new Params("A", "B", "C", 2)
});
command.Parameters.Add(parameter);
using var reader = command.ExecuteReader();
using var dataTable = new DataTable();
dataTable.Load(reader);
var cols = dataTable.Columns.Cast<DataColumn>().ToArray();
Console.WriteLine(string.Join(Environment.NewLine, cols.Select((x, i) => $"Col{i} = {x}")));
Console.WriteLine(string.Join("\t", cols.Select((_, i) => $"Col{i}")));
foreach (var dataRow in dataTable.Rows.Cast<DataRow>())
{
Console.WriteLine(string.Join("\t", dataRow.ItemArray));
}
public Params(
string accounting_document,
string accounting_document_type,
string company_code,
int document_date_year);
输出:
Col0 = id
Col1 = accounting_document
Col2 = accounting_document_type
Col3 = company_code
Col4 = document_date_year
Col5 = posting_view_item
Col0 Col1 Col2 Col3 Col4 Col5
2 A B C 1 A
3 A B C 2 A
3 A B C 2 B
但是这是以在传递参数时添加额外的json序列化步骤为代价的。因此,除此之外并构建了一个非常长的字符串,我有点困惑的是没有办法直接将实际值传递给 NpgsqlParameter.NpgsqlValue
属性.
[编辑 2]
添加一个DbFiddle
[编辑 3]
相同的 jsonb“技巧”可用于提供数据(尽管,我已经在上面提到过同样的问题):
INSERT INTO sch_brand_payment_data_lake_proxy.supplier_balances
(accounting_document, accounting_document_type, company_code, document_date_year)
SELECT * FROM jsonb_to_recordset(
'[{"accounting_document":"E","accounting_document_type":"B","company_code":"C","document_date_year":1},
{"accounting_document":"E","accounting_document_type":"B","company_code":"C","document_date_year":2}]'::jsonb)
AS params (accounting_document text, accounting_document_type text, company_code text, document_date_year integer)
RETURNING id;
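[EDIT 4]
Another way is to use jsonb_populate_recordset, passing the relevant NULL::table-full-name as the first argument (which defines the columns) and the relevant jsonb as the second argument (akin to the first argument of jsonb_to_recordset).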
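As a rough sketch of that variant, the command text could be built as below. It only assembles and prints the SQL; the @values parameter would be added exactly as in the jsonb_to_recordset code earlier (NpgsqlDbType.Jsonb plus the serialized string). Listing the four key columns explicitly in the subquery is my own choice here, since jsonb_populate_recordset returns every column of the table's row type rather than just the ones present in the JSON.

```csharp
using System;

// Fully qualified table name from the question; its row type defines the columns.
const string table = "sch_brand_payment_data_lake_proxy.supplier_balances";

// NULL::<table> only supplies the row type, so no column definition list is needed.
// The subquery picks the four key columns to line up with the row constructor on the left of IN.
var selectStatement =
    $"SELECT * FROM {table} " +
    "WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
    "IN (SELECT accounting_document, accounting_document_type, company_code, document_date_year " +
    $"FROM jsonb_populate_recordset(NULL::{table}, @values));";

Console.WriteLine(selectStatement);
```

Compared with jsonb_to_recordset, this saves repeating the column types in the query, at the price of tying the query to the table definition.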
That makes basically 3 main ways to achieve what I want (DbFiddle updated accordingly):
Note: using PostgreSQL 15 and the json_table feature.
[EDIT 3] This article sums things up nicely: https://dev.to/forbeslindesay/postgres-unnest-cheat-sheet-for-bulk-operations-1obg
[EDIT 2]
Following up on the issue I filed earlier today: https://github.com/npgsql/npgsql/issues/4437#issuecomment-1113999994
I settled on the solution/workaround mentioned by @dhedey in another, somewhat related issue:
If it helps anyone else, I have found quite a neat workaround for these types of queries using the UNNEST command, which can take multiple array parameters and zip them together into columns, which can be joined with the table to filter to the relevant columns. The use of the join is also more performant than the ANY/IN pattern in some cases.
SELECT * FROM table WHERE (itemauthor, itemtitle) = ANY (('bob', 'hello'), ('frank', 'hi')...)
Can be represented with:
var authorsParameter = new NpgsqlParameter("@authors", NpgsqlDbType.Array | NpgsqlDbType.Varchar) { Value = authors.ToList() };
var titlesParameter = new NpgsqlParameter("@titles", NpgsqlDbType.Array | NpgsqlDbType.Varchar) { Value = titles.ToList() };
var results = dbContext.Set<MyRow>()
    .FromSqlInterpolated($@"
        SELECT t.*
        FROM UNNEST({authorsParameter}, {titlesParameter}) params (author, title)
        INNER JOIN table t ON t.author = params.author AND t.title = params.title
    ");
NB - the Varchar can be replaced by other types for parameters which are arrays of other types (eg Bigint) - check out the NpgsqlDbType enum for more details.
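I then rewrote some of the code I originally posted, and the unnest PostgreSQL function solution works like a charm. This is the answer I am accepting for the time being; it looks neater than Json / JsonB, which requires further PostgreSQL-JSON-specific mapping shenanigans or extraction.
I am not too sure about the performance implications yet, though:
- unnest involves mapping the values into separate per-column collections.
- jsonb_to_recordset requires an additional .NET JSON serialization step and, in some circumstances, explicitly mapping the output of jsonb_to_recordset to the relevant columns.
Neither is free. But I like that unnest is explicit: for each column, the set/collection of values (i.e. of each value of a bigger .NET type such as a tuple, record, class, struct, etc.) that gets passed to the NpgsqlParameter.NpgsqlValue property is typed via the NpgsqlDbType enum.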
using System.Data;
using Npgsql;
using NpgsqlTypes;
var connectionStringBuilder = new NpgsqlConnectionStringBuilder
{
Host = "localhost",
Port = 5432,
Username = "brand_payment_migration",
Password = "secret",
Database = "brand_payment"
};
using var connection = new NpgsqlConnection(connectionStringBuilder.ToString());
connection.Open();
var selectStatement =
"SELECT * FROM sch_brand_payment_data_lake_proxy.supplier_balances " +
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN (SELECT * FROM unnest(" +
"@accounting_document_texts, " +
"@accounting_document_types, " +
"@company_codes, " +
"@document_date_years" +
"))";
var insertStatement =
"INSERT INTO sch_brand_payment_data_lake_proxy.supplier_balances " +
"(accounting_document, accounting_document_type, company_code, document_date_year) " +
"SELECT * FROM unnest(" +
"@accounting_document_texts, " +
"@accounting_document_types, " +
"@company_codes, " +
"@document_date_years" +
") RETURNING id;";
var parameters = new (string Name, NpgsqlDbType DbType, object Value)[]
{
("@accounting_document_texts", NpgsqlDbType.Array | NpgsqlDbType.Text, new[] {"G", "G", "G"}),
("@accounting_document_types", NpgsqlDbType.Array | NpgsqlDbType.Text, new[] {"Y", "Y", "Y"}),
("@company_codes", NpgsqlDbType.Array | NpgsqlDbType.Text, new[] {"Z", "Z", "Z"}),
("@document_date_years", NpgsqlDbType.Array | NpgsqlDbType.Integer, new[] {1, 2, 3})
};
connection.ExecuteNewCommandAndWriteResultToConsole(insertStatement, parameters);
connection.ExecuteNewCommandAndWriteResultToConsole(selectStatement, parameters);
public static class Extensions
{
public static void AddParameter(this NpgsqlCommand command, string name, NpgsqlDbType dbType, object value)
{
var parameter = command.CreateParameter();
parameter.ParameterName = name;
parameter.NpgsqlDbType = dbType;
parameter.NpgsqlValue = value;
command.Parameters.Add(parameter);
}
public static NpgsqlCommand CreateCommand(this NpgsqlConnection connection,
string text,
IEnumerable<(string Name, NpgsqlDbType DbType, object Value)> parameters)
{
var command = connection.CreateCommand();
command.CommandText = text;
foreach (var (name, dbType, value) in parameters)
{
command.AddParameter(name, dbType, value);
}
return command;
}
public static void ExecuteAndWriteResultToConsole(this NpgsqlCommand command)
{
Console.WriteLine($"Executing command... {command.CommandText}");
using var reader = command.ExecuteReader();
using var dataTable = new DataTable();
dataTable.Load(reader);
var cols = dataTable.Columns.Cast<DataColumn>().ToArray();
Console.WriteLine(string.Join(Environment.NewLine, cols.Select((x, i) => $"Col{i} = {x}")));
Console.WriteLine(string.Join("\t", cols.Select((_, i) => $"Col{i}")));
foreach (var dataRow in dataTable.Rows.Cast<DataRow>())
{
Console.WriteLine(string.Join("\t", dataRow.ItemArray));
}
}
public static void ExecuteNewCommandAndWriteResultToConsole(this NpgsqlConnection connection,
string text,
IEnumerable<(string Name, NpgsqlDbType DbType, object Value)> parameters)
{
using var command = connection.CreateCommand(text, parameters);
command.ExecuteAndWriteResultToConsole();
}
}
Output:
Executing command... INSERT INTO sch_brand_payment_data_lake_proxy.supplier_balances (accounting_document, accounting_document_type, company_code, document_date_year) SELECT * FROM unnest(@accounting_document_texts, @accounting_document_types, @company_codes, @document_date_years) RETURNING id;
Col0 = id
Col0
28
29
30
Executing command... SELECT * FROM sch_brand_payment_data_lake_proxy.supplier_balances WHERE (accounting_document, accounting_document_type, company_code, document_date_year) IN (SELECT * FROM unnest(@accounting_document_texts, @accounting_document_types, @company_codes, @document_date_years))
Col0 = id
Col1 = accounting_document
Col2 = accounting_document_type
Col3 = company_code
Col4 = document_date_year
Col5 = accounting_doc_created_by_user
Col6 = accounting_clerk
Col7 = assignment_reference
Col8 = document_reference_id
Col9 = original_reference_document
Col10 = payment_terms
Col11 = supplier
Col12 = supplier_name
Col13 = document_date
Col14 = posting_date
Col15 = net_due_date
Col16 = created_on
Col17 = modified_on
Col18 = pushed_on
Col19 = is_modified
Col20 = is_pushed
Col0 Col1 Col2 Col3 Col4 Col5 Col6 Col7 Col8 Col9 Col10 Col11 Col12 Col13 Col14 Col15 Col16 Col17 Col18 Col19 Col20
28 G Y Z 1 False False
29 G Y Z 2 False False
30 G Y Z 3 False False
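The parameters in the program above are hand-written per-column arrays. When the keys start out as a collection of rows on the .NET side, they have to be transposed first. Here is a minimal sketch of that step; the rows collection and its tuple field names are hypothetical, and plain LINQ is used rather than anything Npgsql-specific.

```csharp
using System;
using System.Linq;

// Hypothetical input: one tuple per row to look up or insert.
var rows = new (string AccountingDocument, string AccountingDocumentType, string CompanyCode, int DocumentDateYear)[]
{
    ("G", "Y", "Z", 1),
    ("G", "Y", "Z", 2),
    ("G", "Y", "Z", 3)
};

// One array per column, all in the same row order.
// unnest(a, b, c, d) zips them back into rows on the server side.
string[] accountingDocuments = rows.Select(r => r.AccountingDocument).ToArray();
string[] accountingDocumentTypes = rows.Select(r => r.AccountingDocumentType).ToArray();
string[] companyCodes = rows.Select(r => r.CompanyCode).ToArray();
int[] documentDateYears = rows.Select(r => r.DocumentDateYear).ToArray();

// These arrays are what would be passed as the Value of each
// NpgsqlDbType.Array | ... parameter in the program above.
Console.WriteLine(string.Join(",", accountingDocuments));
Console.WriteLine(string.Join(",", documentDateYears));
```

Deriving all arrays from the same source collection keeps them the same length, which matters because the multi-argument form of unnest pads shorter arrays with NULLs instead of raising an error.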
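[EDIT 1]
Since @Charlieface pointed out that this was not a proper answer, I figured it would be better to get the answer/information from the npgsql maintainers/contributors.
Hence I filed an issue on their GitHub repository: https://github.com/npgsql/npgsql/issues/4437
Original answer:
As of today there is no way to pass, among other things, a tuple or a collection as a composite "type" or via a positional-slash-implicit "definition" (which could then be used in the collection passed to the parameter value property). npgsql requires a prior PostgreSQL type definition, and even then tuples and nested collections still do not work, since the maintainers, or at least one of them, deemed them not safe enough. https://github.com/npgsql/npgsql/issues/2154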
As the exception says the corresponding composite is required in the database. This is because anonymous types are not mapped to records.
So, you should create a type and a struct which must be mapped to the type.
FYI, there is a similar issue #2097 to track mapping composites to value tuples.
But that would require some other related development in npgsql, such as #2097, which has been dropped because the author / main contributor deemed it too brittle in https://github.com/dotnet/efcore/issues/14661#issuecomment-462440199
Note that after discussion in npgsql/npgsql#2097 we decided to drop this idea. C# value tuples don't have names, so any mapping to PostgreSQL composites would rely on field definition ordering, which seems quite dangerous/brittle.
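I have finally decided to settle for the jsonb alternative. I am not a huge fan, but at least it allows passing collections in a relatively safe way (as long as the serialization that produces the jsonb is under control).
But the approach I originally envisioned is not achievable as of today.
A few more things I learnt in the process of writing this post:
- There is a pretty good Slack server dedicated to PostgreSQL: postgresteam.slack.com
- A very good guide on how to properly format SQL when seeking PostgreSQL-related help (albeit an opinionated one): https://www.depesz.com/2010/05/28/what-mistakes-you-can-avoid-when-looking-for-help-on-irc/
- A pastebin that automatically formats SQL the way that author likes it: https://paste.depesz.com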