
beaneater's Introduction

Beaneater


Beaneater is the best way to interact with beanstalkd from within Ruby. Beanstalkd is a simple, fast work queue. Its interface is generic, but was originally designed for reducing the latency of page views in high-volume web applications by running time-consuming tasks asynchronously. Read the yardocs and/or the beanstalk protocol for more details.

Important Note: This README is for branch 1.0.x, which is under development. Please switch to the latest 0.x branch for the stable version.

Why Beanstalk?

Ilya has an excellent blog post, Scalable Work Queues with Beanstalk, and Adam Wiggins posted an excellent comparison.

You will find that beanstalkd is an underrated but incredibly powerful project that is extremely well-suited as a job or messaging queue, and significantly better suited to this task than Redis or a traditional RDBMS. Beanstalk is a simple and fast work queue service rolled into a single binary: it is the memcached of work queues. Originally built to power the backend for the 'Causes' Facebook app, it is a mature and production-ready open source project. PostRank has used beanstalk to reliably process millions of jobs a day.

A single instance of Beanstalk is perfectly capable of handling thousands of jobs a second (or more, depending on your job size) because it is an in-memory, event-driven system. Powered by libevent under the hood, it requires zero setup (launch and forget, à la memcached) and offers optional log-based persistence, an easily parsed ASCII protocol, and a rich set of tools for job management that go well beyond a simple FIFO work queue.

Beanstalkd supports the following features out of the box:

Feature           Description
Easy Setup        Quick to install, no files to edit, no settings to tweak.
Speed             Process thousands of jobs per second without breaking a sweat.
Client Support    Client libraries exist for over 21 languages including Python, Ruby, and Go.
Tubes             Supports multiple work queues created automatically on demand.
Reliable          Beanstalk’s reserve, work, delete cycle ensures reliable processing.
Scheduling        Delay enqueuing jobs by a specified interval to be processed later.
Priorities        Important jobs go to the head of the queue and get processed sooner.
Persistence       Jobs are stored in memory for speed, but logged to disk for safe keeping.
Scalability       Client-side federation provides effortless horizontal scalability.
Error Handling    Bury any job which causes an error for later debugging and inspection.
Simple Debugging  Talk directly to the beanstalkd server over telnet to get a handle on your app.
Efficiency        Each beanstalkd process can handle tens of thousands of open connections.
Memory Usage      Use the built-in ulimit OS feature to cap beanstalkd's memory consumption.

Keep in mind that these features are supported out of the box by beanstalk and require no special Ruby-specific logic. In the end, beanstalk is the ideal job queue and has the added benefit of being easy to set up and configure.

Installation

Install beanstalkd:

Mac OS

brew install beanstalkd
beanstalkd -p 11300

Ubuntu

apt-get install beanstalkd
beanstalkd -p 11300

Install beaneater as a gem:

gem install beaneater

or add this to your Gemfile:

# Gemfile
gem 'beaneater'

and run bundle install to install the dependency.

Breaking Changes since 1.0!

Starting in 1.0, we removed the concept of the Beaneater::Pool which introduced considerable complexity into this gem.

  • Beginning with version 1.0.0, support for Beaneater::Pool has been dropped. The feature may be supported again in future versions as a separate module or through a separate gem. If you want to use the pool feature, switch to the 0.x stable branches instead.
  • Jobs#find_all method has been removed, since it is no longer necessary.

To manage a pool of beanstalkd instances, we'd prefer to leave the handling to the developer or other higher-level libraries.

Quick Overview:

The concise summary of how to use beaneater:

require 'beaneater'
require 'json'

# Connect to beanstalkd
@beanstalk = Beaneater.new('localhost:11300')
# Enqueue jobs to tube
@tube = @beanstalk.tubes["my-tube"]
@tube.put '{ "key" : "foo" }', :pri => 5
@tube.put '{ "key" : "bar" }', :delay => 3
# Process jobs from tube while any are ready
# (the "bar" job stays delayed for 3 seconds, so this loop may finish before it is ready)
while @tube.peek(:ready)
  job = @tube.reserve
  puts "job value is #{JSON.parse(job.body)["key"]}!"
  job.delete
end
# Close the connection
@beanstalk.close

For a more detailed rundown, check out the Usage section below.

Usage

Configuration

To set up advanced options for beaneater, pass configuration options using:

Beaneater.configure do |config|
  # config.default_put_delay   = 0
  # config.default_put_pri     = 65536
  # config.default_put_ttr     = 120
  # config.job_parser          = lambda { |body| body }
  # config.job_serializer      = lambda { |body| body }
  # config.beanstalkd_url      = 'localhost:11300'
end

The above options are all defaults, so only include a configuration block if you need to make changes.

Connection

To interact with a beanstalk queue, first establish a connection by providing an address:

@beanstalk = Beaneater.new('10.0.1.5:11300')

# Or, if ENV['BEANSTALKD_URL'] == '127.0.0.1:11300'
@beanstalk = Beaneater.new
@beanstalk.connection # => 127.0.0.1:11300

Conversely, you can close and dispose of a connection at any time with:

@beanstalk.close

Tubes

Beanstalkd has one or more tubes which can contain any number of jobs. Jobs can be inserted (put) into the used tube and pulled out (reserved) from watched tubes. Each tube consists of a ready, delayed, and buried queue for jobs.

When a client connects, it uses and watches only the tube named default. Tube names are at most 200 bytes. Using or watching a tube that does not exist creates it automatically.
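The beanstalkd protocol document also restricts the characters a name may contain. A name may use ASCII letters, digits, and the characters - + / ; . $ _ ( ), must be at least one character long, and may not begin with a hyphen. Under those assumptions, the hypothetical valid_tube_name? helper below pre-checks a name on the client side. It is only a minimal sketch and not part of beaneater: the server remains the final authority, and beaneater reports an invalid name as Beaneater::InvalidTubeName.

```ruby
# Hypothetical client-side pre-check for tube names (not part of beaneater).
# Assumed rules, taken from the beanstalkd protocol document:
#   - 1 to 200 bytes long
#   - only A-Z, a-z, 0-9 and the characters - + / ; . $ _ ( )
#   - must not begin with a hyphen
TUBE_NAME_PATTERN = %r{\A[A-Za-z0-9+/;.$_()][A-Za-z0-9\-+/;.$_()]*\z}

def valid_tube_name?(name)
  name.bytesize.between?(1, 200) && TUBE_NAME_PATTERN.match?(name)
end

puts valid_tube_name?("my-tube")   # => true
puts valid_tube_name?("-leading")  # => false
puts valid_tube_name?("has space") # => false
puts valid_tube_name?("a" * 201)   # => false
```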

To interact with a tube, first find the tube:

@tube = @beanstalk.tubes.find "some-tube-here"
# => <Tube name='some-tube-here'>

To reserve jobs from beanstalk, you will need to 'watch' certain tubes:

# Watch only the tubes listed below (!)
@beanstalk.tubes.watch!('some-tube')
# Append tubes to existing set of watched tubes
@beanstalk.tubes.watch('another-tube')
# You can also ignore tubes that have been watched previously
@beanstalk.tubes.ignore('some-tube')

You can easily get a list of all, used or watched tubes:

# The list-tubes command returns a list of all existing tubes
@beanstalk.tubes.all
# => [<Tube name='foo'>, <Tube name='bar'>]

# Returns the tube currently being used by the client (for insertion)
@beanstalk.tubes.used
# => <Tube name='bar'>

# Returns a list of tubes currently being watched by the client (for consumption)
@beanstalk.tubes.watched
# => [<Tube name='foo'>]

You can also temporarily 'pause' the execution of a tube by specifying the time:

tube = @beanstalk.tubes["some-tube-here"]
tube.pause(3) # pauses tube for 3 seconds

or even clear the tube of all jobs:

tube = @beanstalk.tubes["some-tube-here"]
tube.clear # tube will now be empty

In summary, each beanstalk client manages two separate concerns: which tube newly created jobs are put into, and which tube(s) jobs are reserved from. Accordingly, there are two separate sets of functions for these concerns:

  • use and used affect where 'put' places jobs
  • watch and watched control where reserve takes jobs from

Note that these concerns are fully orthogonal: for example, when you 'use' a tube, it is not automatically 'watched'. Neither does 'watching' a tube affect the tube you are 'using'.
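The following in-memory model of one client's tube state makes the independence of use and watch concrete. TubeState is a hypothetical class written for illustration, not beaneater's API. It assumes two protocol rules: a new connection uses and watches default, and beanstalkd refuses to ignore the last watched tube. The server answers NOT_IGNORED in that case and beaneater surfaces it as NotIgnoredError; the model simply raises ArgumentError instead.

```ruby
require 'set'

# Hypothetical in-memory model of one client's tube state (not beaneater's API).
# A new connection uses and watches "default"; use and watch never affect each other.
class TubeState
  attr_reader :used, :watched

  def initialize
    @used = "default"
    @watched = Set["default"]
  end

  # Changes only where put sends jobs; the watch list is untouched.
  def use(name)
    @used = name
  end

  # Adds to the watch list; the used tube is untouched.
  def watch(name)
    @watched << name
  end

  # beanstalkd refuses to ignore the only watched tube (NOT_IGNORED).
  def ignore(name)
    raise ArgumentError, "NOT_IGNORED" if @watched == Set[name]
    @watched.delete(name)
  end
end

state = TubeState.new
state.use("emails")
p state.used         # => "emails"
p state.watched.to_a # => ["default"]  (using a tube does not watch it)
state.watch("emails")
state.ignore("default")
p state.watched.to_a # => ["emails"]
```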

Jobs

A job in beanstalk gets inserted by a client and includes the 'body' and job metadata. Each job is enqueued into a tube and later reserved and processed. Here is a picture of the typical job lifecycle:

   put            reserve               delete
  -----> [READY] ---------> [RESERVED] --------> *poof*

A job at any given time is in one of four states: ready, reserved, delayed, or buried:

State      Description
ready      waiting to be reserved and processed after being put onto a tube.
reserved   locked by the client that reserved it while it is being processed.
delayed    waiting to become ready after the specified delay.
buried     waiting to be kicked, usually after the job failed to process.

In addition, you can perform several actions on a given job:

  • reserve which locks a job from the ready queue for processing.
  • touch which extends the time before a job is autoreleased back to ready.
  • release which places a reserved job back onto the ready queue.
  • delete which removes a job from beanstalk.
  • bury which places a reserved job into the buried state.
  • kick which places a buried job from the buried queue back to ready.
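The states and actions above can be summarized as a transition table. The sketch below is a hypothetical plain-Ruby model for illustration, not how beaneater or beanstalkd implement it. The ttr_expired and delay_over events stand for the server-side timers rather than client commands.

```ruby
# Hypothetical model of the job lifecycle (illustration only, not beaneater's
# implementation). Keys are [current_state, event]; values are the new state.
TRANSITIONS = {
  [:ready,    :reserve]     => :reserved,
  [:reserved, :touch]       => :reserved, # only restarts the TTR countdown
  [:reserved, :release]     => :ready,    # :delayed if released with a delay
  [:reserved, :bury]        => :buried,
  [:reserved, :ttr_expired] => :ready,    # server-side auto-release
  [:buried,   :kick]        => :ready,
  [:delayed,  :delay_over]  => :ready     # server-side, once the delay elapses
}.freeze

def next_state(state, event)
  # beanstalkd accepts delete for ready, delayed, buried and (own) reserved jobs
  return :deleted if event == :delete

  TRANSITIONS.fetch([state, event]) do
    raise ArgumentError, "cannot #{event} a #{state} job"
  end
end

state = :ready
%i[reserve bury kick reserve].each { |event| state = next_state(state, event) }
p state # => :reserved
```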

You can insert a job onto a beanstalk tube using the put command:

@tube.put "job-data-here"

Beanstalkd can only store strings as job bodies, but you can easily encode your data into a string:

require 'json'
@tube.put({:foo => 'bar'}.to_json)

Moreover, you can provide a default job serializer by setting the corresponding configuration option (job_serializer), which is then applied to every job body sent with the put command. For example, to encode a Ruby object as JSON:

Beaneater.configure do |config|
  config.job_serializer = lambda { |body| JSON.dump(body) }
end
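Because job_serializer is applied on put, you will usually want a matching job_parser so that reserved job bodies are decoded again; job_parser is the other lambda in the configuration block. The pair below is a minimal sketch assuming JSON bodies, exercised in plain Ruby without a server. In an application you would assign the same lambdas to config.job_serializer and config.job_parser. A JSON round trip turns symbol keys into string keys.

```ruby
require 'json'

# A matching serializer/parser pair, assuming JSON job bodies. These are the
# kind of lambdas you would assign to config.job_serializer and config.job_parser.
JOB_SERIALIZER = lambda { |body| JSON.dump(body) }
JOB_PARSER     = lambda { |body| JSON.parse(body) }

wire = JOB_SERIALIZER.call({ foo: 'bar', ids: [1, 2] })
puts wire                  # => {"foo":"bar","ids":[1,2]}
p JOB_PARSER.call(wire)    # symbol keys come back as string keys
```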

Each job has associated metadata, such as priority, delay, and ttr, which can be specified as part of the put command:

# defaults are priority 65536, delay of 0 and ttr of 120 seconds
@tube.put "job-data-here", :pri => 1000, :delay => 50, :ttr => 200

The priority argument is an integer < 2**32. Jobs with a smaller priority value take precedence over jobs with larger values. The delay argument is an integer number of seconds to wait before putting the job in the ready queue. The ttr (time to run) argument is an integer number of seconds to allow a worker to run this job; if the worker does not delete, release, or bury the job within that time (touch extends it), beanstalkd releases the job back to the ready queue.
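The sketch below shows how these values interact by modeling the order in which beanstalkd hands out ready jobs from a single tube. The lowest pri comes first, and among equal priorities the lower job id (the earlier put) comes first. The job hashes and the reserve_order helper are hypothetical stand-ins, not Beaneater::Job objects or beaneater methods.

```ruby
# Plain-Ruby model of the order in which ready jobs in one tube are reserved:
# lowest pri first, then lowest id (FIFO among equal priorities).
# The job hashes are hypothetical stand-ins, not Beaneater::Job objects.
MAX_PRI = 2**32 - 1 # pri must be an integer < 2**32

def reserve_order(jobs)
  jobs.each do |job|
    raise ArgumentError, "pri out of range: #{job[:pri]}" unless job[:pri].between?(0, MAX_PRI)
  end
  jobs.sort_by { |job| [job[:pri], job[:id]] }.map { |job| job[:id] }
end

ready = [
  { id: 1, pri: 65_536 }, # the default put priority
  { id: 2, pri: 1000 },
  { id: 3, pri: 0 },      # most urgent
  { id: 4, pri: 1000 }
]
p reserve_order(ready) # => [3, 2, 4, 1]
```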

Processing Jobs (Manually)

In order to process jobs, the client should first specify the intended tubes to be watched. If not specified, this will default to watching just the default tube.

@beanstalk = Beaneater.new('10.0.1.5:11300')
@beanstalk.tubes.watch!('tube-name', 'other-tube')

Next you can use the reserve command which will return the first available job within the watched tubes:

job = @beanstalk.tubes.reserve
# => <Beaneater::Job id=5 body="foo">
puts job.body
# prints 'foo'
print job.stats.state # => 'reserved'

By default, reserve will wait indefinitely for the next job. To specify a timeout, pass the number of seconds to wait:

job = @beanstalk.tubes.reserve(5) # wait up to 5 secs for a job, else raise Beaneater::TimedOutError
# => <Beaneater::Job id=5 body="foo">

You can 'release' a reserved job back onto the ready queue to retry later:

job = @beanstalk.tubes.reserve
# ...job has ephemeral fail...
job.release :delay => 5
print job.stats.state # => 'delayed'

You can also 'delete' jobs that are finished:

job = @beanstalk.tubes.reserve
job.touch # extends ttr for job
# ...process job...
job.delete

Beanstalk jobs can also be buried if they fail, rather than being deleted:

job = @beanstalk.tubes.reserve
# ...job fails...
job.bury
print job.stats.state # => 'buried'

Burying a job means that the job is pulled out of the queue into a special 'holding' area for later inspection or reuse. To reanimate this job later, you can 'kick' buried jobs back into being ready:

@beanstalk.tubes['some-tube'].kick(3)

This kicks 3 buried jobs for 'some-tube' back into the 'ready' state. Jobs can also be inspected using the 'peek' commands. To find and peek at a particular job based on the id:

@beanstalk.jobs.find(123)
# => <Beaneater::Job id=123 body="foo">

or you can peek at jobs within a tube:

@tube = @beanstalk.tubes.find('foo')
@tube.peek(:ready)
# => <Beaneater::Job id=123 body="ready">
@tube.peek(:buried)
# => <Beaneater::Job id=456 body="buried">
@tube.peek(:delayed)
# => <Beaneater::Job id=789 body="delayed">

When dealing with jobs there are a few other useful commands available:

job = @beanstalk.tubes.reserve
print job.tube      # => "some-tube-name"
print job.reserved? # => true
print job.exists?   # => true
job.delete
print job.exists?   # => false

Processing Jobs (Automatically)

Instead of using watch and reserve, you can also use the higher level register and process methods to process jobs. First you can 'register' how to handle jobs from various tubes:

@beanstalk.jobs.register('some-tube', :retry_on => [SomeError]) do |job|
  do_something(job)
end

@beanstalk.jobs.register('other-tube') do |job|
  do_something_else(job)
end

Once you have registered the handlers for known tubes, calling process! will begin a loop processing jobs as defined by the registered processor blocks:

@beanstalk.jobs.process!

Processing runs the following steps:

  1. Watch all registered tubes
  2. Reserve the next job
  3. Once job is reserved, invoke the registered handler based on the tube name
  4. If no exceptions occur, delete the job (success)
  5. If 'retry_on' exceptions occur, call 'release' (retry)
  6. If other exception occurs, call 'bury' (error)
  7. Repeat steps 2-6
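Steps 3 to 6 come down to a small decision per job. The handle_job method below is a hypothetical restatement of those rules, not beaneater's source. It only needs a job that responds to delete, release, and bury, so a FakeJob struct stands in for a real Beaneater::Job. Following the listed steps, any StandardError not named in retry_on leads to a bury.

```ruby
# Hypothetical model of process! steps 3-6 for one reserved job
# (a sketch of the rules listed above, not beaneater's source code).
# `job` only needs to respond to #delete, #release and #bury.
def handle_job(job, handler, retry_on: [])
  begin
    handler.call(job)               # step 3: run the registered handler
  rescue StandardError => e
    if retry_on.any? { |klass| e.is_a?(klass) }
      job.release                   # step 5: a retry_on error, so retry later
      return :released
    end
    job.bury                        # step 6: any other error
    return :buried
  end
  job.delete                        # step 4: no exception, so the job is done
  :deleted
end

# A stand-in job that records which call the sketch made.
FakeJob = Struct.new(:calls) do
  def delete;  calls << :delete;  end
  def release; calls << :release; end
  def bury;    calls << :bury;    end
end

RetryableError = Class.new(StandardError)

job = FakeJob.new([])
p handle_job(job, ->(_job) { raise RetryableError }, retry_on: [RetryableError]) # => :released
p job.calls                                                                      # => [:release]
```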

The process! command is ideally suited for a beanstalk job-processing daemon. Even though process! is intended to be a long-running process, you can stop the loop at any time by raising Beaneater::AbortProcessingError from within a job handler while processing is running.

Handling Errors

While using Beaneater, you may encounter errors when a command sent to beanstalkd produces an unexpected response. The most common errors are listed below:

Error                       Description
Beaneater::NotConnected     Client connection to beanstalk cannot be established.
Beaneater::InvalidTubeName  Specified tube name for use or watch is not valid.
Beaneater::NotFoundError    Specified job or tube could not be found.
Beaneater::TimedOutError    Job could not be reserved within time specified.
Beaneater::JobNotReserved   Job has not been reserved and action cannot be taken.

There are other exceptions that are less common such as OutOfMemoryError, DrainingError, DeadlineSoonError, InternalError, BadFormatError, UnknownCommandError, ExpectedCRLFError, JobTooBigError, NotIgnoredError. Be sure to check the beanstalk protocol for more information.

Stats

Beanstalk has plenty of commands for introspecting the state of the queues and jobs. To get stats for beanstalk overall:

# Get overall stats about the job processing that has occurred
print @beanstalk.stats
# => #<Beaneater::StatStruct current_jobs_urgent=0, current_jobs_ready=0, current_jobs_reserved=0, current_jobs_delayed=0, current_jobs_buried=0, ...

print @beanstalk.stats.current_tubes
# => 1

For stats on a particular tube:

# Get statistical information about the specified tube if it exists
print @beanstalk.tubes['some_tube_name'].stats
# => { 'current_jobs_ready': 0, 'current_jobs_reserved': 0, ... }
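On the wire, beanstalkd answers stats commands with a YAML dictionary whose keys use hyphens, for example current-jobs-ready. Beaneater exposes the same keys with underscores, as in the StatStruct output earlier. The hypothetical parse_stats helper below is a minimal plain-Ruby sketch of that conversion on a hand-written sample body; it is not beaneater's implementation.

```ruby
require 'yaml'

# Hypothetical helper: turn a raw beanstalkd stats body (YAML with hyphenated
# keys) into a Struct with underscored members, similar in spirit to
# Beaneater::StatStruct.
def parse_stats(yaml_body)
  hash = YAML.safe_load(yaml_body)
  keys = hash.keys.map { |key| key.tr('-', '_').to_sym }
  Struct.new(*keys).new(*hash.values)
end

raw = <<~YAML
  ---
  name: email.send.v1
  current-jobs-ready: 10
  current-jobs-reserved: 0
  current-watching: 1
  current-waiting: 0
YAML

stats = parse_stats(raw)
puts stats.name               # => email.send.v1
puts stats.current_jobs_ready # => 10
```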

For stats on an individual job:

# Get statistical information about the specified job if it exists
print @beanstalk.jobs[some_job_id].stats
# => {'age': 0, 'id': 2, 'state': 'reserved', 'tube': 'default', ... }

Be sure to check the beanstalk protocol for more details about the stats commands.

Resources

There are other resources helpful when learning about beanstalk:

Contributors

beaneater's People

Contributors

aethelflaed, albb0920, alup, avgerin0s, bfolkens, carlosmoutinho, ctrochalakis, edwardbetts, funkyboy, ifiht, kml, macobo, manuelmeurer, nesquena, nicolasleger, nicotaing, pond, r3jack, rochefort, tank-bohr, tdg5, tjchambers, utilum, vad4msiu, vidarh, yahonda


beaneater's Issues

ENV for the URL :: not working?

When I try to call Beaneater.new with the env variable set, I get this:

ArgumentError: wrong number of arguments (given 0, expected 1)
/path/vendor/bundle/ruby/2.5.0/gems/beaneater-1.0.0/lib/beaneater.rb:24:in `initialize'

I assume that's not intended. When I adjusted both of these:

/path/vendor/bundle/ruby/2.5.0/gems/beaneater-1.0.0/lib/beaneater/connection.rb: def initialize(address)
/path/vendor/bundle/ruby/2.5.0/gems/beaneater-1.0.0/lib/beaneater.rb: def initialize(address)

to take address = nil, it works. However, I don't know how we want to solve that problem. At the moment I'm solving it like this:

Beaneater.new(ENV['BEANSTALKD_URL'])

Which I doubt is the intended behavior. I'm assuming no one uses this ENV at this point as it seems to not work.

Beaneater is not throwing proper exception when the beanstalk-server dies

I have a simple Rack application that does this:

require 'beaneater'

@beanstalk = Beaneater::Pool.new(['localhost:11300'])

if @beanstalk
  begin
    tube = @beanstalk.tubes["process_file"]
    work_file = original_path
    tube.put path, {:ttr => 60 * 10}
  rescue Beaneater::NotConnected => e
    # if errors are met when dealing with beanstalk, we disable the connection
    @beanstalk = false
    logger.warn "Disabling the beanstalk-connection"
  end
end

If I kill the beanstalk-server after my app has connected to it, I get this error:

NoMethodError: undefined method `chomp' for nil:NilClass
  /Users/kasper/.rbenv/versions/1.9.3-p374/lib/ruby/gems/1.9.1/gems/beaneater-0.3.0/lib/beaneater/connection.rb:112:in `parse_response'
  /Users/kasper/.rbenv/versions/1.9.3-p374/lib/ruby/gems/1.9.1/gems/beaneater-0.3.0/lib/beaneater/connection.rb:53:in `transmit'
  /Users/kasper/.rbenv/versions/1.9.3-p374/lib/ruby/gems/1.9.1/gems/beaneater-0.3.0/lib/beaneater/pool.rb:111:in `block in transmit_to_rand'
  /Users/kasper/.rbenv/versions/1.9.3-p374/lib/ruby/gems/1.9.1/gems/beaneater-0.3.0/lib/beaneater/pool.rb:141:in `safe_transmit'
  /Users/kasper/.rbenv/versions/1.9.3-p374/lib/ruby/gems/1.9.1/gems/beaneater-0.3.0/lib/beaneater/pool.rb:109:in `transmit_to_rand'
  /Users/kasper/.rbenv/versions/1.9.3-p374/lib/ruby/gems/1.9.1/gems/beaneater-0.3.0/lib/beaneater/pool_command.rb:40:in `method_missing'
  /Users/kasper/.rbenv/versions/1.9.3-p374/lib/ruby/gems/1.9.1/gems/beaneater-0.3.0/lib/beaneater/tube/record.rb:39:in `block in put'
  /Users/kasper/.rbenv/versions/1.9.3-p374/lib/ruby/gems/1.9.1/gems/beaneater-0.3.0/lib/beaneater/tube/record.rb:153:in `safe_use'
  /Users/kasper/.rbenv/versions/1.9.3-p374/lib/ruby/gems/1.9.1/gems/beaneater-0.3.0/lib/beaneater/tube/record.rb:35:in `put'
  /Users/kasper/projects/secret_project/lib/secret_project.rb:210:in `process_file'
  /Users/kasper/projects/secret_project/lib/secret_project.rb:154:in `block in <class:SecretProject>'

It should throw the Beaneater::NotConnected-exception, right?

Steps to reproduce:

  1. Start the beanstalk-server.
  2. Start your app and let it connect.
  3. Kill the beanstalk-server.
  4. You should see the before-mentioned issue.

License missing from gemspec

RubyGems.org doesn't report a license for your gem. This is because it is not specified in the gemspec of your last release.

via e.g.

spec.license = 'MIT'
# or
spec.licenses = ['MIT', 'GPL-2']

Including a license in your gemspec is an easy way for rubygems.org and other tools to check how your gem is licensed. As you can imagine, scanning your repository for a LICENSE file or parsing the README, and then attempting to identify the license or licenses is much more difficult and more error prone. So, even for projects that already specify a license, including a license in your gemspec is a good practice. See, for example, how rubygems.org uses the gemspec to display the rails gem license.

There is even a License Finder gem to help companies/individuals ensure all gems they use meet their licensing needs. This tool depends on license information being available in the gemspec. This is an important enough issue that even Bundler now generates gems with a default 'MIT' license.

I hope you'll consider specifying a license in your gemspec. If not, please just close the issue with a nice message. In either case, I'll follow up. Thanks for your time!

Appendix:

If you need help choosing a license (sorry, I haven't checked your readme or looked for a license file), GitHub has created a license picker tool. Code without a license specified defaults to 'All rights reserved'-- denying others all rights to use of the code.
Here's a list of the license names I've found and their frequencies

p.s. In case you're wondering how I found you and why I made this issue, it's because I'm collecting stats on gems (I was originally looking for download data) and decided to collect license metadata, too, and make issues for gemspecs not specifying a license as a public service :). See the previous link or my blog post about this project for more information.

Configurable tube defaults

Here's an example:

Beaneater::Tube.configure do |c|
  c.default_delay = 0
  c.default_priority = 65536 
  c.default_ttr = 120
end

Better pool connection fail handling

As per @kr's suggestion, if a connection in the pool has an error, it's probably best to close it, then attempt to reconnect periodically.

Also perhaps be able to add new connections to the pool as well?

Different job_serializer/job_parser function by tube

I'm using Beanstalk as the primary message queue with Beaneater. In my scenario, every service has its own Beanstalkd tube.

As the system grows, there is an idea to use an optimized job serializer/parser for each service to get better performance.

Any suggestion :D?
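One possible workaround assumes the global job_serializer is left at its identity default. You pick the encoder per tube yourself and pass the already-encoded string to put, e.g. @beanstalk.tubes['metrics'].put(serialize_for('metrics', [1, 2, 3])). The SERIALIZERS table and serialize_for helper below are hypothetical names for illustration, not beaneater features.

```ruby
require 'json'

# Hypothetical per-tube encoders; tubes without an entry fall back to JSON.
DEFAULT_SERIALIZER = ->(body) { JSON.dump(body) }
SERIALIZERS = {
  'metrics'  => ->(body) { body.join(',') }, # compact CSV for a hot tube
  'raw-text' => ->(body) { body.to_s }
}.freeze

def serialize_for(tube_name, body)
  SERIALIZERS.fetch(tube_name, DEFAULT_SERIALIZER).call(body)
end

puts serialize_for('metrics', [1, 2, 3])  # => 1,2,3
puts serialize_for('emails', { to: 'a' }) # => {"to":"a"}
```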

How to gracefully stop a worker

While there is plenty of information how to interact with Beanstalk I could not find details regarding how to implement a production grade worker with beaneater.

Mainly I'm concerned with how to stop a worker gracefully when, for example, it is restarted/stopped via systemd. As far as I understand, I want to let the worker finish its job before I stop it. How can I accomplish this?

My current approach looks like this. It works if the worker is currently doing work, it will stop after the job. But if the worker is currently doing nothing it will stop only after the next job is processed. What am I doing wrong?

Thanks for your help.

#!/usr/bin/env ruby

require 'rubygems'
require 'bundler/setup'
require 'beaneater'

beanstalk = Beaneater.new '127.0.0.1:11300'
tube_name = "app.default"

beanstalk.jobs.register(tube_name) do |job|
  # Do the actual work.
end

trap 'SIGTERM' do
  beanstalk.jobs.stop!
end

trap 'SIGINT' do
  beanstalk.jobs.stop!
end

beanstalk.jobs.process!
beanstalk.close

Beaneater::Jobs#process shouldn't rescue StandardError

I was trying to figure out why I got "lib/beaneater/job/record.rb:39:in `bury': undefined method `pri' for nil:NilClass (NoMethodError)" when using beaneater.

After some digging, it turns out my config.job_parser was faulty: the exception was caught by beaneater, which then tried to bury the job, and that failed as above because the job id was nil.

rescue StandardError # handles unspecified errors

Maybe beaneater shouldn't rescue StandardError?

jobs are not reserved but the tube is being watched

I'm using @beanstalk.jobs.process! to automatically process the jobs. But I find that, after some time, my scripts are not getting any jobs.

Issuing the stats-tube command shows that x clients are watching the tube, but 0 clients are in the waiting state. The number of jobs keeps increasing, but none of the clients receives any job.

Sample out of stats-tube below

stats-tube email.send.v1
OK 263

---
name: email.send.v1
current-jobs-urgent: 0
current-jobs-ready: 10
current-jobs-reserved: 0
current-jobs-delayed: 0
current-jobs-buried: 0
total-jobs: 196704
current-using: 0
current-watching: 1
current-waiting: 0
cmd-pause-tube: 0
pause: 0
pause-time-left: 0

If I stop and start my scripts, jobs get processed. Any input on this would be helpful.

using colon in one parameter causes a weird replacement by underscores

I'm using Ruby 2.6.6, and when I upgraded from beaneater 1.1.1 to 1.1.2, I got a JSON parse problem.

If I use a colon in one of the parameters, I get this error (removing the colon makes it succeed):

[2022-10-14 15:46:09] #00 ERROR: Exception Backburner::Job::JobFormatInvalid -> Job body could not be parsed: #<JSON::ParserError: 859: unexpected token at '{"action_list":["reconfigure_journals"],"app_digest":"","brand":"toconline","destination_tube":"accounting-ops-reconfigure","entity_id":"265843","entity_schema":"pt999999990_c265843","entity_variable_ids":"","fiscal_year_name":"Exercício de 2022","id":"3376","module_mask":"511","notification_title":"Redefinir configuração: Exercício de 2022","product"_"toconline","role_mask"_"32772","sharded_schema"_"pt999999990_c265843","source_company_id"_63571,"source_prefix"_"y2022_12_","source_schema"_"pt221170391_97586","source_user_id"_null,"subentity_id"_"pt999999990_16_1#y2022_1_","subentity_prefix"_"y2022_1_","subentity_schema"_"pt999999990_16_1","ttr"_21600,"tube"_"job_controller","user_email"_"[email protected]","user_id"_"750063","validity"_21600,"x_brand"_"toconline","x_product"_"toconline"}
'>
   /Users/joana/.rbenv/versions/2.6.6/lib/ruby/2.6.0/delegate.rb:85:in `call'
   /Users/joana/.rbenv/versions/2.6.6/lib/ruby/2.6.0/delegate.rb:85:in `method_missing'
   /Users/joana/.rbenv/versions/2.6.6/gemsets/toconline-rubies-jobs/gems/backburner-1.6.0/lib/backburner/job.rb:30:in `rescue in initialize'
   /Users/joana/.rbenv/versions/2.6.6/gemsets/toconline-rubies-jobs/gems/backburner-1.6.0/lib/backburner/job.rb:22:in `initialize'
   /Users/joana/.rbenv/versions/2.6.6/gemsets/toconline-rubies-jobs/gems/backburner-1.6.0/lib/backburner/worker.rb:178:in `new'
   /Users/joana/.rbenv/versions/2.6.6/gemsets/toconline-rubies-jobs/gems/backburner-1.6.0/lib/backburner/worker.rb:178:in `reserve_job'
   /Users/joana/work/sp-job/lib/sp/job/worker.rb:52:in `work_one_job'
   /Users/joana/work/sp-job/lib/sp/job/worker.rb:30:in `block in start'
   /Users/joana/work/sp-job/lib/sp/job/worker.rb:28:in `loop'
   /Users/joana/work/sp-job/lib/sp/job/worker.rb:28:in `start'
   /Users/joana/.rbenv/versions/2.6.6/gemsets/toconline-rubies-jobs/gems/backburner-1.6.0/lib/backburner/worker.rb:60:in `start'
   /Users/joana/.rbenv/versions/2.6.6/gemsets/toconline-rubies-jobs/gems/backburner-1.6.0/lib/backburner.rb:40:in `work'
   /Users/joana/work/toconline/jobs/toc-fast/toc-fast:47:in `<top (required)>'

With the following job submitted:

Screenshot 2022-10-14 at 16 44 23

Read bytes size from results

We need to fix the issue where, with multiple beaneater (telnet) connections, two responses from beanstalkd could get mixed up.

Stop trying to distinguish different responses with a regex (ref) and read the byte size instead.
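In the beanstalkd protocol, every reply that carries a body states its length: the last number on the header line is the body size in bytes, as in RESERVED <id> <bytes>\r\n<data>\r\n. The read_response method below is a minimal sketch of reading by that byte count instead of matching the body with a regex. It is a hypothetical helper, not beaneater's code, and it uses StringIO in place of a socket. Because the body is read by length, a body that itself contains \r\n cannot be confused with the next response.

```ruby
require 'stringio'

# Hypothetical length-based response reader (not beaneater's code).
# For replies that carry a body, the last header number is the body size.
BODY_REPLIES = %w[OK RESERVED FOUND].freeze

def read_response(io)
  header = io.gets("\r\n") or raise EOFError, "connection closed"
  status, *args = header.chomp("\r\n").split(' ')
  return { status: status, args: args, body: nil } unless BODY_REPLIES.include?(status)

  bytes = Integer(args.last)
  body = io.read(bytes) # exactly <bytes> bytes, whatever they contain
  raise EOFError, "short body" if body.nil? || body.bytesize < bytes
  raise IOError, "expected CRLF after body" unless io.read(2) == "\r\n"

  { status: status, args: args, body: body }
end

# Two back-to-back replies; the first body contains its own CRLF.
io = StringIO.new("RESERVED 7 12\r\nline1\r\nline2\r\nINSERTED 8\r\n")
first  = read_response(io)
second = read_response(io)
p first[:args]    # => ["7", "12"]
p second[:status] # => "INSERTED"
```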

Unexpected behavior when using multiple beanstalkd connections

When you create a Beaneater::Pool with multiple connections, you sometimes see unexpected behavior on some boundary conditions.

I included some Ruby code below to demonstrate the situation.

The gist of the issue is Pool hides the fact that you are working with multiple connections. If you add data to a tube in one connection, calls such as Tube#stats might throw a NotFoundError when trying to retrieve stats on a tube that does not exist for the next connection. Or Jobs#find might return nil if checking on a connection that did not have a job of that id inserted.

require 'beaneater'

#beanstalk = Beaneater::Pool.new(['localhost:11300'])
beanstalk = Beaneater::Pool.new(['localhost:11300', 'localhost:11400'])

tube1 = beanstalk.tubes['test']
response = tube1.put '{"name":"A"}'
id1 = response[:id]
puts "Added Job #{id1}"

tube2 = beanstalk.tubes['test2']
response = tube2.put '{"name":"B"}'
id2 = response[:id]
puts "Added Job #{id2}"

puts beanstalk.tubes['test'].stats
puts beanstalk.tubes['test2'].stats
puts beanstalk.jobs.find(id1)
puts beanstalk.jobs.find(id2)

This was an issue with the original beanstalk-client gem as well.
I had a hideous monkey patch in beanstalkd_view to get around it: https://github.com/denniskuczynski/beanstalkd_view/blob/8e726f2a280538b5a8cc4be1f5428b3079b53e81/lib/beanstalkd_view/extensions/beanstalk-pool.rb

But perhaps with the new gem we can find a better solution.

Something along the lines of having Pool#safe_transmit always rescue from NotFoundErrors -- only throwing the exception if NotFound on all connections might work for most cases.

The new gem looks great by the way. I've already converted my beanstalkd_view gem to use it.

Let me know what you think,
Thanks,
Dennis

Is there a broadcast?

I'm looking for a shortcut to transmit_to_all: essentially a way to broadcast messages to anyone watching. My use case is transmitting progress for jobs, but I'll have several watchers on it.

Technically I'm getting stale watchers and the jobs are getting put into nowhere. I'm trying to work that out (possibly an issue with ActionController::Live never disconnecting on a refresh).

Automated processing and signal handling

I wrote a small script to multiplex a tube into multiple other tubes: https://github.com/martint17r/beanstalk-multiplex

I would like to harden it against data loss, i.e. when the script gets interrupted, it should still process the current job and only after finishing it, exit the processing loop and close the connection to the beanstalkd.
If there is no job being processed it would be best to close the connection and immediately exit.

I tried using trap(...) but it requires some jumping through burning hoops to bring the signal into the processing loop.

What is the best way to achieve proper signal handling?

Process! is not propagating Beaneater::NotConnected exception.

As mentioned in the title, the process! method does not propagate Beaneater::NotConnected or break out of its while loop when that exception occurs, i.e. when beanstalkd goes down.

Steps to reproduce:

  1. Start beanstalk: beanstalkd -l 192.168.50.21 -p 11300 &
  2. Put messages to the queue.
  3. Start the consumer:

@beanstalk = Beaneater.new('192.168.50.21:11300')
begin
  @beanstalk.jobs.register('my-tube') do |job|
    puts "Job: #{job.inspect}"
  end
  @beanstalk.jobs.process!({:reserve_timeout => 10})
rescue Exception => e
  puts "Exception: #{e.inspect}"
  @beanstalk.close
end

  4. Stop the beanstalk server.
  5. The consumer now hangs, because the process! method keeps retrying, treating Beaneater::NotConnected as a StandardError.

Beaneater is eating exceptions

I have a simple worker like this:

require 'subexec'
require 'beaneater'

@beanstalk = Beaneater::Pool.new(['localhost:11300'])

@beanstalk.jobs.register('do.job') do |job|
  puts "Error out!"
  STDOUT.flush
  sleep 3
  raise "errors?"
end

@beanstalk.jobs.process!

And this Procfile:

beanstalkd:  beanstalkd -p 11300
worker_1:   bundle exec ruby lib/project/beanstalk_worker.rb
worker_2:   bundle exec ruby lib/project/beanstalk_worker.rb
[project (features/beanstalk_queue)]=> foreman start
15:08:31 beanstalkd.1 | started with pid 15270
15:08:31 worker_1.1  | started with pid 15271
15:08:31 worker_2.1  | started with pid 15272
15:08:35 worker_1.1  | Error out!
15:08:36 worker_2.1  | Error out!

Is it intentional that the workers don't raise the exceptions, or at least report them?
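A later report in this list says that process! rescues StandardError and buries the job. That would explain why the worker keeps running and nothing appears in the output. A minimal way to make failures visible is to wrap the handler so it logs the exception before re-raising it. process! can then still bury the job. This wrapper is a hypothetical helper, not part of beaneater.

```ruby
require 'logger'

# Wrap a job handler so any exception is logged (class, message, first
# backtrace line) before it is re-raised; the caller's own error handling
# (e.g. burying the job) still runs afterwards.
def logging_handler(logger, &handler)
  lambda do |job|
    begin
      handler.call(job)
    rescue StandardError => e
      logger.error("job failed: #{e.class}: #{e.message} (#{Array(e.backtrace).first})")
      raise
    end
  end
end
```

For the worker above, this could be `handler = logging_handler(Logger.new($stdout)) { |job| raise "errors?" }` followed by `@beanstalk.jobs.register('do.job', &handler)`.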

Segmentation faults

Sometimes, it seems when the workers are very busy we get segmentation faults.

/data/server/vendor/bundle/ruby/2.4.0/gems/beaneater-1.0.0/lib/beaneater/tube/collection.rb:106:in `watched': undefined method `map' for #<String:0x00000000074411f8> (NoMethodError)

Is the following returning a string instead of an array?

last_watched = transmit('list-tubes-watched')[:body]
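In the beanstalkd protocol, `list-tubes-watched` replies with `OK <bytes>` followed by a YAML list of tube names. The NoMethodError suggests that this body sometimes reaches `watched` as an unparsed String. The following defensive normalizer is a hypothetical sketch, not beaneater's actual code. It parses YAML when given a String and passes arrays through.

```ruby
require 'yaml'

# Normalize a list-tubes-watched body into an Array of tube names.
# beanstalkd sends the list as YAML ("---\n- default\n"); if the body is
# still a raw String, parse it instead of calling #map on it.
def watched_tube_names(body)
  list = body.is_a?(String) ? YAML.safe_load(body) : body
  Array(list).map(&:to_s) # to_s: a tube named "123" parses as an Integer
end
```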

Better handling of Pool and job reserve for multiple connections

There is an issue with the pool concept, and then with put and reserve. We should change beaneater_test (https://github.com/beanstalkd/beaneater/blob/master/test/beaneater_test.rb#L6) to test with multiple connections.

The issue is around how reserving works in this case:

@beanstalk = Beaneater::Pool.new(['127.0.0.1:11300', '127.0.0.1:11301'])
@tube = @beanstalk.tubes["foo"]
@tube.put("bar") # <-- put onto connection one
@tube.put("baz") # <-- put onto connection one

So now we have two jobs on connection one. If we now call

@tube.reserve

Reserve could randomly pick connection two, in which case there are no jobs and it would hang forever. There are also variations of this problem: if two jobs land on one connection and one on another, calling reserve will eventually start to hang even though the other connection still has jobs.
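One way to avoid blocking on a random empty connection is to poll every connection with a zero timeout, rotating the starting point. In the protocol, `reserve-with-timeout 0` returns `TIMED_OUT` immediately when nothing is ready. This is a sketch, not the pool's implementation. Each connection is assumed to expose a hypothetical `try_reserve` that returns a job or nil. With beaneater, that adapter could call `reserve(0)` and treat `Beaneater::TimedOutError` as nil.

```ruby
# Round-robin reserve across several connections without blocking on an
# empty one. Each connection is assumed (hypothetically) to respond to
# #try_reserve, returning a job or nil.
class RoundRobinReserver
  def initialize(connections, idle_sleep: 0.1, sleeper: ->(s) { sleep(s) })
    @connections = connections
    @next = 0
    @idle_sleep = idle_sleep
    @sleeper = sleeper
  end

  # One pass over all connections, starting after the last one tried.
  def try_reserve
    @connections.size.times do
      conn = @connections[@next]
      @next = (@next + 1) % @connections.size
      job = conn.try_reserve
      return job if job
    end
    nil
  end

  # Keep passing until some connection has a job, pausing between empty passes.
  def reserve(max_passes: Float::INFINITY)
    passes = 0
    while passes < max_passes
      job = try_reserve
      return job if job
      passes += 1
      @sleeper.call(@idle_sleep)
    end
    nil
  end
end
```

The trade-off is a short polling delay when every connection is empty, in exchange for never waiting on one connection while another holds jobs.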

Add YARD docs for all methods

We need good YARD docs for all methods in beaneater. Then we can generate YARD docs on rdoc.info and provide solid docs for users. Example of good docs:

# Summary of what this does
#
# @!attribute [r] count
# @yield [a, b, c] Gives 3 random numbers to the block
# @param [type] Name description
# @option name [Types] option_key (default_value) description
# @return [type] description
# @raise [Types] description
# @example
#  something.foo # => "test"
#

Newlines are not handled correctly

Beaneater doesn't deal with newlines very well:

beanstalk = Beaneater::Pool.new(['localhost:11300'])
tube = beanstalk.tubes['my-tube']

payload = "foo\nbar"
tube.put payload
/usr/local/share/gems/gems/beaneater-0.2.2/lib/beaneater/connection.rb:116:in `parse_response': Response failed with: EXPECTED_CRLF (Beaneater::ExpectedCrlfError)
    from /usr/local/share/gems/gems/beaneater-0.2.2/lib/beaneater/connection.rb:55:in `transmit'
    from /usr/local/share/gems/gems/beaneater-0.2.2/lib/beaneater/pool.rb:111:in `block in transmit_to_rand'
    from /usr/local/share/gems/gems/beaneater-0.2.2/lib/beaneater/pool.rb:141:in `safe_transmit'
    from /usr/local/share/gems/gems/beaneater-0.2.2/lib/beaneater/pool.rb:109:in `transmit_to_rand'
    from /usr/local/share/gems/gems/beaneater-0.2.2/lib/beaneater/pool_command.rb:40:in `method_missing'
    from /usr/local/share/gems/gems/beaneater-0.2.2/lib/beaneater/tube/record.rb:39:in `block in put'
    from /usr/local/share/gems/gems/beaneater-0.2.2/lib/beaneater/tube/record.rb:153:in `safe_use'
    from /usr/local/share/gems/gems/beaneater-0.2.2/lib/beaneater/tube/record.rb:35:in `put'
    from ./producer.rb:27:in `<main>'

(By the way, that's after patching errors.rb: http://fpaste.org/KKAV/)

I looked at what actually went over the wire, and \n got converted into \r\n, but the job size was still payload.length.

Using "foo\r\nbar" doesn't work either; it is converted to \r\r\n.
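In the beanstalkd protocol, a put is `put <pri> <delay> <ttr> <bytes>\r\n<data>\r\n`. Here `<bytes>` is the byte length of the data, and the data itself is sent unmodified. The body should therefore never have `\n` rewritten, and the count must be `String#bytesize`, not `#length`; the two differ for multibyte UTF-8. The sketch below builds such a frame. The function and its default `pri`/`ttr` values are illustrative, not beaneater's.

```ruby
# Build a beanstalkd "put" frame. The body is sent exactly as given;
# only the command line and the trailer use "\r\n". The byte count is
# String#bytesize, which differs from #length for multibyte characters.
def put_frame(body, pri: 0, delay: 0, ttr: 120)
  data = body.b # binary copy: bytes are counted and concatenated as-is
  "put #{pri} #{delay} #{ttr} #{data.bytesize}\r\n".b + data + "\r\n".b
end
```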

Using Hash#slice forces ruby 2.5+

Hash#slice was added in Ruby 2.5. On Ruby 2.3 it causes this error:

NoMethodError: undefined method `slice' for {}:Hash
        /var/lib/gems/2.3.0/gems/beaneater-1.1.1/lib/beaneater/connection.rb:73:in `transmit'
        /var/lib/gems/2.3.0/gems/beaneater-1.1.1/lib/beaneater/tube/collection.rb:33:in `transmit'
        /var/lib/gems/2.3.0/gems/beaneater-1.1.1/lib/beaneater/tube/collection.rb:79:in `all'

If that's deliberate, OK, but I would expect it to be mentioned explicitly in the README. If it's not (and it seems slice is the only such call in the whole codebase), you may want to rethink 13e9791.
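If older Rubies are to stay supported, the one `slice` call could be replaced with something that exists before 2.5. The following is a sketch of such a replacement, not necessarily how the maintainers would fix it. It returns the same result as `Hash#slice`: keys in argument order, with missing keys skipped.

```ruby
# Hash#slice only exists from Ruby 2.5; this builds the same result on
# older Rubies: keys in argument order, missing keys skipped.
def slice_compat(hash, *keys)
  keys.each_with_object({}) do |key, out|
    out[key] = hash[key] if hash.key?(key)
  end
end
```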

Weighted queues

Hi there,
First, thanks for writing this excellent software - I've been using it for 10+ years and processed literally billions of jobs with it.

I recently started using weighted queues in another project that uses Sidekiq, and I really enjoy not having to worry about fine-tuning priorities and running multiple job processors to avoid queue starvation. It's nice to get a predictable amount of processing for every queue.

Is there any way to achieve something similar with Beanstalkd? Essentially, selecting from job queues in a weighted random fashion?

I imagine I could rig up a system that uses weights to randomly select a queue, peeks to see if there are jobs ready, and if so reserves a job from that queue. Rough code below:

pipes = [{pipe: 'low_priority', weight: 1}, {pipe: 'medium_priority', weight: 2}, {pipe: 'high_priority', weight: 4}]

# Draw from 1..total so each pipe is picked with probability weight / total.
# (Starting the range at the smallest weight skews the odds when that weight is > 1.)
q_total = pipes.inject(0) {|r, q| r + q.dig(:weight) }
range = 1..q_total

loop do
  ## Randomly select a queue based on weights and assign queue name to pipe_to_process

  q_rand = Random.rand(range)
  q_accumulate = 0
  pipe_to_process = pipes.find do |q|
    q_accumulate += q.dig(:weight)
    q_accumulate >= q_rand
  end

  pipe = pipe_to_process.dig(:pipe)

  unless beanstalk.tubes.find(pipe).peek(:ready).nil?
    puts "Getting job from #{pipe}"
    beanstalk.tubes.watch!(pipe)
    begin
      job = beanstalk.tubes.reserve(1)
    rescue Beaneater::TimedOutError
      # Another processor took the job between peek and reserve.
      next
    end
    puts "Got job: #{job.id} : tube: #{pipe}"
    job.release delay: 5
  else
    puts "No jobs in #{pipe}"
  end
end

Waiting on a queue that might be empty (when running multiple processors) is not ideal, and when there are no jobs it thrashes between all the queues. Any thoughts on better ways to get weighted queues?
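An alternative to picking a single tube is to draw a weighted random *ordering* of all tubes. A worker can then fall through to the next tube instead of waiting on an empty one. The sketch below swaps in a known technique, Efraimidis-Spirakis weighted sampling; neither beanstalkd nor beaneater provides it. Each tube gets the key `u ** (1 / weight)`, and tubes are sorted by key in descending order. A tube then comes first with probability weight / total weight.

```ruby
# Weighted random ordering of tubes (Efraimidis-Spirakis keys): a tube is
# first with probability weight / total_weight, and every tube appears once.
def weighted_order(pipes, rng: Random.new)
  keyed = pipes.map do |q|
    u = rng.rand                      # uniform in [0, 1)
    [u**(1.0 / q[:weight]), q[:pipe]] # larger weight => key closer to 1
  end
  keyed.sort_by { |key, _| -key }.map { |_, name| name }
end
```

A worker could walk the returned order, trying a non-blocking reserve on each tube in turn. Only when every tube is empty would it sleep briefly or block with a timeout. This avoids both waiting on an empty tube and spinning through random picks.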

'bury' on StandardError in process! needs checking

In collection.rb (https://github.com/beanstalkd/beaneater/blob/master/lib/beaneater/job/collection.rb#L98), when a StandardError is caught, the job needs to be checked before job.bury is called, as in the ensure clause. Otherwise process! may try to bury a job that has already been deleted.

This may occur when

@beanstalk.jobs.register('whatever', :retry_on => [Timeout::Error]) do |job|
  process(job)
  screw_up_here_and_raise_some_exception()
end

In that case, a confusing Beaneater::NotFoundError: Response failed with: NOT_FOUND percolates up from parse_response(cmd, res) when the bury command is sent.
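In the protocol, `bury` answers `NOT_FOUND` when the job does not exist or is not reserved by this client. That matches the error above. The following sketch shows a guard that checks before burying, in the spirit of the ensure clause the report mentions. It assumes job objects respond to `exists?`, `reserved?` and `bury`. Verify those method names against your installed beaneater version.

```ruby
# Bury a job from an error handler only if that is still possible: the job
# must exist (the handler may already have deleted it) and still be reserved
# by this client. Assumes job objects respond to #exists?, #reserved?, #bury.
def bury_if_still_reserved(job)
  return false unless job && job.exists? && job.reserved?
  job.bury
  true
end
```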

Is there a "queue is empty" hook?

I have a bunch of tasks that belong to a user and I'd like to notify him via email when they are all done. My idea is to create a tube, watch it until it's empty, then delete it and send the email.
Is there a way to hook up with a "queue x is empty" event?
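As far as the beanstalkd protocol goes, there is no push notification for an empty tube, so the usual answer is to poll `stats-tube`. The sketch below treats a tube as drained once its ready, reserved, delayed and buried counters are all zero; those are real `stats-tube` fields. `fetch_stats` is a hypothetical callable that returns those fields as a Hash.

```ruby
# beanstalkd has no "tube is empty" notification, so poll stats-tube instead.
JOB_COUNTERS = %w[current-jobs-ready current-jobs-reserved
                  current-jobs-delayed current-jobs-buried].freeze

# A tube is drained once none of its jobs are ready, reserved, delayed or buried.
def drained?(stats)
  JOB_COUNTERS.all? { |key| stats.fetch(key, 0).to_i.zero? }
end

# fetch_stats: callable returning a Hash of stats-tube fields.
# Returns true once drained, false if max_checks polls pass first.
def wait_until_drained(fetch_stats, interval: 5, max_checks: Float::INFINITY,
                       sleeper: ->(s) { sleep(s) })
  checks = 0
  while checks < max_checks
    return true if drained?(fetch_stats.call)
    checks += 1
    sleeper.call(interval)
  end
  false
end
```

beanstalkd deletes a tube that has no jobs and no client using or watching it. If `stats-tube` reports the tube as not found, which in beaneater presumably surfaces as `Beaneater::NotFoundError`, that can also be treated as drained.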

Beaneater.new requires one argument

client = Beaneater.new

raises

beaneater-1.0.0/lib/beaneater.rb:24:in `initialize': wrong number of arguments (0 for 1) (ArgumentError)

But according to the documentation, it should default to the configured URL or, failing that, the environment variable.
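Until the no-argument form works, the fallback the report expects can be resolved explicitly and passed to `Beaneater.new`. The order is: explicit argument, then configured URL, then environment variable, then localhost on beanstalkd's default port 11300. This is a hypothetical helper. The `BEANSTALKD_URL` variable name is an assumption, so check which variable your beaneater version actually reads.

```ruby
# Resolve a beanstalkd address: explicit argument, then the configured URL,
# then an environment variable (BEANSTALKD_URL is an assumed name), then
# beanstalkd's default port on localhost.
def beanstalkd_address(explicit = nil, configured: nil, env: ENV)
  [explicit, configured, env['BEANSTALKD_URL']]
    .find { |v| v && !v.to_s.strip.empty? } || 'localhost:11300'
end
```

Usage: `Beaneater.new(beanstalkd_address(configured: Beaneater.configuration.beanstalkd_url))`.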

Automated retries

Automatically try to reconnect on Beaneater::NotConnected instead of throwing an error.
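Until the library retries on its own, a caller can wrap connection setup in a retry with exponential backoff. This is a minimal sketch, not beaneater behaviour. The block should create a fresh connection on each attempt, for example `Beaneater.new(address)`. `errors` would list the connection error classes, such as `Beaneater::NotConnected`. Whether an existing Beaneater object can reconnect itself is not assumed here.

```ruby
# Retry a block when it raises one of the given connection errors, waiting
# base_delay, 2*base_delay, 4*base_delay, ... between attempts. The last
# error is re-raised once the attempts run out.
def with_reconnect(errors:, attempts: 5, base_delay: 0.5, sleeper: ->(s) { sleep(s) })
  raise ArgumentError, 'errors must list at least one exception class' if errors.empty?
  tries = 0
  begin
    tries += 1
    yield tries
  rescue *errors
    raise if tries >= attempts # out of attempts: surface the last error
    sleeper.call(base_delay * 2**(tries - 1))
    retry
  end
end
```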

Duplicate jobs in the queue

Our system processes many jobs from the queue, and sometimes those jobs have not finished processing yet. There is a chance that our system will put jobs with the same name as jobs that are currently being processed.

Is there a check in beaneater that tells us a job with the same name is already in the queue before we add it?

Thanks,
Michael
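beanstalkd jobs carry only a body and an id, and the protocol has no uniqueness check. Deduplication therefore has to happen on the producer side. The sketch below is hypothetical and only covers a single producer process. A key stays pending from `put_unique` until the caller reports the job as finished. Several producers would need to share that state elsewhere.

```ruby
require 'set'

# Producer-side deduplication for a single process: a key is "pending" from
# the moment its job is put until the caller reports it finished.
class DedupingProducer
  def initialize(tube)
    @tube = tube # any object with #put(body), e.g. a beaneater tube
    @pending = Set.new
    @lock = Mutex.new
  end

  # Puts the job unless a job with the same key is still pending.
  # Returns true if it was put, false if skipped as a duplicate.
  def put_unique(key, body)
    @lock.synchronize do
      return false if @pending.include?(key)
      @pending << key
    end
    begin
      @tube.put(body)
    rescue StandardError
      finished(key) # the put failed, so the key is not really pending
      raise
    end
    true
  end

  def finished(key)
    @lock.synchronize { @pending.delete(key) }
  end
end
```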

body of tube.put method is always empty

I run a beanstalkd instance with:

beanstalkd -b ~/beanstore &

Then I have the following code:

require 'beaneater'
bean = Beaneater::Pool.new(['0.0.0.0:11300'])
tube = bean.tubes['msg-tube']
tube.put "5"

This is pretty standard beaneater code, but the put method returns the following:

=> {:status=>"INSERTED", :body=>nil, :id=>"2", :connection=>#<Beaneater::Connection host="0.0.0.0" port=11300>}

I have a loop running in another process waiting for input. It does receive a connection and tries to process the job, but since there is nothing in the body, it fails.
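In the beanstalkd protocol, a successful `put` replies `INSERTED <id>\r\n` with no data. A nil `:body` in put's return value is therefore expected and does not by itself mean the job was stored empty. The body comes back when a worker reserves the job: `RESERVED <id> <bytes>\r\n<data>\r\n`. The parser below illustrates where the body lives in that reply. It is a sketch, not beaneater's own parser.

```ruby
# Parse a raw "reserve" reply: "RESERVED <id> <bytes>\r\n<data>\r\n".
# The job body lives here; the reply to "put" ("INSERTED <id>\r\n")
# carries only the id.
def parse_reserved(reply)
  header, rest = reply.split("\r\n", 2)
  status, id, bytes = header.split(' ')
  raise ArgumentError, "unexpected reply: #{header}" unless status == 'RESERVED'
  body = rest.byteslice(0, Integer(bytes)) # take exactly <bytes> bytes
  { id: Integer(id), body: body }
end
```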

Ideas

Get Number of Ready Jobs Enqueued into a Tube

Hi @nesquena, I wrote some code that reserves jobs from a tube into an array:

beanstalk = Beaneater.new(Beaneater.configuration.beanstalkd_url)
tube = beanstalk.tubes["my-tube"]
array = []
loop do
  array << tube.reserve
  break if array.size == 100
end

Is there a way to get the number of ready jobs in a tube?

I was thinking of implementing it like this:

loop do
  array << tube.reserve
  break if array.size == tube.size
end

Thanks,
Michael
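The `stats-tube` command reports a `current-jobs-ready` counter for a tube. beaneater appears to expose it as something like `tube.stats.current_jobs_ready`; treat that accessor as an assumption to verify. The count is only a snapshot, because other workers may reserve jobs in the meantime. The sketch therefore uses it as an upper bound and stops early when a short-timeout reserve returns nothing. `reserve_one` is a hypothetical callable that returns a job or nil.

```ruby
# Reserve up to `limit` jobs, using the tube's ready count as an upper bound.
# The count is only a snapshot, so reserve_one should use a short timeout and
# return nil when nothing arrives; the batch then ends early instead of blocking.
def reserve_batch(ready_count, limit, reserve_one)
  batch = []
  [ready_count, limit].min.times do
    job = reserve_one.call
    break unless job
    batch << job
  end
  batch
end
```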

Job is nil when burying

I am not sure if this is related to #28, but we see something like the following in our logs:

17:59:12 jobs.1 | /Users/amos/.gem/ruby/2.1.3/gems/beaneater-0.3.3/lib/beaneater/job/collection.rb:107:in `rescue in block in process!': undefined method `bury' for nil:NilClass (NoMethodError)
17:59:12 jobs.1 |       from /Users/amos/.gem/ruby/2.1.3/gems/beaneater-0.3.3/lib/beaneater/job/collection.rb:110:in `block in process!'
17:59:12 jobs.1 |       from /Users/amos/.gem/ruby/2.1.3/gems/beaneater-0.3.3/lib/beaneater/job/collection.rb:92:in `loop'
17:59:12 jobs.1 |       from /Users/amos/.gem/ruby/2.1.3/gems/beaneater-0.3.3/lib/beaneater/job/collection.rb:92:in `process!'
17:59:12 jobs.1 |       from /Users/amos/Dev/memoways/kura/jobs/init.rb:100:in `work'
17:59:12 jobs.1 |       from jobs/worker.rb:5:in `<main>'
17:59:12 jobs.1 | exited with code 1

I am not sure why reserve returns nil:

https://github.com/beanstalkd/beaneater/blob/master/lib/beaneater/job/collection.rb#L94

It seems commit 109b795 moved the reserve call inside the begin block. Exceptions raised by reserve are therefore now caught by the rescue, where job is still nil.
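The following sketch keeps reserve outside the per-job `begin`/`rescue`. If reserving fails (e.g. with a timeout), there is no job to bury, and the error goes to the caller, which can decide whether to retry. Only errors raised while handling a job lead to `bury`. The sketch illustrates the ordering and is not the maintainers' fix. `reserve_one` and `handler` are hypothetical callables, and jobs are assumed to respond to `delete` and `bury`.

```ruby
# Keep reserve outside the per-job begin/rescue: a failed reserve propagates
# to the caller (there is no job to bury); only handler errors bury the job.
def process_one(reserve_one, handler)
  job = reserve_one.call # errors here propagate; job is never nil below
  begin
    handler.call(job)
    job.delete
    :deleted
  rescue StandardError
    job.bury
    :buried
  end
end
```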
