
oban's Introduction


Robust job processing in Elixir, backed by modern PostgreSQL or SQLite3. Reliable,
observable, and loaded with enterprise grade features.


Note

This README is for the unreleased main branch. Please reference the official documentation on hexdocs for the latest stable release.


Features

Oban's primary goals are reliability, consistency and observability.

Oban is a powerful and flexible library that can handle a wide range of background job use cases, and it is well-suited for systems of any size. It provides a simple and consistent API for scheduling and performing jobs, and it is built to be fault-tolerant and easy to monitor.

Oban is fundamentally different from other background job processing tools because it retains job data for historic metrics and inspection. You can leave your application running indefinitely without worrying about jobs being lost or orphaned due to crashes.

Advantages Over Other Tools

  • Fewer Dependencies – If you are running a web app, there is a very good chance that you're running on top of an RDBMS. Running your job queue within a SQL database minimizes system dependencies and simplifies data backups.

  • Transactional Control – Enqueue a job along with other database changes, ensuring that everything is committed or rolled back atomically (see the sketch after this list).

  • Database Backups – Jobs are stored inside of your primary database, which means they are backed up together with the data that they relate to.
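
As a minimal sketch of that transactional control (MyApp.User and MyApp.WelcomeMailer are hypothetical modules), the user row and its welcome-email job commit or roll back together:

user_params = %{name: "Amy", email: "amy@example.com"}

Ecto.Multi.new()
|> Ecto.Multi.insert(:user, MyApp.User.changeset(%MyApp.User{}, user_params))
|> Oban.insert(:welcome_job, fn %{user: user} ->
  # Build the job from the freshly inserted user
  MyApp.WelcomeMailer.new(%{user_id: user.id})
end)
|> MyApp.Repo.transaction()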

Advanced Features

  • Isolated Queues – Jobs are stored in a single table but are executed in distinct queues. Each queue runs in isolation, ensuring that jobs in a single slow queue can't back up other, faster queues.

  • Queue Control – Queues can be started, stopped, paused, resumed and scaled independently at runtime, locally or across all running nodes (even in environments like Heroku, without distributed Erlang).

  • Resilient Queues – Failing queries won't crash the entire supervision tree; instead, a backoff mechanism will safely retry them in the future.

  • Job Canceling – Jobs can be canceled in the middle of execution regardless of which node they are running on. This stops the job at once and flags it as cancelled.

  • Triggered Execution – Insert triggers ensure that jobs are dispatched on all connected nodes as soon as they are inserted into the database.

  • Unique Jobs – Duplicate work can be avoided through unique job controls. Uniqueness can be enforced at the argument, queue, worker and even sub-argument level for any period of time.

  • Scheduled Jobs – Jobs can be scheduled at any time in the future, down to the second.

  • Periodic (CRON) Jobs – Automatically enqueue jobs on a cron-like schedule. Duplicate jobs are never enqueued, no matter how many nodes you're running.

  • Job Priority – Prioritize jobs within a queue to run ahead of others, with ten levels of granularity.

  • Historic Metrics – After a job is processed the row isn't deleted. Instead, the job is retained in the database to provide metrics. This allows users to inspect historic jobs and to see aggregate data at the job, queue or argument level.

  • Node Metrics – Every queue records metrics to the database during runtime. These are used to monitor queue health across nodes and may be used for analytics.

  • Graceful Shutdown – Queue shutdown is delayed so that slow jobs can finish executing before shutdown. When shutdown starts, queues are paused and stop executing new jobs. Any jobs left running after the shutdown grace period may be rescued later.

  • Telemetry Integration – Job life-cycle events are emitted via Telemetry integration. This enables simple logging, error reporting and health checkups without plug-ins.

Oban Web+Pro

Tip

A web dashboard for managing Oban, along with an official set of extensions, plugins, and workers that expand what Oban is capable of, are available as licensed packages.

Learn more at getoban.pro!

Requirements

Oban requires Elixir 1.13+, Erlang 23+, and PostgreSQL 12.0+ or SQLite3 3.37.0+.

Installation

See the installation guide for details on installing and configuring Oban in your application.

Running with SQLite3

Oban ships with engines for PostgreSQL and SQLite3. Both engines support the same core functionality for a single node, while the Postgres engine is more advanced and designed to run in a distributed environment.

Running with SQLite3 requires adding ecto_sqlite3 to your app's dependencies and setting the Oban.Engines.Lite engine:

config :my_app, Oban,
  engine: Oban.Engines.Lite,
  queues: [default: 10],
  repo: MyApp.Repo

High Concurrency Systems

SQLite3 may not be suitable for high-concurrency systems or for systems that need to handle large amounts of data. If you expect your background jobs to generate high loads, it would be better to use a more robust database solution that supports horizontal scalability, like Postgres.

Configuring Queues

Queues are specified as a keyword list where the key is the name of the queue and the value is the maximum number of concurrent jobs. The following configuration would start four queues with concurrency ranging from 5 to 50:

config :my_app, Oban,
  queues: [default: 10, mailers: 20, events: 50, media: 5],
  repo: MyApp.Repo

You may also use an expanded form to configure queues with individual overrides:

queues: [
  default: 10,
  events: [limit: 50, paused: true]
]

The events queue will now start in a paused state, which means it won't process anything until Oban.resume_queue/2 is called to start it.
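
For example, to resume the paused events queue at runtime (a minimal sketch; the queue name matches the configuration above):

Oban.resume_queue(queue: :events)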

There isn't a limit to the number of queues or how many jobs may execute concurrently in each queue, but there are some caveats and guidelines to keep in mind:

Caveats & Guidelines

  • Each queue will run as many jobs as possible concurrently, up to the configured limit. Make sure your system has enough resources (e.g., database connections) to handle the concurrent load.

  • Queue limits are local (per-node), not global (per-cluster). For example, running a queue with a local limit of one on three separate nodes is effectively a global limit of three. If you require a global limit you must restrict the number of nodes running a particular queue.

  • Only jobs in the configured queues will execute. Jobs in any other queue will stay in the database untouched.

  • Be careful about how many concurrent jobs make expensive system calls (e.g., FFmpeg or ImageMagick). The BEAM ensures that the system stays responsive under load, but those guarantees don't apply when using ports or shelling out to external commands.

Defining Workers

Worker modules do the work of processing a job. At a minimum they must define a perform/1 function, which is called with an %Oban.Job{} struct.

Note that the args field of the job struct will always have string keys, regardless of the key type when the job was enqueued. The args are stored as JSON and the serialization process automatically stringifies all keys. Also, because args are always encoded as JSON, you must ensure that all values are serializable, otherwise you'll have encoding errors when inserting jobs.
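
For example (a sketch; MyApp.SomeWorker and its run/2 helper are hypothetical), a job enqueued with atom keys arrives in perform/1 with string keys:

# Enqueued with atom keys...
%{id: 1, mode: "full"}
|> MyApp.SomeWorker.new()
|> Oban.insert()

# ...but matched with string keys after the JSON round trip:
def perform(%Oban.Job{args: %{"id" => id, "mode" => mode}}) do
  MyApp.SomeWorker.run(id, mode)

  :ok
end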

Define a worker to process jobs in the events queue:

defmodule MyApp.Business do
  use Oban.Worker, queue: :events

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"id" => id} = args}) do
    model = MyApp.Repo.get(MyApp.Business.Man, id)

    case args do
      %{"in_the" => "business"} ->
        IO.inspect(model)

      %{"vote_for" => vote} ->
        IO.inspect([vote, model])

      _ ->
        IO.inspect(model)
    end

    :ok
  end
end

The use macro also accepts options to customize max_attempts, priority, tags, unique, and replace:

defmodule MyApp.LazyBusiness do
  use Oban.Worker,
    queue: :events,
    priority: 3,
    max_attempts: 3,
    tags: ["business"],
    unique: true,
    replace: [scheduled: [:scheduled_at]]

  @impl Oban.Worker
  def perform(_job) do
    # do business slowly

    :ok
  end
end

Like all use macros, options are defined at compile time. Avoid using Application.get_env/2 to define worker options. Instead, pass dynamic options at runtime to MyWorker.new/2:

MyApp.MyWorker.new(args, queue: dynamic_queue)

Successful jobs should return :ok or an {:ok, value} tuple. The value returned from perform/1 controls whether the job is treated as a success, a failure, cancelled, or snoozed for a later retry.
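
As a sketch of how return values map onto those outcomes (do_work/1 is a hypothetical helper):

@impl Oban.Worker
def perform(%Oban.Job{args: args}) do
  case do_work(args) do
    # Success
    {:ok, _result} -> :ok

    # Retry in 60 seconds without recording an error
    {:error, :rate_limited} -> {:snooze, 60}

    # Stop retrying and mark the job cancelled
    {:error, :not_found} -> {:cancel, :not_found}

    # Record the error and retry later, up to max_attempts
    {:error, reason} -> {:error, reason}
  end
end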

See the Oban.Worker docs for more details on failure conditions and Oban.Telemetry for details on job reporting.

Enqueueing Jobs

Jobs are simply Ecto structs and are enqueued by inserting them into the database. For convenience and consistency all workers provide a new/2 function that converts an args map into a job changeset suitable for insertion:

%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()

The worker's defaults may be overridden by passing options:

%{id: 1, vote_for: "none of the above"}
|> MyApp.Business.new(queue: :special, max_attempts: 5)
|> Oban.insert()

Unique jobs can be configured in the worker, or when the job is built:

%{email: "[email protected]"}
|> MyApp.Mailer.new(unique: false)
|> Oban.insert()

Job priority can be specified using an integer from 0 to 9, with 0 being the default and highest priority:

%{id: 1}
|> MyApp.Backfiller.new(priority: 2)
|> Oban.insert()

Any number of tags can be added to a job dynamically, at the time it is inserted:

id = 1

%{id: id}
|> MyApp.OnboardMailer.new(tags: ["mailer", "record-#{id}"])
|> Oban.insert()

Multiple jobs can be inserted in a single transaction:

Ecto.Multi.new()
|> Oban.insert(:b_job, MyApp.Business.new(%{id: 1}))
|> Oban.insert(:m_job, MyApp.Mailer.new(%{email: "[email protected]"}))
|> Repo.transaction()

Occasionally you may need to insert a job for a worker that exists in another application. In that case you can use Oban.Job.new/2 to build the changeset manually:

%{id: 1, user_id: 2}
|> Oban.Job.new(queue: :default, worker: OtherApp.Worker)
|> Oban.insert()

Oban.insert/2,4 is the preferred way of inserting jobs as it provides some of Oban's advanced features (i.e., unique jobs). However, you can use your application's Repo.insert/2 function if necessary.

See Oban.Job.new/2 for a full list of job options.

Scheduling Jobs

Jobs may be scheduled down to the second any time in the future:

%{id: 1}
|> MyApp.Business.new(schedule_in: 5)
|> Oban.insert()

Jobs may also be scheduled at a specific datetime in the future:

%{id: 1}
|> MyApp.Business.new(scheduled_at: ~U[2020-12-25 19:00:56.0Z])
|> Oban.insert()

Scheduling is always in UTC. You'll have to shift timestamps in other zones to UTC before scheduling:

%{id: 1}
|> MyApp.Business.new(scheduled_at: DateTime.shift_zone!(datetime, "Etc/UTC"))
|> Oban.insert()

Caveats & Guidelines

Usually, scheduled job management operates in global mode and notifies queues of available jobs via PubSub to minimize database load. However, when PubSub isn't available, staging switches to a local mode where each queue polls independently.

Local mode is less efficient and will only happen if you're running in an environment where neither Postgres nor PG notifications work. That situation should be rare and limited to the following conditions:

  1. Running with a connection pooler, i.e., pg_bouncer, in transaction mode.
  2. Running without clustering, i.e., without Distributed Erlang.

If both of those criteria apply and PubSub notifications won't work, then staging will switch to polling in local mode.
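
In local mode each queue polls on an interval. A hedged sketch of tuning that interval, assuming the :stage_interval option available in recent Oban versions:

config :my_app, Oban,
  repo: MyApp.Repo,
  stage_interval: :timer.seconds(5)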

Prioritizing Jobs

Normally, all available jobs within a queue are executed in the order they were scheduled. You can override the normal behavior and prioritize or de-prioritize a job by assigning a numerical priority.

  • Priorities from 0-9 are allowed, where 0 is the highest priority and 9 is the lowest.

  • The default priority is 0; unless specified otherwise, all jobs have an equally high priority.

  • All jobs with a higher priority will execute before any jobs with a lower priority. Within a particular priority, jobs are executed in their scheduled order (see the sketch after this list).
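
For example, a worker that always yields to higher-priority jobs in the same queue (a sketch; MyApp.backfill/1 is hypothetical):

defmodule MyApp.Backfiller do
  use Oban.Worker, queue: :default, priority: 9

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"id" => id}}) do
    # Only runs when no higher-priority jobs are available in the queue
    MyApp.backfill(id)

    :ok
  end
end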

Caveats & Guidelines

The default priority is defined in the jobs table. The least intrusive way to change it for all jobs is to change the column default:

alter table("oban_jobs") do
  modify :priority, :integer, default: 1, from: {:integer, default: 0}
end

Unique Jobs

The unique jobs feature lets you specify constraints to prevent enqueueing duplicate jobs. Uniqueness is based on a combination of job attributes, configured with the following options:

  • :period – The number of seconds until a job is no longer considered a duplicate. You should always specify a period, otherwise Oban will default to 60 seconds. :infinity can be used to indicate that the job should be considered a duplicate for as long as jobs are retained.

  • :fields – The fields to compare when evaluating uniqueness. The available fields are :args, :queue, :worker, and :meta. By default, fields is set to [:worker, :queue, :args].

  • :keys – A specific subset of the :args or :meta to consider when comparing against historic jobs. This allows a job with multiple key/value pairs in the args to be compared using only a subset of them.

  • :states – The job states that are checked for duplicates. The available states are :available, :scheduled, :executing, :retryable, :completed, :cancelled and :discarded. By default all states except for :discarded and :cancelled are checked, which prevents duplicates even if the previous job has been completed.

  • :timestamp – Which timestamp to check the period against. The available timestamps are :inserted_at or :scheduled_at, and it defaults to :inserted_at for legacy reasons.

The simplest form of uniqueness configures jobs to be unique for as long as a matching job exists in the database, regardless of state:

use Oban.Worker, unique: true

Configure the worker to be unique only for 60 seconds:

use Oban.Worker, unique: [period: 60]

Check the :scheduled_at timestamp instead of :inserted_at for uniqueness:

use Oban.Worker, unique: [period: 120, timestamp: :scheduled_at]

Only consider the :url key rather than the entire args:

use Oban.Worker, unique: [keys: [:url]]

Use Oban.Job.states/0 to specify uniqueness across all states, including cancelled and discarded:

use Oban.Worker, unique: [period: :infinity, states: Oban.Job.states()]

Detecting Unique Conflicts

When unique settings match an existing job, the return value of Oban.insert/2 is still {:ok, job}. However, you can detect a unique conflict by checking the job's :conflict? field. If there was an existing job, the field is true; otherwise it is false.

You can use the :conflict? field to customize responses after insert:

case Oban.insert(changeset) do
  {:ok, %Job{id: nil, conflict?: true}} ->
    {:error, :failed_to_acquire_lock}

  {:ok, %Job{conflict?: true}} ->
    {:error, :job_already_exists}

  result ->
    result
end

Note that conflicts are only detected for jobs enqueued through Oban.insert/2,3. Jobs enqueued through Oban.insert_all/2 do not use per-job unique configuration.
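
For reference, a minimal sketch of bulk insertion with Oban.insert_all/2, where per-job unique options are skipped entirely:

1..100
|> Enum.map(&MyApp.Business.new(%{id: &1}))
|> Oban.insert_all()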

Replacing Values

In addition to detecting unique conflicts, passing options to replace can update any job field when there is a conflict. Any of the following fields can be replaced per state: args, max_attempts, meta, priority, queue, scheduled_at, tags, worker.

For example, to change the priority and increase max_attempts when there is a conflict with a job in a scheduled state:

BusinessWorker.new(
  args,
  max_attempts: 5,
  priority: 0,
  replace: [scheduled: [:max_attempts, :priority]]
)

Another example is bumping the scheduled time on conflict. Either scheduled_at or schedule_in values will work, but the replace option is always scheduled_at.

UrgentWorker.new(args, schedule_in: 1, replace: [scheduled: [:scheduled_at]])

NOTE: If you use this feature to replace a field (e.g. args) in the executing state, e.g. UniqueWorker.new(new_args, replace: [executing: [:args]]), Oban will update the args, but the job will continue executing with the original value.

Strong Guarantees

Unique jobs are guaranteed through transactional locks and database queries: they do not rely on unique constraints in the database. This makes uniqueness entirely configurable by application code, without the need for database migrations.

Testing

Find testing setup, helpers, and strategies in the testing guide.
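
As a taste of what's covered there, a hedged sketch using the Oban.Testing helpers (MyApp.signup/1 and the DataCase template are hypothetical):

defmodule MyApp.SignupTest do
  use MyApp.DataCase, async: true
  use Oban.Testing, repo: MyApp.Repo

  test "signing up enqueues a welcome email" do
    MyApp.signup(%{email: "someone@example.com"})

    # Asserts against the oban_jobs table; note the string keys in args
    assert_enqueued worker: MyApp.Mailer, args: %{"email" => "someone@example.com"}
  end
end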

Pruning Historic Jobs

Job stats and queue introspection are built on keeping job rows in the database after they have completed. This allows administrators to review completed jobs and build informative aggregates, at the expense of storage and an unbounded table size. To prevent the oban_jobs table from growing indefinitely, Oban provides active pruning of completed, cancelled and discarded jobs.

By default, the Pruner plugin retains jobs for 60 seconds. You can configure a longer retention period by providing a max_age in seconds to the Pruner plugin.

# Set the max_age for 5 minutes
config :my_app, Oban,
  plugins: [{Oban.Plugins.Pruner, max_age: 300}]
  ...

Caveats & Guidelines

  • Pruning is best-effort and performed out-of-band. This means that all limits are soft; jobs beyond a specified age may not be pruned immediately after jobs complete.

  • Pruning is only applied to jobs that are completed, cancelled or discarded. It'll never delete a new job, a scheduled job or a job that failed and will be retried.

Periodic Jobs

Oban's Cron plugin registers workers on a cron-like schedule and enqueues jobs automatically. Periodic jobs are declared as a list of {cron, worker} or {cron, worker, options} tuples:

config :my_app, Oban,
  repo: MyApp.Repo,
  plugins: [
    {Oban.Plugins.Cron,
     crontab: [
       {"* * * * *", MyApp.MinuteWorker},
       {"0 * * * *", MyApp.HourlyWorker, args: %{custom: "arg"}},
       {"0 0 * * *", MyApp.DailyWorker, max_attempts: 1},
       {"0 12 * * MON", MyApp.MondayWorker, queue: :scheduled, tags: ["mondays"]},
       {"@daily", MyApp.AnotherDailyWorker}
     ]}
  ]

The crontab would insert jobs as follows:

  • MyApp.MinuteWorker – Inserted once every minute
  • MyApp.HourlyWorker – Inserted at the first minute of every hour with custom args
  • MyApp.DailyWorker – Inserted at midnight every day with no retries
  • MyApp.MondayWorker – Inserted at noon every Monday in the "scheduled" queue
  • MyApp.AnotherDailyWorker – Inserted at midnight every day

The crontab format respects all standard rules and has one minute resolution. Jobs are considered unique for most of each minute, which prevents duplicate jobs with multiple nodes and across node restarts.

Like other jobs, recurring jobs will use the :queue specified by the worker module (or :default if one is not specified).

Cron Expressions

Standard Cron expressions are composed of rules specifying the minutes, hours, days, months and weekdays. Rules for each field are composed of literal values, wildcards, step values or ranges:

  • * – Wildcard, matches any value (0, 1, 2, ...)
  • 0 – Literal, matches only itself (only 0)
  • */15 – Step, matches any value that is a multiple of 15 (0, 15, 30, 45)
  • 0-5 – Range, matches any value within the range (0, 1, 2, 3, 4, 5)
  • 0-9/2 – Step values can be used in conjunction with ranges (0, 2, 4, 6, 8)

Each part may have multiple rules, where rules are separated by a comma. The allowed values for each field are as follows:

  • minute – 0-59
  • hour – 0-23
  • days – 1-31
  • month – 1-12 (or aliases, JAN, FEB, MAR, etc.)
  • weekdays – 0-6 (or aliases, SUN, MON, TUE, etc.)

The following Cron extensions are supported:

  • @hourly – 0 * * * *
  • @daily (as well as @midnight) – 0 0 * * *
  • @weekly – 0 0 * * 0
  • @monthly – 0 0 1 * *
  • @yearly (as well as @annually) – 0 0 1 1 *
  • @reboot – Run once at boot across the entire cluster

Some specific examples that demonstrate the full range of expressions:

  • 0 * * * * – The first minute of every hour
  • */15 9-17 * * * – Every fifteen minutes during standard business hours
  • 0 0 * DEC * – Once a day at midnight during December
  • 0 7-9,4-6 13 * FRI – Once an hour during both rush hours on Friday the 13th

For more in-depth information, see the man pages for cron and crontab on your system. Alternatively, you can experiment with various expressions online at Crontab Guru.

Caveats & Guidelines

  • All schedules are evaluated as UTC unless a different timezone is provided. See Oban.Plugins.Cron for information about configuring a timezone.

  • Workers can be used for regular and scheduled jobs so long as they accept different arguments.

  • Duplicate jobs are prevented through transactional locks and unique constraints. Workers that are used for regular and scheduled jobs must not specify unique options shorter than 60 seconds.

  • Long running jobs may execute simultaneously if the scheduling interval is shorter than the time it takes to execute the job. You can prevent overlap by passing custom unique opts in the crontab config:

    custom_args = %{scheduled: true}
    
    unique_opts = [
      period: 60 * 60 * 24,
      states: [:available, :scheduled, :executing]
    ]
    
    config :my_app, Oban,
      repo: MyApp.Repo,
      plugins: [
        {Oban.Plugins.Cron,
         crontab: [
           {"* * * * *", MyApp.SlowWorker, args: custom_args, unique: unique_opts}
         ]}
      ]

Error Handling

When a job returns an error value, raises an error, or exits during execution, the details are recorded within the errors array on the job. When the number of execution attempts is below the configured max_attempts limit, the job will automatically be retried in the future.

The retry delay has an exponential backoff, meaning the job's second attempt will be after 16s, third after 31s, fourth after 1m 36s, etc.

See the Oban.Worker documentation on "Customizing Backoff" for alternative backoff strategies.
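
As a sketch of one alternative strategy, a worker can override the backoff/1 callback; here a simple linear delay replaces the default exponential curve:

defmodule MyApp.FlakyWorker do
  use Oban.Worker, queue: :events

  @impl Oban.Worker
  def perform(_job), do: :ok

  # Wait attempt * 60 seconds between retries instead of the default backoff
  @impl Oban.Worker
  def backoff(%Oban.Job{attempt: attempt}) do
    attempt * 60
  end
end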

Error Details

Execution errors are stored as a formatted exception along with metadata about when the failure occurred and which attempt caused it. Each error is stored with the following keys:

  • at – The UTC timestamp when the error occurred
  • attempt – The attempt number when the error occurred
  • error – A formatted error message and stacktrace

See the Instrumentation docs for an example of integrating with external error reporting systems.

Limiting Retries

By default, jobs are retried up to 20 times. The number of retries is controlled by the max_attempts value, which can be set at the Worker or Job level. For example, to instruct a worker to discard jobs after three failures:

use Oban.Worker, queue: :limited, max_attempts: 3

Limiting Execution Time

By default, individual jobs may execute indefinitely. If this is undesirable you may define a timeout in milliseconds with the timeout/1 callback on your worker module.

For example, to limit a worker's execution time to 30 seconds:

defmodule MyApp.Worker do
  use Oban.Worker

  @impl Oban.Worker
  def perform(_job) do
    something_that_may_take_a_long_time()

    :ok
  end

  @impl Oban.Worker
  def timeout(_job), do: :timer.seconds(30)
end

The timeout/1 function accepts an Oban.Job struct, so you can customize the timeout using any job attributes.

Define the timeout value through job args:

def timeout(%_{args: %{"timeout" => timeout}}), do: timeout

Define the timeout based on the number of attempts:

def timeout(%_{attempt: attempt}), do: attempt * :timer.seconds(5)

Instrumentation, Error Reporting, and Logging

Oban provides integration with Telemetry, a dispatching library for metrics. It is easy to report Oban metrics to any backend by attaching to :oban events.

Here is an example of an unstructured log handler:

defmodule MyApp.ObanLogger do
  require Logger

  def handle_event([:oban, :job, :start], measure, meta, _) do
    Logger.warning("[Oban] :started #{meta.worker} at #{measure.system_time}")
  end

  def handle_event([:oban, :job, event], measure, meta, _) do
    Logger.warning("[Oban] #{event} #{meta.worker} ran in #{measure.duration}")
  end
end

Attach the handler to success and failure events in application.ex:

events = [[:oban, :job, :start], [:oban, :job, :stop], [:oban, :job, :exception]]

:telemetry.attach_many("oban-logger", events, &MyApp.ObanLogger.handle_event/4, [])

The Oban.Telemetry module provides a robust structured logger that handles all of Oban's telemetry events. As in the example above, attach it within your application.ex module:

:ok = Oban.Telemetry.attach_default_logger()

For more details on the default structured logger and information on event metadata see docs for the Oban.Telemetry module.

Reporting Errors

Another great use of execution data is error reporting. Here is an example of integrating with Honeybadger to report job failures:

defmodule MyApp.ErrorReporter do
  def attach do
    :telemetry.attach(
      "oban-errors",
      [:oban, :job, :exception],
      &__MODULE__.handle_event/4,
      []
    )
  end

  def handle_event([:oban, :job, :exception], _measure, meta, _) do
    Honeybadger.notify(meta.reason, stacktrace: meta.stacktrace)
  end
end

MyApp.ErrorReporter.attach()

You can use exception events to send error reports to Sentry, AppSignal, Honeybadger, Rollbar, or any other application monitoring platform.

Some of these services support reporting Oban errors out of the box.

Instance and Database Isolation

You can run multiple Oban instances with different prefixes on the same system and have them entirely isolated, provided you give each supervisor a distinct id.

Here we configure our application to start three Oban supervisors using the "public", "special" and "private" prefixes, respectively:

def start(_type, _args) do
  children = [
    Repo,
    Endpoint,
    {Oban, name: ObanA, repo: Repo},
    {Oban, name: ObanB, repo: Repo, prefix: "special"},
    {Oban, name: ObanC, repo: Repo, prefix: "private"}
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end

Umbrella Apps

If you need to run Oban from an umbrella application where more than one of the child apps needs to interact with Oban, you may need to set the :name for each child application that configures Oban.

For example, your umbrella contains two apps: MyAppA and MyAppB. MyAppA is responsible for inserting jobs, while only MyAppB actually runs any queues.

Configure Oban with a custom name for MyAppA:

config :my_app_a, Oban,
  name: MyAppA.Oban,
  repo: MyApp.Repo

Then configure Oban for MyAppB with a different name:

config :my_app_b, Oban,
  name: MyAppB.Oban,
  repo: MyApp.Repo,
  queues: [default: 10]

Now, use the configured name when calling functions like Oban.insert/2, Oban.insert_all/2, Oban.drain_queue/2, etc., to reference the correct Oban process for the current application.

Oban.insert(MyAppA.Oban, MyWorker.new(%{}))
Oban.insert_all(MyAppB.Oban, multi, :multiname, [MyWorker.new(%{})])
Oban.drain_queue(MyAppB.Oban, queue: :default)

Database Prefixes

Oban supports namespacing through PostgreSQL schemas, also called "prefixes" in Ecto. With prefixes your jobs table can reside outside of your primary schema (usually public) and you can have multiple separate job tables.

To use a prefix you first have to specify it within your migration:

defmodule MyApp.Repo.Migrations.AddPrefixedObanJobsTable do
  use Ecto.Migration

  def up do
    Oban.Migrations.up(prefix: "private")
  end

  def down do
    Oban.Migrations.down(prefix: "private")
  end
end

The migration will create the "private" schema and all tables, functions and triggers within that schema. With the database migrated you'll then specify the prefix in your configuration:

config :my_app, Oban,
  prefix: "private",
  repo: MyApp.Repo,
  queues: [default: 10]

Now all jobs are inserted and executed using the private.oban_jobs table. Note that Oban.insert/2,4 will write jobs to the private.oban_jobs table; you'll need to specify a prefix manually if you insert jobs directly through a repo.
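
For example, a minimal sketch of inserting through the repo directly, passing Ecto's standard :prefix option:

%{id: 1}
|> MyApp.Business.new()
|> MyApp.Repo.insert(prefix: "private")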

Not only is the oban_jobs table isolated within the schema, but all notification events are also isolated. That means that insert/update events will only dispatch new jobs for their prefix.

Dynamic Repositories

Oban supports Ecto dynamic repositories through the :get_dynamic_repo option. To make this work, you need to run a separate Oban instance for each dynamic repo instance. Most often it's worth bundling each Oban and repo instance under the same supervisor:

def start_repo_and_oban(instance_id) do
  children = [
    {MyDynamicRepo, name: nil, url: repo_url(instance_id)},
    {Oban, name: instance_id, get_dynamic_repo: fn -> repo_pid(instance_id) end}
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end

The function repo_pid/1 must return the pid of the repo for the given instance. You can use Registry to register the repo (for example in the repo's init/2 callback) and discover it.

If your application exclusively uses dynamic repositories and doesn't specify all credentials upfront, you must implement an init/2 callback in your Ecto Repo. Doing so provides the Postgres notifier with the correct credentials on init, allowing jobs to process as expected.

Ecto Multi-tenancy

If you followed the Ecto guide on setting up multi-tenancy with foreign keys, you need to add an exception for queries originating from Oban. All of Oban's queries have the custom option oban: true to help you identify them in prepare_query/3 or other instrumentation:

# Sample code, only relevant if you followed the Ecto guide on multi tenancy with foreign keys.
defmodule MyApp.Repo do
  use Ecto.Repo, otp_app: :my_app

  require Ecto.Query

  @impl true
  def prepare_query(_operation, query, opts) do
    cond do
      opts[:skip_org_id] || opts[:schema_migration] || opts[:oban] ->
        {query, opts}

      org_id = opts[:org_id] ->
        {Ecto.Query.where(query, org_id: ^org_id), opts}

      true ->
        raise "expected org_id or skip_org_id to be set"
    end
  end
end

Community

There are a few places to connect and communicate with other Oban users.

Contributing

To run the Oban test suite you must have PostgreSQL 12+ and SQLite3 3.37+ running. Follow these steps to create the database and run all migrations:

mix test.setup

To ensure a commit passes CI you should run mix test.ci locally, which executes the following commands:

  • Check formatting (mix format --check-formatted)
  • Check deps (mix deps.unlock --check-unused)
  • Lint with Credo (mix credo --strict)
  • Run all tests (mix test --raise)
  • Run Dialyzer (mix dialyzer)


oban's Issues

Limiting demand in subquery may lock too many rows

Oban may be susceptible to locking too many rows if the Postgres query planner decides not to materialize the subquery used here.

There is a good write up of the problem here

I haven't reproduced this problem in Oban, just browsed the code and noticed the structure looked similar to that from the linked article.

Compile time validation of all passed options

It doesn't resolve the real value of the max_attempts option. I can't even do the following:

@max_attempts 1
use Oban.Worker, queue: "background", max_attempts: @max_attempts

since it results in

{:@, [line: 17], [{:max_attempts, [line: 17], nil}]}

== Compilation error in file lib/pleroma/workers/background_worker.ex ==
** (ArgumentError) expected :max_attempts to be an integer greater than 0

This feature limitation seems unnecessary. Please make it optional.

Unique by Scheduled At

Hello guys! :)

I'm developing some code for the company that I work for. And I'm in love with Oban!

My question is: can I make a job unique by the "scheduled_at" field?

What happens is: I want to schedule an Oban job, and afterwards I want to schedule another job for the next day.

Does anyone have a tip?

Thank you very much :)

Run job in transaction

I really like the idea of storing jobs in the db so that you can schedule jobs atomically together with your data (e.g. when a user signs up I can save the user and schedule the welcome email atomically, so if either fails, nothing is saved).

However, I haven't found a way to save my data atomically with my job succeeding. There may be good reasons for this, but I can't think of any. My use case is that I want to get some data from an external REST API and store the result in the database. Could I do this within a transaction together with the job, so that the data will never be stored unless the entire job succeeds?

Verify that a worker exists when inserting the job

Could Oban.Job.new/2 verify that the worker module does exist and implements the Oban.Worker behavior?

While initially developing with oban I decided to rename my worker, but I forgot to update the worker name at the place in the code where I enqueued the job. This resulted in a job being inserted into the oban_jobs table that has state executing but is not actually being executed.

Although perhaps the docs should more heavily recommend using MyApp.Worker.new/2 to generate the worker changeset instead of Oban.Job.new/2, since that way you will get an error if MyApp.Worker.new/2 does not exist.

Also, is there a way to recover from a job whose state is executing, but in reality is not executing?

Version doesn't seem to be respected in migration on CI

I have 2 migrations:

# 20190530181624_add_oban_jobs_table.exs
defmodule MyApp.Repo.Migrations.AddObanJobsTable do
  use Ecto.Migration

  defdelegate up, to: Oban.Migrations
  defdelegate down, to: Oban.Migrations
end

# 20190917224931_upgrade_oban_to_v3.exs
defmodule MyApp.Repo.Migrations.UpgradeObanToV3 do
  use Ecto.Migration

  def up do
    Oban.Migrations.up(version: 3)
  end

  def down do
    Oban.Migrations.down(version: 2)
  end
end

This last migration worked fine with my staging database, but when building fresh for CI, I run into the following error:

.
.
.
.
.
23:21:39.543 [info]  == Running 20190530181624 MyApp.Repo.Migrations.AddObanJobsTable.up/0 forward

23:21:39.550 [info]  execute "CREATE SCHEMA IF NOT EXISTS public"

23:21:39.550 [info]  schema "public" already exists, skipping

23:21:39.550 [info]  execute "DO $$\nBEGIN\nIF NOT EXISTS (SELECT 1 FROM pg_type\n               WHERE typname = 'oban_job_state'\n                 AND typnamespace = 'public'::regnamespace::oid) THEN\n    CREATE TYPE public.oban_job_state AS ENUM (\n      'available',\n      'scheduled',\n      'executing',\n      'retryable',\n      'completed',\n      'discarded'\n    );\n  END IF;\nEND$$;\n"

23:21:39.552 [info]  create table if not exists public.oban_jobs

23:21:39.556 [info]  create index if not exists public.oban_jobs_queue_index

23:21:39.557 [info]  create index if not exists public.oban_jobs_state_index

23:21:39.559 [info]  create index if not exists public.oban_jobs_scheduled_at_index

23:21:39.560 [info]  execute "CREATE OR REPLACE FUNCTION public.oban_jobs_notify() RETURNS trigger AS $$\nDECLARE\n  channel text;\n  notice json;\nBEGIN\n  IF (TG_OP = 'INSERT') THEN\n    channel = 'public.oban_insert';\n    notice = json_build_object('queue', NEW.queue, 'state', NEW.state);\n\n    -- No point triggering for a job that isn't scheduled to run now\n    IF NEW.scheduled_at IS NOT NULL AND NEW.scheduled_at > now() AT TIME ZONE 'utc' THEN\n      RETURN null;\n    END IF;\n  ELSE\n    channel = 'public.oban_update';\n    notice = json_build_object('queue', NEW.queue, 'new_state', NEW.state, 'old_state', OLD.state);\n  END IF;\n\n  PERFORM pg_notify(channel, notice::text);\n\n  RETURN NULL;\nEND;\n$$ LANGUAGE plpgsql;\n"

23:21:39.561 [info]  execute "DROP TRIGGER IF EXISTS oban_notify ON public.oban_jobs"

23:21:39.561 [info]  trigger "oban_notify" for relation "public.oban_jobs" does not exist, skipping

23:21:39.561 [info]  execute "CREATE TRIGGER oban_notify\nAFTER INSERT OR UPDATE OF state ON public.oban_jobs\nFOR EACH ROW EXECUTE PROCEDURE public.oban_jobs_notify();\n"

23:21:39.562 [info]  execute "COMMENT ON TABLE public.oban_jobs IS '1'"

23:21:39.563 [info]  drop index if exists public.oban_jobs_scheduled_at_index

23:21:39.563 [info]  create index public.oban_jobs_scheduled_at_index

23:21:39.564 [info]  create check constraint worker_length on table public.oban_jobs

23:21:39.565 [info]  create check constraint queue_length on table public.oban_jobs

23:21:39.566 [info]  execute "CREATE OR REPLACE FUNCTION public.oban_wrap_id(value bigint) RETURNS int AS $$\nBEGIN\n  RETURN (CASE WHEN value > 2147483647 THEN mod(value, 2147483647) ELSE value END)::int;\nEND;\n$$ LANGUAGE plpgsql IMMUTABLE;\n"

23:21:39.566 [info]  execute "COMMENT ON TABLE public.oban_jobs IS '2'"

23:21:39.567 [info]  alter table public.oban_jobs

23:21:39.567 [info]  create table if not exists public.oban_beats

23:21:39.569 [info]  create index if not exists public.oban_beats_inserted_at_index

23:21:39.571 [info]  execute "COMMENT ON TABLE public.oban_jobs IS '3'"

23:21:39.571 [info]  execute "DROP FUNCTION IF EXISTS public.oban_wrap_id(value bigint)"

23:21:39.572 [info]  execute "COMMENT ON TABLE public.oban_jobs IS '4'"

23:21:39.572 [info]  == Migrated 20190530181624 in 0.0s

.
.
.
.
.

23:21:39.687 [info]  == Running 20190917224931 MyApp.Repo.Migrations.UpgradeObanToV3.up/0 forward

23:21:39.688 [info]  execute "DROP FUNCTION IF EXISTS public.oban_wrap_id(value bigint)"

23:21:39.688 [info]  function public.oban_wrap_id(pg_catalog.int8) does not exist, skipping

23:21:39.688 [info]  execute "COMMENT ON TABLE public.oban_jobs IS '4'"

23:21:39.689 [info]  alter table public.oban_jobs

** (Postgrex.Error) ERROR 42701 (duplicate_column) column "attempted_by" of relation "oban_jobs" already exists

    (ecto_sql) lib/ecto/adapters/sql.ex:629: Ecto.Adapters.SQL.raise_sql_call_error/1

    (elixir) lib/enum.ex:1327: Enum."-map/2-lists^map/1-0-"/2

    (ecto_sql) lib/ecto/adapters/sql.ex:716: Ecto.Adapters.SQL.execute_ddl/4

    (ecto_sql) lib/ecto/migration/runner.ex:338: Ecto.Migration.Runner.log_and_execute_ddl/3

    (ecto_sql) lib/ecto/migration/runner.ex:117: anonymous fn/6 in Ecto.Migration.Runner.flush/0

    (elixir) lib/enum.ex:1940: Enum."-reduce/3-lists^foldl/2-0-"/3

    (ecto_sql) lib/ecto/migration/runner.ex:116: Ecto.Migration.Runner.flush/0

    (stdlib) timer.erl:166: :timer.tc/1

It looks like Oban got fully migrated (we're not live on production, so a little bit of downtime is acceptable) but unless I remove a migration, I'm not sure how to keep both CI and staging/prod happy.

Silence debug logging from Oban?

Is there an easy way to silence the debug logging from Oban when in dev mode? I can see a lot of lines of things like:

[debug] QUERY OK db=0.2ms queue=0.3ms
SELECT pg_notify($1, $2) ["oban_gossip", "{\"count\":0,\"limit\":1,\"node\":\"tetra\",\"paused\":false,\"queue\":\"slack\"}"]
[debug] QUERY OK db=0.2ms queue=0.1ms
SELECT pg_notify($1, $2) ["oban_gossip", "{\"count\":0,\"limit\":5,\"node\":\"tetra\",\"paused\":false,\"queue\":\"mailer\"}"]
[debug] QUERY OK source="oban_jobs" db=0.2ms
UPDATE "oban_jobs" AS o0 SET "state" = $1, "attempted_at" = $2, "attempt" = o0."attempt" + $3 FROM (SELECT o0."id" AS "id", pg_try_advisory_lock_shared('oban_jobs'::regclass::oid::int, oban_wrap_id(o0."id")) AS "lock" FROM "oban_jobs" AS o0 WHERE (o0."state" = 'available') AND (o0."queue" = $4) AND (o0."scheduled_at" <= $5) ORDER BY o0."scheduled_at", o0."id" LIMIT $6 FOR UPDATE SKIP LOCKED) AS s1 WHERE (o0."id" = s1."id") RETURNING o0."id", o0."state", o0."queue", o0."worker", o0."args", o0."errors", o0."attempt", o0."max_attempts", o0."attempted_at", o0."completed_at", o0."inserted_at", o0."scheduled_at" ["executing", ~U[2019-07-01 10:03:38.769207Z], 1, "mailer", ~U[2019-07-01 10:03:38.769196Z], 5]
[debug] QUERY OK source="oban_jobs" db=0.3ms
UPDATE "oban_jobs" AS o0 SET "state" = $1, "attempted_at" = $2, "attempt" = o0."attempt" + $3 FROM (SELECT o0."id" AS "id", pg_try_advisory_lock_shared('oban_jobs'::regclass::oid::int, oban_wrap_id(o0."id")) AS "lock" FROM "oban_jobs" AS o0 WHERE (o0."state" = 'available') AND (o0."queue" = $4) AND (o0."scheduled_at" <= $5) ORDER BY o0."scheduled_at", o0."id" LIMIT $6 FOR UPDATE SKIP LOCKED) AS s1 WHERE (o0."id" = s1."id") RETURNING o0."id", o0."state", o0."queue", o0."worker", o0."args", o0."errors", o0."attempt", o0."max_attempts", o0."attempted_at", o0."completed_at", o0."inserted_at", o0."scheduled_at" ["executing", ~U[2019-07-01 10:03:38.769157Z], 1, "slack", ~U[2019-07-01 10:03:38.769147Z], 1]
[debug] QUERY OK source="oban_jobs" db=0.3ms queue=0.1ms
UPDATE "oban_jobs" AS o0 SET "state" = $1 WHERE (o0."state" IN ('scheduled','retryable')) AND (o0."queue" = $2) AND (o0."scheduled_at" <= $3) ["available", "mailer", ~U[2019-07-01 10:03:38.814383Z]]
[debug] QUERY OK source="oban_jobs" db=0.4ms
UPDATE "oban_jobs" AS o0 SET "state" = $1 WHERE (o0."state" IN ('scheduled','retryable')) AND (o0."queue" = $2) AND (o0."scheduled_at" <= $3) ["available", "default", ~U[2019-07-01 10:03:38.814252Z]]
[debug] QUERY OK source="oban_jobs" db=0.7ms
UPDATE "oban_jobs" AS o0 SET "state" = $1 WHERE (o0."state" IN ('scheduled','retryable')) AND (o0."queue" = $2) AND (o0."scheduled_at" <= $3) ["available", "slack", ~U[2019-07-01 10:03:38.814226Z]]

It'd be really great to be able to silence this, so I can see the debug logging coming from my application, when in dev mode.

Errors when trying to run tests

Hey 👋! I'm trying to switch from exq to oban but I'm having issues with the tests. I've followed the recommendations in the README related to testing, but when I run my tests I get a lot of errors that seem to be related to processes that Oban creates in the background:

13:32:18.928 [error] GenServer Oban.Queue.Default.Producer terminating
** (DBConnection.OwnershipError) cannot find ownership process for #PID<0.429.0>.

When using ownership, you must manage connections in one
of the four ways:

* By explicitly checking out a connection
* By explicitly allowing a spawned process
* By running the pool in shared mode
* By using :caller option with allowed process

The first two options require every new process to explicitly
check a connection out or be allowed by calling checkout or
allow respectively.

The third option requires a {:shared, pid} mode to be set.
If using shared mode in tests, make sure your tests are not
async.

The fourth option requires [caller: pid] to be used when
checking out a connection from the pool. The caller process
should already be allowed on a connection.

If you are reading this error, it means you have not done one
of the steps above or that the owner process has crashed.

See Ecto.Adapters.SQL.Sandbox docs for more information.
    (ecto_sql) lib/ecto/adapters/sql.ex:615: Ecto.Adapters.SQL.raise_sql_call_error/1
    (ecto_sql) lib/ecto/adapters/sql.ex:551: Ecto.Adapters.SQL.execute/5
    (ecto) lib/ecto/repo/queryable.ex:153: Ecto.Repo.Queryable.execute/4
    (oban) lib/oban/queue/producer.ex:180: Oban.Queue.Producer.fetch_jobs/3
    (oban) lib/oban/queue/producer.ex:170: Oban.Queue.Producer.dispatch/1
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:388: :gen_server.loop/7
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {:continue, :start}
13:32:18.951 [error] GenServer Oban.Queue.Default.Watchman terminating
** (stop) exited in: GenServer.call(Oban.Queue.Default.Producer, :pause, 5000)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (elixir) lib/gen_server.ex:979: GenServer.call/3
    (oban) lib/oban/queue/watchman.ex:43: Oban.Queue.Watchman.terminate/2
    (stdlib) gen_server.erl:673: :gen_server.try_terminate/3
    (stdlib) gen_server.erl:858: :gen_server.terminate/10
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {:EXIT, #PID<0.426.0>, :shutdown}
13:32:18.952 [error] GenServer Oban.Queue.Default.Producer terminating
** (DBConnection.OwnershipError) cannot find ownership process for #PID<0.453.0>.

When using ownership, you must manage connections in one
of the four ways:

* By explicitly checking out a connection
* By explicitly allowing a spawned process
* By running the pool in shared mode
* By using :caller option with allowed process

The first two options require every new process to explicitly
check a connection out or be allowed by calling checkout or
allow respectively.

The third option requires a {:shared, pid} mode to be set.
If using shared mode in tests, make sure your tests are not
async.

The fourth option requires [caller: pid] to be used when
checking out a connection from the pool. The caller process
should already be allowed on a connection.

If you are reading this error, it means you have not done one
of the steps above or that the owner process has crashed.

See Ecto.Adapters.SQL.Sandbox docs for more information.
    (ecto_sql) lib/ecto/adapters/sql.ex:615: Ecto.Adapters.SQL.raise_sql_call_error/1
    (ecto_sql) lib/ecto/adapters/sql.ex:551: Ecto.Adapters.SQL.execute/5
    (oban) lib/oban/queue/producer.ex:116: Oban.Queue.Producer.rescue_orphans/1
    (oban) lib/oban/queue/producer.ex:49: Oban.Queue.Producer.handle_continue/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:388: :gen_server.loop/7
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {:continue, :start}
... (this happens ~15 more times)

I've tried a few different strategies using Ecto.Adapters.SQL.Sandbox.allow and different Sandbox :shared configs, but I couldn't stop those errors, as those processes seem to be created when the app starts, even before the tests start to run, right?

Here are the parts of my config that might be helpful:

# test_helper.exs
...
ExUnit.start()
Ecto.Adapters.SQL.Sandbox.mode(Pidgey.Repo, :manual)


# application.ex
def start(_type, _args) do
    children = [
      Pidgey.Repo,
      PidgeyWeb.Endpoint,
      Pidgey.Vault,
      {Oban, Application.get_env(:pidgey, Oban)}
    ]

    opts = [strategy: :one_for_one, name: Pidgey.Supervisor]

    Supervisor.start_link(children, opts)
  end

# config.exs
config :pidgey, Oban,
  repo: Pidgey.Repo,
  queues: [default: 10]

# config/test.exs
config :pidgey, Oban, poll_interval: :timer.minutes(60)

It is a Phoenix app; some tests are using ExUnit.Case, others DataCase or ConnCase, but when trying to run even a single test file those errors will be raised, even though the specs will pass.

I'm creating this issue because I think it might be an issue with the package, let me know if it is not and that I should create a question on StackOverflow or on the Elixir forum :)

Migration issue (on unreleased master)

Coming from #52, I switched deps to {:oban, github: "sorentwo/oban"} and added a migration file according to the Changelog:

  def up do
    Oban.Migrations.up(version: 3)
  end

  def down do
    Oban.Migrations.down(version: 2)
  end

Migrating this fails because it does migrations for V2 and V3, and since I am already on V2 it fails with ** (Postgrex.Error) ERROR 42710 (duplicate_object) constraint "worker_length" for relation "oban_jobs" already exists

I am at V2, which I verified by running the query in Oban.Migrations.get_current/1 manually.

Looking at Oban.Migrations.merge_defaults/1 I feel like you are off by one in the range because the first migration you want to run is the one after the current one, right?

  defp merge_defaults(opts) do
    opts = Map.merge(@default_opts, Map.new(opts))
-   Map.put(opts, :range, get_current(opts.prefix)..opts.version)
+   Map.put(opts, :range, (get_current(opts.prefix)+1)..opts.version)
  end

Default queue with unique jobs.

It seems like no matter what I do, I cannot use the "default" queue while also using uniqueness.

config :simple, Oban,
  repo: Simple.DB.Postgres,                                                                  
  queues: [default: 10, foo: 2],                                                        
  verbose: false,
  prune: {:maxage, 60 * 60 * 24}

The job

  use Oban.Worker,
    max_attempts: 4,
    unique: [period: 3600, states: [:available, :scheduled, :executing]]

def foo do
    %{bar: 123}
    |> IO.inspect()
    |> __MODULE__.new()
    |> Oban.insert()
end

** (ArgumentError) nil given for :queue. Comparison with nil is forbidden as it is unsafe. Instead write a query with is_nil/1, for example: is_nil(s.queue)
(ecto) lib/ecto/query/builder/filter.ex:128: Ecto.Query.Builder.Filter.kw!/7
(ecto) lib/ecto/query/builder/filter.ex:121: Ecto.Query.Builder.Filter.kw!/3
(ecto) lib/ecto/query/builder/filter.ex:103: Ecto.Query.Builder.Filter.filter!/6
(ecto) lib/ecto/query/builder/filter.ex:115: Ecto.Query.Builder.Filter.filter!/7
(oban) lib/oban/query.ex:202: Oban.Query.get_unique_job/3
(oban) lib/oban/query.ex:37: Oban.Query.fetch_or_insert_job/2

Oban seems to be bringing down my application

I think it happens when the database is unavailable. I see this error in our error monitoring service:

no match of right hand side value: {:error, %DBConnection.ConnectionError{message: "tcp connect (ec2-<db-ip>.compute-1.amazonaws.com:5432): connection refused - :econnrefused"}}

Which happens for: "registered_name" => "Elixir.MyAppOban.Notifier.Conn"

and there's also this at around the same time:

FATAL 57P01 (admin_shutdown) terminating connection due to administrator command

which happens for "registered_name" => "Elixir.MyAppOban.Queue.Default.Producer"

So far I've seen this on Heroku as well as locally. My suspicion is that when the database is inaccessible, MyAppOban restarts fast enough to overwhelm MyApp.Supervisor.

Oban notifier eats up postgres connections

Environment

  • Oban Version: v0.10.0
  • PostgreSQL Version 11.4
  • Elixir & Erlang/OTP Versions 1.9.0 22.0.4

Current Behavior

We observed in our running system that all instances were failing, since none of the instances could get postgres connections anymore. After debugging into the issue we did a quick activity check in postgres to see who owns all the connections:

SELECT pid, query, state FROM pg_stat_activity;

    [11660, "LISTEN \"public.oban_signal\"", "idle"],
    [13727, "LISTEN \"public.oban_signal\"", "idle"],
    [13728, "LISTEN \"public.oban_signal\"", "idle"],
    [13729, "LISTEN \"public.oban_signal\"", "idle"],
    [11661, "LISTEN \"public.oban_signal\"", "idle"],
    [13731, "LISTEN \"public.oban_signal\"", "idle"],
    [13732, "LISTEN \"public.oban_signal\"", "idle"],
    [11662, "LISTEN \"public.oban_signal\"", "idle"],
    [13734, "LISTEN \"public.oban_signal\"", "idle"],
    [13735, "LISTEN \"public.oban_signal\"", "idle"],
    [11663, "LISTEN \"public.oban_signal\"", "idle"],
    [13737, "LISTEN \"public.oban_signal\"", "idle"],
    [11664, "LISTEN \"public.oban_signal\"", "idle"],
    [13740, "LISTEN \"public.oban_signal\"", "idle"],
    [13738, "LISTEN \"public.oban_signal\"", "idle"],
    [13741, "LISTEN \"public.oban_signal\"", "idle"],
    [13742, "LISTEN \"public.oban_signal\"", "idle"],
    [11665, "LISTEN \"public.oban_signal\"", "idle"],
    [11666, "LISTEN \"public.oban_signal\"", "idle"],
    [13744, "LISTEN \"public.oban_signal\"", "idle"],
    [13746, "LISTEN \"public.oban_signal\"", "idle"],
    [11667, "LISTEN \"public.oban_signal\"", "idle"],
    [13748, "LISTEN \"public.oban_signal\"", "idle"],
    [13749, "LISTEN \"public.oban_signal\"", "idle"],
    [11668, "LISTEN \"public.oban_signal\"", "idle"],
    [13751, "LISTEN \"public.oban_signal\"", "idle"],
    [13752, "LISTEN \"public.oban_signal\"", "idle"],
    [11669, "LISTEN \"public.oban_signal\"", "idle"],
    [13754, "LISTEN \"public.oban_signal\"", "idle"],
    [13755, "LISTEN \"public.oban_signal\"", "idle"],
    [11670, "LISTEN \"public.oban_signal\"", "idle"],

  ...

We found out that nearly all our connections are blocked by the oban notifier.

We suspect the recently added changes in the notifications as the culprit, but we are not quite sure how to fix this on our end.

Expected Behavior

Our expectation was that only one connection is used by oban.

Thank you in advance 🙏.
So far our experience with oban has been great and we also wanted to thank you again for this amazing library 💚.

UPDATE:

After killing them with:

Ecto.Adapters.SQL.query!(Everyworks.Repo, "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query = 'LISTEN \"public.oban_signal\"'")

New oban_signal processes spawn and instantly fill the connection slots again.

Oban crashes when Ecto logging is disabled

I've noticed this on our team app and I managed to replicate it with a standalone Phoenix app. Disabling a Repo log option with:

diff --git a/config/dev.exs b/config/dev.exs
index 108ccc1..59d9332 100644
--- a/config/dev.exs
+++ b/config/dev.exs
@@ -5,6 +5,7 @@ config :testapp, Testapp.Repo,
   username: "lucas",
   database: "testapp_dev",
   hostname: "localhost",
+  log: false,
   show_sensitive_data_on_connection_error: true,
   pool_size: 10

Makes Oban crash with the following:

[error] an exception was raised logging %DBConnection.LogEntry{call: :prepare_execute, connection_time: 3421000, decode_time: 1568000, params: ["available", ~U[2019-09-25 18:30:55.041959Z], "default"], pool_time: 534252000, query: %Postgrex.Query{cache: :reference, columns: nil, name: "ecto_3205", param_formats: nil, param_oids: nil, param_types: nil, ref: nil, result_formats: nil, result_oids: nil, result_types: nil, statement: "UPDATE \"public\".\"oban_jobs\" AS o0 SET \"state\" = $1 FROM (SELECT o0.\"id\" AS \"id\" FROM \"public\".\"oban_jobs\" AS o0 LEFT OUTER JOIN \"public\".\"oban_beats\" AS o1 ON (o0.\"attempted_by\" = ARRAY[o1.\"node\",o1.\"queue\",o1.\"nonce\"]) AND (o1.\"inserted_at\" > $2) WHERE ((o0.\"state\" = 'executing') AND (o0.\"queue\" = $3)) AND (o1.\"node\" IS NULL)) AS s1 WHERE (o0.\"id\" = s1.\"id\")", types: nil}, result: {:ok, %Postgrex.Query{cache: :reference, columns: nil, name: "ecto_3205", param_formats: [:binary, :binary, :binary], param_oids: [16916, 1114, 25], param_types: [Postgrex.Extensions.Raw, Postgrex.Extensions.Timestamp, Postgrex.Extensions.Raw], ref: #Reference<0.701822816.1820852225.69693>, result_formats: [], result_oids: nil, result_types: nil, statement: "UPDATE \"public\".\"oban_jobs\" AS o0 SET \"state\" = $1 FROM (SELECT o0.\"id\" AS \"id\" FROM \"public\".\"oban_jobs\" AS o0 LEFT OUTER JOIN \"public\".\"oban_beats\" AS o1 ON (o0.\"attempted_by\" = ARRAY[o1.\"node\",o1.\"queue\",o1.\"nonce\"]) AND (o1.\"inserted_at\" > $2) WHERE ((o0.\"state\" = 'executing') AND (o0.\"queue\" = $3)) AND (o1.\"node\" IS NULL)) AS s1 WHERE (o0.\"id\" = s1.\"id\")", types: {Postgrex.DefaultTypes, #Reference<0.701822816.1820983299.68770>}}, %Postgrex.Result{columns: nil, command: :update, connection_id: 73888, messages: [], num_rows: 0, rows: nil}}}: ** (FunctionClauseError) no function clause matching in Logger.__should_log__/1
    (logger) lib/logger.ex:668: Logger.__should_log__(false)
    (ecto_sql) lib/ecto/adapters/sql.ex:810: Ecto.Adapters.SQL.log/4
    (db_connection) lib/db_connection.ex:1377: DBConnection.log/5
    (ecto_sql) lib/ecto/adapters/sql.ex:570: Ecto.Adapters.SQL.execute!/4
    (ecto_sql) lib/ecto/adapters/sql.ex:562: Ecto.Adapters.SQL.execute/5
    (oban) lib/oban/queue/producer.ex:153: Oban.Queue.Producer.rescue_orphans/1
    (oban) lib/oban/queue/producer.ex:83: Oban.Queue.Producer.handle_continue/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:388: :gen_server.loop/7
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3

Removing Oban from the supervision tree and/or skipping the log: false option fixes the issue, and the web server still works as expected even when log is false.

High CPU usage after sleep

Oban shows very high CPU usage when I wake my MacBook up after sleep. I often leave my development server running and shut the lid at the end of the day. When I come back the following day and open the lid, my BEAM process sits at 100% CPU.

Observer shows me that there's a large number of messages (currently ~111000) in each queue process, e.g. Elixir.Oban.Queue.Default. I'm guessing there's some kind of timer function that queues messages based on when the last one was sent, and putting the application to sleep causes it to schedule a large number upon wake.

This isn't a major issue - putting the machine to sleep would never happen in production, and it's easy to kill the process and start again in development. But logging it here in case it's an easy fix!

sql_error_code = 01000 WARNING: you don't own a lock of type ShareLock

Context: I have a nightly task that spins up a bunch of jobs that calculate and set a value on rows in one of my tables. I don't have any sort of batching implemented for it right now, so that task spins up a good number of jobs quickly; I estimate around 90k jobs. Since swapping out my Redis job setup with Oban, I've noticed a bunch of sql_error_code = 01000 WARNING: you don't own a lock of type ShareLock messages in my DB's console (I'm on Heroku, fwiw). This seems limited to only one of my queues, email. I manually changed the queue on a few of the jobs from email to default and those jobs completed immediately.

Any ideas here? I did notice the closed issue #4, but I'm not entirely sure whether this issue is related.

Wondering about migration best practice

Here's what I did:

First migration when setting up Oban

  defdelegate up, to: Oban.Migrations
  defdelegate down, to: Oban.Migrations

Second migration that came later

  def up do
    Oban.Migrations.up(version: 3)
  end

  def down do
    Oban.Migrations.down(version: 2)
  end

Issue with these migrations

Now, when setting up the project from scratch, the first migration migrates all the way to the latest version, and the subsequent migrations fail because they try to apply versions that have already run.

The fix for me, I think, is to change the initial migration to:

  def up do
    Oban.Migrations.up(version: 2)
  end

  def down do
    Oban.Migrations.down(version: 1)
  end

However, I wonder if the documentation should be adjusted so others don't run into the same issue.
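
For reference, a complete pinned initial migration might look like this (module and app names are hypothetical):

defmodule MyApp.Repo.Migrations.AddObanJobsTable do
  use Ecto.Migration

  # Pin the migration to the versions that existed when it was written,
  # so a fresh setup replays the history deterministically.
  def up, do: Oban.Migrations.up(version: 2)

  def down, do: Oban.Migrations.down(version: 1)
end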

Can't view emails sent from a Job

First of all, sorry if this isn't the right place for this issue. I really don't know whether it belongs here or in the Bamboo repository, but I can move it without any problem.

I have an Oban job that sends a notification email. In my local environment I am using the Bamboo.LocalAdapter to catch the emails and view them in my Phoenix server.

The job code is really simple:

defmodule MyApp.ObanEmail do
  use Oban.Worker, queue: "notifications"

  alias MyApp.Mailer

  def perform(_args) do
    Bamboo.Email.new_email()
    |> Bamboo.Email.from("[email protected]")
    |> Bamboo.Email.to("[email protected]")
    |> Bamboo.Email.subject("Test Email")
    |> Mailer.deliver_now()
  end
end

And the mailer is configured as:

# lib/myapp/mailer.ex
defmodule MyApp.Mailer do
  use Bamboo.Mailer, otp_app: :my_app
end

# config/dev.exs
config :my_app, MyApp.Mailer, adapter: Bamboo.LocalAdapter

It turns out that when I call the perform function directly, I can see the sent email. When I enqueue the job and Oban runs it, it succeeds and gets marked as completed, but I cannot see the email. Any clue why there would be that difference?

Multiple Instances of Oban in an Umbrella Application

Hi all,

We're already using Oban in one of our umbrella applications. We have another application in the umbrella that uses a separate database (and a separate Repo). I'm trying to add Oban to this second application, but the supervisor won't start, since Oban is already started elsewhere in the umbrella.

When I run our tests, this is the output:

** (Mix) Could not start application application_a: ApplicationA.Application.start(:normal, []) returned an error: shutdown: failed to start child: Oban
    ** (EXIT) already started: #PID<0.586.0>

Is there a better way to do this?
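
One possible direction, assuming an Oban version that accepts a :name option and name-scoped calls (a sketch, not verified against any particular release):

# In application_a's supervision tree:
{Oban, name: ObanA, repo: ApplicationA.Repo, queues: [default: 10]}

# In application_b's supervision tree:
{Oban, name: ObanB, repo: ApplicationB.Repo, queues: [default: 10]}

# Insert jobs through the matching instance:
Oban.insert(ObanA, SomeWorker.new(%{id: 1}))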

Pool vs DynamicSupervisor

I think it would be helpful to spell out whether queues use a pool of processes or create processes on-demand. That may change the approach people take when deciding how big they want their queues to be.

How to repeat a job?

What is the best way to repeat (as in, reschedule somewhere in the future) a running job?
Can it update its own scheduled_at and set its state back to available?
Or should it simply create a new job?
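
For what it's worth, a minimal sketch of the create-a-new-job approach, where the worker enqueues a fresh copy of itself instead of mutating the finished row (the interval and helper are illustrative):

defmodule MyApp.RecurringWorker do
  use Oban.Worker, queue: "default"

  def perform(args) do
    do_work(args)

    # Enqueue a fresh copy of this job, scheduled 60 seconds out,
    # rather than resetting the completed job's state.
    args
    |> new(schedule_in: 60)
    |> Oban.insert()

    :ok
  end

  # Hypothetical stand-in for the real work.
  defp do_work(_args), do: :ok
end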

Faktory / Sidekiq Style Dashboard

Problem

I'd like to capture job states and failures through a UI of some kind, e.g. like Faktory's.

Solution

I can imagine something sort of like Faktory's UI, but obviously Oban works a little differently (e.g. tends to keep many, many more jobs around for a lot, lot longer).

Alternatives

I did a cursory search and didn't see anything like this out there for Oban, but I could have totally missed that effort.

Additional Comments

I'm pretty new to Elixir and have been poking at Oban as a replacement for a Ruby / Sidekiq project just for fun. I'd be really interested in working on some sort of UI on top of Oban (maybe a web UI, maybe even something on the terminal?) if it seems like something of interest - or maybe pitching in with an existing project.

Thoughts?

Thanks for all the work on this project, I'm really enjoying working with Oban so far.

oban 0.7.0 doesn't support `max_attempts: 1`

Hi everybody!

In our project, we have a worker that updates a client's status. We set the max_attempts: 1 option to avoid inconsistencies caused by an unhandled retry (it was a good and cheap option at the time).

But now that we've updated to version 0.7.0, our worker's test case broke because of this change, and I couldn't find any explanation for it.

I would like to know why this change was made, and how I can work around it.

Thanks for now!

add testing functions to allow for pattern matching on `assert_enqueued`

It would be nice if there were an extra testing option alongside assert_enqueued that returned the arguments that were enqueued. This would allow pattern matching on the arguments instead of having to provide the complete args.

This is useful when you don't know the full args up front in the test, or you're not interested in the full args.

%{"foo" => 1} = assert_enqueued(worker: MyWorker, queue: "foobar")

Web UI

Hello!

Is there a web-based user interface that we can use to monitor and administer our Oban work queues? :)

Thanks,
Louis

`scheduled_at` option isn't being respected

Following the scheduled_at example in the README inserts jobs as completed rather than scheduled, while schedule_in results in the expected behaviour. I first suspected this after trying it out in our app, and confirmed it by reading the integration tests at https://github.com/sorentwo/oban/blob/master/test/integration/scheduling_test.exs and noticing that they only exercise the schedule_in option.

%{id: 1}
|> MyApp.Business.new(scheduled_at: ~U[2020-12-25 19:00:56.0Z])
|> Oban.insert()
# job will be `completed`, with `completed_at` set to now and the correct `scheduled_at` column

%{id: 1}
|> MyApp.Business.new(schedule_in: 5)
|> Oban.insert()
# job will be `scheduled`, with a null `completed_at` column

GenServer.call `:noproc` error

After running on a server (Heroku) for a while, I occasionally get the following error:

Erlang error: {:noproc, {GenServer, :call, [Oban.Queue.Default.Producer, :pause, 5000]}}
Stacktrace:

lib/gen_server.ex:979:in `call/3'
lib/oban/queue/watchman.ex:43:in `terminate/2'
gen_server.erl:673:in `try_terminate/3'
gen_server.erl:858:in `terminate/10'
proc_lib.erl:249:in `init_p_do_apply/3'

I believe :noproc means that there is not currently a process running with the name Oban.Queue.Default.Producer, so perhaps there's a race condition with process registration.

Actually, I also have the following error in my logs, which seems to have happened at roughly the same time (and may predate the :noproc error):

Erlang error: {:timeout, {GenServer, :call, [Oban.Notifier, {:notify, "oban_gossip", %{count: 0, limit: 10, node: "web.1", paused: false, queue: "default"}}, 5000]}}

lib/gen_server.ex:989:in `call/3'
lib/oban/queue/producer.ex:192:in `gossip/1'
lib/oban/queue/producer.ex:86:in `handle_info/2'
gen_server.erl:637:in `try_dispatch/4'
gen_server.erl:711:in `handle_msg/6'
proc_lib.erl:249:in `init_p_do_apply/3'

Support a `discarded` callback

I have a requirement to take some action when a job reaches its max attempts and is discarded from the queue. I was hoping it would be possible to add a callback to Oban.Worker to handle this. I imagine something like this...

defmodule MyWorker do
  use Oban.Worker, max_attempts: 5

  require Logger

  def discarded(job) do
    Logger.info("job #{job.id} was discarded after #{job.attempt} attempts")
  end
end

Note: my use case is more complex than simple logging, but hopefully this illustrates the point.

If this is something you would be willing to support in Oban I can have a go at putting together a PR to add the functionality.

Postgres performance question

Hi,

We've been running Oban on Postgres 9.5 for a couple of weeks and I've noticed that performance has degraded significantly.

We have two workers: one is a "normal" worker whose jobs are enqueued on demand by the app; the other is a "recurring" worker that reschedules itself on every run and executes every 5 seconds.

Here's how its performance looks over time (the purple line is the recurring worker):

[graph: job execution time, degrading over time]

DB logs are full of warnings about slow queries:

duration: 102.274 ms  execute ecto_68: SELECT o0."id", o0."state", o0."queue", o0."worker", o0."args", o0."errors", o0."attempt", o0."attempted_by", o0."max_attempts", o0."attempted_at", o0."completed_at", o0."inserted_at", o0."scheduled_at" FROM "public"."oban_jobs" AS o0 WHERE (o0."state" = ANY($1)) AND (o0."inserted_at" > $2) AND ((o0."worker" = $3) AND (o0."queue" = $4)) ORDER BY o0."id" DESC LIMIT 1

But when I run this query with EXPLAIN ANALYZE, it shows nothing unusual:

# explain analyze SELECT * FROM "public"."oban_jobs" AS o0 WHERE (o0."state" = any('{scheduled}')) AND (o0."inserted_at" > '2019-10-07 11:19:46.702893') AND ((o0."worker" = 'EngagementRouter.EngagementRequests.SynchronizeExpiredRequestsWorker') AND (o0."queue" = 'engagement_requests')) ORDER BY o0."id" DESC LIMIT 1;;
                                                                                          QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=6.20..6.21 rows=1 width=257) (actual time=0.040..0.041 rows=1 loops=1)
   ->  Sort  (cost=6.20..6.21 rows=1 width=257) (actual time=0.038..0.038 rows=1 loops=1)
         Sort Key: id DESC
         Sort Method: quicksort  Memory: 25kB
         ->  Index Scan using oban_jobs_queue_state_scheduled_at_id_index on oban_jobs o0  (cost=0.42..6.19 rows=1 width=257) (actual time=0.016..0.016 rows=1 loops=1)
               Index Cond: ((queue = 'engagement_requests'::text) AND (state = ANY ('{scheduled}'::oban_job_state[])))
               Filter: ((inserted_at > '2019-10-07 11:19:46.702893'::timestamp without time zone) AND (worker = 'EngagementRouter.EngagementRequests.SynchronizeExpiredRequestsWorker'::text))
 Planning time: 0.504 ms
 Execution time: 0.106 ms
(9 rows)

Purging and vacuuming the table (delete from oban_jobs where state = 'completed') helps, but I don't think it's a good solution.

What I can't understand is how the same query performs so differently when run from Oban versus from psql under EXPLAIN ANALYZE, and how this query can be so slow on a relatively small table (about 41 MB in total).

UPD. We run Oban 0.10.0 with the latest migration applied.
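
Since purging and vacuuming helps, dead-tuple bloat is one plausible explanation for the gap between production timings and a one-off EXPLAIN ANALYZE. A hedged way to check (the repo name is illustrative):

Ecto.Adapters.SQL.query!(
  MyApp.Repo,
  "SELECT n_live_tup, n_dead_tup, last_autovacuum FROM pg_stat_user_tables WHERE relname = 'oban_jobs'"
)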

Postgres schemas

Does Oban support Postgres schemas (:prefix in Ecto) or are there plans to support them?
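
For what it's worth, config dumps quoted in other issues here show a prefix field on %Oban.Config{}, so if schema support applies to your version, the configuration would presumably look something like this (a sketch, not a confirmed API for every release):

config :my_app, Oban,
  repo: MyApp.Repo,
  prefix: "private",
  queues: [default: 10]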

Dynamic queues

Is there a way to create dynamic queues? I don't know the queues beforehand; I'd like to create and start queues on the fly.
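
A hedged sketch, assuming a version of Oban that exposes runtime queue controls (the keyword form here matches more recent releases; older releases may differ):

# Start a queue at runtime...
Oban.start_queue(queue: :ad_hoc, limit: 5)

# ...and stop it again when it's no longer needed.
Oban.stop_queue(queue: :ad_hoc)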

Oban.Migrations.down generates wrong range

When I'm on V2 and call Oban.Migrations.down, it should use a range of 2..1 but the current implementation generates 2..4.

Possible implementation (tested):

  def up(opts \\ []) when is_list(opts) do
    opts
    |> merge_defaults()
    |> put_range(:up)
    |> change(:up)
  end

  def down(opts \\ []) when is_list(opts) do
    opts
    |> merge_defaults()
    |> put_range(:down)
    |> change(:down)
  end

  defp merge_defaults(opts) do
    Map.merge(@default_opts, Map.new(opts))
  end

  defp put_range(opts, :up) do
    Map.put(opts, :range, (get_initial(opts.prefix) + 1)..opts.version)
  end

  defp put_range(opts, :down) do
    Map.put(opts, :range, get_initial(opts.prefix)..1)
  end

assert|refute_enqueued matches completed jobs

The assert|refute_enqueued helpers are super handy, but they match completed jobs, which was a bit confusing. Semantically, I think of enqueued as "will possibly run in the future", but of course completed jobs won't ever be executed again.

This kept failing...

refute_enqueued(
  worker: MyApp.Workers.NotifyUser,
  args: %{user_id: userid}
)

I couldn't figure out why the job was being queued up, since my logic for enqueueing the job in the first place was gated on the size of an array, and that part was right.

After this change...

refute_enqueued(
  worker: MyApp.Workers.NotifyUser,
-  args: %{user_id: userid}
+  args: %{user_id: userid, state: "available"}
)

my test passed.

So, my question is, should assert|refute_enqueued match completed jobs?

Migrations.change/3 will call Migrations.V5 if initial add_oban_jobs_table uses defdelegate

I believe the installation instructions used to specify an initial migration like so:

defmodule MyApp.Migrations.AddObanJobsTable do
  use Ecto.Migration

  defdelegate up, to: Oban.Migrations
  defdelegate down, to: Oban.Migrations
end

When running MIX_ENV=test mix ecto.reset in CI, I get the following error:

** (ArgumentError) argument error
    :erlang.binary_to_existing_atom("Elixir.Oban.Migrations.V5", :utf8)
    lib/oban/migrations.ex:45: anonymous fn/4 in Oban.Migrations.change/3
    (elixir) lib/enum.ex:3011: Enum.reduce_range_dec/4
    lib/oban/migrations.ex:43: Oban.Migrations.change/3
    (ecto_sql) lib/ecto/migration/runner.ex:280: Ecto.Migration.Runner.perform_operation/3
    (stdlib) timer.erl:166: :timer.tc/1
    (ecto_sql) lib/ecto/migration/runner.ex:27: Ecto.Migration.Runner.run/7

I noticed that the README now specifies an initial add_oban_jobs_table migration whose up/1 doesn't specify a version opt, which results in the same error listed above. Specifying a version like Oban.Migrations.up(version: 2) works, though.

https://github.com/sorentwo/oban/blob/master/lib/oban/migrations.ex#L12 gets a version of 4 unless one is specified, and the range is incremented by 1, which lands us at Migrations.V5 on line 45 (I think).

-- edit

Yeah I think I'm wrong about the actual cause, but if I can figure it out soon I'll PR it :D

Ecto.Repo.insert_all/3 and Ecto.Multi.insert_all/5 support?

We have a case where we are doing a series of insert_all/3 operations (with on-conflict update resolution), and one of the things we need to do is start a background task (sending emails) for those operations. What would be ideal is something like:

customers
|> Enum.map(&Worker.new_for_insert_all/1)
|> Oban.insert_all()

We can implement something ourselves for this, but as Oban's jobs schema changes from time to time, we would always be playing catch-up or holding on to an older version. What do you think?

Deprecate perform/1 in favor of perform/2

Thanks for this fantastic library!

Currently, users expect the job args to be passed to perform/1, and when they need access to the %Job{}, they define a new perform/1 clause that matches on the job. This works because Oban injects:

      def perform(%Job{args: args}), do: perform(args)
      def perform(args) when is_map(args), do: :ok

While this works with very little code injection, it's non-obvious to users how to reach the Job struct when needed, and non-obvious that once they define their own clause matching the struct, they may invalidate other previously valid perform/1 clauses depending on what they are doing. Instead of this argument-type dance, I propose we introduce perform/2, which receives the job struct followed by the arguments. For the general use case, this still looks quite nice:

def perform(_job, %{some: arg}) do

This way it's clear what is called and when, and the job metadata is instantly accessible when needed. I think the throwaway arg is a very reasonable tradeoff for the times the job info isn't needed. Thoughts?

Specify `oban_jobs` table primary key as bigserial

All of our migration primary keys are of type uuid, so we configured config :app, App.Repo, migration_primary_key: [type: :binary_id]. But Oban needs the id to be a bigserial, so Oban should explicitly specify the type of the id like this:

create_if_not_exists table(:oban_jobs, primary_key: false) do
  add(:id, :bigserial, primary_key: true)
  # ...
end

How to prevent some nodes from performing jobs?

How would one indicate that, for a given context, a particular node shouldn't take on any workers? I'm thinking of a temporary node whose responsibility is to run mix tasks that can insert jobs into Oban queues, but which should not process jobs from those queues.

As an enhanced form of the question, is it possible for a node to specifically subscribe to (or exclude subscription from) particular queues? That is, if I wanted to have two worker nodes, each one handling a different subset of queues, how would I do so?
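
One approach, assuming each node can load its own configuration: disable queue processing entirely on the enqueue-only node, and give each worker node just its subset of queues. A sketch:

# On the temporary mix-task node: enqueue jobs, process nothing.
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: false

# On a worker node that should only handle a subset of queues:
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [emails: 10, imports: 5]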

[Question] Chaining jobs

Hi!
Thanks for a great piece of software!
I'm wondering about a pattern for chaining jobs. My use case is a file download from an FTP server, followed by database imports of multiple files once the data has been downloaded.
I was thinking of creating a job that does the file download, plus a Task that checks whether the download has completed and then enqueues the multiple import jobs.
The problem I saw with that approach: what happens if the Task fails for some reason?
Any suggestions on how a pattern like this should/could be implemented?

Br,
Mats
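
For what it's worth, a sketch of one alternative: enqueue the import jobs from inside the download job itself, so a failure anywhere simply retries the download job and no unsupervised Task is involved (the helpers and ImportWorker are hypothetical):

defmodule MyApp.DownloadWorker do
  use Oban.Worker, queue: "downloads"

  def perform(%{"path" => path}) do
    {:ok, local_file} = download_from_ftp(path)

    # If download_from_ftp/1 raises, Oban retries this job and no
    # import jobs are enqueued; there is no separate Task to lose.
    for file <- files_to_import(local_file) do
      %{"file" => file}
      |> MyApp.ImportWorker.new()
      |> Oban.insert()
    end

    :ok
  end

  # Hypothetical stubs standing in for the real FTP/import logic.
  defp download_from_ftp(path), do: {:ok, path}
  defp files_to_import(_local_file), do: []
end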

Pruner is losing the db connection from timeout

Environment

  • Oban Version: 0.10.1
  • PostgreSQL Version: 11.4
  • Elixir & Erlang/OTP Versions (elixir --version): 1.8.1/21

Current Behavior

[screenshot: Heroku console logs showing the Pruner's database connection timing out]

This is from the Heroku console.

Expected Behavior

To live forever and ever.

Notes

I'm going to be upgrading to 0.11 soon, but I don't think that'll have any bearing on this issue.

Crashes on Heroku

Oban works great on my development machine, but when I deploy to Heroku the application refuses to start.

When I remove the line {Oban, Application.get_env(:myapp, Oban)} from the children list the application starts normally.

def start(_type, _args) do
  children = [
    MyApp.Repo,
    MyAppWeb.Endpoint,
    {Oban, Application.get_env(:myapp, Oban)}
  ]

  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end

The config details are:

config :myapp, Oban,
  repo: MyApp.Repo,
  prune: {:maxlen, 100_000},
  queues: [default: 5]

The logs given by Heroku don't seem to be helpful.

Here's a snippet of the logs:

2019-07-16T06:28:37.821079+00:00 heroku[web.1]: State changed from crashed to starting
2019-07-16T06:28:58.301285+00:00 heroku[web.1]: Starting process with command `MIX_ENV=prod mix phx.server`
2019-07-16T06:29:01.467000+00:00 app[web.1]: [heroku-exec] Starting
2019-07-16T06:29:02.466364+00:00 app[web.1]: mix.exs:15
2019-07-16T06:29:02.466366+00:00 app[web.1]:
2019-07-16T06:29:08.000526+00:00 app[web.1]: 06:29:08.000 [info] Running MyApp.Endpoint with cowboy 2.6.3 at :::12628 (http)
2019-07-16T06:29:08.054219+00:00 app[web.1]: 06:29:08.053 [info] Access MyApp.Endpoint at https://myapp.com
2019-07-16T06:29:08.400370+00:00 app[web.1]: 06:29:08.400 [info] Application myapp exited: shutdown
2019-07-16T06:29:09.923062+00:00 app[web.1]: {"Kernel pid terminated",application_controller,"{application_terminated,myapp,shutdown}"}
2019-07-16T06:29:09.923244+00:00 app[web.1]: Kernel pid terminated (application_controller) ({application_terminated,myapp,shutdown})
2019-07-16T06:29:09.923533+00:00 app[web.1]:
2019-07-16T06:29:12.706848+00:00 app[web.1]: Crash dump is being written to: erl_crash.dump...done

I've reviewed the erl_crash.dump, but didn't notice anything out of the ordinary, albeit the file is super long.

Thanks for the help!

Cron jobs are not compatible with Ecto 3.1.x

Environment

  • Oban Version: {:oban, "~> 0.11.0"}
  • PostgreSQL Version: psql (PostgreSQL) 11.5
  • Elixir & Erlang/OTP Versions (elixir --version) :
    Erlang/OTP 22 [erts-10.4.4] [source] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [hipe]
    Elixir 1.9.1 (compiled with Erlang/OTP 22)

Current Behavior

With these jobs configured in the crontab:

config :merchant, Oban,
  repo: Merchant.Repo,
  queues: [queue: 10],
  verbose: :debug,
  crontab: [
    {"0 01 25 * *", Merchant.MonthlyJob},
  ]

Oban.Scheduler terminates with the following error:

GenServer Oban.Scheduler terminating
** (FunctionClauseError) no function clause matching in Ecto.Repo.Transaction.transaction/4
    (ecto) lib/ecto/repo/transaction.ex:5: Ecto.Repo.Transaction.transaction(Merchant.Repo, Merchant.Repo, #Function<1.128120480/1 in Oban.Crontab.Scheduler.lock_and_enqueue/1>, [log: :debug, timeout: 60000])
    (oban) lib/oban/crontab/scheduler.ex:95: Oban.Crontab.Scheduler.lock_and_enqueue/1
    (oban) lib/oban/crontab/scheduler.ex:70: Oban.Crontab.Scheduler.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:388: :gen_server.loop/7
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {:continue, :start} 
State: %Oban.Crontab.Scheduler.State{circuit: :enabled, circuit_backoff: 30000, conf: %Oban.Config{crontab: [{%Oban.Crontab.Cron{days: [25], hours: [1], minutes: [0], months: [:*], weekdays: [:*]}, Merchant.MonthlyJob, []}], name: Oban, node: "arch-ryzen", poll_interval: 1000, prefix: "public", prune: :disabled, prune_interval: 60000, prune_limit: 5000, queues: [billing: 10], repo: Merchant.Repo, rescue_after: 60, rescue_interval: 60000, shutdown_grace_period: 15000, verbose: :debug}, poll_interval: 60000, poll_ref: nil}

Expected Behavior

Oban.Scheduler should not terminate with a no function clause matching in Ecto.Repo.Transaction.transaction/4 error.

This problem is specific to ecto 3.1.7 and probably earlier versions. After upgrading ecto to 3.2.5, everything works correctly. That may be because upgrading ecto also upgrades ecto_sql.

I can't update Ecto at this moment due to issues not connected to Oban.

Semantics of how `:error` tuples should be treated as job failures

We are digging into integrating Oban's telemetry events with AppSignal, and we are thinking about how Oban could better support idiomatic ok/error tuples from application code so it understands that something failed. We have something like this:

def perform(%{"id" => id}) do
  record = MyApp.Repo.get!(MyApp.Client, id)
  MyApp.Context.do_work(record)
end

Context.do_work/1 was originally written to return {:ok, %MyApp.Client{}} or {:error, error} and it should never raise an exception, but as Oban ignores return values, the job runs successfully and is never retried/discarded. We could add some code to manually raise/throw inside our job module, but this feels like something Oban could handle by itself (I don't know how other Elixir libraries deal with it).

I'm proposing that we change the following function:

def safe_call(%Job{args: args, worker: worker} = job) do
  worker
  |> to_module()
  |> apply(:perform, [args])

  {:success, job}
rescue
  exception ->
    {:failure, job, :error, exception, __STACKTRACE__}
catch
  kind, value ->
    {:failure, job, kind, value, __STACKTRACE__}
end

If perform/1 returns {:error, error}, I think safe_call should return {:failure, job, :error, error, __STACKTRACE__} and the job should be treated as failed. The scenario where an exception was raised should use :exception instead of :error for the kind prop (this makes it easier for event listeners to know whether it's an actual exception or some other data structure that represents the error).
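
A sketch of what that could look like (simplified, not the actual implementation):

def safe_call(%Job{args: args, worker: worker} = job) do
  case worker |> to_module() |> apply(:perform, [args]) do
    {:error, error} ->
      # No stacktrace is available for a plain return value.
      {:failure, job, :error, error, []}

    _other ->
      {:success, job}
  end
rescue
  exception ->
    # Raised exceptions get a distinct kind so listeners can tell
    # them apart from returned error tuples.
    {:failure, job, :exception, exception, __STACKTRACE__}
catch
  kind, value ->
    {:failure, job, kind, value, __STACKTRACE__}
end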

What do you think?

First deploy: start the application without the migrations

Hi. First of all, thank you for the project!

We are running our first deploy with Oban, and we had to remove it from the supervision tree for the first deploy. This is because our application couldn't find the oban_jobs table and crashed as a result.

Our solution was to comment out the line that adds Oban to the supervision tree, deploy and migrate, then uncomment it and deploy again.

Is there a best practice or a recommendation to deploy the application with Oban for the first time?

One detail about our project: we are using Mix releases.

Telemetry and testing questions

Hi @sorentwo,

Great work on the library! I am really glad to see a database-backed library with such a robust feature set. I have two quick questions (if there is a better place to post them, please let me know):

  1. Regarding telemetry, we have been standardizing on the name :duration for measurements of the time taken to execute something. So what do you think about replacing the current %{timing: timing} event in favor of %{duration: timing}? (A concrete sketch follows this list.) Other than that, thank you for using telemetry and helping us push it as a standard in the community! πŸŽ‰ I also like how you structured the events around success and failure.

  2. Regarding testing, sometimes I may actually want the jobs to run (for example, during integration tests) because they may do important work that is needed for the rest of the test. Is it possible to have a testing function that executes all of the jobs in a given queue? That way I could perform some action and then execute all enqueued jobs in the test before proceeding.
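
To make question 1 concrete, a sketch of a handler as events look today (the reporter module is a stand-in, and the metadata keys are assumed):

:telemetry.attach(
  "oban-success-handler",
  [:oban, :success],
  fn _event, %{timing: timing}, meta, _config ->
    # Today the measurement is named :timing; the proposal is to
    # standardize on :duration instead.
    MyApp.Reporter.record(meta[:worker], timing)
  end,
  nil
)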

Thanks!

Pruning beats separately from jobs

Is your feature request related to a problem? Please describe.

Our beats table is growing very quickly, and for the time being we're not pruning any completed or discarded jobs from the jobs table. Is it safe to prune the beats without negatively affecting the jobs?

Describe the Solution You'd Like

A sweeper that's independent of the pruner.

Describe Alternatives You've Considered

DELETE FROM oban_beats;
