Comments (24)

s8sg commented on June 6, 2024

You need to provide an external data store to execute any DAG that has more than one node and is expected to forward data between nodes.
https://github.com/s8sg/faas-flow/blob/master/template/faas-flow/handler.go#L1029

// If dags has more than one nodes
// and nodes forwards data, data store need to be external
if fhandler.getPipeline().Dag.HasEdge() &&
	!fhandler.getPipeline().Dag.IsExecutionFlow() {
	if !dataSOverride {
		panic(fmt.Sprintf("[Request `%s`] Failed not an execution flow, DAG data flow need external DataStore", fhandler.id))
	}
}

The expected behaviour is a panic during graph validation; please check the log.
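
The condition quoted above can be read as a small predicate. The following is a minimal sketch, not the faas-flow implementation: the `dag` struct, its fields, and `validateDataStore` are hypothetical names, and the sketch returns an error where the template panics.

```go
package main

import (
	"errors"
	"fmt"
)

// dag is a hypothetical stand-in for the flow graph. It records only the
// two properties that the quoted check inspects.
type dag struct {
	hasEdge         bool // at least two nodes are connected by an edge
	isExecutionFlow bool // nodes only trigger each other; no data is forwarded
}

// validateDataStore mirrors the quoted condition but returns an error
// instead of panicking. externalStore reports whether a DataStore
// override was defined.
func validateDataStore(d dag, externalStore bool) error {
	if d.hasEdge && !d.isExecutionFlow && !externalStore {
		return errors.New("DAG forwards data between nodes but no external DataStore is defined")
	}
	return nil
}

func main() {
	forwarding := dag{hasEdge: true, isExecutionFlow: false}
	fmt.Println(validateDataStore(forwarding, false)) // non-nil error: store missing
	fmt.Println(validateDataStore(forwarding, true))  // <nil>
}
```

Under these assumptions, only the combination "has edges, forwards data, no external store" is rejected.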

faasflow.Sync only applies to chain expressions, which allow operations to be chained in a single node. For a DAG it has no use. I agree it's confusing, and having separate Options for Chain may be reasonable.

If you are using a DAG, the only way to put multiple operations in a single node is to use the same node ID for all operations:

foreachDag.AddFunction("nodeX", "func1")
foreachDag.AddFunction("nodeX", "func2")

But a DAG that uses branching (foreach, condition) always has more than one node.
And if you want to forward data, you need to define a data store.
Example with Minio:

import minioDataStore "github.com/s8sg/faas-flow-minio-datastore"

func DefineDataStore() (faasflow.DataStore, error) {
       // initialize minio DataStore
       miniods, err := minioDataStore.InitFromEnv()
       if err != nil {
               return nil, err
       }

       return miniods, nil
}

Check https://github.com/s8sg/faas-flow-minio-datastore

from faas-flow.

chennqqi commented on June 6, 2024

I changed some code and added the Minio data store.

package function

import (
	"bytes"
	"fmt"
	faasflow "github.com/s8sg/faas-flow"
	consulStateStore "github.com/s8sg/faas-flow-consul-statestore"
	minioDataStore "github.com/s8sg/faas-flow-minio-datastore"
	"log"
	"net/http"
	"os"
	"strings"
)

// Define provides the definition of the workflow
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
	dag := faasflow.CreateDag()

	foreachDag := dag.AddForEachBranch("foreach", func(data []byte) map[string][]byte {
		http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("input:"+string(data)))
		values := bytes.Split(data, []byte("-"))
		rmap := make(map[string][]byte)
		for i := 0; i < len(values); i++ {
			rmap[fmt.Sprintf("%d", i)] = values[i]
		}
		//fmt.Println("foreach input:", string(data), rmap)
		return rmap
	},
		//faasflow.ExecutionBranch, // delete this if intermediate data needs a data store
		faasflow.Aggregator(func(results map[string][]byte) ([]byte, error) {
			buf := bytes.NewBuffer([]byte("["))
			var start bool
			for _, v := range results {
				if !start {
					start = true
				} else {
					buf.WriteByte(',')
				}
				buf.Write(v)
			}
			buf.WriteByte(']')
			//fmt.Println("foreach-arggress: ", buf.String())
			http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("foreach:"+buf.String()))
			return buf.Bytes(), nil
		}))
	foreachDag.AddFunction("call-f1", "func1")
	foreachDag.AddModifier("log-f1", func(data []byte) ([]byte, error) {
		log.Println("log-f1:", string(data))
		http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("log-f1:"+string(data)))
		return data, nil
	})
	foreachDag.AddEdge("call-f1", "log-f1")

	dag.AddFunction("call-f2", "func2")
	dag.AddEdge("foreach", "call-f2")

	dag.AddModifier("dump", func(data []byte) ([]byte, error) {
		log.Println("dump:", string(data))
		http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("dump:"+string(data)))
		return data, nil
	})
	dag.AddEdge("call-f2", "dump")

	flow.ExecuteDag(dag)
	return
}

// DefineStateStore provides the override of the default StateStore
func DefineStateStore() (faasflow.StateStore, error) {
	consulss, err := consulStateStore.GetConsulStateStore(os.Getenv("consul_url"), os.Getenv("consul_dc"))
	if err != nil {
		return nil, err
	}
	return consulss, nil
}

// DefineDataStore provides the override of the default DataStore
func DefineDataStore() (faasflow.DataStore, error) {
	// initialize minio DataStore
	miniods, err := minioDataStore.InitFromEnv()
	if err != nil {
		return nil, err
	}

	return miniods, nil
}

logs: (posted as an image)

func1 and func2 still do not run; there is no sync or async call to either of them.
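
The aggregator in the definition above can also be written without a `start` flag and with a fixed output order. This is a minimal sketch under the assumption that the option keys are the strings produced by the foreach function; the name `aggregate` is illustrative, and only the function shape `func(map[string][]byte) ([]byte, error)` is taken from the post.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// aggregate joins the branch results into "[v0,v1,...]".
// Keys are sorted first because Go randomizes map iteration order.
func aggregate(results map[string][]byte) ([]byte, error) {
	keys := make([]string, 0, len(results))
	for k := range results {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var buf bytes.Buffer
	buf.WriteByte('[')
	for i, k := range keys {
		if i > 0 {
			buf.WriteByte(',') // separator only between elements
		}
		buf.Write(results[k])
	}
	buf.WriteByte(']')
	return buf.Bytes(), nil
}

func main() {
	out, _ := aggregate(map[string][]byte{"1": []byte("bb"), "0": []byte("aa")})
	fmt.Println(string(out)) // prints [aa,bb]
}
```

Note that `sort.Strings` orders keys lexicographically, so with ten or more options "10" sorts before "2"; a numeric sort would be needed if that order matters.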

chennqqi commented on June 6, 2024

Simplified code: no branch, just a DAG.

package function

import (
	//"bytes"
	//"fmt"
	faasflow "github.com/s8sg/faas-flow"
	consulStateStore "github.com/s8sg/faas-flow-consul-statestore"
	minioDataStore "github.com/s8sg/faas-flow-minio-datastore"
	"log"
	"net/http"
	"os"
	"strings"
)

// Define provides the definition of the workflow
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
	dag := faasflow.CreateDag()

	dag.AddFunction("call-f1", "func1")
	dag.AddModifier("log-f1", func(data []byte) ([]byte, error) {
		log.Println("log-f1:", string(data))
		http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("log-f1:"+string(data)))
		return data, nil
	})
	dag.AddEdge("call-f1", "log-f1")
	dag.AddFunction("call-f2", "func2")
	dag.AddEdge("log-f1", "call-f2")
	dag.AddModifier("dump", func(data []byte) ([]byte, error) {
		log.Println("dump:", string(data))
		http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("dump:"+string(data)))
		return data, nil
	})
	dag.AddEdge("call-f2", "dump")

	flow.ExecuteDag(dag)
	return
}

// DefineStateStore provides the override of the default StateStore
func DefineStateStore() (faasflow.StateStore, error) {
	consulss, err := consulStateStore.GetConsulStateStore(os.Getenv("consul_url"), os.Getenv("consul_dc"))
	if err != nil {
		return nil, err
	}
	return consulss, nil
}

// DefineDataStore provides the override of the default DataStore
func DefineDataStore() (faasflow.DataStore, error) {
	// initialize minio DataStore
	miniods, err := minioDataStore.InitFromEnv()
	if err != nil {
		return nil, err
	}

	return miniods, nil
}

logs:

2019/06/21 08:16:26 Version: 0.8.0	SHA: 829262e493baf739fbd1c75d0ee5e853d15c7561
2019/06/21 08:16:26 Writing lock-file to: /tmp/.lock
2019/06/21 08:18:20 Forking fprocess.
2019/06/21 08:18:20 Query  
2019/06/21 08:18:20 Path  /
2019/06/21 08:18:24 stderr: 2019/06/21 08:18:20 tracing is disabled
2019/06/21 08:18:20 [Request `bk695j0994hrd77gckmg`] Created
2019/06/21 08:18:20 [Request `bk695j0994hrd77gckmg`] DAG state initiated at StateStore
2019/06/21 08:18:20 [Request `bk695j0994hrd77gckmg`] Executing node 0_1_call-f1
2019/06/21 08:18:20 [Request `bk695j0994hrd77gckmg`] Executing function `func1`
2019/06/21 08:18:23 [Request `bk695j0994hrd77gckmg`] Completed execution of Node 0_1_call-f1
2019/06/21 08:18:24 [Request `bk695j0994hrd77gckmg`] Intermidiate result from Node 0_1_call-f1 to 0_2_log-f1 stored as 0_1_call-f1--0_2_log-f1
2019/06/21 08:18:24 [Request `bk695j0994hrd77gckmg`] Async request submitted for Node 0_2_log-f1
2019/06/21 08:18:24 Duration: 4.035092 seconds

2019/06/21 08:22:35 Forking fprocess.
2019/06/21 08:22:35 Query  export-dag=true
2019/06/21 08:22:35 Path  /
2019/06/21 08:22:35 Duration: 0.004253 seconds
{
    "id": "0",
    "start-node": "call-f1",
    "end-node": "dump",
    "has-branch": false,
    "has-edge": true,
    "is-execution-dag": true,
    "nodes": {
        "call-f1": {
            "id": "call-f1",
            "node-index": 1,
            "is-dynamic": false,
            "is-condition": false,
            "is-foreach": false,
            "has-aggregator": false,
            "has-sub-aggregator": false,
            "has-subdag": false,
            "in-degree": 0,
            "out-degree": 0,
            "operations": [
                {
                    "is-mod": false,
                    "is-function": true,
                    "is-callback": false,
                    "name": "func1",
                    "has-response-handler": false,
                    "has-failure-handler": false
                }
            ],
            "childrens": [
                "log-f1"
            ]
        },
        "call-f2": {
            "id": "call-f2",
            "node-index": 3,
            "is-dynamic": false,
            "is-condition": false,
            "is-foreach": false,
            "has-aggregator": false,
            "has-sub-aggregator": false,
            "has-subdag": false,
            "in-degree": 0,
            "out-degree": 0,
            "operations": [
                {
                    "is-mod": false,
                    "is-function": true,
                    "is-callback": false,
                    "name": "func2",
                    "has-response-handler": false,
                    "has-failure-handler": false
                }
            ],
            "childrens": [
                "dump"
            ]
        },
        "dump": {
            "id": "dump",
            "node-index": 4,
            "is-dynamic": false,
            "is-condition": false,
            "is-foreach": false,
            "has-aggregator": false,
            "has-sub-aggregator": false,
            "has-subdag": false,
            "in-degree": 0,
            "out-degree": 0,
            "operations": [
                {
                    "is-mod": true,
                    "is-function": false,
                    "is-callback": false,
                    "name": "",
                    "has-response-handler": false,
                    "has-failure-handler": false
                }
            ]
        },
        "log-f1": {
            "id": "log-f1",
            "node-index": 2,
            "is-dynamic": false,
            "is-condition": false,
            "is-foreach": false,
            "has-aggregator": false,
            "has-sub-aggregator": false,
            "has-subdag": false,
            "in-degree": 0,
            "out-degree": 0,
            "operations": [
                {
                    "is-mod": true,
                    "is-function": false,
                    "is-callback": false,
                    "name": "",
                    "has-response-handler": false,
                    "has-failure-handler": false
                }
            ],
            "childrens": [
                "call-f2"
            ]
        }
    },
    "is-valid": true
}

s8sg commented on June 6, 2024

Can you check whether any other pod is serving the request for Node 0_2_log-f1?
Maybe search by label.
It doesn't seem like a Minio problem.

s8sg commented on June 6, 2024

Also check the nats-streaming-server log.

chennqqi commented on June 6, 2024

There is no node name or label like Node 0_2_log-f1.
I checked Minio and it works well. Now I'll check NATS.

chennqqi commented on June 6, 2024

@s8sg I finally found why the flow call couldn't trigger the next function (flow node).
In stack.yaml (or the function configuration file),
the flow function name must be equal to environment.workflow_name.

There is only one other problem: data is not passed to the next flow node.

More simplified code:

                dag.AddFunction("call-f1", "func1")
                dag.AddFunction("call-f2", "func2")
                dag.AddEdge("call-f1", "call-f2")
                dag.AddModifier("dump", func(data []byte) ([]byte, error) {
                        log.Println("dump:", string(data))
                        http.Post("http://requestbin.fullcontact.com/1c399km1", "text/plain", strings.NewReader("dump:" + string(data)))
                        return data, nil
                })
                dag.AddEdge("call-f2", "dump")

The pipeline data is set to empty after func1 runs.

chennqqi commented on June 6, 2024

The problem is here:

		if fhandler.partial &&
			!fhandler.getPipeline().Dag.IsExecutionFlow() {

			data, gerr = getDagIntermediateData(fhandler, context)
			if gerr != nil {
				gerr := fmt.Errorf("failed to retrive intermediate result, error %v", gerr)
				handleFailure(fhandler, context, gerr)
			}
		}

Before executing a modifier, getDagIntermediateData should be called.
But fhandler.getPipeline().Dag.IsExecutionFlow() returned true,
so getDagIntermediateData was skipped
and the modifier's request data is empty.

s8sg commented on June 6, 2024

The problem is not the check itself; the check is there to determine whether the DAG forwards data or not. But it should have returned false for your definition. I’ll check the code and let you know.

chennqqi commented on June 6, 2024

OK, Thank you.

s8sg commented on June 6, 2024

Fixed in https://github.com/s8sg/faas-flow/tree/315a019bf9645c41fae9a68b95f56cccb1077885

s8sg commented on June 6, 2024

@chennqqi Can you please check the same with the latest template?

chennqqi commented on June 6, 2024

I started checking as soon as you committed.

The simple DAG call test is OK.
I'm checking the branch call now.

chennqqi commented on June 6, 2024

test code:

	dag := faasflow.CreateDag()

	foreachDag := dag.AddForEachBranch("foreach", func(data []byte) map[string][]byte {
		http.Post("http://q1.ipaddr.top:5888", "text/plain", strings.NewReader("input:"+string(data)))
		values := bytes.Split(data, []byte("-"))
		rmap := make(map[string][]byte)
		for i := 0; i < len(values); i++ {
			rmap[fmt.Sprintf("%d", i)] = values[i]
		}
		log.Println("foreach input:", string(data), rmap)
		return rmap
	},
		//	faasflow.ExecutionBranch, // delete this if intermediate data needs a data store
		faasflow.Aggregator(func(results map[string][]byte) ([]byte, error) {
			buf := bytes.NewBuffer([]byte("["))
			var start bool
			for _, v := range results {
				if !start {
					start = true
				} else {
					buf.WriteByte(',')
				}
				buf.Write(v)
			}
			buf.WriteByte(']')
			log.Println("foreach-arggress: ", buf.String())
			http.Post("http://q1.ipaddr.top:5888", "text/plain", strings.NewReader("foreach:"+buf.String()))
			return buf.Bytes(), nil
		}))
	foreachDag.AddFunction("call-f1", "func1")
	if false {
		foreachDag.AddModifier("log-f1", func(data []byte) ([]byte, error) {
			log.Println("log-f1:", string(data))
			http.Post("http://q1.ipaddr.top:5888", "text/plain", strings.NewReader("log-f1:"+string(data)))
			return data, nil
		})
		foreachDag.AddEdge("call-f1", "log-f1")
	}

	dag.AddFunction("call-f2", "func2")
	dag.AddEdge("foreach", "call-f2")

	dag.AddModifier("dump", func(data []byte) ([]byte, error) {
		log.Println("dump:", string(data))
		http.Post("http://q1.ipaddr.top:5888", "text/plain", strings.NewReader("dump:"+string(data)))
		return data, nil
	})
	dag.AddEdge("call-f2", "dump")

	flow.ExecuteDag(dag)
	return

input: aa-bb
expected: func2[func1(aa),func1(bb)]
actual: func2[func1(),func1()]
logs:

2019/06/24 12:54:16 Version: 0.8.0	SHA: 829262e493baf739fbd1c75d0ee5e853d15c7561
2019/06/24 12:54:16 Writing lock-file to: /tmp/.lock
2019/06/24 12:54:55 Forking fprocess.
2019/06/24 12:54:55 Query  
2019/06/24 12:54:55 Path  /
2019/06/24 12:54:57 stderr: 2019/06/24 12:54:56 tracing is disabled
2019/06/24 12:54:56 [Request `bk8cg87le8oq10sc51f0`] Created
2019/06/24 12:54:56 addEdge from foreach to call-f2 before excution is false
2019/06/24 12:54:56 addEdge from foreach to call-f2  after excution is false
2019/06/24 12:54:56 addEdge from call-f2 to dump before excution is false
2019/06/24 12:54:56 addEdge from call-f2 to dump  after excution is false
2019/06/24 12:54:56 FUNCTION DEFINE=> fhandler.partial false false
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] DAG state initiated at StateStore
2019/06/24 12:54:57 fhandler.partial false false
2019/06/24 12:54:57 execute before data: aa-bb
2019/06/24 12:54:57 execute data: aa-bb
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Executing node 0_1_foreach
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Processing dynamic node 0_1_foreach
2019/06/24 12:54:57 foreach input: aa-bb map[0:[97 97] 1:[98 98]]
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result for dynamic id 0 from Node 0_1_foreach to 1_1_call-f1 stored as 0--0_1_foreach--1_1_call-f1
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Async request submitted for Node 0_1_foreach dynamic id 0
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result for dynamic id 1 from Node 0_1_foreach to 1_1_call-f1 stored as 1--0_1_foreach--1_1_call-f1
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Async request submitted for Node 0_1_foreach dynamic id 1
2019/06/24 12:54:57 Duration: 1.326729 seconds

2019/06/24 12:54:57 Forking fprocess.
2019/06/24 12:54:57 Query  
2019/06/24 12:54:57 Path  /
2019/06/24 12:54:57 Forking fprocess.
2019/06/24 12:54:57 Query  
2019/06/24 12:54:57 Path  /
2019/06/24 12:54:58 stderr: 2019/06/24 12:54:57 tracing is disabled
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Received
2019/06/24 12:54:57 addEdge from foreach to call-f2 before excution is false
2019/06/24 12:54:57 addEdge from foreach to call-f2  after excution is false
2019/06/24 12:54:57 addEdge from call-f2 to dump before excution is false
2019/06/24 12:54:57 addEdge from call-f2 to dump  after excution is false
2019/06/24 12:54:57 FUNCTION DEFINE=> fhandler.partial true false
2019/06/24 12:54:57 fhandler.partial true false
2019/06/24 12:54:57 execute before data: 
2019/06/24 12:54:57 execute data: 
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Executing node 1_1_call-f1
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Executing function `func1`
2019/06/24 12:54:58 [Request `bk8cg87le8oq10sc51f0`] Completed execution of Node 1_1_call-f1
2019/06/24 12:54:58 [Request `bk8cg87le8oq10sc51f0`] Executing parent dag Node 0_1_foreach
2019/06/24 12:54:58 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result from Node 0_1_foreach to 0_2_call-f2 stored as 0--0_1_foreach--0_2_call-f2
2019/06/24 12:54:58 [Request `bk8cg87le8oq10sc51f0`] request for Node 0_2_call-f2 is delayed, completed indegree: 1/2
2019/06/24 12:54:58 Duration: 1.322632 seconds

2019/06/24 12:54:59 stderr: 2019/06/24 12:54:57 tracing is disabled
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Received
2019/06/24 12:54:57 addEdge from foreach to call-f2 before excution is false
2019/06/24 12:54:57 addEdge from foreach to call-f2  after excution is false
2019/06/24 12:54:57 addEdge from call-f2 to dump before excution is false
2019/06/24 12:54:57 addEdge from call-f2 to dump  after excution is false
2019/06/24 12:54:57 FUNCTION DEFINE=> fhandler.partial true false
2019/06/24 12:54:57 fhandler.partial true false
2019/06/24 12:54:57 execute before data: 
2019/06/24 12:54:57 execute data: 
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Executing node 1_1_call-f1
2019/06/24 12:54:57 [Request `bk8cg87le8oq10sc51f0`] Executing function `func1`
2019/06/24 12:54:58 [Request `bk8cg87le8oq10sc51f0`] Completed execution of Node 1_1_call-f1
2019/06/24 12:54:58 [Request `bk8cg87le8oq10sc51f0`] Executing parent dag Node 0_1_foreach
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result from Node 0_1_foreach to 0_2_call-f2 stored as 1--0_1_foreach--0_2_call-f2
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Async request submitted for Node 0_2_call-f2
2019/06/24 12:54:59 Duration: 1.328646 seconds

2019/06/24 12:54:59 Forking fprocess.
2019/06/24 12:54:59 Query  
2019/06/24 12:54:59 Path  /

2019/06/24 12:55:01 stderr: 2019/06/24 12:54:59 tracing is disabled
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Received
2019/06/24 12:54:59 addEdge from foreach to call-f2 before excution is false
2019/06/24 12:54:59 addEdge from foreach to call-f2  after excution is false
2019/06/24 12:54:59 addEdge from call-f2 to dump before excution is false
2019/06/24 12:54:59 addEdge from call-f2 to dump  after excution is false
2019/06/24 12:54:59 FUNCTION DEFINE=> fhandler.partial true false
2019/06/24 12:54:59 fhandler.partial true false
2019/06/24 12:54:59 execute before data: 
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result from Node 0_1_foreach to Node 0_2_call-f2 for option 0 retrived from 0--0_1_foreach--0_2_call-f2
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result from Node 0_1_foreach to Node 0_2_call-f2 for option 1 retrived from 1--0_1_foreach--0_2_call-f2
2019/06/24 12:54:59 foreach-arggress:  [,func1()
,func1()
]
2019/06/24 12:54:59 execute data: [,func1()
,func1()
]
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Executing node 0_2_call-f2
2019/06/24 12:54:59 [Request `bk8cg87le8oq10sc51f0`] Executing function `func2`
2019/06/24 12:55:01 [Request `bk8cg87le8oq10sc51f0`] Completed execution of Node 0_2_call-f2
2019/06/24 12:55:01 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result from Node 0_2_call-f2 to 0_3_dump stored as 0_2_call-f2--0_3_dump
2019/06/24 12:55:01 [Request `bk8cg87le8oq10sc51f0`] Async request submitted for Node 0_3_dump
2019/06/24 12:55:01 Duration: 2.567324 seconds
2019/06/24 12:55:01 Forking fprocess.
2019/06/24 12:55:01 Query  
2019/06/24 12:55:01 Path  /
2019/06/24 12:55:02 stderr: 2019/06/24 12:55:01 tracing is disabled
2019/06/24 12:55:01 [Request `bk8cg87le8oq10sc51f0`] Received
2019/06/24 12:55:01 addEdge from foreach to call-f2 before excution is false
2019/06/24 12:55:01 addEdge from foreach to call-f2  after excution is false
2019/06/24 12:55:01 addEdge from call-f2 to dump before excution is false
2019/06/24 12:55:01 addEdge from call-f2 to dump  after excution is false
2019/06/24 12:55:01 FUNCTION DEFINE=> fhandler.partial true false
2019/06/24 12:55:02 fhandler.partial true false
2019/06/24 12:55:02 execute before data: 
2019/06/24 12:55:02 [Request `bk8cg87le8oq10sc51f0`] Intermidiate result from Node 0_2_call-f2 to Node 0_3_dump retrived from 0_2_call-f2--0_3_dump
2019/06/24 12:55:02 execute data: func2([,func1()
,func1()
])

2019/06/24 12:55:02 [Request `bk8cg87le8oq10sc51f0`] Executing node 0_3_dump
2019/06/24 12:55:02 [Request `bk8cg87le8oq10sc51f0`] Executing modifier
2019/06/24 12:55:02 dump: func2([,func1()
,func1()
])

2019/06/24 12:55:02 [Request `bk8cg87le8oq10sc51f0`] Completed execution of Node 0_3_dump
2019/06/24 12:55:02 [Request `bk8cg87le8oq10sc51f0`] Completed successfully
2019/06/24 12:55:02 Duration: 0.251855 seconds
func2([,func1()
,func1()
])

2019/06/24 12:59:22 Forking fprocess.
2019/06/24 12:59:22 Query  export-dag=true
2019/06/24 12:59:22 Path  /
2019/06/24 12:59:22 stderr: 2019/06/24 12:59:22 addEdge from foreach to call-f2 before excution is false
2019/06/24 12:59:22 addEdge from foreach to call-f2  after excution is false
2019/06/24 12:59:22 addEdge from call-f2 to dump before excution is false
2019/06/24 12:59:22 addEdge from call-f2 to dump  after excution is false
2019/06/24 12:59:22 Duration: 0.006460 seconds
{
    "id": "0",
    "start-node": "foreach",
    "end-node": "dump",
    "has-branch": true,
    "has-edge": true,
    "exec-only-dag": false,
    "nodes": {
        "call-f2": {
            "id": "call-f2",
            "node-index": 2,
            "is-dynamic": false,
            "is-condition": false,
            "is-foreach": false,
            "has-aggregator": false,
            "has-sub-aggregator": false,
            "has-subdag": false,
            "in-degree": 0,
            "out-degree": 0,
            "dynamic-exec-only": false,
            "operations": [
                {
                    "is-mod": false,
                    "is-function": true,
                    "is-callback": false,
                    "name": "func2",
                    "has-response-handler": false,
                    "has-failure-handler": false
                }
            ],
            "childrens": [
                "dump"
            ],
            "child-exec-only": {
                "dump": false
            }
        },
        "dump": {
            "id": "dump",
            "node-index": 3,
            "is-dynamic": false,
            "is-condition": false,
            "is-foreach": false,
            "has-aggregator": false,
            "has-sub-aggregator": false,
            "has-subdag": false,
            "in-degree": 0,
            "out-degree": 0,
            "dynamic-exec-only": false,
            "operations": [
                {
                    "is-mod": true,
                    "is-function": false,
                    "is-callback": false,
                    "name": "",
                    "has-response-handler": false,
                    "has-failure-handler": false
                }
            ],
            "child-exec-only": {}
        },
        "foreach": {
            "id": "foreach",
            "node-index": 1,
            "is-dynamic": true,
            "is-condition": false,
            "is-foreach": true,
            "has-aggregator": false,
            "has-sub-aggregator": true,
            "has-subdag": false,
            "in-degree": 0,
            "out-degree": 0,
            "foreach-dag": {
                "id": "1",
                "start-node": "call-f1",
                "end-node": "call-f1",
                "has-branch": false,
                "has-edge": false,
                "exec-only-dag": true,
                "nodes": {
                    "call-f1": {
                        "id": "call-f1",
                        "node-index": 1,
                        "is-dynamic": false,
                        "is-condition": false,
                        "is-foreach": false,
                        "has-aggregator": false,
                        "has-sub-aggregator": false,
                        "has-subdag": false,
                        "in-degree": 0,
                        "out-degree": 0,
                        "dynamic-exec-only": false,
                        "operations": [
                            {
                                "is-mod": false,
                                "is-function": true,
                                "is-callback": false,
                                "name": "func1",
                                "has-response-handler": false,
                                "has-failure-handler": false
                            }
                        ],
                        "child-exec-only": {}
                    }
                },
                "is-valid": false
            },
            "dynamic-exec-only": false,
            "childrens": [
                "call-f2"
            ],
            "child-exec-only": {
                "call-f2": false
            }
        }
    },
    "is-valid": true
}

foreach didn't pass data to call-f1
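
For reference, the data flow this test is meant to produce for input aa-bb can be simulated in a single process. This is only a sketch: `split` follows the foreach function from the test code, while `wrap` and `run` are hypothetical stand-ins for func1/func2 and for the flow engine. No faas-flow code is involved.

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// split mirrors the foreach function from the test:
// one option per "-"-separated part, keyed by its index.
func split(data []byte) map[string][]byte {
	parts := bytes.Split(data, []byte("-"))
	options := make(map[string][]byte, len(parts))
	for i, p := range parts {
		options[strconv.Itoa(i)] = p
	}
	return options
}

// wrap is a hypothetical stand-in for func1/func2: it returns name(input).
func wrap(name string, in []byte) []byte {
	return []byte(name + "(" + string(in) + ")")
}

// run simulates foreach -> call-f1 per option -> aggregator -> call-f2.
func run(input []byte) []byte {
	options := split(input)
	results := make([][]byte, 0, len(options))
	for i := 0; i < len(options); i++ {
		// each branch must receive its own option as input
		results = append(results, wrap("func1", options[strconv.Itoa(i)]))
	}
	joined := append([]byte("["), bytes.Join(results, []byte(","))...)
	joined = append(joined, ']')
	return wrap("func2", joined)
}

func main() {
	fmt.Println(string(run([]byte("aa-bb")))) // prints func2([func1(aa),func1(bb)])
}
```

In the logs above, the branch nodes instead report an empty `execute data:` line, which is where the real run diverges from this sketch.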

chennqqi commented on June 6, 2024

After tracing the code, I think the problem is in:

func getDagIntermediateData(handler *flowHandler, context *faasflow.Context) ([]byte, error) {

getDagIntermediateData did not return the right data to func1.

chennqqi commented on June 6, 2024

After this commit, faas-flow-tower can't generate the flow graph any more.

s8sg commented on June 6, 2024

chennqqi commented on June 6, 2024

I updated faas-flow-tower to the latest code; the graph shows OK now.

chennqqi commented on June 6, 2024

@s8sg The DAG flow's nodes pass data via context.Set and context.GetBytes, so I added some stub logging before both of these functions.

Conclusion:

  1. A DAG without a (dynamic) branch runs OK and passes data OK.
  2. A DAG with a foreach (dynamic) branch doesn't pass data to the sub-DAG.

Stub logs from my test code above:

 Execute foreach
[1] 2019/06/25 03:20:56 context.Set 0--0_1_foreach--1_1_call-f1 value aa err <nil>
[2] 2019/06/25 03:20:56 context.Set 1--0_1_foreach--1_1_call-f1 value bb err <nil>
 Execute call-f1 func1
[3] 2019/06/25 03:20:58 context.Set 1--0_1_foreach--0_2_call-f2 value func1() err <nil>
 Execute call-f1 func1
[4] 2019/06/25 03:20:58 context.Set 0--0_1_foreach--0_2_call-f2 value func1() err <nil>
 aggregator

dependencies := currentNode.Dependency()

[getDagIntermediate] current node id: call-f1 deps: []

call-f1's dependencies are empty, so func1 couldn't get the intermediate data.
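
The keys seen in the stub logs appear to follow a regular pattern. The sketch below reconstructs that pattern purely from the log lines in this thread, not from the faas-flow source; `nodeUID`, `edgeKey`, and `optionKey` are hypothetical helper names.

```go
package main

import "fmt"

// nodeUID builds "<dag id>_<node index>_<node id>", e.g. "0_1_foreach".
func nodeUID(dagID string, index int, nodeID string) string {
	return fmt.Sprintf("%s_%d_%s", dagID, index, nodeID)
}

// edgeKey builds the key for data sent over a plain edge: "<from>--<to>".
func edgeKey(from, to string) string {
	return from + "--" + to
}

// optionKey builds the key for one option of a dynamic node:
// "<option>--<from>--<to>".
func optionKey(option, from, to string) string {
	return option + "--" + edgeKey(from, to)
}

func main() {
	foreach := nodeUID("0", 1, "foreach")
	callF1 := nodeUID("1", 1, "call-f1")
	callF2 := nodeUID("0", 2, "call-f2")
	dump := nodeUID("0", 3, "dump")

	fmt.Println(optionKey("0", foreach, callF1)) // prints 0--0_1_foreach--1_1_call-f1
	fmt.Println(optionKey("1", foreach, callF2)) // prints 1--0_1_foreach--0_2_call-f2
	fmt.Println(edgeKey(callF2, dump))           // prints 0_2_call-f2--0_3_dump
}
```

Read this way, the data for call-f1 is stored under a key whose source is the foreach node in the parent DAG, which a lookup based only on call-f1's own dependencies would never consult.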

s8sg commented on June 6, 2024

Yeah, dependencies only include previous nodes in the same DAG.
For the initial node, deps is [].
But as it's a BranchDag of a dynamic Node, we need to get the data from the parent DAG.

It seems the current version doesn't implement it; I just left a comment and forgot 🤦‍♂

// XXX - handle data from parent Node - and multiple options of prev node

Here, parent Node refers to any parent node of a SubDAG,
and
multiple options of prev node refers to the parent node of a BranchDag.

This needs to be added carefully, as SubDag and BranchDag can be recursive.

I'll add the code in a day or two.
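
The missing fallback described above can be illustrated with a toy model. This is a sketch under stated assumptions, not the eventual fix: `node`, `dag`, and `dataSources` are hypothetical, and only a single level of nesting is handled, so the recursive SubDag/BranchDag cases mentioned above are deliberately left out.

```go
package main

import "fmt"

// node and dag are hypothetical, simplified types for illustration only.
type node struct {
	id   string
	deps []*node // previous nodes in the same DAG
	dag  *dag    // DAG that contains this node
}

type dag struct {
	id     string
	parent *node // node that owns this nested DAG; nil for the root DAG
}

// dataSources returns the nodes whose output feeds n.
// The first node of a nested DAG has no deps, so it falls back
// to the node that owns the nested DAG.
func dataSources(n *node) []*node {
	if len(n.deps) > 0 {
		return n.deps
	}
	if n.dag != nil && n.dag.parent != nil {
		return []*node{n.dag.parent}
	}
	return nil // first node of the root DAG: input is the request body
}

func main() {
	root := &dag{id: "0"}
	foreach := &node{id: "foreach", dag: root}
	branch := &dag{id: "1", parent: foreach}
	callF1 := &node{id: "call-f1", dag: branch}
	callF2 := &node{id: "call-f2", dag: root, deps: []*node{foreach}}

	for _, n := range []*node{foreach, callF1, callF2} {
		fmt.Printf("%s <-", n.id)
		for _, s := range dataSources(n) {
			fmt.Printf(" %s", s.id)
		}
		fmt.Println()
	}
}
```

With this model, call-f1 resolves to foreach even though its own dependency list is empty, which is the behaviour the stub logs show to be missing.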

s8sg commented on June 6, 2024

@chennqqi Please check with the above PR

chennqqi commented on June 6, 2024

can't wait now! haha

chennqqi commented on June 6, 2024

@s8sg The foreach test passes.

s8sg commented on June 6, 2024

Okay, thank you for confirming. I'll merge it.
