
faas-flow's Issues

Implement StateStore on 3rd party KV Store - consul, etcd

Execution state is currently forwarded as data to the next phase. This works well as long as the pipeline is no more complicated than a chain of functions. For a more complicated case such as a DAG, we need a secondary KV store to manage the execution state. An ExecutionState manager can be ZooKeeper or Consul.

Ditch async call, adopt only DAGs

Async call and DAG with multiple edges provide similar functionality.
An async chain with four async calls will have five nodes; the same can be achieved by creating the nodes separately.

Currently operation chains and DAGs are managed separately, although a DAG allows chaining multiple operations by adding them to the same node. We will adopt the chaining approach for DAGs as well:

flow.CreateNode().Apply().Apply()

Make faas-flow execution logic runtime agnostic by providing it as a library

I want to use this package in different projects that need a saga-like pattern to control microservices.
I think it is suitable for this kind of thing: I can register RPC services and create a workflow with a DAG.
But I also need a custom executor that, given the workflow, performs the needed actions (in my case, executing different microservices and getting their results).

Add timeout option for operations

When performing an Operation, the user can specify a timeout. The timeout applies to every operation type. If the timeout expires, the operation is killed and the execution fails as if the operation itself had failed. If operation retry is configured, the operation is retried.
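One way the timeout-plus-retry behaviour described above could work is sketched below. This is only an illustration under stated assumptions: the Operation type, runOnce and runWithTimeout are hypothetical names, not part of the faas-flow API, and each attempt is bounded with a context deadline.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Operation is a hypothetical stand-in for a faas-flow operation.
type Operation func(ctx context.Context, data []byte) ([]byte, error)

type outcome struct {
	result []byte
	err    error
}

// runOnce runs op in a goroutine and gives up when ctx expires.
func runOnce(ctx context.Context, op Operation, data []byte) ([]byte, error) {
	done := make(chan outcome, 1) // buffered so the goroutine never blocks
	go func() {
		result, err := op(ctx, data)
		done <- outcome{result, err}
	}()
	select {
	case out := <-done:
		return out.result, out.err
	case <-ctx.Done():
		return nil, ctx.Err() // a timeout is reported like any other failure
	}
}

// runWithTimeout makes up to 1+retries attempts, each bounded by timeout.
func runWithTimeout(op Operation, data []byte, timeout time.Duration, retries int) ([]byte, error) {
	var lastErr error
	for attempt := 0; attempt <= retries; attempt++ {
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		result, err := runOnce(ctx, op, data)
		cancel()
		if err == nil {
			return result, nil
		}
		lastErr = err
	}
	return nil, lastErr
}

// fastOp returns immediately; slowOp blocks until its context is cancelled.
func fastOp(_ context.Context, data []byte) ([]byte, error) {
	return append([]byte("ok:"), data...), nil
}

func slowOp(ctx context.Context, _ []byte) ([]byte, error) {
	<-ctx.Done()
	return nil, ctx.Err()
}

func main() {
	out, err := runWithTimeout(fastOp, []byte("x"), 50*time.Millisecond, 0)
	fmt.Println(string(out), err)

	_, err = runWithTimeout(slowOp, nil, 10*time.Millisecond, 2)
	fmt.Println(err)
}
```

Passing the context into the operation lets a cooperative operation stop early; an operation that ignores the context would keep running in the background after the timeout.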

Allow to consider Sub-Dag based on Select Condition (Choices)

A branching logic that allows a sub-DAG to be considered when a certain condition is met.

One possible way is through edge Options:

AddEdge(from, to string, Choice(caseString, choiceFunc))

choiceFunc takes a []byte (the result of the node) and returns a choice string, which determines the branch that will be executed. Selection among multiple branches can be done by providing the same choice function to multiple edges:

func myChoiceFunc(data []byte) (string, error) {
        // do something
        if string(data) == "something" {
                return "choice1", nil
        }
        return "choice2", nil
}

// ....
// at edge definition
AddEdge("node1", "node2", Choice("choice1", myChoiceFunc))
AddEdge("node1", "node3", Choice("choice2", myChoiceFunc))
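To make the selection step concrete, here is a small sketch of how an executor might resolve Choice edges at runtime. The choiceEdge struct and selectTargets function are hypothetical and only illustrate the idea; the proposal itself does not define them.

```go
package main

import "fmt"

// ChoiceFunc inspects a node's result and returns the chosen branch name.
type ChoiceFunc func(data []byte) (string, error)

// choiceEdge is a hypothetical record of one AddEdge(..., Choice(...)) call.
type choiceEdge struct {
	to         string
	caseString string
	choose     ChoiceFunc
}

// selectTargets returns the target nodes whose case matches the choice
// computed from the source node's result.
func selectTargets(edges []choiceEdge, result []byte) ([]string, error) {
	var targets []string
	for _, e := range edges {
		choice, err := e.choose(result)
		if err != nil {
			return nil, err
		}
		if choice == e.caseString {
			targets = append(targets, e.to)
		}
	}
	return targets, nil
}

func myChoiceFunc(data []byte) (string, error) {
	if string(data) == "something" {
		return "choice1", nil
	}
	return "choice2", nil
}

func main() {
	edges := []choiceEdge{
		{to: "node2", caseString: "choice1", choose: myChoiceFunc},
		{to: "node3", caseString: "choice2", choose: myChoiceFunc},
	}
	targets, err := selectTargets(edges, []byte("something"))
	fmt.Println(targets, err)
}
```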

Handle Operation start and End Event

Currently the faas-flow template only handles the Node start and stop events. The SDK emits the Operation start and end events, but they are not handled for monitoring.

go.mod

Thanks for your many PRs. Can you remove vendor and add go.mod?
Go modules are supported as of Go 1.11 and are very useful. Also, the repo then does not need to contain external deps.

Use global persistence connection to enhance performance

All calls to functions and for next-node execution are submitted via the Gateway. In the current code:

  • An HTTP client was being created for each incoming message in the handler. This is overkill, because every new request incurs a handshake.
  • The HTTP client created was using the default transport. The default HTTP client does have a keep-alive setting.

We can reuse a connection instead of creating a new one. We can do so by creating a global client as:

var client *http.Client

func init() {
    tr := &http.Transport{
        MaxIdleConnsPerHost: 1024,
    }
    client = &http.Client{Transport: tr}
}

When using a common client, we also need to make sure that we:

  • Read until the response is complete (i.e. ioutil.ReadAll(resp.Body))
  • Call Body.Close()
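The two points above can be sketched as follows. The fetch and startTestServer helpers are illustrative names only, and io.ReadAll (Go 1.16+) is used here in place of ioutil.ReadAll; the local test server just stands in for the gateway.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// client is shared by all requests so idle connections can be reused.
var client = &http.Client{
	Transport: &http.Transport{MaxIdleConnsPerHost: 1024},
}

// fetch performs a GET with the shared client. It reads the body to the end
// and closes it, which lets the connection return to the idle pool.
func fetch(url string) ([]byte, error) {
	resp, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}

// startTestServer starts a local server that stands in for the gateway.
func startTestServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "pong")
	}))
}

func main() {
	srv := startTestServer()
	defer srv.Close()

	for i := 0; i < 3; i++ {
		body, err := fetch(srv.URL)
		fmt.Println(string(body), err)
	}
}
```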

Set faas-flow unique request id on the header of the initial response

Currently a request ID is generated, but the only ways to find out the request ID are:

  • Via looking at logs
  • Via looking at request traces

We will set a response header as:

X-Faas-Flow-ReqId : <request_id>

For an async request with a callback URL, we will set the header on the callback request:

X-Faas-Flow-ReqId : <request_id>
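A minimal sketch of setting the header on the initial response, assuming a plain net/http handler. The withRequestID wrapper and headerFor helper are hypothetical names and not how the faas-flow template is actually structured.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

const reqIDHeader = "X-Faas-Flow-ReqId"

// withRequestID sets the request ID header before the wrapped handler
// writes its response. newID is whatever generator the flow already uses.
func withRequestID(newID func() string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set(reqIDHeader, newID())
		next.ServeHTTP(w, r)
	})
}

// headerFor runs one request through the wrapper and returns the header
// value seen by the client.
func headerFor(id string) string {
	h := withRequestID(func() string { return id },
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusAccepted)
		}))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/", nil))
	return rec.Result().Header.Get(reqIDHeader)
}

func main() {
	fmt.Println(headerFor("example-request-id"))
}
```

The header has to be set before the first write to the response, which is why the wrapper sets it ahead of calling the inner handler.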

close method for store

My toy store needs to be closed at the end of work. Do I need to put the close in the Cleanup method, or do we need something new for closing the store?

If StateManager is provided, allow to store intermediate result in StateManager

Currently intermediate data always gets forwarded to the flow function as input. But for async forwarding, large data might not get forwarded due to the NATS limit. Alternatively, the data can be saved and then retrieved in the next phase from the StateManager. When a StateManager is specified, an optional flag will be provided to store the intermediate data via the StateManager:

Environment:
         intermediate-storage: true

Provide a way to download dotted graph from a deployed Faas-Flow

A dotted graph can provide better visualisation of the overall workflow.
Faas-Flow currently provides a library to create a dotted graph for either a chain or a DAG.

The same can be used to provide a way to download just the dotted graph from the faas-flow function itself.

One option could be adding a query param to the request:

--query generate-dot-graph=true

Provide More Information in Dotted Graph

A dotted graph is a quick way to debug a pipeline.
The proposed updates are:

  • Color Code (Green Start-Nodes, Orange End-Nodes, Lighter Blue Nodes, Grey Edges, etc.)
  • Edge Attributes (Edge Properties, Intermediate-Result-Key, etc.)
  • Node Attributes (Function, Modifier, Callback details, timeout, etc.)

Add option to modify Request before it's executed for function and callback

Currently func OnReponse(handler sdk.RespHandler) Option uses sdk.RespHandler, which is defined as:

type RespHandler func(*http.Response) ([]byte, error)

It is useful for handling an HTTP response to a function manually and extracting the response in the proper format.
https://github.com/s8sg/faas-flow/blob/master/template/faas-flow/handler.go#L326

	if operation.OnResphandler != nil {
		result, err = operation.OnResphandler(resp)
	} else {
		if resp.StatusCode < 200 || resp.StatusCode > 299 {
			result, _ = ioutil.ReadAll(resp.Body)
			return result, err
		}

		result, err = ioutil.ReadAll(resp.Body)
		if err != nil {
			return result, err
		}
	}

Similarly, we need an sdk.RequestHandler which will be able to update a Request for function execution.

type RequestHandler func(*http.Request)

Wrapping it up in an Option as:
func Request(handler sdk.RequestHandler) Option

The RequestHandler should execute before the request is sent; we can pass it the request that is being built as:
https://github.com/s8sg/faas-flow/blob/master/template/faas-flow/handler.go#L318

httpreq := buildFunctionRequest(name, data, params, headers)

Create a white-paper based on the core fundamentals of faas-flow

Faas-flow is based on some core design fundamentals that allow faas-flow to work as an OpenFaaS function without violating the core principles of the serverless model.

The few key points of the design are:

  1. Recursive interface pattern
  2. Workflow as a function
  3. No external executor
  4. External Storage for maintaining Intermediate Data and State

As faas-flow is growing its feature set, it's important to document the core design principles.

Change StateStore Interface to more granular

The current state store interface is not flexible, which makes it hard to fit new use cases.
We will make the interface more granular so that the same interface can support more operations.

The new state store definition would be:

// StateStore for saving execution state
type StateStore interface {
	// Configure the StateStore with flow name and request ID
	Configure(flowName string, requestId string)
	// Initialize the StateStore (called only once in a request span)
	Init() error
	// Set a value (override existing, or create one)
	Set(key string, value string) error
	// Get a value
	Get(key string) (string, error)
	// Compare and Update a value
	Update(key string, oldValue string, newValue string) error
	// Cleanup all the resources in StateStore (called only once in a request span)
	Cleanup() error
}
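To show what an implementation of the proposed interface might look like, here is a minimal in-memory sketch. memStateStore is a hypothetical type for illustration only: an in-process map cannot share state across separate function invocations, so a real store would be backed by something like Consul or etcd. The error on a missing key and the compare-before-write behaviour of Update are assumptions about the intended semantics.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// StateStore repeats the proposed interface so the example is self-contained.
type StateStore interface {
	Configure(flowName string, requestId string)
	Init() error
	Set(key string, value string) error
	Get(key string) (string, error)
	Update(key string, oldValue string, newValue string) error
	Cleanup() error
}

// memStateStore is a hypothetical in-memory implementation.
type memStateStore struct {
	mu     sync.Mutex
	prefix string
	data   map[string]string
}

var _ StateStore = (*memStateStore)(nil) // compile-time interface check

func (s *memStateStore) fullKey(key string) string { return s.prefix + key }

// Configure namespaces all keys by flow name and request ID.
func (s *memStateStore) Configure(flowName string, requestId string) {
	s.prefix = flowName + "-" + requestId + "-"
}

func (s *memStateStore) Init() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data = make(map[string]string)
	return nil
}

func (s *memStateStore) Set(key string, value string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.data == nil {
		s.data = make(map[string]string)
	}
	s.data[s.fullKey(key)] = value
	return nil
}

func (s *memStateStore) Get(key string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	value, ok := s.data[s.fullKey(key)]
	if !ok {
		return "", errors.New("key not found: " + key)
	}
	return value, nil
}

// Update writes newValue only if the stored value still equals oldValue.
func (s *memStateStore) Update(key string, oldValue string, newValue string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	current, ok := s.data[s.fullKey(key)]
	if !ok {
		return errors.New("key not found: " + key)
	}
	if current != oldValue {
		return errors.New("value changed, update rejected: " + key)
	}
	s.data[s.fullKey(key)] = newValue
	return nil
}

func (s *memStateStore) Cleanup() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data = nil
	return nil
}

func main() {
	store := &memStateStore{}
	store.Configure("myflow", "req-1")
	_ = store.Init()
	_ = store.Set("counter", "1")
	fmt.Println(store.Update("counter", "0", "2")) // stale old value
	fmt.Println(store.Update("counter", "1", "2"))
	fmt.Println(store.Get("counter"))
	_ = store.Cleanup()
}
```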

Function call should be made via gateway

Currently faas-flow calls the function directly for a function operation,

http://<function-name>:8080

but the call should be made via the gateway as

http://<gateway-url>/function/<function-name>

Delay call to statemanager when forwarding request for next phase

Currently the DataStore implementation is called whenever Set(), Get() or Del() is called on the context. However, state can be saved and managed locally until the request is forwarded for the next phase. This will avoid multiple calls to the DataStore when there's only one phase.

Manage state of chain with `StateManager` interface

Faaschain is stateless in nature. Currently the only state in a chain is the execution position and the requestId. For a partial request the state values are forwarded as data.

However, there might be use cases where the user needs more data to be stored in the state. To achieve that we can provide the user an interface StateManager as:

type StateManager interface {
         Set(key string, value interface{}) error
         Get(key string) (interface{}, error)
         Del(key string) error
}

The user should be able to request the global state manager from the request context as:

      context := faaschain.GetContext()
      context.Get(key)
      context.Set(key, value)

By default the StateManager will be set to RequestEmbedStateManager, which is provided by faaschain. It forwards the state as data to the next call, encoded as JSON.
The user can implement and set a state manager with SetStateManager().

An example of setting state and handling it may look like:

         chain.SetStateManager(monio_state_manager)
         chain.ApplyAsync("function").ApplyModifier(func(data []byte) { faaschain.GetContext().Set("some", "data") })
         chain.ApplyAsync("function").ApplyModifier(func(data []byte) { value := faaschain.GetContext().Get("some") })

DAG definition includes both sub-DAG and foreach DAG

The exporter includes both the sub-DAG and the foreach DAG in the case of a foreach branch node. Although the behaviour is different, we store the foreach DAG as a sub-DAG to reduce space. When exporting the graph definition, the sub-DAG must not be considered if a node is a foreach node.

Use HMAC to verify that partial request was made from faaschain

Faaschain now forwards partial requests to faaschain. But this could easily be spoofed if someone else sends a partial pipeline request to faaschain. One way to handle it is to use HMAC: we can sign the partial faaschain request with the secret key and forward it with the digest. Once the data is received back, we can verify with the secret key and the digest that the request was generated by faaschain.
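The sign-then-verify step could look roughly like this, using Go's standard crypto/hmac package with SHA-256. The sign and verify function names, the hex encoding of the digest and the example payload are assumptions for illustration; the issue does not specify a hash function or wire format.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sign returns the hex-encoded HMAC-SHA256 digest of body under secret.
func sign(secret, body []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the digest and compares it in constant time.
func verify(secret, body []byte, digest string) bool {
	received, err := hex.DecodeString(digest)
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return hmac.Equal(mac.Sum(nil), received)
}

func main() {
	secret := []byte("shared-secret")
	partial := []byte(`{"position":2,"data":"..."}`)

	digest := sign(secret, partial)
	fmt.Println(verify(secret, partial, digest))            // genuine request
	fmt.Println(verify(secret, []byte("tampered"), digest)) // altered body
}
```

Using hmac.Equal rather than a plain comparison avoids leaking timing information about how much of the digest matched.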

Default edges should not be considered as an execution Only Edge

@s8sg I need your help with DAG and AddForEachBranch

Here is my code. It is modified from syncflow.

code

package function

import (
	"bytes"
	"fmt"
	faasflow "github.com/s8sg/faas-flow"
	consulStateStore "github.com/s8sg/faas-flow-consul-statestore"
	"net/http"
	"os"
	"strings"
)

// Define provides the definition of the workflow
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
	dag := faasflow.CreateDag()

	foreachDag := dag.AddForEachBranch("foreach", func(data []byte) map[string][]byte {
		http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("input:"+string(data)))
		values := bytes.Split(data, []byte("-"))
		rmap := make(map[string][]byte)
		for i := 0; i < len(values); i++ {
			rmap[fmt.Sprintf("%d", i)] = values[i]
		}
		fmt.Println("foreach input:", string(data), rmap)
		return rmap
	}, faasflow.Aggregator(func(results map[string][]byte) ([]byte, error) {
		buf := bytes.NewBuffer([]byte("["))
		var start bool
		for _, v := range results {
			if start {
				start = true
			} else {
				buf.WriteByte(',')
			}
			buf.Write(v)
		}
		buf.WriteByte(']')
		fmt.Println("foreach-arggress: ", buf.String())
		http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("foreach:"+buf.String()))
		return buf.Bytes(), nil
	}))
	foreachDag.AddFunction("call-f1", "func1")
	//foreachDag.AddEdge("foreach", "callf1", faasflow.Sync)

	dag.AddFunction("call-f2", "func2")
	dag.AddEdge("foreach", "call-f2", faasflow.Sync)

	dag.AddModifier("dump", func(data []byte) ([]byte, error) {
		fmt.Println("dump: ", string(data))
		http.Post("http://requestbin.fullcontact.com/1d0l7661", "text/plain", strings.NewReader("dupm:"+string(data)))
		return data, nil
	})
	dag.AddEdge("call-f2", "dump", faasflow.Sync)

	flow.ExecuteDag(dag)
	return
}

// DefineStateStore provides the override of the default StateStore
func DefineStateStore() (faasflow.StateStore, error) {
	consulss, err := consulStateStore.GetConsulStateStore(os.Getenv("consul_url"), os.Getenv("consul_dc"))
	if err != nil {
		return nil, err
	}
	return consulss, nil
}

// DefineDataStore provides the override of the default DataStore
func DefineDataStore() (faasflow.DataStore, error) {
	return nil, nil
}

DAG
image

python func1

def handle(req):
    return "func1("+req+")"

python func2

def handle(req):
    return "func2("+req+")"

run

echo "aa-bb-cc" | faas-cli invoke synflow

expect

Receive dump data from http://requestbin.fullcontact.com:
dump: func2('somedata')

problem

Only the following is received, and no other data:

input:aa-bb-cc-dd

image

Allow Dynamic Edge based on For Each Value

For Each is a way to allow an edge to have a 1:N relationship, where N is determined dynamically at runtime.

It can be added with Option as:

AddEdge(from, to string, ForEach(func([]byte) (branches [][]byte, err error) {
         // implement
}))

N is determined at runtime from the length of the returned [][]byte (the number of branches).

If the length is 0 or the [][]byte is nil, the execution will be terminated.

http read panic

resdata, _ := ioutil.ReadAll(res.Body)

Does this line have a bug?

	client := &http.Client{}
	res, resErr := client.Do(httpreq)
	resdata, _ := ioutil.ReadAll(res.Body)
	if resErr != nil {
		return resdata, resErr
	}

If resErr is not nil, ioutil.ReadAll(res.Body) will cause a panic, because res is then typically nil.
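A possible fix is to check the error before touching the response. The sketch below wraps the same calls in a hypothetical doRequest helper and uses io.ReadAll (Go 1.16+) in place of ioutil.ReadAll; tryUnsupportedScheme only exists to trigger the error path without any network access.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

// doRequest checks the error from client.Do before reading the body,
// so a failed request can no longer dereference a nil response.
func doRequest(client *http.Client, httpreq *http.Request) ([]byte, error) {
	res, resErr := client.Do(httpreq)
	if resErr != nil {
		return nil, resErr
	}
	defer res.Body.Close()
	return io.ReadAll(res.Body)
}

// tryUnsupportedScheme forces client.Do to fail: the default transport
// rejects schemes it does not know.
func tryUnsupportedScheme() ([]byte, error) {
	req, err := http.NewRequest(http.MethodGet, "nosuchscheme://example.invalid/", nil)
	if err != nil {
		return nil, err
	}
	return doRequest(&http.Client{}, req)
}

func main() {
	data, err := tryUnsupportedScheme()
	fmt.Println(data == nil, err != nil)
}
```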

Implement Serialiser - to serialise inputs to a vertex when there is more than one InDegree

Serialiser is a faas-flow Operation Option that can be added to serialise the inputs to a vertex when there is more than one InDegree. A Serialiser assigned to a vertex will be called before the actual call to the operations in the vertex:

dag.CreateFunctionVertex("f1", "do-something", Serializer(func(inputs map[string][]byte) ([]byte, error) {
         // inputs is a map of <id(string)>: <data([]byte)>
         // ... perform serialisation
         return serialised_data, nil
}))

Any type of Operation in a DAG/chain can be assigned a Serializer.
A chain, or a DAG vertex with only one in-degree, will result in a call to Serializer() with only one input.

Unlike Airflow, the Serializer need not have any prior knowledge of the incoming data, making it more dynamic and flexible in nature.
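As an illustration of what a serializer body might do, the sketch below merges the inputs into a single JSON object keyed by source node ID. The function signature follows the proposal above; the jsonSerializer name and the choice of JSON are assumptions for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonSerializer combines the inputs from all incoming edges into one
// JSON object, keyed by the ID of the node that produced each input.
func jsonSerializer(inputs map[string][]byte) ([]byte, error) {
	combined := make(map[string]string, len(inputs))
	for id, data := range inputs {
		combined[id] = string(data)
	}
	// encoding/json writes map keys in sorted order, so the output is stable.
	return json.Marshal(combined)
}

func main() {
	out, err := jsonSerializer(map[string][]byte{
		"n1": []byte("a"),
		"n2": []byte("b"),
	})
	fmt.Println(string(out), err) // prints {"n1":"a","n2":"b"} <nil>
}
```

With a single in-degree the same function still works; it simply receives a map with one entry.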

Allow DAG edge Option to Alter Node result

Currently when executing a DAG, the result of a node gets forwarded to the other nodes.
However, with more than one child node, each of them might need the result in a different format or no result at all. AddEdge(from, to string) can be altered to accept Options, i.e.

AddEdge(from, to string, Option...)

One of the Option can be

Forwarder(func(result []byte) ([]byte){
})

to alter data when forwarded to the to node,
or use the NullForwarder

NullForwarder = Forwarder(func(result []byte) ([]byte){
        return nil
})

to avoid forwarding data at all
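A rough sketch of how per-edge forwarders could be applied when a node finishes. Here Forwarder is modelled as a plain function type and forward as a hypothetical helper; in the proposal Forwarder is an edge Option, so this only illustrates the data flow, not the actual API.

```go
package main

import (
	"bytes"
	"fmt"
)

// Forwarder alters a node's result before it reaches a child node.
type Forwarder func(result []byte) []byte

// NullForwarder forwards no data at all.
var NullForwarder Forwarder = func([]byte) []byte { return nil }

// upperForwarder is an example forwarder that upper-cases the result.
var upperForwarder = Forwarder(bytes.ToUpper)

// forward applies each edge's forwarder to the parent's result.
// An edge without a forwarder receives the result unchanged.
func forward(result []byte, edges map[string]Forwarder) map[string][]byte {
	out := make(map[string][]byte, len(edges))
	for to, f := range edges {
		if f == nil {
			out[to] = result
			continue
		}
		out[to] = f(result)
	}
	return out
}

func main() {
	inputs := forward([]byte("hi"), map[string]Forwarder{
		"upper": upperForwarder,
		"none":  NullForwarder,
		"same":  nil,
	})
	for _, to := range []string{"upper", "none", "same"} {
		fmt.Printf("%s: %q\n", to, inputs[to])
	}
}
```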

Question/Enhancement/Research Batch Management

TLDR
Wondering what place faas-flow/faas-tower has in batch management, i.e. whether batch management should be built on top of faas-flow or be completely separate. What I call batch management (a job here refers to a function call that is part of a batch, not a k8s job): viewing batch/job states, pausing/canceling jobs/batches, notification if a batch/job fails, exponential backoff of failed functions, caching of function results.

Longer version
I've recently been doing some PoC at work with OpenFaaS. Aside from using it as just a FaaS, many are interested in shifting their batch processing to OpenFaaS. I don't think that's necessarily wrong, and potentially OpenFaaS itself may add some functionality for batch management (openfaas/faas#657). In the meantime I was looking to build a small management layer that could do the tasks stated above.

I was looking to get some input from you (@s8sg) on whether you think it's best to build on top of faas-tower to get this functionality (using the distributed tracing you already have built), or maybe to add some extra functionality into faas-flow that could facilitate batch management 😄

Create web UI to visualize flows

Create a separate flow UI as a Function to debug flows that are deployed.
Debug information could be:
1> Execution Count
2> Dotted Graph
3> Node information (Modifier/Function/Callback)

Make Default `Apply` as Async and Add Sync as a Option

Currently we provide the Apply() and ApplyAsync() calls. We can change that to Apply() with Sync as an optional Option to Apply():

// chain.ApplyAsync("function")
chain.Apply("function")

and

//chain.Apply("function")
chain.Apply("function", Sync)

This will make all calls async by default.
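The proposed call shape maps naturally onto Go's functional options pattern. The sketch below uses hypothetical Chain, Option and applyConfig types to show how Apply("function", Sync) could flip a default-async call to sync; it is not the actual faas-flow implementation.

```go
package main

import "fmt"

// applyConfig holds the per-call settings; async is the default.
type applyConfig struct {
	sync bool
}

// Option modifies the settings of a single Apply call.
type Option func(*applyConfig)

// Sync marks the call as synchronous.
func Sync(c *applyConfig) { c.sync = true }

// Chain records the calls made on it, for illustration only.
type Chain struct {
	calls []string
}

// Apply adds a function call, async unless the Sync option is given.
func (c *Chain) Apply(function string, opts ...Option) *Chain {
	cfg := applyConfig{}
	for _, opt := range opts {
		opt(&cfg)
	}
	mode := "async"
	if cfg.sync {
		mode = "sync"
	}
	c.calls = append(c.calls, function+":"+mode)
	return c
}

func main() {
	chain := &Chain{}
	chain.Apply("function1").Apply("function2", Sync)
	fmt.Println(chain.calls)
}
```

Because options are variadic, further per-call settings could be added later without changing the Apply signature.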
