smith-carson / phalcon-queue-db
Phalcon extension that implements a queuing system through the database
Home Page: https://packagist.org/packages/igorsantos07/phalcon-queue-db
Currently there's no implementation of time-to-run (TTR) hooks. Jobs may time out and we will never know?
Possible implementation: when a job is reserved, turn the delay into TTR. If there's no TTR, delay becomes zero; if there's a TTR, delay becomes time() + TTR. Then peeks/reserves will return jobs with state=reserved AND delay<time(), but won't return any reserved job with delay=0. This means a job that gets reserved with no timeout will be reserved only once; if it's reserved with a timeout and the timeout expires, it can be reserved again; and if it's ready, it can be reserved as soon as its delay has expired.
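A minimal sketch of the proposed TTR rule, modelling a job as a plain array with the state and delay fields mentioned above. reserveJob() and isReservable() are hypothetical helpers, not part of the package, and $now is passed in explicitly instead of calling time() so the rule is easy to test. In a real implementation, isReservable() would become the WHERE clause of the reserve/peek query.

```php
<?php
// Hypothetical in-memory model of the proposed TTR semantics; the field
// names (state, delay) follow the issue text, not the package's schema.
const STATE_READY = 'ready';
const STATE_RESERVED = 'reserved';

/**
 * On reservation, the delay field is reused as a TTR deadline:
 * 0 means "no TTR, never hand this job out again", otherwise now + TTR.
 */
function reserveJob(array $job, int $ttr, int $now): array
{
    $job['state'] = STATE_RESERVED;
    $job['delay'] = $ttr > 0 ? $now + $ttr : 0;
    return $job;
}

/**
 * A job can be (re)reserved if it is ready and its delay has expired,
 * or if it is reserved with a non-zero TTR deadline that has passed.
 */
function isReservable(array $job, int $now): bool
{
    if ($job['state'] === STATE_READY) {
        return $job['delay'] <= $now;
    }
    if ($job['state'] === STATE_RESERVED) {
        return $job['delay'] > 0 && $job['delay'] < $now;
    }
    return false; // any other state (e.g. buried) is assumed never reservable
}
```

With a TTR of 30 at time 1000, the job is hidden until 1030 and can be picked up again afterwards; with no TTR it stays reserved for good.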
Example:
$queue->process(function($body, $job) {
    //do stuff with $body or $job
    return true; //true would delete the job, false would bury it, null would release it
});
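The return-value convention above can be sketched in plain PHP. QueueJob, MemoryJob and handleWithCallback() are hypothetical names used only for illustration, not the package's classes; the delete()/bury()/release() method names are assumed to follow the Beanstalk-style job API, and rejecting any other return value is an extra assumption.

```php
<?php
// Hypothetical minimal job interface, assumed to mirror Beanstalk-style jobs.
interface QueueJob
{
    public function getBody();
    public function delete();
    public function bury();
    public function release();
}

/**
 * Runs the handler on one job and maps its return value to an action:
 * true => delete, false => bury, null (or no return at all) => release.
 */
function handleWithCallback(callable $handler, QueueJob $job): string
{
    $result = $handler($job->getBody(), $job);
    if ($result === true) {
        $job->delete();
        return 'deleted';
    }
    if ($result === false) {
        $job->bury();
        return 'buried';
    }
    if ($result === null) {
        $job->release();
        return 'released';
    }
    throw new UnexpectedValueException('Handler must return true, false or null');
}

// Tiny in-memory job that only records which action was taken.
class MemoryJob implements QueueJob
{
    public $action = null;
    private $body;

    public function __construct($body) { $this->body = $body; }
    public function getBody() { return $this->body; }
    public function delete() { $this->action = 'delete'; }
    public function bury() { $this->action = 'bury'; }
    public function release() { $this->action = 'release'; }
}
```

A handler with no return statement yields null, so its job is released rather than deleted, which matches the bool|void signature used for workers further down.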
If so, it would be nice if the Queue class let its end user select a persistent connection, instead of having to register a second DI key just for this.
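A minimal sketch of how the queue could offer this itself, assuming the database adapter it hands the descriptor to honors a persistent key. queueDescriptor() is a hypothetical helper: it copies the application's connection descriptor and toggles that key, leaving the application's own settings untouched.

```php
<?php
/**
 * Hypothetical helper: derives the queue's connection descriptor from the
 * application's regular one, optionally marking it as persistent.
 * Assumes the adapter honors a 'persistent' descriptor key.
 */
function queueDescriptor(array $appDescriptor, bool $persistent): array
{
    $descriptor = $appDescriptor; // arrays are copied by value in PHP
    $descriptor['persistent'] = $persistent;
    return $descriptor;
}
```

The resulting array could then be passed to an adapter such as Phalcon\Db\Adapter\Pdo\Mysql inside the queue, so the user would only flip a flag instead of wiring a second service.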
This has already been done in the main project, but I found no clean way to ship this feature from the queue package itself. Maybe we could add these classes to the package and let the developer simply extend/implement what's needed, since there's no way to reference a task that lives outside the main project's files?
tasks/WorkTask.php
<?php
use Phalcon\CLI\Task;
use Phalcon\Queue\Db as DbQueue;
use Phalcon\Text;
use PhalconRest\tasks\traits\Options;
use PhalconRest\tasks\workers\Worker;

class WorkTask extends Task
{
    use Options;

    protected static $options = [];
    protected static $optionsSpec = [
        'limit' => '\d+',
        'delay' => '[\d\.]+',
    ];

    /**
     * Handles "work «tube»": there is no «tube»Action method, so the call
     * lands here and the tube name is mapped to a worker class.
     */
    public function __call($action, array $arguments = [])
    {
        $name  = substr($action, 0, -strlen('Action')); // strip the "Action" suffix
        $class = '\PhalconRest\tasks\workers\\'.Text::camelize($name);

        if (!class_exists($class)) {
            echo "There's no such worker class: $name ($class)\n";
            die(254);
        }

        // Known options land in static::$options; the rest goes to the worker
        $options = self::processOptions(isset($arguments[0]) ? $arguments[0] : []);
        $worker  = new $class($options);
        if (!($worker instanceof Worker)) {
            echo "$class must implement tasks\\workers\\Worker\n";
            die(254);
        }

        $result = (new DbQueue)
            ->watch($name, true)
            ->process([$worker, 'handleJob'], static::$options['delay'] ?: 1, static::$options['limit']);
        echo "\nStats: ".print_r($result, true);
    }

    public function mainAction()
    {
        echo <<<HELP
-= DbQueue Task runner =-
This script walks through available jobs and processes them with a specific handler class.
Usage: app/cli.php work «tube»
Implementation: tasks/workers/«Tube»::handleJob(\$body, Job \$job):bool|void
Options:
--delay=F Delay between polls for new jobs once the queue is empty
--limit=I How many jobs to process before exiting
--???=??? Other options are passed directly to the Worker constructor
HELP;
    }
}
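For reference, the tube-to-class lookup that WorkTask::__call() performs boils down to the following. camelize() is a simplified stand-in for Phalcon\Text::camelize(), and workerClassFor() is a hypothetical helper; the sketch assumes the action name reaches __call() unchanged with an "Action" suffix.

```php
<?php
/**
 * Simplified stand-in for Phalcon\Text::camelize(): upper-cases the first
 * letter of each "_" or "-" separated word and joins them.
 */
function camelize(string $name): string
{
    return str_replace(' ', '', ucwords(str_replace(['_', '-'], ' ', $name)));
}

/** Hypothetical helper mirroring the lookup done in WorkTask::__call(). */
function workerClassFor(string $action): string
{
    $tube = substr($action, 0, -strlen('Action')); // "send_emailAction" -> "send_email"
    return '\PhalconRest\tasks\workers\\' . camelize($tube);
}
```

So `app/cli.php work send_email` would look for \PhalconRest\tasks\workers\SendEmail, which in turn must implement the Worker interface.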
tasks/workers/Worker.php
<?php namespace PhalconRest\tasks\workers;

use Phalcon\Queue\Db\Job;

interface Worker
{
    /**
     * Handles one job from the watched tube.
     * @return bool|void
     */
    public function handleJob($body, Job $job);
}
tasks/traits/Options.php
<?php namespace PhalconRest\tasks\traits;

/**
 * Trait Options
 * Adds the ability to process command-line options.
 * The using class must declare both static properties:
 * @static array $optionsSpec Associative array of option name => regex (or true for a flag)
 * @static array $options     Receives the parsed values
 *
 * <code>
 * protected static $optionsSpec = [
 *     'limit'    => '\d*', //allows --limit=123
 *     'category' => '.*',  //allows --category=u8nreijgndfjn
 *     'fast'     => true,  //allows --fast
 * ];
 * </code>
 */
trait Options
{
    protected static function processOptions(array $args)
    {
        if (!isset(static::$optionsSpec)) {
            return false;
        }

        // Every known option starts as null
        $options = array_fill_keys(array_keys(static::$optionsSpec), null);

        foreach (static::$optionsSpec as $name => $regex) {
            $opt = "--$name";
            foreach ($args as $k => $arg) {
                $found = false;
                if ($regex === true) {
                    // Boolean flag: --name
                    if ($arg === $opt) {
                        $options[$name] = true;
                        $found = true;
                    }
                } elseif ($regex === '.*') {
                    // Free-form value: --name=anything (strpos() must be 0, not just truthy)
                    if (strpos($arg, $opt.'=') === 0) {
                        $options[$name] = substr($arg, strlen($opt) + 1);
                        $found = true;
                    }
                } elseif (preg_match('/^'.preg_quote($opt, '/').'=('.$regex.')$/', $arg, $matches)) {
                    // The whole value must match the regex
                    $options[$name] = $matches[1];
                    $found = true;
                }
                if ($found) {
                    unset($args[$k]); //removes what's found
                    break;
                }
            }
        }
        static::$options = $options;

        // Leftovers become name => value (or true for bare flags)
        $rest = [];
        foreach ($args as $arg) {
            $pieces = explode('=', ltrim($arg, '-'), 2); // keep any "=" inside the value
            $rest[$pieces[0]] = isset($pieces[1]) ? $pieces[1] : true;
        }
        return $rest; //returns what's left
    }
}
The original Beanstalk implementation reports the number of currently connected workers. Should this be stored in some sort of meta-table for maintenance purposes?
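One possible shape for such a meta-table is a heartbeat registry: each worker periodically upserts its own row, and a worker counts as connected while its last heartbeat is recent. Heartbeats are used here because a crashed worker cannot unregister itself. WorkerRegistry and its methods are hypothetical, and the PHP array stands in for what would be a database table.

```php
<?php
/**
 * Hypothetical in-memory stand-in for a "workers" meta-table.
 * A worker counts as connected while its last heartbeat is no older
 * than $timeout seconds.
 */
class WorkerRegistry
{
    /** @var array worker id => last heartbeat timestamp */
    private $heartbeats = [];
    private $timeout;

    public function __construct(int $timeout = 60)
    {
        $this->timeout = $timeout;
    }

    // Would be an INSERT ... or UPDATE of the worker's row
    public function heartbeat(string $workerId, int $now): void
    {
        $this->heartbeats[$workerId] = $now;
    }

    // Clean shutdown path; crashed workers simply expire instead
    public function unregister(string $workerId): void
    {
        unset($this->heartbeats[$workerId]);
    }

    public function countConnected(int $now): int
    {
        $count = 0;
        foreach ($this->heartbeats as $last) {
            if ($now - $last <= $this->timeout) {
                $count++;
            }
        }
        return $count;
    }
}
```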
Currently, adding $this->useDynamicUpdate(true); to the model makes some tests fail :(
There could be a flag on the Db class (or on each of those methods) that would reduce the number of database reads.
Currently, each call to reserve() issues a SELECT. If there are 100 jobs waiting to be run, 200 queries will happen: 100 SELECTs and 100 DELETEs. Instead, only one SELECT query could be issued if the developer sets a given flag, and a pool of jobs would be held inside the Queue class for further processing.
Example:
$queue = new DbQueue();
$queue->bulkFetches = true;
$queue->put(1);
$queue->put(2);
$queue->put(3);
while ($job = $queue->reserve()) { //one query, three jobs
    echo $job->getBody();
    $job->delete(); //one query per job; can't be avoided as further jobs may hang
}
A call to reserve(), peekReady(), peekBuried() or peekDelayed() would retrieve all available jobs matching those constraints, store them in a cache (a class property, for instance) and return one each time the method is called again. Once that cache is empty, a new query would be issued.
This way we keep consistency throughout the API.
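A minimal sketch of that caching behaviour, assuming a $fetchAll callable stands in for the single SELECT; BulkReserver is a hypothetical class, not the package's Db. Note that draining the queue costs one extra query at the end, the one that comes back empty and makes reserve() return false.

```php
<?php
/**
 * Hypothetical sketch of bulk fetching: reserve() runs one "query" that
 * returns every available job, keeps the rows in a property and serves
 * them one by one until the pool is empty, then queries again.
 */
class BulkReserver
{
    /** @var callable Stand-in for the SELECT; returns an array of jobs */
    private $fetchAll;
    /** @var array Jobs fetched but not yet handed out */
    private $pool = [];
    /** @var int How many times the underlying query ran */
    public $queries = 0;

    public function __construct(callable $fetchAll)
    {
        $this->fetchAll = $fetchAll;
    }

    /** Returns the next job, or false when none is available. */
    public function reserve()
    {
        if (!$this->pool) {
            $this->queries++;
            $this->pool = ($this->fetchAll)();
        }
        return $this->pool ? array_shift($this->pool) : false;
    }
}
```

Because cached rows are not locked, a real version would also have to mark them as reserved in the same fetch; otherwise two workers could pick up the same job.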
We should add a $pool argument to Db::reserve() to allow it to run an internal loop that waits for new jobs.
This argument could help avoid overloading the database connection, while making it easier to create an infinite pool of jobs.
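A sketch of one reading of that idea, treating $pool as a polling interval in seconds: when no job is available, wait and ask again instead of returning false. reserveWithPolling() is hypothetical; $tryReserve stands in for a single non-blocking reserve() query, and the injectable $sleep and the $maxAttempts cap (0 meaning forever) are extra assumptions added to keep it testable.

```php
<?php
/**
 * Hypothetical polling reserve: retries $tryReserve every $pool seconds
 * until it returns a job (anything other than false).
 */
function reserveWithPolling(callable $tryReserve, float $pool, int $maxAttempts = 0, ?callable $sleep = null)
{
    // Default to a real sleep; tests can inject a no-op instead
    $sleep = $sleep ?: function (float $seconds) {
        usleep((int) ($seconds * 1000000));
    };
    for ($attempt = 1; ; $attempt++) {
        $job = $tryReserve();
        if ($job !== false) {
            return $job;
        }
        if ($maxAttempts > 0 && $attempt >= $maxAttempts) {
            return false; // gave up after $maxAttempts empty polls
        }
        $sleep($pool); // wait before hitting the database again
    }
}
```

Spacing out the polls is what keeps an endless worker loop from hammering the connection with back-to-back SELECTs while the queue is empty.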