This queue is for tickets about the POE CPAN distribution.

Report information
The Basics
Id: 38669
Status: rejected
Priority: 0/
Queue: POE

People
Owner: RCAPUTO [...] cpan.org
Requestors: martijn [...] cpan.org
Cc:
AdminCc:

Bug Information
Severity: Wishlist
Broken in: (no value)
Fixed in: (no value)



Subject: Data::Transform support
So, I uploaded a first version of Data::Transform to CPAN today, and I thought I'd add a ticket with the patches to POE I would like to see for it. (NOTE: these aren't final, ready-to-apply patches, but more a starting point for discussion.)

There isn't very much to them. One is a patch for POE::Wheel::ReadWrite to send special packets right back into the driver without a round-trip to the user (mostly needed for the SSL protocol in the upcoming Data::Transform::SSL). Patches like this one are possibly needed for some other Wheels, but I haven't yet looked into that.

The other two are patches to PoCo::*::TCP to send an EOF packet through the filter chain (which may or may not end up generating a network packet), so you can cleanly end the connection for protocols that send a 'goodbye' message.
Subject: data-transform.patch
diff -ur ../poe/lib/POE/Component/Client/TCP.pm lib/POE/Component/Client/TCP.pm
--- ../poe/lib/POE/Component/Client/TCP.pm 2008-07-15 23:47:51.000000000 +0200
+++ lib/POE/Component/Client/TCP.pm 2008-08-07 20:22:50.000000000 +0200
@@ -274,6 +274,7 @@
         if ($heap->{connected}) {
           $heap->{connected} = 0;
           if (defined $heap->{server}) {
+            $heap->{server}->put(Data::Transform::Meta::EOF->new());
             if (
               $heap->{got_an_error} or
               not $heap->{server}->get_driver_out_octets()
diff -ur ../poe/lib/POE/Component/Server/TCP.pm lib/POE/Component/Server/TCP.pm
--- ../poe/lib/POE/Component/Server/TCP.pm 2008-07-15 23:47:51.000000000 +0200
+++ lib/POE/Component/Server/TCP.pm 2008-08-07 20:22:35.000000000 +0200
@@ -261,6 +261,7 @@
           my $heap = $_[HEAP];
           $heap->{shutdown} = 1;
           if (defined $heap->{client}) {
+            $heap->{client}->put(Data::Transform::Meta::EOF->new());
             if (
               $heap->{got_an_error} or
               not $heap->{client}->get_driver_out_octets()
Only in ../poe/lib/POE/: Component.pm
Only in ../poe/lib/POE/: Driver
Only in ../poe/lib/POE/: Driver.pm
Only in ../poe/lib/POE/Filter: Block.pm
Only in ../poe/lib/POE/Filter: Grep.pm
Only in ../poe/lib/POE/Filter: HTTPD.pm
Only in ../poe/lib/POE/Filter: Line.pm
Only in ../poe/lib/POE/Filter: Map.pm
Only in ../poe/lib/POE/Filter: RecordBlock.pm
Only in ../poe/lib/POE/Filter: Reference.pm
Only in ../poe/lib/POE/Filter: Stackable.pm
Only in ../poe/lib/POE/Filter: Stream.pm
Only in ../poe/lib/POE/: Filter.pm
Only in ../poe/lib/POE/: Kernel.pm
Only in ../poe/lib/POE/: Loader.pm
Only in ../poe/lib/POE/: Loop
Only in ../poe/lib/POE/: Loop.pm
Only in ../poe/lib/POE/: NFA.pm
Only in ../poe/lib/POE/: Pipe
Only in ../poe/lib/POE/: Pipe.pm
Only in ../poe/lib/POE/: Queue
Only in ../poe/lib/POE/: Queue.pm
Only in ../poe/lib/POE/: Resource
Only in ../poe/lib/POE/: Resource.pm
Only in ../poe/lib/POE/: Resources.pm
Only in ../poe/lib/POE/: Session.pm
Only in ../poe/lib/POE/Wheel: Curses.pm
Only in ../poe/lib/POE/Wheel: FollowTail.pm
Only in ../poe/lib/POE/Wheel: ListenAccept.pm
Only in ../poe/lib/POE/Wheel: ReadLine.pm
diff -ur ../poe/lib/POE/Wheel/ReadWrite.pm lib/POE/Wheel/ReadWrite.pm
--- ../poe/lib/POE/Wheel/ReadWrite.pm 2008-06-27 11:40:25.000000000 +0200
+++ lib/POE/Wheel/ReadWrite.pm 2008-08-22 21:36:37.000000000 +0200
@@ -8,6 +8,7 @@
 $VERSION = do {my($r)=(q$Revision$=~/(\d+)/);sprintf"1.%04d",$r};

 use Carp qw( croak carp );
+use Scalar::Util qw(blessed);
 use POE qw(Wheel Driver::SysRW Filter::Line);

 # Offsets into $self.
@@ -256,6 +257,7 @@
   my $driver = $self->[DRIVER_BOTH];
   my $input_filter = \$self->[FILTER_INPUT];
   my $event_input = \$self->[EVENT_INPUT];
+  my $handle_output = $self->[HANDLE_OUTPUT];
   my $event_error = \$self->[EVENT_ERROR];
   my $unique_id = $self->[UNIQUE_ID];

@@ -281,6 +283,13 @@
               my $next_rec = $$input_filter->get_one();
               last unless @$next_rec;
               foreach my $cooked_input (@$next_rec) {
+                if (blessed ($cooked_input)) {
+                  if ($cooked_input->isa('Data::Transform::Meta::SENDBACK')) {
+                    $driver->put([$cooked_input->data]);
+                    $k->select_resume_write($handle_output);
+                    next;
+                  }
+                }
                 $k->call($me, $$event_input, $cooked_input, $unique_id);
               }
             }
@@ -473,6 +482,12 @@
         my $next_rec = $self->[FILTER_INPUT]->get_one();
         last unless @$next_rec;
         foreach my $cooked_input (@$next_rec) {
+          if (blessed ($cooked_input) and
+              $cooked_input->isa('Data::Transform::Meta::SENDBACK')) {
+            $self->[DRIVER_BOTH]->put([$cooked_input->{data}]);
+            $poe_kernel->select_resume_write($self->[HANDLE_OUTPUT]);
+            next;
+          }
           $poe_kernel->call(
             $poe_kernel->get_active_session(),
             $self->[EVENT_INPUT],
Only in ../poe/lib/POE/Wheel: Run.pm
Only in ../poe/lib/POE/Wheel: SocketFactory.pm
Only in ../poe/lib/POE/: Wheel.pm
Updated the patch to Wheel::ReadWrite to filter out Data::Transform::Meta packets on put, either before putting filter output into the driver (for Data::Transform filters) or before putting data into the filter (for POE::Filter filters). It now passes wheel_readwrite.t, comp_tcp.t and comp_tcp_concurrent.t.
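The filtering step described above can be tried in isolation. The following is a minimal sketch, not the patch itself: My::Meta and My::Meta::EOF are hypothetical stand-ins for the Data::Transform::Meta classes, and strip_meta is an illustrative helper name that does not exist in POE.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);

# Hypothetical stand-ins for the meta-packet classes (illustration only).
{ package My::Meta;      sub new { my ($class) = @_; return bless {}, $class } }
{ package My::Meta::EOF; our @ISA = ('My::Meta'); }

# Return only the chunks that are not meta packets, so that plain
# payload is all that reaches the next stage (filter or driver).
sub strip_meta {
  my @chunks = @_;
  return grep { not (blessed($_) and $_->isa('My::Meta')) } @chunks;
}

my @payload = strip_meta('hello', My::Meta::EOF->new(), 'world');
print join(',', @payload), "\n";
```

Checking blessed() before calling isa() matters because plain strings and unblessed references are the common case on this path.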
diff -ur poe/lib/POE/Component/Client/TCP.pm Data-Transform/lib/POE/Component/Client/TCP.pm
--- poe/lib/POE/Component/Client/TCP.pm 2008-07-15 23:47:51.000000000 +0200
+++ Data-Transform/lib/POE/Component/Client/TCP.pm 2008-08-26 22:21:09.000000000 +0200
@@ -10,6 +10,8 @@
 use Carp qw(carp croak);
 use Errno qw(ETIMEDOUT ECONNRESET);

+use Data::Transform::Meta;
+
 # Explicit use to import the parameter constants;
 use POE::Session;
 use POE::Driver::SysRW;
@@ -274,6 +276,7 @@
         if ($heap->{connected}) {
           $heap->{connected} = 0;
           if (defined $heap->{server}) {
+            $heap->{server}->put(Data::Transform::Meta::EOF->new());
             if (
               $heap->{got_an_error} or
               not $heap->{server}->get_driver_out_octets()
diff -ur poe/lib/POE/Component/Server/TCP.pm Data-Transform/lib/POE/Component/Server/TCP.pm
--- poe/lib/POE/Component/Server/TCP.pm 2008-07-15 23:47:51.000000000 +0200
+++ Data-Transform/lib/POE/Component/Server/TCP.pm 2008-08-26 22:21:37.000000000 +0200
@@ -11,6 +11,8 @@
 use Socket qw(INADDR_ANY inet_ntoa inet_aton AF_INET AF_UNIX PF_UNIX);
 use Errno qw(ECONNABORTED ECONNRESET);

+use Data::Transform::Meta;
+
 # Explicit use to import the parameter constants.
 use POE::Session;
 use POE::Driver::SysRW;
@@ -261,6 +263,7 @@
           my $heap = $_[HEAP];
           $heap->{shutdown} = 1;
           if (defined $heap->{client}) {
+            $heap->{client}->put(Data::Transform::Meta::EOF->new());
             if (
               $heap->{got_an_error} or
               not $heap->{client}->get_driver_out_octets()
diff -ur poe/lib/POE/Wheel/ReadWrite.pm Data-Transform/lib/POE/Wheel/ReadWrite.pm
--- poe/lib/POE/Wheel/ReadWrite.pm 2008-06-27 11:40:25.000000000 +0200
+++ Data-Transform/lib/POE/Wheel/ReadWrite.pm 2008-08-26 22:20:19.000000000 +0200
@@ -8,6 +8,7 @@
 $VERSION = do {my($r)=(q$Revision$=~/(\d+)/);sprintf"1.%04d",$r};

 use Carp qw( croak carp );
+use Scalar::Util qw(blessed);
 use POE qw(Wheel Driver::SysRW Filter::Line);

 # Offsets into $self.
@@ -256,6 +257,7 @@
   my $driver = $self->[DRIVER_BOTH];
   my $input_filter = \$self->[FILTER_INPUT];
   my $event_input = \$self->[EVENT_INPUT];
+  my $handle_output = $self->[HANDLE_OUTPUT];
   my $event_error = \$self->[EVENT_ERROR];
   my $unique_id = $self->[UNIQUE_ID];

@@ -281,6 +283,13 @@
               my $next_rec = $$input_filter->get_one();
               last unless @$next_rec;
               foreach my $cooked_input (@$next_rec) {
+                if (blessed ($cooked_input)) {
+                  if ($cooked_input->isa('Data::Transform::Meta::SENDBACK')) {
+                    $driver->put([$cooked_input->data]);
+                    $k->select_resume_write($handle_output);
+                    next;
+                  }
+                }
                 $k->call($me, $$event_input, $cooked_input, $unique_id);
               }
             }
@@ -419,11 +428,27 @@
 sub put {
   my ($self, @chunks) = @_;

-  my $old_buffered_out_octets = $self->[DRIVER_BUFFERED_OUT_OCTETS];
-  my $new_buffered_out_octets =
-    $self->[DRIVER_BUFFERED_OUT_OCTETS] =
-    $self->[DRIVER_BOTH]->put($self->[FILTER_OUTPUT]->put(\@chunks));
+  my $new_buffered_out_octets;
+
+  if ($self->[FILTER_OUTPUT]->can('meta')) {
+    my @filtered_chunks = grep {
+      not blessed $_ or not $_->isa('Data::Transform::Meta');
+    } @{$self->[FILTER_OUTPUT]->put(\@chunks)};
+    $new_buffered_out_octets =
+      $self->[DRIVER_BUFFERED_OUT_OCTETS] =
+      $self->[DRIVER_BOTH]->put(\@filtered_chunks);
+  } else {
+    $new_buffered_out_octets =
+      $self->[DRIVER_BUFFERED_OUT_OCTETS] =
+      $self->[DRIVER_BOTH]->put(
+        $self->[FILTER_OUTPUT]->put([
+          grep {
+            not (blessed $_ and $_->isa('Data::Transform::Meta::EOF'));
+          } @chunks
+        ])
+      );
+  }

   if (
     $self->[AUTOFLUSH] &&
@@ -473,6 +498,12 @@
         my $next_rec = $self->[FILTER_INPUT]->get_one();
         last unless @$next_rec;
         foreach my $cooked_input (@$next_rec) {
+          if (blessed ($cooked_input) and
+              $cooked_input->isa('Data::Transform::Meta::SENDBACK')) {
+            $self->[DRIVER_BOTH]->put([$cooked_input->{data}]);
+            $poe_kernel->select_resume_write($self->[HANDLE_OUTPUT]);
+            next;
+          }
           $poe_kernel->call(
             $poe_kernel->get_active_session(),
             $self->[EVENT_INPUT],
Are there any figures for the performance penalty to Data::Transform non-users? All the can()ning seems like a lot of work, but maybe it's not? Maybe it can be put behind a compile-time switch if it is.
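For illustration, a compile-time switch of the kind suggested here might look like the sketch below. Everything in it is an assumption rather than existing POE code: the USE_DATA_TRANSFORM environment variable, the My::Meta::SENDBACK stand-in class, and the deliver helper are hypothetical names.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);

# Hypothetical switch, read once while the program is compiled.
use constant USE_DATA_TRANSFORM => $ENV{USE_DATA_TRANSFORM} ? 1 : 0;

# Hypothetical stand-in for a "send this straight back out" meta packet.
{ package My::Meta::SENDBACK; sub new { return bless { data => $_[1] }, $_[0] } }

# Route one cooked record: meta packets go back to the driver queue,
# everything else goes to the user. The constant is tested first, so
# the blessed()/isa() checks are skipped when the switch is off.
sub deliver {
  my ($record, $to_user, $to_driver) = @_;
  if (USE_DATA_TRANSFORM and blessed($record)
      and $record->isa('My::Meta::SENDBACK')) {
    push @$to_driver, $record->{data};
    return;
  }
  push @$to_user, $record;
  return;
}

my (@user, @driver);
deliver('a line', \@user, \@driver);
deliver(My::Meta::SENDBACK->new('pong'), \@user, \@driver);
printf "user=%d driver=%d\n", scalar @user, scalar @driver;
```

Because the constant is known at compile time, Perl can usually fold the condition and drop the whole branch when the switch is off, which is the point of preferring a constant over a runtime variable.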
On Thu Dec 15 15:24:43 2011, RCAPUTO wrote:
> Are there any figures for the performance penalty to Data::Transform
> non-users? All the can()ning seems like a lot of work, but maybe it's
> not? Maybe it can be put behind a compile-time switch if it is.
martijn@zzz:~$ time for (( i=10; i--; 1 )); do perl test.pl; done

with POE::Filter::Line and no Data::Transform loaded:
real 0m40.132s
user 0m39.814s
sys 0m0.284s

with POE::Filter::Line and Data::Transform loaded:
real 0m41.681s
user 0m41.307s
sys 0m0.348s

with Data::Transform::Line:
real 0m43.990s
user 0m43.723s
sys 0m0.248s
Subject: test.pl
use strict;
use warnings;

use POE;
use POE::Wheel::ReadWrite;
#use Data::Transform::POE;
#use Data::Transform::Line;

POE::Session->create(
  inline_states => {
    _start => sub {
      open( my $dict, 'british.med+.mwl' ); # from /usr/share/ispell
      $_[HEAP]->{wheel} = POE::Wheel::ReadWrite->new(
        Handle => $dict,
        #Filter => Data::Transform::Line->new,
        InputEvent => 'line',
        ErrorEvent => 'end',
      );
    },
    line => sub {
      $_[HEAP]->{count}++;
    },
    end => sub {
      #warn delete $_[HEAP]->{count};
      delete $_[HEAP]->{wheel};
    }
  },
);

$poe_kernel->run;
I've attached a version of your benchmark that loops internally and reports basic processing rates excluding Perl and POE setup and teardown time. It doesn't report CPU times, which might be a problem.
Subject: lotr-bench-wheel-readwrite.pl
use strict;
use warnings;

use POE;
use POE::Wheel::ReadWrite;
use Time::HiRes qw(time);

use constant ITERATIONS_PER_RUN => 10;

POE::Session->create(
  inline_states => {
    _start => sub {
      $_[HEAP]->{tests_to_go} = ITERATIONS_PER_RUN;
      restart_test($_[HEAP]);
    },
    line => sub {
      $_[HEAP]->{count}++;
    },
    end => sub {
      my $elapsed = time() - $_[HEAP]{start};
      my $count = $_[HEAP]{count};
      printf "%d / %.3f = %.3f lines/sec\n", $count, $elapsed, $count / $elapsed;
      delete $_[HEAP]->{wheel};
      restart_test($_[HEAP]) if --$_[HEAP]{tests_to_go} > 0;
    },
  },
);

POE::Kernel->run();
exit;

sub restart_test {
  my $heap = shift();
  open(my $dict, '<', '/usr/share/dict/words') or die $!;
  $heap->{count} = 0;
  $heap->{start} = time();
  $heap->{wheel} = POE::Wheel::ReadWrite->new(
    Handle => $dict,
    InputEvent => 'line',
    ErrorEvent => 'end',
  );
}
Subject: benchmark results
martijn@meh:~/devel/Perl/Data-Transform$ perl bench.pl
99156 / 8.988 = 11031.461 lines/sec
99156 / 9.110 = 10884.420 lines/sec
99156 / 9.042 = 10966.727 lines/sec
99156 / 9.029 = 10982.229 lines/sec
99156 / 8.950 = 11078.893 lines/sec
99156 / 8.953 = 11075.001 lines/sec
99156 / 8.968 = 11056.675 lines/sec
99156 / 8.930 = 11103.259 lines/sec
99156 / 8.903 = 11137.796 lines/sec
99156 / 8.924 = 11110.619 lines/sec
martijn@meh:~/devel/Perl/Data-Transform$ perl bench.pl # with Data::Transform loaded
99156 / 9.287 = 10677.037 lines/sec
99156 / 9.155 = 10830.577 lines/sec
99156 / 9.259 = 10709.368 lines/sec
99156 / 9.165 = 10819.160 lines/sec
99156 / 9.156 = 10829.699 lines/sec
99156 / 9.136 = 10853.406 lines/sec
99156 / 9.150 = 10837.063 lines/sec
99156 / 9.135 = 10854.154 lines/sec
99156 / 9.142 = 10846.662 lines/sec
99156 / 9.227 = 10746.417 lines/sec
martijn@meh:~/devel/Perl/Data-Transform$ perl -I lib bench.pl # with Data::Transform::Line used
99156 / 9.541 = 10393.082 lines/sec
99156 / 9.398 = 10550.579 lines/sec
99156 / 9.392 = 10557.869 lines/sec
99156 / 9.453 = 10488.941 lines/sec
99156 / 9.410 = 10536.966 lines/sec
99156 / 9.530 = 10404.112 lines/sec
99156 / 9.626 = 10300.745 lines/sec
99156 / 9.442 = 10501.358 lines/sec
99156 / 9.447 = 10495.481 lines/sec
99156 / 9.435 = 10509.591 lines/sec
martijn@meh:~/devel/Perl/Data-Transform$
On Tue Jan 03 14:11:14 2012, MARTIJN wrote:
[benchmarks]

The benchmarks average 2.2% slower with Data::Transform loaded (but not used). The average slowdown using Data::Transform is 5.2%.

Please commit your changes to POE::Wheel::ReadWrite. I may end up placing them behind a USE_DATA_TRANSFORM constant if someone can't tolerate the overhead.
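The percentages quoted above can be rechecked from the posted benchmark output. This is a small sketch that assumes the slowdown is measured as the relative drop in mean lines/sec across the ten runs of each configuration; the ticket does not say exactly how the figures were derived. The mean and slowdown_pct helpers are illustrative names.

```perl
use strict;
use warnings;
use List::Util qw(sum);

# Rates in lines/sec, copied from the benchmark results in this ticket.
my %rates = (
  baseline => [ 11031.461, 10884.420, 10966.727, 10982.229, 11078.893,
                11075.001, 11056.675, 11103.259, 11137.796, 11110.619 ],
  loaded   => [ 10677.037, 10830.577, 10709.368, 10819.160, 10829.699,
                10853.406, 10837.063, 10854.154, 10846.662, 10746.417 ],
  used     => [ 10393.082, 10550.579, 10557.869, 10488.941, 10536.966,
                10404.112, 10300.745, 10501.358, 10495.481, 10509.591 ],
);

# Arithmetic mean of a list of numbers.
sub mean { return sum(@_) / scalar(@_) }

# Percentage by which the mean rate of $other falls below that of $base.
sub slowdown_pct {
  my ($base, $other) = @_;
  return 100 * (1 - mean(@$other) / mean(@$base));
}

printf "Data::Transform loaded, unused: %.1f%% slower\n",
  slowdown_pct($rates{baseline}, $rates{loaded});
printf "Data::Transform::Line in use:   %.1f%% slower\n",
  slowdown_pct($rates{baseline}, $rates{used});
```

Under that assumption the rounded results match the 2.2% and 5.2% figures given in the comment.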
Requested changes to be committed two months ago. Marking as stalled.
Requested the submitter apply their patch 20 months ago.

