Skip Menu |
 

This queue is for tickets about the IO-Async CPAN distribution.

Report information
The Basics
Id: 129918
Status: patched
Priority: 0/
Queue: IO-Async

People
Owner: Nobody in particular
Requestors: futuramedium [...] yandex.ru
Cc:
AdminCc:

Bug Information
Severity: Wishlist
Broken in: (no value)
Fixed in: (no value)



Subject: Enhancement suggestion: simple priority for IO::Async::Function calls
Very simple, 2-level priority control, please see patch file.
Subject: priority.patch
Download priority.patch
text/x-diff 2k
--- Function.pm.orig Wed Jun 12 18:57:20 2019 +++ Function.pm Wed Jun 26 13:19:58 2019 @@ -55,10 +55,11 @@ The object represents the function code itself, rather than one specific invocation of it. It can be called multiple times, by the C<call> method. Multiple outstanding invocations can be called; they will be dispatched in -the order they were queued. If only one worker process is used then results -will be returned in the order they were called. If multiple are used, then -each request will be sent in the order called, but timing differences between -each worker may mean results are returned in a different order. +the order they were queued; C<urgent> parameter, if supplied, determines call +placement at start or end of queue. If only one worker process is used then +results will be returned in the order they were queued. If multiple are used, +then each request will be sent in the order queued, but timing differences +between each worker may mean results are returned in a different order. Since the code block will be called multiple times within the same child process, it must take care not to modify any of its state that might affect @@ -329,6 +330,10 @@ A reference to the array of arguments to pass to the code. +=item urgent => BOOL + +Optional. If true, call is placed at start of queue. + =back If the function body returns normally the list of results are provided as the @@ -403,6 +408,8 @@ my $args = delete $params{args}; ref $args eq "ARRAY" or croak "Expected 'args' to be an array"; + + my $urgent = delete $params{urgent}; my ( $on_done, $on_fail ); if( defined $params{on_result} ) { @@ -440,7 +447,9 @@ } else { $self->debug_printf( "QUEUE" ); - push @{ $self->{pending_queue} }, my $wait_f = $self->loop->new_future; + my $wait_f = $self->loop->new_future; + $urgent ? unshift @{ $self->{pending_queue} }, $wait_f + : push @{ $self->{pending_queue} }, $wait_f; $future = $wait_f->then( sub { my ( $self, $worker ) = @_;
Download (untitled) / with headers
text/plain 988b
On Wed Jun 26 06:47:56 2019, vadimr wrote: Show quoted text
> Very simple, 2-level priority control, please see patch file.
Mm, an exciting idea. One thought though, is that this implementation `unshift`s the urgent items, meaning that if you make a few urgent calls in sequence, they'll be invoked in the reverse order to how you submitted them. Perhaps it would be better to store the urgent flag in the queue as well, and insert new urgent calls just before the first non-urgent one, so a sequence of a few urgent ones will go in the right order. At which point it's not much of a stretch of imagination to extend this to more than two levels - just have a "priority" field that's compared numerically; defaults to zero. Any new calls get inserted just before the first call of numerically-lower value. I keep meaning to move such logic into a more abstract "Future::Queue" and have IaFunction just use that actually. See also https://rt.cpan.org/Ticket/Display.html?id=124774 -- Paul Evans
Download (untitled) / with headers
text/plain 748b
I was looking at https://metacpan.org/pod/List::PriorityQueue to "steal" implementation, but overhead of added code can be unwelcome by those who don't need the feature, and hardly ever more than 3 levels of priority are required. Then I thought about 3 different queues, push into appropriate array according to supplied "priority" parameter 0,1,2 or default, and dequeue simply as my $next = shift @{ $self->{pending_queue_0} } or shift @{ $self->{pending_queue_1} } or shift @{ $self->{pending_queue_2} } or similar, simple and minimal overhead. Then thought it too clunky and suggested patch as above. But with issue you pointed at (urgent calls getting "stale"), perhaps 2 or 3 arrays would still be better.
Download (untitled) / with headers
text/plain 303b
You wouldn't need anything too fancy. An application of `List::Util::first` would find the insert point: my $queue = $self->{queue}; my $idx = first { $queue->[$_]->{prio} < $prio } 0 .. $#$queue; splice @$queue, $idx // scalar @$queue, 0, ( $newitem ); This is a sorted insert -- Paul Evans
Download (untitled) / with headers
text/plain 347b
Actually, having looked again at the code it doesn't seem very easy to turn it into a Future::Queue, even if such module did exist, but just adding the priority logic didn't seem too hard. Patch attached. You can implement your existing `urgent` logic by just providing some positive value for `priority` - even just 1 would do. -- Paul Evans
Subject: rt129918.patch
Download rt129918.patch
text/x-diff 3.5k
=== modified file 'Build.PL' --- Build.PL 2018-04-02 18:45:51 +0000 +++ Build.PL 2019-06-27 17:38:03 +0000 @@ -11,6 +11,7 @@ 'Exporter' => '5.57', 'File::stat' => 0, 'IO::Poll' => 0, + 'List::Util' => 0, 'Socket' => '2.007', 'Storable' => 0, 'Struct::Dumb' => 0, === modified file 'lib/IO/Async/Function.pm' --- lib/IO/Async/Function.pm 2019-06-12 15:52:23 +0000 +++ lib/IO/Async/Function.pm 2019-06-27 17:38:03 +0000 @@ -1,7 +1,7 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2011-2016 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2011-2019 -- leonerd@leonerd.org.uk package IO::Async::Function; @@ -15,6 +15,12 @@ use Carp; +use List::Util qw( first ); + +use Struct::Dumb qw( readonly_struct ); + +readonly_struct Pending => [qw( priority f )]; + =head1 NAME C<IO::Async::Function> - call a function asynchronously @@ -329,6 +335,15 @@ A reference to the array of arguments to pass to the code. +=item priority => NUM + +Optional. Defines the sorting order when no workers are available and calls +must be queued for later. A default of zero will apply if not provided. + +Higher values cause the call to be considered more important, and will be +placed earlier in the queue than calls with a smaller value. Calls of equal +priority are still handled in FIFO order. + =back If the function body returns normally the list of results are provided as the @@ -440,7 +455,20 @@ } else { $self->debug_printf( "QUEUE" ); - push @{ $self->{pending_queue} }, my $wait_f = $self->loop->new_future; + my $queue = $self->{pending_queue}; + + my $next = Pending( + my $priority = $params{priority} || 0, + my $wait_f = $self->loop->new_future, + ); + + if( $priority ) { + my $idx = first { $queue->[$_]->priority < $priority } 0 .. $#$queue; + splice @$queue, $idx // $#$queue+1, 0, ( $next ); + } + else { + push @$queue, $next; + } $future = $wait_f->then( sub { my ( $self, $worker ) = @_; @@ -576,10 +604,12 @@ while( my $next = shift @{ $self->{pending_queue} } ) { my $worker = $self->_get_worker or return; - next if $next->is_cancelled; + my $f = $next->f; + + next if $f->is_cancelled; $self->debug_printf( "UNQUEUE" ); - $next->done( $self, $worker ); + $f->done( $self, $worker ); return; } === modified file 't/42function.t' --- t/42function.t 2015-07-31 19:02:02 +0000 +++ t/42function.t 2019-06-27 17:38:03 +0000 @@ -131,6 +131,34 @@ $loop->remove( $function ); } +# Queue priority +{ + my $serial = 0; + my $function = IO::Async::Function->new( + # Keep exactly 1 process so captured lexical works for testing + min_workers => 1, + max_workers => 1, + code => sub { return $serial++ }, + ); + + $loop->add( $function ); + + # Push something just to make the function busy first + $function->call( args => [], on_return => sub {}, on_error => sub {} ); + + my $f = Future->needs_all( + $function->call( args => [] ), # no priority + $function->call( args => [], priority => 1 ), + $function->call( args => [], priority => 1 ), + $function->call( args => [], priority => 2 ), + ); + + is_deeply( [ ( wait_for_future $f )->get ], + [ 4, 2, 3, 1 ], '$function->call with priority enqueues correctly' ); + + $loop->remove( $function ); +} + # References { my $function = IO::Async::Function->new(


This service is sponsored and maintained by Best Practical Solutions and runs on Perl.org infrastructure.

Please report any issues with rt.cpan.org to rt-cpan-admin@bestpractical.com.