
This queue is for tickets about the TheSchwartz CPAN distribution.

Report information
The Basics
Id: 65712
Status: resolved
Priority: 0/
Queue: TheSchwartz

People
Owner: Jeff.Fearn [...] gmail.com
Requestors: felix.ostmann [...] thewar.de
Cc:
AdminCc:

Bug Information
Severity: Critical
Broken in: 1.10
Fixed in: 1.12



Subject: replace_job can make a database dead for ever
When a replace_job fails with a duplicate error, the database never comes back alive: no commit/rollback is executed, so every subsequent query only gets an error.

The patch fixes some other things too:

- Set ->did_something as late as possible (otherwise it is already set when a txn then fails).
- Change the schema for pg:
  - job.jobid from SERIAL to BIGSERIAL (or change the other jobid foreign-key columns from BIGINT to INT)
  - error.message to TEXT; mysql silently truncates if more than 255 bytes are inserted, while pg dies if the message is longer.
- Call $driver->rollback if $errstr is set in ->work_safely.
- Added query_sql to t/lib/db-common.pl.
- Changed t/priority.t and t/unique.t for pg-specific behavior.
- Added t/replace-abort.t for the special duplicate-entry case in pg.
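The failure mode reported above can be sketched without a real database. The following is a minimal illustration, not TheSchwartz code: FakeDriver and with_txn are hypothetical names, and the fake driver only mimics the PostgreSQL behavior the report appears to describe, where a failed statement inside an open transaction makes every later statement fail until a rollback is issued.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a database driver (an assumption for illustration).
# Once a statement fails inside a transaction, every further statement is
# rejected until rollback is called.
package FakeDriver;

sub new {
    my ($class) = @_;
    return bless { in_txn => 0, aborted => 0, rows => {} }, $class;
}
sub begin    { my $s = shift; $s->{in_txn} = 1; return }
sub rollback { my $s = shift; @$s{qw(in_txn aborted)} = (0, 0); return }
sub commit {
    my $s = shift;
    die "transaction aborted\n" if $s->{aborted};
    $s->{in_txn} = 0;
    return;
}

# Insert a unique key; a duplicate poisons the open transaction.
sub insert {
    my ($s, $uniqkey) = @_;
    die "current transaction is aborted\n" if $s->{aborted};
    if ($s->{rows}{$uniqkey}) {
        $s->{aborted} = 1 if $s->{in_txn};
        die "duplicate key: $uniqkey\n";
    }
    $s->{rows}{$uniqkey} = 1;
    return 1;
}

package main;

# Run $code inside a transaction; roll back on failure so the driver stays
# usable. Returns the error message, or '' on success.
sub with_txn {
    my ($driver, $code) = @_;
    $driver->begin;
    my $ok  = eval { $code->(); $driver->commit; 1 };
    my $err = $@;
    $driver->rollback unless $ok;
    return $ok ? '' : $err;
}

my $driver = FakeDriver->new;
$driver->insert(3);
my $err = with_txn($driver, sub { $driver->insert(3) });   # duplicate key
print "error: $err";
print "driver still usable\n" if $driver->insert(4);
```

Without the rollback in with_txn, the second insert would fail with "current transaction is aborted", which matches the "every subsequent query only gets an error" symptom in the report. The fake driver does not undo inserted rows on rollback; that simplification is enough for this case because the failing insert never stores anything.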
Subject: theschwartz-1.11.patch
diff -Nru TheSchwartz-1.10/doc/schema-postgres.sql TheSchwartz-1.10.patched/doc/schema-postgres.sql
--- TheSchwartz-1.10/doc/schema-postgres.sql 2010-03-15 22:00:55.000000000 +0100
+++ TheSchwartz-1.10.patched/doc/schema-postgres.sql 2011-02-14 12:40:08.000000000 +0100
@@ -23,7 +23,7 @@
 );
 
 CREATE TABLE job (
-        jobid SERIAL,
+        jobid BIGSERIAL,
         funcid INT NOT NULL,
         arg BYTEA,
         uniqkey VARCHAR(255) NULL,
@@ -50,7 +50,7 @@
 CREATE TABLE error (
         error_time INTEGER NOT NULL,
         jobid BIGINT NOT NULL,
-        message VARCHAR(255) NOT NULL,
+        message TEXT NOT NULL,
         funcid INT NOT NULL DEFAULT 0
 );
 
diff -Nru TheSchwartz-1.10/lib/TheSchwartz/Job.pm TheSchwartz-1.10.patched/lib/TheSchwartz/Job.pm
--- TheSchwartz-1.10/lib/TheSchwartz/Job.pm 2010-03-15 22:00:55.000000000 +0100
+++ TheSchwartz-1.10.patched/lib/TheSchwartz/Job.pm 2011-02-14 11:12:53.000000000 +0100
@@ -190,9 +190,9 @@
         $job->debug("can't call 'completed' on already finished job");
         return 0;
     }
-    $job->did_something(1);
     $job->set_exit_status(0);
     $job->driver->remove($job);
+    $job->did_something(1);
 }
 
 sub permanent_failure {
@@ -237,7 +237,6 @@
 
 sub _failed {
     my ($job, $msg, $exit_status, $_retry, $failures) = @_;
-    $job->did_something(1);
     $job->debug("job failed: " . ($msg || "<no message>"));
 
     ## Mark the failure in the error table.
@@ -254,6 +253,7 @@
         $job->set_exit_status($exit_status || 1);
         $job->driver->remove($job);
     }
+    $job->did_something(1);
 }
 
 sub replace_with {
diff -Nru TheSchwartz-1.10/lib/TheSchwartz/Worker.pm TheSchwartz-1.10.patched/lib/TheSchwartz/Worker.pm
--- TheSchwartz-1.10/lib/TheSchwartz/Worker.pm 2010-03-15 22:00:55.000000000 +0100
+++ TheSchwartz-1.10.patched/lib/TheSchwartz/Worker.pm 2011-02-14 11:12:50.000000000 +0100
@@ -33,6 +33,10 @@
 
     my $cjob = $client->current_job;
     if ($errstr) {
+        # something went wrong, better make a rollback!
+        my $driver = $cjob->driver;
+        $driver->rollback;
+
         $job->debug("Eval failure: $errstr");
         $cjob->failed($@);
     }
diff -Nru TheSchwartz-1.10/t/lib/db-common.pl TheSchwartz-1.10.patched/t/lib/db-common.pl
--- TheSchwartz-1.10/t/lib/db-common.pl 2010-03-15 22:11:59.000000000 +0100
+++ TheSchwartz-1.10.patched/t/lib/db-common.pl 2011-02-14 13:44:55.000000000 +0100
@@ -221,4 +221,16 @@
     split /;\s*/, $sql;
 }
 
+sub query_sql {
+    my ($dbh, $sql) = @_;
+    my ($query, $bind) = ref($sql) ? @$sql : ($sql, []);
+    my $sth = $dbh->prepare($sql);
+    my $i = 0;
+    $sth->bind_param(++$i, $_) for @$bind;
+    $sth->execute;
+    $sth->bind_columns(\my $result);
+    $sth->fetch;
+    return $result;
+}
+
 1;
diff -Nru TheSchwartz-1.10/t/priority.t TheSchwartz-1.10.patched/t/priority.t
--- TheSchwartz-1.10/t/priority.t 2010-03-15 22:00:55.000000000 +0100
+++ TheSchwartz-1.10.patched/t/priority.t 2011-02-14 12:56:37.000000000 +0100
@@ -20,10 +20,11 @@
     $TheSchwartz::FIND_JOB_BATCH_SIZE = 1;
 
     for (1..10) {
+        # Postgres uses ORDER BY priority NULLS FIRST when DESC is used
         my $job = TheSchwartz::Job->new(
             funcname => 'Worker::PriorityTest',
             arg => { num => $_ },
-            ( $_ == 1 ? () : ( priority => $_ ) ),
+            ( !$ENV{USE_PGSQL} && $_ == 1 ? () : ( priority => $_ ) ),
         );
         my $h = $client->insert($job);
         ok($h, "inserted job (priority $_)");
@@ -35,7 +36,8 @@
     Worker::PriorityTest->set_client($client);
 
     for (1..10) {
-        $record_expected = 11 - $_ == 1 ? undef : 11 - $_;
+        # Postgres uses ORDER BY priority NULLS FIRST when DESC is used
+        $record_expected = !$ENV{USE_PGSQL} && 11 - $_ == 1 ? undef : 11 - $_;
         my $rv = eval { $client->work_once; };
         ok($rv, "did stuff");
     }
diff -Nru TheSchwartz-1.10/t/replace-abort.t TheSchwartz-1.10.patched/t/replace-abort.t
--- TheSchwartz-1.10/t/replace-abort.t 1970-01-01 01:00:00.000000000 +0100
+++ TheSchwartz-1.10.patched/t/replace-abort.t 2011-02-14 12:36:58.000000000 +0100
@@ -0,0 +1,168 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use TheSchwartz;
+use Test::More tests => 13;
+
+run_tests_pgsql(13, sub {
+    my $client1 = test_client(dbs => ['ts1']);
+    my $client2 = test_client(dbs => ['ts1']);
+
+    my $driver = $client1->driver_for( ($client1->shuffled_databases)[0] );
+    my $dbh = $driver->rw_handle;
+
+    is(
+        query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey IN ('1','2','3','4','5');"),
+        0,
+        'namespace empty',
+    );
+
+
+    $client1->can_do('Test::Job::Completed');
+    $client2->can_do('Test::Job::Replace');
+
+# job 1
+    $client1->insert(TheSchwartz::Job->new(
+        funcname => 'Test::Job::Completed',
+        uniqkey => 1,
+    ));
+
+    is(
+        query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '1';"),
+        1,
+        'Job 1 gepostet',
+    );
+
+
+# Job 1
+    $client1->work_once;
+
+    is(
+        query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '1';"),
+        0,
+        'Job 1 abgearbeitet',
+    );
+
+# Job 2
+    $client2->insert(TheSchwartz::Job->new(
+        funcname => 'Test::Job::Replace',
+        uniqkey => 2,
+        arg => 3,
+    ));
+
+    is(
+        query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '2';"),
+        1,
+        'Job 2 gepostet',
+    );
+
+# Job 2
+    $client2->work_once;
+
+    is(
+        query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '2';"),
+        0,
+        'Job 2 abgearbeitet',
+    );
+    is(
+        query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"),
+        1,
+        'Job 2 ersetzt durch Job 3',
+    );
+
+# Job 4
+    $client2->insert(TheSchwartz::Job->new(
+        funcname => 'Test::Job::Replace',
+        uniqkey => 4,
+        arg => 3,
+    ));
+
+    is(
+        query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '4';"),
+        1,
+        'Job 4 gepostet',
+    );
+
+# Job 4
+    $client2->work_once;
+
+    is(
+        query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '4';"),
+        1,
+        'Job 4 abgebrochen',
+    );
+    is(
+        query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"),
+        1,
+        'Job 4 nicht durch Job 3 ersetzt',
+    );
+
+# Job 3
+    $client1->work_once;
+
+    is(
+        query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"),
+        0,
+        'Job 3 abgearbeitet',
+    );
+
+# cleanup job.run_after & retry_at, so we dont have to wait
+    $dbh->do("UPDATE job SET run_after = 0 WHERE uniqkey = '4';");
+    $client2->{retry_at} = {};
+
+# Job 4
+    $client2->work_once;
+
+
+    is(
+        query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '4';"),
+        0,
+        'Job 4 abgearbeitet',
+    );
+    is(
+        query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"),
+        1,
+        'Job 4 ersetzt durch Job 3',
+    );
+
+# Job 5
+    $client1->work_once;
+
+    is(
+        query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"),
+        0,
+        'Job 3 erneut abgearbeitet',
+    );
+});
+
+
+
+
+# TheSchwartz Worker/Jobs
+package Test::Job::Completed;
+
+use base qw(TheSchwartz::Worker);
+
+sub work {
+    my ($client, $job) = @_;
+    $job->completed;
+}
+sub max_retries { 10; }
+
+package Test::Job::Replace;
+
+use base qw(TheSchwartz::Worker);
+
+sub work {
+    my ($client, $job) = @_;
+    $job->replace_with(TheSchwartz::Job->new(
+        funcname => 'Test::Job::Completed',
+        uniqkey => $job->arg,
+    ));
+}
+sub max_retries { 10; }
+
diff -Nru TheSchwartz-1.10/t/unique.t TheSchwartz-1.10.patched/t/unique.t
--- TheSchwartz-1.10/t/unique.t 2010-03-15 22:00:55.000000000 +0100
+++ TheSchwartz-1.10.patched/t/unique.t 2011-02-14 13:01:13.000000000 +0100
@@ -34,6 +34,9 @@
     $handle = $client->insert($job);
     ok(! $handle, 'no handle');
 
+    # pg failes and marks the database as dead
+    $client->{retry_at} = {};
+
     # insert same uniqkey, but different func
     $job = TheSchwartz::Job->new(
         funcname => 'scratch',
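The comment in the t/priority.t hunk refers to a difference in NULL ordering: with ORDER BY ... DESC, PostgreSQL places NULL values first by default, while MySQL places them last. A minimal sketch in plain Perl that emulates both orderings, with undef standing in for NULL (the function names and sample data are hypothetical, and no database is involved):

```perl
use strict;
use warnings;

# Emulates PostgreSQL's default for ORDER BY priority DESC: NULLs (undef) first.
sub order_desc_nulls_first {
    return sort {
        (defined $a ? 1 : 0) <=> (defined $b ? 1 : 0)   # undef sorts before defined
            || ($b // 0) <=> ($a // 0)                  # then highest priority first
    } @_;
}

# Emulates MySQL's behavior for ORDER BY priority DESC: NULLs (undef) last.
sub order_desc_nulls_last {
    return sort {
        (defined $b ? 1 : 0) <=> (defined $a ? 1 : 0)   # undef sorts after defined
            || ($b // 0) <=> ($a // 0)                  # then highest priority first
    } @_;
}

# Hypothetical priorities; the job inserted without a priority is undef.
my @priorities = (undef, 2, 3, 4);

for my $pair (
    [ 'pg-style'    => [ order_desc_nulls_first(@priorities) ] ],
    [ 'mysql-style' => [ order_desc_nulls_last(@priorities) ] ],
) {
    my ($label, $sorted) = @$pair;
    print "$label: ", join(', ', map { defined $_ ? $_ : 'NULL' } @$sorted), "\n";
}
```

This is why the patched test gives every job an explicit priority when USE_PGSQL is set: a job without a priority would otherwise be picked up first on pg instead of last.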
From: felix.ostmann [...] thewar.de
One small question: work_safely also contains the following code:

    if ($errstr) {
        $job->debug("Eval failure: $errstr");
        $cjob->failed($@);
    }

Why not $cjob->failed($errstr)? I got the following output:

    Eval failure: Cmd 'sudo /usr/bin/rsync -z --bwlimit=1024 ... failed: 255'
    job failed. considering retry. is max_retries of 1000000 >= failures of 1?
    job failed: <no message>
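One plausible explanation for the "<no message>" output, offered as an assumption rather than a confirmed diagnosis: Perl resets $@ to the empty string whenever an eval completes successfully, so any code that runs between the failing eval and the use of $@ can wipe it. The sketch below uses hypothetical helpers (debug_log, run_and_report) to show why passing the saved copy is safer than passing $@ itself.

```perl
use strict;
use warnings;

# Hypothetical logger that happens to run a successful eval internally.
sub debug_log {
    my ($msg) = @_;
    eval { 1 };      # a successful eval resets $@ to the empty string
    return;
}

# Runs $code, saves the error, logs it, and returns both the saved copy and
# whatever $@ holds after logging.
sub run_and_report {
    my ($code) = @_;
    eval { $code->(); 1 };
    my $errstr = $@;                        # copy before anything else touches $@
    debug_log("Eval failure: $errstr");
    return ($errstr, $@);
}

my ($saved, $after) = run_and_report(sub { die "Cmd failed: 255\n" });
print "saved copy: $saved";
print "\$\@ after logging: '$after'\n";
```

In this sketch the saved copy still holds the message while $@ is empty after logging, which is consistent with the debug line showing the error and the later failed call reporting no message.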
Hi,

Would you mind making one or more pull requests on GitHub?
https://github.com/sixapart/TheSchwartz/

What would be helpful is to get one commit per individual change, so that we can investigate each logical change separately.

Thanks,

Yann
From: felix.ostmann [...] thewar.de
ofc i can :)

On Tue Feb 15, 2011, 15:41:34, YANNK wrote:
> Hi,
>
> Would you mind, making one or more pull requests on github?
> https://github.com/sixapart/TheSchwartz/
>
> What would be helpful is to get one commit per individual change, so
> that we can investigate
> each logical change separately.
>
> Thanks,
>
> Yann
Hi Felix,

Can you do separate pull requests for these changes on my GitHub?
git@github.com:jfearn/TheSchwartz.git

Sorry for the repeat request.

Cheers, Jeff.
This fix was shipped in 1.12

