sqewer's Introduction

An AWS SQS-based queue processor, for highly distributed job engines.

The shortest introduction possible

In your environment, set SQS_QUEUE_URL. Then, define a job class:

class MyJob
  def run
    File.open('output', 'a') { ... }
  end
end

Then submit the job:

Sqewer.submit!(MyJob.new)

and to start processing, in your commandline handler:

#!/usr/bin/env ruby
require 'my_application'
Sqewer::CLI.start

To add arguments to the job:

class JobWithArgs
  include Sqewer::SimpleJob
  attr_accessor :times

  def run
    ...
  end
end
...
Sqewer.submit!(JobWithArgs.new(times: 20))

Submitting jobs from other jobs (the job will go to the same queue the parent job came from):

class MyJob
  def run(worker_context)
    ...
    worker_context.submit!(CleanupJob.new)
  end
end

The messages will only be deleted from SQS once the job execution completes without raising an exception.

Requirements

Ruby 3+ and version 2 of the AWS SDK. You can also run Sqewer backed by a SQLite database file, which can be handy in development situations.

Job storage

Jobs are (by default) stored in SQS as JSON blobs. A very simple job ticket looks like this:

{"_job_class": "MyJob", "_job_params": null}

When this ticket is being picked up by the worker, the worker will do the following:

job = MyJob.new
job.run

So the smallest possible job class has to be instantiable without arguments, and has to respond to the run message.

Jobs with arguments and parameters

Job parameters can be passed as keyword arguments. Properties in the job ticket (encoded as JSON) are directly translated to keyword arguments of the job constructor. With a job ticket like this:

{
  "_job_class": "MyJob",
  "_job_params": {"ids": [1,2,3]}
}

the worker will instantiate your MyJob class with the ids: keyword argument:

job = MyJob.new(ids: [1,2,3])
job.run

Note that at this point only arguments that are raw JSON types are supported:

  • Hash
  • Array
  • Numeric
  • String
  • nil/false/true

If you need marshalable Ruby types there instead, you will need to implement a custom Serializer (see "Job marshaling" below).

Sqewer::SimpleJob

The module Sqewer::SimpleJob can be included into a job class to add some conveniences, especially for dealing with attributes.
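For instance, a hedged sketch, assuming SimpleJob derives the keyword constructor and the to_h from your attr_accessor declarations (as the examples in this README suggest):

class NotifyUser
  include Sqewer::SimpleJob
  attr_accessor :user_id

  def run
    # ... deliver the notification for user_id
  end
end

# The include is assumed to provide both the keyword constructor and to_h,
# so the job can be submitted without extra boilerplate:
Sqewer.submit!(NotifyUser.new(user_id: 42))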

Jobs spawning dependent jobs

If your run method on the job object accepts arguments (has non-zero arity), the ExecutionContext will be passed to the run method.

job = MyJob.new(ids: [1,2,3])
job.run(execution_context)

The execution context has some useful methods:

  • logger, for logging the state of the current job. The logger messages will be prefixed with the job's inspect.
  • submit! for submitting more jobs to the same queue

A job submitting a subsequent job could look like this:

class MyJob
  def run(ctx)
    ...
    ctx.submit!(DeferredCleanupJob.new)
  end
end

Job submission

In general, a job object that needs some arguments for instantiation must return a Hash from its to_h method. The hash must include all the keyword arguments needed to instantiate the job when executing. For example:

class SendMail
  def initialize(to:, body:)
    ...
  end

  def run
    ...
  end

  def to_h
    {to: @to, body: @body}
  end
end

Or, if your job is a simple Struct, you could inherit your job class from it:

class SendMail < Struct.new(:to, :body, keyword_init: true)
  def run
    ...
  end
end
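This works because a Struct instance already provides the to_h that job submission relies on:

job = SendMail.new(to: 'someone@example.com', body: 'Hello!')
job.to_h #=> {:to=>"someone@example.com", :body=>"Hello!"}
Sqewer.submit!(job)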

Job marshaling

By default, the jobs are converted to JSON and back from JSON using the Sqewer::Serializer object. You can override that object if you need to handle job tickets that come from external sources and do not necessarily conform to the job serialization format used internally. For example, you can handle S3 bucket notifications:

class CustomSerializer < Sqewer::Serializer
  # Overridden so that we can instantiate a custom job
  # from the AWS notification payload.
  # Return "nil" and the job will be simply deleted from the queue
  def unserialize(message_blob)
    message = JSON.load(message_blob)
    return if message['Service'] # AWS test
    return HandleS3Notification.new(message) if message['Records']

    super # as default
  end
end

Or you can override the serialization method to add some metadata to the job ticket on job submission:

class CustomSerializer < Sqewer::Serializer
  def serialize(job_object)
    json_blob = super
    parsed = JSON.load(json_blob)
    parsed['_submitter_host'] = Socket.gethostname
    JSON.dump(parsed)
  end
end

If you return nil from your unserialize method the job will not be executed, but will just be deleted from the SQS queue.

Starting and running the worker

The very minimal executable for running jobs would be this:

#!/usr/bin/env ruby
require 'my_application'
Sqewer::CLI.start

This will connect to the queue at the URL set in the SQS_QUEUE_URL environment variable, and use all the default parameters. The CLI module will also set up a signal handler to terminate the current jobs cleanly if the commandline app receives USR1 or TERM.

You can also run a worker without signal handling, for example in test environments. Note that the worker is asynchronous; it has worker threads which do all the operations by themselves.

worker = Sqewer::Worker.new
worker.start
# ...and once you are done testing
worker.stop

Configuring the worker

One of the reasons this library exists is that sometimes you need to configure more than is usually assumed possible. For example, you might want to use a special logging library:

worker = Sqewer::Worker.new(logger: MyCustomLogger.new)

Or you might want a different job serializer/deserializer (for instance, if you want to handle S3 bucket notifications coming into the same queue):

worker = Sqewer::Worker.new(serializer: CustomSerializer.new)

You can also elect to inherit from the Worker class and override some default constructor arguments:

class CustomWorker < Sqewer::Worker
  def initialize(**kwargs)
    super(serializer: CustomSerializer.new, ..., **kwargs)
  end
end

The Sqewer::CLI module that you run from the commandline handler application can be started with your custom Worker of choice:

custom_worker = Sqewer::Worker.new(logger: special_logger)
Sqewer::CLI.start(custom_worker)

Threads versus processes

sqewer uses threads. If you need to run your job from a forked subprocess (primarily for memory management reasons) you can do so from the run method. Note that you might need extra gymnastics to submit additional jobs in this case, as it is the job of the controlling worker thread to submit the messages you generate. For example, you could use a pipe; a sketch of that approach follows the example below. In the more general case something like this can be used:

class MyJob
  def run
    pid = fork do
      SomeRemoteService.reconnect # you are in the child process now
      ActiveRAMGobbler.fetch_stupendously_many_things.each do |...|
      end
    end

    _, status = Process.wait2(pid)

    # Raise an error in the parent process to signal Sqewer that the job failed
    # if the child exited with a non-0 status
    raise "Child process crashed" unless status.exitstatus && status.exitstatus.zero?
  end
end
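If the forked child has to generate follow-up jobs, the pipe approach mentioned above can be sketched like this: the child writes the follow-up parameters to the pipe, and the parent, which still runs inside the controlling worker thread, does the actual submission (HeavyLifting and FollowUpJob are hypothetical):

require 'json'

class ForkingJob
  def run(ctx)
    reader, writer = IO.pipe

    pid = fork do
      reader.close
      # Child: do the heavy, memory-hungry work...
      ids = HeavyLifting.perform # hypothetical workload
      # ...and hand the follow-up parameters back to the parent
      writer.puts(JSON.dump(ids))
      writer.close
    end

    writer.close
    payload = reader.read
    _, status = Process.wait2(pid)
    raise "Child process crashed" unless status.success?

    # Parent: submission belongs to the controlling worker thread
    ctx.submit!(FollowUpJob.new(ids: JSON.parse(payload)))
  end
end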

Execution and serialization wrappers (middleware)

You can wrap job processing in middleware. A full-featured middleware class looks like this:

class MyWrapper
  # Surrounds the job instantiation from the string coming from SQS.
  def around_deserialization(serializer, msg_id, msg_payload, msg_attributes)
    # msg_id is the receipt handle, msg_payload is the message body string, msg_attributes are the message's attributes
    yield
  end

  # Surrounds the actual job execution
  def around_execution(job, context)
    # job is the actual job you will be running, context is the ExecutionContext.
    yield
  end
end

You need to set up a MiddlewareStack and supply it to the Worker when instantiating:

stack = Sqewer::MiddlewareStack.new
stack << MyWrapper.new
w = Sqewer::Worker.new(middleware_stack: stack)

Execution guarantees

As a queue worker system, Sqewer makes a number of guarantees, which are as solid as Ruby's ensure clause.

  • When a job succeeds (raises no exceptions), it will be deleted from the queue
  • When a job submits other jobs, and succeeds, the submitted jobs will be sent to the queue
  • When a job, or any wrapper around the job execution, raises any exception, the job will not be deleted
  • When a submit spun off from the job, or the deletion of the job itself, causes an exception, the job will not be deleted

Use those guarantees to your advantage. Always make your jobs horizontally repeatable (two hosts may start the same job at the same time), idempotent (a job should be able to run twice without errors), and traceable (make good use of logging).
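For example, an idempotent job can check for prior completion before doing the work (a sketch with a hypothetical Thumbnail model):

class GenerateThumbnail
  include Sqewer::SimpleJob
  attr_accessor :image_id

  def run
    # Safe under redelivery: if another worker already did the work, skip
    return if Thumbnail.exists?(image_id: image_id) # hypothetical model
    Thumbnail.generate!(image_id: image_id)
  end
end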

Usage with Rails via ActiveJob

This gem includes a queue adapter for usage with ActiveJob in Rails 5+. The functionality is well-tested and should function for any well-conforming ActiveJob subclasses.

To run the default sqewer worker setup against your Rails application, set SQS_QUEUE_URL in the environment variables, and make sure the queue is accessible using your default (envvar-based or machine-role-based) AWS credentials. Then set sqewer as the adapter for ActiveJob:

class Application < Rails::Application
  ...
  config.active_job.queue_adapter = :sqewer
end

and then run

$ bundle exec sqewer_rails

in your Rails source tree, via a foreman Procfile or similar. If you want to run your own worker binary for executing the jobs, be aware that you have to eager-load your Rails application's code explicitly before the Sqewer worker is started. The worker is threaded, and autoloading does not generally play nice with threading. So do not forget to add this in your worker code:

Rails.application.eager_load!
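Put together, a minimal custom worker binary might look like this (a sketch; the require path depends on your application layout):

#!/usr/bin/env ruby
# Boot the Rails app and load all of its code upfront, since autoloading
# does not play nice with the threaded worker
require_relative '../config/environment'
Rails.application.eager_load!

Sqewer::CLI.start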

For handling error reporting within your Sqewer worker, set up a middleware stack as described in the documentation.
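A sketch of such an error-reporting middleware, assuming a hypothetical ErrorTracker client:

class ErrorReportingMiddleware
  # Surrounds the actual job execution (see the middleware section above)
  def around_execution(job, context)
    yield
  rescue => e
    ErrorTracker.notify(e, job: job.inspect) # hypothetical error-tracking client
    raise # re-raise so that Sqewer does not delete the message from the queue
  end
end

stack = Sqewer::MiddlewareStack.new
stack << ErrorReportingMiddleware.new
Sqewer::CLI.start(Sqewer::Worker.new(middleware_stack: stack))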

ActiveJob feature support matrix

Compared to the feature matrix in the official ActiveJob documentation, sqewer has the following support for the various ActiveJob options, in comparison to the built-in ActiveJob adapters:

|                   | Async | Queues | Delayed    | Priorities | Timeout | Retries |
|-------------------|-------|--------|------------|------------|---------|---------|
| sqewer            | Yes   | No     | Yes        | No         | No      | Global  |
| Active Job Async  | Yes   | Yes    | Yes        | No         | No      | No      |
| Active Job Inline | No    | Yes    | N/A        | N/A        | N/A     | N/A     |

Retries are set up globally for the entire SQS queue. There is no specific queue setting per job, since all the messages go to the queue available to Sqewer.submit!.

There is no timeout handling; if you need it, you may want to implement it within your jobs proper (a sketch follows below). Retries are handled on the Sqewer level for as many deliveries as your SQS settings permit.
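One option for a per-job timeout is to enforce it inside the job itself. A minimal sketch using Ruby's standard Timeout module (which has well-known caveats around non-interruptible operations):

require 'timeout'

class BoundedJob
  def run
    Timeout.timeout(30) do
      # ... do the work. A Timeout::Error raised here bubbles up, the
      # message is not deleted, and SQS will redeliver it later.
    end
  end
end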

Delay handling

Delayed execution is handled via a combination of the delay_seconds SQS parameter and the _execute_after job key (see the serializer documentation in Sqewer for more). In a nutshell: if you postpone a job by less than 900 seconds, the standard delivery delay option will be used, and the job will become visible to workers on the SQS queue only after this period.

If a larger delay is used, the job will receive an additional field called _execute_after, which will contain a UNIX timestamp in seconds of when it must be executed at the earliest. In addition, the maximum permitted SQS delivery delay will be set for it. If the job then gets redelivered, Sqewer will automatically put it back on the queue with the same maximum delay, and will continue doing so for as long as necessary.
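For example (assuming submit! forwards a delay_seconds option to the send, mirroring the SQS parameter named above):

# Within the 900-second SQS limit: only the standard delivery delay is used
Sqewer.submit!(MyJob.new, delay_seconds: 600)

# Beyond the limit: the ticket is assumed to get an _execute_after timestamp
# and to be re-enqueued with the maximum delay until that time is reached
Sqewer.submit!(MyJob.new, delay_seconds: 2 * 60 * 60)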

Note that this re-enqueueing will incur extra receives and sends on the queue; they are not substantial, but not free either. We think this is an acceptable workaround for now. If you need a better approach, you may be better off using a scheduling system or a cron job to enqueue the actual, immediately-executable background task.

Frequently asked questions (A.K.A. why is it done this way)

This document tries to answer some questions that may arise when reading or using the library. Hopefully this can provide some answers with regards to how things are put together.

Why separate new and run methods instead of just perform?

Because the job needs access to the execution context of the worker. It turned out that keeping the context in global/thread/class variables was somewhat nasty, and jobs needed access to the current execution context to enqueue the subsequent jobs, and to get access to loggers (and other context-sensitive objects). Therefore it makes more sense to offer Jobs access to the execution context, and to make a Job a command object.

Also, Jobs usually use their parameters in multiple smaller methods down the line. It therefore makes sense to save those parameters in instance variables or in struct members.

Why keyword constructors for jobs?

Because keyword constructors map very nicely to JSON objects and provide some (at least rudimentary) arity safety, by checking for missing keywords and by allowing default keyword argument values. Also, we already have some products that use those job formats. Some have dozens of classes of jobs, all with those signatures and tests.

Why no weighted queues?

Because very often when you want to split queues servicing one application it means that you do not have enough capacity to serve all of the job types in a timely manner. Then you try to assign priority to separate jobs, whereas in fact what you need are jobs that execute roughly at the same speed - so that your workers do not stall when clogged with mostly-long jobs. Also, multiple queues introduce more configuration, which, for most products using this library, was a very bad idea (more workload for deployment).

Why so many configurable components?

Because sometimes your requirements differ just-a-little-bit from what is provided, and you have to swap in your own implementation instead. One product needs foreign-submitted SQS jobs (S3 notifications). Another product needs a custom Logger subclass. Yet another product needs process-based concurrency on top of threads. Yet another product needs to manage database connections when running the jobs. Have 3-4 of those, and a pretty substantial union of required features will start to emerge. Do not fear - most classes of the library have a magic .default method which will liberate you from most complexities.

Why multithreading for workers?

Because it is fast and relatively memory-efficient. Most of the workload we encountered was IO-bound or even network-IO bound. In that situation it makes more sense to use threads that switch quickly, instead of burdening the operating system with too many processes. An optional feature for one-process-per-job is going to be added soon, for tasks that really warrant it (like image manipulation). For now, however, threads are working quite OK.

Why no Celluloid?

Because I found that a producer-consumer model with a thread pool works quite well, and can be created based on the Ruby standard library alone.

Contributing to the library

  • Check out the latest master to make sure the feature hasn't been implemented or the bug hasn't been fixed yet.
  • Check out the issue tracker to make sure someone hasn't already requested and/or contributed it.
  • Fork the project.
  • Start a feature/bugfix branch.
  • Commit and push until you are happy with your contribution.
  • Make sure to add tests for it. This is important so I don't break it in a future version unintentionally.
  • Run your tests against a real SQS queue. You will need your tests to have permissions to create and delete SQS queues.
  • Please try not to mess with the Rakefile, version, or history. If you want to have your own version, or it is otherwise necessary, that is fine, but please isolate the change in its own commit so I can cherry-pick around it.

Copyright

Copyright (c) 2016 WeTransfer. See LICENSE.txt for further details.


sqewer's Issues

Proposal for an API redesign

We've had a good run with Sqewer, with 5 years of operation and billions of jobs being processed. There were good times and bad times, but it seems like we have a chance of meaningfully upgrading Sqewer into the next phase. I believe it is necessary so the various services that we run can benefit from more SQS features than previously, and also provide some cost savings.

I propose replacing the .submit! API we have at the moment and the configuration API.

Configuration API

The proposed configuration API looks like this:

Sqewer.configure do |config|
  config.sqs_endpoint_url = some_url # defaults to SQS_QUEUE_URL environment variable value
  config.serializer = MyCustomSerializer.new # defaults to Sqewer::Serializer.default
  config.logger = SomeLogger.new # defaults to Sqewer::Logger.default
  config.middleware_handlers = [some_middleware, another_middleware, yet_another_middleware]
  config.worker = MyCustomizedWorker.new # defaults to Sqewer::Worker.default
end

Reasoning: currently most of our services define a custom Sqewer worker class. The customizations that we apply in that custom class are pretty much in-scope for Sqewer itself though. I think that moving these to stable configuration options and allowing them to be configured from one place is appropriate.

Submission (message sending) API

Currently we have two ways of sending messages using Sqewer. One is Sqewer.submit!, which guarantees that your message will be delivered to the queue immediately; the call will fail if that doesn't succeed. We also have the Executor object passed to every job's run() method, which also responds to .submit! with the same arguments. When an Executor is used, we apply batching to the messages generated during a single run of a job. This was done so that if a job forward-spools descendant jobs, they would be delivered to the queue at once, saving money due to batching. When using bare Sqewer.submit!, batching was not used for fear of "dropping jobs" if a process that has buffered some jobs terminates before the messages have been sent.

I propose changing the submission API in a way that prioritizes batching over guaranteed delivery to the queue, by removing the Executor objects entirely. Instead of providing submit! on an Executor and submit! on Sqewer itself with different semantics, we will provide submit! and submit_immediately! on Sqewer itself.

Sqewer.submit_immediately! would ensure that the jobs passed to it get serialized together (and if any job fails to serialize none get delivered to SQS) and then deliver them to SQS in batches before returning.

Sqewer.submit! would first serialize all jobs, if any of the jobs fails to serialize it would raise an exception before buffering them for sending. Once the jobs are converted into messages ready for sending, they will be added to an in-memory Queue. This queue would be emptied by a Thread running in the background at regular intervals - for instance once every 2 seconds. All messages picked up after this fixed interval would be delivered to SQS in batches.
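A minimal sketch of that buffering mechanism (all names and the flush interval are illustrative, not an implemented API):

class BufferedSubmitter
  FLUSH_INTERVAL = 2 # seconds, as proposed above

  def initialize(serializer: Sqewer::Serializer.default, delivery:)
    @serializer = serializer
    @delivery = delivery # any callable that sends a batch of message bodies to SQS
    @queue = Queue.new
    @flusher = Thread.new { loop { sleep(FLUSH_INTERVAL); flush! } }
  end

  def submit!(*jobs)
    # Serialize upfront: a job that fails to serialize raises here,
    # before anything gets buffered
    messages = jobs.map { |job| @serializer.serialize(job) }
    messages.each { |message| @queue << message }
  end

  private

  def flush!
    batch = []
    batch << @queue.pop(true) until @queue.empty?
    # SQS SendMessageBatch accepts at most 10 entries per call
    batch.each_slice(10) { |slice| @delivery.call(slice) } if batch.any?
  rescue ThreadError
    nil # the queue was drained concurrently; nothing left to send
  end
end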

The reasoning is as follows:

  • Even though we prioritised message delivery over batching, we still had a bug where this delivery and/or serialization could fail silently
  • Our ActiveJob adapter, which currently powers the main site, does not do batching at all because ActiveJob does not provide a "transaction" semantic for buffering the jobs during an execution of another job - or during the execution of another Rails unit of work for that matter. Using buffering for submit! would immediately enable batching for ActiveJob, allowing for substantial savings
  • Delivery can be wrapped in a separate error-tracking transaction, in all cases
  • The choice is clearly placed on the developer/consumer: less efficient batching with a delivery guarantee via submit_immediately!, or sane default batching with a deferred failure via plain submit!.

Very interested in your thoughts about this!

Submit hangs if the SQS endpoint cannot be reached

We have this when using fake_sqs and the daemon is not started up. It seems that the SDK does something that puts it into a no-timeout request mode, and it would be cool if we could kill that and let it fail non-gracefully if it is unable to connect. Currently it just causes everything that calls in to do a submit to busy-wait indefinitely.

Investigate if we're managing dependencies properly.

Describe the bug
Without some explicit require statements in spec_helper, tests become flaky and depend on the order of execution.

To Reproduce
Steps to reproduce the behavior:

  1. Check out an older version of sqewer (before 8.1.0), or comment out this line
  2. Run rspec spec/sqewer/connection_spec.rb:41
  3. See uninitialized constant Aws error

Expected behavior
sqewer should not explicitly depend on Aws::SQS

Retries for instance credentials

If running on an EC2 instance, the AWS SDK can get credentials from the instance metadata, but those are not always available due to random cloud failures. The default behavior is to have a one second timeout and zero (!) retries on failure. We should set a reasonable number of retries for this.

See also aws/aws-sdk-ruby#1301. The relevant creation of the SQS client object happens in the client method of the connection.
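A sketch of such a fix when building the client (Aws::InstanceProfileCredentials accepts :retries and :http_open_timeout; the values below are illustrative):

require 'aws-sdk' # SDK v2, per the requirements above

# Give the instance metadata lookup a few retries instead of zero
credentials = Aws::InstanceProfileCredentials.new(
  retries: 5,           # instead of the default of zero retries
  http_open_timeout: 1  # keep each attempt short
)
client = Aws::SQS::Client.new(credentials: credentials)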

Configurable AWS SQS open and read timeout

The AWS SQS client by default uses the following timeouts:

  • http_open_timeout: 15 seconds
  • http_read_timeout: 60 seconds
  • http_idle_timeout: 5 seconds

For job submission Sqewer uses a retriable request, so we could set a shorter open timeout, as the request will be retried a number of times. Especially when SQS messages are submitted within the handling of a web request with a 30 second timeout, it would be nice to be able to configure the timeout and the number of retries so that submission fits within that 30 second budget, and so that an error will still be logged in Appsignal.

Here's a link to an example where an SQS message submit appears to take 15 seconds, likely hitting the AWS SQS client open timeout and then quickly succeeding on the retry:
https://wetransfer.atlassian.net/browse/AB-29?focusedCommentId=16417

This means that if it times out on the first and second attempt, the request would already have exceeded the 30 seconds and be aborted.

So configuring a shorter timeout would allow more than 2 attempts, and result in faster response times.
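A sketch of tuning those timeouts when constructing the SQS client (these are existing client options in the AWS SDK; the values are illustrative, picked to fit within a 30-second request budget):

require 'aws-sdk' # SDK v2, per the requirements above

client = Aws::SQS::Client.new(
  http_open_timeout: 5,  # down from the 15 second default
  http_read_timeout: 8,  # down from the 60 second default
  retry_limit: 2         # retries after the first attempt
)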

Bumping the version

To test the improved error messages we need to roll a release. Give me your email that you are going to be using for publishing gems on rubygems.org - then I can add you as a maintainer.

Afterwards, bump the tiny version number and roll a release using the standard Bundler gem tasks. When the new version is on Rubygems we can do a bundle update sqewer on the dependent codebases to see it take effect. Good luck!

Refactor batch sends/receives

to use take instead of the various message buffer objects. It will be easier to replace the actual component talking to SQS in tests and reduce the amount of boilerplate needed to keep various components operational.

Can't use sqlite in memory for tests

When I run the tests of my application in a container, I need to use SQLite in memory.

The way the methods Sqewer::Connection.default and Sqewer::LocalConnection are implemented, it's not possible to do so.

Sqewer::SimpleJob doesn't help adding new attributes to a Job class

Sometimes we want to add a new attribute to a job class that includes Sqewer::SimpleJob, but it's not possible, because instantiating the job raises an error when it detects missing attributes.

Example:

Assume that the following job exists:

class MyJob
  include Sqewer::SimpleJob
  attr_accessor :width

  def run
    puts 'run'
  end
end

And we want to add the attribute height:

class MyJob
  include Sqewer::SimpleJob
  attr_accessor :width, :height

  def run
    puts 'run'
  end
end

By doing this, when the new version is deployed, any old jobs still enqueued will raise MissingAttribute because of this piece of code:

# lib/sqewer/simple_job.rb
accessors = methods.grep(EQ_END).map{|method_name| method_name.to_s.gsub(EQ_END, '\1').to_sym }
settable_attributes = Set.new(accessors)
missing_attributes = settable_attributes - touched_attributes

missing_attributes.each do | attr |
  raise MissingAttribute, "Missing job attribute #{attr.inspect}"
end

So, the only option to add a new attribute is to add a new job class, duplicating the previous one, for example:

class MyJobV2
  include Sqewer::SimpleJob
  attr_accessor :width, :height

  def run
    puts 'run'
  end
end

This has the advantage of being safer when changing the job's payload, but it complicates reviewing the PR because a whole new file has been added.

My suggestion is adding a method to allow changing this behaviour, such as:

class MyJob
  include Sqewer::SimpleJob
  allow_missing_attributes

  attr_accessor :width, :height
end

By doing this we can use the new attribute assuming that it may not exist, for example:

class MyJob
  include Sqewer::SimpleJob
  allow_missing_attributes # <-- new method

  attr_accessor :width, :height

  def run
    if height.present?
      # do something
    end
  end
end

wdyt?

cc @julik @linkyndy @nitika080289 @martijnvermaat @lorenzograndi4

Add LocalConnection

For a number of reasons, fake_sqs gets very CPU-hot with our pattern of polling for messages every second. We should implement some very lightweight file store backend LocalConnection that we can then use for development. If we decide to go with awsgo instead, we will have to implement polling and SendMessageBatch support there, which involves a lot of struct and err != nil. Given that Sqewer abstracts SQS communication into one place, we can leverage that abstraction to get rid of SQS-simulating services altogether.

Wrong instruction in README?

I think there is something wrong with the instructions in the README.

I followed them and I got the following error:

irb(main):016:1* class MyJob
irb(main):017:2*   def run
irb(main):018:2*     puts 'run'
irb(main):019:1*   end
irb(main):020:0> end
=> :run
irb(main):021:0> Sqewer.submit!(MyJob.new)
=> []
irb(main):022:0> Sqewer::CLI.run
Traceback (most recent call last):
        4: from /Users/fabioperrella/.rbenv/versions/2.7.1/bin/irb:23:in `<main>'
        3: from /Users/fabioperrella/.rbenv/versions/2.7.1/bin/irb:23:in `load'
        2: from /Users/fabioperrella/.rbenv/versions/2.7.1/lib/ruby/gems/2.7.0/gems/irb-1.2.7/exe/irb:11:in `<top (required)>'
        1: from (irb):22
NoMethodError (undefined method `run' for Sqewer::CLI:Module)

From what I saw, the method Sqewer::CLI.run doesn't exist and I'm almost sure it never existed.

I might be missing something 🤔

@julik @linkyndy do you know about it?
