Git Product home page Git Product logo

mosquito's Introduction

mosquito

GitHub

Mosquito is a generic background job runner written primarily for Crystal. Significant inspiration from experience with the successes and failings many Ruby gems in this vein. Once compiled, a mosquito binary can start work in about 10 milliseconds.

Mosquito currently provides these features:

  • Delayed execution (SendEmailJob.new(email: :welcome, address: user.email).enqueue(in: 3.minutes))
  • Scheduled / Periodic execution (RunEveryHourJob.new)
  • Job Storage in Redis
  • Automatic rescheduling of failed jobs
  • Progressively increasing delay of rescheduled failed jobs
  • Dead letter queue of jobs which have failed too many times
  • Rate limited jobs

Current Limitations:

Project State

The Mosquito project is stable. A few folks are using Mosquito in production, and it's going okay.

There are some features which would be nice to have, but what is here is both tried and tested.

If you're using Mosquito, please get in touch on the Discussion board or on Crystal chat with any questions, feature suggestions, or feedback.

Installation

Update your shard.yml to include mosquito:

dependencies:
+  mosquito:
+    github: mosquito-cr/mosquito

Usage

Step 1: Define a queued job

# src/jobs/puts_job.cr
class PutsJob < Mosquito::QueuedJob
  param message : String

  def perform
    puts message
  end
end

Step 2: Trigger that job

# src/<somewher>/<somefile>.cr
PutsJob.new(message: "ohai background job").enqueue

Step 3: Run your worker to process the job

# src/worker.cr

Mosquito.configure do |settings|
  settings.redis_url = ENV["REDIS_URL"]
end

Mosquito::Runner.start
crystal run src/worker.cr

Success

> crystal run src/worker.cr
2017-11-06 17:07:29 - Mosquito is buzzing...
2017-11-06 17:07:51 - Running task puts_job<...> from puts_job
2017-11-06 17:07:51 - [PutsJob] ohai background job
2017-11-06 17:07:51 - task puts_job<...> succeeded, took 0.0 seconds

More information about queued jobs in the manual.


Periodic Jobs

Periodic jobs run according to a predefined period -- once an hour, etc.

This periodic job:

class PeriodicallyPutsJob < Mosquito::PeriodicJob
  run_every 1.minute

  def perform
    emotions = %w{happy sad angry optimistic political skeptical epuhoric}
    puts "The time is now #{Time.local} and the wizard is feeling #{emotions.sample}"
  end
end

Would produce this output:

2017-11-06 17:20:13 - Mosquito is buzzing...
2017-11-06 17:20:13 - Queues: periodically_puts_job
2017-11-06 17:20:13 - Running task periodically_puts_job<...> from periodically_puts_job
2017-11-06 17:20:13 - [PeriodicallyPutsJob] The time is now 2017-11-06 17:20:13 and the wizard is feeling skeptical
2017-11-06 17:20:13 - task periodically_puts_job<...> succeeded, took 0.0 seconds
2017-11-06 17:21:14 - Queues: periodically_puts_job
2017-11-06 17:21:14 - Running task periodically_puts_job<...> from periodically_puts_job
2017-11-06 17:21:14 - [PeriodicallyPutsJob] The time is now 2017-11-06 17:21:14 and the wizard is feeling optimistic
2017-11-06 17:21:14 - task periodically_puts_job<...> succeeded, took 0.0 seconds
2017-11-06 17:22:15 - Queues: periodically_puts_job
2017-11-06 17:22:15 - Running task periodically_puts_job<...> from periodically_puts_job
2017-11-06 17:22:15 - [PeriodicallyPutsJob] The time is now 2017-11-06 17:22:15 and the wizard is feeling political
2017-11-06 17:22:15 - task periodically_puts_job<...> succeeded, took 0.0 seconds

More information on periodic jobs in the manual.

Advanced usage

For more advanced topics, including use with Lucky Framework, throttling or rate limiting, check out the full manual.

Contributing

Contributions are welcome. Please fork the repository, commit changes on a branch, and then open a pull request.

Crystal Versions

Mosquito aims to be compatible with the latest Crystal release, and the latest patch for all post-1.0 minor crystal versions.

For development purposes you're encouraged to stay in sync with .tool-versions.

Testing

This repository uses minitest for testing. As a result, crystal spec doesn't do anything helpful. Do this instead:

make test

In lieu of crystal spec bells and whistles, Minitest provides a nice alternative to running one test at a time instead of the whole suite.

mosquito's People

Contributors

alex-lairan avatar anapsix avatar blacksmoke16 avatar jwoertink avatar mamantoha avatar psikoz avatar robacarp avatar watzon avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

mosquito's Issues

Test Mode

It would be nice if there were some sort of test mode where I can either just assert that a job was added to the queue or run the jobs immediately when queued.

Here is some information on how this is done in Sidekiq (for ruby):
https://github.com/mperham/sidekiq/wiki/Testing

Thanks!

Postgres Backend

The most often requested feature for Mosquito is to use postgres instead of redis as a backend for metadata and queue functions, and I'm surprised there isn't an issue about it.

Using multiple params

I guess this isn't a thing? I'm assuming I have to make a special object and do my own custom serialization? If that's the case, then there may need to be a better error message for this.

class AfterLoginWorker < Mosquito::QueuedJob
  params member_id : String
  params ip : String

  def perform
  end
end
web    | no argument named 'member_id'
web    | Matches are:
web    |  - AfterLoginWorker.new(ip : String)
web    |  - AfterLoginWorker.new() (trying this one)

Ability to throttle jobs

The ability to throttle the rate and concurrency of jobs would be of great value to me.

Ideally this would work similar to the sidekiq-throttle gem where i can specify in a job:
sidekiq_throttle(concurrency: { limit: 1 }, threshold: { limit: 5, period: 1.minute })

This will limit the job to 1 concurrent worker processing 5 messages per minute.

The use case behind this is I am adding messages to a queue to asynchronously process that send off requests to a third party API. The endpoint i am sending the requests to is limited to 5 per minute. Having this would be a super easy way to handle rate limited jobs either due to third party/internal/performance reasons.

Job crashing with Missing hash key: "limit" (KeyError)

Please include some details:

Crystal version: 1.2.2
Mosquito Shard version: latest

One of my periodic jobs is crashing occasionally with the following error:

Unhandled exception: Missing hash key: "limit" (KeyError)
from /usr/share/crystal/src/hash.cr:1905:15 in 'run_next_task'
from lib/mosquito/src/mosquito/runner.cr:132:9 in 'run'
from lib/mosquito/src/mosquito/runner.cr:21:9 in 'start'
from src/app_worker.cr:11:1 in '__crystal_main'
from /lib/x86_64-linux-gnu/libc.so.6 in '__libc_start_main'
from /var/apps/dendrorithms.com/current/app_worker in '_start'
from ???

Most of the time, systemd is able to restart the service, but sometimes it's not. That's something I need to tweak but I'm curious to know why it's crashing. Does it ring a bell with you?

Custom serializer docs update

The docs on the Wiki for the custom serializer needs a little updating. The current example wouldn't be able to compile

  class SpecialtyObject
    property name : String
    property value : Int32
+  def initialize(@name, @value)
+  end
  end

  def deserialize_specialty_object(raw : String) : SpecialtyObject
-    specialty = SpecialtyObject.new
    parts = raw.split "="
-    specialty.name = parts.first
-    specialty.value = parts.last
+   SpecialtyObject.new(name: parts.first, value: parts.last)
  end

In this case, the method has to return the instance of SpecialtyObject, plus crystal requires the instance variables to be initialized in some initialize method.

Reinventing the wheel with custom serialization?

In order to store job data in the queue, it needs to be serialized and deserialized for execution.

I'm wondering why this shard pulls up a completely proprietary serialization mechanism while there are widely supported general-purpose solutions to this problem. This makes it unnecessarily difficult for users because you need to implement custom serialization methods for anything but the most basic data types.

In my opinion, using a standard format like JSON for this would provide great benefits. There is practically no downside I can think of besides the cost for switching (which is not very high).
JSON is a universal data format and with Crystal's JSON implementation, it is really easy to provide bindings for custom datatypes, which are not limited to use with mosquito. It is string based, so instead of storing a hash in redis, mosquito would store a string. I don't think that makes much of a difference. The current implementation has some issues, because job data and task metadata are stored in the same hash, which will cause problems if a task uses eg. enqueue_time as parameter.

A huge benefit besides usability is also the reduction of complexity. Serializing and deserializing JSON is all provided in Crystal's stdlib. The entire params macro could be reduced or potentially even completely removed in favour of using plain properties/ivars.

Provide a job metadata store

Currently Job.config is a bespoke implementation used only for the throttling logic, which is a leak in the Job abstraction. In order to move the throttling logic into it's own layer Jobs needd a general purpose metadata hash which is accessible in a before_run hook (#69).

Can't install newest version of mosquito.

I get an error after adding mosquito to my shards.yml. Using version 0.2.0 works but anything newer breaks redis.

Error resolving redis (~> 2.0.0, *, ~> 2.1.1)

Using amber (0.28.0 at master)
Using amber_router (0.3.0)
Using cli (0.7.0)
Using optarg (0.5.8)
Using callback (0.6.3)
Using string_inflection (0.2.1)
Using compiled_license (0.1.3)
Using kilt (0.4.0)
Using liquid (0.3.0)
Using inflector (0.1.8)
Using micrate (0.3.3)
Using db (0.5.1)
Using mysql (0.6.0)
Using pg (0.16.1)
Using redis (2.0.0)
Using pool (0.2.3)
Using shell-table (0.9.2 at 078a04ea58ead5203bb435a3b5fff448ddabaeea)
Using slang (1.7.1)
Using sqlite3 (0.11.0)
Using teeplate (0.7.0)
Using exception_page (0.1.2)
Using granite (0.15.2)
Using quartz_mailer (0.5.3)
Using email (0.3.3)
Using jasper_helpers (0.2.4)
Using citrine-i18n (0.3.2)
Using i18n (0.2.0)
Installing mosquito (0.2.0)
Using garnet_spec (0.2.1)
Using selenium (0.4.0)

Expiring jobs

Thanks for all the work that's gone into this.
One feature I'd like to see added would be expiring jobs. If a job can't/isn't processed within/until X, it won't run.
Example: if I add a job to the queue, I only want it to run within the next 30 seconds. If the worker is not running or doesn't process the job within 30 seconds, it shouldn't work the job.

I'll look into implementing this myself sometime the next week.

Support crl-0.27.0 undefined epoch_ms

When we try to enqueue a job to task store, we get an issue:

in lib/mosquito/src/mosquito/queued_job.cr:108: instantiating 'Mosquito::Task#store()'

      task.store
           ^~~~~

in lib/mosquito/src/mosquito/task.cr:46: undefined method 'epoch_ms' for Time

      epoch = time.epoch_ms.to_s
                   ^~~~~~~~

Rerun with --error-trace to show a complete error trace.

epoch_ms is removed in favour to Times to_unix_ms.

Run mosquito using multiple processes

I've got 5 jobs in my app. One of them takes about 45 seconds to complete and it's holding up all the other ones. Is there a way to run mosquito with multiple processes?

The alternative I have now is to create a small separate app and run it from there. And then scale by creating multiple instances of that app. But if there's another way, that would be better of course. Something like https://github.com/jwoertink/lucky-cluster.

Add before/after hooks to jobs

This should provide a nice interface to tackle meta-questions about jobs and tasks:

  • #58
  • #41
  • Rate limiting jobs - perhaps even with a runtime computed rate limiting key

A before hook should be able to raise or return in some way that prevents the job from executing.

Error resolving redis (~> 2.0.0, ~> 2.1.1) v0.3.0

shard.yml

name: install-test-app
version: 0.1.0

authors:

  • Brennick Langston

crystal: 0.27.2

license: UNLICENSED

targets:
install-test-app:
main: src/install-test-app.cr

amber:
main: lib/amber/src/amber/cli.cr

dependencies:
amber:
github: amberframework/amber
version: 0.27.0
#branch: master

granite:
github: amberframework/granite
version: ~> 0.15.0

quartz_mailer:
github: amberframework/quartz-mailer
version: ~> 0.5.1

jasper_helpers:
github: amberframework/jasper-helpers
version: ~> 0.2.1

pg:
github: will/crystal-pg
version: ~> 0.15.0
citrine-i18n:
github: amberframework/citrine-i18n
version: 0.3.2

mosquito:
github: robacarp/mosquito

development_dependencies:
garnet_spec:
github: amberframework/garnet-spec
version: ~> 0.2.1


US7074149-M001:install-test-app brennicklangston$ shards install
Fetching https://github.com/amberframework/amber.git
Fetching https://github.com/amberframework/amber-router.git
Fetching https://github.com/mosop/cli.git
Fetching https://github.com/mosop/optarg.git
Fetching https://github.com/mosop/callback.git
Fetching https://github.com/mosop/string_inflection.git
Fetching https://github.com/elorest/compiled_license.git
Fetching https://github.com/jeromegn/kilt.git
Fetching https://github.com/TechMagister/liquid.cr.git
Fetching https://github.com/phoffer/inflector.cr.git
Fetching https://github.com/amberframework/micrate.git
Fetching https://github.com/crystal-lang/crystal-db.git
Fetching https://github.com/crystal-lang/crystal-mysql.git
Fetching https://github.com/will/crystal-pg.git
Fetching https://github.com/stefanwille/crystal-redis.git
Fetching https://github.com/ysbaddaden/pool.git
Fetching https://github.com/luckyframework/shell-table.cr.git
Fetching https://github.com/jeromegn/slang.git
Fetching https://github.com/crystal-lang/crystal-sqlite3.git
Fetching https://github.com/mosop/teeplate.git
Fetching https://github.com/crystal-loot/exception_page.git
Fetching https://github.com/amberframework/granite.git
Fetching https://github.com/amberframework/quartz-mailer.git
Fetching https://github.com/arcage/crystal-email.git
Fetching https://github.com/amberframework/jasper-helpers.git
Fetching https://github.com/amberframework/citrine-i18n.git
Fetching https://github.com/TechMagister/i18n.cr.git
Fetching https://github.com/robacarp/mosquito.git
Error resolving redis (~> 2.0.0, ~> 2.1.1)


Temporary Fix:

  • version: 0.2.1

Update use of ::Log to better indicate where a message is coming from

Currently, mosquito logs look like this:

INFO - mosquito: Success: task queued_job<1642475392146:636> finished and took 13.0µs
INFO - mosquito: Running task custom_serializers_job<1642475392145:418> from custom_serializers_job
INFO - mosquito: using custom serialization: 12
INFO - mosquito: deserialized: 120
INFO - mosquito: Success: task custom_serializers_job<1642475392145:418> finished and took 17.0µs
INFO - mosquito: Running task rate_limited_job<1642475392146:766> from rate_limited_job
INFO - mosquito: mosquito:rate_limit:rate_limited_job
INFO - mosquito: Success: task rate_limited_job<1642475392146:766> finished and took 37.0µs
INFO - mosquito: Running task custom_serializers_job<1642475392145:480> from custom_serializers_job
INFO - mosquito: using custom serialization: 525600
INFO - mosquito: deserialized: 5256000
INFO - mosquito: Success: task custom_serializers_job<1642475392145:480> finished and took 25.0µs
INFO - mosquito: Running task rate_limited_job<1642475392146:571> from rate_limited_job
INFO - mosquito: mosquito:rate_limit:rate_limited_job
INFO - mosquito: Success: task rate_limited_job<1642475392146:571> finished and took 25.0µs
INFO - mosquito: Running task rate_limited_job<1642475392147:32> from rate_limited_job
WARN - mosquito: Failure: task rate_limited_job<1642475392147:32> failed, taking 34.0µs and will run again in 00:00:02 (at 2022-01-18 03:09:54 UTC)
INFO - mosquito: Running task periodically_puts<1642475392149:419> from periodically_puts
INFO - mosquito: Hello from PeriodicallyPuts

This defeats some of the purposes of using Log because messages cannot be filtered by topic easily. Ideally mosquito should be one of:

  • mosquito.runner
  • mosquito.job
  • mosquito.job.{job name}

undefined macro variable 'types'

This happens when adding in a type that isn't supported by serialization.

class SomeJob < Mosquito::QueuedJob
  params ip : String?

  def perform
  end
end
web    | in macro 'inherited' /app/lib/mosquito/src/mosquito/queued_job.cr:3, line 33:
....
web    |   32.   if type.is_a?(Union)
web    | > 33.     raise("Mosquito Job: Unable to generate a constructor for Union Types: #{types}")
web    |   34.   else
....
web    |
web    | undefined macro variable 'types'

Implement single-binary concurrent workers

This would also be a good opportunity to clean up the Runner class, as it's currently wired up like a pile of spaghetti.

Off the top of my head, here's a checklist:

  • reconsider the singleton pattern in Runner
  • take a runtime parameter for spawns (and forks?)
  • both runner and overmind have infinite loops...are both necessary?

Mosquito does not display job timing accurately, clarification needed

Please clarify what finished and took ... log output measures.
It looks like it does not actually reflect how long the job took to complete.

Crystal version: 0.35.1
Mosquito Shard version: 0.4.4

Example code:

require "mosquito"

Mosquito::Redis.instance.flushall

class PutsJob < Mosquito::QueuedJob
  params seconds : Int32
  def perform
    puts "sleeping for #{seconds} seconds.."
    sleep seconds
  end
end

PutsJob.new(seconds: 1).enqueue
PutsJob.new(seconds: 2).enqueue
PutsJob.new(seconds: 3).enqueue
PutsJob.new(seconds: 4).enqueue
PutsJob.new(seconds: 5).enqueue

spawn do
  Mosquito::Runner.start
end

sleep 20

Output:

2020-12-11T14:03:29.342249Z   INFO - mosquito: Mosquito is buzzing...
2020-12-11T14:03:29.343754Z   INFO - mosquito: Running task puts_job<1607695409338:670> from puts_job
sleeping for 1 seconds..
2020-12-11T14:03:30.349457Z   INFO - mosquito: Success: task puts_job<1607695409338:670> finished and took 36.0µs
2020-12-11T14:03:30.350865Z   INFO - mosquito: Running task puts_job<1607695409338:521> from puts_job
sleeping for 2 seconds..
2020-12-11T14:03:32.355791Z   INFO - mosquito: Success: task puts_job<1607695409338:521> finished and took 28.0µs
2020-12-11T14:03:32.357121Z   INFO - mosquito: Running task puts_job<1607695409339:896> from puts_job
sleeping for 3 seconds..
2020-12-11T14:03:35.359185Z   INFO - mosquito: Success: task puts_job<1607695409339:896> finished and took 21.0µs
2020-12-11T14:03:35.360249Z   INFO - mosquito: Running task puts_job<1607695409340:325> from puts_job
sleeping for 4 seconds..
2020-12-11T14:03:39.363053Z   INFO - mosquito: Success: task puts_job<1607695409340:325> finished and took 43.0µs
2020-12-11T14:03:39.364571Z   INFO - mosquito: Running task puts_job<1607695409340:586> from puts_job
sleeping for 5 seconds..
2020-12-11T14:03:44.365458Z   INFO - mosquito: Success: task puts_job<1607695409340:586> finished and took 40.0µs

Can't use months with PeriodicJob

class AccountExpirationWorker < Mosquito::PeriodicJob
  run_every 1.month

  def perform
  end
end
no overload matches 'Mosquito::Base.register_job_interval' with types NotifyCardsExpiringWorker.class, Time::MonthSpan

This one might be a little tricky since months change length. For now, using 30.days works.

Crystal 1.1.1
Mosquito 0.11.0

allow tasks to be scheduled to run at a specific wallclock time

for now, this can be implemented as a periodic job which enqueues a job at a specific time:

class WallclockJob < Mosquito::QueuedJob
# whatever
end

class SchedulerJob < Mosquito::PeriodicJob
  run_every 1.minute

  def perform
    if Time.now.hour == 3 && Time.now.minute == 45
      WallclockJob.new.enqueue
    end
  end
end

Expected a parameter named courier but found nil instead.

mosquito.cr

require "redis"
require "./EveToolsApi/tasks/**"
require "./EveToolsApi/models/**"

Clear.logger.level = ::Logger::DEBUG
include EveToolsApi::Models
include EveToolsApi::Helpers

courier = Courier.query.first!

pp courier
pp typeof(courier)

EveToolsApi::Tasks::SendMail.new(courier).enqueue

Mosquito::Runner.start

send_mail.cr

require "mosquito"
require "../models/courier"

module EveToolsApi::Tasks
  class SendMail < Mosquito::QueuedJob
    params(courier : EveToolsApi::Models::Courier | Nil)

    def perform
      pp courier
    end
  end
end

output

mosquito    | #<EveToolsApi::Models::Courier:0x230edd0
mosquito    |  @acceptor_id_column=262543627,
mosquito    |  @attributes={},
mosquito    |  @cache=
mosquito    |   #<Clear::Model::QueryCache:0x22fa620 @cache={}, @cache_activation=Set{}>,
mosquito    |  @collateral_column=0.0,
mosquito    |  @contract_id_column=135484934,
mosquito    |  @date_accepted_column=2018-08-11 13:28:39.0 +00:00,
mosquito    |  @date_completed_column=2018-08-11 15:30:42.0 +00:00,
mosquito    |  @date_expired_column=2018-08-24 00:24:21.0 +00:00,
mosquito    |  @date_issued_column=2018-08-10 00:24:21.0 +00:00,
mosquito    |  @days_to_complete_column=14,
mosquito    |  @end_location_id_column=1022568876022_i64,
mosquito    |  @errors=[],
mosquito    |  @errors_column=[],
mosquito    |  @is_correct_column=true,
mosquito    |  @issuer_alliance_id_column=1555742183,
mosquito    |  @issuer_corporation_id_column=98173218,
mosquito    |  @issuer_id_column=1894412054,
mosquito    |  @mail_sent_column=nil,
mosquito    |  @persisted=true,
mosquito    |  @reward_column=66000000.0,
mosquito    |  @start_location_id_column=60003760_i64,
mosquito    |  @status_column="finished",
mosquito    |  @title_column="",
mosquito    |  @volume_column=94130.0703125>
mosquito    | EveToolsApi::Models::Courier
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : Mosquito is buzzing...
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : Found 1 delayed tasks
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : Running task eve_tools_api::tasks::send_mail<mosquito:task:1536532452450:830> from eve_tools_api::tasks::send_mail
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] Job failed! Raised Exception: Expected a parameter named courier but found nil instead. The record may not exist in the database or the parameter may not have been provided when the job was enqueued.
api2        | [production] Kemal is ready to lead at http://0.0.0.0:6000
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] src/EveToolsApi/tasks/send_mail.cr:6:5 in 'courier'
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] src/EveToolsApi/tasks/send_mail.cr:9:7 in 'perform'
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] lib/mosquito/src/mosquito/job.cr:35:7 in 'run'
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] lib/mosquito/src/mosquito/task.cr:73:10 in 'run'
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] lib/mosquito/src/mosquito/runner.cr:122:14 in 'run_next_task'
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] lib/mosquito/src/mosquito/runner.cr:86:9 in 'dequeue_and_run_tasks'
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] lib/mosquito/src/mosquito/runner.cr:30:9 in 'run'
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] lib/mosquito/src/mosquito/runner.cr:10:7 in 'start'
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] src/mosquito.cr:0:1 in '__crystal_main'
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] /usr/share/crystal/src/crystal/main.cr:97:5 in 'main_user_code'
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] /usr/share/crystal/src/crystal/main.cr:86:7 in 'main'
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] /usr/share/crystal/src/crystal/main.cr:106:3 in 'main'
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] __libc_start_main
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] _start
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : [EveToolsApi::Tasks::SendMail-mosquito:task:1536532452450:830] ???
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : Failure: task eve_tools_api::tasks::send_mail<mosquito:task:1536532452450:830> failed, taking 0.23s and will run again in 00:00:02 (at 2018-09-09 22:34:14 +00:00)
mosquito    | I, [2018-09-09 22:34:12 +00:00 #923]  INFO -- : Running task eve_tools_api::tasks::send_mail<mosquito:task:1536532438737:795> from eve_tools_api::tasks::send_mail

Address developer experience around Job#job_type

The current implementation for how to specify which queues a job should land in is opaque:

class GeneratorJob < QueuedJob
   def self.job_type
     "generator"
  end

  # ...etc
end

Should Job#job_type be renamed? Should a macro be provided to make that feature less verbose? At a minimum documentation should be added to the method so it's clear what it's supposed to do.

see #80 (comment)

Wrong log snippet in Wikipages

Reading logs paragraph on Wiki for this library and saw an error in the snippet code:

Should be: (replaced puts with log)

class PutsJob < Mosquito::QueuedJob
  def perform
    log "ohai background job"
  end
end

Unable to write a job with no params

Making a job worker with no params like this

class SyncStatusWorker < Mosquito::QueuedJob
  def perform
    # code
  end
end

SyncStatusWorker.new.enqueue

will throw this compile error

❯ crystal src/start_server.cr
Showing last frame. Use --error-trace for full trace.

In lib/mosquito/src/mosquito/queued_job.cr:107:14

 107 | task = build_task
              ^---------
Error: undefined local variable or method 'build_task' for SyncStatusWorker

To get around it, you can just call params in the head of the class.

Crystal version: 1.1.1
Mosquito Shard version: 0.11.0

[Feature request] add delayed jobs

It would be nice if it were possible to queue a delayed job. Maybe with the following API:

# src/<somewher>/<somefile>.cr
PutsJob.new(message: "ohai background job").enqueue(in: 30.seconds)
# and
PutsJob.new(message: "ohai background job").enqueue(at: Time.utc(2022, 1, 1, 0, 0,0))

in the first case it would save the job in redis with the timestamp Time.utc + 30.seconds and in the second it would save it with the timestamp of the new year, then when Time.utc is greater than or equal to that timestamp it would add the job to the queue.

Sidekiq has something like this already, but I like Mosquito more and would like to see something like that here.

Error: undefined method 'serialize_uuid' for SomeJob

Please include some details:

Using UUID in params seems to fail.

class SomeJob < Mosquito::QueuedJob
  params user_id : UUID

  def perform
  end
end
web          |  > 71 |               task.config["user_id"] = serialize_uuid(user_id)
web          |                                                ^-------------
web          | Error: undefined method 'serialize_uuid' for SomeJob

Crystal version: 0.36.1
Mosquito Shard version: 0.9.0+git.commit.5f28c5ee997ac40e0177743161db09ec8bd2e477

Provide a way to indicate that a job definition may have changed

Here's a theoretical timeline:

  • Job is defined, code is deployed to both worker and dispatcher
  • Jobs are enqueued
  • Job definition is changed, code is deployed to dispatcher
  • Worker should be able to continue to process old jobs and not process any new jobs

Currently mosquito has no code coverage for the scenario where a worker and a dispatcher have different jobs, or different versions of jobs.

Weird issue when kicking off multiple workers

I have this worker that kicks off 4 other workers when it runs. I noticed that when it would kick off, it was like it was picking and choosing which of those 4 it wanted to start running. I decided to move all 4 of those up a level, and still the same issue.

Basically the flow is

  • User logs in
  • If successful, kick off 4 separate workers
  • Redirect user to member's area.

Tailing the logs, I see that there's always at least 1 that runs, but it's literally any combination of the 4.

It basically looks like this:

if user
  PasswordWorker.new(user_id: user.id).enqueue
  EmailWorker.new(user_id: user.id).enqueue
  LoginWorker.new(user_id: user.id).enqueue
  PostbackWorker.new(user_id: user.id).enqueue
  redirect to: "/path"
end

Please include some details:

Crystal version: 0.31.1
Mosquito Shard version: commit: 0212a74

Change the params api

It would be nice if you could just pass the params you want directly into the perform method. Basically take the api from:

class MyWorker < Mosquito::QueuedJob
  params(param_one : String, other : String)

  def perform
  # ...
  end
end

to

class MyWorker < Mosquito::QueuedJob
  def perform(param_one : String, other : String)
  # ...
  end
end

Just a way to simplify the API a bit.

Replace job#succeeded/failed/executed with an enum

In several places the logic which determines the state of a job is getting complicated because the sate is held in several places. It would probably improve readability to combine #executed? #succeeded? and #failed? under an Enum:

    def run
      before_hook

      raise DoubleRun.new if executed
      @state = State::Running
      perform
    rescue JobFailed
      @state = State::Failed
    rescue e : DoubleRun
      raise e
    rescue e
      log "Job failed! Raised #{e.class}: #{e.message}"
      e.backtrace.each do |trace|
        log trace
      end

      @state = State::Failed
    else
      @state = State::Succeeded
    end

Provide API for inspecting the backend data

Currently inspecting the backend data requires querying the backend directly. It would be nice to be able to ask mosquito's backend for data about:

  • The current list of queues
  • The current list of job runs
  • A list of active runners
  • Statistics about a runner
    • How many tasks have been finished in the last hour,minute,second
    • How many tasks have succeed or failed in the last
  • How many tasks have been enqueued in the last hour/minute/second
  • What a runner is currently doing
  • JobRuns that have recently failed or succeeded
    • The parameters given to a job run
    • when it was started
    • how long it took to run

Prototyping for these features is being monkeypatched in the TUI Visualizer project.

Logo proposal

Hey @robacarp

I designed a logo for mosquito to contribute. If you want, i can send a pull-request. Also, Do you have a special color you want? I will wait for your comment. Best regards.

mosquito

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.