karafka-testing's Introduction

Join the chat at https://slack.karafka.io

About Karafka

Karafka is an efficient, multi-threaded Kafka processing framework for Ruby and Rails. A minimal setup looks like this:

# Define what topics you want to consume with which consumers in karafka.rb
Karafka::App.routes.draw do
  topic 'system_events' do
    consumer EventsConsumer
  end
end

# And create your consumers, within which your messages will be processed
class EventsConsumer < ApplicationConsumer
  # Example that utilizes ActiveRecord#insert_all and Karafka batch processing
  def consume
    # Store all of the incoming Kafka events locally in an efficient way
    Event.insert_all messages.payloads
  end
end

Karafka uses threads to handle many messages simultaneously in the same process. It does not require Rails, but it integrates tightly with Ruby on Rails applications to make event processing dead simple.

Getting started

If you're entirely new to the subject, you can start with our "Kafka on Rails" article series, which will get you up and running with the terminology and basic ideas behind using Kafka.

If you want to get started with Kafka and Karafka as quickly as possible, visit our Getting started guides and the example apps repository.

We also maintain many integration specs illustrating various use-cases and features of the framework.

TL;DR (1 minute from setup to publishing and consuming messages)

Prerequisites: a running Kafka instance. You can start one by following the instructions here.

  1. Add and install Karafka:
# Make sure to install Karafka 2.4
bundle add karafka --version ">= 2.4.0"

bundle exec karafka install
  2. Dispatch a message to the example topic using the Rails or Ruby console:
Karafka.producer.produce_sync(topic: 'example', payload: { 'ping' => 'pong' }.to_json)
  3. Run the Karafka server and see the consumption magic happen:
bundle exec karafka server

[86d47f0b92f7] Polled 1 message in 1000ms
[3732873c8a74] Consume job for ExampleConsumer on example started
{"ping"=>"pong"}
[3732873c8a74] Consume job for ExampleConsumer on example finished in 0ms

Want to Upgrade? LGPL is not for you? Want to help?

I also sell Karafka Pro subscriptions. They include a commercial-friendly license, priority support, architecture consultations, an enhanced Web UI and high-throughput data processing features (virtual partitions, long-running jobs, and more).

10% of the income will be distributed back to other OSS projects that Karafka uses under the hood.

Help me provide high-quality open-source software. Please see the Karafka homepage for more details.

Support

Karafka has Wiki pages for almost everything and a pretty decent FAQ. It covers the installation, setup, and deployment, along with other useful details on how to run Karafka.

If you have questions about using Karafka, feel free to join our Slack channel.

Priority support for technical and architectural questions is part of the Karafka Pro subscription.

karafka-testing's People

Contributors

mensfeld, nijikon, ojab, renovate[bot], valentinorusconi-eh


karafka-testing's Issues

Allow specifying target consumer_group when creating a test consumer with `#karafka_consumer_for`

When multiple consumer groups are configured to consume from the same topic, as in the example below, the #karafka_consumer_for method registers its returned consumer only to the consumer group that is defined last in the Karafka configuration file.

my_app.rb

class MyApp < Karafka::App
  consumer_groups.draw do
    consumer_group :group_one do
      topic("incoming_events") { consumer EventProcessingConsumer }
    end

    consumer_group :group_two do
      topic("incoming_events") { consumer EventReportingConsumer }
    end
  end
end

event_processing_consumer_spec.rb

RSpec.describe EventProcessingConsumer do
  subject(:consumer) { karafka_consumer_for("incoming_events") }

  it "is registered to the last-defined consumer group" do
    # The consumer is registered to the topic of consumer group two,
    # since it is the last one defined in the consumer_groups.draw block
    consumer.topic.consumer_group # => group_two
  end
end

I think this issue occurs because of the way the method iterates over the available topic names here, which seems to disregard the consumer group for which the topic is registered. It would be helpful to be able to optionally pass a target consumer_group_name argument to this method, to ensure the consumer is registered to the correct group/topic combination.
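A minimal plain-Ruby sketch of the lookup difference described above. `Topic`, `ROUTES`, `find_topic_by_name` and `find_topic` are hypothetical stand-ins for illustration, not karafka-testing's real API or Karafka's routing classes: matching on the name alone lets the last-defined route win, while an optional consumer group narrows the match.

```ruby
# Hypothetical, simplified routing data; not Karafka's real classes.
Topic = Struct.new(:name, :consumer_group, :consumer)

ROUTES = [
  Topic.new('incoming_events', 'group_one', 'EventProcessingConsumer'),
  Topic.new('incoming_events', 'group_two', 'EventReportingConsumer')
].freeze

# Reported behaviour: matching on the topic name alone means the last
# matching route wins, regardless of which group it belongs to.
def find_topic_by_name(routes, name)
  routes.select { |topic| topic.name == name }.last
end

# Requested behaviour (sketch): an optional consumer group narrows the match.
# Without a group, it keeps the old "last defined wins" result.
def find_topic(routes, name, consumer_group: nil)
  candidates = routes.select { |topic| topic.name == name }
  candidates = candidates.select { |topic| topic.consumer_group == consumer_group } if consumer_group
  raise ArgumentError, "no route for #{name.inspect} in #{consumer_group.inspect}" if candidates.empty?

  candidates.last
end
```

Keeping the group argument optional preserves the current behaviour for apps where each topic name appears only once.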

Dynamic `App`

Hi,
First, thank you for your work on this gem.

In these lines, there is a hard-coded App, which can actually differ from one application to another.

I just thought of 2 alternatives:

1. Ask the gem user to specify a let(:karafka_app) { MyCustomKarafkaApp } and then use that (since you already expect the RSpec subject to be defined by the user as well).

2. Get this value from the Karafka gem. One way to do so is Karafka::App.descendants.first (or .last), which returns the class defined in karafka.rb that inherits from Karafka::App.

Or maybe a hybrid approach. However, I'd go for Karafka::App.descendants.first.

EDIT:

@mensfeld A third idea: I just tried Karafka::App.consumer_groups in the console, and it listed all my consumer groups. Since all Karafka applications inherit from Karafka::App, I think this would work.
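A sketch of the hybrid approach, assuming hypothetical `BaseApp`/`MyCustomKarafkaApp` classes in place of `Karafka::App` and a user's subclass. Note that `Class#descendants` comes from ActiveSupport; this version uses `ObjectSpace` so it runs in plain Ruby (Ruby 3.1+ also offers `Class#subclasses` for direct subclasses only).

```ruby
# Hypothetical stand-in for Karafka::App; the real class lives in the karafka gem.
class BaseApp; end

# What a user would typically define in karafka.rb.
class MyCustomKarafkaApp < BaseApp; end

# Walks every loaded class and keeps the ones inheriting from the base,
# skipping singleton classes.
def app_descendants(base)
  ObjectSpace.each_object(Class)
             .reject(&:singleton_class?)
             .select { |klass| klass < base }
end

# Prefer an explicitly configured app (e.g. from a `let`), otherwise fall back
# to the single subclass of the base class.
def resolve_app(base, explicit = nil)
  return explicit if explicit

  candidates = app_descendants(base)
  raise "expected exactly one #{base} subclass, found #{candidates.size}" unless candidates.size == 1

  candidates.first
end
```

Raising when several subclasses exist avoids silently picking `.first` or `.last` in apps that define more than one.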

karafka helpers initialize `described_class`, they shouldn't

Consider the following innocent spec:

class Xxx
  def initialize(foo:)
  end
end

RSpec.describe Xxx do
  include Karafka::Testing::RSpec::Helpers

  specify do
    expect { Karafka.producer.produce_sync(topic: 'foo', payload: 'bar') }
      .to change(karafka.produced_messages, :size).by(1)
  end
end

#=>

Failures:

  1) Xxx is expected to change `Array#size` by 1
     Failure/Error:
       def initialize(foo:)
       end
     
     ArgumentError:
       missing keyword: :foo
     # ./spec/xxx_spec.rb:2:in `initialize'
     # …/rspec-core-3.11.0/lib/rspec/core/memoized_helpers.rb:60:in `new'
…
     # …/karafka-testing-2.0.5/lib/karafka/testing/rspec/helpers.rb:91:in `_karafka_add_message_to_consumer_if_needed'


https://github.com/karafka/karafka-testing/blob/master/lib/karafka/testing/rspec/helpers.rb#L87-L94

        def _karafka_add_message_to_consumer_if_needed(message)
          # We're interested in adding message to subject only when it is a consumer
          # Users may want to test other things (models producing messages for example) and in
          # their case subject will not be a consumer
          return unless subject.is_a?(Karafka::BaseConsumer)
          # We target to the consumer only messages that were produced to it, since specs may also
          # produce other messages targeting other topics
          return unless message[:topic] == subject.topic.name

https://github.com/rspec/rspec-core/blob/main/lib/rspec/core/memoized_helpers.rb#L57-L62

      def subject
        __memoized.fetch_or_store(:subject) do
          described = described_class || self.class.metadata.fetch(:description_args).first
          Class === described ? described.new : described
        end
      end

So by calling subject, the helper tries to instantiate a class whose constructor requires arguments.
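A simplified, plain-Ruby model of the problem and one possible guard. `BaseConsumer` stands in for `Karafka::BaseConsumer`, and `implicit_subject` only approximates RSpec's memoized implicit subject; the guard checks the described class first, so a class needing constructor arguments is never instantiated.

```ruby
# Hypothetical stand-ins: BaseConsumer plays Karafka::BaseConsumer,
# Xxx is the class from the spec above.
class BaseConsumer; end
class EventsConsumer < BaseConsumer; end

class Xxx
  def initialize(foo:); end
end

# Simplified version of RSpec's implicit subject: a described class
# is instantiated with no arguments.
def implicit_subject(described)
  described.is_a?(Class) ? described.new : described
end

# Guard: decide from the described class alone, before touching the subject.
def consumer_subject_or_nil(described)
  return nil unless described.is_a?(Class) && described <= BaseConsumer

  implicit_subject(described)
end
```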

Support for testing around pattern matching in topic to consumer mapping

Thank you for this library, it's been very helpful for testing my topic to consumer mappings and making sure the correct consumer class is invoked and the correct behavior is found.

I'm now switching to using patterns instead of named topics, specifically for the DLQ case described in the docs.

In my spec I'm using:

karafka.consumer_for("dlq-demo-topic")

And in my consumer mapping I have:

pattern(:dlqs_pattern, /^dlq.*/) do
  consumer DeadLetterQueueConsumer
end

I've tried a few different variations of the regex, but I always get Karafka::Testing::Errors::TopicNotFoundError when running my tests. As I understand it, testing pattern matches like this is not currently supported.
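A sketch of how a test helper could fall back to patterns when no named topic matches. `NAMED_ROUTES`, `PATTERN_ROUTES`, `TopicNotFoundError` and `consumer_class_for` are hypothetical and only loosely modelled on the `topic`/`pattern` routing shown above; this is not how karafka-testing resolves topics.

```ruby
# Hypothetical routing table: exact topic names plus regex patterns.
NAMED_ROUTES = { 'orders' => 'OrdersConsumer' }.freeze
PATTERN_ROUTES = { /^dlq.*/ => 'DeadLetterQueueConsumer' }.freeze

class TopicNotFoundError < StandardError; end

# Try an exact name first, then fall back to the first pattern that matches.
def consumer_class_for(topic_name)
  return NAMED_ROUTES[topic_name] if NAMED_ROUTES.key?(topic_name)

  _pattern, consumer = PATTERN_ROUTES.find { |pattern, _| pattern.match?(topic_name) }
  raise TopicNotFoundError, topic_name unless consumer

  consumer
end
```

Checking exact names first keeps explicitly routed topics from being captured by a broad pattern. In Ruby, `^` anchors at the start of any line, so `\A` is the stricter choice for topic names.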

Breaking change in 2.0.2 - karafka.publish to karafka.produce

I can see this in the 2.0.2 patch notes:

Rename `karafka.publish` to `karafka.produce` to align naming conventions.

But it isn't called out as a breaking change, and it probably shouldn't be in a patch release.

Even though it's a testing library, the change broke all of our tests. It's probably worth noting in CHANGELOG.md, as it's a public method.

have_enqueued_job rspec matcher does not work with :karafka ActiveJob queue_adapter

If you configure a Rails application to use the karafka queue adapter as described in https://karafka.io/docs/Active-Job/, RSpec tests that use the have_enqueued_job matcher fail, reporting that no jobs were enqueued.

Expected behavior

When configuring rails to use the karafka ActiveJob adapter with:

config.active_job.queue_adapter = :karafka

Then if I write ruby code to publish a job:

class SomeJob < ApplicationJob
  def perform
    puts "performed"
  end
end

SomeJob.perform_later

I expect an rspec test like this to pass:

expect do 
  SomeJob.perform_later
end.to have_enqueued_job(SomeJob).once

Actual behavior

The rspec test fails with

expected to enqueue exactly 1 jobs, but enqueued 0

I also observed that this happens regardless of whether I use the produce_sync option or not:

karafka_options(
  dispatch_method: :produce_sync
)

Steps to reproduce the problem

  1. Create a Rails 7 application and configure it to use the :karafka queue adapter for ActiveJob as described above.
  2. Set up the karafka gem.
  3. Write a test that enqueues a job.
  4. See that it fails to report enqueueing the job.

Your setup details

I am using Rails 7.0.4.3 and rspec 3.12.0

Karafka info is below:

$ [bundle exec] karafka info
2023-04-21 09:30:25.307444 I [65649:57120] Rails -- Karafka version: 2.0.38
Ruby version: ruby 3.2.1 (2023-02-08 revision 31819e82c8) +YJIT [arm64-darwin22]
Rdkafka version: 0.12.1
Consumer groups count: 4
Subscription groups count: 4
Workers count: 5
...

Incompatibility of Deserializer with Testing

re: https://karafka.io/docs/Deserialization/#schema-registry_1

We are deserialising using Avro and Schema Registry. We found that the AvroRegistryDeserializer code suggested in the docs wasn't compatible with Avro Turf's testing recommendations.

Essentially, the WebMock stub wasn't able to stub the AvroTurf::Messaging.new client, because the client was initialised in the AVRO constant before the stub was set up. Our workaround was to define a method instead, e.g.:
def avro
  @avro ||= AvroTurf::Messaging.new(...)
end

This meant the client is initialised after the stub has been called.
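The timing difference can be shown without Avro at all. In this sketch `FakeMessagingClient` stands in for `AvroTurf::Messaging`, and changing the hypothetical `Registry.url` setting plays the role of whatever the spec prepares before the client is built: the constant captures the value at load time, while the memoized method picks up the test setup.

```ruby
# Hypothetical setting that a spec can change before exercising the code.
module Registry
  class << self
    attr_accessor :url
  end
end
Registry.url = 'http://real-registry:8081'

# Stand-in for AvroTurf::Messaging; records the registry URL it was built with.
class FakeMessagingClient
  attr_reader :registry_url

  def initialize(registry_url:)
    @registry_url = registry_url
  end
end

class EagerDeserializer
  # Built once when the class body is loaded, i.e. before any spec setup runs.
  AVRO = FakeMessagingClient.new(registry_url: Registry.url)

  def avro
    AVRO
  end
end

class LazyDeserializer
  # Built on first use, so setup done earlier in the spec is picked up.
  def avro
    @avro ||= FakeMessagingClient.new(registry_url: Registry.url)
  end
end
```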

Configuration testing

In our specs we check that the Karafka configuration is filled in properly, because we get all of the configuration via:

ENV['something'] = 'foo' # dotenv
MyApp.config.something = ENV['something'] # dry-c w/ contract
KarafkaApp.config.something = MyApp.config.something

so it's worth testing that seed brokers etc. are propagated properly.

With karafka-1 it was (simplified):
karafka.rb:

require 'my_app/karafka_app'

MyApp::KarafkaApp.boot!

my_app/karafka_app.rb

module MyApp
  # Main MyApp karafka consumer application. Starting via `<APP_ROOT>/karafka.rb`.
  class KarafkaApp < Karafka::App
    class << self
      # Setup application. This method allows us to re-run setup (e.g. for testing purposes)
      def boot!
        setup do |config|
          config.something = MyApp.config.something
        end

        draw_routes

        super

        setup_waterdrop

        config.finalize!
      end
    end
  end
end

spec/my_app/karafka_app_spec.rb

RSpec.describe MyApp::KarafkaApp do
  def setup_karafka_app
    allow(Karafka::Setup::Config).to receive(:config).and_return(described_class.config.pristine)
    allow(WaterDrop::Config).to receive(:config).and_return(WaterDrop.config.pristine)
    allow(DeliveryBoy).to receive(:config).and_return(DeliveryBoy::Config.new(env: ENV))

    described_class.boot!
  end

  it { expect { setup_karafka_app }.to get_config(:something).from_envvar(:something) }
end

Now the setup is roughly the same, but it's not clear what should be used instead of #pristine to get a copy of the config in these specs:

(ruby) described_class.config.concurrency
5
(ruby) described_class.config.deep_dup.concurrency
eval error: undefined method `concurrency' for #<Karafka::Core::Configurable::Node:0x00007f0ea0fe8f48 @name=:root,…

described_class.config.deep_dup.concurrency
                               ^^^^^^^^^^^^

So we need some way to get a copy of the config that can be safely modified in specs.
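One plausible explanation for the `undefined method` error, stated as an assumption and not verified against karafka-core: if config nodes define their accessors on each instance's singleton class, `dup` drops them, because `dup` does not copy singleton methods while `clone` does. `ConfigNode` below is a hypothetical class built that way.

```ruby
# Hypothetical config node that defines its accessors on the singleton class
# of each instance (an assumption about how such nodes may be built).
class ConfigNode
  def initialize(**settings)
    settings.each do |key, value|
      singleton_class.attr_accessor(key)
      public_send(:"#{key}=", value)
    end
  end
end

config = ConfigNode.new(concurrency: 5)

copy_via_dup = config.dup     # singleton class (and its accessors) is NOT copied
copy_via_clone = config.clone # singleton class IS copied

# Modifying the clone leaves the original untouched.
copy_via_clone.concurrency = 10
```

`clone` is still shallow, so nested nodes would be shared; a copy that is safe to mutate in specs would need to clone each nested node recursively.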

Dependency Dashboard

This issue lists Renovate updates and detected dependencies. Read the Dependency Dashboard docs to learn more.

This repository currently has no open or pending branches.

Detected dependencies

bundler
Gemfile
github-actions
.github/workflows/ci.yml
  • actions/checkout v4
  • ruby/setup-ruby v1
  • actions/checkout v4
  • ruby/setup-ruby v1
  • actions/checkout v4
ruby-version
.ruby-version
  • ruby 3.3.1


NameError: uninitialized constant WaterDrop::Producer::DummyClient

I'm seeing the following error after upgrading to 2.1.1:

NameError: uninitialized constant WaterDrop::Producer::DummyClient

    class SpecProducerClient < ::WaterDrop::Producer::DummyClient
                                                    ^^^^^^^^^^^^^
from /Users/jordan/.rbenv/versions/3.1.2/lib/ruby/gems/3.1.0/gems/karafka-testing-2.1.1/lib/karafka/testing/spec_producer_client.rb:7:in `<module:Testing>'

It looks like WaterDrop::Producer::DummyClient was renamed to WaterDrop::Clients::Dummy in this PR: karafka/waterdrop#353, but there was no corresponding change in this project to update it.

I see a closed issue here that looks like it may be related, but I'm not 100% sure: #142
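A sketch of how code can tolerate a renamed constant across dependency versions by resolving whichever name exists. `FakeWaterDrop` is a hypothetical stand-in for WaterDrop; `defined?` returns `nil` instead of raising when any part of the constant path is missing. This is not the fix karafka-testing applied.

```ruby
# Hypothetical stand-in: only the newer constant name exists in this "version".
module FakeWaterDrop
  module Clients
    class Dummy; end
  end
end

# Prefer the new name, fall back to the old one, fail clearly otherwise.
DummyClientBase =
  if defined?(FakeWaterDrop::Clients::Dummy)
    FakeWaterDrop::Clients::Dummy
  elsif defined?(FakeWaterDrop::Producer::DummyClient)
    FakeWaterDrop::Producer::DummyClient
  else
    raise NameError, 'no dummy client class found'
  end

class SpecProducerClient < DummyClientBase; end
```

Pinning a compatible WaterDrop version in the gemspec is the simpler alternative when supporting both names is not worth it.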

Consumer variable error

After upgrading to karafka-testing 2.0.2, I got the following error:

NameError:
       undefined local variable or method `consumer'

The reason is that I had defined the subject without a name:
subject { karafka.consumer_for('topic_name') }
To fix the error, you need to name it:
subject(:consumer) { karafka.consumer_for('topic_name') }
This is because karafka-testing depends on a `consumer` method here:
https://github.com/karafka/karafka-testing/blob/master/lib/karafka/testing/rspec/helpers.rb#L90

Error in v2.3.1: undefined method `ensure_karafka_initialized!' for Karafka::Testing:Module

Just caught this error in our CI when upgrading karafka-testing via Dependabot.

We don't automatically require our gems in the Gemfile and, following the testing docs, we had only this in spec_helper:

require 'karafka/testing/rspec/helpers'

RSpec.configure do |config|
  config.include Karafka::Testing::RSpec::Helpers
end

Simply adding require 'karafka/testing' as well fixed the issue. Not sure if this is a bug or the documentation should be updated.

Test event production without defining a subject

As reported on Slack

I received an undefined method `topic' for #<Model ... error when writing some model specs in my Rails app, because my model was producing Kafka events.

The source of the bug seems to be related to this line, which forces me to define my test subject as a Karafka testing consumer instead of an instance of my model class.

But since I do not consume any events from the topic I produce to, I am unable to use the consumer_for helper and have to define a test double to avoid the bug.

@mensfeld wrote this in response to my question

need to decommission the subject from consumer reference and then for consumer specs use the subject but otherwise direct reference

Testing producer code

I want to be able to test my code that produces messages. For example, if I have the following method:

def test_method(val)
  Karafka.producer.produce_async(
    topic: "test",
    payload: {
      val: val
    }.to_json
  )
end

I would ideally like to test this by checking an event was added to the "test" topic with an appropriate payload. Is there a way to achieve this with the karafka-testing gem? Or maybe an alternative recommended way to test producer code?
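karafka-testing's helpers track produced messages (the spec earlier on this page asserts on `karafka.produced_messages`). A dependency-free sketch of the same idea: inject a recording producer so the test can assert on topic and payload. `RecordingProducer` and the `producer:` keyword are hypothetical; they are not part of Karafka.

```ruby
require 'json'

# Hypothetical in-memory producer accepting the same keywords used above.
class RecordingProducer
  attr_reader :produced_messages

  def initialize
    @produced_messages = []
  end

  def produce_async(topic:, payload:, **rest)
    @produced_messages << { topic: topic, payload: payload, **rest }
  end
end

# The method from the question, with the producer injected instead of
# hard-coding Karafka.producer, so a test can pass the recorder in.
def test_method(val, producer:)
  producer.produce_async(topic: 'test', payload: { val: val }.to_json)
end
```

Usage: call `test_method(42, producer: RecordingProducer.new)` in a spec, then parse the recorded payload as JSON and compare it with the expected hash.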

Wrong number of arguments in `subject.topic.deserializer`

Hi, thanks for the gem.
I have an issue that may be related to using this gem with Rails.

described_class.new(selected_topic)
=> wrong number of arguments (given 1, expected 0)
described_class
=> BeaconEventsConsumer

../app/consumers/beacon_events_consumer.rb

class BeaconEventsConsumer < ApplicationConsumer
  # This method just logs parsed data with Karafka logger
  def consume
    Rails.logger.info "Consumed following message: #{params}"

  end
end

../karafka.rb

  consumer_groups.draw do
    consumer_group :batched_group do
      topic :beacon_events do
        consumer BeaconEventsConsumer
        batch_consuming false
      end
    end
  end

MissingClientError on manual offset commit with rspec

My karafka.rb is as follows:

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka.automatically_mark_as_consumed = false
    config.kafka.seed_brokers = Rails.configuration.kafka_broker_list.split(',').map { |broker| "kafka://#{broker}" }
    config.kafka.start_from_beginning = false
    config.batch_fetching = false
    config.client_id = Rails.configuration.kafka_client_id

    config.logger = Rails.logger if ENV['RAILS_LOG_TO_STDOUT'].present?
  end

  Karafka.monitor.subscribe(WaterDrop::Instrumentation::StdoutListener.new)
  Karafka.monitor.subscribe(Karafka::Instrumentation::StdoutListener.new)
  Karafka.monitor.subscribe(Karafka::Instrumentation::ProctitleListener.new)
  Karafka.monitor.subscribe(Karafka::CodeReloader.new(*Rails.application.reloaders))
  Karafka.monitor.subscribe(RollbarListener)

  consumer_groups.draw do
    ...
  end
end

And on my ApplicationConsumer I have

def consume
  @message = (topic.batch_consuming ? params_batch : params.payload)
  mark_as_consumed!(topic.batch_consuming ? @message.last : @message)

  DATADOG.time('process_message') do # rubocop:disable Style/ExplicitBlockArgument
    yield
  end
rescue StandardError => e
  do_some_stuff
end

When running the specs, every single test fails with Karafka::Errors::MissingClientError. I can't figure out what's causing it, and given that the documentation says this error shouldn't be raised at all, I don't know how to approach this issue.

Our setup is a Rails project, karafka (v 1.3.1) and karafka-testing (latest).

`certs/cert_chain.pem` permissions are wrong in the gem from rubygems

I have no name!@33477f4db36a:/app$ ls -larth /app/vendor/bundle/ruby/3.1.0/gems/karafka-testing-2.0.6/certs/cert_chain.pem
-rw-------. 1 root root 1.6K Nov  2 09:20 /app/vendor/bundle/ruby/3.1.0/gems/karafka-testing-2.0.6/certs/cert_chain.pem

in a Docker image that uses karafka-testing, and

Run cp -r /app/vendor/bundle ./vendor/
cp: cannot open '/app/vendor/bundle/ruby/3.1.0/gems/karafka-testing-2.0.6/certs/cert_chain.pem' for reading: Permission denied
Error: Process completed with exit code 1.

in CI, where gems are installed by root and everything is run by an unprivileged user.

certs/cert_chain.pem should be a+r (and it is in the repo); it's not a security issue in this case anyway.

Restore the ability to use implicit subject in consumer specs

Follow-up of #106

To avoid side effects from calling subject, it was decided that the gem will use an explicit consumer reference to keep track of messages sent to Kafka.

The downside is that developers must declare a consumer object (or a named subject) in their tests instead of relying on implicit subjects.

In the issue comment thread, I suggested looking at the described_class and checking whether it inherits from Karafka::BaseConsumer. If it does, the gem's helper could look for the consumer object or fall back to calling subject.

If the described_class is not a Karafka::BaseConsumer, the helper would only look for the consumer object.
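A sketch of the decision proposed above, in plain Ruby. `BaseConsumer` stands in for `Karafka::BaseConsumer`, and the hypothetical `ExampleContext` struct plays an RSpec example exposing `described_class`, `subject` and `consumer`. In a real example group, `consumer` may not be defined at all, so a real helper would probably also need a `respond_to?(:consumer)` check.

```ruby
# Hypothetical stand-ins for illustration only.
class BaseConsumer; end
class OrdersConsumer < BaseConsumer; end
class OrderModel; end

ExampleContext = Struct.new(:described_class, :subject_value, :consumer_value) do
  def subject
    subject_value
  end

  def consumer
    consumer_value
  end
end

# Only fall back to `subject` when the described class is a consumer;
# otherwise rely solely on the explicit `consumer` reference.
def target_consumer(context)
  described = context.described_class
  if described.is_a?(Class) && described <= BaseConsumer
    context.consumer || context.subject
  else
    context.consumer
  end
end
```

For model specs this never calls `subject`, which avoids instantiating classes that need constructor arguments.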

karafka-testing 2.1.2 "consumer_for" helper, passed consumer_group not working.

Consider this route config, with two consumer groups that have the same topics:

consumer_group 'A' do
  topic 'a'
  topic 'b'
end
  
consumer_group 'B' do
  topic 'a'
  topic 'b'
end

With this setup, the consumer_for testing helper works incorrectly: it returns a consumer with consumer group 'A' even though 'B' was passed as the second parameter.

I believe the error is in helpers.rb, where the coordinator is set up the wrong way:

def _karafka_build_consumer_for(topic, topics_array)
  coordinators = Karafka::Processing::CoordinatorsBuffer.new(
    Karafka::Routing::Topics.new(topics_array)
  )

  consumer = topic.consumer.new
  consumer.producer = Karafka::App.producer
  # Inject appropriate strategy so needed options and components are available
  strategy = Karafka::App.config.internal.processing.strategy_selector.find(topic)
  consumer.singleton_class.include(strategy)
  consumer.client = _karafka_consumer_client
  consumer.coordinator = coordinators.find_or_create(topic.name, 0)
  consumer.coordinator.seek_offset = 0

  consumer
end
