Comments (24)
You should provide an external data store in order to execute any DAG that has more than one node, and you expect data to be forwarded.
https://github.com/s8sg/faas-flow/blob/master/template/faas-flow/handler.go#L1029
// If dags has more than one nodes
// and nodes forwards data, data store need to be external
if fhandler.getPipeline().Dag.HasEdge() &&
!fhandler.getPipeline().Dag.IsExecutionFlow() {
if !dataSOverride {
panic(fmt.Sprintf("[Request `%s`] Failed not an execution flow, DAG data flow need external DataStore", fhandler.id))
}
}
The expected behaviour should be a panic on the graph validation, please check the log
faasflow.Sync
is only applicable for chain expression which allows operation to be chained in a single node. But for a dag it has no use. I agree its confusing and maybe having separate Options
for Chain is reasonable.
If you using DAG
the only way to put multiple operations in a single node is to use the same nodeId for all operations
foreachDag.AddFunction("nodeX", "func1")
foreachDag.AddFunction("nodeX", "func2")
But for a DAG that use branching (foreach, condition)
, its always has more than one Node.
And if you want to forward data you need to define datastore.
Example Minio:
minioDataStore "github.com/s8sg/faas-flow-minio-datastore"
func DefineDataStore() (faasflow.DataStore, error) {
// initialize minio DataStore
miniods, err := minioDataStore.InitFromEnv()
if err != nil {
return nil, err
}
return miniods, nil
}
Check https://github.com/s8sg/faas-flow-minio-datastore
from faas-flow.
I change some code. add minio data store.
package function
import (
"bytes"
"fmt"
faasflow "github.com/s8sg/faas-flow"
consulStateStore "github.com/s8sg/faas-flow-consul-statestore"
minioDataStore "github.com/s8sg/faas-flow-minio-datastore"
"log"
"net/http"
"os"
"strings"
)
// Define provide definiton of the workflow
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
dag := faasflow.CreateDag()
foreachDag := dag.AddForEachBranch("foreach", func(data []byte) map[string][]byte {
http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("input:"+string(data)))
values := bytes.Split(data, []byte("-"))
rmap := make(map[string][]byte)
for i := 0; i < len(values); i++ {
rmap[fmt.Sprintf("%d", i)] = values[i]
}
//fmt.Println("foreach input:", string(data), rmap)
return rmap
},
//faasflow.ExecutionBranch, // delete this if intermidiate data need data store
faasflow.Aggregator(func(results map[string][]byte) ([]byte, error) {
buf := bytes.NewBuffer([]byte("["))
var start bool
for _, v := range results {
if start {
start = true
} else {
buf.WriteByte(',')
}
buf.Write(v)
}
buf.WriteByte(']')
//fmt.Println("foreach-arggress: ", buf.String())
http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("foreach:"+buf.String()))
return buf.Bytes(), nil
}))
foreachDag.AddFunction("call-f1", "func1")
foreachDag.AddModifier("log-f1", func(data []byte) ([]byte, error) {
log.Println("log-f1:", string(data))
http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("log-f1:"+string(data)))
return data, nil
})
foreachDag.AddEdge("call-f1", "log-f1")
dag.AddFunction("call-f2", "func2")
dag.AddEdge("foreach", "call-f2")
dag.AddModifier("dump", func(data []byte) ([]byte, error) {
log.Println("dump:", string(data))
http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("dupm:"+string(data)))
return data, nil
})
dag.AddEdge("call-f2", "dump")
flow.ExecuteDag(dag)
return
}
// DefineStateStore provides the override of the default StateStore
func DefineStateStore() (faasflow.StateStore, error) {
consulss, err := consulStateStore.GetConsulStateStore(os.Getenv("consul_url"), os.Getenv("consul_dc"))
if err != nil {
return nil, err
}
return consulss, nil
}
// ProvideDataStore provides the override of the default DataStore
func DefineDataStore() (faasflow.DataStore, error) {
// initialize minio DataStore
miniods, err := minioDataStore.InitFromEnv()
if err != nil {
return nil, err
}
return miniods, nil
}
func1 and func2 still not run. no sync or async call to func1 and func2
from faas-flow.
Jiǎnhuà dàimǎ
4/5000
Simplified code, no branch, just dag
package function
import (
//"bytes"
//"fmt"
faasflow "github.com/s8sg/faas-flow"
consulStateStore "github.com/s8sg/faas-flow-consul-statestore"
minioDataStore "github.com/s8sg/faas-flow-minio-datastore"
"log"
"net/http"
"os"
"strings"
)
// Define provide definiton of the workflow
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
dag := faasflow.CreateDag()
dag.AddFunction("call-f1", "func1")
dag.AddModifier("log-f1", func(data []byte) ([]byte, error) {
log.Println("log-f1:", string(data))
http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("log-f1:"+string(data)))
return data, nil
})
dag.AddEdge("call-f1", "log-f1")
dag.AddFunction("call-f2", "func2")
dag.AddEdge("log-f1", "call-f2")
dag.AddModifier("dump", func(data []byte) ([]byte, error) {
log.Println("dump:", string(data))
http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("dupm:"+string(data)))
return data, nil
})
dag.AddEdge("call-f2", "dump")
flow.ExecuteDag(dag)
return
}
// DefineStateStore provides the override of the default StateStore
func DefineStateStore() (faasflow.StateStore, error) {
consulss, err := consulStateStore.GetConsulStateStore(os.Getenv("consul_url"), os.Getenv("consul_dc"))
if err != nil {
return nil, err
}
return consulss, nil
}
// ProvideDataStore provides the override of the default DataStore
func DefineDataStore() (faasflow.DataStore, error) {
// initialize minio DataStore
miniods, err := minioDataStore.InitFromEnv()
if err != nil {
return nil, err
}
return miniods, nil
}
logs:
2019/06/21 08:16:26 Version: 0.8.0 SHA: 829262e493baf739fbd1c75d0ee5e853d15c7561
2019/06/21 08:16:26 Writing lock-file to: /tmp/.lock
2019/06/21 08:18:20 Forking fprocess.
2019/06/21 08:18:20 Query
2019/06/21 08:18:20 Path /
2019/06/21 08:18:24 stderr: 2019/06/21 08:18:20 tracing is disabled
2019/06/21 08:18:20 [Request `bk695j0994hrd77gckmg`] Created
2019/06/21 08:18:20 [Request `bk695j0994hrd77gckmg`] DAG state initiated at StateStore
2019/06/21 08:18:20 [Request `bk695j0994hrd77gckmg`] Executing node 0_1_call-f1
2019/06/21 08:18:20 [Request `bk695j0994hrd77gckmg`] Executing function `func1`
2019/06/21 08:18:23 [Request `bk695j0994hrd77gckmg`] Completed execution of Node 0_1_call-f1
2019/06/21 08:18:24 [Request `bk695j0994hrd77gckmg`] Intermidiate result from Node 0_1_call-f1 to 0_2_log-f1 stored as 0_1_call-f1--0_2_log-f1
2019/06/21 08:18:24 [Request `bk695j0994hrd77gckmg`] Async request submitted for Node 0_2_log-f1
2019/06/21 08:18:24 Duration: 4.035092 seconds
2019/06/21 08:22:35 Forking fprocess.
2019/06/21 08:22:35 Query export-dag=true
2019/06/21 08:22:35 Path /
2019/06/21 08:22:35 Duration: 0.004253 seconds
{
"id": "0",
"start-node": "call-f1",
"end-node": "dump",
"has-branch": false,
"has-edge": true,
"is-execution-dag": true,
"nodes": {
"call-f1": {
"id": "call-f1",
"node-index": 1,
"is-dynamic": false,
"is-condition": false,
"is-foreach": false,
"has-aggregator": false,
"has-sub-aggregator": false,
"has-subdag": false,
"in-degree": 0,
"out-degree": 0,
"operations": [
{
"is-mod": false,
"is-function": true,
"is-callback": false,
"name": "func1",
"has-response-handler": false,
"has-failure-handler": false
}
],
"childrens": [
"log-f1"
]
},
"call-f2": {
"id": "call-f2",
"node-index": 3,
"is-dynamic": false,
"is-condition": false,
"is-foreach": false,
"has-aggregator": false,
"has-sub-aggregator": false,
"has-subdag": false,
"in-degree": 0,
"out-degree": 0,
"operations": [
{
"is-mod": false,
"is-function": true,
"is-callback": false,
"name": "func2",
"has-response-handler": false,
"has-failure-handler": false
}
],
"childrens": [
"dump"
]
},
"dump": {
"id": "dump",
"node-index": 4,
"is-dynamic": false,
"is-condition": false,
"is-foreach": false,
"has-aggregator": false,
"has-sub-aggregator": false,
"has-subdag": false,
"in-degree": 0,
"out-degree": 0,
"operations": [
{
"is-mod": true,
"is-function": false,
"is-callback": false,
"name": "",
"has-response-handler": false,
"has-failure-handler": false
}
]
},
"log-f1": {
"id": "log-f1",
"node-index": 2,
"is-dynamic": false,
"is-condition": false,
"is-foreach": false,
"has-aggregator": false,
"has-sub-aggregator": false,
"has-subdag": false,
"in-degree": 0,
"out-degree": 0,
"operations": [
{
"is-mod": true,
"is-function": false,
"is-callback": false,
"name": "",
"has-response-handler": false,
"has-failure-handler": false
}
],
"childrens": [
"call-f2"
]
}
},
"is-valid": true
}
2019/06/21 08:22:35 Forking fprocess.
2019/06/21 08:22:35 Query export-dag=true
2019/06/21 08:22:35 Path /
2019/06/21 08:22:35 Duration: 0.004995 seconds
{
"id": "0",
"start-node": "call-f1",
"end-node": "dump",
"has-branch": false,
"has-edge": true,
"is-execution-dag": true,
"nodes": {
"call-f1": {
"id": "call-f1",
"node-index": 1,
"is-dynamic": false,
"is-condition": false,
"is-foreach": false,
"has-aggregator": false,
"has-sub-aggregator": false,
"has-subdag": false,
"in-degree": 0,
"out-degree": 0,
"operations": [
{
"is-mod": false,
"is-function": true,
"is-callback": false,
"name": "func1",
"has-response-handler": false,
"has-failure-handler": false
}
],
"childrens": [
"log-f1"
]
},
"call-f2": {
"id": "call-f2",
"node-index": 3,
"is-dynamic": false,
"is-condition": false,
"is-foreach": false,
"has-aggregator": false,
"has-sub-aggregator": false,
"has-subdag": false,
"in-degree": 0,
"out-degree": 0,
"operations": [
{
"is-mod": false,
"is-function": true,
"is-callback": false,
"name": "func2",
"has-response-handler": false,
"has-failure-handler": false
}
],
"childrens": [
"dump"
]
},
"dump": {
"id": "dump",
"node-index": 4,
"is-dynamic": false,
"is-condition": false,
"is-foreach": false,
"has-aggregator": false,
"has-sub-aggregator": false,
"has-subdag": false,
"in-degree": 0,
"out-degree": 0,
"operations": [
{
"is-mod": true,
"is-function": false,
"is-callback": false,
"name": "",
"has-response-handler": false,
"has-failure-handler": false
}
]
},
"log-f1": {
"id": "log-f1",
"node-index": 2,
"is-dynamic": false,
"is-condition": false,
"is-foreach": false,
"has-aggregator": false,
"has-sub-aggregator": false,
"has-subdag": false,
"in-degree": 0,
"out-degree": 0,
"operations": [
{
"is-mod": true,
"is-function": false,
"is-callback": false,
"name": "",
"has-response-handler": false,
"has-failure-handler": false
}
],
"childrens": [
"call-f2"
]
}
},
"is-valid": true
}
from faas-flow.
Can you see is there any other POD that serving the request Node 0_2_log-f1
Maybe search by label
It doesn't seems like a Minio Problem
from faas-flow.
Also check the nats-streaming-server
log.
from faas-flow.
there's no node name or label like Node 0_2_log-f1.
i checked minio work well. now try to check nats
from faas-flow.
@s8sg finally i found why flow call can't trigger next function(flow node).
In stack.yaml( or function configure file)
flow function name
must equal to environment.workflow_name
here is only other problem data not pass to next flow-node
more simplified code
dag.AddFunction("call-f1", "func1")
dag.AddFunction("call-f2", "func2")
dag.AddEdge("call-f1", "call-f2")
dag.AddModifier("dump", func(data []byte) ([]byte, error) {
log.Println("dump:", string(data))
http.Post("http://requestbin.fullcontact.com/1c399km1", "text/plain", strings.NewReader("dump:" + string(data)))
return data, nil
})
dag.AddEdge("call-f2", "dump")
pipeline data be set to empty after func1 run
from faas-flow.
problem was here
faas-flow/template/faas-flow/handler.go
Line 1061 in db26577
if fhandler.partial &&
!fhandler.getPipeline().Dag.IsExecutionFlow() {
data, gerr = getDagIntermediateData(fhandler, context)
if gerr != nil {
gerr := fmt.Errorf("failed to retrive intermediate result, error %v", gerr)
handleFailure(fhandler, context, gerr)
}
}
before excute a modifier, should getDagIntermediateData
fhandler.getPipeline().Dag.IsExecutionFlow()
returned true
skip getDagIntermediateData
so modifier request data is empty.
from faas-flow.
The problem is not the check itself, the check is there to determine if the dag forwards data or not. But it should have returned false in your definition. I’ll check the code and let you know.
from faas-flow.
OK, Thank you.
from faas-flow.
Fixed in https://github.com/s8sg/faas-flow/tree/315a019bf9645c41fae9a68b95f56cccb1077885
from faas-flow.
@chennqqi Can you please check the same with latest template
from faas-flow.
I'm checking when you commit.
dag simple call test ok.
I'm checking branch call now
from faas-flow.
test code:
dag := faasflow.CreateDag()
foreachDag := dag.AddForEachBranch("foreach", func(data []byte) map[string][]byte {
http.Post("http://q1.ipaddr.top:5888", "text/plain", strings.NewReader("input:"+string(data)))
values := bytes.Split(data, []byte("-"))
rmap := make(map[string][]byte)
for i := 0; i < len(values); i++ {
rmap[fmt.Sprintf("%d", i)] = values[i]
}
log.Println("foreach input:", string(data), rmap)
return rmap
},
// faasflow.ExecutionBranch, // delete this if intermidiate data need data store
faasflow.Aggregator(func(results map[string][]byte) ([]byte, error) {
buf := bytes.NewBuffer([]byte("["))
var start bool
for _, v := range results {
if !start {
start = true
} else {
buf.WriteByte(',')
}
buf.Write(v)
}
buf.WriteByte(']')
log.Println("foreach-arggress: ", buf.String())
http.Post("http://q1.ipaddr.top:5888", "text/plain", strings.NewReader("foreach:"+buf.String()))
return buf.Bytes(), nil
}))
foreachDag.AddFunction("call-f1", "func1")
if false {
foreachDag.AddModifier("log-f1", func(data []byte) ([]byte, error) {
log.Println("log-f1:", string(data))
http.Post("http://q1.ipaddr.top:5888", "text/plain", strings.NewReader("log-f1:"+string(data)))
return data, nil
})
foreachDag.AddEdge("call-f1", "log-f1")
}
dag.AddFunction("call-f2", "func2")
dag.AddEdge("foreach", "call-f2")
dag.AddModifier("dump", func(data []byte) ([]byte, error) {
log.Println("dump:", string(data))
http.Post("http://q1.ipaddr.top:5888", "text/plain", strings.NewReader("dupm:"+string(data)))
return data, nil
})
dag.AddEdge("call-f2", "dump")
flow.ExecuteDag(dag)
return
input: aa-bb
expect: func2[func1(aa),func2(aa)]
actual: func2[func1(),func1()]
logs:
2019/06/24 12:54:16 Version: 0.8.0 SHA: 829262e493baf739fbd1c75d0ee5e853d15c7561
2019/06/24 12:54:16 Writing lock-file to: /tmp/.lock
2019/06/24 12:54:55 Forking fprocess.
2019/06/24 12:54:55 Query
2019/06/24 12:54:55 Path /
2019/06/24 12:54:57 stderr: 2019/06/24 12:54:56 tracing is disabled
2019/06/24 12:54:56 [Request `bk8cg87le8oq10sc51f0`] Created
2019/06/24 12:54:56 addEdge from foreach to call-f2 before excution is false
2019/06/24 12:54:56 addEdge from foreach to call-f2 after excution is false
2019/06/24 12:54:56 addEdge from call-f2 to dump before excution is false
2019/06/24 12:54:56 addEdge from call-f2 to dump after excution is false
2019/06/24 12:54:56 FUNCTION DEFINE=> fhandler.partial false false
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] DAG state initiated at StateStore
2019/06/24 12:54:57 fhandler.partial false false
2019/06/24 12:54:57 execute before data: aa-bb
2019/06/24 12:54:57 execute data: aa-bb
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Executing node 0_1_foreach
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Processing dynamic node 0_1_foreach
2019/06/24 12:54:57 foreach input: aa-bb map[0:[97 97] 1:[98 98]]
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result for dynamic id 0 from Node 0_1_foreach to 1_1_call-f1 stored as 0--0_1_foreach--1_1_call-f1
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Async request submitted for Node 0_1_foreach dynamic id 0
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result for dynamic id 1 from Node 0_1_foreach to 1_1_call-f1 stored as 1--0_1_foreach--1_1_call-f1
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Async request submitted for Node 0_1_foreach dynamic id 1
2019/06/24 12:54:57 Duration: 1.326729 seconds
2019/06/24 12:54:57 Forking fprocess.
2019/06/24 12:54:57 Query
2019/06/24 12:54:57 Path /
2019/06/24 12:54:57 Forking fprocess.
2019/06/24 12:54:57 Query
2019/06/24 12:54:57 Path /
2019/06/24 12:54:58 stderr: 2019/06/24 12:54:57 tracing is disabled
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Received
2019/06/24 12:54:57 addEdge from foreach to call-f2 before excution is false
2019/06/24 12:54:57 addEdge from foreach to call-f2 after excution is false
2019/06/24 12:54:57 addEdge from call-f2 to dump before excution is false
2019/06/24 12:54:57 addEdge from call-f2 to dump after excution is false
2019/06/24 12:54:57 FUNCTION DEFINE=> fhandler.partial true false
2019/06/24 12:54:57 fhandler.partial true false
2019/06/24 12:54:57 execute before data:
2019/06/24 12:54:57 execute data:
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Executing node 1_1_call-f1
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Executing function `func1`
2019/06/24 12:54:58 [Request `bk8cg87le8oq10sc51f0`] Completed execution of Node 1_1_call-f1
2019/06/24 12:54:58 [Request `bk8cg87le8oq10sc51f0`] Executing parent dag Node 0_1_foreach
2019/06/24 12:54:58 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result from Node 0_1_foreach to 0_2_call-f2 stored as 0--0_1_foreach--0_2_call-f2
2019/06/24 12:54:58 [Request `bk8cg87le8oq10sc51f0`] request for Node 0_2_call-f2 is delayed, completed indegree: 1/2
2019/06/24 12:54:58 Duration: 1.322632 seconds
2019/06/24 12:54:59 stderr: 2019/06/24 12:54:57 tracing is disabled
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Received
2019/06/24 12:54:57 addEdge from foreach to call-f2 before excution is false
2019/06/24 12:54:57 addEdge from foreach to call-f2 after excution is false
2019/06/24 12:54:57 addEdge from call-f2 to dump before excution is false
2019/06/24 12:54:57 addEdge from call-f2 to dump after excution is false
2019/06/24 12:54:57 FUNCTION DEFINE=> fhandler.partial true false
2019/06/24 12:54:57 fhandler.partial true false
2019/06/24 12:54:57 execute before data:
2019/06/24 12:54:57 execute data:
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Executing node 1_1_call-f1
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Executing function `func1`
2019/06/24 12:54:58 [Request `bk8cg87le8oq10sc51f0`] Completed execution of Node 1_1_call-f1
2019/06/24 12:54:58 [Request `bk8cg87le8oq10sc51f0`] Executing parent dag Node 0_1_foreach
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result from Node 0_1_foreach to 0_2_call-f2 stored as 1--0_1_foreach--0_2_call-f2
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Async request submitted for Node 0_2_call-f2
2019/06/24 12:54:59 Duration: 1.328646 seconds
2019/06/24 12:54:59 Forking fprocess.
2019/06/24 12:54:59 Query
2019/06/24 12:54:59 Path /
2019/06/24 12:55:01 stderr: 2019/06/24 12:54:59 tracing is disabled
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Received
2019/06/24 12:54:59 addEdge from foreach to call-f2 before excution is false
2019/06/24 12:54:59 addEdge from foreach to call-f2 after excution is false
2019/06/24 12:54:59 addEdge from call-f2 to dump before excution is false
2019/06/24 12:54:59 addEdge from call-f2 to dump after excution is false
2019/06/24 12:54:59 FUNCTION DEFINE=> fhandler.partial true false
2019/06/24 12:54:59 fhandler.partial true false
2019/06/24 12:54:59 execute before data:
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result from Node 0_1_foreach to Node 0_2_call-f2 for option 0 retrived from 0--0_1_foreach--0_2_call-f2
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result from Node 0_1_foreach to Node 0_2_call-f2 for option 1 retrived from 1--0_1_foreach--0_2_call-f2
2019/06/24 12:54:59 foreach-arggress: [,func1()
,func1()
]
2019/06/24 12:54:59 execute data: [,func1()
,func1()
]
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Executing node 0_2_call-f2
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Executing function `func2`
2019/06/24 12:55:01 [Request `bk8cg87le8oq10sc51f0`] Completed execution of Node 0_2_call-f2
2019/06/24 12:55:01 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result from Node 0_2_call-f2 to 0_3_dump stored as 0_2_call-f2--0_3_dump
2019/06/24 12:55:01 [Request `bk8cg87le8oq10sc51f0`] Async request submitted for Node 0_3_dump
2019/06/24 12:55:01 Duration: 2.567324 seconds
2019/06/24 12:55:01 Forking fprocess.
2019/06/24 12:55:01 Query
2019/06/24 12:55:01 Path /
2019/06/24 12:55:02 stderr: 2019/06/24 12:55:01 tracing is disabled
2019/06/24 12:55:01 [Request `bk8cg87le8oq10sc51f0`] Received
2019/06/24 12:55:01 addEdge from foreach to call-f2 before excution is false
2019/06/24 12:55:01 addEdge from foreach to call-f2 after excution is false
2019/06/24 12:55:01 addEdge from call-f2 to dump before excution is false
2019/06/24 12:55:01 addEdge from call-f2 to dump after excution is false
2019/06/24 12:55:01 FUNCTION DEFINE=> fhandler.partial true false
2019/06/24 12:55:02 fhandler.partial true false
2019/06/24 12:55:02 execute before data:
2019/06/24 12:55:02 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result from Node 0_2_call-f2 to Node 0_3_dump retrived from 0_2_call-f2--0_3_dump
2019/06/24 12:55:02 execute data: func2([,func1()
,func1()
])
2019/06/24 12:55:02 [Request `bk8cg87le8oq10sc51f0`] Executing node 0_3_dump
2019/06/24 12:55:02 [Request `bk8cg87le8oq10sc51f0`] Executing modifier
2019/06/24 12:55:02 dump: func2([,func1()
,func1()
])
2019/06/24 12:55:02 [Request `bk8cg87le8oq10sc51f0`] Completed execution of Node 0_3_dump
2019/06/24 12:55:02 [Request `bk8cg87le8oq10sc51f0`] Completed successfully
2019/06/24 12:55:02 Duration: 0.251855 seconds
func2([,func1()
,func1()
])
2019/06/24 12:59:22 Forking fprocess.
2019/06/24 12:59:22 Query export-dag=true
2019/06/24 12:59:22 Path /
2019/06/24 12:59:22 stderr: 2019/06/24 12:59:22 addEdge from foreach to call-f2 before excution is false
2019/06/24 12:59:22 addEdge from foreach to call-f2 after excution is false
2019/06/24 12:59:22 addEdge from call-f2 to dump before excution is false
2019/06/24 12:59:22 addEdge from call-f2 to dump after excution is false
2019/06/24 12:59:22 Duration: 0.006460 seconds
{
"id": "0",
"start-node": "foreach",
"end-node": "dump",
"has-branch": true,
"has-edge": true,
"exec-only-dag": false,
"nodes": {
"call-f2": {
"id": "call-f2",
"node-index": 2,
"is-dynamic": false,
"is-condition": false,
"is-foreach": false,
"has-aggregator": false,
"has-sub-aggregator": false,
"has-subdag": false,
"in-degree": 0,
"out-degree": 0,
"dynamic-exec-only": false,
"operations": [
{
"is-mod": false,
"is-function": true,
"is-callback": false,
"name": "func2",
"has-response-handler": false,
"has-failure-handler": false
}
],
"childrens": [
"dump"
],
"child-exec-only": {
"dump": false
}
},
"dump": {
"id": "dump",
"node-index": 3,
"is-dynamic": false,
"is-condition": false,
"is-foreach": false,
"has-aggregator": false,
"has-sub-aggregator": false,
"has-subdag": false,
"in-degree": 0,
"out-degree": 0,
"dynamic-exec-only": false,
"operations": [
{
"is-mod": true,
"is-function": false,
"is-callback": false,
"name": "",
"has-response-handler": false,
"has-failure-handler": false
}
],
"child-exec-only": {}
},
"foreach": {
"id": "foreach",
"node-index": 1,
"is-dynamic": true,
"is-condition": false,
"is-foreach": true,
"has-aggregator": false,
"has-sub-aggregator": true,
"has-subdag": false,
"in-degree": 0,
"out-degree": 0,
"foreach-dag": {
"id": "1",
"start-node": "call-f1",
"end-node": "call-f1",
"has-branch": false,
"has-edge": false,
"exec-only-dag": true,
"nodes": {
"call-f1": {
"id": "call-f1",
"node-index": 1,
"is-dynamic": false,
"is-condition": false,
"is-foreach": false,
"has-aggregator": false,
"has-sub-aggregator": false,
"has-subdag": false,
"in-degree": 0,
"out-degree": 0,
"dynamic-exec-only": false,
"operations": [
{
"is-mod": false,
"is-function": true,
"is-callback": false,
"name": "func1",
"has-response-handler": false,
"has-failure-handler": false
}
],
"child-exec-only": {}
}
},
"is-valid": false
},
"dynamic-exec-only": false,
"childrens": [
"call-f2"
],
"child-exec-only": {
"call-f2": false
}
}
},
"is-valid": true
}
foreach didn't pass data to call-1
from faas-flow.
When I track code I think
faas-flow/template/faas-flow/handler.go
Line 835 in 315a019
getDagIntermediateData
didn't not return right data to func1
from faas-flow.
after this commit faas-flow-tower can't generate flow graph any more
from faas-flow.
from faas-flow.
update faas-flow-tower latest code, graph show OK now.
from faas-flow.
@s8sg dag flow's node pass data by context.Set
and context.GetBytes
, so a add some stub before all these two functions.
conclusion:
- dag without branch(dynamic) run ok, pass data ok.
- dag with foreach branch(dynamic) didn't pass data to subdag
my test code upwards stub logs:
Execute foreach
[1] 2019/06/25 03:20:56 context.Set 0--0_1_foreach--1_1_call-f1 value aa err <nil>
[2] 2019/06/25 03:20:56 context.Set 1--0_1_foreach--1_1_call-f1 value bb err <nil>
Execute call-f1 func1
[3] 2019/06/25 03:20:58 context.Set 1--0_1_foreach--0_2_call-f2 value func1() err <nil>
Exexute call-f1 func1
[4] 2019/06/25 03:20:58 context.Set 0--0_1_foreach--0_2_call-f2 value func1() err <nil>
aggregator
faas-flow/template/faas-flow/handler.go
Line 844 in 315a019
[getDagIntermediate] current node id: call-f1 deps: []
call-f1 dependicies are empty. so func1 couldn't get intermediate data
from faas-flow.
Yeah, dependencies only include previous nodes in the same DAG
For initial node the deps: []
But as Its a BranchDag of a dynamic Node we need to get the data from parent dag
It seems current version doesn't implements It, I just left a comment and forgot 🤦♂
// XXX - handle data from parent Node - and multiple options of prev node
Here parent Node is basically for any parent node of the SubDAG
and
multiple options of prev node is basically the BrachDag Parent Node
This need to be carefully added as Subdag and BranchDag can be recursive
I'll add the code in a day or two
from faas-flow.
@chennqqi Please check with the above PR
from faas-flow.
can't wait now! haha
from faas-flow.
@s8sg foreach test pass
from faas-flow.
Okay, thank you for confirming. I'll merge the same
from faas-flow.
Related Issues (20)
- add CutDag(id string) and Reverse() sdk.Dag funcs HOT 6
- How do you think about dapr as data store?
- are you thinking of visual based programming? HOT 2
- Allow ability to create aggregators as a function
- Change documentation and configuration to Kubernets default
- Faas-flow Infra should deploy the whole Faas-flow stack HOT 1
- Keep the default flow configuration in the Dockerfile of Template HOT 1
- Make Consul and Minio as default StateStore and DataStore
- build failed HOT 2
- Failed to init flow HOT 1
- Multiple-node Dag's execution fails HOT 3
- Flow with more than one nodes HOT 7
- Flow execution time measurement
- Parallel execution speed up HOT 4
- faas build the demo case yml failed
- Function output pipe into next function HOT 2
- Cannot use context.Set in faas-flow pipeline. HOT 1
- Fass flow has an inncorrect flow name HOT 2
- Docker images problem HOT 3
- Is there some function examples? HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from faas-flow.