package MangoX::Queue; use Mojo::Base 'Mojo::EventEmitter'; use Carp 'croak'; use Mojo::Log; use Mango::BSON ':bson'; use MangoX::Queue::Delay; use MangoX::Queue::Job; use DateTime::Tiny; our $VERSION = '0.11'; # A logger has 'log' => sub { Mojo::Log->new->level('error') }; # The Mango::Collection representing the queue has 'collection'; has 'capped'; has 'stats'; # A MangoX::Queue::Delay has 'delay' => sub { MangoX::Queue::Delay->new }; # How long to wait before assuming a job has failed has 'timeout' => sub { $ENV{MANGOX_QUEUE_JOB_TIMEOUT} // 60 }; # How many times to retry a job before giving up has 'retries' => sub { $ENV{MANGOX_QUEUE_JOB_RETRIES} // 5 }; # Current number of jobs that have been consumed but not yet completed has 'job_count' => 0; # Maximum number of jobs allowed to be in a consumed state at any one time has 'concurrent_job_limit' => 10; # Store Mojo::IOLoop->timer IDs has 'consumers' => sub { {} }; # Store plugins has 'plugins' => sub { {} }; sub new { my $self = shift->SUPER::new(@_); croak qq{No Mango::Collection provided to constructor} unless ref($self->collection) eq 'Mango::Collection'; $self->stats($self->collection->stats); $self->capped($self->stats->{capped}); $self->{pending_status} = $self->capped ? 1 : 'Pending'; $self->{processing_status} = $self->capped ? 2 : 'Processing'; $self->{failed_status} = $self->capped ? 3 : 'Failed'; return $self; } sub plugin { my ($self, $name, $options) = @_; croak qq{Plugin $name already loaded} if exists $self->plugins->{$name}; { no strict 'refs'; unless($name->can('new')) { eval "require $name" or croak qq{Failed to load plugin $name: $@}; } } eval { $self->plugins->{$name} = $name->new(%$options); return 1; } or croak qq{Error calling constructor for plugin $name: $@}; eval { $self->plugins->{$name}->register($self); return 1; } or croak qq{Error calling register for plugin $name: $@}; return $self->plugins->{$name}; } sub get_options { my ($self) = @_; return { query => { '$and' => [{ '$or' => [ { delay_until => undef }, { delay_until => { '$lt' => time } } ], },{ '$or' => [{ status => { '$in' => ref($self->{pending_status}) eq 'ARRAY' ? $self->{pending_status} : [ $self->{pending_status} ], }, '$or' => [ { processing => 0 }, { processing => undef } ], },{ status => $self->{processing_status}, processing => { '$lt' => time - $self->timeout, } }], attempt => { '$lte' => $self->retries + 1, }, }] }, sort => bson_doc( # Sort by priority, then in order of creation 'priority' => 1, 'created' => -1, ), update => { '$set' => { processing => time, status => $self->{processing_status}, }, '$inc' => { attempt => 1, } } }; } sub enqueue { my ($self, @args) = @_; # args maybe # - 'job_name' # - foo => bar, 'job_name' # - 'job_name', $callback # - foo => bar, 'job_name', $callback my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef; my $job = pop @args; my %args; %args = (@args) if scalar @args; my $db_job = { priority => $args{priority} // 1, created => $args{created} // DateTime::Tiny->now, data => $job, status => $args{status} // $self->{pending_status}, attempt => 1, processing => 0, }; $db_job->{delay_until} = $args{delay_until} if $args{delay_until}; if($callback) { return $self->collection->insert($db_job => sub { my ($collection, $error, $oid) = @_; if($error) { $self->emit_safe(error => qq{Error inserting job into collection: $error}, $db_job, $error); $callback->($db_job, $error); return; } $db_job->{_id} = $oid; $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued'); eval { $callback->($db_job, undef); return 1; } or $self->emit_safe(error => qq{Error in callback: $@}, $db_job, $@); }); } else { eval { $db_job->{_id} = $self->collection->insert($db_job); return 1; } or croak qq{Error inserting job into collection: $@}; $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued'); return $db_job; } } sub watch { my ($self, $id_or_job, $status, $callback) = @_; my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job; $status //= 'Complete'; # args # - watch $queue $id, 'Status' => $callback if($callback) { # Non-blocking $self->log->debug("Waiting for $id on status $status in non-blocking mode"); return Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) }); } else { # Blocking $self->log->debug("Waiting for $id on status $status in blocking mode"); return $self->_watch_blocking($id, $status); } } sub _watch_blocking { my ($self, $id, $status) = @_; while(1) { my $doc = $self->collection->find_one({'_id' => $id}); $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No')); if($doc && ((!ref($status) && $doc->{status} eq $status) || (ref($status) eq 'ARRAY' && grep { $_ =~ $doc->{status} } @$status))) { return 1; } else { $self->delay->wait; } } } sub _watch_nonblocking { my ($self, $id, $status, $callback) = @_; $self->collection->find_one({'_id' => $id} => sub { my ($cursor, $err, $doc) = @_; $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No')); if($doc && ((!ref($status) && $doc->{status} eq $status) || (ref($status) eq 'ARRAY' && grep { $_ =~ $doc->{status} } @$status))) { $self->log->debug("Status is $status"); $self->delay->reset; $callback->($doc, undef); } else { $self->log->debug("Job not found or status doesn't match"); $self->delay->wait(sub { return unless Mojo::IOLoop->is_running; Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) }); }); return undef; } }); } sub requeue { my ($self, $job, $callback) = @_; $job->{status} = ref($self->{pending_status}) eq 'ARRAY' ? $self->{pending_status}->[0] : $self->{pending_status}; return $self->update($job, $callback); } sub dequeue { my ($self, $id_or_job, $callback) = @_; # TODO option to not remove on dequeue? my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job; if($callback) { $self->collection->remove({'_id' => $id} => sub { my ($collection, $error, $doc) = @_; if($error) { $self->emit_safe(error => qq(Error removing job from collection: $error), $id_or_job, $error) if $self->has_subscribers('error'); $callback->($id_or_job, $error); return; } $callback->($id_or_job, undef); $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued'); }); } else { $self->collection->remove({'_id' => $id}); $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued'); } } sub get { my ($self, $id_or_job, $callback) = @_; my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job; if($callback) { return $self->collection->find_one({'_id' => $id} => sub { my ($collection, $error, $doc) = @_; if($error) { $self->emit_safe(error => qq(Error retrieving job: $error), $id_or_job, $error) if $self->has_subscribers('error'); } $callback->($doc, $error); }); } else { return $self->collection->find_one({'_id' => $id}); } } sub update { my ($self, $job, $callback) = @_; # FIXME Temporary fix to remove queue item from MangoX::Queue::Job my $j = {}; for my $key (keys %$job) { $j->{$key} = $job->{$key} if $key ne 'queue'; } $job = $j; if($callback) { return $self->collection->update({'_id' => $job->{_id}}, $job => sub { my ($collection, $error, $doc) = @_; if($error) { $self->emit_safe(error => qq(Error updating job: $error), $job, $error) if $self->has_subscribers('error'); } $callback->($doc, $error); }); } else { return $self->collection->update({'_id' => $job->{_id}}, $job, {upsert => 1}) or croak qq{Error updating collection: $@}; } } sub fetch { my ($self, @args) = @_; # fetch $queue status => 'Complete', sub { my $job = shift; } my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef; my %args; %args = (@args) if scalar @args; $self->log->debug("In fetch"); if($callback) { $self->log->debug("Fetching in non-blocking mode"); my $consumer_id = (scalar keys %{$self->consumers}) + 1; $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 1) }); return $consumer_id; } else { $self->log->debug("Fetching in blocking mode"); return $self->_consume_blocking(\%args, 1); } } sub consume { my ($self, @args) = @_; # consume $queue status => 'Failed', sub { my $job = shift; } my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef; my %args; %args = (@args) if scalar @args; $self->log->debug("In consume"); if($callback) { $self->log->debug("consuming in non-blocking mode"); my $consumer_id = (scalar keys %{$self->consumers}) + 1; $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 0) }); $self->log->debug("Timer scheduled, consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id}); return $consumer_id; } else { $self->log->debug("consuming in blocking mode"); return $self->_consume_blocking(\%args, 0); } } sub release { my ($self, $consumer_id) = @_; $self->log->debug("Releasing consumer $consumer_id with timer ID: " . $self->consumers->{$consumer_id}); Mojo::IOLoop->remove($self->consumers->{$consumer_id}); delete $self->consumers->{$consumer_id}; return 1; } sub _consume_blocking { my ($self, $args, $fetch) = @_; while(1) { my $opts = $self->get_options; $opts->{query} = $args if scalar keys %$args; my $doc = $self->collection->find_and_modify($opts); $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No')); if($doc && $doc->{attempt} > $self->retries) { $doc->{status} = $self->{failed_status}; $self->update($doc); $doc = undef; $self->log->debug("Job exceeded retries, status set to failed and job abandoned"); } if($doc) { $self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed'); return $doc; } else { last if $fetch; $self->delay->wait; } } } sub _consume_nonblocking { my ($self, $args, $consumer_id, $callback, $fetch) = @_; $self->log->debug("Active jobs: " . $self->job_count . '/' . ($self->concurrent_job_limit < 0 ? '*' : $self->concurrent_job_limit)); # Don't allow consumption if job_count has been reached if ($self->concurrent_job_limit > -1 && $self->job_count >= $self->concurrent_job_limit) { return unless Mojo::IOLoop->is_running; return if $fetch; $self->emit_safe(concurrent_job_limit_reached => $self->concurrent_job_limit) if $self->has_subscribers('concurrent_job_limit_reached'); $self->log->debug("concurrent_job_limit_reached = " . $self->concurrent_job_limit . ", job_count = " . $self->job_count); return unless exists $self->consumers->{$consumer_id}; $self->delay->wait(sub { return unless exists $self->consumers->{$consumer_id}; $self->_consume_nonblocking($args, $consumer_id, $callback, 0); }); $self->log->debug("Timer rescheduled (job_count limit reached), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id}); return; } my $opts = $self->get_options; $opts->{query} = $args if scalar keys %$args; $self->collection->find_and_modify($opts => sub { my ($cursor, $err, $doc) = @_; $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No')); if($err) { $self->log->error($err); $self->emit_safe(error => $err); } if($doc && $doc->{attempt} > $self->retries) { $doc->{status} = $self->{failed_status}; $self->update($doc); $doc = undef; $self->log->debug("Job exceeded retries, status set to failed and job abandoned"); } if($doc) { $self->job_count($self->job_count + 1); $self->log->debug("job_count incremented to " . $self->job_count); my $job = MangoX::Queue::Job->new($doc); $job->queue($self); $self->delay->reset; $self->emit_safe(consumed => $job) if $self->has_subscribers('consumed'); eval { $callback->($job); return 1; } or $self->emit_safe(error => "Error in callback: $@"); return unless Mojo::IOLoop->is_running; return if $fetch; return unless exists $self->consumers->{$consumer_id}; Mojo::IOLoop->timer(0, sub { $self->_consume_nonblocking($args, $consumer_id, $callback, 0) } ); $self->log->debug("Timer rescheduled (recursive immediate), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id}); } else { return unless Mojo::IOLoop->is_running; return if $fetch; $self->delay->wait(sub { return unless exists $self->consumers->{$consumer_id}; $self->_consume_nonblocking($args, $consumer_id, $callback, 0); }); $self->log->debug("Timer rescheduled (recursive delayed), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id}); return undef; } }); } 1; =encoding utf8 =head1 NAME MangoX::Queue - A MongoDB queue implementation using Mango =head1 DESCRIPTION L is a MongoDB backed queue implementation using L to support blocking and non-blocking queues. L makes no attempt to handle the L connection, database or collection - pass in a collection to the constructor and L will use it. The collection can be plain, capped or sharded. For an introduction to L, see L. =head1 SYNOPSIS =head2 Non-blocking Non-blocking mode requires a running L. my $queue = MangoX::Queue->new(collection => $mango_collection); # To add a job enqueue $queue 'test' => sub { my $id = shift; }; # To set options enqueue $queue priority => 1, created => DateTime::Tiny->now, 'test' => sub { my $id = shift; }; # To watch for a specific job status watch $queue $id, 'Complete' => sub { # Job status is 'Complete' }; # To fetch a job fetch $queue sub { my ($job) = @_; # ... }; # To get a job by id get $queue $id => sub { my $job = shift; }; # To requeue a job requeue $queue $job => sub { my $id = shift; }; # To dequeue a job dequeue $queue $id => sub { }; # To consume a queue my $consumer = consume $queue sub { my ($job) = @_; # ... }; # To stop consuming a queue release $queue $consumer; # To listen for errors on $queue error => sub { my ($queue, $error) = @_; }; =head2 Blocking my $queue = MangoX::Queue->new(collection => $mango_collection); # To add a job my $id = enqueue $queue 'test'; # To set options my $id = enqueue $queue priority => 1, created => DateTime::Tiny->now, 'test'; # To watch for a specific job status watch $queue $id; # To fetch a job my $job = fetch $queue; # To get a job by id my $job = get $queue $id; # To requeue a job my $id = requeue $queue $job; # To dequeue a job dequeue $queue $id; # To consume a queue while(my $job = consume $queue) { # ... } =head2 Other my $queue = MangoX::Queue->new(collection => $mango_collection); # To listen for events on $queue enqueued => sub ( my ($queue, $job) = @_; }; on $queue dequeued => sub ( my ($queue, $job) = @_; }; on $queue consumed => sub { my ($queue, $job) = @_; }; # To register a plugin plugin $queue 'MangoX::Queue::Plugin::Statsd'; =head1 ATTRIBUTES L implements the following attributes. =head2 collection my $collection = $queue->collection; $queue->collection($mango->db('foo')->collection('bar')); my $queue = MangoX::Queue->new(collection => $collection); The L representing the MongoDB queue collection. =head2 delay my $delay = $queue->delay; $queue->delay(MangoX::Queue::Delay->new); The L responsible for dynamically controlling the delay between queue queries. =head2 concurrent_job_limit my $concurrent_job_limit = $queue->concurrent_job_limit; $queue->concurrent_job_limit(20); The maximum number of concurrent jobs (jobs consumed from the queue and unfinished). Defaults to 10. This only applies to jobs on the queue in non-blocking mode. L has an internal counter that is incremented when a job has been consumed from the queue (in non-blocking mode). The job returned is a L instance and has a descructor method that is called to decrement the internal counter. See L for more details. Set to -1 to disable queue concurrency limits. B, this could result in out of memory errors or an extremely slow event loop. =head2 plugins my $plugins = $queue->plugins; Returns a hash containing the plugins registered with this queue. =head2 retries my $retries = $queue->retries; $queue->retries(5); The number of times a job will be picked up from the queue before it is marked as failed. =head2 timeout my $timeout = $queue->timeout; $queue->timeout(10); The time (in seconds) a job is allowed to stay in Retrieved state before it is released back into Pending state. Defaults to 60 seconds. =head1 EVENTS L inherits from L and emits the following events. Events are emitted only for actions on the current queue object, not the entire queue. =head2 consumed on $queue consumed => sub { my ($queue, $job) = @_; # ... }; Emitted when an item is consumed (either via consume or fetch) =head2 dequeued on $queue dequeued => sub { my ($queue, $job) = @_; # ... }; Emitted when an item is dequeued =head2 enqueued on $queue enqueued => sub { my ($queue, $job) = @_; # ... }; Emitted when an item is enqueued =head2 concurrent_job_limit_reached on $queue enqueued => sub { my ($queue, $concurrent_job_limit) = @_; # ... }; Emitted when a job is found but the limit has been reached. =head1 METHODS L implements the following methods. =head2 consume # In blocking mode while(my $job = consume $queue) { # ... } # In non-blocking mode consume $queue sub { my ($job) = @_; # ... }; Waits for jobs to arrive on the queue, sleeping between queue checks using L or L. Currently sets the status to 'Retrieved' before returning the job. =head2 dequeue my $job = fetch $queue; dequeue $queue $job; Dequeues a job. Currently removes it from the collection. =head2 enqueue my $id = enqueue $queue 'job name'; my $id = enqueue $queue [ 'some', 'data' ]; my $id = enqueue $queue +{ foo => 'bar' }; Add an item to the queue in blocking mode. The default priority is 1 and status is 'Pending'. You can set queue options including priority, created and status. my $id = enqueue $queue, priority => 1, created => time, status => 'Pending', +{ foo => 'bar' }; For non-blocking mode, pass in a coderef as the final argument. my $id = enqueue $queue 'job_name' => sub { # ... }; my $id = enqueue $queue priority => 1, +{ foo => 'bar', } => sub { # ... }; Sets the status to 'Pending' by default. =head2 fetch # In blocking mode my $job = fetch $queue; # In non-blocking mode fetch $queue sub { my ($job) = @_; # ... }; Fetch a single job from the queue, returning undef if no jobs are available. Currently sets job status to 'Retrieved'. =head2 get # In non-blocking mode get $queue $id => sub { my ($job) = @_; # ... }; # In blocking mode my $job = get $queue $id; Gets a job from the queue by ID. Doesn't change the job status. You can also pass in a job instead of an ID. $job = get $queue $job; =head2 get_options my $options = $queue->get_options; Returns the L options hash used by find_and_modify to identify and update available queue items. =head2 release my $consumer = consume $queue sub { # ... }; release $queue $consumer; Releases a non-blocking consumer from watching a queue. =head2 requeue my $job = fetch $queue; requeue $queue $job; Requeues a job. Sets the job status to 'Pending'. =head2 update my $job = fetch $queue; $job->{status} = 'Failed'; update $queue $job; Updates a job in the queue. =head2 watch Wait for a job to enter a certain status. # In blocking mode my $id = enqueue $queue 'test'; watch $queue $id, 'Complete'; # blocks until job is complete # In non-blocking mode my $id = enqueue $queue 'test'; watch $queue $id, 'Complete' => sub { # ... }; =head1 FUTURE JOBS Jobs can be queued in advance by setting a delay_until attribute: enqueue $queue delay_until => (time + 20), "job name"; =head1 ERRORS Errors are reported by MangoX::Queue using callbacks and L To listen for all errors on a queue, subscribe to the 'error' event: $queue->on(error => sub { my ($queue, $job, $error) = @_; # ... }); To check for errors against an individual update, enqueue or dequeue call, you can check for an error argument to the callback sub: enqueue $queue +$job => sub { my ($job, $error) = @_; if($error) { # ... } } =head1 CONTRIBUTORS =over =item Ben Vinnerd, ben@vinnerd.com =back =head1 SEE ALSO L, L, L =cut