Git Product home page Git Product logo

pew's People

Contributors

jevaughan avatar petrospapapa avatar

Stargazers

 avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

pew's Issues

Simulation should be able to start from a custom, intermediate PiInstance

We want to be able to combine monitoring and simulation. For example, monitor a workflow until half way through and simulate/predict how the rest of it will play out.

For this, we need to grab the intermediate PiState, replace the ProcessStore and run the simulator on that.

Atomic processes that are already running may need special treatment to subtract the duration they have already been working for. What is the best way to manage that starting timestamp?

This resume functionality could be generalized to all executors in case it becomes useful in other contexts.

Results Topic Tests for KafkaExecutor

There's been 2 issues found related to the PiEvents on the KafkaExecutor:

  • Missing PiEventCalls
  • Duplicate PiEventReturned

The following unit tests would be helpful:

  • Always 1 PiEventStart
  • At most 1 PiEventResult / exception per PiInstance ID.
  • At most 1 PiEventCall per reference
  • At most 1 PiEventReturn / exception per reference
  • PiEventCall reference ids should be sequential

Setting up simulations in an external configuration file

Each simulation running in the coordinator requires an instance of a subclass of Simulation and a timestamp of when it should start.

We would like to be able to read this information from a configuration file and load it to the Coordinator. JSON is likely the best option for this.

`MetricsAggregator` keeps everything in memory

Issue

com.workflowfm.pew.metrics.MetricsAggregator stores all metrics from all workflows executed in memory.

This means it will keep consuming memory for each executed workflow, which is bad, particularly for long-term, persistent executions.

Suggestion 1

We need to introduce 2 implementations:

  1. An extension of the current in-memory implementation which stores metrics of completed processes and workflows in persistent storage (Mongo?) and clears them from memory.
  2. A fully persistent version that stores everything in persistent storage (Mongo?) and retrieves stuff it needs to update (we might also need CAS checking for conflicts).

We need both implementations to be available, as the second one is probably not good for fast workflows executing in memory.

The action of storing to persistent storage must be handled by a concurrent actor to avoid delaying workflow execution.

Suggestion 2

We might want to update the way outputs work so that they have the option to:

  1. Clear completed workflows from the aggregator.
  2. Only function on completed workflows.

The current implementation mirrors the behaviour of the simulation metrics, where it makes sense to collect all the metrics and run the output once when the simulation is finished. This, however, does not make sense for long-term or persistent execution.

Executors do not need a process store

Need to check whether the process store is ever used and remove it from all executors that currently require it.

This has caused a lot of pain, particularly for processes that want to call upon other processes. There is a chicken/egg problem there as the processes need to know their executor, but the executor needs to know its processes at init.

Such processes are becoming more and more relevant as we are starting to use recursion.

Why should executors know the available processes in advance? This doesn't sound necessary, particularly since a PiInstance uses its own list of atomic processes.

AkkaPiObservable subscribes PiEventHandlers globally

AkkaPiObservable uses Akka's eventStream which is shared among all AkkaPiObservables (AkkaExecutors).

For example, if you subscribe a PrintEventHandler to one unit test, it will print all events from subsequent unit tests even though they use a separate AkkaExecutor.

This is ok for now, because we typically only use one AkkaExecutor at a time, but must be solved by using a custom Akka Stream.

PiObservable using Akka BroadcastHub

I want to finish an alternative implementation of PiObservable that uses Akka BroadcastHub so you can flexibly use PiEvent flows in a reactive way.

I am guessing @JeVaughan has implemented something similar for Aurora's live monitoring (perhaps subscribing to SimplePiObservable?) so I should make sure these 2 things integrate and/or reuse some of that code.

This is also practically already done in https://github.com/PetrosPapapa/WorkflowFM-PEW/tree/petros/broadcasthub but I want to generalize it so that it replaces SimplePiObservable as the norm for PiEvent handling.

It would be even more interesting if the user was allowed to swap between SimplePiObservable or this Akka Stream observable by mixing in whichever they prefer. This depends on the implementations of the Executors.

MultiStateExecutor still has race condition problems

Repeating the unit tests a few times will certainly demonstrate the problem. I haven't detected the problem yet.

This is a low priority issue because we shouldn't really be using this executor in the first place.

I've considered removing it, but it is actually helpful for testing simple execution when changing things at a lower level.

Pew-kafka breaks with Alpakka 1.0 or later

KafkaExecutor unit tests fail with akka-stream-kafka v1.0 or later. It does not even compile with v2.0 or above.

This means pew-kafka had been broken in versions 1.4 and 1.5 since e255b55.

We should update the implementation to use the latest stable version.

Switching class loader in AnyCodec does not work for Mongo

In Kafka, AnyCodec requires an explicit class loader to be provided, hence this trick:

// Jev, unset the `ClassLoader` to ensure the default is used.
def classForName(name: String): Class[_] = ClassLoaderUtil.withClassLoader(null) {
Class.forName(name)
}

See also:

This requires this switch using setContextClassLoader:

val pushedClassLoader = Thread.currentThread().getContextClassLoader
try {
Thread.currentThread().setContextClassLoader(tmpClassLoader)
fnWrapped
} finally {
Thread.currentThread().setContextClassLoader(pushedClassLoader)
}

This fails in pew-mongo because everything runs on the default ForkJoinPool which produces InnocuousThreads which do not allow you to setContextClassLoader leading to a security exception.

We want an easy way to bypass this loader switch, perhaps as simpke as a flag in AnyCodec and the codec provider.

Perhaps later on after #55 is implemented we will have a more elegant solution available.

Extend PiEvents with custom/virtual timestamps

It would be useful to add another timestamp (Long) field to the PiEvents.

PiEvents currently act as system logs for the Executors, using system timestamps.

Using an additional, custom timestamp can help us associate PiEvents to things happening in the "real" workflow. For example, when we retroactively analyse a workflow from Aurora, we'd like to associate PiEvents to the movement events that happened in Aurora.

This requires some work:

  • 1. Extending the PiEvents
  • 2. Updating the PiEvents encoders/decoders
    - [ ] 3. Extending the ProcessExecutor trait
  • 4. Implementing the corresponding functionality in all ProcessExecutors

For (3): We need a way for AtomicProcesses to report the custom time to the Executor. I suggest a sub-trait, say VirtualProcess or something like that, that has this capability. We maintain backwards compatibility with the generic AtomicProcesses so that we don't force them to have to deal with the extra functionality. The Executors can then handle VirtualProcesses differently, incorporating the custom timestamp they provide in every associated PiEvent.

Asking an AkkaExecutor.Call may timeout

Although 10 seconds is a long time, this still timed out when running several simulations on a busy processor. This should be very fast, but it isn't. Why?

Actor system does not get shut down

The Coordinator used to shut down the actor system. Now noone does! It is still not clear where this should be happening.

The MetricsActor could do this for now, but it needs to be controlled by the user somehow. Otherwise we won't be able to run unit tests with a single actor system.

Perhaps we can use an ask pattern in the Main function, but this partially defeats the purpose of the MetricsActor. This is likely the best option to keep things nice and modular and separate.

The simulation should make use of PiEvents

Now that PiEvents can hold additional metadata (see #21), it would be nice to disentangle the MetricsAggregator and have it work with PiEvents as a PiEventListener.

  • Tasks can return metadata from the Coordinator instead of Unit.
  • SimulatedProcesses can automatically add this metadata to their result.
  • Each simulation currently has its own executor. Do we need to register listeners to all of them? Probably... (but care for #7).
  • Apart from times and costs, the Coordinator can also pass along progress metadata (current time, simulations remaining, etc).

Generalize PromiseHandler

PromiseHandler currently only returns the result of the overall workflow.

The CounterHandler introduced in 0488604 is a good example of how promises can be used to return different things.

It would be nice to generalize the PromiseHandler so that it is simpler to implement handlers that need to return stuff.

So you can implement the CounterHandler like this:

class CounterHandler[T](override val name:String) extends PromiseHandler[T,Int] {   
  private var counter:Int = 0
  def count = counter
  
  override def apply(e:PiEvent[T]) = counter += 1
  override def onSuccess(result:Any) = promise.success(counter)
  override def onFailure(result:Any) = promise.success(counter)
}

The current version of the PromiseHandler that returns the result of the workflow can be named ResultHandler or something like that instead, as a simple instance of the PromiseHandler.

  • New PromiseHandler
  • ResultHandler
  • CounterHandler
  • New PromiseHandleFactory
  • ResultHandleFactory
  • New CounterHandlerFactory
  • Resolve things that break in ProcessExecutor.scala and anywhere else

Refactor common executor code into single trait

Most (all?) executors share some code related to updating the PiInstance, checking results and process calls, checking completion, publishing related events, etc. It would be nice to refactor this code in some common trait, perhaps ProcessExecutor itself. This should make development of new executors easier.

PiEventCallCodec Bson codec can't serialise MetadataAtomicProcesses

PiEventCallCodec casts to an AtomicProcess internally, causing crashes when passed MetadataAtomicProcesses.

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Result-0 at offset 0. If needed, please seek past the record to continue consumption. Caused by: java.lang.ClassCastException: workflow.monitor.instances.InitPressingInstance cannot be cast to com.workflowfm.pew.AtomicProcess at com.workflowfm.pew.mongodb.bson.events.PiEventCallCodec.decodeBody(PiEventCallCodec.scala:51) at com.workflowfm.pew.mongodb.bson.events.PiEventCallCodec.decodeBody(PiEventCallCodec.scala:9) at com.workflowfm.pew.mongodb.bson.auto.ClassCodec.decode(AutoCodecTypes.scala:34)

Unused AckCall message in AkkaExecutor

There is a AckCall message used between the AkkaExecutor actors to acknowledge a call to an atomic process. It is sent in AkkaAtomicProcessExecutor::receive:

    case AkkaExecutor.ACall(id, ref, p, args, actor) =>
      p.runMeta(args).onComplete{
        case Success(res) => actor ! AkkaExecutor.Result(id,ref,res)
        case Failure(ex) => actor ! AkkaExecutor.Error(id,ref,ex)
      }
-->   actor ! AkkaExecutor.AckCall

but the receiving AkkaExecActor does nothing with it:

case AkkaExecutor.AckCall => Unit

This unused message could be removed from the system. If there is need for such an acknowledgement, the manner of sending the current call message can be changed from:

system.actorOf(AkkaExecutor.atomicprops()) ! AkkaExecutor.ACall(i.id, ref, p, objs, self)

with:

system.actorOf(AkkaExecutor.atomicprops()) ? AkkaExecutor.ACall(i.id, ref, p, objs, self)
                                           ^

and the resulting Feature can be used for that purpose.

Simulation metrics code needs improvement

The event-based metrics code we introduced for generic workflows (see #15) looks great. Simulation metrics now look awful in comparison.

Need to clean up the code and follow the same pattern. Since all simulations are Akka based we can have message-based tracking of metrics through the Coordinator (i.e. we don't need another generic event bus).

Need to take care to update Aurora's simulation accordingly once this is done.

Attach metrics to ProcessExecutor instead of the simulation Coordinator

It would be useful to be able to track metrics for any executed workflow, not only simulated ones.

  • We can have ProcessExecutors send events of when things are happening: workflow start, process called, process returned, workflow completed
  • We can then register handlers (such as a MetricsAggregator) to record data from these events

This can allow us to have a real time timeline of the workflows that are executing, as well as other visualisations and monitoring capabilities.

This is quite a significant refactoring that should not break the existing executors (Akka, Mongo, Kafka).

Better management of addSim and Start in Coordinator

Currently in order to run a set of simulations you have to do the following:

  1. Create a Coordinator actor
  2. Add all your simulations one by one, with the corresponding starting timestamp
  3. Assuming you want to use MetricsOutput(s), create a MetricsActor actor.
  4. Start the Coordinator (via the MetricsActor).

It would be nice to collapse all this to a single message. You can construct a "manifest" of all the simulations and their timestamps and pass it to the actor(s) in one go. The Coordinator should then start immediately.

This should not break backwards compatibility. The current way of doing things allows us to dynamically add new simulations while the Coordinator is already started!

Simulation Scheduler has issues

While writing the wiki page on Simulation I noticed the DefaultScheduler implementation (written quite some time ago) is pretty bad and potentially buggy.

Specifically, the filter in DefaultScheduler.getNextTask:

  • 1. Uses t.priority >= t2.priority to compare priorities instead of the comparison t >= t2
  • 2. Can now use t.id to check for identical tasks
    - [ ] 3. Calculates all candidate tasks even though we just need the first
  • 4. Compares each candidate to all tasks, even though we only need to compare with the ones with higher priority

  • We need unit tests
    - [ ] I am considering moving TaskResource.nextAvailableTimestamp and Task.nextPossibleStart in DefaultScheduler or maybe a Scheduler companion object so all the relevant code is in one place

Improve pi-calculus implementation

The simple term structure we are currently using is not very efficient, both in terms of memory and execution. Channels, substitution and fresh naming are particularly inefficient.

We should consider a better structure with some indexing and fast substitution.

Timeline functionality

Goal: add and remove items from the timeline

  • timeline-plus exposes some methods to interact with items, but it's more focused towards user interaction than programmatic changes

  • to make programmatic changes: find out what methods are invoked on user interaction, call them from loop, referencing ID

  • setItems(items) - need to check whether this keeps existing or overrides all items currently displayed

Methods to implement

  • updateEndTimes() - makes events that have not yet ended keep up with the rolling timeline
    [bug: the timeline does not refresh when rolling mode is disabled, even when a redraw is forced -- this means that event end points do not update until the timeline is updated]
  • addItem(id, props) - add an item with the following properties
  • updateItem(id, props) - make changes to a timeline item based on a server event, changing the following properties
  • removeItem(id) - remove this item from the timeline

Unused counter in AkkaExecActor

There is a counter in AkkaExecActor that is used to initialize the starting PiInstance's ID. It is only used there, starts at zero and is incremented at the end of init. I think that its original purpose (providing IDs for process instances in the store) was taken over by an internal counter of PiInstance that is used when PiInstance::execute prepares instantiated threads for external execution. This suggests that the counter in AkkaExecActor could be removed and its use in init replaced with a constant zero.

When trying to find its uses, I thought about this counter possibly being critical to be able to run init multiple times (as there seems to be nothing stopping me from sending multiple Init messages to the same actor). However, even with that counter, doing that after at least one further instance was added to the store still breaks (tries to place two instances with equal ID into the store). That is because the first instance added to the store from execution of the starting one will get ID 1, which is equal to what init would assign on the second time around.

Therefore I propose we remove this counter from AkkaExecActor, replace its use by zero, and possibly add some condition to prevent the actor from being initialized multiple times.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.