Git Product home page Git Product logo

camunda-worker-dotnet's Introduction

Camunda.Worker

codecov NuGet

Example

[HandlerTopics("sayHello", LockDuration = 10_000)]
[HandlerVariables("USERNAME")]
public class SayHelloHandler : IExternalTaskHandler
{
    public async Task<IExecutionResult> HandleAsync(ExternalTask externalTask, CancellationToken cancellationToken)
    {
        if (!externalTask.TryGetVariable<StringVariable>("USERNAME", out var usernameVariable))
        {
            return new BpmnErrorResult("NO_USER", "Username not provided");
        }

        var username = usernameVariable.Value;

        await Task.Delay(1000, cancellationToken);

        return new CompleteResult
        {
            Variables = new Dictionary<string, VariableBase>
            {
                ["MESSAGE"] = new StringVariable($"Hello, {username}!"),
                ["USER_INFO"] = JsonVariable.Create(new UserInfo(username, new List<string>
                {
                    "Admin"
                }))
            }
        };
    }
}

camunda-worker-dotnet's People

Contributors

amd989 avatar anserg256 avatar dependabot[bot] avatar ionut-openminds avatar johnacarvana avatar jtone123 avatar kimjor avatar technoberry avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

camunda-worker-dotnet's Issues

pass variable across tasks & parallel task

Hi Alexey,
I am trying to evaluate this version for one of our dotnet core application. Able to setup and run for simple external tasks. But finding difficult how to implement bellow features:

  • Passing variable across task.
  • Parallel task setup (forking and join)
  • listening to RabbitMQ Event
  • Rest API integration

Any samplecode including model or suggestion would be really great.
Regards,
Umesh

Reading task's extension properties

Hi,

Is it possible to access extension properties of executed tasks? It is returned by fetch-and-lock request when setting includeExtensionProperties parameter.

Handle Camunda Rest Failures

I came across an issue when trying to complete a Service Task. Ultimately, the process I was trying to complete errors out during a downstream Script Task. This causes Camunda to issue a 500 as the response to the request external-task/{taskId}/complete.

This gave me some confusion because the task seems to complete without issue. However, the process would still be waiting on the same Service Task. Tracking this down, it looks like ExternalTaskCamundaClient is not handling failed requests.

private async Task<HttpResponseMessage> SendRequest(string path, object body, CancellationToken cancellationToken)
{
    var basePath = _httpClient.BaseAddress.AbsolutePath.TrimEnd('/');
    var requestPath = $"{basePath}/{path}";
    var response = await _httpClient.PostJsonAsync(requestPath, body, cancellationToken);
    return response;
}

I know this is still version 0, but I just wanted to bring it to your attention in case you were not aware.

I love this library! Keep up the good work!

Implement middleware pattern

Currently handling of external tasks can't be decorated. Decoration of handling of external tasks may provide benefits such as:

  • Automatic extending of lock duration
  • Distributed tracing of external tasks
  • Collecting of metrics of execution of external tasks

Thus, the implementation of middleware pattern is the most important to increase the flexibility and applicability of this library.

Use single service to provide FetchAndLockRequest

Background and motivation

Currently, 2 dependencies are used in the worker to create an instance of FetchAndLockRequest:

  • IOptions<FetchAndLockOptions> which provides WorkerId, MaxTasks, UsePriority, AsyncResponseTimeout
  • ITopicsProvider which provides set of FetchAndLockRequest.Topic. Existing implementation of this interface is implemented by converting set of HandlerMetadata to set of FetchAndLockRequest.Topic.

These 2 dependencies can be replaced with one that will allow to get an instance of FetchAndLockRequest.

Proposal

Add an interface:

interface IFetchAndLockRequestProvider
{
    FetchAndLockRequrest GetRequest();
}

Add a class:

class WorkerServiceOptions
{
    public string WorkerId { get; }
    public IEnumerable<HandlerDescriptor> HandlerDescriptors { get; }
}

Add to ICamundaWorkerBuilder a method for registering the implementation of a new interface with the following signature:

ICamundaWorkerBuilder AddFetchAndLockRequestProvider(
    Func<WorkerServiceOptions, IServiceProvider, IFetchAndLockRequestProvider> factory
);

Add an implementation of proposed interface that uses the 2 dependencies described earlier and register it in DI. Change code in DefaultCamundaWorker to use new IFetchAndLockRequestProvider.

Mark ITopicsProvider and ICamundaWorkerBuilder.AddTopicsProvider as obsolete

Memory leak

Hi,

It seems like we have discovered a memory leak when using the Camunda Worker package.
According to dotMemory we are building up types of:

  • CancellationTokenSource
  • HttpClient
  • ExternalTaskClient
    It slowly grows over time, with the rate increasing under load.

It seems to be caused by the way ExternalTaskClient is resolved inside ExternalTaskContext.

According to
https://docs.microsoft.com/en-us/dotnet/core/extensions/dependency-injection-guidelines#disposable-transient-services-captured-by-container
the container will hold onto these references forever.

Maybe the calls to GetRequiredService should be inside its own scope. I am not sure if transient services will be released on scope exit.

Have you noticed anything similar?

I will continue the investigation.

Exception when using Lamar for DI

When using the IoC tool Lamar, I am getting an exception when trying to run a worker. The exception occurs on the following line within the WorkerHostedService

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    var activeTasks = Enumerable.Range(0, _options.WorkerCount)
        .Select(_ => _serviceProvider.GetRequiredService<ICamundaWorker>())
        .Select(worker => worker.Run(stoppingToken))
        .ToList();
    return Task.WhenAll(activeTasks);
}

The exception is as follows

Frame type Lamar.IoC.Resolvers.CastRootScopeFrame does not implement IResolverFrame
at Lamar.IoC.Instances.FuncResolverDefinition.BuildResolver()
at Lamar.IoC.Instances.GeneratedInstance.BuildFuncResolver(Scope scope)
at Lamar.IoC.Instances.GeneratedInstance.buildResolver(Scope scope)
at Lamar.IoC.Instances.GeneratedInstance.ToResolver(Scope topScope)
at Lamar.ServiceGraph.FindResolver(Type serviceType)
at Lamar.IoC.Scope.GetInstance(Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
at Camunda.Worker.Execution.WorkerHostedService.<>c__DisplayClass3_0.<ExecuteAsync>b__0(Int32 _) in C:\GitHub\camunda-worker-dotnet\src\Camunda.Worker\Execution\WorkerHostedService.cs:line 33
at System.Linq.Utilities.<>c__DisplayClass2_0`3.<CombineSelectors>b__0(TSource x)
at System.Linq.Enumerable.SelectIPartitionIterator`2.ToList()
at System.Linq.Enumerable.ToList[TSource](IEnumerable`1 source)
at Camunda.Worker.Execution.WorkerHostedService.ExecuteAsync(CancellationToken stoppingToken) in C:\GitHub\camunda-worker-dotnet\src\Camunda.Worker\Execution\WorkerHostedService.cs:line 32
at Microsoft.Extensions.Hosting.BackgroundService.StartAsync(CancellationToken cancellationToken)
at Microsoft.AspNetCore.Hosting.Internal.HostedServiceExecutor.<>c__DisplayClass3_0.<StartAsync>b__0(IHostedService service)
at Microsoft.AspNetCore.Hosting.Internal.HostedServiceExecutor.ExecuteAsync(Func`2 callback)

The root of the issue is that Lamar is unable to inject an IServiceScopeFactory into the DefaultCamundaWorker. I'm not 100% clear on why that is. However, replacing the IServiceScopeFactory with a IServiceProvider works as intended. This also appears to achieve the same behavior since the CreateScope function is the same across both an IServiceScopeFactory and IServiceProvider.

Variables from external task to camunda engine

Hi, I'am trying returning boolean variable from external dotnet core task to camunda engine switch in gateway component, but occourring exception in Camunda Incident view.
How should I do this?

Thank you, your library is very good!

image

Gateway expression
image

Camunda cockpit incident:
condition expression returns non-Boolean: result has class java.lang.String and not java.lang.Boolean

BackgroundService dies if you get a timeout on fetchAndLock

I have observed that my service stop processing tasks when I get a timeout on the http call to /external-task/fetchAndLock e.g. during restart of Camunda.
This error can also be reproduced by setting the AsyncResponseTimeout higher than the http client timeout.

It seems that the problem is in ExternalTaskSelector where OperationCanceledExceptions are not caught but, I guess, assumed to be part of the shutdown operation.
A proposed fix would be to also catch OperationCanceledExceptions in ExternalTaskSelector if cancellationToken.IsCancellationRequested is false

Multipe workers with different configuration

Hi Alexey

First of all thank you for implementing the worker library.
I am currently evaluating it, and I have a question. I would like to be able to use multiple workers with different configurations (number of workers). Would the following be a valid use of the worker library:

`

      services.AddCamundaWorker(options =>
            {
                options.WorkerId = "Worker_1";
                options.WorkerCount = 1;
                options.BaseUri = new Uri("http://localhost:8080/engine-rest");
            })
            .AddHandler<SayHelloHandler>()
            .ConfigurePipeline(pipeline =>
            {
                pipeline.Use(next => async context =>
                {
                    var logger = context.ServiceProvider.GetRequiredService<ILogger<Startup>>();
                    logger.LogInformation("Started processing of task {Id}", context.Task.Id);
                    await next(context);
                    logger.LogInformation("Finished processing of task {Id}", context.Task.Id);
                });
            });
        services.AddCamundaWorker(options =>
            {
                options.WorkerId = "Worker_2";
                options.WorkerCount = 4;
                options.BaseUri = new Uri("http://localhost:8080/engine-rest");
            })
            .AddHandler<SayHelloGuestHandler>()
            .ConfigurePipeline(pipeline =>
            {
                pipeline.Use(next => async context =>
                {
                    var logger = context.ServiceProvider.GetRequiredService<ILogger<Startup>>();
                    logger.LogInformation("Started processing of task {Id}", context.Task.Id);
                    await next(context);
                    logger.LogInformation("Finished processing of task {Id}", context.Task.Id);
                });
            });

`

Would this result in 2 workers - the first with only one worker count and only getting external tasks for the topic "sayHello" for task handler SayHalloHandler.
And a second worker with worker count 4 only getting external tasks for topic "sayHelloGuest" for task handler SayHelloGuestHandler?

If so, how are the TaskHandlers added using AddTaskHandler() tied to the worker?

Thanks.

BR
Michael

Add configurable delay between external task execution

Within the DefaultCamundaWorker, once all the tasks have completed, they are immediately run again. See the code below:

while (!cancellationToken.IsCancellationRequested)
{
    var externalTasks = await SelectExternalTasks(cancellationToken);

    var activeAsyncTasks = externalTasks
        .Select(CreateContext)
        .Select(ExecuteInContext)
        .ToList();

    await Task.WhenAll(activeAsyncTasks);
}

When there are no tasks in any of the topics, this while loop iterates very quickly. On each iteration, the code calls out to POST /external-task/fetchAndLock. This can cause a very high volume of requests to Camunda when there are many workers.

I have not faced any challenges yet in regards to this. However, I do not need my workers to be responding so quickly to tasks. It would be ideal if I was able to set a delay on the worker so that I can reduce the overall load on my Camunda server.

Retry count not handled

Hi @AMalininHere
I'm using your very good library thanks
But one thing I encountered was that all tasks are repeatedly fetching and executing because retry count does not decrease on every execution or I didn't make it to.
Thank you again

Support for other IoC providers

I'm liking the look of this library for implementing camunda workers, however, it seems to be tied heavily to the MS provided IoC container.

Would it be possible to extend this to support other IoC providers like SimpleInjector?

Add custom HTTPHeaders to Camunda network calls

Hey,

Our instance of camunda requires an Authorization header to validate users hitting Camunda's API. I was looking at HttpClientExtensions would it be easy to inject CamundaWorkerOptions and reference a new Dictionary of headers to add in? something like this:

 internal static async Task<HttpResponseMessage> PostJsonAsync(this HttpClient client,
            string path,
            object requestBody,
            CancellationToken cancellationToken = default, IOptions<CamundaWorkerOptions> camundaOptions) 
        {
            using var stream = new MemoryStream();
            using var streamWriter = new StreamWriter(stream);
            using var jsonWriter = new JsonTextWriter(streamWriter);

            Serializer.Serialize(jsonWriter, requestBody);
            await jsonWriter.FlushAsync(cancellationToken);

            stream.Position = 0;

            var requestContent = new StreamContent(stream);
            requestContent.Headers.ContentType = new MediaTypeHeaderValue(JsonContentType)
            {
                CharSet = Encoding.UTF8.WebName
            };

            foreach (var header in camundaOptions.Value.CustomHttpHeaders.Keys)
            {
                requestContent.Headers.Add(header, camundaOptions.Value.CustomHttpHeaders[header]);
            }

            var response = await client.PostAsync(path, requestContent, cancellationToken);
            return response;
        }

That way I could specify my headers on startup:

     
            services.AddCamundaWorker(options =>
            {
                options.BaseUri = new Uri(camundaRestApiUri);
                options.WorkerCount = 1;
                options.CustomHttpHeaders = new Dictionary<string, string> { { "Authorization", "secret" } };
            })

I would be interested to know what you think.

How to inject JWT token to the client

Hello,
my instance of camunda is secured by JWT token.
I can generate and add token to the client, but since token will expire after some minutes, and client have one instance it will fail after some time.

 builder.Services.AddExternalTaskClient(client =>
    {
        client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
        client.BaseAddress = new Uri( "default/" );
    });

So How I can destroy httpClient instance or reinitialize token?

Also if I would like to use any dependency in that method I need to build serviceProvider.
builder.Services.BuildServiceProvider()

Thanks

Support for tenant id for fetching tasks

Hi,

Is it possible to consider adding the tenant id to the configuration on startup so that only tasks related to the desired tenant will be fetched from the external task API?

A little more documentation please

Greetings,
I'm really interested in your work and supporting your efforts if I can.
I'm a little confused on some of the changes you've implemented since 0.10. Can you provide some comments in the source code or more in the ReadMe?
For instance..

  • What are the different methods doing in the worker builder? I get AddHandler, but what are the others doing?
  • What is the impact of the worker count?
  • What's the proper use of the [HandlerVariables] attribute?
  • How can I get all of the variables?
  • How should I deal with External task topics that I don't have a specific handler for? Can I have some kind of catch all?
  • What's the proper use of the CompleteTask object?
  • What configuration do I need if I'm running multiple instances of my client code? Unique worker names?

Again.. really thankful for your work. When do you think you'll get to version 1.0? And when will the breaking changes slow down?

Support all variable types

Currently not for all variable types present helpers for creation of variable (e.g. Variable.String("Some string")) and reading a value from variable (e.g. variableInstance.AsString()). It would be great to add helpers for following types:

  • Date
  • File
  • Object

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.