如何使用 DBD::Pg 和 Parallel::ForkManager 在多线程 Perl 中创建多个并行数据库连接?
How to create multiple parallel database connections in multi-threaded Perl with DBD::Pg and Parallel::ForkManager?
2021 年 7 月 21 日编辑如下
我有一个脚本,在其中连接到 PostgreSQL 数据库,并使用 Parallel::ForkManager 生成多个线程。
我创建了一个数据库句柄,然后准备了一个 SELECT
SQL 语句和一个 INSERT
SQL 语句。
我希望 Parallel::ForkManager 产生的每个线程都能运行这些 SQL 语句,但这会失败,因为无法在线程之间共享数据库句柄。
我需要修改下面的脚本(生成多个线程时它会失败,只生成一个线程时可以正常工作),使每个线程都能读写数据库。
我知道可以 clone 一个数据库句柄,但我也知道事情没有这么简单。
如何使用并行数据库 handles/SQL 语句?
对于这篇文章的篇幅,我深表歉意,但我需要给出一个尽可能完整的例子。
示例:
use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;
# Codes to process; the loop below starts one child per code.
#my @codes = ("A"); # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
# Column name => value scratch hash filled in by the sub_* routines.
# Declared here because `use strict` rejects undeclared globals.
my %varset;
################################################################
# connect to db
################################################################
my $dsn = 'DBI:Pg:dbname=DB_NAME';
my $userid = "DB_USERNAME";
my $sesame = "DB_PASSWORD";
# NOTE(review): this handle, and the statement handles prepared from it
# below, are created before any child is started, so every child ends up
# using the same handle -- the failure this example is meant to show.
my $dbh = DBI->connect($dsn, $userid, $sesame, {
    AutoCommit => 1,
    RaiseError => 1,
    PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;
################################################################
# test db connection
################################################################
my $me = $dbh->{Driver}{Name};
my $sversion = $dbh->{pg_server_version};
print "DBI is version $DBI::VERSION, "
    . "I am $me, "
    . "version of DBD::Pg is $DBD::Pg::VERSION, "
    . "server is $sversion\n";
print "Name: $dbh->{Name}\n";
my %columns; # hash for persistent mapping of column-values
my @columns; # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table
# One '?' placeholder per column, for the INSERT below.
my $placeholders = join(", ", map { '?' } @columns);
################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
    qq(SELECT count(*) FROM mytable WHERE id = ?;);
# prepare the SELECT statement handle
my $sth_select_statement = $dbh->prepare_cached($sql_select_statement);
################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
    "INSERT INTO mytable ("
    . join(", ", @columns) # column names
    . ") VALUES ($placeholders)";
# prepare the INSERT statement handle
my $sth_insert_statement = $dbh->prepare_cached($sql_insert_statement);
################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes);
$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});
# The finish callback only unpacks its arguments; nothing is done with them.
$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
});
my $thread_count = 0;
OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";
    # fork optimization threads - per code
    # start() is true in the parent, which moves on to the next code.
    $optimization->start($code) and next OPTIMIZATION;
    if ($code =~ m/A/i) {
        sub_a("A");
    } elsif ($code =~ m/B/i) {
        sub_b("B");
    } elsif ($code =~ m/F/i) {
        sub_f("F");
    } elsif ($code =~ m/M/i) {
        sub_m("M");
    } elsif ($code =~ m/S/i) {
        sub_s("S");
    }
    # The sigil is escaped so the message shows the method name instead of
    # the stringified object.
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();
################################################################
sub sub_a {
    # Build the value list for code "A" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Values specific to A (for illustrative purposes).
    @varset{qw(field1 field2 field3 field4)} = qw(a_f1 a_f2 a_f3 a_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub sub_b {
    # Build the value list for code "B" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Values specific to B (for illustrative purposes).
    @varset{qw(field1 field2 field3 field4)} = qw(b_f1 b_f2 b_f3 b_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub sub_f {
    # Build the value list for code "F" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Values specific to F (for illustrative purposes).
    @varset{qw(field1 field2 field3 field4)} = qw(f_f1 f_f2 f_f3 f_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub sub_m {
    # Build the value list for code "M" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Values specific to M (for illustrative purposes).
    @varset{qw(field1 field2 field3 field4)} = qw(m_f1 m_f2 m_f3 m_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub sub_s {
    # Build the value list for code "S" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Values specific to S (for illustrative purposes).
    @varset{qw(field1 field2 field3 field4)} = qw(s_f1 s_f2 s_f3 s_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub write_to_db {
    # Run the prepared SELECT for $code_name; when it reports no matching
    # rows, run the prepared INSERT with @values.
    my ($code_name, @values) = @_;

    my $select_rv = $sth_select_statement->execute($code_name);
    print $DBI::errstr if $select_rv < 0;

    # The SELECT yields a single count column.
    my ($match_count) = $sth_select_statement->fetchrow_array();

    # INSERT settings into 'mytable' only when nothing was found.
    $sth_insert_statement->execute(@values) unless $match_count > 0;
}
################################################################
sub set_columns {
    # Fill the file-level %columns (name => index) and @columns
    # (index => name) with the same five-column layout.
    @columns{qw(code field1 field2 field3 field4)} = (0 .. 4);
    @columns[0 .. 4] = qw(code field1 field2 field3 field4);
}
编辑 2021-07-21:
我添加了clone_dbh
和create_dbh
子程序来尝试不同的方法,但两种方法都不起作用。我仍然遇到同样的错误 - can't share DBH between threads
。除了放弃子例程并为 Parallel::ForkManager
产生的每个线程编写重复代码外,我不知道还能做什么,我真的不想这样做。
示例 2:
use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;
# Codes to process; the loop below starts one child per code.
#my @codes = ("A"); # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
# Column name => value scratch hash filled in by the sub_* routines.
my %varset;
################################################################
# connect to db
################################################################
# Double quotes so that $ENV{DB_NAME} is interpolated; in single quotes the
# literal text '$ENV{DB_NAME}' was sent as the database name.
my $dsn = "DBI:Pg:dbname=$ENV{DB_NAME}";
my $userid = $ENV{DBI_USER};
my $sesame = $ENV{DBI_PASS};
my %dbh; # hash for storing dbh handles
my $dbh = DBI->connect($dsn, $userid, $sesame, {
    AutoCommit => 1,
    RaiseError => 1,
    PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;
################################################################
# test db connection
################################################################
my $me = $dbh->{Driver}{Name};
my $sversion = $dbh->{pg_server_version};
print "DBI is version $DBI::VERSION, "
    . "I am $me, "
    . "version of DBD::Pg is $DBD::Pg::VERSION, "
    . "server is $sversion\n";
print "Name: $dbh->{Name}\n";
################################################################
# prepare array and hash for matching db columns
################################################################
my %columns; # hash for persistent mapping of column-values
my @columns; # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table
# One '?' placeholder per column, for the INSERT below.
my $placeholders = join(", ", map { '?' } @columns);
################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
    qq(SELECT count(*) FROM mytable WHERE id = ?;);
################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
    "INSERT INTO mytable ("
    . join(", ", @columns) # column names
    . ") VALUES ($placeholders)";
################################################################################################
# create clones of database handle and SQL statements for threads
################################################################################################
my %sth_select_code;
my %sth_insert_code;
#for my $code (@codes) {
# $dbh{$code} = $dbh->clone();
# # prepare the SELECT statement handle
# $sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_statement);
# # prepare the INSERT statement handle
# $sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_statement);
#}
################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes);
$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});
# The finish callback only unpacks its arguments; nothing is done with them.
$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
});
my $thread_count = 0;
OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";
    # fork optimization threads - per code
    # start() is true in the parent, which moves on to the next code.
    $optimization->start($code) and next OPTIMIZATION;
    if ($code =~ m/A/i) {
        sub_a("A");
    } elsif ($code =~ m/B/i) {
        sub_b("B");
    } elsif ($code =~ m/F/i) {
        sub_f("F");
    } elsif ($code =~ m/M/i) {
        sub_m("M");
    } elsif ($code =~ m/S/i) {
        sub_s("S");
    }
    # The sigil is escaped so the message shows the method name instead of
    # the stringified object.
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();
################################################################################################
# disconnect from database
################################################################################################
# The per-code handles are created inside the sub_* routines, which run in
# the children, so in this process the slots may be empty. Only clean up
# handles that actually exist here.
for my $code (@codes) {
    $sth_select_code{$code}->finish() if $sth_select_code{$code};
    $sth_insert_code{$code}->finish() if $sth_insert_code{$code};
    $dbh{$code}->disconnect if $dbh{$code};
}
$dbh->disconnect;
################################################################################################
# end
################################################################################################
exit;
################################################################
sub sub_a {
    # Build the value list for code "A" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Create the connection and statement handles stored under $code
    # (clone_dbh($code) is the alternative that was also tried).
    create_dbh($code);

    # Values specific to A (for illustrative purposes).
    @varset{qw(field2 field3 field4 field5)} = qw(a_f1 a_f2 a_f3 a_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub sub_b {
    # Build the value list for code "B" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Create the connection and statement handles stored under $code
    # (clone_dbh($code) is the alternative that was also tried).
    create_dbh($code);

    # Values specific to B (for illustrative purposes).
    @varset{qw(field2 field3 field4 field5)} = qw(b_f1 b_f2 b_f3 b_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub sub_f {
    # Build the value list for code "F" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Create the connection and statement handles stored under $code
    # (clone_dbh($code) is the alternative that was also tried).
    create_dbh($code);

    # Values specific to F (for illustrative purposes).
    @varset{qw(field2 field3 field4 field5)} = qw(f_f1 f_f2 f_f3 f_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub sub_m {
    # Build the value list for code "M" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Create the connection and statement handles stored under $code
    # (clone_dbh($code) is the alternative that was also tried).
    create_dbh($code);

    # Values specific to M (for illustrative purposes).
    @varset{qw(field2 field3 field4 field5)} = qw(m_f1 m_f2 m_f3 m_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub sub_s {
    # Build the value list for code "S" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Create the connection and statement handles stored under $code
    # (clone_dbh($code) is the alternative that was also tried).
    create_dbh($code);

    # Values specific to S (for illustrative purposes).
    @varset{qw(field2 field3 field4 field5)} = qw(s_f1 s_f2 s_f3 s_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################################
sub create_dbh {
    # Open a new connection for $code and prepare the SELECT and INSERT
    # statements on it. The handles are stored in the file-level hashes
    # %dbh, %sth_select_code and %sth_insert_code under the key $code.
    #
    # Parameters: $code - key under which the new handles are stored.
    # Dies if the connection cannot be established.
    my $code = shift;
    $dbh{$code} = DBI->connect($dsn, $userid, $sesame, {
        AutoCommit => 1,
        RaiseError => 1,
        PrintError => 1
    }) or die "Connection failed!\n" . $DBI::errstr;
    # did it work? are we there yet?
    my $me = $dbh{$code}->{Driver}{Name};
    my $sversion = $dbh{$code}->{pg_server_version};
    print "DBI is version $DBI::VERSION, "
        . "I am $me, "
        . "version of DBD::Pg is $DBD::Pg::VERSION, "
        . "server is $sversion\n";
    # Report the handle that was just created, not the file-level $dbh.
    print "Name: $dbh{$code}->{Name}\n";
    # prepare the SELECT statement handle
    $sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_statement);
    # prepare the INSERT statement handle
    $sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_statement);
}
################################################################
sub clone_dbh {
    # Clone the file-level $dbh, keep the clone under $code, and prepare
    # both statements on the clone.
    my ($code) = @_;

    my $copy = $dbh->clone();
    $dbh{$code} = $copy;

    # SELECT first, then INSERT, each stored in its own per-code hash.
    $sth_select_code{$code} = $copy->prepare_cached($sql_select_statement);
    $sth_insert_code{$code} = $copy->prepare_cached($sql_insert_statement);
}
################################################################
sub write_to_db {
    # Run the per-code SELECT for $code; when it reports no matching rows,
    # run the per-code INSERT with @values.
    #
    # Parameters: $code   - key into %sth_select_code / %sth_insert_code,
    #                       also bound to the SELECT placeholder.
    #             @values - bind values for the INSERT.
    #
    # Unpack the whole argument list: `shift @_` supplied only the first
    # argument, which left @values empty.
    my ($code, @values) = @_;
    my $rv_code = $sth_select_code{$code}->execute($code);
    if ($rv_code < 0) {
        print $DBI::errstr;
    }
    my @row = $sth_select_code{$code}->fetchrow_array();
    # if the SELECT found no existing records for this strategy, then INSERT it
    unless ($row[0] > 0) {
        # INSERT settings into 'mytable'
        $sth_insert_code{$code}->execute(@values);
    }
}
################################################################
sub set_columns {
    # Fill the file-level %columns (name => index) and @columns
    # (index => name) with the same five-column layout.
    @columns{qw(field1 field2 field3 field4 field5)} = (0 .. 4);
    @columns[0 .. 4] = qw(field1 field2 field3 field4 field5);
}
我是否必须从调用 sub_x
线程中明确地将 dbh
和 statement
句柄传递给 create_dbh
子程序?
我是否必须把 handles 或 handle_refs 从 create_dbh 返回给调用它的 sub_x 线程?
我不知道如何解决这个问题,但这似乎是词法作用域或 object/memory 访问问题。
还有什么想法吗?
我不太确定你想要什么,但也许以下几点提示会有所帮助:
准备好的语句在创建它们的数据库会话中是本地的。因此,如果您希望在所有数据库会话中使用它们,则必须在每个数据库会话中都进行准备。
只要每条语句都在各自的数据库会话中运行,您随时可以并行运行多条语句。
您还可以在数据库中对单个查询进行并行化,以便多个数据库进程一起处理它。如果 PostgreSQL 被适当配置并且优化器认为查询将受益,这会自动发生。有关详细信息,请参阅 the documentation。
我使用下面的代码让它工作:
use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;
# Codes to process; the loop below handles one code per child.
#my @codes = ("A"); # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
# Column name => value scratch hash filled in by the sub_* routines.
my %varset;
################################################################
# get db connection info
################################################################
my $dsn = 'DBI:Pg:dbname=mt4_test';
my $userid = $ENV{DBI_USER};
my $sesame = $ENV{DBI_PASS};
my %dbh; # hash for storing dbh handles
################################################################
# prepare array and hash for matching db columns
################################################################
my %columns; # hash for persistent mapping of column-values
my @columns; # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table
# One '?' placeholder per column, for the INSERT below.
my $placeholders = join(", ", map { '?' } @columns);
################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
    qq(SELECT count(*) FROM mytable WHERE field1 = ?;);
################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
    "INSERT INTO mytable ("
    . join(", ", @columns) # column names
    . ") VALUES ($placeholders)";
################################################################
# hash for storing SQL statement handles for threads
################################################################
my %sth_select_code;
my %sth_insert_code;
################################################################
# create Parallel::ForkManager object for @codes
################################################################
# With a single code, a limit of 0 makes Parallel::ForkManager run the work
# in this process without forking (its documented debugging mode).
# Previously start() was called without `and next` in that case, so the
# parent and the child both ran launch_sub for the same code.
my $max_children = @codes > 1 ? scalar @codes : 0;
my $optimization = Parallel::ForkManager->new($max_children);
$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});
# The finish callback only unpacks its arguments; nothing is done with them.
$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
});
my $thread_count = 0;
OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";
    # fork optimization threads - per code
    # start() is true in the parent, which moves on to the next code.
    $optimization->start($code) and next OPTIMIZATION;
    launch_sub($code);
    # The sigil is escaped so the message shows the method name instead of
    # the stringified object.
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();
################################################################
# THE END
################################################################
exit;
################################################################
################################################################
sub launch_sub {
    # Dispatch $code to the first sub_* routine whose pattern matches it,
    # trying the patterns in the order A, B, F, M, S.
    my ($code) = @_;

    # Each entry: [ pattern, handler, argument passed to the handler ].
    my @routes = (
        [ qr/A/i, \&sub_a, "A" ],
        [ qr/B/i, \&sub_b, "B" ],
        [ qr/F/i, \&sub_f, "F" ],
        [ qr/M/i, \&sub_m, "M" ],
        [ qr/S/i, \&sub_s, "S" ],
    );

    for my $route (@routes) {
        my ($pattern, $handler, $label) = @{$route};
        return $handler->($label) if $code =~ $pattern;
    }
}
################################################################
sub sub_a {
    # Connect, build the value list for code "A", write it, and disconnect.
    #
    # Parameters: $code - the code being processed; also the key of the
    #                     per-code handle slots.
    my $code = shift; # Code
    my @values;
    # create_dbh stores the new handles through the references it is given,
    # so pass references to the per-code slots rather than their (still
    # undefined) values.
    create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    # generate values specific to A
    $varset{field2} = 'a_f1'; # for illustrative purposes
    $varset{field3} = 'a_f2';
    $varset{field4} = 'a_f3';
    $varset{field5} = 'a_f4';
    foreach my $key (keys %varset) {
        # %columns maps the column name to its index in the value list
        $values[$columns{$key}] = $varset{$key};
    }
    $values[0] = $code;
    write_to_db($code, @values);
    # disconnect_dbh dereferences its arguments as well.
    disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
}
################################################################
sub sub_b {
    # Connect, build the value list for code "B", write it, and disconnect.
    #
    # Parameters: $code - the code being processed; also the key of the
    #                     per-code handle slots.
    my $code = shift; # Code
    my @values;
    # create_dbh stores the new handles through the references it is given,
    # so pass references to the per-code slots rather than their (still
    # undefined) values.
    create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    # generate values specific to B
    $varset{field2} = 'b_f1'; # for illustrative purposes
    $varset{field3} = 'b_f2';
    $varset{field4} = 'b_f3';
    $varset{field5} = 'b_f4';
    foreach my $key (keys %varset) {
        # %columns maps the column name to its index in the value list
        $values[$columns{$key}] = $varset{$key};
    }
    $values[0] = $code;
    write_to_db($code, @values);
    # disconnect_dbh dereferences its arguments as well.
    disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
}
################################################################
sub sub_f {
    # Connect, build the value list for code "F", write it, and disconnect.
    #
    # Parameters: $code - the code being processed; also the key of the
    #                     per-code handle slots.
    my $code = shift; # Code
    my @values;
    # create_dbh stores the new handles through the references it is given,
    # so pass references to the per-code slots rather than their (still
    # undefined) values.
    create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    # generate values specific to F
    $varset{field2} = 'f_f1'; # for illustrative purposes
    $varset{field3} = 'f_f2';
    $varset{field4} = 'f_f3';
    $varset{field5} = 'f_f4';
    foreach my $key (keys %varset) {
        # %columns maps the column name to its index in the value list
        $values[$columns{$key}] = $varset{$key};
    }
    $values[0] = $code;
    write_to_db($code, @values);
    # disconnect_dbh dereferences its arguments as well.
    disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
}
################################################################
sub sub_m {
    # Connect, build the value list for code "M", write it, and disconnect.
    #
    # Parameters: $code - the code being processed; also the key of the
    #                     per-code handle slots.
    my $code = shift; # Code
    my @values;
    # create_dbh stores the new handles through the references it is given,
    # so pass references to the per-code slots rather than their (still
    # undefined) values.
    create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    # generate values specific to M
    $varset{field2} = 'm_f1'; # for illustrative purposes
    $varset{field3} = 'm_f2';
    $varset{field4} = 'm_f3';
    $varset{field5} = 'm_f4';
    foreach my $key (keys %varset) {
        # %columns maps the column name to its index in the value list
        $values[$columns{$key}] = $varset{$key};
    }
    $values[0] = $code;
    write_to_db($code, @values);
    # disconnect_dbh dereferences its arguments as well.
    disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
}
################################################################
sub sub_s {
    # Connect, build the value list for code "S", write it, and disconnect.
    #
    # Parameters: $code - the code being processed; also the key of the
    #                     per-code handle slots.
    my $code = shift; # Code
    my @values;
    # create_dbh stores the new handles through the references it is given,
    # so pass references to the per-code slots rather than their (still
    # undefined) values.
    create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    # generate values specific to S
    $varset{field2} = 's_f1'; # for illustrative purposes
    $varset{field3} = 's_f2';
    $varset{field4} = 's_f3';
    $varset{field5} = 's_f4';
    foreach my $key (keys %varset) {
        # %columns maps the column name to its index in the value list
        $values[$columns{$key}] = $varset{$key};
    }
    $values[0] = $code;
    write_to_db($code, @values);
    # disconnect_dbh dereferences its arguments as well.
    disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
}
################################################################
sub create_dbh {
    # Open a new connection and prepare the SELECT and INSERT statements on
    # it, storing each handle through the scalar reference supplied for it.
    #
    # Parameters: $dbh_ref - reference to the scalar that receives the
    #                        database handle
    #             $sel_ref - reference that receives the SELECT statement
    #                        handle
    #             $ins_ref - reference that receives the INSERT statement
    #                        handle
    # Returns:    (database handle, SELECT handle, INSERT handle), so callers
    #             that unpack three values get what they expect. Previously
    #             only the last prepare_cached result was returned.
    # Dies if the connection cannot be established.
    my ($dbh_ref, $sel_ref, $ins_ref) = @_;
    ${$dbh_ref} = DBI->connect($dsn, $userid, $sesame, {
        AutoCommit => 1,
        RaiseError => 1,
        PrintError => 1
    }) or die "Connection failed!\n" . $DBI::errstr;
    # did it work? are we there yet?
    my $me = ${$dbh_ref}->{Driver}{Name};
    my $sversion = ${$dbh_ref}->{pg_server_version};
    print "DBI is version $DBI::VERSION, "
        . "I am $me, "
        . "version of DBD::Pg is $DBD::Pg::VERSION, "
        . "server is $sversion\n";
    print "Name: ${$dbh_ref}->{Name}\n";
    # prepare the SELECT statement handle
    ${$sel_ref} = ${$dbh_ref}->prepare_cached($sql_select_statement);
    # prepare the INSERT statement handle
    ${$ins_ref} = ${$dbh_ref}->prepare_cached($sql_insert_statement);
    return (${$dbh_ref}, ${$sel_ref}, ${$ins_ref});
}
################################################################
sub write_to_db {
    # Run the per-code SELECT for $code, log its return value, and run the
    # per-code INSERT with @values when no matching rows are reported.
    my ($code, @values) = @_;

    my $rv_code = $sth_select_code{$code}->execute($code);
    say "SQL SELECT for $code: rv_code = $rv_code";
    print $DBI::errstr if $rv_code < 0;

    # The SELECT yields a single count column.
    my ($match_count) = $sth_select_code{$code}->fetchrow_array();

    # INSERT settings into 'mytable' only when nothing was found.
    unless ($match_count > 0) {
        $sth_insert_code{$code}->execute(@values);
        say "SQL INSERT for $code";
    }
}
################################################################
sub disconnect_dbh {
    # Finish both statement handles, then disconnect the database handle.
    # All three are reached through the scalar references passed in.
    my ($dbh_ref, $sel_ref, $ins_ref) = @_;

    # SELECT first, then INSERT.
    for my $sth_ref ($sel_ref, $ins_ref) {
        ${$sth_ref}->finish();
    }

    ${$dbh_ref}->disconnect;
    say "disconnected dbh_ref: $dbh_ref";
}
################################################################
sub set_columns {
    # Fill the file-level %columns (name => index) and @columns
    # (index => name) with the same five-column layout.
    @columns{qw(field1 field2 field3 field4 field5)} = (0 .. 4);
    @columns[0 .. 4] = qw(field1 field2 field3 field4 field5);
}
2021 年 7 月 21 日编辑如下
我有一个脚本,在其中连接到 PostgreSQL 数据库,并使用 Parallel::ForkManager 生成多个线程。
我创建了一个数据库句柄,然后准备了一个 SELECT
SQL 语句和一个 INSERT
SQL 语句。
我希望 Parallel::ForkManager 产生的每个线程都能运行这些 SQL 语句,但这会失败,因为无法在线程之间共享数据库句柄。
我需要修改下面的脚本(生成多个线程时它会失败,只生成一个线程时可以正常工作),使每个线程都能读写数据库。
我知道可以 clone 一个数据库句柄,但我也知道事情没有这么简单。
如何使用并行数据库 handles/SQL 语句?
对于这篇文章的篇幅,我深表歉意,但我需要给出一个尽可能完整的例子。
示例:
use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;
# Codes to process; the loop below starts one child per code.
#my @codes = ("A"); # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
# Column name => value scratch hash filled in by the sub_* routines.
# Declared here because `use strict` rejects undeclared globals.
my %varset;
################################################################
# connect to db
################################################################
my $dsn = 'DBI:Pg:dbname=DB_NAME';
my $userid = "DB_USERNAME";
my $sesame = "DB_PASSWORD";
# NOTE(review): this handle, and the statement handles prepared from it
# below, are created before any child is started, so every child ends up
# using the same handle -- the failure this example is meant to show.
my $dbh = DBI->connect($dsn, $userid, $sesame, {
    AutoCommit => 1,
    RaiseError => 1,
    PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;
################################################################
# test db connection
################################################################
my $me = $dbh->{Driver}{Name};
my $sversion = $dbh->{pg_server_version};
print "DBI is version $DBI::VERSION, "
    . "I am $me, "
    . "version of DBD::Pg is $DBD::Pg::VERSION, "
    . "server is $sversion\n";
print "Name: $dbh->{Name}\n";
my %columns; # hash for persistent mapping of column-values
my @columns; # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table
# One '?' placeholder per column, for the INSERT below.
my $placeholders = join(", ", map { '?' } @columns);
################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
    qq(SELECT count(*) FROM mytable WHERE id = ?;);
# prepare the SELECT statement handle
my $sth_select_statement = $dbh->prepare_cached($sql_select_statement);
################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
    "INSERT INTO mytable ("
    . join(", ", @columns) # column names
    . ") VALUES ($placeholders)";
# prepare the INSERT statement handle
my $sth_insert_statement = $dbh->prepare_cached($sql_insert_statement);
################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes);
$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});
# The finish callback only unpacks its arguments; nothing is done with them.
$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
});
my $thread_count = 0;
OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";
    # fork optimization threads - per code
    # start() is true in the parent, which moves on to the next code.
    $optimization->start($code) and next OPTIMIZATION;
    if ($code =~ m/A/i) {
        sub_a("A");
    } elsif ($code =~ m/B/i) {
        sub_b("B");
    } elsif ($code =~ m/F/i) {
        sub_f("F");
    } elsif ($code =~ m/M/i) {
        sub_m("M");
    } elsif ($code =~ m/S/i) {
        sub_s("S");
    }
    # The sigil is escaped so the message shows the method name instead of
    # the stringified object.
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();
################################################################
sub sub_a {
    # Build the value list for code "A" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Values specific to A (for illustrative purposes).
    @varset{qw(field1 field2 field3 field4)} = qw(a_f1 a_f2 a_f3 a_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub sub_b {
    # Build the value list for code "B" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Values specific to B (for illustrative purposes).
    @varset{qw(field1 field2 field3 field4)} = qw(b_f1 b_f2 b_f3 b_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub sub_f {
    # Build the value list for code "F" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Values specific to F (for illustrative purposes).
    @varset{qw(field1 field2 field3 field4)} = qw(f_f1 f_f2 f_f3 f_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub sub_m {
    # Build the value list for code "M" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Values specific to M (for illustrative purposes).
    @varset{qw(field1 field2 field3 field4)} = qw(m_f1 m_f2 m_f3 m_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub sub_s {
    # Build the value list for code "S" and pass it to write_to_db.
    my ($code) = @_;
    my @values;

    # Values specific to S (for illustrative purposes).
    @varset{qw(field1 field2 field3 field4)} = qw(s_f1 s_f2 s_f3 s_f4);

    # Place each value at the index %columns assigns to its column name.
    for my $name (keys %varset) {
        $values[ $columns{$name} ] = $varset{$name};
    }

    # Index 0 carries the code itself.
    $values[0] = $code;
    write_to_db($code, @values);
}
################################################################
sub write_to_db {
    # Run the prepared SELECT for $code_name; when it reports no matching
    # rows, run the prepared INSERT with @values.
    my ($code_name, @values) = @_;

    my $select_rv = $sth_select_statement->execute($code_name);
    print $DBI::errstr if $select_rv < 0;

    # The SELECT yields a single count column.
    my ($match_count) = $sth_select_statement->fetchrow_array();

    # INSERT settings into 'mytable' only when nothing was found.
    $sth_insert_statement->execute(@values) unless $match_count > 0;
}
################################################################
sub set_columns {
    # Fill the file-level %columns (name => index) and @columns
    # (index => name) with the same five-column layout.
    @columns{qw(code field1 field2 field3 field4)} = (0 .. 4);
    @columns[0 .. 4] = qw(code field1 field2 field3 field4);
}
编辑 2021-07-21:
我添加了clone_dbh
和create_dbh
子程序来尝试不同的方法,但两种方法都不起作用。我仍然遇到同样的错误 - can't share DBH between threads
。除了放弃子例程并为 Parallel::ForkManager
产生的每个线程编写重复代码外,我不知道还能做什么,我真的不想这样做。
示例 2:
use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;
#@codes = ("A"); # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
my %varset; # field name => value, filled in by the sub_x workers
################################################################
# connect to db
################################################################
# The DSN has to be double-quoted: inside single quotes $ENV{DB_NAME} is not
# interpolated and the literal text "$ENV{DB_NAME}" would be used as the
# database name.
my $dsn = "DBI:Pg:dbname=$ENV{DB_NAME}";
my $userid = $ENV{DBI_USER};
my $sesame = $ENV{DBI_PASS};
my %dbh; # hash for storing dbh handles, keyed by code
my $dbh = DBI->connect($dsn, $userid, $sesame, {
    AutoCommit => 1,
    RaiseError => 1,
    PrintError => 1,
    # This handle is inherited by every forked child. AutoInactiveDestroy
    # keeps a child's exit from shutting down the parent's connection.
    AutoInactiveDestroy => 1,
}) or die "Connection failed!\n" . $DBI::errstr;
################################################################
# test db connection
################################################################
my $me = $dbh->{Driver}{Name};
my $sversion = $dbh->{pg_server_version};
print "DBI is version $DBI::VERSION, "
    . "I am $me, "
    . "version of DBD::Pg is $DBD::Pg::VERSION, "
    . "server is $sversion\n";
print "Name: $dbh->{Name}\n";
################################################################
# prepare array and hash for matching db columns
################################################################
my %columns; # hash for persistent mapping of column-values (name => index)
my @columns; # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table
my $placeholders = join(", ", map { '?' } @columns); # one bind marker per column
################################################################
# Build the SELECT SQL statement
################################################################
# The existence check looks at field1: that is the column the INSERT fills
# with the code (value slot 0). Probing a column the INSERT never writes
# could not see rows created by this script.
my $sql_select_statement =
    qq(SELECT count(*) FROM mytable WHERE field1 = ?;);
################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
    "INSERT INTO mytable ("
    . join(", ", @columns) # column names
    . ") VALUES ($placeholders)";
################################################################################################
# create clones of database handle and SQL statements for threads
################################################################################################
my %sth_select_code; # code => prepared SELECT statement handle
my %sth_insert_code; # code => prepared INSERT statement handle
# Preparing the per-code handles up front in the parent is disabled; the
# workers set up their own handles after the fork instead.
#for my $code (@codes) {
# $dbh{$code} = $dbh->clone();
# # prepare the SELECT statement handle
# $sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_statement);
# # prepare the INSERT statement handle
# $sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_statement);
#}
################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes); # one worker per code
$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});
$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
    # Report workers that failed instead of silently dropping their status.
    warn "Child $ident (pid $pid) ended with exit code $exit_code, signal $exit_signal\n"
        if $exit_code or $exit_signal;
});
my $thread_count = 0;
OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";
    # fork optimization threads - per code
    # start() is true in the parent, which moves straight on to the next code
    $optimization->start($code) and next OPTIMIZATION;
    # only the child process gets here
    if ($code =~ m/A/i) {
        sub_a("A");
    } elsif ($code =~ m/B/i) {
        sub_b("B");
    } elsif ($code =~ m/F/i) {
        sub_f("F");
    } elsif ($code =~ m/M/i) {
        sub_m("M");
    } elsif ($code =~ m/S/i) {
        sub_s("S");
    }
    # The dollar sign is escaped so the message shows the call being made
    # rather than the stringified manager object.
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();
################################################################################################
# disconnect from database
################################################################################################
# The per-code handles are created by create_dbh(), which the workers call
# after the fork, so in the parent these slots are normally empty. Skip
# whatever is undefined instead of dying on a method call on undef.
for my $code (@codes) {
    $sth_select_code{$code}->finish() if defined $sth_select_code{$code};
    $sth_insert_code{$code}->finish() if defined $sth_insert_code{$code};
    $dbh{$code}->disconnect           if defined $dbh{$code};
}
$dbh->disconnect;
################################################################################################
# end
################################################################################################
exit;
################################################################
sub sub_a {
    # Worker for code "A": sets up the per-code handles through create_dbh(),
    # assembles the row values and passes them to write_to_db().
    my ($code) = @_;

    create_dbh($code);    # alternative that was tried: clone_dbh($code)

    # generate values specific to A (for illustrative purposes)
    @varset{qw(field2 field3 field4 field5)} = qw(a_f1 a_f2 a_f3 a_f4);

    # drop every value into the slot of its column; slot 0 is the code
    my @values;
    $values[ $columns{$_} ] = $varset{$_} for keys %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}
################################################################
sub sub_b {
    # Worker for code "B": sets up the per-code handles through create_dbh(),
    # assembles the row values and passes them to write_to_db().
    my ($code) = @_;

    create_dbh($code);    # alternative that was tried: clone_dbh($code)

    # generate values specific to B (for illustrative purposes)
    @varset{qw(field2 field3 field4 field5)} = qw(b_f1 b_f2 b_f3 b_f4);

    # drop every value into the slot of its column; slot 0 is the code
    my @values;
    $values[ $columns{$_} ] = $varset{$_} for keys %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}
################################################################
sub sub_f {
    # Worker for code "F": sets up the per-code handles through create_dbh(),
    # assembles the row values and passes them to write_to_db().
    my ($code) = @_;

    create_dbh($code);    # alternative that was tried: clone_dbh($code)

    # generate values specific to F (for illustrative purposes)
    @varset{qw(field2 field3 field4 field5)} = qw(f_f1 f_f2 f_f3 f_f4);

    # drop every value into the slot of its column; slot 0 is the code
    my @values;
    $values[ $columns{$_} ] = $varset{$_} for keys %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}
################################################################
sub sub_m {
    # Worker for code "M": sets up the per-code handles through create_dbh(),
    # assembles the row values and passes them to write_to_db().
    my ($code) = @_;

    create_dbh($code);    # alternative that was tried: clone_dbh($code)

    # generate values specific to M (for illustrative purposes)
    @varset{qw(field2 field3 field4 field5)} = qw(m_f1 m_f2 m_f3 m_f4);

    # drop every value into the slot of its column; slot 0 is the code
    my @values;
    $values[ $columns{$_} ] = $varset{$_} for keys %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}
################################################################
sub sub_s {
    # Worker for code "S": sets up the per-code handles through create_dbh(),
    # assembles the row values and passes them to write_to_db().
    my ($code) = @_;

    create_dbh($code);    # alternative that was tried: clone_dbh($code)

    # generate values specific to S (for illustrative purposes)
    @varset{qw(field2 field3 field4 field5)} = qw(s_f1 s_f2 s_f3 s_f4);

    # drop every value into the slot of its column; slot 0 is the code
    my @values;
    $values[ $columns{$_} ] = $varset{$_} for keys %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}
################################################################################
sub create_dbh {
    # Open a dedicated database connection for $code and prepare both
    # statements on it.
    #   $code - key under which the handles are stored in %dbh,
    #           %sth_select_code and %sth_insert_code
    # Returns the list (database handle, SELECT handle, INSERT handle). In
    # scalar context that is the INSERT handle, the value this routine
    # yielded before it returned a list.
    # Dies when the connection cannot be established.
    my $code = shift;

    my $handle = DBI->connect($dsn, $userid, $sesame, {
        AutoCommit => 1,
        RaiseError => 1,
        PrintError => 1
    }) or die "Connection failed!\n" . $DBI::errstr;
    $dbh{$code} = $handle;

    # did it work? are we there yet?
    my $me = $handle->{Driver}{Name};
    my $sversion = $handle->{pg_server_version};
    print "DBI is version $DBI::VERSION, "
        . "I am $me, "
        . "version of DBD::Pg is $DBD::Pg::VERSION, "
        . "server is $sversion\n";
    # Report the connection that was just opened, not the file-level $dbh.
    print "Name: $handle->{Name}\n";

    # prepare the SELECT statement handle
    $sth_select_code{$code} = $handle->prepare_cached($sql_select_statement);
    # prepare the INSERT statement handle
    $sth_insert_code{$code} = $handle->prepare_cached($sql_insert_statement);

    return ($handle, $sth_select_code{$code}, $sth_insert_code{$code});
}
################################################################
sub clone_dbh {
    # Variant of the handle setup that clones the file-level $dbh instead of
    # connecting from scratch. The clone and the two statements prepared on
    # it are stored under $code in %dbh, %sth_select_code and
    # %sth_insert_code. Returns the INSERT statement handle.
    my ($code) = @_;

    my $cloned = $dbh->clone();
    $dbh{$code} = $cloned;

    # one cached statement handle per statement, both bound to the clone
    $sth_select_code{$code} = $cloned->prepare_cached($sql_select_statement);
    $sth_insert_code{$code} = $cloned->prepare_cached($sql_insert_statement);

    return $sth_insert_code{$code};
}
################################################################
sub write_to_db {
    # Insert one row into 'mytable' unless the SELECT already finds a record
    # for $code.
    #   $code   - selects the statement handles in %sth_select_code and
    #             %sth_insert_code; also bound to the SELECT placeholder
    #   @values - bind values for the INSERT statement, in column order
    # Returns nothing.
    # Unpack from @_ directly: "shift @_" yields only the first argument,
    # which left @values empty and the INSERT without bind values.
    my ($code, @values) = @_;

    my $select_sth = $sth_select_code{$code};

    # DBI's execute() reports failure with undef, never with a negative
    # number, so test for definedness. Stop on failure: carrying on would
    # read an empty row and wrongly trigger the INSERT.
    my $rv_code = $select_sth->execute($code);
    unless (defined $rv_code) {
        print $DBI::errstr;
        return;
    }

    my ($existing) = $select_sth->fetchrow_array();
    # Only the first row is read; release the cursor so the cached handle is
    # inactive before it is executed again.
    $select_sth->finish();

    # if the SELECT found no existing records for this strategy, then INSERT it
    $sth_insert_code{$code}->execute(@values) unless ($existing // 0) > 0;
    return;
}
################################################################
sub set_columns {
    # Fill the file-level column maps for the target table: @columns holds
    # the column names in positional order, %columns maps each name back to
    # its position.
    my @names = qw(field1 field2 field3 field4 field5);

    @columns{@names}       = (0 .. $#names);
    @columns[0 .. $#names] = @names;

    # Callers ignore the result; the last column name is handed back because
    # that is the value the final assignment has always yielded.
    return $columns[$#names];
}
我是否必须从调用 sub_x
线程中明确地将 dbh
和 statement
句柄传递给 create_dbh
子程序?
我是否必须把 handles 或 handle_refs 从 create_dbh 返回给调用它的 sub_x 线程?
我不知道如何解决这个问题,但这似乎是词法作用域或 object/memory 访问问题。
还有什么想法吗?
我不太确定你想要什么,但也许这些指示会有所帮助:
准备好的语句在创建它们的数据库会话中是本地的。因此,如果您希望在所有数据库会话中使用它们,则必须在每个数据库会话中都进行准备。
您始终可以并行运行多条语句,只要每条语句都在其自己的数据库会话中运行。
您还可以在数据库中对单个查询进行并行化,以便多个数据库进程一起处理它。如果 PostgreSQL 被适当配置并且优化器认为查询将受益,这会自动发生。有关详细信息,请参阅 the documentation。
我使用下面的代码让它工作:
use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;
#my @codes = qw(A); # testing single thread
my @codes = qw(A B F M S); # testing multi-thread
my %varset; # field name => value, filled in by the sub_x workers
################################################################
# get db connection info
################################################################
# Credentials come from the environment; the database name is fixed.
my ($userid, $sesame) = @ENV{qw(DBI_USER DBI_PASS)};
my $dsn = 'DBI:Pg:dbname=mt4_test';
my %dbh; # hash for storing dbh handles, keyed by code
################################################################
# prepare array and hash for matching db columns
################################################################
# %columns: column name => position, @columns: position => column name
my (%columns, @columns);
set_columns(); # define column<->value mapping for db table
my $placeholders = join ', ', ('?') x @columns; # one bind marker per column
################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement = 'SELECT count(*) FROM mytable WHERE field1 = ?;';
################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement = sprintf 'INSERT INTO mytable (%s) VALUES (%s)',
    join(', ', @columns), # column names
    $placeholders;
################################################################
# hash for storing SQL statement handles for threads
################################################################
my (%sth_select_code, %sth_insert_code); # both keyed by code
################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes); # one worker per code
$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});
$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
    # Report workers that failed instead of silently dropping their status.
    warn "Child $ident (pid $pid) ended with exit code $exit_code, signal $exit_signal\n"
        if $exit_code or $exit_signal;
});
my $thread_count = 0;
OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";
    # fork optimization threads - per code
    # The parent must skip ahead no matter how many codes there are. Without
    # the "next", a single-code run executed the worker in both the parent
    # and the child.
    $optimization->start($code) and next OPTIMIZATION;
    # only the child process gets here
    launch_sub($code);
    # The dollar sign is escaped so the message shows the call being made
    # rather than the stringified manager object.
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();
################################################################
# THE END
################################################################
exit;
################################################################
################################################################
sub launch_sub {
    # Route a code to its worker routine. The patterns are tried in the
    # listed order, unanchored and case-insensitively; the first one that
    # matches wins and its worker is called with the upper-case letter.
    # A code that matches none of them is ignored.
    my ($code) = @_;

    my @routes = (
        [ qr/A/i, \&sub_a, "A" ],
        [ qr/B/i, \&sub_b, "B" ],
        [ qr/F/i, \&sub_f, "F" ],
        [ qr/M/i, \&sub_m, "M" ],
        [ qr/S/i, \&sub_s, "S" ],
    );

    for my $route (@routes) {
        my ($pattern, $worker, $letter) = @{$route};
        return $worker->($letter) if $code =~ $pattern;
    }
    return;
}
################################################################
sub sub_a {
    # Worker for code "A": opens the per-code handles, writes the row for
    # this code and closes the handles again.
    #   $code - key into %dbh, %sth_select_code and %sth_insert_code
    my ($code) = @_;

    # create_dbh() stores the handles by dereferencing its arguments, so it
    # must be given REFERENCES to the per-code slots. Passing the (still
    # undefined) slot values leaves the hashes empty, and write_to_db() then
    # calls execute() on undef.
    create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});

    # generate values specific to A (for illustrative purposes)
    $varset{field2} = 'a_f1';
    $varset{field3} = 'a_f2';
    $varset{field4} = 'a_f3';
    $varset{field5} = 'a_f4';

    my @values;
    foreach my $column (keys %varset) {
        # place the value at the position of its column
        $values[ $columns{$column} ] = $varset{$column};
    }
    $values[0] = $code;    # the first column holds the code itself

    write_to_db($code, @values);

    # disconnect_dbh() dereferences its arguments as well
    disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    return;
}
################################################################
sub sub_b {
    # Worker for code "B": opens the per-code handles, writes the row for
    # this code and closes the handles again.
    #   $code - key into %dbh, %sth_select_code and %sth_insert_code
    my ($code) = @_;

    # create_dbh() stores the handles by dereferencing its arguments, so it
    # must be given REFERENCES to the per-code slots. Passing the (still
    # undefined) slot values leaves the hashes empty, and write_to_db() then
    # calls execute() on undef.
    create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});

    # generate values specific to B (for illustrative purposes)
    $varset{field2} = 'b_f1';
    $varset{field3} = 'b_f2';
    $varset{field4} = 'b_f3';
    $varset{field5} = 'b_f4';

    my @values;
    foreach my $column (keys %varset) {
        # place the value at the position of its column
        $values[ $columns{$column} ] = $varset{$column};
    }
    $values[0] = $code;    # the first column holds the code itself

    write_to_db($code, @values);

    # disconnect_dbh() dereferences its arguments as well
    disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    return;
}
################################################################
sub sub_f {
    # Worker for code "F": opens the per-code handles, writes the row for
    # this code and closes the handles again.
    #   $code - key into %dbh, %sth_select_code and %sth_insert_code
    my ($code) = @_;

    # create_dbh() stores the handles by dereferencing its arguments, so it
    # must be given REFERENCES to the per-code slots. Passing the (still
    # undefined) slot values leaves the hashes empty, and write_to_db() then
    # calls execute() on undef.
    create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});

    # generate values specific to F (for illustrative purposes)
    $varset{field2} = 'f_f1';
    $varset{field3} = 'f_f2';
    $varset{field4} = 'f_f3';
    $varset{field5} = 'f_f4';

    my @values;
    foreach my $column (keys %varset) {
        # place the value at the position of its column
        $values[ $columns{$column} ] = $varset{$column};
    }
    $values[0] = $code;    # the first column holds the code itself

    write_to_db($code, @values);

    # disconnect_dbh() dereferences its arguments as well
    disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    return;
}
################################################################
sub sub_m {
    # Worker for code "M": opens the per-code handles, writes the row for
    # this code and closes the handles again.
    #   $code - key into %dbh, %sth_select_code and %sth_insert_code
    my ($code) = @_;

    # create_dbh() stores the handles by dereferencing its arguments, so it
    # must be given REFERENCES to the per-code slots. Passing the (still
    # undefined) slot values leaves the hashes empty, and write_to_db() then
    # calls execute() on undef.
    create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});

    # generate values specific to M (for illustrative purposes)
    $varset{field2} = 'm_f1';
    $varset{field3} = 'm_f2';
    $varset{field4} = 'm_f3';
    $varset{field5} = 'm_f4';

    my @values;
    foreach my $column (keys %varset) {
        # place the value at the position of its column
        $values[ $columns{$column} ] = $varset{$column};
    }
    $values[0] = $code;    # the first column holds the code itself

    write_to_db($code, @values);

    # disconnect_dbh() dereferences its arguments as well
    disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    return;
}
################################################################
sub sub_s {
    # Worker for code "S": opens the per-code handles, writes the row for
    # this code and closes the handles again.
    #   $code - key into %dbh, %sth_select_code and %sth_insert_code
    my ($code) = @_;

    # create_dbh() stores the handles by dereferencing its arguments, so it
    # must be given REFERENCES to the per-code slots. Passing the (still
    # undefined) slot values leaves the hashes empty, and write_to_db() then
    # calls execute() on undef.
    create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});

    # generate values specific to S (for illustrative purposes)
    $varset{field2} = 's_f1';
    $varset{field3} = 's_f2';
    $varset{field4} = 's_f3';
    $varset{field5} = 's_f4';

    my @values;
    foreach my $column (keys %varset) {
        # place the value at the position of its column
        $values[ $columns{$column} ] = $varset{$column};
    }
    $values[0] = $code;    # the first column holds the code itself

    write_to_db($code, @values);

    # disconnect_dbh() dereferences its arguments as well
    disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    return;
}
################################################################
sub create_dbh {
    # Open a database connection, prepare both statements on it and store
    # each handle through the scalar reference supplied by the caller.
    #   $dbh_ref - reference to the scalar receiving the database handle
    #   $sel_ref - reference to the scalar receiving the SELECT handle
    #   $ins_ref - reference to the scalar receiving the INSERT handle
    # Returns the list (database handle, SELECT handle, INSERT handle), which
    # is what the callers unpack. In scalar context that is the INSERT
    # handle, the value this routine yielded before it returned a list.
    # Dies when the connection cannot be established.
    # NOTE: an undefined argument is autovivified into a fresh anonymous
    # scalar here, so that handle is reachable only via the return value.
    my ($dbh_ref, $sel_ref, $ins_ref) = @_;

    my $handle = DBI->connect($dsn, $userid, $sesame, {
        AutoCommit => 1,
        RaiseError => 1,
        PrintError => 1
    }) or die "Connection failed!\n" . $DBI::errstr;
    ${$dbh_ref} = $handle;

    # did it work? are we there yet?
    my $me = $handle->{Driver}{Name};
    my $sversion = $handle->{pg_server_version};
    print "DBI is version $DBI::VERSION, "
        . "I am $me, "
        . "version of DBD::Pg is $DBD::Pg::VERSION, "
        . "server is $sversion\n";
    print "Name: $handle->{Name}\n";

    # prepare the SELECT statement handle
    ${$sel_ref} = $handle->prepare_cached($sql_select_statement);
    # prepare the INSERT statement handle
    ${$ins_ref} = $handle->prepare_cached($sql_insert_statement);

    return ($handle, ${$sel_ref}, ${$ins_ref});
}
################################################################
sub write_to_db {
    # Insert one row into 'mytable' unless the SELECT already finds a record
    # for $code.
    #   $code   - selects the statement handles in %sth_select_code and
    #             %sth_insert_code; also bound to the SELECT placeholder
    #   @values - bind values for the INSERT statement, in column order
    # Returns nothing.
    my ($code, @values) = @_;

    my $select_sth = $sth_select_code{$code};

    # DBI's execute() reports failure with undef, never with a negative
    # number, so test for definedness. Stop on failure: carrying on would
    # read an empty row and wrongly trigger the INSERT.
    my $rv_code = $select_sth->execute($code);
    unless (defined $rv_code) {
        print $DBI::errstr;
        return;
    }
    say "SQL SELECT for $code: rv_code = $rv_code";

    my ($existing) = $select_sth->fetchrow_array();
    # Only the first row is read; release the cursor so the cached handle is
    # inactive before it is executed again.
    $select_sth->finish();

    # if the SELECT found no existing records for this strategy, then INSERT it
    unless (($existing // 0) > 0) {
        # INSERT settings into 'mytable'
        $sth_insert_code{$code}->execute(@values);
        say "SQL INSERT for $code";
    }
    return;
}
################################################################
sub disconnect_dbh {
    # Close down one set of handles. All three arguments are scalar
    # references: the two statement handles are finished first, then the
    # database handle they point to is disconnected.
    my ($dbh_ref, $sel_ref, $ins_ref) = @_;

    $$_->finish() for ($sel_ref, $ins_ref);
    $$dbh_ref->disconnect;

    say "disconnected dbh_ref: $dbh_ref";
}
################################################################
sub set_columns {
    # Fill the file-level column maps for the target table: @columns holds
    # the column names in positional order, %columns maps each name back to
    # its position.
    my @names = qw(field1 field2 field3 field4 field5);

    @columns{@names}       = (0 .. $#names);
    @columns[0 .. $#names] = @names;

    # Callers ignore the result; the last column name is handed back because
    # that is the value the final assignment has always yielded.
    return $columns[$#names];
}