action_subscriber's Introduction

Join the chat at https://gitter.im/mxenabled/action_subscriber

ActionSubscriber

ActionSubscriber is a DSL for easily integrating your Rails app with a RabbitMQ messaging server.

Requirements

I test on Ruby 2.2.1 and JRuby 9.x. MRI 1.9 and JRuby 1.7 are still supported.

If you want to use MRI 1.9 you will need to lock down the amq-protocol and bunny gems to < 2.0, since they both require Ruby 2.0+.

Migrating from ActionSubscriber 3.X or earlier

If you were using the --mode=pop from the 2.X or 3.X version of ActionSubscriber you can get the same sort of behavior by drawing your routes like this:

::ActionSubscriber.draw_routes do
  # instead of creating custom threadpools you set the threadpool size of your connection here in the routes
  # you can set the threadpool size for the default connection via the `::ActionSubscriber.configuration.threadpool_size = 16`
  route UserSubscriber, :created,
    :prefetch => 1,
    :concurrency => 16,
    :acknowledgements => true

  # in user_subscriber.rb make sure to set `at_most_once!` like this
  #
  # class UserSubscriber < ::ActionSubscriber::Base
  #   at_most_once!
  # end

  # If you were previously using custom threadpools for different routes you can mimic that behavior by opening multiple connections
  connection(:slow_work, :threadpool_size => 32) do
    route UserSubscriber, :created,
      :prefetch => 1,
      :concurrency => 32,
      :acknowledgements => true
  end
end

That will give you behavior similar to the old --mode=pop, where messages were polled from the server, but with reduced latency.

Supported Message Types

ActionSubscriber supports JSON and plain text out of the box, but you can easily add support for any custom message type.
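As a rough illustration of what a custom message type involves, the sketch below keeps a table of decoders keyed by content type and picks one per message. The DECODERS constant and the decode helper are hypothetical names for this example, not part of ActionSubscriber. In the gem itself, custom decoders are registered through config.add_decoder (see Configuration), whose exact signature should be checked against the gem.

```ruby
require "json"

# Hypothetical decoder table keyed by content type (not the gem's internals).
DECODERS = {
  "application/json" => lambda { |payload| JSON.parse(payload) },
  "text/plain"       => lambda { |payload| payload.dup },
}

# Adding a custom message type is just one more entry.
DECODERS["text/csv"] = lambda { |payload| payload.split(",") }

# Decode a raw payload; unknown content types fall through untouched.
def decode(content_type, payload)
  decoder = DECODERS[content_type]
  decoder ? decoder.call(payload) : payload
end

p decode("text/csv", "a,b,c")
```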

Example

A subscriber is set up by creating a class that inherits from ActionSubscriber::Base.

class UserSubscriber < ::ActionSubscriber::Base
  def created
    # do something when a user is created
  end
end

Check out the examples directory for more detailed examples.

Usage

In your application setup you will draw your subscription routes. In a Rails app this is usually done in config/initializers/action_subscriber.rb.

::ActionSubscriber.draw_routes do
  # you can define routes one-by-one for fine-grained control
  route UserSubscriber, :created

  # or you can set up default routes for all the public methods in a subscriber
  default_routes_for UserSubscriber
end

Now you can start your subscriber process with:

$ bundle exec action_subscriber start

This will connect your subscribers to the RabbitMQ broker and allow it to push messages down to your subscribers.

Around Filters

"Around" filters are responsible for running their associated actions by yielding, similar to how Rack middlewares and Rails around filters work.

class UserSubscriber < ::ActionSubscriber::Base
  around_filter :log_things

  def created
    # do something when a user is created
  end

  private

  def log_things
    puts "before I do some stuff"
    yield
    puts "I did some stuff"
  end
end

Warning: an around filter will only be added to the chain once. Duplicate around filters are not supported.
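To make the yielding behaviour concrete, here is a self-contained toy that mimics an around-filter chain. It is not ActionSubscriber's implementation; TinySubscriber, run_action and the events log are names invented for this sketch. Each filter wraps everything registered after it, and registering the same filter twice has no extra effect.

```ruby
# Toy model of an around-filter chain (illustration only, not the gem's code).
class TinySubscriber
  def self.around_filters
    @around_filters ||= []
  end

  # Register a filter once; duplicates are ignored.
  def self.around_filter(name)
    around_filters << name unless around_filters.include?(name)
  end

  around_filter :log_things
  around_filter :log_things # ignored: already in the chain

  attr_reader :events

  def initialize
    @events = []
  end

  def created
    @events << "created"
  end

  # Build nested lambdas so the first registered filter is the outermost wrapper.
  def run_action(action)
    innermost = -> { public_send(action) }
    chain = self.class.around_filters.reverse.inject(innermost) do |inner, filter|
      -> { send(filter) { inner.call } }
    end
    chain.call
  end

  private

  def log_things
    @events << "before"
    yield # runs the next filter, or the action itself
    @events << "after"
  end
end

subscriber = TinySubscriber.new
subscriber.run_action(:created)
p subscriber.events
```

If a filter never yields, the action is never run, which is why each around filter is responsible for calling yield.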

Configuration

ActionSubscriber needs to know how to connect to your rabbit server to start getting messages.

In an initializer, you can set the host and the port like this:

ActionSubscriber.configure do |config|
  config.hosts = ["rabbit1", "rabbit2", "rabbit3"]
  config.port = 5672
end

Other configuration options include:

  • config.add_decoder - add a custom decoder for a custom content type
  • config.allow_low_priority_methods - Subscribe to *_low queues in addition to normal queues.
  • config.connection_reaping_interval - connection reaping interval when the project uses ActiveRecord
  • config.connection_reaping_timeout_interval - connection reaping timeout interval when the project uses ActiveRecord
  • config.default_exchange - set the default exchange that your queues will use; using the default RabbitMQ exchange is not recommended
  • config.error_handler - handle errors however you want to handle them
  • config.heartbeat - number of seconds between heartbeats (default 5); see the bunny documentation for more details
  • config.hosts - an array of hostnames in your cluster (e.g. ["rabbit1.myapp.com", "rabbit2.myapp.com"])
  • config.network_recovery_interval - reconnection interval for TCP connection failures (default 1)
  • config.password - RabbitMQ password (default "guest")
  • config.prefetch - number of messages to hold in the local queue in subscriber mode
  • config.resubscribe_on_consumer_cancellation - resubscribe when the consumer is cancelled (queue deleted or cluster fails, default true)
  • config.seconds_to_wait_for_graceful_shutdown - time to wait before force stopping server after shutdown signal
  • config.threadpool_size - set the number of threads available to action_subscriber
  • config.timeout - how many seconds to allow rabbit to respond before timing out
  • config.tls - true/false whether to use TLS when connecting to the server
  • config.tls_ca_certificates - a list of CA certificates to use for verifying the server's TLS certificate
  • config.tls_cert - a client certificate to use during the TLS handshake
  • config.tls_key - a key to use during the TLS handshake
  • config.username - RabbitMQ username (default "guest")
  • config.verify_peer - whether to attempt to validate the server's TLS certificate
  • config.virtual_host - RabbitMQ virtual host (default "/")

Note: TLS is not handled identically in bunny and march_hare. The configuration options we provide are passed through as provided. For details on expected behavior please check the bunny or march_hare documentation, depending on whether you are running on MRI or JRuby.

Message Acknowledgment

no_acknowledgement!

This mode is the default. Rabbit is told to not expect any message acknowledgements so messages will be lost if an error occurs. This also allows the broker to send messages as quickly as it wants down to your subscriber.

Warning: If messages arrive very quickly this could cause your process to crash as your memory fills up with unprocessed messages. We highly recommend you use at_least_once! mode to provide a throttle so the broker does not overwhelm your process with messages.

manual_acknowledgement!

This mode leaves it up to the subscriber to handle acknowledging or rejecting messages. In your subscriber you can just call acknowledge, reject, or nack.

at_most_once!

Rabbit is told to expect message acknowledgements, but sending the acknowledgement is left up to ActionSubscriber. We send the acknowledgement right before calling your subscriber.

at_least_once!

Rabbit is told to expect message acknowledgements, but sending the acknowledgement is left up to ActionSubscriber. We send the acknowledgement right after calling your subscriber. If an error is raised your message will be sent back to RabbitMQ and retried on an exponential backoff schedule.
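The difference between at_most_once! and at_least_once! comes down to when the acknowledgement is sent relative to your code. The sketch below models that with a fake channel. FakeChannel, at_most_once and at_least_once are hypothetical helpers written for this illustration and do not reflect the gem's internals (in particular, the real at_least_once! retries with a backoff rather than simply skipping the ack).

```ruby
# Stand-in for a broker channel that just records what happened.
class FakeChannel
  attr_reader :log

  def initialize
    @log = []
  end

  def ack(tag)
    @log << [:ack, tag]
  end
end

# Ack first, then run the handler: a crash loses the message (no redelivery).
def at_most_once(channel, tag)
  channel.ack(tag)
  yield
end

# Run the handler first, ack only on success: a crash leaves the message
# unacknowledged, so it can be delivered again (possible duplicates).
def at_least_once(channel, tag)
  yield
  channel.ack(tag)
end

channel = FakeChannel.new

begin
  at_most_once(channel, 1) { raise "handler failed" }
rescue RuntimeError
  # message 1 was already acknowledged before the failure
end

begin
  at_least_once(channel, 2) { raise "handler failed" }
rescue RuntimeError
  # message 2 was never acknowledged
end

p channel.log
```

Only message 1 ends up acknowledged: under at-most-once semantics the failed message is gone, while under at-least-once semantics it is still outstanding and eligible for another attempt.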

safe_nack

If you turn on acknowledgements and a message is not acknowledged by your code, either manually or by using one of the filters above, the ErrorHandler middleware, which wraps the entire block, will call nack. This is a last resort so the connection does not get backed up in cases of unexpected or unhandled errors.

redeliver

A message can be redelivered with ::ActionSubscriber::MessageRetry.redeliver_message_with_backoff or the DSL method redeliver. Redelivery optionally takes a "backoff schedule", which is a hash of backoff milliseconds for each redelivery. The default is:

  SCHEDULE = {
    2  =>        100,
    3  =>        500,
    4  =>      2_500,
    5  =>     12_500,
    6  =>     62_500,
    7  =>    312_500,
    8  =>  1_562_500,
    9  =>  7_812_500,
    10 => 39_062_500,
  }

When the schedule "returns" nil, the message will not be retried.
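The default schedule above follows a simple pattern: the delay starts at 100 ms for attempt 2 and grows by a factor of 5 with each further attempt, ending after attempt 10. The sketch below reproduces the table from that formula and shows a lookup returning nil once the schedule is exhausted. backoff_for is a hypothetical helper for illustration, not a method provided by the gem.

```ruby
# Default schedule from the section above: attempt number => delay in ms.
SCHEDULE = {
  2 => 100, 3 => 500, 4 => 2_500, 5 => 12_500, 6 => 62_500,
  7 => 312_500, 8 => 1_562_500, 9 => 7_812_500, 10 => 39_062_500,
}

# The same numbers, generated: 100 ms * 5^(attempt - 2).
GENERATED = (2..10).map { |attempt| [attempt, 100 * 5**(attempt - 2)] }.to_h

# Hypothetical lookup: nil means "give up, do not retry".
def backoff_for(attempt, schedule = SCHEDULE)
  schedule[attempt]
end

puts GENERATED == SCHEDULE   # true
p backoff_for(4)             # 2500
p backoff_for(11)            # nil
```

A custom schedule can follow the same shape: any hash from attempt number to milliseconds, with missing keys ending the retries.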

Warning: If you use redeliver you need to handle reject/acknowledge according to how errors are handled. If an error is caught and the ack/reject is already done, then you may duplicate the message in at_least_once! mode.

Testing

ActionSubscriber includes support for easy unit testing with RSpec.

In your spec_helper.rb:

require 'action_subscriber/rspec'

RSpec.configure do |config|
  config.include ::ActionSubscriber::RSpec
end

In your_subscriber_spec.rb:

subject { mock_subscriber }

Your test subject will be an instance of your subscriber class, and you can easily test your public methods without dependence on data from Rabbit. You can optionally pass data for your mock subscriber to consume if you wish.

subject { mock_subscriber(:header => "test_header", :payload => "payload") }

Development

If you want to work on action_subscriber you will need to have a RabbitMQ instance running locally on port 5672 with the management plugin enabled on port 15672. Usually the easiest way to accomplish this is to use Docker and run the command:

$ docker run --net=host --rm=true --hostname diagon --name rabbit rabbitmq:3.6.6-management

Now that rabbitmq is running you can clone this project and run:

$ cd action_subscriber
$ bundle install
$ bundle exec rspec

action_subscriber's People

Contributors

abrandoned, andrew-lewin, brettallred, brianstien, equaleffect, etetzlaff, film42, gitter-badger, goutkannan, jgaxn, liveh2o, localshred, mmmries, mogman1, newellista, quixoten, ryanjonesmx, skunkworker, sshock, zenovich

action_subscriber's Issues

Connection Recovery Fails under MRI

I noticed the other day that if my connection tries to auto-recover when there is active work in the threadpools, the connection gets re-established, but it immediately gets killed because the outstanding work tries to do message acknowledgements for messages that were received under the previous connection.

When the acknowledgements are attempted, RabbitMQ reports it as a protocol violation (which it is) and kills our new connection 😭.

I think this will need to be fixed in the upstream bunny gem, but wanted to document it here for any brave explorers.

Cancel Subscriptions For Graceful Shutdown

Right now when you kill a process in poll mode, we stop polling for new messages and wait for the worker pool to empty out before exiting.

We should support this same behavior for subscribe mode by cancelling the subscriptions and waiting a few seconds for the worker pool to empty out before we actually shut down the connection and exit.

Maintenance Mode

Now that support for graceful shutdowns is in place (or almost in place), it would be nice to be able to pause action_subscriber for purposes of maintenance mode (introspection without killing the subscriber process). Anyone else interested in this?

cc @mmmries @brianstien @localshred @abrandoned @liveh2o

Drop Support for Ruby 1.9

I noticed a recent travis failure on ruby 1.9 which is related to our usage of the amq-protocol gem. They dropped support for Ruby 1.9, so we would have to lock down to previous versions of their gem if we want to continue supporting it.

I vote we change our gemspec to specify ruby 2.0+

Add Logging

Not having any subscriber logs has been a pain point for us in the past. We have statsd metrics around the events which alleviates some of the pain, but it would be really helpful to also keep a local log file for each subscriber.

Broken Test: ActionSubscriber, JRuby and acknowledges

I recently received the following email from a user and wanted to provide an answer to any other people using ActionSubscriber.


Hello Michael,

I kindly ask for help with using ActionSubscriber. I'm making a RoR application that gets messages from RabbitMQ, saves them into the DB, and sends a manual acknowledgement after a successful save or a reject if an error occurs.
I also have a test for this code. Both the code and the test worked pretty well in MRI 2.2.2. After switching the project to JRuby 9.0.4.0 or 9.0.5.0, where ActionSubscriber uses MarchHare instead of Bunny, the program still works well, but the test does not work anymore. I tried several of the latest versions of MarchHare and ActionSubscriber, but the problem persists. Am I doing something wrong? The test fails on sending the acknowledgement with this backtrace:

Error processing message: #method<channel.close>(reply-code=200, reply-text=Goodbye, class-id=0, method-id=0)
Backtrace: /home/dzenovich/.rvm/gems/jruby-9.0.4.0@lochness/gems/march_hare-2.16.0-java/lib/march_hare/exceptions.rb:121:in `convert_and_reraise'
/home/dzenovich/.rvm/gems/jruby-9.0.4.0@lochness/gems/march_hare-2.16.0-java/lib/march_hare/channel.rb:976:in `converting_rjc_exceptions_to_ruby'
/home/dzenovich/.rvm/gems/jruby-9.0.4.0@lochness/gems/march_hare-2.16.0-java/lib/march_hare/channel.rb:748:in `basic_ack'
/home/dzenovich/.rvm/gems/jruby-9.0.4.0@lochness/gems/march_hare-2.16.0-java/lib/march_hare/channel.rb:663:in `block in ack'
org/jruby/RubyProc.java:318:in `call'
/home/dzenovich/.rvm/gems/jruby-9.0.4.0@lochness/gems/march_hare-2.16.0-java/lib/march_hare/channel.rb:991:in `guarding_against_stale_delivery_tags'
/home/dzenovich/.rvm/gems/jruby-9.0.4.0@lochness/gems/march_hare-2.16.0-java/lib/march_hare/channel.rb:662:in `ack'
/home/dzenovich/.rvm/gems/jruby-9.0.4.0@lochness/gems/action_subscriber-2.2.1-java/lib/action_subscriber/middleware/env.rb:45:in `acknowledge'
/home/dzenovich/.rvm/gems/jruby-9.0.4.0@lochness/gems/action_subscriber-2.2.1-java/lib/action_subscriber/base.rb:60:in `acknowledge'
...

Here is my subscriber:

class MySubscriber < ::ActionSubscriber::Base
  manual_acknowledgement!

  def created
    data = JSON.parse(payload)

    # skip messages without id field

    unless data["id"]
      acknowledge
      return
    end 

    delivery_tag = env.instance_variable_get(:@delivery_tag)
    unless delivery_tag.is_a? Fixnum
      delivery_tag = delivery_tag.tag
    end 
    id_str = data["id"]
    document = MyModel.where(_id: id_str,
                             delivery_tag: delivery_tag,
                             state: 'pending').first
    document ||= MyModel.new _id: id_str,
                             payload: payload,
                             delivery_tag: delivery_tag,
                             state: 'pending'
    if document.save
      acknowledge
      document.update(state: 'commited')
    else
      reject
    end 
  end 
end

My initializer for ActionSubscriber (config/initializers/action_subscriber.rb):

ActionSubscriber.draw_routes do
  route MySubscriber, :created,
        exchange:         'events',
        acknowledgements: true
end

ActionSubscriber.configure do |config|
  config.hosts = ["localhost"]
  config.port = 5672
  config.error_handler = lambda do |error, env_hash|
    Rails.logger.error("Error processing message: #{error.message}\n"\
                       "Backtrace: #{error.backtrace.join("\n")}")
    raise error
  end 
end

unless defined?(Rails::Console) or Rails.env.test?
  ActionSubscriber.start_subscribers
end

My test/test_helper.rb:

ENV["RAILS_ENV"] ||= "test"
require File.expand_path('../../config/environment', __FILE__)
require 'rails/test_help'

class ActiveSupport::TestCase
  def before_setup
    DatabaseCleaner.start

    $messages = Set.new
    draw_routes if respond_to?(:draw_routes)
    ::ActionSubscriber::RabbitConnection.subscriber_connection
    ::ActionSubscriber.setup_queues!

    super
  end 

  def after_teardown
    super

    ::ActionSubscriber.stop_subscribers!
    ::ActionSubscriber::RabbitConnection.subscriber_disconnect!
    ::ActionSubscriber.instance_variable_set("@route_set", nil)

    DatabaseCleaner.clean
  end 
end

require 'mocha/mini_test'

Here is my test:

  def test_real_publishing
    successful_payload = '{"id": 12345}'
    assert_difference("MyModel.count", 1) do
      ActionSubscriber::Publisher.publish(route_tag,
                                          successful_payload, 'events')
      ActionSubscriber::auto_pop!
    end 
  end

Thanks in advance.

Publishing

Part of #14 is to add publishing support.

Offline @liveh2o expressed a desire to either not include publishing in this gem or to rename the gem to something that isn't subscribing specific.

Here are some pros and cons I have been thinking about:

  • The only direct dependencies required by both the publishing and subscribing code are bunny or march_hare depending on the platform. Releasing both an MRI and jRuby version of this gem is an annoying maintenance cost and I would rather not pay that cost twice.
  • The only connection between publishing and subscribing is the naming convention so putting them in the same gem doesn't buy us much in terms of keeping the two gems in sync.
  • ActionSubscriber::Publisher is kind of a weird name

The tradeoff that makes the most sense to me would be to have just a single ActionSubscriber::Publisher.publish call supported in this library. It will deal with the difference between bunny and march_hare, but won't include any support for detecting the content type or encoding the message.

That way most of the publishing logic that we currently have in our internal Buttress gem would stay there. Only the platform-specific differences would be supported by this gem, since that is a big part of its overall reason for existing.

Thoughts?
/cc @ah @abrandoned @brettallred @localshred @quixoten

Breaking changes in YAML loading (Psych)

Psych (aka YAML) 4.x included a breaking change to how YAML.load works. In Psych 4.0, load calls safe_load under the hood, and is therefore "safe" by default, but that breaks configurations that support (among other things) aliases, which are disabled when using "safe" loading.

YAML.unsafe_load (and YAML.unsafe_load_file) is now the canonical way to load trusted documents (i.e., config files).

To ensure maximum compatibility with old versions of Psych for ActionSubscriber, we also need to set a minimum version of Psych to ensure unsafe_load (and unsafe_load_file) is defined. The methods were introduced in v3.3.2.
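A minimal sketch of a loader that works on both sides of the Psych 4 change, assuming the input is a trusted document. load_trusted_yaml is a hypothetical helper name; YAML.unsafe_load and YAML.load are the standard Psych methods discussed above.

```ruby
require "yaml"

# Load a trusted YAML document (such as a config file), allowing aliases.
# Prefers unsafe_load where it is defined and falls back to load on older
# Psych versions, where load was not yet "safe" by default.
def load_trusted_yaml(source)
  if YAML.respond_to?(:unsafe_load)
    YAML.unsafe_load(source)
  else
    YAML.load(source)
  end
end

config = load_trusted_yaml(<<~YAML)
  defaults: &defaults
    host: localhost
    port: 5672
  development:
    <<: *defaults
YAML

p config["development"]
```

Setting a minimum Psych version, as proposed above, would make the respond_to? check unnecessary; the fallback is only useful while older versions are still allowed.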

Handle a subscriber being removed if it is still in routes

I deleted a subscriber and after a couple of hours we realized that none of the subscribers were running at all. This is because I had forgotten to delete the subscriber from the routes, so it would blow up upon initialization. I think this should be handled gracefully.

Deprecating Multiple Connections

At MX we've run into scaling issues with our RabbitMQ cluster. One of the biggest issues is that our usage of the multiple connections and :concurrency settings features in this library creates a lot of channels. Our production cluster has over 15k channels and consumers registered.

In order to work around these issues we've introduced the new threadpool feature, which will take the place of multiple connections and will allow multiple messages to be processed at the same time for the same consumer. Based on our testing this is within 10% of the throughput of using multiple connections and channels for routes, even when processing extremely small messages at high throughputs.

Trying to maintain both of these feature sets is a lot of extra work so we are leaning towards removing support for multiple connections and the :concurrency setting, but if this breaks your usage of the library we would like to know. Please comment below and explain how you are using the library. If you are interested in helping to maintain the features for multiple connections and :concurrency please note that as well.

Active Record Publisher

@mmmries Didn't we have another module that we would include in Active Record objects that would automatically publish events on created, updated, deleted? For some reason I thought we had that in here, but I guess we just put in the basic publisher. Is that something you feel we should add into this gem, or do you guys just want to keep your implementation in Buttress?

exponential backoff for at_least_once! mode

Right now the at_least_once! mode is pretty naive in how it handles failures. If an error occurs we will just requeue the message endlessly. If the error that occurs is not recoverable (ie. it tries to hit a service which is completely gone from the internet) then this message will get retried over and over in a tight loop.

I think a better default behavior would be to handle failures with an exponential backoff algorithm similar to sidekiq.

This is not a feature built into RabbitMQ, but there are ways to handle it client side. We could add this behavior as an additional middleware in a totally separate gem, but I think it would make sense to have this be the default.

Surprising Retry Behavior

In talking with @abrandoned about a similar system I realized that if you have two subscribers that are both listening to the same routing key, and one of them fails a job and retries it (with the new at_least_once! mode), the job would be re-published to both of the subscribers.

This seems like a very surprising behavior to me. I think each of those subscribers should be able to set themselves as at_least_once!, at_most_once! or have no acknowledgements at all without worrying about whether the other subscribers are going to re-publish the message.

Is there a way for us to set up the at_least_once! behavior so that it re-queues the message just to one subscriber's queue rather than re-publishing it to the exchange (and giving a copy to all of the subscribers listening to that routing key)?
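The behaviour described in this issue can be reproduced with a toy model of an exchange, with no broker involved. ToyExchange, its methods and the queue names are invented for this sketch: publishing to the exchange copies the message into every queue bound to the routing key, while re-queueing into one named queue reaches only that subscriber.

```ruby
# Toy exchange: routing key => list of bound queues (illustration only).
class ToyExchange
  def initialize
    @bindings = Hash.new { |hash, key| hash[key] = [] }
    @queues = {}
  end

  # Create a named queue and bind it to a routing key.
  def bind(queue_name, routing_key)
    queue = (@queues[queue_name] ||= [])
    @bindings[routing_key] << queue
    queue
  end

  # Publishing to the exchange delivers a copy to every bound queue.
  def publish(routing_key, message)
    @bindings[routing_key].each { |queue| queue << message }
  end

  # Re-queueing to one named queue bypasses the routing key fan-out.
  def requeue(queue_name, message)
    @queues.fetch(queue_name) << message
  end
end

exchange = ToyExchange.new
billing = exchange.bind("billing.user.created", "user.created")
emailer = exchange.bind("emailer.user.created", "user.created")

exchange.publish("user.created", "job-1") # original delivery: both get job-1
exchange.publish("user.created", "job-1") # retry via the exchange: both get it again
exchange.requeue("billing.user.created", "job-2-retry") # retry via the queue: billing only

p billing
p emailer
```

In this model the emailer queue receives job-1 twice even though only billing failed, which is the surprising part; the queue-targeted retry leaves emailer untouched.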

issue requiring actionsubscriber/rspec when using rspec-rails gem

Ran into a problem when using rspec-rails gem instead of just rspec.

/Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/action_subscriber-4.5.1/lib/action_subscriber/rspec.rb:1:in 'require': cannot load such file -- rspec (LoadError)
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/action_subscriber-4.5.1/lib/action_subscriber/rspec.rb:1:in '<top (required)>'
  from /Users/aj/Projects/alfred/spec/spec_helper.rb:1:in 'require'
  from /Users/aj/Projects/alfred/spec/spec_helper.rb:1:in '<top (required)>'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/rspec-core-3.7.0/lib/rspec/core/configuration.rb:1455:in 'require'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/rspec-core-3.7.0/lib/rspec/core/configuration.rb:1455:in 'block in requires='
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/rspec-core-3.7.0/lib/rspec/core/configuration.rb:1455:in 'each'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/rspec-core-3.7.0/lib/rspec/core/configuration.rb:1455:in 'requires='
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/rspec-core-3.7.0/lib/rspec/core/configuration_options.rb:112:in 'block in process_options_into'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/rspec-core-3.7.0/lib/rspec/core/configuration_options.rb:111:in 'each'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/rspec-core-3.7.0/lib/rspec/core/configuration_options.rb:111:in 'process_options_into'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/rspec-core-3.7.0/lib/rspec/core/configuration_options.rb:21:in 'configure'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/rspec-core-3.7.0/lib/rspec/core/runner.rb:99:in 'setup'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/rspec-core-3.7.0/lib/rspec/core/runner.rb:86:in 'run'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/rspec-core-3.7.0/lib/rspec/core/runner.rb:71:in 'run'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/rspec-core-3.7.0/lib/rspec/core/runner.rb:45:in 'invoke'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/gems/rspec-core-3.7.0/exe/rspec:4:in '<top (required)>'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/bin/rspec:23:in 'load'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/bin/rspec:23:in '<main>'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/bin/ruby_executable_hooks:15:in 'eval'
  from /Users/aj/.rvm/gems/ruby-2.3.3@alfred/bin/ruby_executable_hooks:15:in '<main>'

I found that this is due to the require 'rspec' line in /lib/action_subscriber/rspec.rb. Once I commented out that line my specs were working perfectly. I also tested it on a branch that had the rspec gem to make sure it still worked. Here is a PR for your consideration: #93

I also threw in a tiny change in your readme that is also related to rspec.

Apply prefetch to the consumer, not the whole channel

According to the RabbitMQ documentation, having prefetch values on channels requires extra work for the broker. Instead it is recommended to set global = false and apply the prefetch per consumer.

We will need to look into bunny and march_hare to see if they both provide this functionality and determine if the APIs are compatible or whether we need to have some kind of conditional in our library to deal with the difference.

MarchHare::ChannelAlreadyClosed after Acknowledge

Everything works fine for a variable amount of time (15-60 minutes on average), but eventually I get this error:

Error processing message: MarchHare::ChannelAlreadyClosed
Backtrace: /app/vendor/bundle/jruby/2.2.0/gems/march_hare-2.16.0-java/lib/march_hare/exceptions.rb:121:in `convert_and_reraise'
/app/vendor/bundle/jruby/2.2.0/gems/march_hare-2.16.0-java/lib/march_hare/channel.rb:976:in `converting_rjc_exceptions_to_ruby'
/app/vendor/bundle/jruby/2.2.0/gems/march_hare-2.16.0-java/lib/march_hare/channel.rb:748:in `basic_ack'
/app/vendor/bundle/jruby/2.2.0/gems/march_hare-2.16.0-java/lib/march_hare/channel.rb:663:in `block in ack'
org/jruby/RubyProc.java:318:in `call'
/app/vendor/bundle/jruby/2.2.0/gems/march_hare-2.16.0-java/lib/march_hare/channel.rb:991:in `guarding_against_stale_delivery_tags'
/app/vendor/bundle/jruby/2.2.0/gems/march_hare-2.16.0-java/lib/march_hare/channel.rb:662:in `ack'
/app/vendor/bundle/jruby/2.2.0/gems/action_subscriber-2.4.0-java/lib/action_subscriber/middleware/env.rb:45:in `acknowledge'
/app/vendor/bundle/jruby/2.2.0/gems/action_subscriber-2.4.0-java/lib/action_subscriber/base.rb:60:in `acknowledge'
/app/app/subscribers/semantria_activity_subscriber.rb:13:in `created'
org/jruby/RubyKernel.java:1823:in `public_send'
/app/vendor/bundle/jruby/2.2.0/gems/action_subscriber-2.4.0-java/lib/action_subscriber/dsl.rb:81:in `block in run_action_with_filters'
org/jruby/RubyProc.java:318:in `call'
/app/vendor/bundle/jruby/2.2.0/gems/action_subscriber-2.4.0-java/lib/action_subscriber/dsl.rb:86:in `run_action_with_filters'
/app/vendor/bundle/jruby/2.2.0/gems/action_subscriber-2.4.0-java/lib/action_subscriber/middleware/router.rb:12:in `call'
/app/vendor/bundle/jruby/2.2.0/gems/action_subscriber-2.4.0-java/lib/action_subscriber/middleware/active_record/query_cache.rb:14:in `call'
/app/vendor/bundle/jruby/2.2.0/gems/action_subscriber-2.4.0-java/lib/action_subscriber/middleware/active_record/connection_management.rb:10:in `call'
/app/vendor/bundle/jruby/2.2.0/gems/action_subscriber-2.4.0-java/lib/action_subscriber/middleware/decoder.rb:15:in `call'
/app/vendor/bundle/jruby/2.2.0/gems/action_subscriber-2.4.0-java/lib/action_subscriber/middleware/error_handler.rb:11:in `call'
/app/vendor/bundle/jruby/2.2.0/gems/middleware-0.1.0/lib/middleware/runner.rb:31:in `call'
/app/vendor/bundle/jruby/2.2.0/gems/middleware-0.1.0/lib/middleware/builder.rb:102:in `call'
/app/vendor/bundle/jruby/2.2.0/gems/action_subscriber-2.4.0-java/lib/action_subscriber/march_hare/subscriber.rb:76:in `block in enqueue_env'
/app/vendor/bundle/jruby/2.2.0/gems/activesupport-4.2.6/lib/active_support/notifications.rb:166:in `instrument'
/app/vendor/bundle/jruby/2.2.0/gems/action_subscriber-2.4.0-java/lib/action_subscriber/march_hare/subscriber.rb:75:in `block in enqueue_env'
org/jruby/RubyProc.java:318:in `call'
/app/vendor/bundle/jruby/2.2.0/gems/lifeguard-0.0.9/lib/lifeguard/threadpool.rb:75:in `block in async'

I'm running JRuby 9.0.5.0 and action_subscriber (2.4.0-java)

Since it's dying on acknowledge, the message is never cleared and the thread is essentially "stuck". Once this happens on all 5 of my threads, the service essentially stops.

Circular dependency detected while autoloading

Error

RuntimeError: Circular dependency detected while autoloading constant 
Transform::TwitterPowertrack

Specs

action_subscriber (2.3.0-java)
Rails (4.2.6)
Ruby: jruby 9.0.5.0

Directory Structure

Normal Rails, plus:

app
 |-- service_objects
   |-- transform
     |-- twitter_powertrack.rb

Object creation looks like this:

class Transform::TwitterPowertrack < ::ServiceObject
end

Additional Notes

Only thing loaded extra in application.rb is:
config.autoload_paths << Rails.root.join("lib")

Procfile

web: bundle exec rails server puma -p $PORT -e $RACK_ENV
worker: bundle exec action_subscriber start --host $RABBITMQ_HOST --port $RABBITMQ_PORT

Runtime

foreman start

Backtrace

/Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/activesupport-4.2.6/lib/active_support/dependencies.rb:492:in `load_missing_constant'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/activesupport-4.2.6/lib/active_support/dependencies.rb:184:in `const_missing'
10:18:21 worker.1 | /projects/pallet_jack/app/subscribers/powertrack_v2_activity_subscriber.rb:5:in `created'
10:18:21 worker.1 | org/jruby/RubyKernel.java:1823:in `public_send'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/action_subscriber-2.3.0-java/lib/action_subscriber/dsl.rb:81:in `block in run_action_with_filters'
10:18:21 worker.1 | org/jruby/RubyProc.java:318:in `call'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/action_subscriber-2.3.0-java/lib/action_subscriber/dsl.rb:86:in `run_action_with_filters'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/action_subscriber-2.3.0-java/lib/action_subscriber/middleware/router.rb:12:in `call'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/action_subscriber-2.3.0-java/lib/action_subscriber/middleware/active_record/query_cache.rb:14:in `call'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/action_subscriber-2.3.0-java/lib/action_subscriber/middleware/active_record/connection_management.rb:10:in `call'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/action_subscriber-2.3.0-java/lib/action_subscriber/middleware/decoder.rb:15:in `call'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/action_subscriber-2.3.0-java/lib/action_subscriber/middleware/error_handler.rb:11:in `call'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/middleware-0.1.0/lib/middleware/runner.rb:31:in `call'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/middleware-0.1.0/lib/middleware/builder.rb:102:in `call'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/action_subscriber-2.3.0-java/lib/action_subscriber/march_hare/subscriber.rb:76:in `block in enqueue_env'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/activesupport-4.2.6/lib/active_support/notifications.rb:166:in `instrument'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/action_subscriber-2.3.0-java/lib/action_subscriber/march_hare/subscriber.rb:75:in `block in enqueue_env'
10:18:21 worker.1 | org/jruby/RubyProc.java:318:in `call'
10:18:21 worker.1 | /Users/brian.broderick/.rvm/gems/jruby-9.0.5.0@pallet-jack/gems/lifeguard-0.0.9/lib/lifeguard/threadpool.rb:75:in `block in async'

Additional Notes

ActionSubscriber hangs about 25% of the time after getting this error.

Shared Threadpools vs Multiple Consumers

The ActionSubscriber community has started to see subscribers (or in some cases just single queues/routes) freeze after operating for a while (see #66, #68 and #69).

After digging into this for a while, it seems to be related to the fact that message acknowledgement is not thread-safe. I found a forum thread where Michael Klishin (the author of march_hare and bunny, and a contributor to the core Java client codebase) discussed this. I think the crux of his discussion for ActionSubscriber's use case boils down to:

It says that you should not share channels between threads, specifically publishing.
Having multiple consumers on a shared channel is fine, including if your consumers are
concurrent, as long as you are careful with how you acknowledge (one-by-one should work fine,
multiple-at-a-time will result in double-acks and channel termination).

We have some experiments underway right now where we could use either a single thread to handle all message acknowledgement for an entire process of subscribers, or a thread-per-channel for message acknowledgements.
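
To make the single-acknowledgement-thread idea concrete, here is a rough sketch using only Ruby's standard library. `FakeChannel` and `AckWorker` are hypothetical names made up for illustration; they are not part of ActionSubscriber, bunny or march_hare. Worker threads push delivery tags onto a queue and one dedicated thread performs every ack, so the channel is only ever touched from a single thread.

```ruby
# Hypothetical stand-in for a RabbitMQ channel; it records acks instead of sending them.
class FakeChannel
  attr_reader :acked

  def initialize
    @acked = []
  end

  def ack(delivery_tag)
    @acked << delivery_tag
  end
end

# Hypothetical helper: funnels all acknowledgements through one thread.
class AckWorker
  def initialize(channel)
    @channel = channel
    @queue = ::Queue.new
    @thread = ::Thread.new do
      # Queue#pop blocks until a tag arrives; nil is the shutdown signal.
      while (tag = @queue.pop)
        @channel.ack(tag)
      end
    end
  end

  # Safe to call from any worker thread.
  def ack_later(delivery_tag)
    @queue << delivery_tag
  end

  def shutdown
    @queue << nil
    @thread.join
  end
end

channel = FakeChannel.new
acker = AckWorker.new(channel)

workers = (1..4).map do |delivery_tag|
  ::Thread.new do
    # ... process the message here, then hand the tag to the ack thread ...
    acker.ack_later(delivery_tag)
  end
end

workers.each(&:join)
acker.shutdown
```

A thread-per-channel variant would presumably create one such worker per channel instead of one per process.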

Considering all of these changes, I wondered what people's feelings would be about potentially removing the idea of shared threadpools entirely from ActionSubscriber?

An alternate design would be to allow a route to set a number_of_threads => 5 option. This would register 5 consumers for that route and those 5 consumers could each run in their own thread. Locking a consumer to a thread requires the developer to allocate resources to each route at startup time, but it makes the mental model of the library a lot simpler as well.
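
As a rough illustration of that model (not an implementation against bunny or march_hare), the sketch below starts a fixed number of consumers for one route, each pinned to its own thread. `start_consumers`, the in-memory `source` queue and the `:done` sentinel are all assumptions made for this example.

```ruby
# Hypothetical helper: starts `number_of_threads` consumers for one route,
# each running in its own dedicated thread.
def start_consumers(source, number_of_threads, &handler)
  (1..number_of_threads).map do |consumer_id|
    ::Thread.new do
      # :done is a sentinel used only by this sketch to stop a consumer.
      while (message = source.pop) != :done
        handler.call(consumer_id, message)
      end
    end
  end
end

source = ::Queue.new   # stands in for the broker delivering messages
handled = ::Queue.new  # thread-safe record of what was processed

threads = start_consumers(source, 5) do |consumer_id, message|
  handled << [consumer_id, message]
end

10.times { |i| source << "message-#{i}" }
5.times { source << :done } # one sentinel per consumer
threads.each(&:join)
```

Because the thread count is fixed when the consumers start, the resources for each route are decided up front, which is the trade-off described above.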

I'm not sure this is a good direction for the library to take, but just wanted to get some feedback on the idea.

Additional Features: Action Publisher & Babou

What has the conversation been regarding rounding out this gem as an end-to-end solution rather than only part of the messaging solution?

This could include

  • Publishable support for an ActiveRecord Model
  • Native Subscriber processor (Babou) in the same way that Promiscuous has their subscriber built in rather than in a different repo.
  • Update RabbitConnection to support Rx & Tx connections for separating out connections for consumers and publishers

I have a proof of concept working internally that does this as a single package for event messaging, but I was curious about the direction you are taking on this.

Multiple threadpools

I would really like to deprecate the --allow-low-priority-methods option. Running two separate subscriber processes is a pain when what we really want is just to avoid having the low priority work take up all the threads in the threadpool.

I propose that we make it possible to assign a route to a named threadpool. This would allow us to set up a low_priority threadpool with just a few threads and connect all the *_low subscriptions to that threadpool while we keep the rest of the subscriptions connected to the main threadpool.

config/action_subscriber.yml

production:
  threadpool_size: 10 #the simpler form (will still be supported)
  threadpool_sizes:
    default: 10
    low_priority: 5

config/initializers/action_subscriber.rb

ActionSubscriber.draw_routes do
  route ::ClientSnapshotSubscriber, :created, :acknowledgements => true, :publisher => :amigo
  route ::AggregationSubscriber, :completed_low, :threadpool => :low_priority, :acknowledgements => true, :publisher => :grunt
end
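
One way the two YAML forms could coexist is sketched below. `threadpool_sizes_for` is a hypothetical helper, and the fallback size of 8 is an assumption for this example rather than a documented default.

```ruby
require "yaml"

# Hypothetical helper: merges the simple `threadpool_size` key with the
# proposed `threadpool_sizes` map into a single name => size hash.
def threadpool_sizes_for(config)
  default_size = config.fetch("threadpool_size", 8) # 8 is an assumed fallback
  sizes = { "default" => default_size }
  # Entries in `threadpool_sizes` win over the simple form when both are given.
  sizes.merge(config.fetch("threadpool_sizes", {}))
end

yaml = <<~CONFIG
  production:
    threadpool_size: 10
    threadpool_sizes:
      default: 10
      low_priority: 5
CONFIG

sizes = threadpool_sizes_for(::YAML.load(yaml).fetch("production"))
```

A route that passes `:threadpool => :low_priority` could then look up its size by name in the resulting hash.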

Or perhaps we could skip the yaml configuration and just set things up in the route definitions like:

low_priority_threadpool = ::ActionSubscriber::Threadpool.new(5)

::ActionSubscriber.draw_routes do
  route ::ClientSnapshotSubscriber, :created, :acknowledgements => true, :publisher => :amigo
  route ::AggregationSubscriber, :completed_low, :threadpool => low_priority_threadpool, :acknowledgements => true, :publisher => :grunt
end

/cc @abrandoned @localshred @liveh2o @brianstien

Extract action_subscriber-bunny and action_subscriber-march_hare

After chatting with @localshred the other day about the protobuf gem and where it's headed, it occurred to me that releasing both a Java and an MRI version of this gem might be silly.

Most of the ugliest parts of this gem right now are the areas where we have to handle the differences between bunny and march_hare.

I have also been asked by a few people at URUG meetups and MWRC whether ActionSubscriber could support other message queuing services like NSQueue.

So I'm thinking about making an action_subscriber-bunny gem which would have all the bunny-specific stuff in it and would add_runtime_dependency 'action_subscriber', '~> 2.0'. What do other people think?

draw_routes is ignoring config

When drawing explicit routes, CLI and yaml configs are ignored.

Example:

ENV["APP_NAME"] = "yolo"

require "action_subscriber"

class UserSubscriber < ::ActionSubscriber::Base
  def created
  end

  def created_low
  end
end

::ActionSubscriber.draw_routes do
  default_routes_for ::UserSubscriber
end

::Thread.new do
  # Wait for subscribers to startup and then check the thread pool
  sleep 2
  puts ::ActionSubscriber::Threadpool.pool(:default).inspect
end

$ action_subscriber start --allow-low-priority-methods --threadpool-size 40 --prefetch 5
I, [2016-02-18T21:19:13.345970 #49177]  INFO -- : Starting in-memory publisher adapter.
I, [2016-02-18T21:19:13.346747 #49177]  INFO -- : Loading configuration...
I, [2016-02-18T21:19:13.346901 #49177]  INFO -- : Starting server...
I, [2016-02-18T21:19:13.356552 #49177]  INFO -- : Rabbit Host: localhost
Rabbit Port: 5672
Threadpool Size: 40
Low Priority Subscriber: true
Decoders:
  --application/json
  --text/plain
I, [2016-02-18T22:35:06.379808 #61299]  INFO -- : UserSubscriber
I, [2016-02-18T22:35:06.379836 #61299]  INFO -- :   -- method: created
I, [2016-02-18T22:35:06.379856 #61299]  INFO -- :     --    exchange: events
I, [2016-02-18T22:35:06.379875 #61299]  INFO -- :     --       queue: yolo.user.created
I, [2016-02-18T22:35:06.379892 #61299]  INFO -- :     -- routing_key: user.created
I, [2016-02-18T22:35:06.379914 #61299]  INFO -- : Action Subscriber connected

#<Lifeguard::InfiniteThreadpool:0x007f9d7a09f4d8 @pool_size=8>

So despite defining a created_low method and setting the thread pool size to 40, both options are ignored.

Update README and examples

With some of the recent changes to routes and the action_subscriber binstub we really need to update the README and the examples to show off these new features and intended usage of the library.

Warnings for drawing routes with inconsistent expectations of message acknowledgement

I've messed this up several times now where I made a subscriber with at_most_once! and forgot to pass :acknowledgements => true in the route, or vice versa. It's always a pain to troubleshoot why the messages aren't going anywhere.

I think we should do a sanity check during the draw_routes phase to check for routes where the expectation of acknowledgements doesn't match what the subscriber expects. Doing it during draw_routes means that we'll usually find these while running unit tests even if action_subscriber isn't going to establish an actual connection.

If we find a mismatch we should make some noise.
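
A minimal sketch of such a check is below. `Route`, `FakeSubscriber`, `acknowledges?` and `acknowledgement_mismatches` are hypothetical names used only to show the comparison; the real route and subscriber objects in ActionSubscriber may expose this information differently.

```ruby
# Hypothetical stand-in for a drawn route.
Route = Struct.new(:subscriber, :action, :acknowledgements)

# Hypothetical stand-in for a subscriber that declared `at_most_once!`.
class FakeSubscriber
  def self.acknowledges?
    true
  end
end

# Returns one warning per route whose :acknowledgements setting disagrees
# with what its subscriber expects.
def acknowledgement_mismatches(routes)
  mismatched = routes.select do |route|
    route.acknowledgements != route.subscriber.acknowledges?
  end

  mismatched.map do |route|
    "#{route.subscriber.name}##{route.action}: route has :acknowledgements => " \
      "#{route.acknowledgements.inspect}, but the subscriber expects " \
      "#{route.subscriber.acknowledges?.inspect}"
  end
end

routes = [
  Route.new(FakeSubscriber, :created, true),
  Route.new(FakeSubscriber, :deleted, false)
]

warnings = acknowledgement_mismatches(routes)
warnings.each { |message| warn(message) }
```

Running a check like this at the end of draw_routes would surface the problem in unit tests, before any connection is opened.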


Open Source `harness-action_subscriber`

We have an internal gem hooking up the ActiveSupport instrumentation in action_subscriber to the harness gem so it can easily be forwarded onto a statsd server.

We should open source that gem.

Minimum Psych version is incompatible with Rails < 6.1

Rails < 6.1 uses the old way of loading YAML files. This breaks loading database configs with aliases (a common development practice) under Psych 4.x. Adding a minimum version of Psych to the gemspec here forces an upgrade to Psych 4.x, which means we end up having to pin Psych to 3.3.2.

Rails 6.1 solves this problem by checking if the new unsafe_load method exists, and falling back to load if it doesn't. That's a better way.
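
The fallback described above can be sketched like this. `load_trusted_yaml` is a hypothetical helper name, and the approach is only appropriate for YAML you trust, such as your own config files.

```ruby
require "yaml"

# Loads trusted YAML that may contain aliases. Psych 4 made `load` safe by
# default (aliases are rejected), and `unsafe_load` keeps the old behavior.
def load_trusted_yaml(source)
  if ::YAML.respond_to?(:unsafe_load)
    ::YAML.unsafe_load(source)
  else
    ::YAML.load(source)
  end
end

config = load_trusted_yaml(<<~CONFIG)
  default: &default
    adapter: postgresql
    pool: 5
  development:
    <<: *default
    database: app_development
CONFIG
```

With a check like this, the same code works with both Psych 3.x and Psych 4.x, so no particular Psych version has to be forced on the application.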
