Git Product home page Git Product logo

kaptinlin / queue Goto Github PK

View Code? Open in Web Editor NEW
2.0 1.0 1.0 86 KB

A Go queue management library designed for robust and scalable job processing, leveraging Asynq and Redis. It supports advanced features such as error handling, retries, priority queues, and rate limiting, ideal for tasks from simple to complex in distributed systems.

Go 98.49% Makefile 1.51%
queue task-scheduler job-queue job-scheduler task-runner

queue's Introduction

Golang Queue Processing Library

This library offers a robust and flexible solution for managing and processing queued jobs in Go applications. Built on top of the Asynq task processing library, which uses Redis for storage, it provides advanced features like custom error handling, retries, priority queues, rate limiting, and job retention. Whether you're building a simple task runner or a complex distributed system, this library is designed to meet your needs with efficiency and ease. It also supports the setup of multiple workers across different machines, allowing for scalable and distributed job processing.

Getting Started

Installation

Ensure your Go environment is ready (requires Go version 1.21.4 or higher), then install the library:

go get -u github.com/kaptinlin/queue

Configuring Redis

Set up your Redis connection with minimal hassle:

import "github.com/kaptinlin/queue"

redisConfig := queue.NewRedisConfig(
    queue.WithRedisAddress("localhost:6379"),
    queue.WithRedisDB(0),
    queue.WithRedisPassword("your_password"),
)

Client Initialization

Create a client using the Redis configuration:

client, err := queue.NewClient(redisConfig)
if err != nil {
    log.Fatalf("Error initializing client: %v", err)
}

Job Enqueueing

Enqueue jobs by specifying their type and a structured payload for clear and concise data handling:

type EmailPayload struct {
    Email   string `json:"email"`
    Content string `json:"content"`
}

jobType := "email:send"
payload := EmailPayload{Email: "[email protected]", Content: "Welcome to our service!"}

_, err = client.Enqueue(jobType, payload, queue.WithDelay(5*time.Second))
if err != nil {
    log.Printf("Failed to enqueue job: %v", err)
}

Alternatively, for direct control over job configuration, use a Job instance:

job := queue.NewJob(jobType, payload, queue.WithDelay(5*time.Second))
if _, err := client.EnqueueJob(job); err != nil {
    log.Printf("Failed to enqueue job: %v", err)
}

This approach allows you to specify additional job options such as execution delay, directly within the Job object.

Handling Jobs

Define a function to process jobs of a specific type. Utilize the EmailPayload struct for type-safe payload handling:

func handleEmailSendJob(ctx context.Context, job *queue.Job) error {
    var payload EmailPayload
    if err := job.DecodePayload(&payload); err != nil {
        return fmt.Errorf("failed to decode payload: %w", err)
    }

    log.Printf("Sending email to: %s with content: %s", payload.Email, payload.Content)
    // Implement the email sending logic here.
    return nil
}

To achieve scalable and distributed job processing, you can register your function and start workers on different machines. Each worker independently processes jobs enqueued by the client:

worker, err := queue.NewWorker(redisConfig, queue.WithWorkerQueue("default", 1))
if err != nil {
    log.Fatalf("Error creating worker: %v", err)
}

err = worker.Register("email:send", handleEmailSendJob)
if err != nil {
    log.Fatalf("Failed to register handler: %v", err)
}

if err := worker.Start(); err != nil {
    log.Fatalf("Failed to start worker: %v", err)
}

Graceful Shutdown

Ensure a clean shutdown process:

func main() {
    // Initialization...

    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    <-c

    if err := client.Stop(); err != nil {
        log.Fatalf("Failed to stop client: %v", err)
    }
    worker.Stop()
}

Advanced Features

Learn more about the library's advanced features by exploring our documentation on:

Contributing

We welcome contributions! Please submit issues or pull requests on GitHub.

License

This library is licensed under the MIT License.

Credits

Special thanks to the creators of neoq and Asynq for inspiring this library.

queue's People

Contributors

kaptinlin avatar dependabot[bot] avatar

Stargazers

Tejas Sawant avatar  avatar

Watchers

 avatar

Forkers

tejas456sawant

queue's Issues

Not able to create new queue

Hello @kaptinlin, I was trying to enqueue my msgs into different queue other than default queue. But the function queue.WithQueue() is not able to override the deafult queue.

Below is the sample code:

func SendTelegramMessageToQueue(payload models.QueueTask) error {
	job := queue.NewJob(telegramJobType, payload, queue.WithQueue("telegram"))
	_, err := client.EnqueueJob(job)
	if err != nil {
		log.Printf("Failed to enqueue job: %v", err)
	}
	return nil
}

The above code enqueues in default queue, in spite of using telegram as the queue.

Also can you provide some insights on how the workers will work to dequeue the msgs, when not in default queue.
Everything works great when in default queue. For my use case I need to create multiple queues. Any help would be highly appreciated.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.