使用 DBIC 将多个 "new" 项插入数据库

Inserting several "new" items into the database with DBIC

我在一个 生物信息学 项目中工作,该项目要求我从各种生物中读取基因组数据(没什么特别的,只是把它想象成字符串)并将其插入到数据库。每个读数属于一个生物体,可以包含 5000 到 5000 万个基因,我需要在存储之前对其进行处理和分析。

当前执行此操作的脚本是用 perl 编写的,在所有计算之后,将结果存储在散列中,例如:

$new{$id}{gene_name}              = $id;
$new{$id}{gene_database_source} = $gene_database_source
$new{$id}{product}            = $product;
$new{$id}{sequence}               = $sequence;
$new{$id}{seqlength}              = $seqlength;
$new{$id}{digest}             = $digest;
$new{$id}{mw}                     = $mw;
$new{$id}{iep}                = $iep;
$new{$id}{tms}                = $tms;

读取所有基因后,插入通过散列循环进入 eval{} 语句。

eval {
foreach my $id (keys %new) {

  my $rs = $schema->resultset('Genes')->create(
    {
        gene_name               => $new{$id}{gene_name},
        gene_product            => $new{$id}{product},
        sequence                => $new{$id}{sequence},
        gene_protein_length     => $new{$id}{seqlength},
        digest                  => $new{$id}{digest},
        gene_isoelectric_point  => $new{$id}{iep},
        gene_molecular_weight   => $new{$id}{mw},
        gene_tmd_count          => $new{$id}{tms},
        gene_species            => $species,
        species_code            => $spc,
        user_id                 => $tdruserid,
        gene_database_source    => $new{$id}{gene_database_source}

    }
  );
}; 

虽然这个 "works",它至少有两个我想解决的问题:

我一直在考虑使用 DBIX $schema->new({..stuff..});new 指令,然后进行大量插入事务,而不是创建哈希。这将解决双重迭代,并且 eval 将对单个事务起作用(或不起作用),这将执行 < 要么所有插入或 none > ... 的预期行为是否有如何做到这一点?

您可以使用 TxnScopeGuard in DBIC 创建大量交易。在最基本的形式中,如下所示。

eval { # or try from Try::Tiny
    my $guard = $schema->txn_scope_guard;

    foreach my $id ( keys %new ) {
        my $rs = $schema->resultset('Genes')->create(
            {
                gene_name              => $new{$id}{gene_name},
                gene_product           => $new{$id}{product},
                sequence               => $new{$id}{sequence},
                gene_protein_length    => $new{$id}{seqlength},
                digest                 => $new{$id}{digest},
                gene_isoelectric_point => $new{$id}{iep},
                gene_molecular_weight  => $new{$id}{mw},
                gene_tmd_count         => $new{$id}{tms},
                gene_species           => $species,
                species_code           => $spc,
                user_id                => $tdruserid,
                gene_database_source   => $new{$id}{gene_database_source}

            }
        );
    }
    $guard->commit;
}

您创建一个范围保护对象,当您完成 transaction 的设置后,您 commit 它。如果对象超出范围,即因为 died,它会自动回滚事务。

eval可以捕捉到die,你的程序不会崩溃。您的那部分是正确的,但是您的代码不会撤消以前的插入也是正确的。请注意 Try::Tinytry 提供了更好的语法。不过这里不需要。

Transaction 在这种情况下意味着收集所有查询并同时 运行。

请注意,这仍将仅在每个 INSERT 语句中插入一行!

如果您想创建更大的 INSERT 语句,如下所示,您需要 populate,而不是 new

INSERT INTO foo (bar, baz) VALUES
(1, 1),
(2, 2),
(3, 3),
...

populate 方法允许您一次传入包含多行的数组引用。这应该比一次插入一个要快得多。

$schema->resultset("Artist")->populate([
  [ qw( artistid name ) ],
  [ 100, 'A Formally Unknown Singer' ],
  [ 101, 'A singer that jumped the shark two albums ago' ],
  [ 102, 'An actually cool singer' ],
]);

转换为您的循环,如下所示。请注意,文档声称如果您在 void context.

中 运行 它会更快
eval {
    $schema->resultset('Genes')->populate(
        [
            [
                                qw(
                    gene_name             gene_product   sequence
                    gene_protein_length   digest         gene_isoelectric_point
                    gene_molecular_weight gene_tmd_count gene_species
                    species_code          user_id        gene_database_source
        )
            ],
            map {
                [
                    $new{$_}{gene_name}, $new{$_}{product},
                    $new{$_}{sequence},  $new{$_}{seqlength},
                    $new{$_}{digest},    $new{$_}{iep},
                    $new{$_}{mw},        $new{$_}{tms},
                    $species,            $spc,
                    $tdruserid,          $new{$_}{gene_database_source},
                ]
            } keys %new
        ],
    );
}

像这样不需要作用域守卫。但是,我建议您不要在每个语句中执行超过 1000 行。出于性能原因,以块的形式处理它可能是个好主意。在那种情况下,您将一次遍历 1000 个键。 List::MoreUtils 有一个很好的 natatime 功能。

use List::MoreUtils 'natatime';

eval {
    my $guard = $schema->txn_scope_guard;

    my $it = natatime 1_000, keys %new;

    while ( my @keys = $it->() ) {
        $schema->resultset('Genes')->populate(
            [
                [
                    qw(
                        gene_name             gene_product   sequence
                        gene_protein_length   digest         gene_isoelectric_point
                        gene_molecular_weight gene_tmd_count gene_species
                        species_code          user_id        gene_database_source
                        )
                ],
                map {
                    [
                        $new{$_}{gene_name}, $new{$_}{product},
                        $new{$_}{sequence},  $new{$_}{seqlength},
                        $new{$_}{digest},    $new{$_}{iep},
                        $new{$_}{mw},        $new{$_}{tms},
                        $species,            $spc,
                        $tdruserid,          $new{$_}{gene_database_source},
                    ]
                } @keys
            ],
        );
    }

    $guard->commit;
}

现在每次插入将执行 1000 行,并且 运行 所有这些查询都在一个大事务中。如果其中之一失败,none将完成。

The script needs to loop twice through very large datasets (one while reading and creating the hashes, and once again when reading the hashes and performing the insertions). This makes the process' performance rather poor.

除此作业外,您没有显示如何创建数据。

$new{$id}{gene_name}              = $id;
$new{$id}{gene_database_source} = $gene_database_source
$new{$id}{product}            = $product;

如果仅此而已,没有什么能阻止您直接使用我在上面展示的方法,即您第一次处理数据并构建散列。以下代码不完整,因为您没有告诉我们数据来自何处,但您应该了解要点。

eval {
    my $guard = $schema->txn_scope_guard;

    # we use this to collect rows to process
    my @rows;

    # this is where your data comes in
    while ( my $foo = <DATA> ) {

        # here you process the data and come up with your variables
        my ( $id, $gene_database_source, $product, $sequence, $seqlength, 
             $digest, $mw, $iep, $tms );

        # collect the row so we can insert it later
        push(
            @rows,
            [
                $id, $gene_database_source, $product, $sequence, $seqlength, 
                $digest, $mw, $iep, $tms,
            ]
        );

        # only insert if we reached the limit
        if ( scalar @rows == 1000 ) {
            $schema->resultset('Genes')->populate(
                [
                    [
                        qw(
                            gene_name             gene_product   sequence
                            gene_protein_length   digest         gene_isoelectric_point
                            gene_molecular_weight gene_tmd_count gene_species
                            species_code          user_id        gene_database_source
                            )
                    ],
                    \@rows,
                ],
            );

            # empty the list of values
            @rows = ();
        }
    }
    $guard->commit;
}

基本上我们在处理它们时直接收集最多 1000 行作为数组引用,当我们达到限制时,我们将它们传递给数据库。然后我们重置我们的行数组并重新开始。同样,所有这些都包含在一个事务中,因此只有在所有插入都正常的情况下才会提交。


有更多信息on transactions in DBIC in the cookbook

请注意,我尚未测试任何此代码。