Git Product home page Git Product logo

dexecutor-core's Issues

Handling multiple requests

Hi @mnadeem ,

I was trying a poc to use the library in our production environment and had a question. If I added 7 independent graphs and executed them at the same time with 2/3 threads.I observed that the execution followed a level order traversal even with different sleep time of threads.

Can we change that to a customised version i.e. for my use case a dfs approach would be better so that 2 - 3 users are served before picking up more requests.

Control Task execution order

Hi @mnadeem

Can you please help me with how do I control the order of task execution. For example, the independent tasks are A, B, and C. In the thread count of 3, I should retain the same order of A, B, and C. But the actual result is sometimes

  • B, A, and C
  • C, B, and A

How do I retain the natural order of A, B, and C. Please help

Task.endTime is null when arriving to listener (onSuccess, onError)

When using a listener to monitor task executions, the endDateTime is always null.
The cause seems to be that, in the DefaultExecutionEngine.class, the method markEnd is called after the notification of the listener.

On this subject, it would be nice (at least for me) to have on the listener an onStart event for the tasks start execution.

API to execute only a subset of the dependencies

Hi,

I think there's no way, but just leaving it here as a feature idea (Filtering on leaf nodes is actually enough for me).

The "real-world" use case:

  1. Lets say a have a tool which executes a plugin graph writing N entities (i.e. leaf tasks write one entity each), but I only want one of the entities to be written in some API calls/executions because I know all others are already correctly written. I would want to skip all plugins which are not needed to write all other entities.

The technical question:

Is there an API/ExecutorConfig option to launch "filtered" execute? (i.e. in the following example, once I have the dependency graph). Based in the following example:
https://github.com/dexecutor/dexecutor-core/wiki#usage

Something like

executor.execute(ExecutionConfig.NON_TERMINATING, List.of(4,3)); -- This would only launch task 1,2,3,4 and skip all others.

In a more complex example:
dexecutor-graph.png

Something to only calculate "app1-2". I.e.

executor.execute(ExecutionConfig.NON_TERMINATING, List.of("app1-2")); --  This would only launch the tasks on the left branch and skip all others

If not, as an alternative, is there any Traverser which allows to get the "reversed" dependency paths? I.e. from a destination dependency (like 4), get all the Tasks/Graph nodes which will be executed to reach it. I think I can create this one by using getLeafNodes + getInComingNodes() for each of them (of the ones I'm interested on).

Thanks beforehand!,

Add judging condition for function areAlreadyProcessed

Hello!
I use DExecutor for task scheduling, and I find that in some case, one node is skiped, but it's out coming node will still be executed. After debug for hours, I find that modifing the code here may be helpful.
I changed this place to "return this.processedNodes.stream().filter(Node::isSuccess).collect(Collectors.toSet()).containsAll(nodes);",and it worked.

return this.processedNodes.containsAll(nodes);

DefaultEngine will throw a NPE when a task throws a Error

private Callable<ExecutionResult<T, R>> newCallable(final Task<T, R> task) {
		return new IdentifiableCallable<T, ExecutionResult<T,R>>() {

			@Override
			public ExecutionResult<T, R> call() throws Exception {
				R r = null;
				ExecutionResult<T, R> result = null;
				try {
					task.markStart();
                                        executionListener.onStart(task);
					r = task.execute();  // throw a Error
					result = ExecutionResult.success(task.getId(), r);
					state.removeErrored(result);
					task.markEnd();
					executionListener.onSuccess(task);
				} catch (Exception e) { //cannot catch Error 
					result = ExecutionResult.errored(task.getId(), r, e.getMessage());
					state.addErrored(result);
					task.markEnd();
					executionListener.onError(task, e);
					logger.error("Error Execution Task # {}", task.getId(), e);
				} finally {		
                                         // result is NULL			
					result.setTimes(task.getStartTime(), task.getEndTime());
				}
				return result;
			}

			@Override
			public T getIdentifier() {
				return task.getId();
			}
		};
	}

Timeout and cancellation support

Hi, thanks for making this amazing library! I'm trying to make a requests execution flow as efficient as possible.

To give an example, say E depends on B and B, C & D depend on A.

         A
      /  |   \
    B    C    D
    |
    E

Currently, we are using a concept of execution rounds, where we define sets of requests to be executed in parallel, using an executor service and futures, so for this case:

Step 1. Execute A
Step 2. Execute B, C, D
Step 3. Execute E

But we want to do a dependency graph based execution instead, as we want E to be executed as soon as B completes, instead of waiting for all of B, C & D to complete.

For this we are adapting our execution logic to use Dexecutor. But one one missing feature which is stopping us from migrating is the ability to set timeout for and cancel the underlying scheduled task. Is this support planned? Has this been skipped on purpose due to implementation details?

Run particular independent task sequentially

I am using this package in my automation project. Some tasks (automation test case) have IE browser mapped, as per selenium is not recommended to run the IE test cases in parallel. To avoid running it in parallel I made it sequential by marking dependency something like this.

IE.TC1, IE.TC2

TC2 depends on TC1, So handled error scenario in should Execute method. But my requirement is to run them as independent tasks and sequentially (one after the other). Is there any other better approach?

Thanks
Krishna

Support for persitence, pause / resume executions

What would be the best way to incorporate the following requirements:

  • Support for pause / resume executions (async) - Receive a "Pause" command for an execution should finish running tasks but not trigger next tasks, until a "Resume" command is issued
  • How could this model be persisted, thinking, for instance, on a Stop / Start of the JVM or the previously refered Pause / resume? I'm thinking on some logic, maybe on Listener...
  • Both of previous scenarios in a Distributed context

Thanks in advance!

How to stop the task or the entire graph from executing?

Hi,
I'm trying to use Dexeutor as a workflow engine in my Sping project. Basically I'm using Restful API to controll the workflow. When the workflow is submitted and running, I need to stop it from running manully.
ex:

public class WorkFlowManager {
	...
	private void buildGraph(String workflowId) {
		// build graph;
	}

	public void execute(String workflowId) {
                buildGraph(workflowId);
		this.dexecutor.execute(ExecutionConfig.TERMINATING);
	}

	public void stop(String workflowId) {
                // **how do I do this?**
        }
}

Controllers:

public class WorkflowController {
	final WorkflowManager workflowManager;

	public void execute(String workflowId) {
		return workflowManager.execute(workflowId);
	}

	// **How to do this?**
	public void kill(String workflowId) {
		return workflowManager.stop(workflowId);
	}

Fetch the result of processed nodes

This is related to this issue #10

@mnadeem I looked into the Dexecutor.execute method here https://github.com/dexecutor/dexecutor-core/blob/master/src/main/java/com/github/dexecutor/core/DefaultDexecutor.java#L121-L141 but it seems this method only returns the execution result of those nodes which are errored but not those which are successfully processed.

Previously, i used the dencapsulation class provided by Jmockit to fetch the results of processed nodes similar to what has been done here in the example provided https://github.com/dexecutor/dexecutor-core/wiki

However, the dencapsulation class has been deprecated from JMockit and thus I no longer can use the above solution. Can you please provide the insights on how i can fetch the results of processed nodes?

Apologies for the late reply.

Question on Task's ID type

This is not really an issue but a question. The requirement for the Task's ID (T) is that is implements Comparable. Why? There is nothing that calls the API compareTo (the only requirement for Comparable) and nothing the T is used by requires it that I could see. Was this a prior implementation when the ID was used for comparison order?

Traverser Priority

How do I create a traverser that considers the number of dependents and offers the node with the most outgoing edges first?

Timeout issue

Hey many thanks for the wonderful library.
We are scheduling some long running jobs.

However we keep getting the exception :-

xception, elapsed time: 1800000
at com.vnum.road.workflowservice.jobpipe.RoadJobStepTask.execute(RoadJobStepTask.java:190)
at com.vnum.road.workflowservice.jobpipe.RoadJobStepTask.execute(RoadJobStepTask.java:38)
at com.github.dexecutor.core.task.LoggerTask.execute(LoggerTask.java:45)
at com.github.dexecutor.core.DefaultExecutionEngine$1.call(DefaultExecutionEngine.java:94)
at com.github.dexecutor.core.DefaultExecutionEngine$1.call(DefaultExecutionEngine.java:87)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at

Can you please suggest a solution ?

Spring Support

Hi, i am using this lib for long time now and its great!
Thanks for all the hard work to make this possible.

Currently i am working with

  • spring-boot (latest)
  • java 1.8

and trying to get your lib working with spring.
Did you ever used this in spring-boot?

My Goal --> use inside a Task the spring-context

I am using the SpringContext (ApplicationContextAware) (described here) to access many other content in Unmanaged-Spring-Classes but in the Tasks-Class, where i implement my doings, it doesn't work.

Maybe any ideas? Maybe to add configurable spring-support or workarounds

And again, great lib! thanks!!!

How to generate visual pictures

How to generate visual pictures

DefaultDexecutor<Integer, Integer> executor = newTaskExecutor();
//Building
executor.addDependency(1, 2);
executor.addDependency(2, 3);
executor.addDependency(3, 4);
executor.addDependency(4, 5);

Task submitted for execution more than once when parent node is skipped

Hello. I am using your library to schedule tasks in directed acyclic graphs (DAGs). I implemented the "shouldExecute" method so that I can choose whether a node is executed based on whether at least one parent node has a success result (ANY) or if all of them have terminated successfully (ALL).

@Override
    public boolean shouldExecute(ExecutionResults<String, Boolean> parentResults) {
        boolean shouldExecute = true;

        if (parentResults.hasAnyResult()) {
            List<Boolean> results = parentResults.getAll().stream()
                    .map(ExecutionResult::getResult)
                    .collect(Collectors.toList());

            if (shouldExecuteConfig.equals(SHOULD_EXECUTE_ALL)) {
                // All parents must end with success
                shouldExecute = results.stream().allMatch(Boolean.TRUE::equals);
            } else if (shouldExecuteConfig.equals(SHOULD_EXECUTE_ANY)) {
                // At least one parent must end with success
                shouldExecute = results.stream().anyMatch(Boolean.TRUE::equals);
            }
        }
        return shouldExecute;
    }

By doing that, I encountered an error when executing the next type of DAG:
image
where stages 1, 2 and 3 are of type ALL and stage final is type ANY.
In this scenario, stage final should be executed when stages 1 2 and 3 finish, when at least one of them finishes with success.
In the case where stage 1 finishes with success and stage 2 with error, stage 3 is skipped and stage final is executed twice, which is wrong.

2024-07-12 11:24:45 INFO  Paths: 
Path #0
Stage 1[] 
Stage 2[Stage 1] 
Stage 3[Stage 2] 
Stage final[Stage 1, Stage 2, Stage 3] 

2024-07-12 11:25:26 INFO Finished with success Stage 1
2024-07-12 11:26:02 ERROR Finished with error Stage 2
2024-07-12 11:26:02 INFO  Started Stage final
2024-07-12 11:26:02 INFO  Started Stage final
....

A work around to solve this problem is to create another status to the node entity: SUBMITTED. This way a task will only be submitted once to the execution engine.

enum NodeStatus {
      SUBMITTED,ERRORED,SKIPPED,SUCCESS,CANCELLED;
}
private void doExecute(final Collection<Node<T, R>> nodes, final ExecutionConfig config) {
		for (Node<T, R> node : nodes) {
			forceStopIfRequired();
			if (this.state.shouldProcess(node) && !node.isSubmitted()) { // check is node is already submitted
				Task<T, R> task = newTask(config, node);
				ExecutionResults<T, R> parentResults = parentResults(task, node);
				task.setParentResults(parentResults);
				task.setNodeProvider(new DefaultNodeProvider<T, R>(state));
				if (node.isNotProcessed() && task.shouldExecute(parentResults)) {					
					this.state.incrementUnProcessedNodesCount();
					logger.debug("Submitting {} node for execution", node.getValue());
					this.executionEngine.submit(task);
					node.setSubmmitted(); //set node status to submitted 
				} else if (node.isNotProcessed()){
					node.setSkipped();
					logger.debug("Execution Skipped for node # {} ", node.getValue());
					this.state.markProcessingDone(node);
					doExecute(node.getOutGoingNodes(), config);
				}
			} else {
				logger.debug("node {} depends on {}", node.getValue(), node.getInComingNodes());
			}
		}
	}

I expect your comments on this solution, or on any other approach that you may think of. Feel free to add this changes to your repo if you feel it is adecuate. Thank you.

Query the graph?

I'm looking for a way to query the dependency graph to extract a subgraph for execution when a dependency has changed. Can't find anything in the documentation - is this supported?

Get the results of the nodes according to one's need.

I am working on my college project in which I have tried implementing the dexecutor's approach. After creating the dependency graph , the nodes are getting executed and I have verified this by printing the responses for every node during its execution. But at the end , when the execution is completed, if now I want a particular node's response, how will I get it? I have tried getting the response of ExecutionResults but it gives me empty response and which is logical because, execution results returns all the errored nodes. But , what is the actual preferred approach?

Fetch the response of all the tasks

how do i fetch the response of all the tasks executed after the executor.execute() is complete, i found one way in here https://github.com/dexecutor/dexecutor-core/wiki

Collection<Node<Integer, Integer>> processedNodesOrder = TestUtil.processedNodesOrder(executor);

public static Collection<Node<Integer, Integer>> processedNodesOrder(DefaultDexecutor<Integer, Integer> executor) {
DefaultDexecutorState<Integer, Integer> state = Deencapsulation.getField(executor, "state");
return Deencapsulation.getField(state, "processedNodes");
}

but it seems like this should not be the ideal way to fetch the response, can you please help me here?

How to specify a taskId to start with programmatically?

Task graph example:

         A
      /  |   \
    B    C    D
    |
    E

In my use case, I have several scenarioes, for example, in scenario 1 is to execute from task A, while in scenario 2, I can specify executor to start with task B, so only task B and task E will be executed.
So how do I do this?
I'm thinking about something like this:

public void runDexecutor(int scenario) {
	LOGGER.debug("Running..");
	DefaultDexecutor<Integer, Integer> executor = newDexecutor();
	addDependency(executor);

	if (scenario == 1) {
		executor.execute(ExecutionConfig.NON_TERMINATING);
	}
	if (scenario == 2) {
		// How to do this?
		executor.execute(ExecutionConfig.NON_TERMINATING, taskIdToStartWith);
	}
}

Distributed Execution Targeting

Just wondering, is the intent behind the distributed execution for distributing tasks to be executed across a pool of nodes? is it possible to target tasks to identified nodes such as;

Task 1 -> Node 1
Task 2 -> Node 2
Task 3 -> Node 3
Results -> Master/Parent?

i could see where something like this would enable some interesting capabilities for distributed microservice architectures where each node performs a specific tasks.

Thanks!

Stuck on executing parallel task n-1 of total n tasks

We have been running into an issue where in some of the cases, we are experiencing no progress. For some cases, we are at

Completed parallel task 99 of 100

and then we are not seeing any progress. There is no progress on task 100, and therefore the barrier for the thread pool is never met.

We took thread dumps of the situation to gather context about the problem. It looks like multiple threads are in WAITING (parking) state to obtain a lock. But we don't see that lock being obtained by anyone in the thread dump. One instance of it looks like the following where we can't find anyone obtaining 0x000000071882bf80 in the thread dump but multiple threads waiting for it.

"pool-2-thread-4" #65 prio=5 os_prio=0 cpu=1742209.88ms elapsed=13615.90s tid=0x00007f6ad405f800 nid=0x246c waiting on condition  [0x00007f6bb1999000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
        - parking to wait for  <0x000000071882bf80> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await([email protected]/AbstractQueuedSynchronizer.java:2081)
        at java.util.concurrent.LinkedBlockingQueue.take([email protected]/LinkedBlockingQueue.java:433)
        at java.util.concurrent.ThreadPoolExecutor.getTask([email protected]/ThreadPoolExecutor.java:1054)
        at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1114)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run([email protected]/Thread.java:834)

   Locked ownable synchronizers:
        - None

I'm attaching the full thread-dump at the end here.

thread-dump.txt

deexecuter example for TableMigration Task

Hi Nadeem,
first of all i must say this is very nice work. Thanks for doing this for us.
I have below use case for which I want to use this API
I have text file which contains certain steps to be executed for each table and my txt file as give as below.
1|-1|0|Clear Old data <cleanup.sh>|${LOG_DIR}/{table_name}.log
2|1|0| Create Table <createTable.sh>|${LOG_DIR}/{table_name}.log
3|2|0| Insert into Table <insertIntoTable.sh>|${LOG_DIR}/{table_name}.log

where
first element is Task
Second is dependent Task
Third ins Retry
Fourth is actual script<eg. .sh or sql>

I tried modifying the xml (given in one your Migration example )like below

select * from table1, table3, table4 where xyx= 1;
<task module="Create Data Output Dir" taskId="2" version="1.0.2" release="1">
	<task>select * from table5, table6 where xyx= 1;</task>
</task>
<task module="Execute NDW Active Count Check" taskId="3" version="1.0.2" release="1">
	<task>select * from table10, table11 where xyx= 1;</task>
</task>

<task module="Execute NDW History Partition Count Check" taskId="4" version="1.0.2" release="2">
	<task>select * from table4, table8 where xyx= 1;</task>
</task>
<task module="Execute MELD Active Count Check" taskId="5" version="1.0.2" release="2">
	<task>select * from table8, table9 where xyx= 1;</task>
</task>

<task module="Execute MELD History Partition Count Check" taskId="6" version="1.0.2" release="2">
	<task>select * from table10, table19 where xyx= 1;</task>
</task>
<task module="Execute Source Count" taskId="7" version="1.0.2" release="2">
	<task>select * from table11, table17 where xyx= 1;</task>
</task>

<task module="Check DistCP Completion Status" taskId="8" version="1.0.2" release="3">
	<task>select * from table6, table7 where xyx= 1</task>
</task>

Could you please help to set dependencies in xml which matches my requirement.
Let me know If you have any issues regarding the explanation.

Thanks in advance!

Hope to see your response.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.