workflowfm / pew
A persistent execution engine for pi-calculus workflows
Home Page: http://docs.workflowfm.com/pew
License: Apache License 2.0
We want to be able to combine monitoring and simulation. For example, monitor a workflow until half way through and simulate/predict how the rest of it will play out.
For this, we need to grab the intermediate PiState, replace the ProcessStore and run the simulator on that.
Atomic processes that are already running may need special treatment to subtract the duration they have already been working for. What is the best way to manage that starting timestamp?
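One possible way to manage the starting timestamp is to record it next to each running call when the intermediate state is captured, then schedule only the remaining time on resume. The sketch below assumes a hypothetical `RunningTask` record (not part of PEW) holding that timestamp and the expected total duration:

```scala
// Hypothetical type, not part of PEW: an atomic process found running in an
// intermediate PiState, with the time it started and its expected total duration.
final case class RunningTask(name: String, startedAt: Long, expectedDuration: Long) {
  // Time left when the simulation resumes at `now`, clamped at zero in case the
  // task has already overrun its expected duration.
  def remaining(now: Long): Long = math.max(0L, expectedDuration - (now - startedAt))
}
```

For example, a task that started at time 10 with an expected duration of 30 would be resumed at time 25 with 15 time units left.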
This resume functionality could be generalized to all executors in case it becomes useful in other contexts.
Two issues have been found related to the PiEvents on the KafkaExecutor:
The following unit tests would be helpful:
Each simulation running in the coordinator requires an instance of a subclass of Simulation and a timestamp of when it should start.
We would like to be able to read this information from a configuration file and load it to the Coordinator. JSON is likely the best option for this.
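A minimal sketch of the loading step, assuming each JSON entry names a simulation and a start time. The JSON decoding itself is omitted (any JSON library could produce the `SimulationEntry` values), and `Simulation`, `SimulationEntry` and `SimulationConfig` here are hypothetical stand-ins rather than the real Coordinator API. Mapping names to constructors in a registry avoids instantiating classes by reflection:

```scala
// Hypothetical types: PEW's real Simulation class and Coordinator API are not modelled here.
trait Simulation { def name: String }

// One entry of the configuration file, e.g. {"simulation": "factory", "start": 0}
final case class SimulationEntry(simulation: String, start: Long)

object SimulationConfig {
  // Maps the names used in the config file to constructors.
  type Registry = Map[String, () => Simulation]

  // Builds (start time, simulation) pairs, or reports every unknown name at once.
  def load(entries: Seq[SimulationEntry], registry: Registry): Either[String, Seq[(Long, Simulation)]] = {
    val missing = entries.map(_.simulation).filterNot(registry.contains)
    if (missing.nonEmpty) Left(s"Unknown simulations: ${missing.mkString(", ")}")
    else Right(entries.map(e => e.start -> registry(e.simulation)()))
  }
}
```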
com.workflowfm.pew.metrics.MetricsAggregator stores the metrics of every executed workflow in memory.
This means it will keep consuming memory for each executed workflow, which is bad, particularly for long-term, persistent executions.
We need to introduce 2 implementations:
We need both implementations to be available, as the second one is probably not good for fast workflows executing in memory.
The action of storing to persistent storage must be handled by a concurrent actor to avoid delaying workflow execution.
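The two implementations could share a small interface. The sketch below is hypothetical (`MetricsStore`, `InMemoryMetrics`, `PersistingMetrics` and the `persist` callback are illustrative names, not PEW's API): one keeps everything in memory as today, the other keeps only running workflows and hands finished ones to a dedicated single-thread executor, standing in for the concurrent actor, so a slow store does not delay execution. It assumes the callers are single-threaded, as they would be inside an actor:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical, simplified metrics record.
final case class WorkflowMetrics(id: String, start: Long, finish: Option[Long] = None)

trait MetricsStore {
  def workflowStarted(id: String, time: Long): Unit
  def workflowFinished(id: String, time: Long): Unit
}

// Keeps everything: fine for fast in-memory runs, grows without bound otherwise.
class InMemoryMetrics extends MetricsStore {
  val all = mutable.Map.empty[String, WorkflowMetrics]
  def workflowStarted(id: String, time: Long): Unit = all.update(id, WorkflowMetrics(id, time))
  def workflowFinished(id: String, time: Long): Unit =
    all.get(id).foreach(m => all.update(id, m.copy(finish = Some(time))))
}

// Keeps only running workflows; finished ones are persisted on a separate thread.
class PersistingMetrics(persist: WorkflowMetrics => Unit) extends MetricsStore {
  private val running = mutable.Map.empty[String, WorkflowMetrics]
  private val writerPool = Executors.newSingleThreadExecutor()
  private implicit val writer: ExecutionContext = ExecutionContext.fromExecutorService(writerPool)

  def inMemory: Int = running.size
  def workflowStarted(id: String, time: Long): Unit = running.update(id, WorkflowMetrics(id, time))
  def workflowFinished(id: String, time: Long): Unit =
    running.remove(id).foreach { m =>
      val done = m.copy(finish = Some(time))
      Future(persist(done)) // runs on the writer thread, not the caller's
    }

  // Lets pending writes finish, then stops the writer thread.
  def shutdown(): Unit = {
    writerPool.shutdown()
    writerPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```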
We might want to update the way outputs work so that they have the option to:
The current implementation mirrors the behaviour of the simulation metrics, where it makes sense to collect all the metrics and run the output once when the simulation is finished. This, however, does not make sense for long-term or persistent execution.
Need to check whether the process store is ever used and remove it from all executors that currently require it.
This has caused a lot of pain, particularly for processes that want to call upon other processes. There is a chicken/egg problem there as the processes need to know their executor, but the executor needs to know its processes at init.
Such processes are becoming more and more relevant as we are starting to use recursion.
Why should executors know the available processes in advance? This doesn't sound necessary, particularly since a PiInstance uses its own list of atomic processes.
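A heavily simplified, hypothetical sketch of how dropping the process list from the executor breaks the chicken/egg cycle: the instance carries its processes, so the executor can be created first and a recursive process can then capture it. `Instance`, `Executor` and `Countdown` are illustrative names, not PEW's classes:

```scala
// Hypothetical, heavily simplified types; not PEW's actual API.
trait AtomicProcess {
  def name: String
  def run(args: Seq[Any]): Any
}

// The instance, not the executor, carries the atomic processes it may call.
final case class Instance(id: Int, processes: Map[String, AtomicProcess])

// The executor needs no process list at construction time.
final class Executor {
  def call(instance: Instance, processName: String, args: Seq[Any]): Any =
    instance.processes.get(processName) match {
      case Some(p) => p.run(args)
      case None    => throw new NoSuchElementException(s"Unknown process: $processName")
    }
}

// A recursive process that calls back into its executor.
final class Countdown(executor: Executor, self: () => Instance) extends AtomicProcess {
  val name = "countdown"
  def run(args: Seq[Any]): Any = args.head match {
    case n: Int if n > 0 => executor.call(self(), name, Seq(n - 1))
    case _               => "done"
  }
}

// Wiring: the executor exists first; the instance and the process refer to each other lazily.
def runCountdown(n: Int): Any = {
  val executor = new Executor
  lazy val instance: Instance = Instance(0, Map("countdown" -> countdown))
  lazy val countdown: Countdown = new Countdown(executor, () => instance)
  executor.call(instance, "countdown", Seq(n))
}
```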
AkkaPiObservable uses Akka's eventStream which is shared among all AkkaPiObservables (AkkaExecutors).
For example, if you subscribe a PrintEventHandler to one unit test, it will print all events from subsequent unit tests even though they use a separate AkkaExecutor.
This is ok for now, because we typically only use one AkkaExecutor at a time, but must be solved by using a custom Akka Stream.
I want to finish an alternative implementation of PiObservable that uses Akka BroadcastHub so you can flexibly use PiEvent flows in a reactive way.
I am guessing @JeVaughan has implemented something similar for Aurora's live monitoring (perhaps subscribing to SimplePiObservable?) so I should make sure these 2 things integrate and/or reuse some of that code.
This is also practically already done in https://github.com/PetrosPapapa/WorkflowFM-PEW/tree/petros/broadcasthub but I want to generalize it so that it replaces SimplePiObservable as the norm for PiEvent handling.
It would be even more interesting if the user could swap between SimplePiObservable and this Akka Stream observable by mixing in whichever they prefer. This depends on the implementations of the Executors.
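The mixin idea can be sketched without Akka. Everything below is hypothetical (`PiEvent`, `SimpleObservable` and `ReplayObservable` are illustrative stand-ins; the replaying trait merely takes the place of a stream-based one). Each executor owns its own subscribers, which also avoids the shared eventStream leakage described above:

```scala
import scala.collection.mutable

// Hypothetical, simplified event and observable types, not PEW's actual API.
final case class PiEvent(instance: Int, description: String)

trait PiObservable {
  def subscribe(handler: PiEvent => Unit): Unit
  protected def publish(e: PiEvent): Unit
}

// Subscribers live in this executor instance, so events never leak to other executors.
trait SimpleObservable extends PiObservable {
  private val handlers = mutable.ListBuffer.empty[PiEvent => Unit]
  def subscribe(handler: PiEvent => Unit): Unit = handlers += handler
  protected def publish(e: PiEvent): Unit = handlers.foreach(_(e))
}

// Stand-in for an alternative observable: late subscribers first receive past events.
trait ReplayObservable extends PiObservable {
  private val history = mutable.ListBuffer.empty[PiEvent]
  private val handlers = mutable.ListBuffer.empty[PiEvent => Unit]
  def subscribe(handler: PiEvent => Unit): Unit = {
    history.foreach(handler)
    handlers += handler
  }
  protected def publish(e: PiEvent): Unit = {
    history += e
    handlers.foreach(_(e))
  }
}

// The executor only depends on the PiObservable interface.
abstract class Executor extends PiObservable {
  def run(id: Int): Unit = publish(PiEvent(id, "started"))
}
```

The user then picks the behaviour at construction time, e.g. `new Executor with SimpleObservable` or `new Executor with ReplayObservable`.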
PiObject subclasses cannot be serialized with all available codecs. This happens because their AnyCodec references the wrong object, and is therefore unable to serialize types extended by subclasses.
Repeating the unit tests a few times will certainly demonstrate the problem. I haven't tracked down its cause yet.
This is a low priority issue because we shouldn't really be using this executor in the first place.
I've considered removing it, but it is actually helpful for testing simple execution when changing things at a lower level.
KafkaExecutor unit tests fail with akka-stream-kafka v1.0 or later. It does not even compile with v2.0 or above.
This means pew-kafka has been broken in versions 1.4 and 1.5 since e255b55.
We should update the implementation to use the latest stable version.
No comment. Just need to remember to fix this later.
In Kafka, AnyCodec requires an explicit class loader to be provided, hence this trick:
pew/pew/src/main/scala/com/workflowfm/pew/mongodb/bson/AnyCodec.scala
Lines 23 to 26 in 4a5bfae
See also:
This requires this switch using setContextClassLoader:
pew/pew/src/main/scala/com/workflowfm/pew/util/ClassLoaderUtil.scala
Lines 14 to 20 in 4a5bfae
This fails in pew-mongo because everything runs on the default ForkJoinPool, which produces InnocuousThreads that do not allow you to call setContextClassLoader, leading to a security exception.
We want an easy way to bypass this loader switch, perhaps as simple as a flag in AnyCodec and the codec provider.
Perhaps later on after #55 is implemented we will have a more elegant solution available.
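A minimal sketch of such a flag, assuming a hypothetical helper (`ClassLoaderSwitch` and `switchLoader` are illustrative names, not the existing ClassLoaderUtil). When the flag is off, the body runs on the current thread untouched, so `setContextClassLoader` is never called on an InnocuousThread; when it is on, the previous loader is restored afterwards:

```scala
// Hypothetical helper, not PEW's ClassLoaderUtil.
object ClassLoaderSwitch {
  // Runs `body` with `loader` as the thread's context class loader only when
  // `switchLoader` is true; otherwise leaves the thread completely untouched.
  def withContextClassLoader[T](loader: ClassLoader, switchLoader: Boolean)(body: => T): T =
    if (!switchLoader) body
    else {
      val thread = Thread.currentThread()
      val previous = thread.getContextClassLoader
      thread.setContextClassLoader(loader) // may throw SecurityException on an InnocuousThread
      try body
      finally thread.setContextClassLoader(previous)
    }
}
```

The codec and its provider could then pass the flag through, with pew-mongo setting it to false.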
Steps:
Going through the kafka code, the following files need at least some minimal comments for the classes:
Adding more as I go through the code.
It would be useful to add another timestamp (Long) field to the PiEvents.
PiEvents currently act as system logs for the Executors, using system timestamps.
Using an additional, custom timestamp can help us associate PiEvents to things happening in the "real" workflow. For example, when we retroactively analyse a workflow from Aurora, we'd like to associate PiEvents to the movement events that happened in Aurora.
This requires some work:
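ProcessExecutor trait
For (3): We need a way for AtomicProcesses to report the custom time to the Executor. I suggest a sub-trait, say VirtualProcess or something like that, that has this capability. We maintain backwards compatibility with the generic AtomicProcesses so that they are not forced to deal with the extra functionality. The Executors can then handle VirtualProcesses differently, incorporating the custom timestamp they provide in every associated PiEvent.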
or something like that, that has this capability. We maintain backwards compatibility with the generic AtomicProcesses so that we don't force them to have to deal with the extra functionality. The Executors can then handle VirtualProcesses differently, incorporating the custom timestamp they provide in every associated PiEvent.
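A hypothetical sketch of the sub-trait idea (the simplified `AtomicProcess`, `VirtualProcess`, `ProcessEvent` and `EventFactory` below are illustrative, not PEW's real types). Plain processes keep working unchanged, while the executor pattern-matches on the capability and adds the custom timestamp to the event:

```scala
// Hypothetical, simplified types for illustration; PEW's real AtomicProcess and PiEvent differ.
trait AtomicProcess { def name: String }

// Opt-in capability: a process that can report a custom ("virtual") timestamp.
trait VirtualProcess extends AtomicProcess {
  def virtualTime: Long
}

// An event with the existing system timestamp plus an optional custom one.
final case class ProcessEvent(process: String, systemTime: Long, customTime: Option[Long])

object EventFactory {
  // Plain AtomicProcesses are unaffected; VirtualProcesses contribute their timestamp.
  def eventFor(p: AtomicProcess, now: Long): ProcessEvent = p match {
    case v: VirtualProcess => ProcessEvent(v.name, now, Some(v.virtualTime))
    case other             => ProcessEvent(other.name, now, None)
  }
}
```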
As seen in workflowfm/proter#17 (comment) the same task may have a different colour in a different simulation run.
Need to update https://github.com/PetrosPapapa/WorkflowFM-PEW/blob/4fbbb012c7422872412d30f71bf64ade8b972011/src/com/workflowfm/pew/metrics/Output.scala#L194 to sort the names alphabetically so they always map to the same colours.
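A minimal sketch of the sorting fix, using a hypothetical `TaskColours.assign` helper rather than the actual code in Output.scala. Colours depend only on the alphabetical position of each name, not on the order tasks appear in a run:

```scala
// Hypothetical helper: assign colours by alphabetical task name so the same task
// gets the same colour regardless of the order in which tasks appear in a run.
object TaskColours {
  def assign(taskNames: Iterable[String], palette: IndexedSeq[String]): Map[String, String] =
    taskNames.toSeq.distinct.sorted.zipWithIndex.map { case (name, i) =>
      name -> palette(i % palette.size) // wrap around if there are more tasks than colours
    }.toMap
}
```

Note that this only guarantees stable colours across runs that contain the same set of task names; a run with an extra task shifts the positions of the names after it.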
Although 10 seconds is a long time, this still timed out when running several simulations on a busy processor. This should be very fast, but it isn't. Why?
Use log4j
The Coordinator used to shut down the actor system. Now no one does! It is still not clear where this should be happening.
The MetricsActor could do this for now, but it needs to be controlled by the user somehow. Otherwise we won't be able to run unit tests with a single actor system.
Perhaps we can use an ask pattern in the Main function, but this partially defeats the purpose of the MetricsActor. This is likely the best option to keep things nice and modular and separate.
Now that PiEvents can hold additional metadata (see #21), it would be nice to disentangle the MetricsAggregator and have it work with PiEvents as a PiEventListener.
Tasks can return metadata from the Coordinator instead of Unit.
SimulatedProcesses can automatically add this metadata to their result.
Coordinator can also pass along progress metadata (current time, simulations remaining, etc).
PromiseHandler currently only returns the result of the overall workflow.
The CounterHandler introduced in 0488604 is a good example of how promises can be used to return different things.
It would be nice to generalize the PromiseHandler so that it is simpler to implement handlers that need to return other values.
Then you could implement the CounterHandler like this:
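class CounterHandler[T](override val name: String) extends PromiseHandler[T, Int] {
  private var counter: Int = 0
  def count: Int = counter
  // Count every event this handler receives.
  override def apply(e: PiEvent[T]): Unit = counter += 1
  // Complete the promise with the count whether the workflow succeeds or fails.
  override def onSuccess(result: Any): Unit = promise.success(counter)
  override def onFailure(result: Any): Unit = promise.success(counter)
}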
The current version of the PromiseHandler that returns the result of the workflow can be named ResultHandler or something like that instead, as a simple instance of the PromiseHandler.
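A self-contained sketch of the generalized handler, under simplifying assumptions: the event types are hypothetical stand-ins for PiEvent, `handle` is an illustrative entry point, and `onFailure` takes a `Throwable` here. The base class owns a `Promise[R]`, and subclasses only decide what to complete it with:

```scala
import scala.concurrent.{Future, Promise}

// Hypothetical, simplified events standing in for PEW's PiEvent.
sealed trait PiEvent[T]
final case class ProcessCalled[T](id: T, process: String) extends PiEvent[T]
final case class WorkflowSucceeded[T](id: T, result: Any) extends PiEvent[T]
final case class WorkflowFailed[T](id: T, error: Throwable) extends PiEvent[T]

// Generic handler: subclasses decide what R the promise is completed with.
abstract class PromiseHandler[T, R](val name: String) {
  protected val promise: Promise[R] = Promise[R]()
  def future: Future[R] = promise.future

  // Called for every event; returns true once the workflow has finished.
  def handle(e: PiEvent[T]): Boolean = {
    apply(e)
    e match {
      case WorkflowSucceeded(_, result) => onSuccess(result); true
      case WorkflowFailed(_, error)     => onFailure(error); true
      case _                            => false
    }
  }

  def apply(e: PiEvent[T]): Unit = ()
  def onSuccess(result: Any): Unit
  def onFailure(error: Throwable): Unit
}

// The current behaviour: complete with the workflow result.
class ResultHandler[T](handlerName: String) extends PromiseHandler[T, Any](handlerName) {
  override def onSuccess(result: Any): Unit = promise.success(result)
  override def onFailure(error: Throwable): Unit = promise.failure(error)
}

// The counter example expressed against this sketch.
class CounterHandler[T](handlerName: String) extends PromiseHandler[T, Int](handlerName) {
  private var counter: Int = 0
  def count: Int = counter
  override def apply(e: PiEvent[T]): Unit = counter += 1
  override def onSuccess(result: Any): Unit = promise.success(counter)
  override def onFailure(error: Throwable): Unit = promise.success(counter)
}
```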
PromiseHandler
ResultHandler
CounterHandler
PromiseHandleFactory
ResultHandleFactory
CounterHandlerFactory
ProcessExecutor.scala and anywhere else
Most (all?) executors share some code related to updating the PiInstance, checking results and process calls, checking completion, publishing related events, etc. It would be nice to refactor this code into a common trait, perhaps ProcessExecutor itself. This should make development of new executors easier.
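A hypothetical sketch of the template-method shape such a trait could take. The `Instance` model and the `load`/`save`/`publish` hooks are illustrative simplifications, not PEW's PiInstance or executor API. Shared bookkeeping lives in the trait; each executor only supplies storage and event publishing:

```scala
// Hypothetical, heavily simplified model: PEW's real PiInstance and executors differ.
final case class Instance(id: Int, pending: List[String], results: Map[String, Any]) {
  def completed: Boolean = pending.isEmpty
}

trait ProcessExecutor {
  // Executor-specific hooks (in-memory map, MongoDB, Kafka, ...).
  protected def load(id: Int): Option[Instance]
  protected def save(i: Instance): Unit
  protected def publish(event: String): Unit

  // Shared logic: record a result, publish it, and check for completion.
  final def postResult(id: Int, call: String, result: Any): Unit =
    load(id) match {
      case None => publish(s"unknown instance $id")
      case Some(i) =>
        val updated = i.copy(pending = i.pending.filterNot(_ == call), results = i.results + (call -> result))
        save(updated)
        publish(s"result $call for $id")
        if (updated.completed) publish(s"completed $id")
    }
}

// A minimal executor that reuses all of the shared logic.
class InMemoryExecutor extends ProcessExecutor {
  val store = scala.collection.mutable.Map.empty[Int, Instance]
  val events = scala.collection.mutable.ListBuffer.empty[String]
  protected def load(id: Int): Option[Instance] = store.get(id)
  protected def save(i: Instance): Unit = store.update(i.id, i)
  protected def publish(event: String): Unit = events += event
}
```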
PiEventCallCodec casts to an AtomicProcess internally, causing crashes when passed MetadataAtomicProcesses.
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Result-0 at offset 0. If needed, please seek past the record to continue consumption. Caused by: java.lang.ClassCastException: workflow.monitor.instances.InitPressingInstance cannot be cast to com.workflowfm.pew.AtomicProcess at com.workflowfm.pew.mongodb.bson.events.PiEventCallCodec.decodeBody(PiEventCallCodec.scala:51) at com.workflowfm.pew.mongodb.bson.events.PiEventCallCodec.decodeBody(PiEventCallCodec.scala:9) at com.workflowfm.pew.mongodb.bson.auto.ClassCodec.decode(AutoCodecTypes.scala:34)
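One way to avoid the crash is to replace the unconditional cast with a type match that accepts every supported process type and fails with a descriptive error otherwise. The sketch below is hypothetical: the simplified traits only mimic the situation in the trace, where the decoded MetadataAtomicProcess is not an AtomicProcess, and `CallDecoding.asCallableProcess` is an illustrative name:

```scala
// Hypothetical stand-ins for PEW's process types.
trait PiProcess { def name: String }
trait AtomicProcess extends PiProcess
trait MetadataAtomicProcess extends PiProcess

object CallDecoding {
  // Instead of `obj.asInstanceOf[AtomicProcess]`, accept each supported type explicitly.
  def asCallableProcess(obj: Any): Either[String, PiProcess] = obj match {
    case null                     => Left("Missing process in PiEventCall")
    case p: AtomicProcess         => Right(p)
    case p: MetadataAtomicProcess => Right(p)
    case other                    => Left(s"Unsupported process type in PiEventCall: ${other.getClass.getName}")
  }
}
```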
There is an AckCall message used between the AkkaExecutor actors to acknowledge a call to an atomic process. It is sent in AkkaAtomicProcessExecutor::receive:
case AkkaExecutor.ACall(id, ref, p, args, actor) =>
  p.runMeta(args).onComplete {
    case Success(res) => actor ! AkkaExecutor.Result(id, ref, res)
    case Failure(ex) => actor ! AkkaExecutor.Error(id, ref, ex)
  }
  actor ! AkkaExecutor.AckCall // <-- the acknowledgement in question
but the receiving AkkaExecActor does nothing with it:
case AkkaExecutor.AckCall => Unit
This unused message could be removed from the system. If such an acknowledgement is needed, the current call message, which is sent with:
system.actorOf(AkkaExecutor.atomicprops()) ! AkkaExecutor.ACall(i.id, ref, p, objs, self)
could instead be sent with the ask pattern (? instead of !):
system.actorOf(AkkaExecutor.atomicprops()) ? AkkaExecutor.ACall(i.id, ref, p, objs, self)
and the resulting Future can be used for that purpose.
The event-based metrics code we introduced for generic workflows (see #15) looks great. Simulation metrics now look awful in comparison.
Need to clean up the code and follow the same pattern. Since all simulations are Akka based we can have message-based tracking of metrics through the Coordinator (i.e. we don't need another generic event bus).
Need to take care to update Aurora's simulation accordingly once this is done.
It would be useful to be able to track metrics for any executed workflow, not only simulated ones.
This can allow us to have a real time timeline of the workflows that are executing, as well as other visualisations and monitoring capabilities.
This is quite a significant refactoring that should not break the existing executors (Akka, Mongo, Kafka).
Currently in order to run a set of simulations you have to do the following:
It would be nice to collapse all this to a single message. You can construct a "manifest" of all the simulations and their timestamps and pass it to the actor(s) in one go. The Coordinator should then start immediately.
This should not break backwards compatibility. The current way of doing things allows us to dynamically add new simulations while the Coordinator is already started!
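An Akka-free, hypothetical model of the proposed message handling (`CoordinatorMsg`, `AddSim`, `Start` and `StartManifest` are illustrative names, not the Coordinator's real protocol). The manifest is expanded into the existing single-simulation messages followed by a start, so adding simulations one by one, even after starting, keeps working:

```scala
// Hypothetical messages modelling the Coordinator protocol.
sealed trait CoordinatorMsg
final case class AddSim(start: Long, simulation: String) extends CoordinatorMsg
case object Start extends CoordinatorMsg
// One message carrying the whole manifest; the coordinator starts right away.
final case class StartManifest(entries: Seq[(Long, String)]) extends CoordinatorMsg

class Coordinator {
  private val queue = scala.collection.mutable.ArrayBuffer.empty[(Long, String)]
  var started = false
  def scheduled: Seq[(Long, String)] = queue.sortBy(_._1).toList

  def receive(msg: CoordinatorMsg): Unit = msg match {
    case AddSim(t, s) => queue += ((t, s)) // still accepted after the coordinator has started
    case Start        => started = true
    case StartManifest(es) =>
      es.foreach { case (t, s) => receive(AddSim(t, s)) }
      receive(Start)
  }
}
```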
While writing the wiki page on Simulation I noticed the DefaultScheduler implementation (written quite some time ago) is pretty bad and potentially buggy.
Specifically, the filter in DefaultScheduler.getNextTask should:
use t.priority >= t2.priority to compare priorities instead of the comparison t >= t2
use t.id to check for identical tasks
Also move TaskResource.nextAvailableTimestamp and Task.nextPossibleStart into DefaultScheduler or maybe a Scheduler companion object so all the relevant code is in one place.
The simple term structure we are currently using is not very efficient, both in terms of memory and execution. Channels, substitution and fresh naming are particularly inefficient.
We should consider a better structure with some indexing and fast substitution.
KafkaExecutor suffers from an open issue in the current Alpakka-Kafka release. Revoking partitions causes offsets to be skipped.
See also: https://github.com/PetrosPapapa/ComposerFM/issues/15#issuecomment-537870958
I like this approach: https://medium.com/@kolemannix/type-safe-serialization-in-scala-a55bc513ea30
Need to make this work with uPickle and the Mongo/BSON stuff.
Goal: add and remove items from the timeline
timeline-plus exposes some methods to interact with items, but it's more focused on user interaction than programmatic changes
to make programmatic changes: find out what methods are invoked on user interaction, call them from loop, referencing ID
setItems(items) - need to check whether this keeps existing items or overrides all items currently displayed
updateEndTimes() - makes events that have not yet ended keep up with the rolling timeline
addItem(id, props) - add an item with the following properties
updateItem(id, props) - make changes to a timeline item based on a server event, changing the following properties
removeItem(id) - remove this item from the timeline
There is a counter in AkkaExecActor that is used to initialize the starting PiInstance's ID. It is only used there, starts at zero and is incremented at the end of init. I think that its original purpose (providing IDs for process instances in the store) was taken over by an internal counter of PiInstance that is used when PiInstance::execute prepares instantiated threads for external execution. This suggests that the counter in AkkaExecActor could be removed and its use in init replaced with a constant zero.
When trying to find its uses, I thought this counter might be critical for running init multiple times (as there seems to be nothing stopping me from sending multiple Init messages to the same actor). However, even with that counter, doing so after at least one further instance has been added to the store still breaks: it tries to place two instances with equal IDs into the store. That is because the first instance added to the store from execution of the starting one will get ID 1, which is equal to what init would assign the second time around.
Therefore I propose we remove this counter from AkkaExecActor, replace its use with zero, and possibly add some condition to prevent the actor from being initialized multiple times.
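A minimal, Akka-free sketch of the proposal, using a hypothetical `ExecActorState` class rather than the real AkkaExecActor. The starting instance always gets ID 0, and a second Init is rejected instead of clashing with IDs already in the store:

```scala
// Hypothetical model of the proposal: constant starting ID and a one-shot init guard.
final class ExecActorState {
  private val store = scala.collection.mutable.Map.empty[Int, String]
  private var initialized = false

  // Returns the ID of the starting instance, or an error if init was already called.
  def init(workflow: String): Either[String, Int] =
    if (initialized) Left("actor already initialised")
    else {
      initialized = true
      store.update(0, workflow)
      Right(0)
    }

  def instances: Int = store.size
}
```

In the actor itself, the second Init could instead be answered with an error message or logged and ignored.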