s8sg / faas-flow
Function Composition for OpenFaaS
License: MIT License
Execution state is currently forwarded as data to the next phase. This works well as long as the pipeline is no more complicated than a chain of functions. For more complicated pipelines, such as a dag, we need a secondary KV store to manage the execution state. An ExecutionState manager could be backed by ZooKeeper or Consul.
Async calls and a DAG with multiple edges provide similar functionality.
An async chain with four async calls will have five nodes; the same can be achieved by creating the nodes separately.
Currently operation chains and DAGs are managed separately, although a DAG already allows chaining multiple operations by adding them to the same node. We will adopt the chaining approach for DAGs as well:
flow.CreateNode().Apply().Apply()
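The chaining approach could look like the following minimal sketch. `Dag`, `Node`, `Operation` and `NewDag` are hypothetical types for illustration, not the faas-flow API, and `CreateNode` takes an ID here, unlike the one-liner above. Each `Apply()` appends an operation to the node and returns the same node, so calls chain and the operations run in the order they were added.

```go
package main

import "fmt"

// Operation is a hypothetical unit of work executed inside a node.
type Operation struct {
	Function string
}

// Node holds an ordered list of operations that run serially.
type Node struct {
	ID         string
	Operations []Operation
}

// Apply appends an operation and returns the node so calls can be chained.
func (n *Node) Apply(function string) *Node {
	n.Operations = append(n.Operations, Operation{Function: function})
	return n
}

// Dag is a hypothetical container of nodes keyed by ID.
type Dag struct {
	nodes map[string]*Node
}

// NewDag returns an empty DAG.
func NewDag() *Dag {
	return &Dag{nodes: make(map[string]*Node)}
}

// CreateNode returns the node for id, creating it on first use, so later
// Apply calls append to the existing node instead of replacing it.
func (d *Dag) CreateNode(id string) *Node {
	if n, ok := d.nodes[id]; ok {
		return n
	}
	n := &Node{ID: id}
	d.nodes[id] = n
	return n
}

func main() {
	dag := NewDag()
	dag.CreateNode("n1").Apply("func1").Apply("func2")
	for _, op := range dag.CreateNode("n1").Operations {
		fmt.Println(op.Function) // func1, then func2
	}
}
```

Returning the node from `Apply()` is what makes the fluent style work; reusing an existing node ID keeps appending rather than overwriting.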
Retry an operation if it fails or a timeout happens. If configured, a failed operation will be retried as long as the number of retries so far is less than the configured retry count.
I want to use this package in different projects that need a saga-like pattern to control microservices.
I think it is suitable for that kind of thing: I can register RPC services and create a workflow with a DAG.
But I also need a custom executor that, given the workflow, can perform the needed actions (in my case, execute different microservices and get their results).
When performing an operation, the user can specify a timeout. The timeout applies to every operation type. If the timeout is hit, the operation is killed and the execution fails as if the operation itself had failed. If operation retry is configured, the operation will be retried.
Branching logic that executes a sub-DAG when a certain condition is met.
One possible way could be edge Options:
AddEdge(from, to string, Choice(caseString, choiceFunc))
choiceFunc takes a []byte (the result of the node) and returns a choice string, which determines the branch that will be executed. A selection among multiple branches can be made by providing the same choice function to multiple edges:
func myChoiceFunc(data []byte) (string, error) {
	// do something
	if string(data) == "something" {
		return "choice1", nil
	}
	return "choice2", nil
}

// ....
// at edge definition
AddEdge("node1", "node2", Choice("choice1", myChoiceFunc))
AddEdge("node1", "node3", Choice("choice2", myChoiceFunc))
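How an executor might resolve such Choice edges can be sketched as follows. `Edge`, `ChoiceFunc` and `nextNodes` are hypothetical names, not faas-flow's implementation. For every edge leaving the finished node, the choice function is evaluated on the node's result and only edges whose case matches are followed. Because the function is evaluated once per edge here, it is assumed to be deterministic.

```go
package main

import "fmt"

// ChoiceFunc maps a node's result to a choice string.
type ChoiceFunc func(data []byte) (string, error)

// Edge is a hypothetical edge carrying a Choice option.
type Edge struct {
	From, To string
	Case     string
	Choose   ChoiceFunc
}

// nextNodes returns the targets of edges leaving `from` whose Case matches
// the choice computed from the node's result.
func nextNodes(edges []Edge, from string, result []byte) ([]string, error) {
	var targets []string
	for _, e := range edges {
		if e.From != from {
			continue
		}
		choice, err := e.Choose(result)
		if err != nil {
			return nil, err
		}
		if choice == e.Case {
			targets = append(targets, e.To)
		}
	}
	return targets, nil
}

func myChoiceFunc(data []byte) (string, error) {
	if string(data) == "something" {
		return "choice1", nil
	}
	return "choice2", nil
}

func main() {
	edges := []Edge{
		{From: "node1", To: "node2", Case: "choice1", Choose: myChoiceFunc},
		{From: "node1", To: "node3", Case: "choice2", Choose: myChoiceFunc},
	}
	targets, err := nextNodes(edges, "node1", []byte("something"))
	fmt.Println(targets, err) // [node2] <nil>
}
```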
DAG node completion coordination fails in some scenarios for foreach branches.
Currently the faas-flow template only handles node start and stop events. The SDK also emits operation start and end events, but these are not yet handled for monitoring.
New feature: just as we perform a Callback, we will add a feature to perform an HTTP request. In the case of a callback the response is discarded, whereas the HTTP request construct will allow the user to perform an HTTP/HTTPS request and retrieve the result.
Thanks for your many PRs. Can you remove vendor and add go.mod?
It is supported as of Go 1.11 and very useful. Also, the repo would then no longer need to carry its external dependencies.
All calls to functions, and for next-node execution, are submitted via the gateway. In the current code
We can reuse a connection instead of creating a new one by creating a global client:
import "net/http"

var client *http.Client

func init() {
	tr := &http.Transport{
		MaxIdleConnsPerHost: 1024,
	}
	client = &http.Client{Transport: tr}
}
When we are using a common client, we also need to make sure that we
Currently a request ID is generated, although the only way to know the request ID is:
We will set a response header:
X-Faas-Flow-ReqId: <request_id>
For async requests with a callback URL, we will set the header on the callback request:
X-Faas-Flow-ReqId: <request_id>
a modifier: NODE[function ...] -> (async to) -> NODE[modifier] -> NODE...
a forwarder: NODE[function -> forwarder] -> (async to) -> NODE...
My toy store needs to be closed at the end of the work. Do I need to put the close in the Cleanup method, or do we need something new for closing the store?
Currently intermediate data always gets forwarded to the flow function as an input. But for async forwarding, large data might not get forwarded due to the NATS message size limit. Alternatively, the data can be saved and then retrieved in the next phase from the StateManager. When a StateManager is specified, an optional flag will be provided to store the intermediate data via the StateManager:
Environment:
  intermediate-storage: true
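A minimal sketch of the forward-or-store decision, assuming the flag has been read into a boolean; `Store`, `memStore`, `Payload`, `forward` and `receive` are hypothetical names, and the in-memory store only stands in for a real StateManager. With the flag on, only a key travels with the request and the next phase fetches the data by that key.

```go
package main

import (
	"errors"
	"fmt"
)

// Store is a hypothetical minimal key/value store standing in for the
// StateManager.
type Store interface {
	Set(key string, value []byte) error
	Get(key string) ([]byte, error)
}

// memStore is an in-memory Store used only for illustration.
type memStore map[string][]byte

var _ Store = memStore{}

func (m memStore) Set(k string, v []byte) error {
	m[k] = v
	return nil
}

func (m memStore) Get(k string) ([]byte, error) {
	v, ok := m[k]
	if !ok {
		return nil, errors.New("not found: " + k)
	}
	return v, nil
}

// Payload is what gets forwarded to the next phase: either the data itself
// or the key under which the data was stored.
type Payload struct {
	Data []byte
	Key  string
}

// forward stores the data and forwards only its key when intermediate
// storage is enabled; otherwise it forwards the data inline.
func forward(store Store, intermediateStorage bool, key string, data []byte) (Payload, error) {
	if !intermediateStorage {
		return Payload{Data: data}, nil
	}
	if err := store.Set(key, data); err != nil {
		return Payload{}, err
	}
	return Payload{Key: key}, nil
}

// receive resolves a payload in the next phase.
func receive(store Store, p Payload) ([]byte, error) {
	if p.Key == "" {
		return p.Data, nil
	}
	return store.Get(p.Key)
}

func main() {
	store := memStore{}
	p, _ := forward(store, true, "req-1.node1", []byte("large result"))
	data, _ := receive(store, p)
	fmt.Println(p.Key, string(data)) // req-1.node1 large result
}
```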
The README doesn't contain any dynamic branch example. We can add simple examples for ForEach and Conditional branching.
As suggested in issue #81 by @kwojcicki, we will provide the ability to stop a workflow.
The main idea here is to make use of the Amazon States Language to define pipelines.
A dotted graph can provide better visualisation of the overall workflow.
Faas-Flow currently provides a library to create a dotted graph for either a chain or a dag.
The same can be used to provide a way to download only the dotted graph from the faasflow function itself.
One option could be adding a query param to the request:
--query generate-dot-graph=true
A dotted graph is a quick way to debug a pipeline.
Below are the updates:
Color Code (green Start-Nodes, orange End-Nodes, lighter blue Nodes, grey Edges, etc.)
Edge Attributes (Edge Properties, Intermediate-Result-Key, etc.)
Node Attributes (Function, Modifier, Callback details, timeout, etc.)
Currently func OnReponse(handler sdk.RespHandler) Option uses sdk.RespHandler, which is defined as:
type RespHandler func(*http.Response) ([]byte, error)
It is useful for handling a function's HTTP response manually and extracting the response in the proper format.
https://github.com/s8sg/faas-flow/blob/master/template/faas-flow/handler.go#L326
if operation.OnResphandler != nil {
	result, err = operation.OnResphandler(resp)
} else {
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		result, _ = ioutil.ReadAll(resp.Body)
		return result, err
	}
	result, err = ioutil.ReadAll(resp.Body)
	if err != nil {
		return result, err
	}
}
Similarly, we need an sdk.RequestHandler, which will be able to update the request used for function execution:
type ReqHandler func(*http.Request)
Wrapped in an Option as:
func Request(handler sdk.RequestHandler) Option
The RequestHandler should execute before the request is sent; we can pass it the request as it is being built:
https://github.com/s8sg/faas-flow/blob/master/template/faas-flow/handler.go#L318
httpreq := buildFunctionRequest(name, data, params, headers)
Currently workflow_name must be equal to the function name. Maybe we can get it from:
workflow_name1 := strings.Split(os.Getenv("Http_Host"), ".")[0]
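A slightly more defensive version of that one-liner, as a sketch. `workflowName` is a hypothetical helper. It assumes `Http_Host` carries the host name the function was called with, for example `myflow.openfaas-fn:8080`; when the call arrives through the gateway, the host may be the gateway's instead, which is why the sketch keeps a fallback. It also strips a port, since `"myflow:8080"` would otherwise yield `myflow:8080`.

```go
package main

import (
	"fmt"
	"net"
	"os"
	"strings"
)

// workflowName derives the flow name from an Http_Host value, e.g.
// "myflow.openfaas-fn:8080" -> "myflow", returning fallback when empty.
func workflowName(host, fallback string) string {
	if h, _, err := net.SplitHostPort(host); err == nil {
		host = h // drop the port if there is one
	}
	name := strings.Split(host, ".")[0]
	if name == "" {
		return fallback
	}
	return name
}

func main() {
	fmt.Println(workflowName(os.Getenv("Http_Host"), "default-flow"))
}
```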
This issue is related to faasflow/faas-flow-tower#9.
We need to add the dynamic option to node traces so that every foreach branch gets a unique trace.
Faas-flow is based on some core design fundamentals that allow faas-flow to work as an OpenFaaS function without violating the core principles of the serverless model.
The key points of the design are:
As faas-flow's feature set grows, it's important to document the core design principles.
The current state store interface is not flexible, which makes it hard to fit new use cases.
We will make the interface more granular so that the same interface can support more operations.
The new state store definition would be:
// StateStore for saving execution state
type StateStore interface {
	// Configure the StateStore with flow name and request ID
	Configure(flowName string, requestId string)
	// Initialize the StateStore (called only once in a request span)
	Init() error
	// Set a value (override existing, or create one)
	Set(key string, value string) error
	// Get a value
	Get(key string) (string, error)
	// Compare and Update a value
	Update(key string, oldValue string, newValue string) error
	// Cleanup all the resources in StateStore (called only once in a request span)
	Cleanup() error
}
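To make the contract concrete, here is a minimal in-memory sketch of the interface. `memoryStateStore` is hypothetical; it lives in a single process, so it would not work across function replicas and is only meant for illustration or tests. The key points are that `Configure` scopes keys to the flow and request, and that `Update` behaves as a compare-and-swap.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// StateStore for saving execution state (as proposed above).
type StateStore interface {
	Configure(flowName string, requestId string)
	Init() error
	Set(key string, value string) error
	Get(key string) (string, error)
	Update(key string, oldValue string, newValue string) error
	Cleanup() error
}

// memoryStateStore is a hypothetical single-process StateStore.
type memoryStateStore struct {
	mu     sync.Mutex
	prefix string
	data   map[string]string
}

// compile-time check that the interface is satisfied
var _ StateStore = (*memoryStateStore)(nil)

// Configure scopes every key to the flow name and request ID.
func (s *memoryStateStore) Configure(flowName string, requestId string) {
	s.prefix = flowName + "/" + requestId + "/"
}

func (s *memoryStateStore) Init() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.data == nil {
		s.data = make(map[string]string)
	}
	return nil
}

func (s *memoryStateStore) Set(key string, value string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[s.prefix+key] = value
	return nil
}

func (s *memoryStateStore) Get(key string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.data[s.prefix+key]
	if !ok {
		return "", fmt.Errorf("key %q not found", key)
	}
	return v, nil
}

// Update is a compare-and-swap: newValue is written only if the current
// value still equals oldValue.
func (s *memoryStateStore) Update(key string, oldValue string, newValue string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cur, ok := s.data[s.prefix+key]; !ok || cur != oldValue {
		return fmt.Errorf("update %q: current value does not match %q", key, oldValue)
	}
	s.data[s.prefix+key] = newValue
	return nil
}

// Cleanup removes every key belonging to this flow and request.
func (s *memoryStateStore) Cleanup() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for k := range s.data {
		if strings.HasPrefix(k, s.prefix) {
			delete(s.data, k)
		}
	}
	return nil
}

func main() {
	s := &memoryStateStore{}
	s.Configure("myflow", "req-1")
	_ = s.Init()
	_ = s.Set("counter", "0")
	fmt.Println(s.Update("counter", "0", "1"))        // <nil>
	fmt.Println(s.Update("counter", "0", "2") != nil) // true
}
```

A Consul or ZooKeeper backed store would implement `Update` with the backend's own check-and-set primitive instead of a mutex.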
Currently faas-flow calls the function directly for a function operation:
http://<function-name>:8080
but the call should be made via the gateway:
http://<gateway-url>/function/<function-name>
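A minimal sketch of building the gateway route. `functionURL` is a hypothetical helper, and both the `gateway` environment variable name and the `gateway.openfaas:8080` default are assumptions for illustration. The helper adds a scheme if one is missing and avoids a double slash.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// functionURL builds http://<gateway-url>/function/<function-name> instead
// of calling http://<function-name>:8080 directly.
func functionURL(gateway, function string) string {
	gateway = strings.TrimSuffix(gateway, "/")
	if !strings.HasPrefix(gateway, "http://") && !strings.HasPrefix(gateway, "https://") {
		gateway = "http://" + gateway
	}
	return gateway + "/function/" + function
}

func main() {
	gw := os.Getenv("gateway") // hypothetical env var name
	if gw == "" {
		gw = "gateway.openfaas:8080" // hypothetical default
	}
	fmt.Println(functionURL(gw, "func1"))
}
```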
Currently the DataStore implementation is called whenever Set(), Get() or Del() is called on the context. However, state could be saved and managed locally until the request is forwarded to the next phase. This would reduce the number of calls to the DataStore when there is only one phase.
Faaschain is stateless in nature. Currently the only state in a chain is executionposition and requestId. For partial requests the state values are forwarded as data.
However, there may be use cases where the user needs more data to be stored in the state. To achieve that, we can provide the user a StateManager interface:
type StateManager interface {
	Set(key string, value interface{}) error
	Get(key string) (interface{}, error)
	Del(key string) error
}
The user should be able to request the global state manager from the request context:
context := faaschain.GetContext()
context.Get(key)
context.Set(key, value)
By default the StateManager will be set to RequestEmbedStateManager, which is provided by faaschain. It will forward the state as data to the next call, encoded as JSON.
The user can implement a state manager and set it with SetStateManager().
An example of setting and reading state may look like:
chain.SetStateManager(monio_state_manager)
chain.ApplyAsync("function").ApplyModifier(func(data []byte) ([]byte, error) { return data, faaschain.GetContext().Set("some", "data") })
chain.ApplyAsync("function").ApplyModifier(func(data []byte) ([]byte, error) {
	_, err := faaschain.GetContext().Get("some") // read the stored state
	return data, err
})
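A minimal sketch of how a request-embedded state manager could work. `embedStateManager`, `request`, `encode` and `decode` are hypothetical names, not faaschain's actual RequestEmbedStateManager. State is kept in memory during a phase, then JSON-encoded together with the data forwarded to the next call. One consequence of JSON worth knowing: numbers come back as `float64` after decoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StateManager as proposed above.
type StateManager interface {
	Set(key string, value interface{}) error
	Get(key string) (interface{}, error)
	Del(key string) error
}

// embedStateManager keeps state in memory during a phase.
type embedStateManager struct {
	state map[string]interface{}
}

var _ StateManager = (*embedStateManager)(nil)

func newEmbedStateManager() *embedStateManager {
	return &embedStateManager{state: map[string]interface{}{}}
}

func (m *embedStateManager) Set(key string, value interface{}) error {
	m.state[key] = value
	return nil
}

func (m *embedStateManager) Get(key string) (interface{}, error) {
	v, ok := m.state[key]
	if !ok {
		return nil, fmt.Errorf("key %q not found", key)
	}
	return v, nil
}

func (m *embedStateManager) Del(key string) error {
	delete(m.state, key)
	return nil
}

// request is the hypothetical envelope forwarded to the next call.
type request struct {
	State map[string]interface{} `json:"state"`
	Data  []byte                 `json:"data"`
}

// encode packs the state together with the phase's data.
func (m *embedStateManager) encode(data []byte) ([]byte, error) {
	return json.Marshal(request{State: m.state, Data: data})
}

// decode restores the state in the next call and returns the forwarded data.
func decode(raw []byte) (*embedStateManager, []byte, error) {
	var r request
	if err := json.Unmarshal(raw, &r); err != nil {
		return nil, nil, err
	}
	if r.State == nil {
		r.State = map[string]interface{}{}
	}
	return &embedStateManager{state: r.State}, r.Data, nil
}

func main() {
	m := newEmbedStateManager()
	_ = m.Set("some", "data")
	raw, _ := m.encode([]byte("payload"))

	next, data, _ := decode(raw)
	v, _ := next.Get("some")
	fmt.Println(v, string(data)) // data payload
}
```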
The exporter includes both the subdag and the foreach dag for a foreach branch node. Although their behaviour differs, we store the foreach DAG as a subdag to save space. When exporting the graph definition, the subdag must not be considered if the node is a foreach node.
Faaschain now forwards partial requests to faaschain itself. But this could easily be spoofed if someone else sends a partial pipeline request to faaschain. One way to handle this is to use HMAC. We can sign the partial faaschain request with the secret key and forward it along with the digest. Once the data is received back, we can use the secret key and the digest to verify that the request was generated by faaschain.
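A minimal sketch of the signing and verification step with Go's standard `crypto/hmac`. Choosing SHA-256 and hex encoding are assumptions; how the digest travels with the request (for example in a header) is left open here. Verification recomputes the digest and compares with `hmac.Equal`, which runs in constant time to avoid timing leaks.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sign returns the hex-encoded HMAC-SHA256 digest of a partial request.
func sign(secret, payload []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(payload)
	return hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the digest and compares it in constant time.
func verify(secret, payload []byte, digest string) bool {
	expected, err := hex.DecodeString(digest)
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write(payload)
	return hmac.Equal(mac.Sum(nil), expected)
}

func main() {
	secret := []byte("secret-key")
	partial := []byte(`{"execution-position":2}`)
	digest := sign(secret, partial)
	fmt.Println(verify(secret, partial, digest))            // true
	fmt.Println(verify(secret, []byte("spoofed"), digest)) // false
}
```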
@s8sg I need your help with dag and AddForEachBranch.
Here is my code; it is modified from syncflow:
package function

import (
	"bytes"
	"fmt"
	faasflow "github.com/s8sg/faas-flow"
	consulStateStore "github.com/s8sg/faas-flow-consul-statestore"
	"net/http"
	"os"
	"strings"
)

// Define provides the definition of the workflow
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
	dag := faasflow.CreateDag()
	foreachDag := dag.AddForEachBranch("foreach", func(data []byte) map[string][]byte {
		http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("input:"+string(data)))
		values := bytes.Split(data, []byte("-"))
		rmap := make(map[string][]byte)
		for i := 0; i < len(values); i++ {
			rmap[fmt.Sprintf("%d", i)] = values[i]
		}
		fmt.Println("foreach input:", string(data), rmap)
		return rmap
	}, faasflow.Aggregator(func(results map[string][]byte) ([]byte, error) {
		buf := bytes.NewBuffer([]byte("["))
		var start bool
		for _, v := range results {
			// write a comma before every element except the first
			if !start {
				start = true
			} else {
				buf.WriteByte(',')
			}
			buf.Write(v)
		}
		buf.WriteByte(']')
		fmt.Println("foreach-arggress: ", buf.String())
		http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("foreach:"+buf.String()))
		return buf.Bytes(), nil
	}))
	foreachDag.AddFunction("call-f1", "func1")
	//foreachDag.AddEdge("foreach", "callf1", faasflow.Sync)
	dag.AddFunction("call-f2", "func2")
	dag.AddEdge("foreach", "call-f2", faasflow.Sync)
	dag.AddModifier("dump", func(data []byte) ([]byte, error) {
		fmt.Println("dump: ", string(data))
		http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("dupm:"+string(data)))
		return data, nil
	})
	dag.AddEdge("call-f2", "dump", faasflow.Sync)
	flow.ExecuteDag(dag)
	return
}

// DefineStateStore provides the override of the default StateStore
func DefineStateStore() (faasflow.StateStore, error) {
	consulss, err := consulStateStore.GetConsulStateStore(os.Getenv("consul_url"), os.Getenv("consul_dc"))
	if err != nil {
		return nil, err
	}
	return consulss, nil
}

// DefineDataStore provides the override of the default DataStore
func DefineDataStore() (faasflow.DataStore, error) {
	return nil, nil
}
Python func1:
def handle(req):
    return "func1("+req+")"
Python func2:
def handle(req):
    return "func2("+req+")"
echo "aa-bb-cc" | faas-cli invoke synflow
Received dump data from http://requestbin.fullcontact.com:
dump: func2('somedata')
I can only receive this, no other data:
input:aa-bb-cc-dd
https://github.com/s8sg/faas-flow/raw/master/doc/overview.jpg
Async Chian should be Async Chain :)
The SYNC Chain example calls two functions but returns only the result of the second function.
How can I pipe the result of the first function into the second function?
Each sequential chain is itself a DAG. We don't need a different structure and handling for chains, as we can use the same one as for a DAG, where each node receives data from and forwards data to only one node.
The API should stay the same as it is now.
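A minimal sketch of expressing a chain as a DAG, with a hypothetical `Dag` adjacency list and `chainToDag` helper. Node IDs are prefixed with their position because the same function may appear twice in a chain, which would otherwise collide as a node ID.

```go
package main

import "fmt"

// Dag is a hypothetical adjacency list keyed by node ID.
type Dag struct {
	Nodes []string
	Edges map[string][]string
}

// chainToDag expresses a sequential chain as a DAG in which every node has
// at most one parent and at most one child.
func chainToDag(functions ...string) Dag {
	d := Dag{Edges: map[string][]string{}}
	for i, f := range functions {
		id := fmt.Sprintf("%d-%s", i, f)
		d.Nodes = append(d.Nodes, id)
		if i > 0 {
			prev := d.Nodes[i-1]
			d.Edges[prev] = append(d.Edges[prev], id)
		}
	}
	return d
}

func main() {
	d := chainToDag("func1", "func2", "func3")
	fmt.Println(d.Nodes)
	fmt.Println(d.Edges)
}
```

Since the chain is just a special case, the existing chain API could build this structure internally and hand it to the DAG executor.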
ForEach is a way to allow an edge to have a 1:N relationship, where N is determined dynamically at runtime.
It can be added with an Option as:
AddEdge(from, to string, ForEach(func([]byte) (branches [][]byte, err error) {
	// implement
}))
Based on the length of the returned array of []byte (the number of branches), N will be determined at runtime.
If the length is 0 or the [][]byte is nil, the execution will be terminated.
faas-flow/template/faas-flow/handler.go
Line 509 in 86030be
Does this line have a bug?
client := &http.Client{}
res, resErr := client.Do(httpreq)
resdata, _ := ioutil.ReadAll(res.Body)
if resErr != nil {
return resdata, resErr
}
If resErr is not nil, res may be nil, so ioutil.ReadAll(res.Body) will cause a panic.
Serializer is a faasflow Operation Option that can be added to serialise the inputs of a vertex when it has more than one in-degree. A Serializer assigned to a vertex will be called before the actual call to the operations in the vertex:
dag.CreateFunctionVertex("f1", "do-something", Serializer(func(inputs map[string][]byte) ([]byte, error) {
	// inputs is a map of <id(string)>: <data([]byte)>
	// ... perform serialisation
	return serialised_data, nil
}))
Any type of Operation in a DAG/chain can be assigned a Serializer.
For a chain, or a DAG vertex with only one in-degree, Serializer() will be called with only one input.
Unlike Airflow, the Serializer need not have any prior knowledge of the incoming data, which makes it more dynamic and flexible.
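One detail any Go Serializer has to handle is that map iteration order is random, so serialising `inputs` directly would produce a different byte order on every run. A minimal sketch of a hypothetical `joinSerializer` that sorts the input IDs first and joins the data with commas (the separator is an arbitrary choice); with a single input it returns that input unchanged.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// Serializer combines the inputs of a vertex with more than one in-degree.
type Serializer func(inputs map[string][]byte) ([]byte, error)

// joinSerializer concatenates inputs in sorted ID order, since Go map
// iteration order is random.
func joinSerializer(inputs map[string][]byte) ([]byte, error) {
	keys := make([]string, 0, len(inputs))
	for k := range inputs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	parts := make([][]byte, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, inputs[k])
	}
	return bytes.Join(parts, []byte(",")), nil
}

func main() {
	var s Serializer = joinSerializer
	out, _ := s(map[string][]byte{"f2": []byte("b"), "f1": []byte("a")})
	fmt.Println(string(out)) // a,b
}
```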
Currently, if an existing DAG vertex id is used to add an operation, the vertex gets replaced by the new operation, although the operation should instead be appended to the vertex's operation set and executed serially, in the order the operations were added.
Currently, when executing a DAG, the result of a node gets forwarded to all of its child nodes.
However, when a node has more than one child, each of them might need the result in a different format, or no result at all. AddEdge(from, to string) can be altered to accept Options:
AddEdge(from, to string, Option...)
One of the Options can be
Forwarder(func(result []byte) []byte {
	// alter the result for the "to" node
	return result
})
to alter the data forwarded to the to node, or the NullForwarder
NullForwarder = Forwarder(func(result []byte) []byte {
	return nil
})
to avoid forwarding data at all.
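A minimal sketch of per-edge forwarding, with hypothetical `ForwarderFunc`, `edge` and `dispatch` names. Each child receives the parent's result passed through its edge's forwarder; an edge without a forwarder gets the result unchanged, and `NullForwarder` delivers nothing.

```go
package main

import "fmt"

// ForwarderFunc alters the data forwarded along one edge.
type ForwarderFunc func(result []byte) []byte

// NullForwarder forwards no data at all.
var NullForwarder ForwarderFunc = func([]byte) []byte { return nil }

// edge is a hypothetical edge with an optional forwarder.
type edge struct {
	to      string
	forward ForwarderFunc // nil means forward the result unchanged
}

// dispatch computes the input each child receives from a parent's result.
func dispatch(result []byte, edges []edge) map[string][]byte {
	inputs := make(map[string][]byte, len(edges))
	for _, e := range edges {
		if e.forward == nil {
			inputs[e.to] = result
			continue
		}
		inputs[e.to] = e.forward(result)
	}
	return inputs
}

func main() {
	wrap := func(r []byte) []byte { return []byte(fmt.Sprintf("wrapped(%s)", r)) }
	inputs := dispatch([]byte("res"), []edge{
		{to: "a"},
		{to: "b", forward: wrap},
		{to: "c", forward: NullForwarder},
	})
	fmt.Printf("%s %s %v\n", inputs["a"], inputs["b"], inputs["c"] == nil) // res wrapped(res) true
}
```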
TLDR
Wondering what place faas-flow/faas-tower has in batch management, i.e. whether batch management should be built on top of faas-flow or be completely separate. What I call batch management (a job here refers to a function call that is part of a batch, not a k8s job): viewing batch/job states, pausing/canceling jobs/batches, notification if a batch/job fails, exponential backoff of failed functions, and caching of function results.
Longer version
I've recently been doing a PoC at work with OpenFaaS. Aside from using it as just a FaaS, many are interested in shifting their batch processing to OpenFaaS. I don't think that's necessarily wrong, and OpenFaaS itself may add some batch management functionality (openfaas/faas#657). In the meantime, I was looking to build a small management layer that could do the tasks stated above.
I was looking to get some input from you (@s8sg) on whether you think it's best to build on top of faas-tower to get this functionality (using the distributed tracing you have already built), or to add some extra functionality to faas-flow that could facilitate batch management.
Create a separate flow UI as a function to debug deployed flows.
Debug information could be:
1> Execution Count
2> Dotted Graph
3> Node information (Modifier/Function/Callback)
Currently we provide Apply() and ApplyAsync() calls. We can change that to Apply() with an optional Sync Option:
// chain.ApplyAsync("function")
chain.Apply("function")
and
// chain.Apply("function")
chain.Apply("function", Sync)
This will make all calls Async by default.
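The proposal maps naturally onto Go's functional-options pattern. A minimal sketch, where `Chain`, `call`, `applyConfig` and this `Sync` value are hypothetical illustrations rather than the faaschain API: `Apply()` is async unless `Sync` is passed, and the variadic options keep the existing one-argument call site unchanged.

```go
package main

import "fmt"

// applyConfig holds per-call settings; calls are async unless Sync is given.
type applyConfig struct {
	sync bool
}

// Option is a hypothetical functional option for Apply.
type Option func(*applyConfig)

// Sync makes a single Apply call synchronous.
var Sync Option = func(c *applyConfig) { c.sync = true }

type call struct {
	function string
	sync     bool
}

// Chain is a hypothetical chain recording its calls.
type Chain struct {
	calls []call
}

// Apply adds a function call, async by default.
func (c *Chain) Apply(function string, opts ...Option) *Chain {
	cfg := applyConfig{}
	for _, o := range opts {
		o(&cfg)
	}
	c.calls = append(c.calls, call{function: function, sync: cfg.sync})
	return c
}

func main() {
	chain := &Chain{}
	chain.Apply("function").Apply("function", Sync)
	for _, c := range chain.calls {
		fmt.Println(c.function, "sync:", c.sync) // false, then true
	}
}
```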
faas-flow does not implement a health check.
Currently Options are used for
This makes it confusing which option applies to which call.
We will provide separate options for these.
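One way to provide separate options in Go is to give each call its own option type, so passing an option to the wrong call becomes a compile-time error rather than a silent no-op. A minimal sketch; `NodeOption`, `EdgeOption`, `WithTimeout`, `WithMode`, `AddNode` and `AddEdge` here are all hypothetical names and do not reflect which calls the current Options actually serve.

```go
package main

import "fmt"

// NodeOption and EdgeOption are distinct types, so an edge option cannot
// be passed to AddNode (and vice versa) without a compile error.
type NodeOption func(*nodeConfig)
type EdgeOption func(*edgeConfig)

type nodeConfig struct{ timeoutSeconds int }
type edgeConfig struct{ mode string }

// WithTimeout is a hypothetical node-only option.
func WithTimeout(seconds int) NodeOption {
	return func(c *nodeConfig) { c.timeoutSeconds = seconds }
}

// WithMode is a hypothetical edge-only option.
func WithMode(mode string) EdgeOption {
	return func(c *edgeConfig) { c.mode = mode }
}

// AddNode applies only node options.
func AddNode(id string, opts ...NodeOption) nodeConfig {
	cfg := nodeConfig{}
	for _, o := range opts {
		o(&cfg)
	}
	return cfg
}

// AddEdge applies only edge options; the default mode is "sync".
func AddEdge(from, to string, opts ...EdgeOption) edgeConfig {
	cfg := edgeConfig{mode: "sync"}
	for _, o := range opts {
		o(&cfg)
	}
	return cfg
}

func main() {
	fmt.Println(AddNode("n1", WithTimeout(30)).timeoutSeconds) // 30
	fmt.Println(AddEdge("n1", "n2", WithMode("async")).mode)  // async
	// AddNode("n1", WithMode("async")) would not compile.
}
```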