Skip Menu |
 

This queue is for tickets about the DBIx-Class CPAN distribution.

Report information
The Basics
Id: 56446
Status: rejected
Priority: 0/
Queue: DBIx-Class

People
Owner: Nobody in particular
Requestors: DAKKAR [...] cpan.org
Cc:
AdminCc:

Bug Information
Severity: Normal
Broken in: 0.08120
Fixed in: (no value)

Attachments


Subject: work-around for DBD::Pg not using cursors
Download (untitled) / with headers
text/plain 537b
As you probably already know, DBD::Pg does not use PostgreSQL's cursors to fetch data. Instead, it loads all the rows into RAM, then iterates over them. DBD::Pg's documentation shows how to explicitly use cursors. The attached patch implements that usage inside DBIx::Class::Storage::DBI::Pg. NOTE: it *breaks* threaded tests. I have a suspicion that such tests ran before only because the shared statement handle was never actually used from more than one test (after the first ->next, no more communication with the db took place).
Subject: DBIx-Class-Pg-cursors.patch
diff --git a/lib/DBIx/Class/Storage/DBI/Cursor/Pg.pm b/lib/DBIx/Class/Storage/DBI/Cursor/Pg.pm new file mode 100644 index 0000000..96acd77 --- /dev/null +++ b/lib/DBIx/Class/Storage/DBI/Cursor/Pg.pm @@ -0,0 +1,89 @@ +package DBIx::Class::Storage::DBI::Cursor::Pg; + +use strict; +use warnings; + +use base qw/DBIx::Class::Storage::DBI::Cursor/; + +__PACKAGE__->mk_group_accessors('simple' => + qw/sub_sth/ +); + +sub _check_cursor_end { + my ($self,$storage)=@_; + if ($self->sub_sth->rows == 0) { + $self->sub_sth(undef); + $self->{done} = 1; + $storage->_delete_cursor($self->sth); + $self->sth(undef); + } +} + +sub _run_sub_sth { + my ($self,$storage)=@_; + + $self->sub_sth($storage->sth("fetch 1000 from ".$storage->_get_cursor_from_map($self->sth))); + $self->sub_sth->execute; +} + +sub _dbh_next { + my ($storage, $dbh, $self) = @_; + + $self->_check_dbh_gen; + + return if $self->{done}; + unless ($self->sth) { + $self->sth(($storage->_select(@{$self->{args}}))[1]); + } + + unless ($self->sub_sth) { + $self->_run_sub_sth($storage); + } + $self->_check_cursor_end($storage); + return if $self->{done}; + + my @row = $self->sub_sth->fetchrow_array; + if (@row) { + $self->{pos}++; + } else { + $self->_run_sub_sth($storage); + $self->_check_cursor_end($storage); + return if $self->{done}; + @row = $self->sub_sth->fetchrow_array; + } + return @row; +} + +sub _dbh_all { + my ($storage, $dbh, $self) = @_; + + $self->_check_dbh_gen; + if ($self->sth && $self->sth->{Active}) { + if ($self->sub_sth) { + $self->sub_sth(undef); + $storage->_delete_cursor($self->sth); + } + $self->sth->finish; + $self->sth(undef); + } + $self->sth(($storage->_select(@{$self->{args}}))[1]); + + my @rows; + $self->{done}=0; + while (!$self->{done}) { + $self->_run_sub_sth($storage); + push @rows, @{$self->sub_sth->fetchall_arrayref}; + $self->_check_cursor_end($storage); + } + + return @rows; +} + +sub DESTROY { + my ($self) = @_; + + $self->{storage}->_delete_cursor($self->sth) if $self->sub_sth; + 
$self->SUPER::DESTROY; +} + +1; diff --git a/lib/DBIx/Class/Storage/DBI/Pg.pm b/lib/DBIx/Class/Storage/DBI/Pg.pm index 4428b1f..bd560b7 100644 --- a/lib/DBIx/Class/Storage/DBI/Pg.pm +++ b/lib/DBIx/Class/Storage/DBI/Pg.pm @@ -8,14 +8,20 @@ use base qw/ /; use mro 'c3'; +use Carp::Clan qw/^DBIx::Class/; use DBD::Pg qw(:pg_types); use Scope::Guard (); use Context::Preserve (); +use Scalar::Util 'refaddr'; + +require DBIx::Class::Storage::DBI::Cursor::Pg; # Ask for a DBD::Pg with array support warn __PACKAGE__.": DBD::Pg 2.9.2 or greater is strongly recommended\n" if ($DBD::Pg::VERSION < 2.009002); # pg uses (used?) version::qv() +__PACKAGE__->mk_group_accessors('simple' => '_cursors_map'); + sub _supports_insert_returning { my $self = shift; @@ -181,6 +187,67 @@ sub bind_attribute_by_data_type { } } +__PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor::Pg'); + +sub _cursor_name { + my ($self)=@_; + $self->_cursors_map({}) unless $self->_cursors_map; + my $count=scalar keys %{$self->_cursors_map}; + return 'csr_'.refaddr($self)."_$count"; +} + +sub _add_cursor_to_map { + my ($self,$sth,$id)=@_; + $self->_cursors_map({}) unless $self->_cursors_map; + $self->_cursors_map->{refaddr($sth)}=$id; +} + +sub _get_cursor_from_map { + my ($self,$sth)=@_; + return $self->_cursors_map->{refaddr($sth)}; +} + +sub _remove_cursor_from_map { + my ($self,$sth)=@_; + delete $self->_cursors_map->{refaddr($sth)}; +} + +sub _delete_cursor { + my ($self,$sth) = @_; + my $csr_id=$self->_get_cursor_from_map($sth); + my $csr_sql="CLOSE $csr_id"; + $self->_get_dbh->do($csr_sql); + $self->_remove_cursor_from_map($sth); +} + +sub _dbh_sth { + my ($self, $dbh, $sql) = @_; + + my ($save_cursor,$csr_id); + if ($sql =~ /^SELECT\b/i) { + $csr_id=$self->_cursor_name; + my $hold= ($sql =~ /\bFOR\s+UPDATE\s*\z/i) ? 
'' : 'WITH HOLD'; + $sql="DECLARE $csr_id CURSOR $hold FOR $sql"; + $save_cursor=1; + } + my $sth=$self->SUPER::_dbh_sth($dbh,$sql); + if ($save_cursor) { + $self->_add_cursor_to_map($sth,$csr_id); + } + return $sth; +} + +sub select_single { + my $self = shift; + my $csr=$self->select(@_); + my @row=$csr->next; + my @nextrow = $csr->next if @row; + if(@row && @nextrow) { + carp "Query returned more than one row. SQL that returns multiple rows is DEPRECATED for ->find and ->single"; + } + return @row; +} + sub _svp_begin { my ($self, $name) = @_; diff --git a/t/72pg_cursors.t b/t/72pg_cursors.t new file mode 100644 index 0000000..00fe987 --- /dev/null +++ b/t/72pg_cursors.t @@ -0,0 +1,66 @@ +use strict; +use warnings; + +use Test::More; +use lib qw(t/lib); +use DBICTest; +use Time::HiRes qw(gettimeofday tv_interval); + +my ($dsn, $dbuser, $dbpass) = @ENV{map { "DBICTEST_PG_${_}" } qw/DSN USER PASS/}; + +plan skip_all => 'Set $ENV{DBICTEST_PG_DSN}, _USER and _PASS to run this test' + unless ($dsn && $dbuser); + +plan tests => 2; + +my $schema = DBICTest::Schema->connection($dsn, $dbuser, $dbpass, { AutoCommit => 1 }); + +my $dbh = $schema->storage->dbh; + +{ + local $SIG{__WARN__} = sub {}; + $dbh->do('DROP TABLE IF EXISTS artist'); + $dbh->do(q[ + CREATE TABLE artist + ( + artistid serial NOT NULL PRIMARY KEY, + name varchar(100), + rank integer, + charfield char(10) + ); + ],{ RaiseError => 1, PrintError => 1 }); +} + +# copied from 100populate.t + +my $start_id = 'populateXaaaaaa'; +my $rows=2005; +my $offset = 3; + +$schema->populate('Artist', [ [ qw/artistid name/ ], map { [ ($_ + $offset) => $start_id++ ] } ( 1 .. 
$rows ) ] ); +is ( + $schema->resultset ('Artist')->search ({ name => { -like => 'populateX%' } })->count, + $rows, + 'populate created correct number of rows with massive AoA bulk insert', +); + +{ + my $rs=$schema->resultset('Artist')->search({}); + my $count=0; + $count++ while $rs->next; + is($count,$rows,'get all the rows'); +} + +{ + my $rs=$schema->resultset('Artist')->search({}); + my $t0=[gettimeofday]; + $rs->first; + diag('Time for first: '.tv_interval($t0)); +} + +{ + my $rs=$schema->resultset('Artist')->search({}); + my $t0=[gettimeofday]; + $rs->all; + diag('Time for all: '.tv_interval($t0)); +}
Download (untitled) / with headers
text/plain 214b
Rejected current implementation as discussed on IRC. Submitter received commit bit and a new shiny branch for a new take on the problem: http://dev.catalyst.perl.org/repos/bast/DBIx-Class/0.08/branches/pg_cursors/


This service is sponsored and maintained by Best Practical Solutions and runs on Perl.org infrastructure.

Please report any issues with rt.cpan.org to rt-cpan-admin@bestpractical.com.