
conductor-go's People

Contributors

boney9, c4lm, coderabhigupta, dougsillars, gardusig, kalexmills, manan164, rizafarheen, v1r3n, vikashegde-rapid, vuon9

conductor-go's Issues

Add Tests

Test coverage goal:

  • Authentication is supported for all the APIs
  • Update task by reference name (from Orkes playground)
  • Optional metrics usage
    • Avoid starting the metrics server when metrics are opted out
  • Recent changes for Python SDK

`taskRunner.WaitWorkers()` blocks forever because `sync.WaitGroup` will never be empty

When using taskRunner.WaitWorkers(), I wondered whether there is a way to shut the task runner down gracefully.
The documentation says:

WaitWorkers uses an internal waitgroup to block the calling thread until all workers started by this TaskRunner have been stopped.

This implies that WaitWorkers should stop blocking the calling thread once all the workers have been stopped.
After looking at the source code (sdk/worker/task_runner.go), I found that "stopping" a worker means setting its batch size to zero. This correctly logs to the console that the worker has been stopped, but WaitWorkers was still blocking the calling thread.

From inspecting the source code this behavior makes sense:

  1. The WaitWorkers function simply waits for the internal wait group to be empty/done.
    func (c *TaskRunner) WaitWorkers() {
        c.workerWaitGroup.Wait()
    }
  2. When a worker is started, c.workerWaitGroup.Add(1) and go c.work4ever() are executed.
  3. The function work4ever handles the Done signal to the c.workerWaitGroup like this:
    func (c *TaskRunner) work4ever(taskName string, executeFunction model.ExecuteTaskFunction, domain string) {
        defer c.workerWaitGroup.Done()
        defer concurrency.HandlePanicError("poll_and_execute")
        for c.isWorkerRegistered(taskName) {
            c.workOnce(taskName, executeFunction, domain)
        }
    }
    This means that c.workerWaitGroup.Done() is only called when the for loop is done. The for loop only stops when c.isWorkerRegistered(taskName) returns false. So let's have a look at this function.
  4. Looking at the implementation of isWorkerRegistered, it's clear that a batch size of 0 does not affect its return value, because it only checks whether the task name is present as a key in the map.
    func (c *TaskRunner) isWorkerRegistered(taskName string) bool {
        c.batchSizeByTaskNameMutex.RLock()
        defer c.batchSizeByTaskNameMutex.RUnlock()
        _, ok := c.batchSizeByTaskName[taskName]
        return ok
    }

If this is the expected behavior, the documentation should clearly differentiate between a registered worker and a stopped worker. The difference between these two "states" and a paused worker is also not clear to me.

In my case, I want to modify the task runner dynamically via a REST API, which means adding and "removing" workers at runtime. As there is no clear way to "remove" a worker from a task runner, I'm unsure whether pausing is sufficient, whether I should set the batch size to 0, or whether I need to create a PR that adds a RemoveWorker(...) function which actually deletes the task name key from the batchSizeByTaskName map.

Hopefully someone can help me with this issue and shed some light on the differences between the worker "states".

Edit: I have an additional question. Why is there a possible goroutine leak when calling Pause(taskName)?

Worker dequeues 2 tasks while the first task is still running, even though BATCH_SIZE=1

Issue:

Even though the batch size is set to 1, I noticed that the worker dequeued 2 tasks.

RCA:

There is a race between getAvailableWorkerAmount() and executeAndUpdateTask() in workOnce(). Here is the sequence of events:

STEPS to reproduce
BATCH_SIZE is set to 1 in my case.

  1. More than one task is present in the work queue.
  2. The worker starts, dequeues a task, and begins processing it; the first task has not finished yet.
  3. The worker dequeues another task without checking that the first task is still running.
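One generic way to close the race described in the RCA is to reserve capacity before polling, instead of computing available capacity separately from execution. The sketch below uses a buffered channel as a counting semaphore; it illustrates that technique under stated assumptions and is not the change made in PR #118. pollLoop, maxConcurrency and the channel-based queue are hypothetical stand-ins for the SDK's poll, getAvailableWorkerAmount() and executeAndUpdateTask() steps.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// pollLoop is a hypothetical worker loop. A buffered channel of capacity
// batchSize acts as a semaphore: a slot is acquired before polling and
// released only after the task has executed, so the number of in-flight
// tasks can never exceed batchSize.
func pollLoop(batchSize int, queue <-chan int, execute func(int)) {
	slots := make(chan struct{}, batchSize)
	var wg sync.WaitGroup
	for {
		slots <- struct{}{} // blocks while batchSize tasks are still running
		task, ok := <-queue // "poll" only once a slot is free
		if !ok {
			<-slots // queue drained: give the unused slot back
			break
		}
		wg.Add(1)
		go func(t int) {
			defer wg.Done()
			defer func() { <-slots }() // free the slot after execution
			execute(t)
		}(task)
	}
	wg.Wait()
}

// maxConcurrency runs numTasks simulated tasks and returns the peak number
// of tasks observed executing at the same time.
func maxConcurrency(batchSize, numTasks int) int32 {
	queue := make(chan int, numTasks)
	for i := 0; i < numTasks; i++ {
		queue <- i
	}
	close(queue)

	var inFlight, peak int32
	pollLoop(batchSize, queue, func(int) {
		n := atomic.AddInt32(&inFlight, 1)
		for { // record the highest concurrency observed so far
			p := atomic.LoadInt32(&peak)
			if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
				break
			}
		}
		time.Sleep(5 * time.Millisecond) // simulate task execution
		atomic.AddInt32(&inFlight, -1)
	})
	return atomic.LoadInt32(&peak)
}

func main() {
	fmt.Println("peak in-flight tasks with batch size 1:", maxConcurrency(1, 5))
}
```

Because a slot is released only after execute returns, a second task cannot be dequeued while the first is still running when the batch size is 1, so maxConcurrency(1, 5) reports a peak of 1.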

FIX
The Conductor Go SDK needs a fix, and the PR below was raised for it. Please review and merge it and cut a new release, or let me know the process for getting this fixed in conductor-go, as it is blocking our release.
#118

conductor version
conductor-go v1.3.2

Note:
It seems the Java SDK client has moved away from this design and uses TaskRunnerConfig to poll for tasks, but the Go client isn't quite there yet. Because this is blocking our release, we fixed the issue in the conductor-go client in the PR below:
#118

Improve new pkg setup docs

As a user, I would like the easiest possible onboarding.

Some users may run into missing imports. Consider reviewing the current guide and adding the necessary steps. It is probably worth mentioning go mod tidy, which may solve most problems related to missing packages or failed imports.

Parse JSON Workflow

Hello! Is there a function available to parse a JSON workflow into a typed ConductorWorkflow object? Thank you!
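If a plain Go struct with JSON tags is available as the target type, decoding is a single encoding/json call. The sketch assumes a hypothetical, trimmed-down workflowDef/taskStep pair; it does not claim that ConductorWorkflow itself can be unmarshalled this way, and the SDK's real workflow-definition types have many more fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// workflowDef and taskStep are hypothetical, trimmed-down types for
// illustration only; they are not the SDK's model types.
type workflowDef struct {
	Name    string     `json:"name"`
	Version int32      `json:"version"`
	Tasks   []taskStep `json:"tasks"`
}

type taskStep struct {
	Name              string `json:"name"`
	TaskReferenceName string `json:"taskReferenceName"`
	Type              string `json:"type"`
}

// parseWorkflow decodes a JSON workflow definition into workflowDef.
func parseWorkflow(data []byte) (workflowDef, error) {
	var wf workflowDef
	err := json.Unmarshal(data, &wf)
	return wf, err
}

func main() {
	raw := []byte(`{"name":"greetings","version":1,
		"tasks":[{"name":"greet","taskReferenceName":"greet_ref","type":"SIMPLE"}]}`)
	wf, err := parseWorkflow(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(wf.Name, wf.Version, wf.Tasks[0].TaskReferenceName)
}
```

json.Unmarshal ignores JSON fields that have no matching struct field, so a trimmed struct like this still decodes a full definition and keeps only the fields it declares.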

Writing workers in go

Hi,

I am running Conductor locally with docker-compose and wrote a simple worker program in Go: https://github.com/sandeep540/netflix_conductor

It runs, but it does not complete the task and there are no logs. Can you please help me debug where I am going wrong?

P.S.: I got it working with Java, but we have a requirement to use Go.

RegisterTaskDef without any retry

Hi, is there any way to create a task with retryCount=0? Because retryCount is an "omitempty" attribute, a value of 0 is dropped from the request.

I also tried other values such as retryCount=-1, but there is validation on the server side.

Would using *int32 for retryCount in TaskDef be a better approach?
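The behavior in question is standard encoding/json: with omitempty, a zero int32 counts as empty and is omitted, whereas a non-nil *int32 is emitted even when it points to 0. The sketch below uses two hypothetical structs (taskDefValue, taskDefPointer) to show the difference; they are not the SDK's TaskDef, and how the server treats an omitted retryCount is outside its scope.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskDefValue mimics a field declared as int32 with omitempty: a zero
// value is indistinguishable from "unset" and is dropped from the JSON.
type taskDefValue struct {
	Name       string `json:"name"`
	RetryCount int32  `json:"retryCount,omitempty"`
}

// taskDefPointer uses *int32: nil is omitted, but a pointer to 0 is sent.
type taskDefPointer struct {
	Name       string `json:"name"`
	RetryCount *int32 `json:"retryCount,omitempty"`
}

// marshal encodes v as a JSON string, panicking on error.
func marshal(v any) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	zero := int32(0)
	fmt.Println(marshal(taskDefValue{Name: "t", RetryCount: 0}))       // {"name":"t"}
	fmt.Println(marshal(taskDefPointer{Name: "t", RetryCount: &zero})) // {"name":"t","retryCount":0}
	fmt.Println(marshal(taskDefPointer{Name: "t"}))                    // {"name":"t"}
}
```

The trade-off of the pointer version is that callers must take the address of a value (for example, a local variable) to set the field, and code reading it must check for nil.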
