Git Product home page Git Product logo

Comments (8)

s8sg avatar s8sg commented on June 6, 2024

Hi @chennqqi, Thank you for creating the issue. The example for the project are still shallow.

I have created an example at:
https://github.com/s8sg/branching-in-faas-flow.

This works fine, there is also a conditional dag example available as:
https://github.com/s8sg/travel-agent-saga

Although there is a bug in the new faas-flow-dashboard which makes the dot graph distorted for foreach condition.

from faas-flow.

s8sg avatar s8sg commented on June 6, 2024

The foreach example above should create the below flow:
Screenshot 2019-06-17 at 10 15 49 PM

from faas-flow.

chennqqi avatar chennqqi commented on June 6, 2024

Thank you for your enthusiasm and timely response!But i still confused about how to create 1:N flow

My Requirement like this:

//handler: functionA
function Handler(req []byte) {
	//some thing processing(req), then return data
	result := process(req)
	return result
}

//handler: functionB
function B(req []byte) {
	//some thing processing(req), then return data
	result := process2(req)
	return result
}

//flow: faasflow
function Define() {
		foreach := func(data []byte) [][]byte{
			//split functionA's output 
                       //cut functionA's output into piceces, each one as functionB's input
                       //then parallel run functionB with these input 
			return bytes.Split(data, '-')
		}
		aggregator := func(inputs map[string][]byte /* or [][]byte  */) {
                        wait all parallel functionB return and Aggregate all functionB's output
		}

  	flow.Apply("functionA").Foreach("functionB", foreach, aggregator).Modifier(func(data []byte){
  		//callback or store result
  	})
}

I hope my expression of this fake code is accurate. All english above was translated by google.

from faas-flow.

s8sg avatar s8sg commented on June 6, 2024

As per my understanding the desired patter is:

Call A()
For Each result of A:
      Call B()
Aggregate()
Store()

flow.Apply().Apply() this pattern is considered as a chain expression.
Currently in faas-flow chain expression is only suitable for DAG that doesn’t have a branch.
For branches you need to use DAG expression.
Which is basically create nodes and define edges between them.

The DAG defintion will be like below

maindag:
call-a -----------> foreach -----------> store
(modifier-node)    (foreachDag-node)     (modifier-node)

foreachDag:
call-b
(modifier-node)

First we need to define a DAG for the flow

// Create a new DAG 
maindag := faasflow.CreateDag()
// set the DAG in the flow
flow.ExecuteDag(maindag)

From your example it seems you are not calling the openfaas function but your local function. For local function execution we have Modifier Expression. You can call the local function from your modifier. For calling function A() you can create a node with modifier as:

// this creates a node named `call-a` with a modifier
// the modifier calls your local function `A()` and returns the result
maindag.AddModifier("call-a", func(data []byte) ([]byte, error) {
		data = A(data)
		return data, nil
})

For calling function B() using a foreach pattern first you need to add a foreach node and defined the brached DAG. AddForEachBranch() will create a node. The arguments are a foreach function that generates the splits as map[string][]byte and the Aggregator() that aggregates the results.

The call to AddForEachBranch returns the foreachDag that will be executed for all results returned from the foreach function. We need to add the modifier to call B() in the foreachDag

// define the foreach branching node
foreachDag := maindag.AddForEachBranch("foreach",
		// function that splits the result of A() 
		func(data []byte) map[string][]byte {
			splits := bytes.Split(data, '-')
                        result := make(map[string][]byte)
                        key := ""
                        // we create a map with all splits
                        for index, split := range splits {
                              key = strconv.Itoa(index)
                              result[key] = split
                        }
                        return result
		},
                //  function that aggregated the result of the branched DAG
		faasflow.Aggregator(func(results map[string][]byte) ([]byte, error) {
			// aggregate results of all B() execution
			return []byte(""), nil
		}),
)

// define a node to call B from foreachDag 
foreachDag.AddModifier("call-b", func(data []byte) ([]byte, error) {
		data = B(data)
		return data, nil
})

Now you can define another modifier to store the result, but remember to define it in the maindag

maindag.AddModifier("store", func(data []byte) ([]byte, error) {
		Store(data)
		return nil, nil
})

Then define the edges between the nodes you created

maindag.AddEdge("call-a", "foreach")
maindag.AddEdge("foreach", "store")

We don't need any edge for foreachDag as it has only one node

So the handler will look like:

//handler: functionA
func A(req []byte) {
	//some thing processing(req), then return data
	result := process(req)
	return result
}

//handler: functionB
func B(req []byte) {
	//some thing processing(req), then return data
	result := process2(req)
	return result
}

// handler: Store
func Store(data []byte) {
      // do something
}

// Define provide definiton of the workflow
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
      maindag := faasflow.CreateDag()
      flow.ExecuteDag(maindag)
      maindag.AddModifier("call-a", func(data []byte) ([]byte, error) {
             data = A(data)
              return data, nil
     })
     foreachDag := maindag.AddForEachBranch("foreach",
		// function that splits the result of A() 
		func(data []byte) map[string][]byte {
			splits := bytes.Split(data, '-')
                        result := make(map[string][]byte)
                        key := ""
                        // we create a map with all splits
                        for index, split := range splits {
                              key = strconv.Itoa(index)
                              result[key] = split
                        }
                        return result
		},
                //  function that aggregated the result of the branched DAG
		faasflow.Aggregator(func(results map[string][]byte) ([]byte, error) {
			// aggregate results of all B() execution
			return []byte(""), nil
		}),
     )
     foreachDag.AddModifier("call-b", func(data []byte) ([]byte, error) {
		data = B(data)
		return data, nil
     })
     maindag.AddModifier("store", func(data []byte) ([]byte, error) {
		Store(data)
		return nil, nil
    })
    maindag.AddEdge("call-a", "foreach")
    maindag.AddEdge("foreach", "store")
}

As you passing data in between nodes you also need to consider defining the data store. You may use minio for that. Check https://github.com/s8sg/faas-flow-minio-datastore

from faas-flow.

chennqqi avatar chennqqi commented on June 6, 2024

An Modifier is a function(method) implemented by faas-flow lang
An handler is a function(faas) implemented by other faas function
So
I still can add an function(faas) into Define function

maindag.AddFunction("functionA", "run-a")
maindag.AddEdge("run-a", "call-a")

or call directly

maindag.AddEdge("run-a", "foreach")

from faas-flow.

s8sg avatar s8sg commented on June 6, 2024

I'm not sure what you meant by other faas flow, if functionA and functionB are openfaas function it will be:

// for calling functionA
maindag.AddFunction("call-a", "functionA") 
// ...
// for calling functionB
foreachDag.AddFunction("call-b", "functionB")

Everything else stays the same, so the Define() becomes:

// handler: Store
func Store(data []byte) {
      // do something
}

// Define provide definiton of the workflow
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
      maindag := faasflow.CreateDag()
      flow.ExecuteDag(maindag)
      maindag.AddFunction("call-a", "functionA") 
      foreachDag := maindag.AddForEachBranch("foreach",
		// function that splits the result of A() 
		func(data []byte) map[string][]byte {
			splits := bytes.Split(data, '-')
                        result := make(map[string][]byte)
                        key := ""
                        // we create a map with all splits
                        for index, split := range splits {
                              key = strconv.Itoa(index)
                              result[key] = split
                        }
                        return result
		},
                //  function that aggregated the result of the branched DAG
		faasflow.Aggregator(func(results map[string][]byte) ([]byte, error) {
			// aggregate results of all B() execution
			return []byte(""), nil
		}),
     )
     foreachDag.AddFunction("call-b", "functionB")
     maindag.AddModifier("store", func(data []byte) ([]byte, error) {
		Store(data)
		return nil, nil
    })
    maindag.AddEdge("call-a", "foreach")
    maindag.AddEdge("foreach", "store")
}

from faas-flow.

chennqqi avatar chennqqi commented on June 6, 2024

other faas function not other faas flow, my mistake
I think i could finish my flow now.
Thank you for your help again!

from faas-flow.

s8sg avatar s8sg commented on June 6, 2024

Great. I'm closing the issue for now.

from faas-flow.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.