Git Product home page Git Product logo

Comments (15)

mnadeem avatar mnadeem commented on September 3, 2024

Kindly enable TRACE for 'com.github.dexecutor' and send the logs

from dexecutor-core.

mnadeem avatar mnadeem commented on September 3, 2024

Further try adding time out and see

https://github.com/dexecutor/dexecutor-core/wiki/How-Do-I-%3F#timeout-support

from dexecutor-core.

musophobia avatar musophobia commented on September 3, 2024

Added the timeout as mentioned above. No luck. Still getting stuck. Got the traces. Attaching the last 50000 lines of the trace here.
s-dexec-tracer.log

from dexecutor-core.

mnadeem avatar mnadeem commented on September 3, 2024

Please share me the example code as github project

Are you honoring the interrupt signal in your task ?

https://github.com/dexecutor/dexecutor-core/blob/master/src/test/java/com/github/dexecutor/core/DexecutorTimeoutTest.java#L139

from dexecutor-core.

mnadeem avatar mnadeem commented on September 3, 2024

Further what is the functionality of your task ?

from dexecutor-core.

musophobia avatar musophobia commented on September 3, 2024

We are developing a static analysis tool. The task of Dexecutor is to parallelize some of our tasks. I'm sorry but the code is company proprietary code and as such it's not possible to be shared as a github project. However, the following snippet is a rough pseudo-code of the Dexecutor portion.

In the log I shared previously, I did not honor the interrupt signal. I've performed another run honoring the signal, but the stuck situation is still there. One thing to mention is that, for lighter tasks, we don't face any stuck situation. Dexecutor performs the parallelization without any trouble. The stuck situation arises mainly when we opt to perform heavier tasks - running for couple of hours.

    private static class OurTaskProvider implements TaskProvider<OurClass, Integer> {

        public OurTaskProvider() {
            super();
        }

        public Task<OurClass, Integer> provideTask(final OurClass cl) {

            return new Task<OurClass, Integer>() {


                public Integer execute() {
                    try {
                         // code: our task X
		             // call the following block in loops of task X
                             // if (Thread.currentThread().isInterrupted()) {
                                 // throw exception
                             // }
                    } catch (Exception | Error e) {
                    } finally {
			// code: one file write operation
	                // code: clean up codes
                    }
                    return 0;
                }
                
                @Override
                public Duration getTimeout() {
                   return Duration.ofMillis(1);
                }
                
            };
        }
    }




   public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(args[0]);
        try {
            DexecutorConfig<OurClass, Integer> config = new DexecutorConfig<>(executorService, new OurTaskProvider());
            DefaultDexecutor<OurClass, Integer> executor = new DefaultDexecutor<OurClass, Integer>(config);
            // code: construct Dependencies
            executor.execute(ExecutionConfig.TERMINATING);
        } catch (Exception | Error e) {
            e.printStackTrace();
        } finally {
            try {
                executorService.shutdownNow();
                executorService.awaitTermination(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
   }

from dexecutor-core.

mnadeem avatar mnadeem commented on September 3, 2024

This would be tricky to spot the not

from dexecutor-core.

mnadeem avatar mnadeem commented on September 3, 2024

Let me see, If I could do anything here

from dexecutor-core.

mnadeem avatar mnadeem commented on September 3, 2024

Try increasing the number of threads for the executor service and see if it works?

Further every time only last task is not completed ?

from dexecutor-core.

musophobia avatar musophobia commented on September 3, 2024

Performed a run with a thousand fixed thread. Half of the total number of tasks got completed and it proceeded to the next stage without completing the rest half, which is stange!! Can you please elaborate the insight or relation between the increased number of threads and executor service?
About the question you further asked: we perform a printout of task completion after each task is finished. The printout gets stuck at the (n-1)th one. Is it possible that everything gets completed but the execution can't just go back to the main thread?

from dexecutor-core.

mnadeem avatar mnadeem commented on September 3, 2024

Does this mean, the n-1th task is not printing that it is completed ?
Is the task blocked on something ?

On an average how much time it takes for tasks to complete ?

from dexecutor-core.

mnadeem avatar mnadeem commented on September 3, 2024

I can see that 7 threads are waiting

https://fastthread.io/ft-thread-report.jsp?dumpId=1&oTxnId_value=aa8b0548-1f86-46c1-ad6c-f6ce164c2069

from dexecutor-core.

mnadeem avatar mnadeem commented on September 3, 2024

18:09:19.579 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 1 node for execution
18:09:19.582 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 1 
18:09:19.585 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 11 node for execution
18:09:19.585 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 11 
18:09:19.585 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 12 node for execution
18:09:19.585 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 12 
18:09:19.622 [pool-1-thread-2] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 11
18:09:19.622 [pool-1-thread-1] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 1
18:09:19.622 [pool-1-thread-3] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 12
18:13:27.364 [pool-1-thread-2] INFO  c.g.d.c.DexecutorTimeoutTest$SleepyTaskProvider$1.heavyOperation 147 - Service Time(ms)= 247740.9145
18:13:27.366 [pool-1-thread-2] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 11, Execution Done!
18:13:27.367 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 11 done, with status SUCCESS
18:29:19.624 [pool-1-thread-1] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 1, Execution Done!
18:29:19.624 [pool-1-thread-3] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 12, Execution Done!
18:29:19.625 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 1 done, with status SUCCESS
18:29:19.626 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 2 node for execution
18:29:19.626 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 2 
18:29:19.627 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 3 node for execution
18:29:19.627 [pool-1-thread-4] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 2
18:29:19.627 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 3 
18:29:19.628 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 12 done, with status SUCCESS
18:29:19.628 [pool-1-thread-5] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 3
18:29:19.628 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 13 node for execution
18:29:19.630 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 13 
18:29:19.630 [pool-1-thread-6] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 13
18:49:19.641 [pool-1-thread-4] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 2, Execution Done!
18:49:19.641 [pool-1-thread-6] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 13, Execution Done!
18:49:19.641 [pool-1-thread-5] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 3, Execution Done!
18:49:19.642 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 2 done, with status SUCCESS
18:49:19.643 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 7 node for execution
18:49:19.643 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 7 
18:49:19.644 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 9 node for execution
18:49:19.644 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 9 
18:49:19.644 [pool-1-thread-7] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 7
18:49:19.645 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 8 node for execution
18:49:19.645 [pool-1-thread-2] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 9
18:49:19.645 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 8 
18:49:19.646 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 13 done, with status SUCCESS
18:49:19.646 [pool-1-thread-3] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 8
18:49:19.646 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 207 - node 4 depends on [3, 13]
18:49:19.647 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 14 node for execution
18:49:19.647 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 14 
18:49:19.647 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 3 done, with status SUCCESS
18:49:19.647 [pool-1-thread-1] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 14
18:49:19.647 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 4 node for execution
18:49:19.648 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 4 
18:49:19.648 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 5 node for execution
18:49:19.648 [pool-1-thread-4] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 4
18:49:19.648 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 5 
18:49:19.649 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 6 node for execution
18:49:19.649 [pool-1-thread-6] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 5
18:49:19.649 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 6 
18:49:19.650 [pool-1-thread-5] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 6
19:09:19.646 [pool-1-thread-2] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 9, Execution Done!
19:09:19.646 [pool-1-thread-7] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 7, Execution Done!
19:09:19.648 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 9 done, with status SUCCESS
19:09:19.649 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doExecute 198 - Submitting 10 node for execution
19:09:19.650 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 97 - Received Task 10 
19:09:19.650 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 7 done, with status SUCCESS
19:09:19.651 [pool-1-thread-2] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 10
19:09:19.662 [pool-1-thread-3] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 8, Execution Done!
19:09:19.662 [pool-1-thread-4] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 4, Execution Done!
19:09:19.662 [pool-1-thread-6] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 5, Execution Done!
19:09:19.662 [pool-1-thread-5] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 6, Execution Done!
19:09:19.663 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 8 done, with status SUCCESS
19:09:19.664 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 4 done, with status SUCCESS
19:09:19.665 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 5 done, with status SUCCESS
19:09:19.665 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 6 done, with status SUCCESS
19:29:19.664 [pool-1-thread-2] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 10, Execution Done!
19:29:19.664 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 10 done, with status SUCCESS
19:51:19.660 [pool-1-thread-1] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 14, Execution Done!
19:51:19.660 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.doAfterExecutionDone 230 - Processing of node 14 done, with status SUCCESS
19:51:19.671 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.execute 140 - Total Time taken to process 14 jobs is 6120093 ms.
19:51:19.671 [main] DEBUG c.g.dexecutor.core.DefaultDexecutor.execute 141 - Processed Nodes Ordering [11, 1, 12, 2, 13, 3, 9, 7, 8, 4, 5, 6, 10, 14]
[ExecutionResult [id=11, result=11, status=SUCCESS, message=, startTime=2021-05-02T18:09:19.622, endTime=2021-05-02T18:13:27.367], ExecutionResult [id=1, result=1, status=SUCCESS, message=, startTime=2021-05-02T18:09:19.622, endTime=2021-05-02T18:29:19.625], ExecutionResult [id=12, result=12, status=SUCCESS, message=, startTime=2021-05-02T18:09:19.622, endTime=2021-05-02T18:29:19.625], ExecutionResult [id=2, result=2, status=SUCCESS, message=, startTime=2021-05-02T18:29:19.627, endTime=2021-05-02T18:49:19.642], ExecutionResult [id=13, result=13, status=SUCCESS, message=, startTime=2021-05-02T18:29:19.630, endTime=2021-05-02T18:49:19.642], ExecutionResult [id=3, result=3, status=SUCCESS, message=, startTime=2021-05-02T18:29:19.628, endTime=2021-05-02T18:49:19.642], ExecutionResult [id=9, result=9, status=SUCCESS, message=, startTime=2021-05-02T18:49:19.645, endTime=2021-05-02T19:09:19.648], ExecutionResult [id=7, result=7, status=SUCCESS, message=, startTime=2021-05-02T18:49:19.644, endTime=2021-05-02T19:09:19.648], ExecutionResult [id=8, result=8, status=SUCCESS, message=, startTime=2021-05-02T18:49:19.646, endTime=2021-05-02T19:09:19.662], ExecutionResult [id=4, result=4, status=SUCCESS, message=, startTime=2021-05-02T18:49:19.648, endTime=2021-05-02T19:09:19.663], ExecutionResult [id=5, result=5, status=SUCCESS, message=, startTime=2021-05-02T18:49:19.649, endTime=2021-05-02T19:09:19.663], ExecutionResult [id=6, result=6, status=SUCCESS, message=, startTime=2021-05-02T18:49:19.650, endTime=2021-05-02T19:09:19.664], ExecutionResult [id=10, result=10, status=SUCCESS, message=, startTime=2021-05-02T19:09:19.650, endTime=2021-05-02T19:29:19.664], ExecutionResult [id=14, result=14, status=SUCCESS, message=, startTime=2021-05-02T18:49:19.647, endTime=2021-05-02T19:51:19.660]]

Total Time taken to process 14 jobs is 6120093 ms or 1.7 hours

Here is the code

public class DexecutorTimeoutTest {
	
	private static final Logger logger = LoggerFactory.getLogger(DexecutorTimeoutTest.class);
	
	@Test
	public void testDependentTaskExecution() {
		ExecutorService executorService = newExecutor();

		try {
			DexecutorConfig<Integer, Integer> config = new DexecutorConfig<>(executorService, new SleepyTaskProvider());
			DefaultDexecutor<Integer, Integer> executor = new DefaultDexecutor<Integer, Integer>(config);
			executor.addDependency(1, 2);
			executor.addDependency(1, 2);
			executor.addDependency(1, 3);
			executor.addDependency(3, 4);
			executor.addDependency(3, 5);
			executor.addDependency(3, 6);
			executor.addDependency(2, 7);
			executor.addDependency(2, 9);
			executor.addDependency(2, 8);
			executor.addDependency(9, 10);
			executor.addDependency(12, 13);
			executor.addDependency(13, 4);
			executor.addDependency(13, 14);
			executor.addIndependent(11);

			ExecutionResults<Integer, Integer> result = executor.execute(ExecutionConfig.TERMINATING);
			System.out.println(result);
			
			Collection<Node<Integer, Integer>> processedNodesOrder = TestUtil.processedNodesOrder(executor);
			assertThat(processedNodesOrder).containsAll(executionOrderExpectedResult());
			assertThat(processedNodesOrder).size().isGreaterThanOrEqualTo(4);
			//assertThat(result.anyCancelled()).isTrue();
			
		} finally {
			try {
				executorService.shutdownNow();
				executorService.awaitTermination(1, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				
			}
		}
	}

	private Collection<Node<Integer, Integer>> executionOrderExpectedResult() {
		List<Node<Integer, Integer>> result = new ArrayList<Node<Integer, Integer>>();
		result.add(new Node<Integer, Integer>(1));
		//result.add(new Node<Integer, Integer>(2));
		result.add(new Node<Integer, Integer>(11));
		result.add(new Node<Integer, Integer>(12));
		return result;
	}

	private ExecutorService newExecutor() {
		return Executors.newFixedThreadPool(7);
	}

	private static class SleepyTaskProvider implements TaskProvider<Integer, Integer> {
		
		public Task<Integer, Integer> provideTask(final Integer id) {

			return new Task<Integer, Integer>() {

				private static final long serialVersionUID = 1L;

				public Integer execute() {
					
					if (id == 14) {
						try {
							TimeUnit.MINUTES.sleep(62);
						} catch (InterruptedException e) {
							logger.error(this.toString(), e);
						}
					} else if (id == 11) {
						heavyOperation(id);
						
					} else {
						try {
							TimeUnit.MINUTES.sleep(20);
						} catch (InterruptedException e) {
							logger.error(this.toString(), e);
						}
					}					

					return id;
				}

				private void heavyOperation(final Integer id) {
					BigInteger factValue = BigInteger.ONE;
					long t1 = System.nanoTime();
					for (int i = 2; i <= 800000; i++) {
						// Future.cancel(...) delivers an interrupt signal to the thread asking it to stop. 
						// You must ensure that your tasks respect the interrupt signals
						// e.g. checks for Thread.currentThread().isInterrupted() at regular intervals.
						if (Thread.currentThread().isInterrupted()) {
							logger.warn("Task #{} Interrupted, returning.....", id);
							break ;
						}
						factValue = factValue.multiply(BigInteger.valueOf(i));
					}
					long t2 = System.nanoTime();
					logger.info("Service Time(ms)= {}", ((double) (t2 - t1) / 1000000));
				}

				@Override
				public Duration getTimeout() {
					if (id == 2) {
						return null;
					}
					//return Duration.ofMillis(1);
					return null;
				}
			};			
		}		
	}

}

Further I see that you are running in OSGI environment, can you run in non-osgi environtment and see if it works

from dexecutor-core.

mnadeem avatar mnadeem commented on September 3, 2024

Note java 8 is required

from dexecutor-core.

mnadeem avatar mnadeem commented on September 3, 2024

Closing this ticket, since there is no issue

from dexecutor-core.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.