kytos / flow_manager Goto Github PK
View Code? Open in Web Editor NEWLicense: MIT License
License: MIT License
This NApp keeps a dictionary of recently sent flow mods, indexed by xid. In addition to the flow, the command (add or remove) must also be kept. See #73 .
To be used in API, we should export to and import from JSON. There should an instance to_json
method and a static method from_json
that returns an instance.
Currently, if the dpid is None, it's installing flows in all switches: https://github.com/kytos/flow_manager/blob/master/main.py#L52-L59
The same logic is also being applied for the delete method. So, this issue is to review this logic and assess a better one. Maybe, a new method which could iterate over all switches, but stop relying on this 'None' to consider the whole list.
We currently support netmasks for IPv4 addresses in match fields for OpenFlow1.0 natively, because we can calculate the wildcard bits when packing. We need to implement the support for OpenFlow1.3 matches.
Instead of a list, return a dictionary with a key 'flows' and the list as a value.
To remove a flow, it is being used a POST request to /v2/delete
. It should be changed to a DELETE request to /v2/flows
.
Today, flow_manager
is only supported for installing flows using REST endpoints. Because of this, it is necessary to implement support to install flows
using Kytos Events.
It is not like OF 1.0 because we have structs inside structs inside lists, ...
Currently, we have basic support for OF 1.0. OF 1.3 support is expected. Careful design is necessary so both versions reuse as much code as possible.
When a Bad Request HTTP code is returned, return a JSON with the data that originated the problem.
Please note the openapi documentation must have an example of this behavior.
These are examples of all action types we have today:
[
{
"action_type": "output",
"port": 42
},
{
"action_type": "set_vlan",
"vlan_id": 42
}
]
A model should have all available fields, despite any subset shown in the examples
Dear guys,
According to the OpenFlow specification:
State modification messages from the controller may be executed in an arbitrary order by the switch. A barrier request can be used by the controller to set a synchronization point, ensuring that all previous state messages are completed before the barrier response is sent back to the controller.
Source: http://flowgrammable.org/sdn/openflow/message-layer/barrier/
I'm running some tests that creates many EVCs (mef_eline) in batch (see more details on how to replicate the tests in kytos/mef_eline#223). For each EVC, mef_eline sends a FlowMod to remove "old" flows based on the cookie and then send the FlowMods to actually add the specific forwarding OpenFlow rules. When running in batch with many events being submitted at the same time, it might happened that the switch process the events in a different order than the one sent. In fact, according to the specification the order is not guaranteed. As so, the use of BarrierRequest/BarrierReply is recommended.
In the scenario I'm testing, I've created 1200 EVCs one after other, and at the end some of them informs that they are Active (enable=True, active=True), but if you check the flows on the switch, some of the flows are missing.
I've enabled debug mode on OVS (ovs-appctl vlog/set vconn:dbg and ovs-appctl vlog/disable-rate-limit vconn), but no clue why the flows get removed (or not installed).
Anyway, it is a good practice to enable barrier request/barrier reply on the flow_mod messages.
Here is a example on how we can do it 1) waiting for all pending barrier_requests before sending a new flow_mod, and 2) waiting on a specific barrier_reply right after send the flow mod (my preferred way):
--- a/main.py
+++ b/main.py
@@ -1,5 +1,6 @@
"""kytos/flow_manager NApp installs, lists and deletes switch flows."""
from collections import OrderedDict
+import time
from flask import jsonify, request
@@ -10,6 +11,9 @@ from napps.kytos.of_core.flow import FlowFactory
from .exceptions import InvalidCommandError
from .settings import FLOWS_DICT_MAX_SIZE
+from pyof.v0x01.controller2switch.barrier_request import BarrierRequest as BReq10
+from pyof.v0x04.controller2switch.barrier_request import BarrierRequest as BReq13
+
class Main(KytosNApp):
"""Main class to be used by Kytos controller."""
@@ -23,6 +27,7 @@ class Main(KytosNApp):
log.debug("flow-manager starting")
self._flow_mods_sent = OrderedDict()
self._flow_mods_sent_max_size = FLOWS_DICT_MAX_SIZE
+ self.pending_barrier_reply = dict()
def execute(self):
"""Run once on NApp 'start' or in a loop.
@@ -132,6 +137,8 @@ class Main(KytosNApp):
self._flow_mods_sent[xid] = (flow, command)
def _send_flow_mod(self, switch, flow_mod):
+ self._wait_for_pending_barrier_reply(switch)
+
event_name = 'kytos/flow_manager.messages.out.ofpt_flow_mod'
content = {'destination': switch.connection,
@@ -139,6 +146,35 @@ class Main(KytosNApp):
event = KytosEvent(name=event_name, content=content)
self.controller.buffers.msg_out.put(event)
+ self._send_barrier_request(switch)
+
+ def _send_barrier_request(self, switch):
+ event_name = 'kytos/flow_manager.messages.out.ofpt_barrier_request'
+
+ of_version = switch.connection.protocol.version
+ barrier_request = None
+ if of_version == 0x01:
+ barrier_request = BReq10()
+ elif of_version == 0x04:
+ barrier_request = BReq13()
+ else:
+ return
+ if switch.id not in self.pending_barrier_reply:
+ self.pending_barrier_reply[switch.id] = set()
+ self.pending_barrier_reply[switch.id].add(barrier_request.header.xid)
+ content = {'destination': switch.connection,
+ 'message': barrier_request}
+
+ event = KytosEvent(name=event_name, content=content)
+ self.controller.buffers.msg_out.put(event)
+
+ def _wait_for_pending_barrier_reply(self, switch):
+ timeout = time.time() + 10
+ while len(self.pending_barrier_reply.get(switch.id, [])) > 0 and time.time() <= timeout:
+ time.sleep(0.1)
+ if time.time() > timeout:
+ log.warning("Timeout waiting for barrier_reply from switch=%s pending_barrier_reply=%s" % (switch.id, self.pending_barrier_reply.get(switch.id, [])))
+
def _send_napp_event(self, switch, flow, command, **kwargs):
"""Send an Event to other apps informing about a FlowMod."""
@@ -156,6 +192,18 @@ class Main(KytosNApp):
event_app = KytosEvent(name, content)
self.controller.buffers.app.put(event_app)
+ @listen_to('.*of_core.*.ofpt_barrier_reply')
+ def handle_barrier_reply(self, event):
+ """Receive OpenFlow Barrier Reply.
+ """
+ switch = event.source.switch.id
+ xid = event.content["message"].header.xid.value
+ log.debug("barrier reply: switch=%s xid=%s" % (switch, xid))
+ try:
+ self.pending_barrier_reply[switch].remove(xid)
+ except Exception as e:
+ log.warning("failed to remove received barrier_reply (xid=%s) from pending_barrier_reply: %s" % (xid, e))
+
@listen_to('.*.of_core.*.ofpt_error')
def handle_errors(self, event):
"""Receive OpenFlow error and send a event.
diff --git a/main.py b/main.py
index 676a956..762e001 100644
--- a/main.py
+++ b/main.py
@@ -1,5 +1,6 @@
"""kytos/flow_manager NApp installs, lists and deletes switch flows."""
from collections import OrderedDict
+import time
from flask import jsonify, request
@@ -10,6 +11,9 @@ from napps.kytos.of_core.flow import FlowFactory
from .exceptions import InvalidCommandError
from .settings import FLOWS_DICT_MAX_SIZE
+from pyof.v0x01.controller2switch.barrier_request import BarrierRequest as BReq10
+from pyof.v0x04.controller2switch.barrier_request import BarrierRequest as BReq13
+
class Main(KytosNApp):
"""Main class to be used by Kytos controller."""
@@ -23,6 +27,7 @@ class Main(KytosNApp):
log.debug("flow-manager starting")
self._flow_mods_sent = OrderedDict()
self._flow_mods_sent_max_size = FLOWS_DICT_MAX_SIZE
+ self.pending_barrier_reply = dict()
def execute(self):
"""Run once on NApp 'start' or in a loop.
@@ -139,6 +144,36 @@ class Main(KytosNApp):
event = KytosEvent(name=event_name, content=content)
self.controller.buffers.msg_out.put(event)
+ self._send_barrier_request(switch)
+
+ def _send_barrier_request(self, switch):
+ event_name = 'kytos/flow_manager.messages.out.ofpt_barrier_request'
+
+ of_version = switch.connection.protocol.version
+ barrier_request = None
+ if of_version == 0x01:
+ barrier_request = BReq10()
+ elif of_version == 0x04:
+ barrier_request = BReq13()
+ else:
+ return
+ if switch.id not in self.pending_barrier_reply:
+ self.pending_barrier_reply[switch.id] = set()
+ self.pending_barrier_reply[switch.id].add(barrier_request.header.xid)
+ content = {'destination': switch.connection,
+ 'message': barrier_request}
+
+ event = KytosEvent(name=event_name, content=content)
+ self.controller.buffers.msg_out.put(event)
+ self._wait_for_pending_barrier_reply(switch, barrier_request.header.xid)
+
+ def _wait_for_pending_barrier_reply(self, switch, xid):
+ timeout = time.time() + 10
+ while xid in self.pending_barrier_reply.get(switch.id, []) and time.time() <= timeout:
+ time.sleep(0.05)
+ if time.time() > timeout:
+ log.warning("Timeout waiting for barrier_reply from switch=%s xid=%s" % (switch.id, xid))
+
def _send_napp_event(self, switch, flow, command, **kwargs):
"""Send an Event to other apps informing about a FlowMod."""
@@ -156,6 +191,18 @@ class Main(KytosNApp):
event_app = KytosEvent(name, content)
self.controller.buffers.app.put(event_app)
+ @listen_to('.*of_core.*.ofpt_barrier_reply')
+ def handle_barrier_reply(self, event):
+ """Receive OpenFlow Barrier Reply.
+ """
+ switch = event.source.switch.id
+ xid = event.content["message"].header.xid.value
+ log.debug("barrier reply: switch=%s xid=%s" % (switch, xid))
+ try:
+ self.pending_barrier_reply[switch].remove(xid)
+ except Exception as e:
+ log.warning("failed to remove received barrier_reply (xid=%s) from pending_barrier_reply: %s" % (xid, e))
+
@listen_to('.*.of_core.*.ofpt_error')
def handle_errors(self, event):
"""Receive OpenFlow error and send a event.
Hi Kytos Team,
Every Kytos Napp have a README describing the a basic documentation and the installation process. For those who know the Kytos structure, they are familiar with kytos.json which describes the dependencies on other napps. It would be good if the README also have the information about the napp dependencies as well, so that who is not familiar with kytos could quickly see what is the dependencies.
Just to make it clear, this is only a suggestion for documentation improvement, since when you run "kytos napps install xxxx" the dependencies are already resolved.
Such as malformed flow JSON or inexistent datapath
The NApps 'of_lldp' and 'of_l2ls' needs remove their installed flows once a switch is disabled. This change is necessary to allow the mentioned NApps install and remove flows using "flow_manager".
Today, the flow added by the user is not stored in the storehouse.
Possible solution:
Related:
kytos/kytos#1127
Use standard OF flow mods to manage flows because the current non-standard hash will require a lot of effort for OF 1.3. Unique identification can be done by settings OF cookies, for example.
We should support, at least:
As non-standard flow ID will be removed as a consequence of 1.3 support, update the 1.0 code accordingly.
Must be changed in kytos.json
Kytos Project currently use a directory named requirements, to maintain all requirements files. However, flow_manager have requirements files outside of requirements directory. The files are: requirements-dev.txt, requirements-docs.txt and
requirements.txt. These files should be moved to requirements directory and renamed to follow the standard used in kytos (dev.txt, run.txt)
Update the README fles, get rid of the EXAMPLES one, create an yml for each endpoint with the example input/output.
When upgrading from kytos 2020.2rc1 to 2020.2 with flow_manager 3.0 and running mininet, a KeyError: 'flow'
is issued on the Kytos console. After that it says Flows loaded.
Steps to reproduce: upgrade an existing instance of Kytos, run kytosd -f
and then run Mininet.
sudo mn --topo linear,3 --mac --controller=remote,ip=127.0.0.1 --switch ovsk,protocols=OpenFlow13
2020-12-30 17:11:01,817 - INFO [kytos.core.atcp_server] (MainThread) New connection from 127.0.0.1:35060
2020-12-30 17:11:01,820 - INFO [kytos.core.atcp_server] (MainThread) New connection from 127.0.0.1:35062
2020-12-30 17:11:01,821 - INFO [kytos.core.atcp_server] (MainThread) New connection from 127.0.0.1:35064
2020-12-30 17:11:02,399 - INFO [kytos.napps.kytos/of_core] (Thread-38) Connection ('127.0.0.1', 35060), Switch 00:00:00:00:00:00:00:01: OPENFLOW HANDSHAKE COMPLETE
2020-12-30 17:11:02,402 - INFO [kytos.napps.kytos/of_core] (Thread-40) Connection ('127.0.0.1', 35064), Switch 00:00:00:00:00:00:00:02: OPENFLOW HANDSHAKE COMPLETE
2020-12-30 17:11:02,409 - INFO [kytos.napps.kytos/of_core] (Thread-39) Connection ('127.0.0.1', 35062), Switch 00:00:00:00:00:00:00:03: OPENFLOW HANDSHAKE COMPLETE
Exception in thread Thread-45:
Traceback (most recent call last):
File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/usr/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File kytos/core/helpers.py", line 70, in threaded_handler
handler(*args)
File "venv/ky/var/lib/kytos/napps/kytos/flow_manager/main.py", line 68, in resend_stored_flows
flows_dict = {"flows": [flow['flow']]}
KeyError: 'flow'
kytos $> 2020-12-30 17:12:00,421 - INFO [kytos.napps.kytos/flow_manager] (flow_manager) Flows loaded.
Change the expected body to be a dictionary with a 'flows' key with the list of flows as a value, instead of the list alone.
Watch every datapath's flows to send a KytosEvent for each change.
Each flow_mod sent to a switch must be recorded by xid. That will allow the application to be able to catch error and know from which flow_mod it was.
So there is no need to know the controller port by heart.
It solves compatibility details also.
There are architectural differences between the Set VLAN action in OpenFlow 1.0 and the Push VLAN + Set Field actions in OpenFlow 1.3 which need to be reviewed to assure support.
Today it does not have any test implemented.
@diraol commented on Thu Oct 19 2017
Beraldo requested it to be added hardcoded for now.
Current Flow_manager Napp doesn't support complex forwarding pipelines, where more than one forwarding table is available. We need to find a way of expressing the pipeline we would like to use with a fallback option for switches that support a single forwarding table.
To be used in API, we should export to and import from JSON. There should an instance to_json
method and a static method from_json
that returns an instance.
Documenting the issue originally reported by @josemauro and @cmagnobarbosa:
sudo mn --topo linear,3 --mac --controller=remote,ip=127.0.0.1 --switch ovsk,protocols=OpenFlow10
curl --header "Content-Type: application/json" \
--request POST \
--data '{
"flows":[
{
"priority": 17,
"idle_timeout":360,
"hard_timeout":1200,
"cookie":84114904,
"match":{
"in_port":2,
"dl_src":"00:1f:3a:3e:9a:cf",
"dl_dst":"00:15:af:d5:38:98",
"dl_type":2048,
"dl_vlan":300,
"dl_vlan_pcp":0,
"nw_src":"192.168.0.1",
"nw_dst":"192.168.0.2",
"nw_proto":17
},
"actions":[
{
"action_type":"output",
"port":42
},
{
"action_type":"set_vlan",
"vlan_id":42
}
]
}
]
}' \
http://127.0.0.1:8181/api/kytos/flow_manager/v2/flows/00:00:00:00:00:00:00:01
kytos $> controller.switches['00:00:00:00:00:00:00:01'].enable()
curl http://127.0.0.1:8181/api/kytos/flow_manager/v2/flows
2020-11-26 18:41:18,413 - ERROR [kytos.core.controller] (Thread-933) Exception on /api/kytos/flow_manager/v2/flows [GET]
Traceback (most recent call last):
File "venv/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
response = self.full_dispatch_request()
File "venv/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
rv = self.handle_user_exception(e)
File "venv/lib/python3.8/site-packages/flask_cors/extension.py", line 161, in wrapped_function
return cors_after_request(app.make_response(f(*args, **kwargs)))
File "venv/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "venv/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
raise value
File "venv/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "venv/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "venv/var/lib/kytos/napps/kytos/flow_manager/main.py", line 167, in list
flows_dict = [flow.as_dict() for flow in switch.flows]
File "venv/var/lib/kytos/napps/kytos/flow_manager/main.py", line 167, in <listcomp>
flows_dict = [flow.as_dict() for flow in switch.flows]
File "venv/var/lib/kytos/napps/../napps/kytos/of_core/flow.py", line 122, in as_dict
flow_dict['id'] = self.id
File "venv/var/lib/kytos/napps/../napps/kytos/of_core/flow.py", line 94, in id
flow_str = self.as_json(sort_keys=True, include_id=False)
File "venv/var/lib/kytos/napps/../napps/kytos/of_core/flow.py", line 167, in as_json
return json.dumps(self.as_dict(include_id), sort_keys=sort_keys)
File "/usr/lib/python3.8/json/__init__.py", line 234, in dumps
return cls(
File "/usr/lib/python3.8/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/lib/python3.8/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/lib/python3.8/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type UBInt16 is not JSON serializable
This happens because it gets the list of keys of the controller.switches
dictionaries instead of the values,
IDs will be removed by #1.
As we cannot always guarantee that the flow will be properly installed, only that the message will be sent.
flow_manager
is using pylama, but should use the same linter as kytos core: yala. Also there are 18 linter issues pending.
Remove 'example' from the schema names and change 'definitions' to 'components'
Note we need also to change the behavior in the code
Those events shall inform when the NApp sends a FlowMod to add or remove a flow.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.