Comments (8)
Hi @chennqqi, thank you for creating the issue. The examples for the project are still sparse.
I have created an example at:
https://github.com/s8sg/branching-in-faas-flow.
This works fine. There is also a conditional DAG example available at:
https://github.com/s8sg/travel-agent-saga
Note that there is a bug in the new faas-flow-dashboard that distorts the dot graph for the foreach condition.
from faas-flow.
The foreach example above should create the below flow:
from faas-flow.
Thank you for your enthusiasm and timely response! But I am still confused about how to create a 1:N flow.
My requirement is like this:
// handler: functionA
function Handler(req []byte) {
	// do some processing on req, then return the data
	result := process(req)
	return result
}
// handler: functionB
function B(req []byte) {
	// do some processing on req, then return the data
	result := process2(req)
	return result
}
// flow: faasflow
function Define() {
	foreach := func(data []byte) [][]byte {
		// split functionA's output into pieces, each one becoming an input to functionB,
		// then run functionB in parallel on these inputs
		return bytes.Split(data, []byte("-"))
	}
	aggregator := func(inputs map[string][]byte /* or [][]byte */) {
		// wait for all parallel functionB calls to return, then aggregate their outputs
	}
	flow.Apply("functionA").Foreach("functionB", foreach, aggregator).Modifier(func(data []byte) {
		// callback or store the result
	})
}
I hope this pseudocode expresses my intent accurately. All of the English above was translated by Google.
from faas-flow.
As per my understanding, the desired pattern is:
Call A()
For each result of A:
	Call B()
Aggregate()
Store()
A flow.Apply().Apply() pattern is considered a chain expression.
Currently in faas-flow, a chain expression is only suitable for a DAG that doesn't have a branch.
For branches you need to use a DAG expression, which basically means creating nodes and defining edges between them.
The DAG definition will look like this:
maindag:
  call-a -----------> foreach -----------> store
  (modifier-node)     (foreachDag-node)    (modifier-node)
foreachDag:
  call-b
  (modifier-node)
First, we need to define a DAG for the flow:
// Create a new DAG
maindag := faasflow.CreateDag()
// set the DAG in the flow
flow.ExecuteDag(maindag)
From your example it seems you are not calling an OpenFaaS function but your own local function. For local function execution we have the Modifier expression: you can call the local function from your modifier. To call function A(), you can create a node with a modifier as:
// this creates a node named `call-a` with a modifier
// the modifier calls your local function `A()` and returns the result
maindag.AddModifier("call-a", func(data []byte) ([]byte, error) {
	data = A(data)
	return data, nil
})
To call function B() using a foreach pattern, you first need to add a foreach node and define the branched DAG. AddForEachBranch() creates the node. Its arguments are a foreach function that generates the splits as map[string][]byte and the Aggregator() that aggregates the results. The call to AddForEachBranch returns the foreachDag that will be executed for every result returned from the foreach function. We need to add the modifier that calls B() to the foreachDag:
// define the foreach branching node
foreachDag := maindag.AddForEachBranch("foreach",
	// function that splits the result of A()
	func(data []byte) map[string][]byte {
		// bytes.Split takes the separator as a []byte
		splits := bytes.Split(data, []byte("-"))
		result := make(map[string][]byte)
		// key each split by its index
		for index, split := range splits {
			result[strconv.Itoa(index)] = split
		}
		return result
	},
	// function that aggregates the results of the branched DAG
	faasflow.Aggregator(func(results map[string][]byte) ([]byte, error) {
		// aggregate the results of all B() executions
		return []byte(""), nil
	}),
)
// define a node to call B from foreachDag
foreachDag.AddModifier("call-b", func(data []byte) ([]byte, error) {
	data = B(data)
	return data, nil
})
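The aggregator above is left empty. As a minimal sketch of what it could do, the standalone program below pairs the same index-keyed split with an aggregator that joins the pieces back in their original order. The names splitByDash and joinInKeyOrder and the "," separator are hypothetical, chosen only for illustration; nothing here calls faas-flow itself. The keys are sorted explicitly because Go does not guarantee map iteration order.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
	"strconv"
)

// splitByDash cuts data on '-' and keys each piece by its position,
// matching the map[string][]byte shape the foreach function returns.
func splitByDash(data []byte) map[string][]byte {
	result := make(map[string][]byte)
	for index, part := range bytes.Split(data, []byte("-")) {
		result[strconv.Itoa(index)] = part
	}
	return result
}

// joinInKeyOrder is a hypothetical aggregator: it sorts the numeric keys
// before joining, because Go does not guarantee map iteration order.
func joinInKeyOrder(results map[string][]byte) ([]byte, error) {
	indexes := make([]int, 0, len(results))
	for key := range results {
		index, err := strconv.Atoi(key)
		if err != nil {
			return nil, fmt.Errorf("unexpected key %q: %w", key, err)
		}
		indexes = append(indexes, index)
	}
	sort.Ints(indexes)

	parts := make([][]byte, 0, len(indexes))
	for _, index := range indexes {
		parts = append(parts, results[strconv.Itoa(index)])
	}
	return bytes.Join(parts, []byte(",")), nil
}

func main() {
	out, err := joinInKeyOrder(splitByDash([]byte("x-y-z")))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // prints "x,y,z"
}
```

If the order of the pieces does not matter for your use case, the sort can be dropped and the map ranged over directly.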
Now you can define another modifier to store the result, but remember to define it in the maindag:
maindag.AddModifier("store", func(data []byte) ([]byte, error) {
	Store(data)
	return nil, nil
})
Then define the edges between the nodes you created:
maindag.AddEdge("call-a", "foreach")
maindag.AddEdge("foreach", "store")
We don't need any edge for foreachDag, as it has only one node.
So the complete handler will look like this:
// needs the standard-library packages bytes and strconv, plus the faasflow package
// handler: functionA
func A(req []byte) []byte {
	// do some processing on req, then return the data
	result := process(req)
	return result
}
// handler: functionB
func B(req []byte) []byte {
	// do some processing on req, then return the data
	result := process2(req)
	return result
}
// handler: Store
func Store(data []byte) {
	// do something
}
// Define provides the definition of the workflow
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
	maindag := faasflow.CreateDag()
	flow.ExecuteDag(maindag)
	maindag.AddModifier("call-a", func(data []byte) ([]byte, error) {
		data = A(data)
		return data, nil
	})
	foreachDag := maindag.AddForEachBranch("foreach",
		// function that splits the result of A()
		func(data []byte) map[string][]byte {
			// bytes.Split takes the separator as a []byte
			splits := bytes.Split(data, []byte("-"))
			result := make(map[string][]byte)
			// key each split by its index
			for index, split := range splits {
				result[strconv.Itoa(index)] = split
			}
			return result
		},
		// function that aggregates the results of the branched DAG
		faasflow.Aggregator(func(results map[string][]byte) ([]byte, error) {
			// aggregate the results of all B() executions
			return []byte(""), nil
		}),
	)
	foreachDag.AddModifier("call-b", func(data []byte) ([]byte, error) {
		data = B(data)
		return data, nil
	})
	maindag.AddModifier("store", func(data []byte) ([]byte, error) {
		Store(data)
		return nil, nil
	})
	maindag.AddEdge("call-a", "foreach")
	maindag.AddEdge("foreach", "store")
	// the function declares an error result, so it must return explicitly
	return nil
}
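To try out the split, branch, and collect steps locally before deploying, something like the following standalone sketch may help. It is only a local stand-in written with goroutines, not a description of how faas-flow schedules branches; runForEach and processB are hypothetical names, and processB simply upper-cases its input in place of the real B().

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
	"sync"
)

// processB stands in for B(); here it just upper-cases its input.
func processB(data []byte) []byte {
	return bytes.ToUpper(data)
}

// runForEach mimics the foreach node locally: it runs branch once per split,
// waits for every branch to finish, and returns the results under the same keys.
func runForEach(splits map[string][]byte, branch func([]byte) []byte) map[string][]byte {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = make(map[string][]byte, len(splits))
	)
	for key, data := range splits {
		wg.Add(1)
		go func(key string, data []byte) {
			defer wg.Done()
			out := branch(data)
			// the map is shared between goroutines, so guard the write
			mu.Lock()
			results[key] = out
			mu.Unlock()
		}(key, data)
	}
	wg.Wait()
	return results
}

func main() {
	// split the same way the foreach function does: one entry per '-' separated piece
	splits := make(map[string][]byte)
	for index, part := range bytes.Split([]byte("a-b-c"), []byte("-")) {
		splits[strconv.Itoa(index)] = part
	}
	results := runForEach(splits, processB)
	// print in index order, since map iteration order is not guaranteed
	for index := 0; index < len(results); index++ {
		fmt.Printf("%d: %s\n", index, results[strconv.Itoa(index)])
	}
}
```

The map returned by runForEach has the same shape as the argument the aggregator receives, so an aggregator function can be exercised against it directly.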
As you are passing data between nodes, you also need to consider defining a data store. You may use Minio for that; see https://github.com/s8sg/faas-flow-minio-datastore
from faas-flow.
A Modifier is a function (method) implemented in the faas-flow language.
A handler is a function (faas) implemented by another faas function.
So can I still add a function (faas) in the Define function, like this?
maindag.AddFunction("functionA", "run-a")
maindag.AddEdge("run-a", "call-a")
or call it directly:
maindag.AddEdge("run-a", "foreach")
from faas-flow.
I'm not sure what you meant by "other faas flow". If functionA and functionB are OpenFaaS functions, it will be:
// for calling functionA
maindag.AddFunction("call-a", "functionA")
// ...
// for calling functionB
foreachDag.AddFunction("call-b", "functionB")
Everything else stays the same, so the Define() becomes:
// handler: Store
func Store(data []byte) {
	// do something
}
// Define provides the definition of the workflow
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
	maindag := faasflow.CreateDag()
	flow.ExecuteDag(maindag)
	maindag.AddFunction("call-a", "functionA")
	foreachDag := maindag.AddForEachBranch("foreach",
		// function that splits the result of functionA
		func(data []byte) map[string][]byte {
			// bytes.Split takes the separator as a []byte
			splits := bytes.Split(data, []byte("-"))
			result := make(map[string][]byte)
			// key each split by its index
			for index, split := range splits {
				result[strconv.Itoa(index)] = split
			}
			return result
		},
		// function that aggregates the results of the branched DAG
		faasflow.Aggregator(func(results map[string][]byte) ([]byte, error) {
			// aggregate the results of all functionB executions
			return []byte(""), nil
		}),
	)
	foreachDag.AddFunction("call-b", "functionB")
	maindag.AddModifier("store", func(data []byte) ([]byte, error) {
		Store(data)
		return nil, nil
	})
	maindag.AddEdge("call-a", "foreach")
	maindag.AddEdge("foreach", "store")
	// the function declares an error result, so it must return explicitly
	return nil
}
from faas-flow.
I meant "other faas function", not "other faas flow"; my mistake.
I think I can finish my flow now.
Thank you again for your help!
from faas-flow.
Great. I'm closing the issue for now.
from faas-flow.