# SDN作业7 负载均衡 > 完整复现可能较难 > 实在有点头疼了,代码部分不是特别完整,还是可能有问题,先这样吧。 ## 题目 ![](https://cmd.dayi.ink/uploads/upload_4d93e02a4dc9f551add4925667037a65.png) ## 打开控制器 ![](https://cmd.dayi.ink/uploads/upload_82572579664badc6f5c6a5a74f3e3656.png) 看拓扑: ![](https://cmd.dayi.ink/uploads/upload_3da32da9a18e0f1c75fa323428255cfa.png) ## 拓扑: ![](https://cmd.dayi.ink/uploads/upload_85819263939b492ea4560439abd62149.png) ![](https://cmd.dayi.ink/uploads/upload_858fbea3d4ecd4d4ac41c0564ef84f76.png) ## Floodlight 负载均衡 ```bash sudo su # 安装docker apt install docker-compose -y #启动控制器 docker run -it -p 8080:8080 -p 6653:6653 --rm latarc/floodlight # 另外一个终端 sudo su mn -c python3 topo.py ``` 完整的拓扑需要先PINGALL控制器才能发现全部主机: ```bash pingall 1 ``` ```bash mininet> pingall 1 *** Ping: testing ping reachability h1 -> h2 X X X X X X h2 -> h1 X X h5 h6 h7 h8 h3 -> h1 h2 h4 h5 h6 h7 h8 h4 -> h1 h2 h3 h5 h6 h7 h8 h5 -> h1 h2 h3 h4 h6 h7 h8 h6 -> h1 h2 h3 h4 h5 h7 h8 h7 -> h1 h2 h3 h4 h5 h6 h8 h8 -> h1 h2 h3 h4 h5 h6 h7 *** Results: 14% dropped (48/56 received) mininet> pingall 1 *** Ping: testing ping reachability h1 -> h2 h3 h4 h5 h6 h7 h8 h2 -> h1 h3 h4 h5 h6 h7 h8 h3 -> h1 h2 h4 h5 h6 h7 h8 h4 -> h1 h2 h3 h5 h6 h7 h8 h5 -> h1 h2 h3 h4 h6 h7 h8 h6 -> h1 h2 h3 h4 h5 h7 h8 h7 -> h1 h2 h3 h4 h5 h6 h8 h8 -> h1 h2 h3 h4 h5 h6 h7 *** Results: 0% dropped (56/56 received) ``` ![](https://cmd.dayi.ink/uploads/upload_12d24986ec6d73ca8eecc538544b4c48.png) 拓扑大概这个样子: <http://localhost:8080/ui/pages/topology.html> ![](https://cmd.dayi.ink/uploads/upload_85819263939b492ea4560439abd62149.png) ![](https://cmd.dayi.ink/uploads/upload_4d9bd3b670e5aa4abaf7917a4d9f3867.png) ### 负载均衡脚本 ```python import requests import json import time import datetime # 常量定义 FLOODLIGHT_API_BASE_URL = 'http://127.0.0.1:8080' TARGET_SWITCH = '00:00:00:00:00:00:00:02' REDIRECT_PORT = 1 LOAD_THRESHOLD = 10 * 1024 # 10 MB CHECK_INTERVAL = 1 # 每1秒检查一次 AVERAGE_INTERVAL = 60 # 计算平均值的时间间隔为60秒 # HTTP请求头部 headers = { 'Content-Type': 'application/json', } # 存储过去每个端口的最后一个样本 last_samples = {} def calculate_speed(port, current_sample): global last_samples if port not in last_samples: last_samples[port] = current_sample return 0 last_sample = last_samples[port] time_diff = current_sample[0] - last_sample[0] if time_diff == 0: return 0 byte_diff = current_sample[1] - last_sample[1] speed = byte_diff / time_diff last_samples[port] = current_sample return speed def get_port_statistics(switch, port): response = requests.get(f"{FLOODLIGHT_API_BASE_URL}/wm/core/switch/{switch}/port/json", headers=headers) if response.status_code == 200: data = response.json() for port_stat in data['port_reply'][0]['port']: if str(port_stat['port_number']) == str(port): return port_stat else: raise Exception('获取端口统计信息失败') # 存储过去AVERAGE_INTERVAL秒的传输字节时间戳和字节数 transmit_bytes_history = [] def is_overloaded(port_stat, port): current_timestamp = int(time.time()) if isinstance(port_stat['transmit_bytes'], dict): transmit_bytes = int(port_stat['transmit_bytes']['value']) else: transmit_bytes = int(port_stat['transmit_bytes']) current_sample = (current_timestamp, transmit_bytes) transmit_bytes_history.append((port, current_sample)) while len(transmit_bytes_history) > 1 and (current_timestamp - transmit_bytes_history[0][1][0]) > AVERAGE_INTERVAL: transmit_bytes_history.pop(0) if len(transmit_bytes_history) < 2: return False total_bytes_diff = sum(transmit_bytes_history[i][1][1] - transmit_bytes_history[i-1][1][1] for i in range(1, len(transmit_bytes_history))) average_speed = total_bytes_diff / AVERAGE_INTERVAL real_time_speed = calculate_speed(port, current_sample) current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{current_time}] 端口[{port}] 实时速度: {real_time_speed/1024} MB/s, 平均速度: {average_speed} 字节每秒") return real_time_speed > LOAD_THRESHOLD def redirect_traffic(switch, in_port, out_port): flow_payload = { "switch": switch, "name": "redirect-flow", "cookie": "0", "priority": "32768", "in_port": str(in_port), "active": "true", "actions": f"output={out_port}" } response = requests.post(f"{FLOODLIGHT_API_BASE_URL}/wm/staticflowpusher/json", data=json.dumps(flow_payload), headers=headers) if response.status_code == 200: print(f"成功将流量从交换机 {switch} 端口 {in_port} 重定向到端口 {out_port}") else: raise Exception(f'下发流规则失败: {response.text}') def main(): monitor_ports = [1, 2] # 监控端口 1, 2, 和 3 try: while True: for port in monitor_ports: port_stat = get_port_statistics(TARGET_SWITCH, port) if is_overloaded(port_stat, port): if port ==1: dayi_redict = 2 if port ==2: dayi_redict =1 redirect_traffic(TARGET_SWITCH, port, dayi_redict) time.sleep(CHECK_INTERVAL) except KeyboardInterrupt: print("用户中断了脚本") except Exception as e: print(f"错误: {e}") if __name__ == "__main__": main() ``` 测速两个机子: h1 和 h8 ```bash h1 nohup iperf -s >> log1.log 2>&1 & h8 iperf -c 10.0.0.1 -l 1M mininet> h1 nohup iperf -s >> log1.log 2>&1 & mininet> h8 iperf -c 10.0.0.1 -l 1M ------------------------------------------------------------ Client connecting to 10.0.0.1, TCP port 5001 TCP window size: 85.3 KByte (default) ------------------------------------------------------------ [ 3] local 10.0.0.8 port 56288 connected with 10.0.0.1 port 5001 [ ID] Interval Transfer Bandwidth [ 3] 0.0-10.0 sec 42.3 GBytes 36.3 Gbits/sec mininet> ``` 执行结果: ![](https://cmd.dayi.ink/uploads/upload_af530f9687bf2f5763ad883e5edbed92.png) ![](https://cmd.dayi.ink/uploads/upload_3e0ab49f0f316ad831e8f6dcfa3653b2.png) 测速: ![](https://cmd.dayi.ink/uploads/upload_f8877fe20ec8bdf6ba47a0a151d9a160.png) ## RYU 1. 先代理ARP,防止广播风暴 直接用这个哥的ARP代理: <https://www.sdnlab.com/2318.html> 2. 启动脚本: ```bash sudo su # 安装docker apt install docker-compose -y mn -c # 启动拓扑 python3 topo.py ``` 注意环境目录 ```bash docker run -p 8080:8080 \ -p 6653:6633 \ -v $(pwd):/app \ -it latarc/ryu \ /app/ryu/arp-proxy.py \ /app/ryu/dayi-balance.py ``` ![](https://cmd.dayi.ink/uploads/upload_b046f6687fd2c0e2eb5959311c486670.png) 3. 测试 ![](https://cmd.dayi.ink/uploads/upload_50ead02467cd1e623a8a01277de9282b.png) ![](https://cmd.dayi.ink/uploads/upload_39cc0b4d568656010187e6e4be62efdf.png) 4. 代码(dayi-balance.py) ```python from ryu.base import app_manager from ryu.controller import ofp_event from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER from ryu.controller.handler import set_ev_cls from ryu.lib import hub from ryu.ofproto import ofproto_v1_3 class LoadBalancer(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION] def __init__(self, *args, **kwargs): super(LoadBalancer, self).__init__(*args, **kwargs) self.datapaths = {} self.monitor_thread = hub.spawn(self._monitor) # 处理交换机状态变化的事件 @set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER]) def _state_change_handler(self, ev): datapath = ev.datapath if ev.state == MAIN_DISPATCHER: if datapath.id not in self.datapaths: self.logger.debug('注册数据路径: %016x', datapath.id) self.datapaths[datapath.id] = datapath elif ev.state == DEAD_DISPATCHER: if datapath.id in self.datapaths: self.logger.debug('注销数据路径: %016x', datapath.id) del self.datapaths[datapath.id] # 定时发送统计请求的线程 def _monitor(self): while True: for dp in self.datapaths.values(): self._request_stats(dp) hub.sleep(10) # 每10秒钟执行一次 # 向交换机发送端口统计请求 def _request_stats(self, datapath): self.logger.debug('发送统计请求: %016x', datapath.id) ofproto = datapath.ofproto parser = datapath.ofproto_parser req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY) datapath.send_msg(req) # 处理端口统计信息回复事件 @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER) def _port_stats_reply_handler(self, ev): body = ev.msg.body for stat in body: self.logger.info(f'端口 {stat.port_no} 当前流量: 发送 {stat.tx_bytes} 字节, 接收 {stat.rx_bytes} 字节') # 判断端口2的接收字节是否超过阈值 if stat.port_no == 2 and stat.rx_bytes > 100000: self.logger.warning('端口 2 负载过高,正在下发流表以重定向流量...') self._redirect_traffic(ev.msg.datapath, stat.port_no) # 下发流表以重定向端口流量 def _redirect_traffic(self, datapath, port_no): ofproto = datapath.ofproto parser = datapath.ofproto_parser # 设置动作为通过端口1转发 actions = [parser.OFPActionOutput(1)] # 设置匹配条件为进入端口为port_no match = parser.OFPMatch(in_port=port_no) # 创建流表修改消息以安装新的流表项 flow_mod_msg = parser.OFPFlowMod( datapath=datapath, command=ofproto.OFPFC_ADD, match=match, instructions=[parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)], priority=ofproto.OFP_DEFAULT_PRIORITY ) # 发送流表修改消息 datapath.send_msg(flow_mod_msg) self.logger.info('流表已下发:重定向端口 %s 的流量到端口 1', port_no) ``` ## 附录 ### topo.py ```python from mininet.topo import Topo from mininet.net import Mininet from mininet.util import dumpNodeConnections from mininet.log import setLogLevel, info from mininet.node import CPULimitedHost from mininet.node import RemoteController from mininet.link import TCLink from mininet.cli import CLI class MyTopo(Topo): def __init__(self): "Create custom topo." # initilaize topology Topo.__init__(self) layer = 3 # Representing the height of the Binary tree switch_count = 0 host_count = 0 # creat main core router,default count is 3 router = [] count_router = 3 for i in range(0, count_router): switch_count = switch_count + 1 name = 's' + str(switch_count) router1 = self.addSwitch(name) router.append(router1) for i in range(0, count_router): for j in range(i + 1, count_router): self.addLink(router[i], router[j]) # create tree_topo sw1 = [] host = [] for i in range(0, layer): for j in range(0, 2 ** i): if not i == layer - 1: switch_count = switch_count + 1 str1 = 's' + str(switch_count) sw = self.addSwitch(str1) sw1.append(sw) else: host_count = host_count + 1 str1 = 'h' + str(host_count) if host_count < 9: address = "00:00:00:00:00:0" + str(host_count) else: address = "00:00:00:00:00:" + str(host_count) host1 = self.addHost(str1, mac=address) host.append(host1) count = 0 for i in range(0, 2 ** (layer - 2) - 1): self.addLink(sw1[i], sw1[2 * i + 1]) self.addLink(sw1[i], sw1[2 * i + 2]) for i in range(2 ** (layer - 2) - 1, 2 ** (layer - 1) - 1): self.addLink(sw1[i], host[count]) self.addLink(sw1[i], host[count + 1]) count = count + 2 # the second tree_topo sw2 = [] host2 = [] for i in range(0, layer): for j in range(0, 2 ** i): if not i == layer - 1: switch_count = switch_count + 1 str1 = 's' + str(switch_count) sw = self.addSwitch(str1) sw2.append(sw) else: host_count = host_count + 1 str1 = 'h' + str(host_count) if host_count < 9: address = "00:00:00:00:00:0" + str(host_count) else: address = "00:00:00:00:00:" + str(host_count) host1 = self.addHost(str1, mac=address) host2.append(host1) count = 0 for i in range(0, 2 ** (layer - 2) - 1): self.addLink(sw2[i], sw2[2 * i + 1]) self.addLink(sw2[i], sw2[2 * i + 2]) for i in range(2 ** (layer - 2) - 1, 2 ** (layer - 1) - 1): self.addLink(sw2[i], host2[count]) self.addLink(sw2[i], host2[count + 1]) count = count + 2 self.addLink(sw1[0], router[-2]) self.addLink(sw2[0], router[-1]) if __name__ == '__main__': setLogLevel('info') info('*** Creating network\n') topo = MyTopo() net = Mininet(topo=topo, controller=None) net.addController('c0', controller=RemoteController, ip='127.0.0.1', port=6653, autoSetMac=True) net.start() CLI(net) ```` ### ARP代理 来自:<https://www.sdnlab.com/2318.html> ```python # Author:muzixing # Time:2014/10/19 # from ryu.base import app_manager from ryu.controller import ofp_event from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER from ryu.controller.handler import set_ev_cls from ryu.ofproto import ofproto_v1_3 from ryu.lib.packet import packet from ryu.lib.packet import ethernet from ryu.lib.packet import arp ETHERNET = ethernet.ethernet.__name__ ETHERNET_MULTICAST = "ff:ff:ff:ff:ff:ff" ARP = arp.arp.__name__ class ARP_PROXY_13(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION] def __init__(self, *args, **kwargs): super(ARP_PROXY_13, self).__init__(*args, **kwargs) self.mac_to_port = {} self.arp_table = {} self.sw = {} @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER) def switch_features_handler(self, ev): datapath = ev.msg.datapath ofproto = datapath.ofproto parser = datapath.ofproto_parser # install table-miss flow entry # # We specify NO BUFFER to max_len of the output action due to # OVS bug. At this moment, if we specify a lesser number, e.g., # 128, OVS will send Packet-In with invalid buffer_id and # truncated packet data. In that case, we cannot output packets # correctly. match = parser.OFPMatch() actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, ofproto.OFPCML_NO_BUFFER)] self.add_flow(datapath, 0, match, actions) def add_flow(self, datapath, priority, match, actions): ofproto = datapath.ofproto parser = datapath.ofproto_parser inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)] mod = parser.OFPFlowMod(datapath=datapath, priority=priority, idle_timeout=5, hard_timeout=15, match=match, instructions=inst) datapath.send_msg(mod) @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) def _packet_in_handler(self, ev): msg = ev.msg datapath = msg.datapath ofproto = datapath.ofproto parser = datapath.ofproto_parser in_port = msg.match['in_port'] pkt = packet.Packet(msg.data) eth = pkt.get_protocols(ethernet.ethernet)[0] dst = eth.dst src = eth.src dpid = datapath.id header_list = dict( (p.protocol_name, p)for p in pkt.protocols if type(p) != str) if ARP in header_list: self.arp_table[header_list[ARP].src_ip] = src # ARP learning self.mac_to_port.setdefault(dpid, {}) self.logger.info("packet in %s %s %s %s", dpid, src, dst, in_port) # learn a mac address to avoid FLOOD next time. self.mac_to_port[dpid][src] = in_port if dst in self.mac_to_port[dpid]: out_port = self.mac_to_port[dpid][dst] else: if self.arp_handler(header_list, datapath, in_port, msg.buffer_id): # 1:reply or drop; 0: flood # print "ARP_PROXY_13" print("ARP_PROXY_13") return None else: out_port = ofproto.OFPP_FLOOD # print 'OFPP_FLOOD' print('OFPP_FLOOD') actions = [parser.OFPActionOutput(out_port)] # install a flow to avoid packet_in next time if out_port != ofproto.OFPP_FLOOD: match = parser.OFPMatch(in_port=in_port, eth_dst=dst) self.add_flow(datapath, 1, match, actions) data = None if msg.buffer_id == ofproto.OFP_NO_BUFFER: data = msg.data out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id, in_port=in_port, actions=actions, data=data) datapath.send_msg(out) def arp_handler(self, header_list, datapath, in_port, msg_buffer_id): header_list = header_list datapath = datapath in_port = in_port if ETHERNET in header_list: eth_dst = header_list[ETHERNET].dst eth_src = header_list[ETHERNET].src if eth_dst == ETHERNET_MULTICAST and ARP in header_list: arp_dst_ip = header_list[ARP].dst_ip if (datapath.id, eth_src, arp_dst_ip) in self.sw: # Break the loop if self.sw[(datapath.id, eth_src, arp_dst_ip)] != in_port: out = datapath.ofproto_parser.OFPPacketOut( datapath=datapath, buffer_id=datapath.ofproto.OFP_NO_BUFFER, in_port=in_port, actions=[], data=None) datapath.send_msg(out) return True else: self.sw[(datapath.id, eth_src, arp_dst_ip)] = in_port if ARP in header_list: hwtype = header_list[ARP].hwtype proto = header_list[ARP].proto hlen = header_list[ARP].hlen plen = header_list[ARP].plen opcode = header_list[ARP].opcode arp_src_ip = header_list[ARP].src_ip arp_dst_ip = header_list[ARP].dst_ip actions = [] if opcode == arp.ARP_REQUEST: if arp_dst_ip in self.arp_table: # arp reply actions.append(datapath.ofproto_parser.OFPActionOutput( in_port) ) ARP_Reply = packet.Packet() ARP_Reply.add_protocol(ethernet.ethernet( ethertype=header_list[ETHERNET].ethertype, dst=eth_src, src=self.arp_table[arp_dst_ip])) ARP_Reply.add_protocol(arp.arp( opcode=arp.ARP_REPLY, src_mac=self.arp_table[arp_dst_ip], src_ip=arp_dst_ip, dst_mac=eth_src, dst_ip=arp_src_ip)) ARP_Reply.serialize() out = datapath.ofproto_parser.OFPPacketOut( datapath=datapath, buffer_id=datapath.ofproto.OFP_NO_BUFFER, in_port=datapath.ofproto.OFPP_CONTROLLER, actions=actions, data=ARP_Reply.data) datapath.send_msg(out) return True return False ```
{}