npgsql: How to select multiple rows (with multiple column values) with npgsql in one command with a collection as a parameter?

I have defined the two tables below, supplier_balances and supplier_balance_items (by the way, there is a 1[supplier_balance]:N[supplier_balance_items] relationship between the two):

CREATE TABLE IF NOT EXISTS sch_brand_payment_data_lake_proxy.supplier_balances (
/* id is here for joining purposes with items table, instead of joining with the 4 columns used for sake
   of making sure a record is deemed as unique */
  id                             bigserial NOT NULL,
  accounting_document            text      NOT NULL,
  accounting_document_type       text      NOT NULL,
  company_code                   text      NOT NULL,
  document_date_year             int4      NOT NULL,
  accounting_doc_created_by_user text,
  accounting_clerk               text,
  assignment_reference           text,
  document_reference_id          text,
  original_reference_document    text,
  payment_terms                  text,
  supplier                       text,
  supplier_name                  text,
  document_date                  timestamp,
  posting_date                   timestamp,
  net_due_date                   timestamp,
  created_on                     timestamp default NULL,
  modified_on                    timestamp default NULL,
  pushed_on                      timestamp default NULL,
  is_modified bool GENERATED ALWAYS AS (modified_on IS NOT NULL AND modified_on > created_on) STORED,
  is_pushed   bool GENERATED ALWAYS AS (pushed_on   IS NOT NULL AND pushed_on > modified_on)  STORED,
  CONSTRAINT supplier_balances_pkey   PRIMARY KEY (id),
  /* accounting_document being the field of the composite unique index -> faster querying */
  CONSTRAINT supplier_balances_unique UNIQUE (
     accounting_document,
     accounting_document_type,
     company_code,
     document_date_year)
);
/* Creating other indexes for querying of those as well */
CREATE INDEX IF NOT EXISTS supplier_balances_accounting_document_type_idx
ON sch_brand_payment_data_lake_proxy.supplier_balances (accounting_document_type);
CREATE INDEX IF NOT EXISTS supplier_balances_company_code_idx
ON sch_brand_payment_data_lake_proxy.supplier_balances (company_code);
CREATE INDEX IF NOT EXISTS supplier_balances_document_date_year_idx
ON sch_brand_payment_data_lake_proxy.supplier_balances (document_date_year);

CREATE TABLE IF NOT EXISTS sch_brand_payment_data_lake_proxy.supplier_balance_items
(
    supplier_balance_id             bigserial NOT NULL,
    posting_view_item               text      NOT NULL,
    posting_key                     text,
    amount_in_company_code_currency numeric,
    amount_in_transaction_currency  numeric,
    cash_discount_1_percent         numeric,
    cash_discount_amount            numeric,
    clearing_accounting_document    text,
    document_item_text              text,
    gl_account                      text,
    is_cleared                      bool,
    clearing_date                   timestamp,
    due_calculation_base_date       timestamp,
    /* uniqueness is basically the posting_view_item for a given supplier balance */
    CONSTRAINT supplier_balance_items_pkey PRIMARY KEY (supplier_balance_id, posting_view_item),
    /* 1(supplier balance):N(supplier balance items) */
    CONSTRAINT supplier_balance_items_fkey FOREIGN KEY (supplier_balance_id)
               REFERENCES sch_brand_payment_data_lake_proxy.supplier_balances (id)
               ON DELETE CASCADE
               ON UPDATE CASCADE
);

Note: for the sake of simplicity, I only filled in the columns that cannot be NULL.

INSERT INTO 
sch_brand_payment_data_lake_proxy.supplier_balances 
(accounting_document, accounting_document_type, company_code, document_date_year)
VALUES 
('A', 'B', 'C', 0),
('A', 'B', 'C', 1),
('A', 'B', 'C', 2),
('A', 'B', 'C', 3),
('A', 'B', 'C', 4),
('A', 'B', 'C', 5)
RETURNING id;

Output:

id
1
2
3
4
5
6
INSERT INTO 
sch_brand_payment_data_lake_proxy.supplier_balance_items 
(supplier_balance_id, posting_view_item)
VALUES 
(1, 'A'),
(1, 'B'),
(3, 'A'),
(3, 'B'),
(2, 'A'),
(1, 'C');
SELECT 
    id,
    accounting_document, 
    accounting_document_type, 
    company_code, 
    document_date_year
FROM sch_brand_payment_data_lake_proxy.supplier_balances;

Output:

id accounting_document accounting_document_type company_code document_date_year
1 A B C 0
2 A B C 1
3 A B C 2
4 A B C 3
5 A B C 4
6 A B C 5
SELECT 
    supplier_balance_id,
    posting_view_item
FROM sch_brand_payment_data_lake_proxy.supplier_balance_items;

Output:

supplier_balance_id posting_view_item
1 A
1 B
3 A
3 B
2 A
1 C

Now, if we want to select multiple values in a JOIN, we can do it in raw SQL:
SELECT 
    id,
    accounting_document, 
    accounting_document_type, 
    company_code, 
    document_date_year, 
    posting_view_item
FROM sch_brand_payment_data_lake_proxy.supplier_balances
LEFT OUTER JOIN sch_brand_payment_data_lake_proxy.supplier_balance_items
ON supplier_balances.id = supplier_balance_items.supplier_balance_id
WHERE (accounting_document, accounting_document_type, company_code, document_date_year)
IN  (('A', 'B', 'C', 1), ('A', 'B', 'C', 2))

Output:

id accounting_document accounting_document_type company_code document_date_year posting_view_item
2 A B C 1 A
3 A B C 2 A

https://github.com/npgsql/npgsql/issues/1199

Now, reproducing the query above in C# with npgsql is easy:

using System.Data;

using Npgsql;

var connectionStringBuilder = new NpgsqlConnectionStringBuilder
{
    Host     = "localhost",
    Port     = 5432,
    Username = "brand_payment_migration",
    Password = "secret",
    Database = "brand_payment"
};
using var connection = new NpgsqlConnection(connectionStringBuilder.ToString());
connection.Open();
using var command = connection.CreateCommand();
command.CommandText = 
"SELECT id, accounting_document, accounting_document_type, company_code, document_date_year, posting_view_item " +
"FROM sch_brand_payment_data_lake_proxy.supplier_balances " +
"LEFT OUTER JOIN sch_brand_payment_data_lake_proxy.supplier_balance_items " +
"ON supplier_balances.id = supplier_balance_items.supplier_balance_id " +
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN (('A', 'B', 'C', 1), ('A', 'B', 'C', 2));";

using var reader = command.ExecuteReader();
using var dataTable = new DataTable();
dataTable.Load(reader);
var cols = dataTable.Columns.Cast<DataColumn>().ToArray();
Console.WriteLine(string.Join(Environment.NewLine, cols.Select((x, i) => $"Col{i} = {x}")));
Console.WriteLine(string.Join("\t", cols.Select((_, i) => $"Col{i}")));
foreach (var dataRow in dataTable.Rows.Cast<DataRow>())
{
    Console.WriteLine(string.Join("\t", dataRow.ItemArray));
}

As expected, the output is:

Col0 = id
Col1 = accounting_document
Col2 = accounting_document_type
Col3 = company_code
Col4 = document_date_year
Col5 = posting_view_item
Col0    Col1    Col2    Col3    Col4    Col5
2       A       B       C       1       A
3       A       B       C       2       A
3       A       B       C       2       B

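Now, what I want to achieve is this: instead of passing a raw string for (('A', 'B', 'C', 1), ('A', 'B', 'C', 2)), I'd rather use an NpgsqlParameter holding a collection of value sets (i.e. one value for each column).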

So I changed the C# snippet above and added a parameter:

// ...
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN @values;";
var parameter = command.CreateParameter();
parameter.ParameterName = "@values";
parameter.NpgsqlDbType = NpgsqlDbType.Array;
parameter.NpgsqlValue = new object[,]
{
    { "A", "B", "C", 1 }, 
    { "A", "B", "C", 2 }
};
// Note: the same kind of issue arises when using tuples, i.e.
// ( "A", "B", "C", 1 )
// ( "A", "B", "C", 2 )
command.Parameters.Add(parameter);
using var reader = command.ExecuteReader();
// ...

Then I get this exception:

Unhandled exception. System.ArgumentOutOfRangeException: Cannot set NpgsqlDbType to just Array, Binary-Or with the element type (e.g. Array of Box is NpgsqlDbType.Array | NpgsqlDbType.Box). (Parameter 'value')
   at Npgsql.NpgsqlParameter.set_NpgsqlDbType(NpgsqlDbType value)
   at Program.<Main>$(String[] args) in C:\Users\natalie-perret\Desktop\Personal\playground\csharp\CSharpPlayground\Program.cs:line 25

Then I tried to work around the error with:

parameter.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Unknown;

But then I got another exception:

Unhandled exception. System.ArgumentException: No array type could be found in the database for element .<unknown>
   at Npgsql.TypeMapping.ConnectorTypeMapper.ResolveByNpgsqlDbType(NpgsqlDbType npgsqlDbType)
   at Npgsql.NpgsqlParameter.ResolveHandler(ConnectorTypeMapper typeMapper)
   at Npgsql.NpgsqlParameterCollection.ValidateAndBind(ConnectorTypeMapper typeMapper)
   at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
   at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
   at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior)
   at Program.<Main>$(String[] args) in C:\Users\natalie-perret\Desktop\Personal\playground\csharp\CSharpPlayground\Program.cs:line 32

It seems that, for some reason, the type needs to be registered; indeed, if I don't specify the type at all, I get:

Unhandled exception. System.NotSupportedException: The CLR type System.Object isn't natively supported by Npgsql or your PostgreSQL. To use it with a PostgreSQL composite you need to specify DataTypeName or to map it, please refer to the documentation.
   at Npgsql.TypeMapping.ConnectorTypeMapper.ResolveByClrType(Type type)
   at Npgsql.TypeMapping.ConnectorTypeMapper.ResolveByClrType(Type type)
   at Npgsql.NpgsqlParameter.ResolveHandler(ConnectorTypeMapper typeMapper)
   at Npgsql.NpgsqlParameter.Bind(ConnectorTypeMapper typeMapper)
   at Npgsql.NpgsqlParameterCollection.ValidateAndBind(ConnectorTypeMapper typeMapper)
   at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
   at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
   at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior)
   at Program.<Main>$(String[] args) in C:\Users\natalie-perret\Desktop\Personal\playground\csharp\CSharpPlayground\Program.cs:line 31

[Edit]

The temporary solution I ended up with relies on jsonb support, in particular the jsonb_to_recordset function (see the PostgreSQL documentation section about JSON functions):

using System.Data;
using System.Text.Json;

using Npgsql;
using NpgsqlTypes;


var connectionStringBuilder = new NpgsqlConnectionStringBuilder
{
    Host     = "localhost",
    Port     = 5432,
    Username = "brand_payment_migration",
    Password = "secret",
    Database = "brand_payment"
};
using var connection = new NpgsqlConnection(connectionStringBuilder.ToString());
connection.Open();
using var command = connection.CreateCommand();
command.CommandText = 
"SELECT id, accounting_document, accounting_document_type, company_code, document_date_year, posting_view_item " +
"FROM sch_brand_payment_data_lake_proxy.supplier_balances " +
"LEFT OUTER JOIN sch_brand_payment_data_lake_proxy.supplier_balance_items " +
"ON supplier_balances.id = supplier_balance_items.supplier_balance_id " +
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN (SELECT * FROM jsonb_to_recordset(@values) " +
"AS params (accounting_document text, accounting_document_type text, company_code text, document_date_year integer));";
var parameter = command.CreateParameter();
parameter.ParameterName = "@values";
parameter.NpgsqlDbType = NpgsqlDbType.Jsonb;
parameter.NpgsqlValue = JsonSerializer.Serialize(new []
{
    new Params("A", "B", "C", 1), 
    new Params("A", "B", "C", 2)
});
command.Parameters.Add(parameter);
using var reader = command.ExecuteReader();
using var dataTable = new DataTable();
dataTable.Load(reader);
var cols = dataTable.Columns.Cast<DataColumn>().ToArray();
Console.WriteLine(string.Join(Environment.NewLine, cols.Select((x, i) => $"Col{i} = {x}")));
Console.WriteLine(string.Join("\t", cols.Select((_, i) => $"Col{i}")));
foreach (var dataRow in dataTable.Rows.Cast<DataRow>())
{
    Console.WriteLine(string.Join("\t", dataRow.ItemArray));
}


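// Positional record: System.Text.Json serializes its properties under these exact
// names, which must match the column list in the AS params (...) clause.
public record Params(
    string accounting_document, 
    string accounting_document_type,
    string company_code,
    int document_date_year);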

Output:

Col0 = id
Col1 = accounting_document
Col2 = accounting_document_type
Col3 = company_code
Col4 = document_date_year
Col5 = posting_view_item
Col0    Col1    Col2    Col3    Col4    Col5
2       A       B       C       1       A
3       A       B       C       2       A
3       A       B       C       2       B

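But this comes at the cost of an extra JSON serialization step when passing the parameter. So, apart from that or building a very long string, I'm a bit puzzled that there is no way to pass the actual values directly to the NpgsqlParameter.NpgsqlValue property.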
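To make that extra serialization step concrete, here is a minimal sketch of the JSON payload that jsonb_to_recordset receives. It assumes only System.Text.Json; the helper name ToJsonbPayload is hypothetical (not part of Npgsql), and anonymous objects stand in for the Params record so that the property names visibly match the column definition list in the AS params (...) clause.

```csharp
using System;
using System.Linq;
using System.Text.Json;

// Hypothetical helper (not part of Npgsql): shapes key tuples into the JSON array
// that jsonb_to_recordset(@values) AS params (accounting_document text, ...) expects.
// The property names must match the column names declared in the AS clause.
static string ToJsonbPayload((string Doc, string DocType, string Company, int Year)[] rows) =>
    JsonSerializer.Serialize(rows.Select(r => new
    {
        accounting_document = r.Doc,
        accounting_document_type = r.DocType,
        company_code = r.Company,
        document_date_year = r.Year
    }));

var payload = ToJsonbPayload(new[]
{
    ("A", "B", "C", 1),
    ("A", "B", "C", 2)
});

// This string is what gets bound to @values with NpgsqlDbType.Jsonb.
Console.WriteLine(payload);
```

Each additional key adds another full JSON object, which is part of the serialization overhead mentioned above.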

[Edit 2]

Added a DbFiddle.

[Edit 3]

The same jsonb "trick" can be used to feed data (albeit with the same drawback I already mentioned above):

INSERT INTO sch_brand_payment_data_lake_proxy.supplier_balances
    (accounting_document, accounting_document_type, company_code, document_date_year)
SELECT * FROM jsonb_to_recordset(
    '[{"accounting_document":"E","accounting_document_type":"B","company_code":"C","document_date_year":1},
      {"accounting_document":"E","accounting_document_type":"B","company_code":"C","document_date_year":2}]'::jsonb)
       AS params (accounting_document text, accounting_document_type text, company_code text, document_date_year integer)
RETURNING id;

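[Edit 4] Another approach is to use jsonb_populate_recordset, passing the relevant NULL::table-full-name as the first argument (which defines the columns) and the relevant jsonb as the second argument (like the first argument of jsonb_to_recordset).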

So there are basically 3 main ways to achieve what I want (DbFiddle updated accordingly).

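Note: things might get easier with PostgreSQL 15 and the json_table feature (JSON_TABLE was ultimately pulled from the PostgreSQL 15 release and shipped in PostgreSQL 17).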

[Edit 3] This article sums things up nicely: https://dev.to/forbeslindesay/postgres-unnest-cheat-sheet-for-bulk-operations-1obg

[Edit 2]

Following up on the issue I filed earlier today: https://github.com/npgsql/npgsql/issues/4437#issuecomment-1113999994

I've gone with the solution/workaround mentioned by @dhedey in another, somewhat related, issue:

If it helps anyone else, I have found quite a neat workaround for these types of queries using the UNNEST command, which can take multiple array parameters and zip them together into columns, which can be joined with the table to filter to the relevant columns.

The use of the join is also more performant than the ANY/IN pattern in some cases.

SELECT * FROM table WHERE (itemauthor, itemtitle) = ANY (('bob', 'hello'), ('frank', 'hi')...)

Can be represented with:

 var authorsParameter = new NpgsqlParameter("@authors", NpgsqlDbType.Array | NpgsqlDbType.Varchar)
    { Value = authors.ToList() };
var titlesParameter = new NpgsqlParameter("@titles", NpgsqlDbType.Array | NpgsqlDbType.Varchar)
    { Value = titles.ToList() };

var results = dbContext.Set<MyRow>()
    .FromSqlInterpolated($@"
SELECT
    t.*
FROM UNNEST({authorsParameter}, {titlesParameter}) params (author, title)
INNER JOIN table t
    ON t.author = params.author
    AND t.title = params.title
");

NB - the Varchar can be replaced by other types for parameters which are arrays of other types (eg Bigint) - check out the NpgsqlDbType enum for more details.

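I then rewrote some of the code I originally posted, and the unnest PostgreSQL function solution works like a charm. This is the answer I'm accepting for now; it looks neater than Json / JsonB, which require further PostgreSQL-JSON-specific mapping shenanigans or extraction.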
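The zip-then-join idea from @dhedey's quote can be mimicked in memory to show what UNNEST(@authors, @titles) params (author, title) followed by the INNER JOIN actually computes. This is only an analogy under stated assumptions: the table rows are hypothetical, and Enumerable.Zip stands in for multi-argument unnest. One difference matters: PostgreSQL's multi-argument unnest pads shorter arrays with NULLs, whereas Zip silently truncates, so the sketch checks the lengths first.

```csharp
using System;
using System.Linq;

// Hypothetical table rows and parameter arrays, mirroring the quoted example.
var table = new[]
{
    (Author: "bob",   Title: "hello", Id: 1),
    (Author: "bob",   Title: "bye",   Id: 2),
    (Author: "frank", Title: "hi",    Id: 3),
};
string[] authors = { "bob", "frank" };
string[] titles  = { "hello", "hi" };

// unnest pads shorter arrays with NULLs, while Enumerable.Zip truncates,
// so insist on equal lengths up front.
if (authors.Length != titles.Length)
    throw new ArgumentException("Each column array must have the same length.");

// Step 1: zip the column arrays into rows, like UNNEST(...) params (author, title).
var parameters = authors.Zip(titles, (author, title) => (Author: author, Title: title));

// Step 2: inner join the table with those rows on both columns.
var matches = table
    .Join(parameters,
          t => (t.Author, t.Title),
          p => (p.Author, p.Title),
          (t, p) => t)
    .ToList();

foreach (var row in matches)
    Console.WriteLine($"{row.Id}\t{row.Author}\t{row.Title}");
```

Only the rows whose (author, title) pair appears positionally in the two arrays survive, which is the same filter as the row-value IN / ANY pattern.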

I'm not quite sure about the performance implications yet, though:

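  • unnest requires you to map the values into separate per-column arrays yourself
  • jsonb_to_recordset requires an extra .NET JSON serialization step and, in some cases, explicitly mapping the output of jsonb_to_recordset to the relevant columns.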

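Neither is free. But what I like about unnest is that each column's values (i.e. each value set / collection taken from a larger .NET type: tuple, record, class, struct, etc.) are passed explicitly to the NpgsqlParameter.NpgsqlValue property, with the database type to use specified through the NpgsqlDbType enum: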
using System.Data;

using Npgsql;
using NpgsqlTypes;


var connectionStringBuilder = new NpgsqlConnectionStringBuilder
{
    Host     = "localhost",
    Port     = 5432,
    Username = "brand_payment_migration",
    Password = "secret",
    Database = "brand_payment"
};
using var connection = new NpgsqlConnection(connectionStringBuilder.ToString());
connection.Open();

var selectStatement =
    "SELECT * FROM sch_brand_payment_data_lake_proxy.supplier_balances " +
    "WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
    "IN (SELECT * FROM  unnest(" +
    "@accounting_document_texts, " +
    "@accounting_document_types, " +
    "@company_codes, " +
    "@document_date_years" +
    "))";

var insertStatement = 
    "INSERT INTO sch_brand_payment_data_lake_proxy.supplier_balances " +
    "(accounting_document, accounting_document_type, company_code, document_date_year) " + 
    "SELECT * FROM unnest(" +
    "@accounting_document_texts, " +
    "@accounting_document_types, " +
    "@company_codes, " +
    "@document_date_years" + 
    ") RETURNING id;";

var parameters = new (string Name, NpgsqlDbType DbType, object Value)[]
{
    ("@accounting_document_texts", NpgsqlDbType.Array | NpgsqlDbType.Text,    new[] {"G", "G", "G"}),
    ("@accounting_document_types", NpgsqlDbType.Array | NpgsqlDbType.Text,    new[] {"Y", "Y", "Y"}),
    ("@company_codes",             NpgsqlDbType.Array | NpgsqlDbType.Text,    new[] {"Z", "Z", "Z"}),
    ("@document_date_years",       NpgsqlDbType.Array | NpgsqlDbType.Integer, new[] {1, 2, 3})
};

connection.ExecuteNewCommandAndWriteResultToConsole(insertStatement, parameters);
connection.ExecuteNewCommandAndWriteResultToConsole(selectStatement, parameters);

public static class Extensions
{
    public static void AddParameter(this NpgsqlCommand command, string name, NpgsqlDbType dbType, object value)
    {
        var parameter = command.CreateParameter();
        parameter.ParameterName = name;
        parameter.NpgsqlDbType  = dbType;
        parameter.NpgsqlValue   = value;
        command.Parameters.Add(parameter);
    }

    public static NpgsqlCommand CreateCommand(this NpgsqlConnection connection, 
        string text, 
        IEnumerable<(string Name, NpgsqlDbType DbType, object Value)> parameters)
    {
        var command = connection.CreateCommand();
        command.CommandText = text;
        foreach (var (name, dbType, value) in parameters)
        {
            command.AddParameter(name, dbType, value);
        }

        return command;
    }
    public static void ExecuteAndWriteResultToConsole(this NpgsqlCommand command)
    {
        Console.WriteLine($"Executing command... {command.CommandText}");
        
        using var reader = command.ExecuteReader();
        using var dataTable = new DataTable();
        dataTable.Load(reader);
        var cols = dataTable.Columns.Cast<DataColumn>().ToArray();
        Console.WriteLine(string.Join(Environment.NewLine, cols.Select((x, i) => $"Col{i} = {x}")));
        Console.WriteLine(string.Join("\t", cols.Select((_, i) => $"Col{i}")));
        foreach (var dataRow in dataTable.Rows.Cast<DataRow>())
        {
            Console.WriteLine(string.Join("\t", dataRow.ItemArray));
        }
    }

    public static void ExecuteNewCommandAndWriteResultToConsole(this NpgsqlConnection connection, 
        string text,
        IEnumerable<(string Name, NpgsqlDbType DbType, object Value)> parameters)
    {
        using var command = connection.CreateCommand(text, parameters);
        command.ExecuteAndWriteResultToConsole();
    }
}

Output:

Executing command... INSERT INTO sch_brand_payment_data_lake_proxy.supplier_balances (accounting_document, accounting_document_type, company_code, document_date_year) SELECT * FROM unnest(@accounting_document_texts, @accounting_document_types, @company_codes, @document_date_years) RETURNING id;
Col0 = id
Col0
28
29
30
Executing command... SELECT * FROM sch_brand_payment_data_lake_proxy.supplier_balances WHERE (accounting_document, accounting_document_type, company_code, document_date_year) IN (SELECT * FROM  unnest(@accounting_document_texts, @accounting_document_types, @company_codes, @document_date_years))
Col0 = id
Col1 = accounting_document
Col2 = accounting_document_type
Col3 = company_code
Col4 = document_date_year
Col5 = accounting_doc_created_by_user
Col6 = accounting_clerk
Col7 = assignment_reference
Col8 = document_reference_id
Col9 = original_reference_document
Col10 = payment_terms
Col11 = supplier
Col12 = supplier_name
Col13 = document_date
Col14 = posting_date
Col15 = net_due_date
Col16 = created_on
Col17 = modified_on
Col18 = pushed_on
Col19 = is_modified
Col20 = is_pushed
Col0    Col1    Col2    Col3    Col4    Col5    Col6    Col7    Col8    Col9    Col10   Col11   Col12   Col13   Col14   Col15   Col16   Col17   Col18   Col19   Col20
28      G       Y       Z       1                                                                                                                       False   False
29      G       Y       Z       2                                                                                                                       False   False
30      G       Y       Z       3                                                                                                                       False   False
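Rather than hand-writing each column array as in the parameters array of the snippet above, the arrays can be derived from a single collection of keys. A minimal sketch, assuming the keys arrive as value tuples; ToUnnestColumns is a hypothetical helper, and the resulting arrays are meant to be bound one per parameter with NpgsqlDbType.Array | NpgsqlDbType.Text or NpgsqlDbType.Array | NpgsqlDbType.Integer, as in that snippet.

```csharp
using System;
using System.Linq;

// Hypothetical helper: transposes row-shaped keys into one array per column,
// which is the shape unnest(@a, @b, @c, @d) expects (one array parameter per column).
static (string[] Docs, string[] DocTypes, string[] Companies, int[] Years) ToUnnestColumns(
    (string Doc, string DocType, string Company, int Year)[] rows) =>
    (rows.Select(r => r.Doc).ToArray(),
     rows.Select(r => r.DocType).ToArray(),
     rows.Select(r => r.Company).ToArray(),
     rows.Select(r => r.Year).ToArray());

var keys = new[]
{
    ("G", "Y", "Z", 1),
    ("G", "Y", "Z", 2),
    ("G", "Y", "Z", 3),
};

var (docs, docTypes, companies, years) = ToUnnestColumns(keys);

// Each array would be bound to its own parameter, e.g. years to @document_date_years
// with NpgsqlDbType.Array | NpgsqlDbType.Integer.
Console.WriteLine(string.Join(",", docs));
Console.WriteLine(string.Join(",", years));
```

Because every array is produced from the same source collection, they are guaranteed to have the same length and order, which is what keeps unnest from zipping mismatched values together.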

[Edit 1]

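Since @Charlieface pointed out that this is not a proper answer, I thought it would be better to get an answer/information from the npgsql maintainers/contributors.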

Hence I filed an issue on their GitHub repository: https://github.com/npgsql/npgsql/issues/4437


Original answer:

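As of today, among other things, it is not possible to pass tuples or collections as a composite "type" or via a positional/implicit "definition" (which could then be used in the collection passed to the parameter Value property); npgsql requires a prior PostgreSQL type definition (and even then, tuples and nested collections still wouldn't work, because the maintainers, or at least one of them, consider that not safe enough). https://github.com/npgsql/npgsql/issues/2154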

As the exception says the corresponding composite is required in the database. This is because anonymous types are not mapped to records.

So, you should create a type and a struct which must be mapped to the type.

FYI, there is a similar issue #2097 to track mapping composites to value tuples.

But that would require further related development in npgsql, such as #2097, which has been dropped because the author / main contributor deemed it too brittle in https://github.com/dotnet/efcore/issues/14661#issuecomment-462440199:

Note that after discussion in npgsql/npgsql#2097 we decided to drop this idea. C# value tuples don't have names, so any mapping to PostgreSQL composites would rely on field definition ordering, which seems quite dangerous/brittle.
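The point about value tuples not having names can be checked with plain reflection: element names such as Doc and Year are a compile-time convenience and are not part of the runtime type, which only exposes positional fields Item1, Item2, and so on. A mapper that receives a boxed tuple therefore has nothing but field order to go on. A minimal sketch (the tuple itself is a hypothetical example):

```csharp
using System;
using System.Linq;

// The names Doc and Year only exist at compile time.
(string Doc, int Year) key = ("A", 1);

// At runtime the value is just a ValueTuple<string, int> with positional fields.
var fieldNames = key.GetType().GetFields().Select(f => f.Name).OrderBy(n => n).ToArray();

Console.WriteLine(key.GetType().Name);            // ValueTuple`2
Console.WriteLine(string.Join(", ", fieldNames)); // Item1, Item2
```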

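I finally decided to go with the jsonb alternative. I'm not a huge fan, but at least it allows passing collections in a relatively safe way (as long as the serialization of the jsonb being passed is under control).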

But the approach I originally had in mind is not possible as of today.


One more thing I learned while writing this post: