Add support live-migration

Signed-off-by: YAMAMOTO Takashi <yamamoto@valinux.co.jp>
Signed-off-by: Isaku Yamahata <yamahata@valinux.co.jp>
Signed-off-by: Yoshihiro Kaneko <ykaneko0929@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
Yoshihiro Kaneko 2013-02-08 12:28:10 +09:00 committed by FUJITA Tomonori
parent 6b4a5f7470
commit 9bd88b5a5d
6 changed files with 219 additions and 67 deletions

View File

@ -14,8 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import logging
from collections import defaultdict
from ryu import exception as ryu_exc
from ryu.app.rest_nw_id import (NW_ID_VPORT_GRE,
@ -177,6 +177,10 @@ class PortSet(handler_utils.QueueSerializer):
port_no, mac_address, add_del))
def _vm_port_mac_handler(self, dpid, port_no, network_id, add_del):
if network_id == NW_ID_VPORT_GRE:
self._tunnel_port_handler(dpid, port_no, add_del)
return
try:
mac_address = self.nw.get_mac(dpid, port_no)
except ryu_exc.PortNotFound:
@ -473,6 +477,29 @@ class GRETunnel(app_manager.RyuApp):
def _link_is_up(self, dp, port_no):
return _link_is_up(self.dpset, dp, port_no)
def _port_is_active(self, network_id, dp, nw_port):
return (nw_port.network_id == network_id and
nw_port.mac_address is not None and
self._link_is_up(dp, nw_port.port_no))
def _tunnel_port_with_mac(self, remote_dp, dpid, network_id, port_no,
mac_address):
tunnel_ports = []
ports = self.nw.get_ports_with_mac(network_id, mac_address).copy()
ports.discard((dpid, port_no))
assert len(ports) <= 1
for port in ports:
try:
tunnel_port_no = self.tunnels.get_port(remote_dp.id, port.dpid)
except ryu_exc.PortNotFound:
pass
else:
if self._link_is_up(remote_dp, tunnel_port_no):
tunnel_ports.append(tunnel_port_no)
assert len(tunnel_ports) <= 1
return tunnel_ports
def _vm_port_add(self, ev):
dpid = ev.dpid
dp = self.dpset.get(dpid)
@ -486,8 +513,12 @@ class GRETunnel(app_manager.RyuApp):
remote_dpids.remove(dpid)
# LOCAL_OUT_TABLE: unicast
# live-migration: there can be two ports with same mac_address
ports = self.nw.get_ports(dpid, network_id, mac_address)
assert ev.port_no in [port.port_no for port in ports]
rule = cls_rule(tun_id=tunnel_key, dl_dst=mac_address)
actions = [ofproto_parser.OFPActionOutput(ev.port_no)]
actions = [ofproto_parser.OFPActionOutput(port.port_no)
for port in ports if self._link_is_up(dp, port.port_no)]
self.send_flow_mod(dp, rule, self.LOCAL_OUT_TABLE, ofproto.OFPFC_ADD,
self.LOCAL_OUT_PRI_MAC, actions)
@ -495,9 +526,7 @@ class GRETunnel(app_manager.RyuApp):
rule = cls_rule(tun_id=tunnel_key, dl_dst=mac.BROADCAST)
actions = []
for port in self.nw.get_ports(dpid):
if (port.network_id != network_id or port.mac_address is None):
continue
if not self._link_is_up(dp, port.port_no):
if not self._port_is_active(network_id, dp, port):
continue
actions.append(ofproto_parser.OFPActionOutput(port.port_no))
@ -519,6 +548,7 @@ class GRETunnel(app_manager.RyuApp):
ofproto.OFPFC_ADD, self.LOCAL_OUT_PRI_DROP, [])
# TUNNEL_OUT_TABLE: unicast
mac_to_ports = collections.defaultdict(set)
for remote_dpid in remote_dpids:
remote_dp = self.dpset.get(remote_dpid)
if remote_dp is None:
@ -531,19 +561,12 @@ class GRETunnel(app_manager.RyuApp):
continue
for port in self.nw.get_ports(remote_dpid):
if port.network_id != network_id or port.mac_address is None:
continue
if not self._link_is_up(remote_dp, port.port_no):
if not self._port_is_active(network_id, remote_dp, port):
continue
# TUNNEL_OUT_TABLE: unicast
rule = cls_rule(tun_id=tunnel_key, dl_dst=port.mac_address)
output = ofproto_parser.OFPActionOutput(tunnel_port_no)
resubmit_table = ofproto_parser.NXActionResubmitTable(
in_port=ofproto.OFPP_IN_PORT, table=self.LOCAL_OUT_TABLE)
actions = [output, resubmit_table]
self.send_flow_mod(dp, rule, self.TUNNEL_OUT_TABLE,
ofproto.OFPFC_ADD, self.TUNNEL_OUT_PRI_MAC,
actions)
# live-migration: there can be more than one tunnel-ports that
# have a given mac address
mac_to_ports[port.mac_address].add(tunnel_port_no)
if first_instance:
# SRC_TABLE: TUNNEL-port: resubmit to LOAL_OUT_TABLE
@ -555,6 +578,18 @@ class GRETunnel(app_manager.RyuApp):
ofproto.OFPFC_ADD, self.SRC_PRI_TUNNEL_PASS,
actions)
# TUNNEL_OUT_TABLE: unicast
for remote_mac_address, tunnel_ports in mac_to_ports.items():
rule = cls_rule(tun_id=tunnel_key, dl_dst=remote_mac_address)
outputs = [ofproto_parser.OFPActionOutput(tunnel_port_no)
for tunnel_port_no in tunnel_ports]
resubmit_table = ofproto_parser.NXActionResubmitTable(
in_port=ofproto.OFPP_IN_PORT, table=self.LOCAL_OUT_TABLE)
actions = outputs + [resubmit_table]
self.send_flow_mod(dp, rule, self.TUNNEL_OUT_TABLE,
ofproto.OFPFC_ADD, self.TUNNEL_OUT_PRI_MAC,
actions)
if first_instance:
# TUNNEL_OUT_TABLE: catch-all(resubmit to LOCAL_OUT_TABLE)
rule = cls_rule(tun_id=tunnel_key)
@ -610,29 +645,37 @@ class GRETunnel(app_manager.RyuApp):
remote_ofproto_parser = remote_dp.ofproto_parser
# TUNNEL_OUT_TABLE: unicast
# live-migration: there can be another port that has
# same mac address
tunnel_ports = self._tunnel_port_with_mac(remote_dp, dpid,
network_id, ev.port_no,
mac_address)
tunnel_ports.append(tunnel_port_no)
rule = cls_rule(tun_id=ev.tunnel_key, dl_dst=mac_address)
output = remote_ofproto_parser.OFPActionOutput(tunnel_port_no)
outputs = [remote_ofproto_parser.OFPActionOutput(port_no)
for port_no in tunnel_ports]
resubmit_table = remote_ofproto_parser.NXActionResubmitTable(
in_port=remote_ofproto.OFPP_IN_PORT,
table=self.LOCAL_OUT_TABLE)
actions = [output, resubmit_table]
actions = outputs + [resubmit_table]
self.send_flow_mod(remote_dp, rule, self.TUNNEL_OUT_TABLE,
remote_ofproto.OFPFC_ADD,
self.TUNNEL_OUT_PRI_MAC, actions)
if first_instance:
# SRC_TABLE: TUNNEL-port
rule = cls_rule(in_port=tunnel_port_no, tun_id=ev.tunnel_key)
resubmit_table = remote_ofproto_parser.NXActionResubmitTable(
in_port=remote_ofproto.OFPP_IN_PORT,
table=self.LOCAL_OUT_TABLE)
actions = [resubmit_table]
self.send_flow_mod(remote_dp, rule, self.SRC_TABLE,
remote_ofproto.OFPFC_ADD,
self.SRC_PRI_TUNNEL_PASS, actions)
else:
if not first_instance:
continue
# SRC_TABLE: TUNNEL-port
rule = cls_rule(in_port=tunnel_port_no, tun_id=ev.tunnel_key)
resubmit_table = remote_ofproto_parser.NXActionResubmitTable(
in_port=remote_ofproto.OFPP_IN_PORT,
table=self.LOCAL_OUT_TABLE)
actions = [resubmit_table]
self.send_flow_mod(remote_dp, rule, self.SRC_TABLE,
remote_ofproto.OFPFC_ADD,
self.SRC_PRI_TUNNEL_PASS, actions)
# TUNNEL_OUT_TABLE: broadcast
rule = cls_rule(tun_id=ev.tunnel_key, dl_dst=mac.BROADCAST)
tunnel_ports = self._list_tunnel_port(remote_dp, remote_dpids)
@ -667,9 +710,7 @@ class GRETunnel(app_manager.RyuApp):
for port in self.nw.get_ports(dpid):
if port.port_no == ev.port_no:
continue
if (port.network_id != network_id or port.mac_address is None):
continue
if not self._link_is_up(dp, port.port_no):
if not self._port_is_active(network_id, dp, port):
continue
local_ports.append(port.port_no)
@ -704,10 +745,23 @@ class GRETunnel(app_manager.RyuApp):
[]) # priority is ignored
else:
# LOCAL_OUT_TABLE: unicast
rule = cls_rule(tun_id=tunnel_key, dl_src=mac_address)
self.send_flow_del(dp, rule, self.LOCAL_OUT_TABLE,
ofproto.OFPFC_DELETE_STRICT,
self.LOCAL_OUT_PRI_MAC, ev.port_no)
# live-migration: there can be two ports with same mac_address
ports = self.nw.get_ports(dpid, network_id, mac_address)
port_nos = [port.port_no for port in ports
if (port.port_no != ev.port_no and
self._link_is_up(dp, port.port_no))]
rule = cls_rule(tun_id=tunnel_key, dl_dst=mac_address)
if port_nos:
assert len(ports) == 1
actions = [ofproto_parser.OFPActionOutput(port_no)
for port_no in port_nos]
self.send_flow_mod(dp, rule, self.LOCAL_OUT_TABLE,
ofproto.OFPFC_MODIFY_STRICT,
self.LOCAL_OUT_PRI_MAC, actions)
else:
self.send_flow_del(dp, rule, self.LOCAL_OUT_TABLE,
ofproto.OFPFC_DELETE_STRICT,
self.LOCAL_OUT_PRI_MAC, ev.port_no)
# LOCAL_OUT_TABLE: broadcast
rule = cls_rule(tun_id=tunnel_key, dl_dst=mac.BROADCAST)
@ -721,7 +775,8 @@ class GRETunnel(app_manager.RyuApp):
# remote dp
remote_dpids = self.nw.get_dpids(ev.network_id)
remote_dpids.remove(dpid)
if dpid in remote_dpids:
remote_dpids.remove(dpid)
for remote_dpid in remote_dpids:
remote_dp = self.dpset.get(remote_dpid)
if remote_dp is None:
@ -744,7 +799,7 @@ class GRETunnel(app_manager.RyuApp):
self.SRC_PRI_TUNNEL_PASS, None)
# SRC_TABLE: TUNNEL-port catch-call drop rule
rule = cls_rule(in_port=tunnel_port_no, tun_id=tunnel_key)
rule = cls_rule(in_port=tunnel_port_no)
self.send_flow_del(remote_dp, rule, self.SRC_TABLE,
remote_ofproto.OFPFC_DELETE_STRICT,
self.SRC_PRI_TUNNEL_DROP, None)
@ -771,15 +826,31 @@ class GRETunnel(app_manager.RyuApp):
actions)
# TUNNEL_OUT_TABLE: unicast
# live-migration: there can be more than one (dpid, port_no)
# with a given mac address
tunnel_ports = self._tunnel_port_with_mac(remote_dp, dpid,
network_id, ev.port_no,
mac_address)
rule = cls_rule(tun_id=tunnel_key, dl_dst=mac_address)
self.send_flow_del(remote_dp, rule, self.TUNNEL_OUT_TABLE,
remote_ofproto.OFPFC_DELETE_STRICT,
self.TUNNEL_OUT_PRI_MAC, tunnel_port_no)
if tunnel_ports:
outputs = [remote_ofproto_parser.OFPActionOutput(port_no)
for port_no in tunnel_ports]
resubmit_table = remote_ofproto_parser.NXActionResubmitTable(
in_port=remote_ofproto.OFPP_IN_PORT,
table=self.LOCAL_OUT_TABLE)
actions = outputs + [resubmit_table]
self.send_flow_mod(remote_dp, rule, self.TUNNEL_OUT_TABLE,
remote_ofproto.OFPFC_ADD,
self.TUNNEL_OUT_PRI_MAC, actions)
else:
self.send_flow_del(remote_dp, rule, self.TUNNEL_OUT_TABLE,
remote_ofproto.OFPFC_DELETE_STRICT,
self.TUNNEL_OUT_PRI_MAC, tunnel_port_no)
# TODO:XXX multicast
def _get_vm_ports(self, dpid):
ports = defaultdict(list)
ports = collections.defaultdict(list)
for port in self.nw.get_ports(dpid):
if port.network_id in RESERVED_NETWORK_IDS:
continue

View File

@ -188,9 +188,16 @@ class OVSSwitch(object):
iface_id = port.ext_ids.get('iface-id')
if iface_id is None:
return
try:
network_id = self.ifaces.get_key(iface_id,
QuantumIfaces.KEY_NETWORK_ID)
except KeyError:
return
if not add:
self.network_api.remove_port(network_id, self.dpid, port.ofport)
ports = self.ifaces.get_key(iface_id, QuantumIfaces.KEY_PORTS)
other_ovs_ports = None
for p in ports:
dpid = p.get(QuantumIfaces.SUBKEY_DATAPATH_ID)
if dpid is None:
@ -201,9 +208,9 @@ class OVSSwitch(object):
other_ovs_ports = self.ifaces.del_key(iface_id,
QuantumIfaces.KEY_PORTS,
p)
if other_ovs_ports:
# When live-migration, one of the two OVS ports is deleted.
return
if other_ovs_ports:
# When live-migration, one of the two OVS ports is deleted.
return
port_data = {
'datapath_id': dpid_lib.dpid_to_str(self.dpid),
@ -227,11 +234,6 @@ class OVSSwitch(object):
return
# update {network, port, mac}
try:
network_id = self.ifaces.get_key(iface_id,
QuantumIfaces.KEY_NETWORK_ID)
except KeyError:
return
self.network_api.update_network(network_id)
self.network_api.update_port(network_id, self.dpid, port.ofport)
mac = port.ext_ids.get('attached-mac')
@ -240,7 +242,6 @@ class OVSSwitch(object):
mac_lib.haddr_to_bin(mac))
def update_port(self, port_no, port_name, add):
port_name = port_name.rstrip('\x00')
LOG.debug('update_port port_no %d %s %s', port_no, port_name, add)
assert port_name is not None
old_port = self.ports.get(port_no)
@ -252,9 +253,9 @@ class OVSSwitch(object):
if self.ovs_bridge:
port_cfg = self.ovs_bridge.get_quantum_ports(port_name)
if port_cfg:
if 'ofport' not in port_cfg:
if 'ofport' not in port_cfg or not port_cfg['ofport']:
port_cfg['ofport'] = port_no
if port_cfg['ofport'] != port_no:
elif port_cfg['ofport'] != port_no:
LOG.warn('inconsistent port_no: %d port_cfg %s',
port_no, port_cfg)
return
@ -369,12 +370,14 @@ class QuantumAdapter(app_manager.RyuApp):
@handler.set_ev_cls(dpset.EventPortAdd)
def port_add_handler(self, ev):
port = ev.port
self._port_handler(ev.dp.id, port.port_no, port.name, True)
name = port.name.rstrip('\0')
self._port_handler(ev.dp.id, port.port_no, name, True)
@handler.set_ev_cls(dpset.EventPortDelete)
def port_del_handler(self, ev):
port = ev.port
self._port_handler(ev.dp.id, port.port_no, port.name, False)
name = port.name.rstrip('\0')
self._port_handler(ev.dp.id, port.port_no, name, False)
def _conf_switch_set_ovsdb_addr(self, dpid, value):
ovs_switch = self._get_ovs_switch(dpid)

View File

@ -431,17 +431,20 @@ class TunnelPortUpdater(app_manager.RyuApp):
def _vm_port_del(self, network_id, dpid):
LOG.debug('_vm_port_del %s %s', network_id, dpid_lib.dpid_to_str(dpid))
if len(self.nw.get_ports(dpid, network_id)) > 1:
if len(self.nw.get_ports(dpid, network_id)) > 0:
return
tunnel_networks = self.nw.get_networks(dpid).copy()
tunnel_networks = set(p.network_id
for p in self.nw.get_networks(dpid))
tunnel_networks.discard(network_id)
tunnel_networks.difference_update(rest_nw_id.RESERVED_NETWORK_IDS)
dpids = self.nw.get_dpids(network_id).copy()
dpids.discard(dpid)
del_dpids = []
for remote_dpid in dpids:
if tunnel_networks & self.nw.get_networks(remote_dpid):
remote_networks = set(p.network_id
for p in self.nw.get_networks(remote_dpid))
if tunnel_networks & remote_networks:
continue
self.tunnel_requests.remove(dpid, remote_dpid)
del_dpids.append(remote_dpid)

View File

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import logging
from ryu.base import app_manager
@ -145,7 +146,7 @@ class Port(object):
class DPIDs(dict):
"""dpid -> port_no -> network_id"""
"""dpid -> port_no -> Port(port_no, network_id, mac_address)"""
def __init__(self, f, nw_id_unknown):
super(DPIDs, self).__init__()
self.send_event = f
@ -180,11 +181,16 @@ class DPIDs(dict):
except KeyError:
raise PortNotFound(dpid=dpid, port=port_no, network_id=None)
def get_ports(self, dpid, network_id=None):
def get_ports(self, dpid, network_id=None, mac_address=None):
if network_id is None:
return self.get(dpid, {}).values()
if mac_address is None:
return [p for p in self.get(dpid, {}).values()
if p.network_id == network_id]
# live-migration: There can be two ports that have same mac address.
return [p for p in self.get(dpid, {}).values()
if p.network_id == network_id]
if p.network_id == network_id and p.mac_address == mac_address]
def get_port(self, dpid, port_no):
try:
@ -242,6 +248,48 @@ class DPIDs(dict):
mac_address=port.mac_address)
MacPort = collections.namedtuple('MacPort', ('dpid', 'port_no'))
class MacToPort(collections.defaultdict):
"""mac_address -> set of MacPort(dpid, port_no)"""
def __init__(self):
super(MacToPort, self).__init__(set)
def add_port(self, dpid, port_no, mac_address):
self[mac_address].add(MacPort(dpid, port_no))
def remove_port(self, dpid, port_no, mac_address):
ports = self[mac_address]
ports.discard(MacPort(dpid, port_no))
if not ports:
del self[mac_address]
def get_ports(self, mac_address):
return self[mac_address]
class MacAddresses(dict):
"""network_id -> mac_address -> set of (dpid, port_no)"""
def add_port(self, network_id, dpid, port_no, mac_address):
mac2port = self.setdefault(network_id, MacToPort())
mac2port.add_port(dpid, port_no, mac_address)
def remove_port(self, network_id, dpid, port_no, mac_address):
mac2port = self.get(network_id)
if mac2port is None:
return
mac2port.remove_port(dpid, port_no, mac_address)
if not mac2port:
del self[network_id]
def get_ports(self, network_id, mac_address):
mac2port = self.get(network_id)
if not mac2port:
return set()
return mac2port.get_ports(mac_address)
class Network(app_manager.RyuApp):
def __init__(self, nw_id_unknown=NW_ID_UNKNOWN):
super(Network, self).__init__()
@ -249,6 +297,7 @@ class Network(app_manager.RyuApp):
self.nw_id_unknown = nw_id_unknown
self.networks = Networks(self.send_event_to_observers)
self.dpids = DPIDs(self.send_event_to_observers, nw_id_unknown)
self.mac_addresses = MacAddresses()
def _check_nw_id_unknown(self, network_id):
if network_id == self.nw_id_unknown:
@ -303,10 +352,25 @@ class Network(app_manager.RyuApp):
def update_port(self, network_id, dpid, port):
self._update_port(network_id, dpid, port, True)
def remove_port(self, network_id, dpid, port):
def _get_old_mac(self, network_id, dpid, port_no):
try:
port = self.dpids.get_port(dpid, port_no)
except PortNotFound:
pass
else:
if port.network_id == network_id:
return port.mac_address
return None
def remove_port(self, network_id, dpid, port_no):
# generate event first, then do the real task
self.dpids.remove_port(dpid, port)
self.networks.remove(network_id, dpid, port)
old_mac_address = self._get_old_mac(network_id, dpid, port_no)
self.dpids.remove_port(dpid, port_no)
self.networks.remove(network_id, dpid, port_no)
if old_mac_address is not None:
self.mac_addresses.remove_port(network_id, dpid, port_no,
old_mac_address)
#
# methods for gre tunnel
@ -322,10 +386,17 @@ class Network(app_manager.RyuApp):
return self.dpids.get_networks(dpid)
def create_mac(self, network_id, dpid, port_no, mac_address):
self.mac_addresses.add_port(network_id, dpid, port_no, mac_address)
self.dpids.set_mac(network_id, dpid, port_no, mac_address)
def update_mac(self, network_id, dpid, port_no, mac_address):
old_mac_address = self._get_old_mac(network_id, dpid, port_no)
self.dpids.update_mac(network_id, dpid, port_no, mac_address)
if old_mac_address is not None:
self.mac_addresses.remove_port(network_id, dpid, port_no,
old_mac_address)
self.mac_addresses.add_port(network_id, dpid, port_no, mac_address)
def get_mac(self, dpid, port_no):
return self.dpids.get_mac(dpid, port_no)
@ -336,12 +407,15 @@ class Network(app_manager.RyuApp):
return []
return [mac_address]
def get_ports(self, dpid, network_id=None):
return self.dpids.get_ports(dpid, network_id)
def get_ports(self, dpid, network_id=None, mac_address=None):
return self.dpids.get_ports(dpid, network_id, mac_address)
def get_port(self, dpid, port_no):
return self.dpids.get_port(dpid, port_no)
def get_ports_with_mac(self, network_id, mac_address):
return self.mac_addresses.get_ports(network_id, mac_address)
#
# methods for simple_isolation
#

View File

@ -35,6 +35,8 @@ def is_multicast(addr):
def haddr_to_str(addr):
"""Format mac address in internal representation into human readable
form"""
if addr is None:
return 'None'
assert len(addr) == _HADDR_LEN
return ':'.join('%02x' % ord(char) for char in addr)

View File

@ -1309,7 +1309,6 @@ class VSCtl(object):
else:
# When port is created, ofport column might be None.
# So try with port name if it happended
port_name = port_name.rstrip('\0')
for vsctl_port in br.ports:
iface_cfgs.extend(
self._iface_to_dict(vsctl_iface.iface_cfg)