phalcon-queue-db's People

Contributors

brisc, gte451f, igorsantos07, marceloandrader

phalcon-queue-db's Issues

Implement TTR

Currently there's no implementation of time-to-run (TTR) hooks. Jobs may time out and we would never know.

Possible implementation: when a job is reserved, reuse its delay as the TTR deadline. If the job has no TTR, delay becomes zero; if it has one, delay becomes time()+TTR. Peeks and reserves would then also return jobs with state=reserved AND delay<time(), but never a reserved job with delay=0. In other words: a job reserved with no timeout is reserved only once; a job reserved with a timeout can be reserved again once that timeout expires; and a ready job can be reserved as soon as its delay has expired.

  • implement TTR behaviour
  • implement Job::touch()
  • include this on tests
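
The rule proposed above can be sketched as two small functions. This is only an illustration, not code from the package: the function names, the array shape of a job row, and the use of PHP 7.1+ nullable types are all assumptions, and the `<=` boundary for ready jobs is a guess at what "as soon as its delay expired" means.

```php
<?php
// Hypothetical job row: ['state' => 'ready'|'reserved', 'delay' => int timestamp]

/** Value to store in "delay" at the moment a job gets reserved. */
function delayOnReserve(?int $ttr, int $now): int
{
    // No TTR: delay becomes zero. With TTR: delay becomes the expiry time.
    return ($ttr === null || $ttr <= 0) ? 0 : $now + $ttr;
}

/** Whether reserve()/peek may hand this job out at time $now. */
function isReservable(array $job, int $now): bool
{
    if ($job['state'] === 'ready') {
        // Ordinary case: the job's delay has passed (boundary is an assumption).
        return $job['delay'] <= $now;
    }
    if ($job['state'] === 'reserved') {
        // delay=0 means "reserved without timeout": never handed out again.
        // Otherwise the job becomes available once its TTR deadline has passed.
        return $job['delay'] !== 0 && $job['delay'] < $now;
    }
    return false;
}
```

A Job::touch() built on this could presumably just store delayOnReserve($ttr, time()) again to push the deadline forward.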

Add a middle-man to run tasks wired to specific tubes

This already exists in the main project, but I found no good way to ship it from this queue package. Maybe we could add these classes and let the developer simply extend/implement what's needed, since there's no way to reference a task that lives outside the main project's files?

tasks/WorkTask.php

<?php
use Phalcon\CLI\Task;
use Phalcon\Queue\Db as DbQueue;
use Phalcon\Text;
use PhalconRest\tasks\traits\Options;
use PhalconRest\tasks\workers\Worker;

class WorkTask extends Task
{
    use Options;

    protected static $options     = [];
    protected static $optionsSpec = [
        'limit' => '\d+',
        'delay' => '[\d\.]+',
    ];

    public function __call($action, array $arguments = [])
    {
        $name  = substr($action, 0, -strlen('action'));
        $class = '\PhalconRest\tasks\workers\\'.Text::camelize($name);
        if (class_exists($class)) {
            $options = self::processOptions(isset($arguments[0])? $arguments[0] : []);
            $worker  = new $class($options);

            if (!($worker instanceof Worker)) {
                echo "$class must be an instance of tasks\\workers\\Worker\n";
                die(254);
            }

            $result = (new DbQueue)
                ->watch($name, true)
                ->process([$worker,'handleJob'], static::$options['delay']?: 1, static::$options['limit']);

            echo "\nStats: ".print_r($result, true);
        } else {
            echo "There's no such worker class: $name ($class)\n";
            die(254);
        }
    }

    public function mainAction()
    {
        echo <<<HELP
  -= DbQueue Task runner =-
This script walks through available jobs and processes them with a specific handler class.

Usage: app/cli.php work «tube»
Implementation: tasks/workers/«Tube»::handleJob(\$body, Job \$job):bool|void
Options:
    --delay=F   Delay between polls for new jobs when the queue is empty
    --limit=I   How many jobs to process before exiting
    --???=???   Other options are passed directly to the Worker constructor

HELP;
    }

}

tasks/workers/Worker.php

<?php namespace PhalconRest\tasks\workers;

use Phalcon\Queue\Db\Job;

interface Worker
{
    public function handleJob($body, Job $job);
}

tasks/traits/Options.php

<?php namespace PhalconRest\tasks\traits;

/**
 * Trait Options
 * Adds ability to process options.
 * @static array $optionsSpec Associative array of option name => regex
 *
 * <code>
 *     protected static $optionsSpec = [
 *         'limit'    => '\d*',        //allows --limit=123
 *         'category' => '.*',         //allows --category=u8nreijgndfjn
 *         'fast'     => true,         //allows --fast
 *     ];
 * </code>
 */
trait Options
{

    protected static function processOptions(array $args)
    {
        if (!isset(static::$optionsSpec)) {
            return false;
        }

        $spec    = static::$optionsSpec;
        $options = array_fill_keys(array_keys($spec), null); //every known option starts as null
        foreach (static::$optionsSpec as $name => $regex) {
            $opt   = "--$name";
            $found = false;

            foreach ($args as $k => $arg) {
                if ($regex === true && $arg === $opt) {
                    //boolean flag, e.g. --fast
                    $options[$name] = true;
                    $found = true;
                } elseif ($regex === '.*' && strpos($arg, $opt.'=') === 0) {
                    //free-form value: strpos() returns 0 on a match at the start, so compare strictly
                    $options[$name] = substr($arg, strlen($opt) + 1);
                    $found = true;
                } elseif (is_string($regex) && preg_match("/^$opt=($regex)$/", $arg, $matches)) {
                    //the whole value must match the regex from the spec
                    $options[$name] = $matches[1];
                    $found = true;
                }

                if ($found) {
                    unset($args[$k]); //removes what's found
                    $found = false;
                    break;
                }
            }
        }
        static::$options = $options;

        $rest = [];
        foreach ($args as $arg) {
            $arg    = ltrim($arg, '-');
            $pieces = explode('=', $arg);
            $rest[$pieces[0]] = isset($pieces[1])? $pieces[1] : true;
        }

        return $rest; //returns what's left
    }

}

Implement loop flag on Db::reserve(), peekReady() and friends

There could be a flag on the Db class - or inside each of those methods - that would reduce the number of database reads.

Currently, each call to reserve() issues a SELECT. If there are 100 jobs waiting to be run, 200 queries will happen: 100 selects and 100 deletes. Instead, a single SELECT could be issued if the developer sets a given flag, and the resulting pool of jobs would be held inside the Queue class for further processing.

Example:

$queue = new DbQueue();
$queue->bulkFetches = true;
$queue->put(1);
$queue->put(2);
$queue->put(3);
while ($job = $queue->reserve()) { //one query, three jobs
    echo $job->getBody();
    $job->delete(); //one query per job; can't be avoided as further jobs may hang
}

A call to reserve(), peekReady(), peekBuried() or peekDelayed() would retrieve all available jobs matching those constraints, store them in a cache (a class property, for instance) and return one each time the method is called again. Once that cache is emptied, a new query would happen.
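
A rough sketch of that cache, independent of the real Db class: JobBuffer and its $fetchAll callable are hypothetical names, and the callable stands in for whatever bulk SELECT the queue would run.

```php
<?php
/**
 * Hypothetical helper, not part of phalcon-queue-db: buffers the rows
 * returned by one bulk query and hands them out one at a time.
 */
class JobBuffer
{
    private $fetchAll;   // callable(): array, stands in for the bulk SELECT
    private $pool = [];  // jobs fetched but not yet handed out

    public function __construct(callable $fetchAll)
    {
        $this->fetchAll = $fetchAll;
    }

    /** Returns the next buffered job, refilling when empty; null if none. */
    public function next()
    {
        if (!$this->pool) {
            $this->pool = call_user_func($this->fetchAll);
        }
        return $this->pool ? array_shift($this->pool) : null;
    }
}

// Usage: the fake "database" answers the first query with three jobs
// and the second one with an empty result.
$queries = 0;
$rows    = [[1, 2, 3], []];
$buffer  = new JobBuffer(function () use (&$queries, &$rows) {
    $queries++;
    return array_shift($rows) ?: [];
});

$seen = [];
while (($job = $buffer->next()) !== null) {
    $seen[] = $job; // three jobs served from a single query
}
```

One thing a real implementation would have to settle: for reserve(), every job pulled into the pool probably needs to be marked as reserved in the same step, otherwise another worker could pick up a job that is already sitting in this worker's cache.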

Write README

  • introduction (wtf is this package)
  • how to install
  • how to use
  • differences from the original implementation
  • set package stability to stable

Db::reserve() should block when $timeout = null

We should add a $pool argument to Db::reserve() to allow it to run an internal loop that keeps looking for new jobs.
This additional argument could help avoid overloading the database connection, while making it easier to build an endless job-processing loop.
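
A minimal sketch of such an internal loop, written as a free function so it runs on its own. reserveBlocking(), $tryReserve and $pollInterval are made-up names; $tryReserve stands in for one non-blocking reserve attempt and is assumed to return null when no job is available.

```php
<?php
/**
 * Hypothetical polling wrapper, not the package's API.
 * With $timeout = null it blocks until a job shows up.
 */
function reserveBlocking(callable $tryReserve, $timeout = null, $pollInterval = 1.0)
{
    $deadline = $timeout === null ? null : microtime(true) + $timeout;

    while (true) {
        $job = call_user_func($tryReserve);
        if ($job !== null) {
            return $job;
        }
        if ($deadline !== null && microtime(true) >= $deadline) {
            return null; // timed out without finding a job
        }
        // Sleep between attempts so the database is not queried in a tight loop.
        usleep((int) round($pollInterval * 1000000));
    }
}
```

The poll interval is what keeps the database load bounded: one query per interval per worker, regardless of how long the queue stays empty.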
