Git Product home page Git Product logo

queue's Introduction

esync is a library to help keep some services synchronised with other services.

Should be safe for multiple processors to run simultaneously. A driver's pop() function should not return the same value at the next call.

Usage

The following is a basic example of how to use this library.

package main

import (
	"log"
	"os"
	"time"

	"github.com/episub/spawn/queue"
)

// Use a scheduled action to run tasks at intervals
type myScheduledAction struct{}

func (m myScheduledAction) Do() error {
	log.Printf("myScheduledAction done")
	return nil
}

func (m myScheduledAction) Stream() string {
	return "my-scheduled-action"
}

// Use a queue action to handle tasks in a queue
type myQueueAction struct{}

func (m myQueueAction) Do(task queue.Task) (queue.TaskResult, queue.TaskMessage) {
	log.Printf("myQueueAction done.  Task key: %s", task.Key)

	return queue.TaskResultSuccess, queue.TaskMessage("No Problem")
}

func queueErrorManager(err error) {
	log.Printf("Error from sync manager: %s", err)
}

func main() {
	dbHost := os.Getenv("DB_HOST")
	dbUser := os.Getenv("DB_USER")
	dbPass := os.Getenv("DB_PASS")
	dbName := os.Getenv("DB_DB")
	dbTable := os.Getenv("DB_TABLE")

	postgresDriver, err := queue.NewPostgresDriver(dbUser, dbPass, dbHost, dbName, dbTable)

	if err != nil {
		log.Fatal(err)
	}

	sm := queue.NewSyncManager(postgresDriver)

	// Optionally set an error handler so that you can catch errors from the running loop and put them through your own logging solution
	sm.SetErrorHandler(queueErrorManager)

	// Schedule a regular action to perform at specified intervals
	sm.Schedule(myScheduledAction{}, time.Second*3)

	// Register our queue handler for tasks with name "exampleTask"
	sm.RegisterTaskHandler(myQueueAction{}, "exampleTask")
	data := map[string]interface{}{
		"testData": true,
	}

	tm := queue.NewTaskManager(postgresDriver)

	// Add one task to the queue
	err = tm.AddTask("exampleTask", "myKey", data)
	if err != nil {
		panic(err)
	}

	// Off we go
	sm.Run()
}

It may be possible in future versions for the same action to be used simultaneously, so be careful with pointer functions that may end up sharing values across goroutines. Avoid pointer functions where possible.

Details

You need to create an actionManager object, providing it a database driver object that meets the 'Driver' interface. Included drivers:

  • PostgreSQL (PostgresDriver)

You can use this library for either creating a service to run the synchronising actions, or for creating entries in a queue to be acted on by the synchronisation service. At the very least you need a SyncManager.

Concepts

  • Data: an action can store data in the queue
  • Task Key: this should uniquely identify a particular action. Think of it as the primary key, though it may not be the actual primary key, depending on driver implementation. If there is more than one READY entry for the same task key, only the most recent will be performed.
  • Task Name: this identifies the type of task. Action managers may handle particular task types. For example, you may have a task name such as "CUSTOMER_UPDATE", with multiple database entries of that sort. Try to keep one action per task name.
  • Stream: some tasks can be run simultaneously, while others may need to block. Put them in the same stream if they should block each other, and separate streams if safe to run concurrently.

Actions

Actions are descriptions of an act to perform. You provide them with a function that will run when the action is to be performed. You will then need to schedule the action to occur, either at specific intervals, or as the action to be performed by a particular queue item.

Actions should be designed to be safe to be used by multiple processes. Therefore, avoid pointers.

Actions should gracefully return if they take too long, as they will block the main loop.

Register action

Actions need to be registered for each task name. If there is no registered action for a task name, then the particular task is cancelled when its turn comes.

Queue

One will wish to create actions in the queue to be performed in good time. Not every action needs to form part of a queue, but it is helpful to be able to queue actions to be performed in time. To use the queue, you need a driver that provides a connection to the queue. The driver needs to fulfil the 'Driver' interface.

SyncManager

Running

In some cases, another service may not handle multiple connections well -- for example, NetSuite. In these cases you should ensure that you are only running one instance of this service.

Driver

When designing a driver, you need to be careful that you don't implement a 'pop' that will ignore newer tasks. Suppose that a task to update a customer is added, actioned, but before the action is finished a new update customer task is added. You then return the action and mark it as finished. This task should be performed again, so you need to be careful that the "mark as finished" task does not override the newer update task.

PostgreSQL

CREATE TABLE public.message_queue(
	message_queue_id uuid NOT NULL DEFAULT gen_random_uuid(),
	data jsonb NOT NULL DEFAULT '{}',
	task_key varchar(64) NOT NULL,
	task_name varchar(64) NOT NULL,
	created_at timestamptz DEFAULT Now(),
  created_by varchar(64) NOT NULL,
	last_attempted timestamptz NOT NULL DEFAULT Now(),
	state varchar(16) NOT NULL,
	last_attempt_message varchar NOT NULL,
  do_after timestamptz NOT NULL DEFAULT Now(),
	CONSTRAINT message_queue_id_pk PRIMARY KEY (message_queue_id)
);

CREATE TABLE public.cdc_hash(
	cdc_hash_id uuid NOT NULL DEFAULT gen_random_uuid(),
	cdc_controller_id uuid NOT NULL,
	object_id varchar NOT NULL,
	hash uuid,
	created_at timestamptz NOT NULL DEFAULT Now(),
	updated_at timestamptz NOT NULL DEFAULT Now(),
	CONSTRAINT cdc_hash_pk PRIMARY KEY (cdc_hash_id),
	CONSTRAINT cdc_hash_controller_object_uq UNIQUE (cdc_controller_id,object_id)
);

CREATE INDEX idx_cdc_hash_id ON public.cdc_hash (cdc_controller_id, object_id);
CREATE INDEX idx_cdc_hash_id_hash ON public.cdc_hash (cdc_controller_id, object_id, hash);

)

TODO:

  • Implement a timeout so that if some action blocks, then we can perform other actions
  • Per-task retry intervals so that some tasks can be tried every minute, while others wait every hour. Solution: separate queue runners per task type. They can then check for and pop tasks that are specific to their task name. When a task is registered, an interval is passed in the register call, and a new queue is spun up for that task.

queue's People

Contributors

myrovh avatar charles-dahl avatar

Watchers

James Cloos avatar  avatar

queue's Issues

Skip locked may not be working correctly

For a separate project, using a similar query, it appeared that a second process trying to pull tasks off would sometimes get none back, instead of the next unlocked one. The following query, similar to queue's query, did not work well (the returning

WITH u AS (
  SELECT
    repository_id AS target_repository_id,
    format(full_path, path) AS final_path
  FROM site.repository
  JOIN site.provider p ON p.provider_id = provider_id_provider
  WHERE last_checked IS NULL or last_checked < Now() - INTERVAL '1 hour'
  ORDER BY last_checked asc
  FOR UPDATE SKIP LOCKED
  LIMIT 1
)
UPDATE site.repository
SET last_checked = Now()
FROM u
WHERE repository_id = target_repository_id
RETURNING repository_id, final_path

Whereas this one worked as expected:

with selected AS (
  UPDATE site.repository
  SET last_checked = Now()
  WHERE repository_id = (
    SELECT
      repository_id
    FROM site.repository
    WHERE last_checked IS NULL or last_checked < Now() - INTERVAL '1 hour'
    ORDER BY last_checked, repository_id asc
    FOR UPDATE SKIP LOCKED
    LIMIT 1
  )
  RETURNING repository_id
)
SELECT
  repository_id AS target_repository_id,
  format(full_path, path) AS final_path
FROM site.repository
JOIN site.provider p ON p.provider_id = provider_id_provider
WHERE repository_id = (select repository_id from selected)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.