Git Product home page Git Product logo

gush's Introduction

Gush

Gem Version GitHub Actions Workflow Status

Gush is a parallel workflow runner using only Redis as storage and ActiveJob for scheduling and executing jobs.

Theory

Gush relies on directed acyclic graphs to store dependencies, see Parallelizing Operations With Dependencies by Stephen Toub to learn more about this method.

WARNING - version notice

This README is about the latest master code, which might differ from what is released on RubyGems. See tags to browse previous READMEs.

Installation

1. Add gush to Gemfile

gem 'gush', '~> 3.0'

2. Create Gushfile

When using Gush and its CLI commands you need a Gushfile in the root directory. Gushfile should require all your workflows and jobs.

Ruby on Rails

For RoR it is enough to require the full environment:

require_relative './config/environment.rb'

and make sure your jobs and workflows are correctly loaded by adding their directories to autoload_paths, inside config/application.rb:

config.autoload_paths += ["#{Rails.root}/app/jobs", "#{Rails.root}/app/workflows"]

Ruby

Simply require any jobs and workflows manually in Gushfile:

require_relative 'lib/workflows/example_workflow.rb'
require_relative 'lib/jobs/some_job.rb'
require_relative 'lib/jobs/some_other_job.rb'

Example

The DSL for defining jobs consists of a single run method. Here is a complete example of a workflow you can create:

# app/workflows/sample_workflow.rb
class SampleWorkflow < Gush::Workflow
  def configure(url_to_fetch_from)
    run FetchJob1, params: { url: url_to_fetch_from }
    run FetchJob2, params: { some_flag: true, url: 'http://url.com' }

    run PersistJob1, after: FetchJob1
    run PersistJob2, after: FetchJob2

    run Normalize,
        after: [PersistJob1, PersistJob2],
        before: Index

    run Index
  end
end

and this is how the graph will look like:

graph TD
    A{Start} --> B[FetchJob1]
    A --> C[FetchJob2]
    B --> D[PersistJob1]
    C --> E[PersistJob2]
    D --> F[NormalizeJob]
    E --> F
    F --> G[IndexJob]
    G --> H{Finish}

Defining workflows

Let's start with the simplest workflow possible, consisting of a single job:

class SimpleWorkflow < Gush::Workflow
  def configure
    run DownloadJob
  end
end

Of course having a workflow with only a single job does not make sense, so it's time to define dependencies:

class SimpleWorkflow < Gush::Workflow
  def configure
    run DownloadJob
    run SaveJob, after: DownloadJob
  end
end

We just told Gush to execute SaveJob right after DownloadJob finishes successfully.

But what if your job must have multiple dependencies? That's easy, just provide an array to the after attribute:

class SimpleWorkflow < Gush::Workflow
  def configure
    run FirstDownloadJob
    run SecondDownloadJob

    run SaveJob, after: [FirstDownloadJob, SecondDownloadJob]
  end
end

Now SaveJob will only execute after both its parents finish without errors.

With this simple syntax you can build any complex workflows you can imagine!

Alternative way

run method also accepts before: attribute to define the opposite association. So we can write the same workflow as above, but like this:

class SimpleWorkflow < Gush::Workflow
  def configure
    run FirstDownloadJob, before: SaveJob
    run SecondDownloadJob, before: SaveJob

    run SaveJob
  end
end

You can use whatever way you find more readable or even both at once :)

Passing arguments to workflows

Workflows can accept any primitive arguments in their constructor, which then will be available in your configure method.

Let's assume we are writing a book publishing workflow which needs to know where the PDF of the book is and under what ISBN it will be released:

class PublishBookWorkflow < Gush::Workflow
  def configure(url, isbn)
    run FetchBook, params: { url: url }
    run PublishBook, params: { book_isbn: isbn }, after: FetchBook
  end
end

and then create your workflow with those arguments:

PublishBookWorkflow.create("http://url.com/book.pdf", "978-0470081204")

and that's basically it for defining workflows, see below on how to define jobs:

Defining jobs

The simplest job is a class inheriting from Gush::Job and responding to perform method. Much like any other ActiveJob class.

class FetchBook < Gush::Job
  def perform
    # do some fetching from remote APIs
  end
end

But what about those params we passed in the previous step?

Passing parameters into jobs

To do that, simply provide a params: attribute with a hash of parameters you'd like to have available inside the perform method of the job.

So, inside workflow:

(...)
run FetchBook, params: {url: "http://url.com/book.pdf"}
(...)

and within the job we can access them like this:

class FetchBook < Gush::Job
  def perform
    # you can access `params` method here, for example:

    params #=> {url: "http://url.com/book.pdf"}
  end
end

Executing workflows

Now that we have defined our workflow and its jobs, we can use it:

1. Start background worker process

Important: The command to start background workers depends on the backend you chose for ActiveJob. For example, in case of Sidekiq this would be:

bundle exec sidekiq -q gush

Click here to see backends section in official ActiveJob documentation about configuring backends

Hint: gush uses gush queue name by default. Keep that in mind, because some backends (like Sidekiq) will only run jobs from explicitly stated queues.

2. Create the workflow instance

flow = PublishBookWorkflow.create("http://url.com/book.pdf", "978-0470081204")

3. Start the workflow

flow.start!

Now Gush will start processing jobs in the background using ActiveJob and your chosen backend.

4. Monitor its progress:

flow.reload
flow.status
#=> :running|:finished|:failed

reload is needed to see the latest status, since workflows are updated asynchronously.

Advanced features

Pipelining

Gush offers a useful tool to pass results of a job to its dependencies, so they can act differently.

Example:

Let's assume you have two jobs, DownloadVideo, EncodeVideo. The latter needs to know where the first one saved the file to be able to open it.

class DownloadVideo < Gush::Job
  def perform
    downloader = VideoDownloader.fetch("http://youtube.com/?v=someytvideo")

    output(downloader.file_path)
  end
end

output method is used to ouput data from the job to all dependant jobs.

Now, since DownloadVideo finished and its dependant job EncodeVideo started, we can access that payload inside it:

class EncodeVideo < Gush::Job
  def perform
    video_path = payloads.first[:output]
  end
end

payloads is an array containing outputs from all ancestor jobs. So for our EncodeVideo job from above, the array will look like:

[
  {
    id: "DownloadVideo-41bfb730-b49f-42ac-a808-156327989294" # unique id of the ancestor job
    class: "DownloadVideo",
    output: "https://s3.amazonaws.com/somebucket/downloaded-file.mp4" #the payload returned by DownloadVideo job using `output()` method
  }
]

Note: Keep in mind that payloads can only contain data which can be serialized as JSON, because that's how Gush stores them internally.

Dynamic workflows

There might be a case when you have to construct the workflow dynamically depending on the input.

As an example, let's write a workflow which accepts an array of users and has to send an email to each one. Additionally after it sends the e-mail to every user, it also has to notify the admin about finishing.

class NotifyWorkflow < Gush::Workflow
  def configure(user_ids)
    notification_jobs = user_ids.map do |user_id|
      run NotificationJob, params: {user_id: user_id}
    end

    run AdminNotificationJob, after: notification_jobs
  end
end

We can achieve that because run method returns the id of the created job, which we can use for chaining dependencies.

Now, when we create the workflow like this:

flow = NotifyWorkflow.create([54, 21, 24, 154, 65]) # 5 user ids as an argument

it will generate a workflow with 5 NotificationJobs and one AdminNotificationJob which will depend on all of them:

graph TD
    A{Start} --> B[NotificationJob]
    A --> C[NotificationJob]
    A --> D[NotificationJob]
    A --> E[NotificationJob]
    A --> F[NotificationJob]
    B --> G[AdminNotificationJob]
    C --> G
    D --> G
    E --> G
    F --> G
    G --> H{Finish}

Dynamic queue for jobs

There might be a case you want to configure different jobs in the workflow using different queues. Based on the above the example, we want to config AdminNotificationJob to use queue admin and NotificationJob use queue user.

class NotifyWorkflow < Gush::Workflow
  def configure(user_ids)
    notification_jobs = user_ids.map do |user_id|
      run NotificationJob, params: {user_id: user_id}, queue: 'user'
    end

    run AdminNotificationJob, after: notification_jobs, queue: 'admin'
  end
end

Dynamic waitable time for jobs

There might be a case you want to configure a job to be executed after a time. Based on above example, we want to configure AdminNotificationJob to be executed after 5 seconds.

class NotifyWorkflow < Gush::Workflow
  def configure(user_ids)
    notification_jobs = user_ids.map do |user_id|
      run NotificationJob, params: {user_id: user_id}, queue: 'user'
    end

    run AdminNotificationJob, after: notification_jobs, queue: 'admin', wait: 5.seconds
  end
end

Command line interface (CLI)

Checking status

  • of a specific workflow:

    bundle exec gush show <workflow_id>
    
  • of all created workflows:

    bundle exec gush list
    

Vizualizing workflows as image

This requires that you have imagemagick installed on your computer:

bundle exec gush viz <NameOfTheWorkflow>

Customizing locking options

In order to prevent getting the RedisMutex::LockError error when having a large number of jobs, you can customize these 2 fields locking_duration and polling_interval as below

# config/initializers/gush.rb
Gush.configure do |config|
  config.redis_url = "redis://localhost:6379"
  config.concurrency = 5
  config.locking_duration = 2 # how long you want to wait for the lock to be released, in seconds
  config.polling_interval = 0.3 # how long the polling interval should be, in seconds
end

Cleaning up afterwards

Running NotifyWorkflow.create inserts multiple keys into Redis every time it is ran. This data might be useful for analysis but at a certain point it can be purged via Redis TTL. By default gush and Redis will keep keys forever. To configure expiration you need to 2 things. Create initializer (specify config.ttl in seconds, be different per environment).

# config/initializers/gush.rb
Gush.configure do |config|
  config.redis_url = "redis://localhost:6379"
  config.concurrency = 5
  config.ttl = 3600*24*7
end

And you need to call flow.expire! (optionally passing custom TTL value overriding config.ttl). This gives you control whether to expire data for specific workflow. Best NOT to set TTL to be too short (like minutes) but about a week in length. And you can run Client.expire_workflow and Client.expire_job passing appropriate IDs and TTL (pass -1 to NOT expire) values.

Avoid overlapping workflows

Since we do not know how long our workflow execution will take we might want to avoid starting the next scheduled workflow iteration while the current one with same class is still running. Long term this could be moved into core library, perhaps Workflow.find_by_class(klass)

# config/initializers/gush.rb
GUSH_CLIENT = Gush::Client.new
# call this method before NotifyWorkflow.create
def find_by_class klass
  GUSH_CLIENT.all_workflows.each do |flow|
    return true if flow.to_hash[:name] == klass && flow.running?
  end
  return false
end

Contributors

Contributing

  1. Fork it ( http://github.com/chaps-io/gush/fork )
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

gush's People

Contributors

anirbanmu avatar devilankur18 avatar dmitrypol avatar helmuthfilho avatar iacobus avatar joshrpowell avatar keqi avatar krzyzak avatar kzkn avatar mlen avatar mmmaia avatar mofumofu3n avatar nrakochy avatar orthographic-pedant avatar pinglamb avatar pokonski avatar rmorlok avatar saicheg avatar suonlight avatar thukim avatar toreym avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

gush's Issues

Overlapping workflows

I am confused about this section of the README: https://github.com/chaps-io/gush#avoid-overlapping-workflows

Can someone please explain why I should "avoid starting the next scheduled workflow iteration while the current one with same class is still running."? What I'm looking into using gush for explicitly would require having multiple iterations of the same workflow (albeit with different parameters) running concurrently. Is this a problem for gush? Are there some internal assumptions that break if there are multiple versions of a workflow in flight at once?

Error handling

How the Gush is handling (or not) errors that might occur in a Job class? I've tried to use raise, but all of the tasks has passed without an error - nothing has showed up either in Sidekiq or Workflow (the flow.status is returning :running)

Is there any way to integrate Gush with Sidekiq worker handling? Or it is too complicated?

Long-running jobs and human job completion

Hi,

I'm looking to use Gush to manage workflows, and it seems like a good fit, but some of my workflows involve waiting for a human person to signify that a job has been completed. For example, if one or more jobs in a workflow requires the data to be reviewed before continuing, or needs to get more information from someone before continuing.

This not really limited to "human" jobs either, the same issue applies to long-running external tasks. What would solve this is a way to set a job as "waiting for an external event" or trigger before it is enqueued to run in ActiveJob. So in the case of a human review job, the job before it could send an e-mail to the user, then when it gets to the review job it enqueues it but in a paused or waiting state. Then the user gets the e-mail, does what they need to in my app to mark it as reviewed, and then my app would call something that would update the job/workflow state with some new payload data and trigger it to continue. Or in the case of waiting for an external event, my app could be waiting for AWS S3 to publish events in a bucket, and when that happens it could call a Gush API to tell the workflow that that job for that file can continue, and pass the file name as part of the payload.

I think this would be a very intuitive and useful addition to Gush. What do you think?

Many mutex fails after #49000a30fd34ac21cabdc7d

Hey @pokonski !

We start seeing many mutex-failed issues after you commit #49000a30fd34ac21cabdc7d

Problem is that when you have 1000s of really fast-jobs they all start checking if final job should be
enqueued and conflicting too much.

I am thinking that you are actually wrapping wrong part with mutex, client.enqueue_job(workflow_id, out) feels like more right one

def enqueue_outgoing_jobs
  job.outgoing.each do |job_name|
    out = client.find_job(workflow_id, job_name)

     if out.ready_to_start?
       RedisMutex.with_lock("gush_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}", sleep: 0.3, block: 2) do
          client.enqueue_job(workflow_id, out)
        end
      end
  end
end

What do you think? @pokonski

Cannot assign requested address - connect(2) for [::1]:6379

I added Gush in a Rails app along with Sidekiq.

I have added the Gushfile and the 'gush' queue to the sidekiq.yml. When I try to start a workflow I am getting strange Redis errors:

Errno::EADDRNOTAVAIL in NetworksController#create

Cannot assign requested address - connect(2) for [::1]:6379

I know I am missing something but what?

params cast to symbol

 run MyJob, {
      params: {
        "project_id" => opts["project_id"]
      }

--> project_id becomes a symbol, why?

cannot create workflow instance

Hi, I try to create workflow instance at rails console, but i cant

its never ends
image

gush 2.0.1

# app/workflows/sample_workflow.rb
class SampleWorkflow < Gush::Workflow
  def configure
    run SleepJob
  end
end
# app/jobs/sleep_job.rb
class SleepJob < Gush::Job
  def perform
    logger.debug("start")
    sleep(30)
    logger.debug("end")
  end
end
# Gushfile
require_relative './config/environment.rb'
# application.rb
config.autoload_paths += %W[#{config.root}/lib #{Rails.root}/app/workflows #{Rails.root}/app/jobs]

sorry, what should I check?

Depend on user input

Hi,

I've had a look at your workflow gem. It looks really clean and neat.
I do have a question about depending on user input. I've got some workflows that require user input in the middle of the flow. Is there any way to specify in the workflow, that an external update is required?
I understand that this requires me to explicitly push the object in de workflow to the next state after the user input is received.

Cheers,
Benoist

Gush.configure with namespace issue

In the Gush.configure, If we add namespace XXX then sidekiq is not pulling the jobs.

If we add namespace as "gush" then its pulling the job.

Shortcircuit workflows

Let's say we have the following, simple, workflow:

A -> B -> C

And Job A checks for a value, which if false, the whole workflow should be stopped immediately. Is this possible at the moment with Gush?

As an example, let's say we process a new user signup with the above workflow. Job A retrieves the newest user from the database. If the user has been already processed, we'd like to call the workflow complete; the subsequent steps are not necessary anymore, since the user has been already processed.

Redis connection pool investigation

We are trying to use Gush in production and we are constantly gettingthe number of connections exhausted.

If we run 2 workflows that fires ~5 workers each, everything run fine.
If we run 3 workflows, then we hit the connections limit.

I wonder if there is a easy way to calculate how many connections are needed, If I try to run a few thousand workflows, do I need concurrency + constant factor, for example, 10 * 5 connections or do I need thousands connections + (concurrency * constant factor)?

Multiple jobs with the same class name

Hey,
I need to run the same jobs with different parameters, but I have occured a strange behaviour - when I set the run function with the same name, only the last one of the jobs is being fired, the other ones does not.

This is also affecting the after: option, as it is checking if all jobs with given name has been fired. Simple workflow example:

class FetchPricesWorkflow < Gush::Workflow
  def configure(pages)

    start_time = DateTime.now

    for i in 0..pages
      run FetchPricesJob, params: { page_num: i }
    end

    run SummaryJob, after: FetchPricesJob, params: {start_time: start_time}
  end

Can you clarify on this matter? It is a super important feature.

Inline execution

Rails.application.config.active_job.queue_adapter = :inline

Doesn't seem work - gush still uses redis

Make gush handle wokflows with 1000s of jobs

Hello!

First of all, thank you for this great library. We've using this for couple of our project and it's been really great.

Issue i am facing right now is that my workflow with 1000s of jobs is dramatically slow because of gush. Let's put some example here:

class LinkFlow < Gush::Workflow
  def configure(batch_id, max_index)
    map_jobs = []

    100_000.times do
      map_jobs << run Map
    end

    run Reduce, after: map_jobs
  end
end

Playing around gush i found out that after each job completed it has to visit all dependent jobs ( which is Reduce in our case ) and try to enqueue them. But in order to understand if this job can be enqueued gush needs to understand that all dependent ( Map in our case ) jobs are finished.

https://github.com/chaps-io/gush/blob/master/lib/gush/job.rb#L90

Problem with this code right now is that for each Map job after every Map jobs is finished it will call Gush::Client#find_job.

This will produce massive number of SCAN operations and dramatically decrease a performance because of this line of code.

https://github.com/chaps-io/gush/blob/master/lib/gush/client.rb#L119

I am not sure what is best solution in this case. I've tried to solve this problem by changing way of how gush stores serialized jobs. Idea is instead of storing jobs individually to store them on hash for every workflow/job type. Already have my own implemenation, but have to play around with benchmarks around this:

rubyroidlabs@4ee1b15

@pokonski what do you think here?

Project status

Hi maintainers , I've noticed this project and found it extremely useful, but also noticed there haven't been any new releases since March last year and no commits since February.

So it got me wondering whether including it in new project is a sensible idea:

  • Is this project being actively maintained?
  • Is there plan to continue to maintain this project in the future?
  • Is there a roadmap?

Thanks!

Jobs that eventually succeed will not queue downstream jobs

When a job throws an exception, Gush marks it as failed and reraises the error. Job frameworks like Sidekiq retry failed jobs by default. When a job has failed but later succeeds, since Gush marked the job as failed from the first run, downstream jobs that depend on it won't run.

It would be great if there was an option to unset the failed status if a job re-runs, so that if it succeeds, the downstream workflow can continue.

Can I create a dynamic workflow based on job's output?

Hey! Not actually an issue, more of a question really.

So the question is: can I run a job only if it meets a condition based on the output of a previous job (inside the workflow)? It sounds a bit too dynamic to me hahah.

I'm thinking something among the lines of:

class PublishBookWorkflow < Gush::Workflow
  def configure(url, isbn)
    id = run FetchBook, params: { url: url }
    job = find_job(id)
    if job.output == "fetched_with_success"
      run PublishBook, params: { book_isbn: isbn }
    end
  end
end

Although it does not seem doable this way, as the workflow probably won't have the output at that point in time. Maybe something like this:

class PublishBookWorkflow < Gush::Workflow
  def configure(url, isbn)
    run FetchBook, params: { url: url }
    run PublishBook, params: { book_isbn: isbn }, after: FetchBook, if: -> {previous_job.output == "fetched_with_success"}
  end
end

Or should I just check these conditions directly at the start of PublishBook's perform?

Thanks!
Awesome gem by the way.

After gets called n-times

Tried the README's example:

class SimpleWorkflow < Gush::Workflow
  def configure
    run FirstDownloadJob
    run SecondDownloadJob

    run SaveJob, after: [FirstDownloadJob, SecondDownloadJob]
  end
end

SaveJob will be invoked twice:

2017-11-04T15:51:12.522Z 5060 TID-ovvcwd854 Gush::Worker JID-f3bddd0cf26cd11492d97a6b INFO: start
2017-11-04T15:51:12.527Z 5060 TID-ovvcwd95s Gush::Worker JID-beda8811a9c7976ef33abd41 INFO: start
SecondDownloadJob#perform {}
FirstDownloadJob#perform {}
2017-11-04T15:51:12.569Z 5060 TID-ovvcwcmx8 Gush::Worker JID-18a48a9a6ce45b6aee78bb35 INFO: start
2017-11-04T15:51:12.571Z 5060 TID-ovvcwd854 Gush::Worker JID-f3bddd0cf26cd11492d97a6b INFO: done: 0.049 sec
2017-11-04T15:51:12.580Z 5060 TID-ovvcwd9o4 Gush::Worker JID-b98176b881db8a61e8f7cdb9 INFO: start
2017-11-04T15:51:12.586Z 5060 TID-ovvcwd95s Gush::Worker JID-beda8811a9c7976ef33abd41 INFO: done: 0.059 sec
SaveJob#perform {}
2017-11-04T15:51:12.597Z 5060 TID-ovvcwcmx8 Gush::Worker JID-18a48a9a6ce45b6aee78bb35 INFO: done: 0.029 sec
SaveJob#perform {}
2017-11-04T15:51:12.601Z 5060 TID-ovvcwd9o4 Gush::Worker JID-b98176b881db8a61e8f7cdb9 INFO: done: 0.02 sec

Not able to access Rails project classes in jobs

Whats the recommended way to run Gush in a Rails (api) project?

I'm trying:

APP_PATH = File.expand_path('./config/application', __FILE__)
require_relative './config/boot'
require_relative './config/application'

Dir[Rails.root.join("app/workflows/**/*.rb")].each do |file|
  require file
end
Dir[Rails.root.join("app/jobs/**/*.rb")].each do |file|
  require file
end

But is not working properly, every job just fail on start because I'm using clasess of my rails proj in the jobs (like Services or Models).

Stack:

  • rails 5 (master) api
  • sidekiq 4

Any suggestion?

Handling workflow failures

It would be nice to have a callback method of some sort if the workflow would fail at some point - that would help implementing error logging/reporting (sending a webhook of some sort on fail just as an example).

bundle exec viz workflow with parameters

How can I run viz for a workflow with parameters?

When I try to run viz I am getting:

ArgumentError: wrong number of arguments (given 0, expected 1)

My workflow is:

class AssessmentWorkflow < Gush::Workflow
  def configure(assessment_id)
    run Job1, params: {assessment_id: assessment_id}
    run Job2, params: {assessment_id: assessment_id}, after: Job1
    run Job3, params: {assessment_id: assessment_id}, after: Job2
  end
end

Possible race condition?

I read the code, and couldn't figure out, how you handle a race condition when job C is dependent on jobs A and B which finish at the same time.

Update Redis gem

gush enforces Redis < 4.0 whereas there seems no problem at all using new version 4.0.

Logging the jobs

Hey - is there any way to log some things in jobs/workflow by integrating with Sidekiq logger method? I'd like to save some information while performing the tasks.

running unique jobs?

This is more of a question than an issue, but after reading the announcement I'm very excited about the option of using gush:

  1. Is it possible to enforce jobs only being run once (for a certain amount of time), similar to the "unique jobs" feature in Sidekiq Enterprise?

Here's an example of my use case: my app parses a stream incoming tweets, and for each url in a tweet it will follow the redirects of the url (it might be a short url), then parse the html-file, then search for images, then download these images. If multiple tweets within a batch contain the same url, I do not want to re-query this url (within a certain amount of time, if I re-encounter it a day later, well then I will re-query it).

Currently (with Sidekiq 4.1) race conditions of multiple parallel workers cause urls to be enqueued in parallel. I certainly could save that an url is enqueued in sql/redis and check for that when queueing, but this adds complexity to the code. Also this is just one, simplified example.

What I'm looking for is a way to enqueue the processing of an array of tweets, process them in parallel (including optional followup-jobs such as parsing the html, extracting keywords, downloading images), and once all tweets have completed with all these possible sub-routines, followup with another job. Therefor:

  1. Can I design a workflow to not run a job after all batches of the previous job have completed, but after all batches of the previous job plus all followup-jobs of these batches have completed?

(Having each job enqueue followup-jobs also gives me the advantage that while some workers may still process tweets, others might parse a html-document, and others might already process images. This is helpful as it will not first run everything that's CPU-bound to then run everything that's I/O bound, and I'd prefer to spread this out even more, but am unsure how I could do so)

Is this a use case where gush will help? Thanks a lot.

Update: it looks as if my second question is related to #26 .

Project Idea - Web UI

One of the difficulties we face when running complex workflows is monitoring them in real-time, seeing the whole picture per specific executions and/or investigating what went wrong.

These are some of my thoughts on the capabilities:

  • List all workflow executions in a table with timestamps and a status
  • Monitor individual execution:
    • Interactive graph with colors indicating job statuses (red failure, green success etc.)
    • Display useful information about jobs (duration, input/output, logs)
    • Ability to do ad-hoc operations with individual jobs within a running workflow - place it "On Hold", "Mark as Success", "Cancel" etc.

I started fiddling around a bit on creating a dependency manager + Admin UI before I came across this gem. I'm curious would this be a feature desired as an enhancement for Gush?

Conditionally execution

In continuation of the conversation at issue #41

May be it possible to run Job only that satisfied condition. Due to in runtime, it may be not need to execute. The api can looks like:

class SupplierWorkflow < Gush::Workflow
  def configure
    run AskSupplierAboutAvailableAmount
    run TryToOrderProducts, should(or condition): ->(previous_result, passed_params) { true }
  end
end

All this questions about to adopt this gem for complex workflows, that I have in project, where they are not linear, and branching based on conditions or failures.

P.S.
When do jobs create? When method .start! on workflow ran all described jobs creates or after previous jobs executed, next creates and put to the queue?

Accessing job output inside workflow

I have one job that downloads file and then I queue multiple jobs to process each row in the file. How do I access the file path inside the Workflow (not descendant job) so I can loop through it? I need to get payloads output and assign it to variable but after studying the code I do not see a solution.

class DownloadCsvJob < Gush::Job
  def perform
    output('path/to/data.csv')
  end
end
class ImportWorkflow < Gush::Workflow
  def configure    
    run DownloadCsvJob
    csv_jobs = CSV.foreach("path/to/data.csv").map do |row|
      run ImportCsvRowJob, params: row, after: DownloadCsvJob
    end
    run GenReportJob, after: csv_jobs
  end
end

Another use case is when one job performs some kind of check and the subsequent job(s) are queued based on the results of that.

Thank you very much.

include modules instead of inheritance.

Hey,

This looks pretty cool. One improvement would be to include modules to decorate the classes instead of using inheritance.
Here is what i have in mind:

# workflows/sample_workflow.rb
class SampleWorkflow 
  include Gush::Workflow
  gush do
    run FetchJob1
    run FetchJob2, params: {some_flag: true, url: 'http://url.com'}

    run PersistJob1, after: FetchJob1
    run PersistJob2, after: FetchJob2

    run Normalize,
        after: [PersistJob1, PersistJob2],
        before: Index

    run Index
  end
end
class FetchJob1
  include Gush::Job
  def self.perform(params={})
    # do some fetching from remote APIs

    params #=> {url: "http://some.com/url"}
  end
end

what do you think?

Is it possible to describe negative behaviour

For example, how to branching workflow and run described job if some job in workflow failed or event workflow failed?

How is it difficult to implement and have it sense?
Because just rerun failed job not that need. Usually jobs can work with third party services and that services can responded with errors.
At such case when job failed workflow should continue executing due it know how to process such error and even wait for error.

Let me provide synthetic example:

class SupplierWorkflow < Gush::Workflow
  def configure()
    run AskSupplierAboutAvailableAmount
    run TryToOrderProducts

    run OrderSuccess, after_success: TryToOrderProducts
    run OrderFail, after_fail: TryToOrderProducts

    # one of them never executes
    run SendMailReport, after_or: [OrderSuccess, OrderFail] 
  end
end

may be related issues

#18
#20

Workflow#reload not mutating workflow instance

I've found either a bug in the code, or a bug in the docs. The README says:

screen shot 2017-05-25 at 14 53 26

However, when I try calling flow.reload, it doesn't mutate the flow instance status. Instead, I have to run flow.reload.status to get an up-to-date status:

> flow = MyWorkflow.create
> flow.status
 => :running 
> flow.start!
> flow.reload
> flow.status
=> :running
> flow.reload.status
=> :finished

Wrong workflow status after failed jobs

Currently if any jobs will fail (by throwing an exception in a job), the status of the workflow is not properly updated:

  • flow.status is returning :running
  • flow.failed? is returning false

Processing job sequentially

Is it possible to process jobs sequentially, based on arguments?

Example jobs:

Jobs: [User A, Operation A], [User B, Operation B], [User A, Operation C], [User B, Operation D], .. stream of jobs

Process all jobs of each user sequentially, but concurrently across users.

Example:

  1. Sequentially execute all jobs for User A: Operation A, Operation C
  2. Sequentially execute all jobs for User B: Operation B, Operation D

However (1), (2) get executed concurrently.

I couldn't make out from the documentation if its possible. Any pointers would be much appreciated.

Listing all workflows

In Rails, Gush::Client.new.all_workflows explodes in params TypeError: no implicit conversion of Symbol into Integer (and so does gush list)

class MyWorkflow < Gush::Workflow
  def configure(opts={})
    run MyJob1, {
      params: {
        project_id: opts[:project_id],
        version: opts[:version]
      }
    }
    run MyJob2, {
      after: MyJob1,
      params: {
        project_id: opts[:project_id]
      }
    }
  end
end

Running Workflow Inside of Workflow

Is it possible to run a workflow inside of a workflow?

I have a couple of Jobs that run in sequential order but then I have a step where I can fan out to multiple workers. The problem is I don't know the number of workers I need ahead of time in that parallel step and I need to bring them all back together in one step before continuing in the workflow. I've gone through the code and I feel like I'm missing something like a callback to say that a workflow is finished.

Scheduling

So is scheduling/periodic workflows/jobs something that might be inside gush or should one look into rufus etc to implement it?

Gush falls down under large-scale workflows

Hi, I was super excited to integrate gush into my latest project, but just towards the end I hit a wall and cannot proceed any further.

The basis for my issue is twofold; Gush#configure must be deterministic because it runs on every job setup, and when a workflow has thousands of jobs, execution becomes so slow as to defeat the benefit of parallelism.

My gush workflow has ~4000 jobs, and the dependency graph is quite deep. The process of generating the graph each time a job is started causes jobs to take several seconds to actually execute. I experimented with caching the dependency graph in redis, but that actually makes the problem worse, not better, because of the serialization/deserialization overhead.

So, in conclusion, Gush is not able to handle workflows multiple orders of magnitude larger than the example. It seems that 10s or 100s of nodes is doable, but the overhead to loading a workflow with 1000s of nodes is too much.

Have you any thoughts or experience with such huge workflows? Advice? I'm back to the drawing board, maybe cooking up something with basic Sidekiq more suited to my needs.

Scaling to thousands of jobs in workflow

I really like gush library and want to use it for is tasks like importing records. The workflow will not be very complex but there will be LOTS (many thousands) of small identical jobs. I am not sure how this library will scale. After studying code I see that it stores ALL jobs inside workflow Redis record. That record will get huge and serializing it to/from JSON will slow things down.

I realize that this is probably not a small change but is it possible to use gush.jobs.WORKFLOW_ID.ImportCsvRowJob-JOB_ID keys instead? Why does job data need to be duplicated inside workflow key AND job keys? Looking in workflow.rb class I see how it looks up jobs but is it possible to look up separate job keys (they have workflow_id in the key).

Has anyone tried something like that? Thank you very much.

Here is my basic workflow:

class ImportWorkflow < Gush::Workflow
  def configure    
    run DownloadCsvJob
    csv_jobs = CSV.foreach("path/to/data.csv").map do |row|
      run ImportCsvRowJob, params: row, after: DownloadCsvJob
    end
    run GenReportJob, after: csv_jobs
  end
end

Activejob callbacks not available in the Gush::Job

The Gush::Job should be inherited by Active job, so it would be possible to extend for after/before callbacks. Is there a way to inherit from Gush::Worker and set before/after callbacks. Might make sense to merge Job/Worker classes in to one.

Update Gem on RubyGems

The recent commit (609f19d) bumped the activejob dependency which allows it to work with rails 6+.

However this commit is not present on the latest gem available RubyGems.
This throws dependency issue with activejob while using the gem directly from rubygems.

gem 'gush', '~> 2.0', '>= 2.0.1'

Workaround is to specify the git commit

gem 'gush', github: 'chaps-io/gush', ref: '609f19d47042a37c33d21997d1cde8947ab021c0'

Please update the gem on rubygems :)

Screenshot 2020-03-17 at 3 34 10 PM

cc - @pokonski

unable to run

hi there, noob question.

I have set up gush inside my rails4 project. I attempt to start! (3 times), but i the workflow seems stuck in :running, and i'm not sure what's happening with it.

(1) set up test workflow (below).
(2) I started the 'be gush workers' (and i see sidekiq started with the 'gush' queue)
(3) in a rails console, i

test = GushTestWorkflow.new
test.start!
test.status ( = :running)
[ruby-2.3.3]sb/sidekiq-concurrent-queues:datawarehouse $ bundle exec gush list
+--------------------------------------+------------------+----------------+
|                  id                  |       name       |     status     |
+--------------------------------------+------------------+----------------+
| 924986e4-116f-4ce3-bc74-be445c08f9e4 | GushTestWorkflow | ready to start |
| b0eab0c9-3427-4666-8f9f-85a0b49400a3 | GushTestWorkflow | ready to start |
| f6b30370-b5f8-406c-910c-e5142716dd9a | GushTestWorkflow | ready to start |
+--------------------------------------+------------------+----------------+
class GushTestWorkflow < Gush::Workflow
  def configure
    run GushTestSeqJob, before: [GushTestParallelJob]

    run GushTestParallelJob, after: GushTestSeqJob

    run GushTestSeqJob, after: [GushTestParallelJob]
  end
end
class GushTestSeqJob < Gush::Job
  def work
    Rails.logger.debug("Running TestSeqJob. #{params} - TID #{Thread.current.object_id.to_s(36)}")
    sleep 45
    Rails.logger.debug("Running TestSeqJob #{params} - TID #{Thread.current.object_id.to_s(36)}")
  end
end
class GushTestParallelJob < Gush::Job
  def work
    Rails.logger.debug("Running TestParallelJob #{params} - TID #{Thread.current.object_id.to_s(36)}")
    sleep 45
    Rails.logger.debug("Running TestParallelJob #{params} - TID #{Thread.current.object_id.to_s(36)}")
  end
end

sidekiq gem dependency

4.x sidekiq has released, but up till now in gush's gem dependency:
sidekiq '~> 3.3.4'

can gush update the dependency?

Detailed job status?

Hi,

I use Sidekiq workers already, but Gush appeals to me because Sidekiq doesn't have a way to define such workflows/dependencies. Since Gush "just uses Sidekiq" is it compatible with Sidekiq plugins? Specifically, I would like a way to track progress/status of jobs. Let's say I have a job that is going to process 100 files, I would like to be able to add logic into my process method to count how many files have been done at any given moment, and store it in Redis so that I can query it while the job or workflow runs.

For some tasks, this will mean keeping track of the progress of a number of serial/parallel jobs, for others I want progress of an individual job. Is there a way to do this with Gush, if not, could it be planned for the future?

Thanks for listening,
Justin

first tryout, can't start Gush server

Hi,
I'm trying Gush out for the first time, followed the Readme but either with Rails app or simple Ruby I got this message when running "bundle exec gush workers"

/var/lib/gems/2.0.0/gems/gush-0.1.1/lib/gush/cli.rb:134:in load_gushfile': undefined methodexist?' for nil:NilClass (NoMethodError)

my Gushfile.rb is in the root of my project, even though with Rails app it shouldn't be necessary as stated in Readme

trying with a non Rails app my Gushfile looks like this :

require 'gush'

Dir["#{File.dirname(__FILE__)}/app/workflows/**/*.rb"].each { |f| puts f; require f }
Dir["#{File.dirname(__FILE__)}/app/jobs/**/*.rb"].each { |f| puts f; require f }

but still same error
any ideas anyone ?

I'm using ruby 2.0.0p384 (2014-01-12) [x86_64-linux-gnu]

ty

Use keyword arguments rather than positional arguments.

One thing that might make this library a bit easier to use would be keyword arguments to configure a Workflow, rather than positional arguments. A quick look at the code suggests this is already doable (as the arguments are being collected by * and splatted back out when passing them along. So maybe just a tweak or mention in the README?

Thoughts?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.