# SDN Assignment 7: Load Balancing
> A full reproduction may be difficult.
> This was getting a bit painful; the code is not entirely complete and may still have issues, so it is left as is for now.
## Problem

## Starting the controller

View the topology:

## Topology


## Load balancing with Floodlight
```bash
sudo su
# install Docker
apt install docker-compose -y
# start the controller
docker run -it -p 8080:8080 -p 6653:6653 --rm latarc/floodlight
# in another terminal
sudo su
mn -c
python3 topo.py
```
The controller only discovers all hosts after a `pingall`, so run one first to complete the topology:
```bash
pingall 1
```
```bash
mininet> pingall 1
*** Ping: testing ping reachability
h1 -> h2 X X X X X X
h2 -> h1 X X h5 h6 h7 h8
h3 -> h1 h2 h4 h5 h6 h7 h8
h4 -> h1 h2 h3 h5 h6 h7 h8
h5 -> h1 h2 h3 h4 h6 h7 h8
h6 -> h1 h2 h3 h4 h5 h7 h8
h7 -> h1 h2 h3 h4 h5 h6 h8
h8 -> h1 h2 h3 h4 h5 h6 h7
*** Results: 14% dropped (48/56 received)
mininet> pingall 1
*** Ping: testing ping reachability
h1 -> h2 h3 h4 h5 h6 h7 h8
h2 -> h1 h3 h4 h5 h6 h7 h8
h3 -> h1 h2 h4 h5 h6 h7 h8
h4 -> h1 h2 h3 h5 h6 h7 h8
h5 -> h1 h2 h3 h4 h6 h7 h8
h6 -> h1 h2 h3 h4 h5 h7 h8
h7 -> h1 h2 h3 h4 h5 h6 h8
h8 -> h1 h2 h3 h4 h5 h6 h7
*** Results: 0% dropped (56/56 received)
```
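Host discovery can also be double-checked from a script via Floodlight's device-manager REST API; a minimal sketch, assuming the controller is reachable at `127.0.0.1:8080` and that this image exposes the standard `/wm/device/` endpoint:

```python
import requests

# Hedged sketch: list the hosts Floodlight's device manager has learned.
# Depending on the Floodlight version, /wm/device/ returns either a plain
# list or an object wrapping it in a "devices" key, so handle both shapes.
data = requests.get("http://127.0.0.1:8080/wm/device/").json()
devices = data.get("devices", []) if isinstance(data, dict) else data
for dev in devices:
    print(dev.get("mac"), dev.get("ipv4"))
print(f"{len(devices)} hosts discovered")
```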

The topology looks roughly like this in the Floodlight web UI:
<http://localhost:8080/ui/pages/topology.html>


### Load-balancing script
```python
import requests
import json
import time
import datetime

# Constants
FLOODLIGHT_API_BASE_URL = 'http://127.0.0.1:8080'
TARGET_SWITCH = '00:00:00:00:00:00:00:02'
REDIRECT_PORT = 1
LOAD_THRESHOLD = 10 * 1024   # 10 KB/s, in bytes per second
CHECK_INTERVAL = 1           # check once per second
AVERAGE_INTERVAL = 60        # averaging window of 60 seconds

# HTTP request headers
headers = {
    'Content-Type': 'application/json',
}

# Last sample seen for each port: (timestamp, transmit_bytes)
last_samples = {}


def calculate_speed(port, current_sample):
    """Return the instantaneous transmit speed (bytes/s) since the last sample."""
    global last_samples
    if port not in last_samples:
        last_samples[port] = current_sample
        return 0
    last_sample = last_samples[port]
    time_diff = current_sample[0] - last_sample[0]
    if time_diff == 0:
        return 0
    byte_diff = current_sample[1] - last_sample[1]
    speed = byte_diff / time_diff
    last_samples[port] = current_sample
    return speed


def get_port_statistics(switch, port):
    """Fetch the statistics of one port of one switch from the REST API."""
    response = requests.get(f"{FLOODLIGHT_API_BASE_URL}/wm/core/switch/{switch}/port/json", headers=headers)
    if response.status_code == 200:
        data = response.json()
        for port_stat in data['port_reply'][0]['port']:
            if str(port_stat['port_number']) == str(port):
                return port_stat
        raise Exception(f'Port {port} not found in the statistics reply')
    else:
        raise Exception('Failed to fetch port statistics')


# Per-port history of (timestamp, transmit_bytes) samples for the averaging window
transmit_bytes_history = {}


def is_overloaded(port_stat, port):
    """Return True if the port's instantaneous transmit speed exceeds LOAD_THRESHOLD."""
    current_timestamp = int(time.time())
    if isinstance(port_stat['transmit_bytes'], dict):
        transmit_bytes = int(port_stat['transmit_bytes']['value'])
    else:
        transmit_bytes = int(port_stat['transmit_bytes'])
    current_sample = (current_timestamp, transmit_bytes)

    history = transmit_bytes_history.setdefault(port, [])
    history.append(current_sample)
    # Drop samples older than the averaging window
    while len(history) > 1 and (current_timestamp - history[0][0]) > AVERAGE_INTERVAL:
        history.pop(0)
    if len(history) < 2:
        return False

    total_bytes_diff = history[-1][1] - history[0][1]
    window = max(history[-1][0] - history[0][0], 1)
    average_speed = total_bytes_diff / window

    real_time_speed = calculate_speed(port, current_sample)
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{current_time}] port [{port}] instantaneous speed: {real_time_speed / 1024:.2f} KB/s, "
          f"average speed: {average_speed:.2f} bytes/s")
    return real_time_speed > LOAD_THRESHOLD


def redirect_traffic(switch, in_port, out_port):
    """Push a static flow that redirects traffic arriving on in_port to out_port."""
    flow_payload = {
        "switch": switch,
        "name": f"redirect-flow-{in_port}",  # unique name per port so rules do not overwrite each other
        "cookie": "0",
        "priority": "32768",
        "in_port": str(in_port),
        "active": "true",
        "actions": f"output={out_port}"
    }
    response = requests.post(f"{FLOODLIGHT_API_BASE_URL}/wm/staticflowpusher/json",
                             data=json.dumps(flow_payload), headers=headers)
    if response.status_code == 200:
        print(f"Redirected traffic on switch {switch} from port {in_port} to port {out_port}")
    else:
        raise Exception(f'Failed to push flow rule: {response.text}')


def main():
    monitor_ports = [1, 2]  # monitor ports 1 and 2
    try:
        while True:
            for port in monitor_ports:
                port_stat = get_port_statistics(TARGET_SWITCH, port)
                if is_overloaded(port_stat, port):
                    dayi_redict = 2 if port == 1 else 1
                    redirect_traffic(TARGET_SWITCH, port, dayi_redict)
            time.sleep(CHECK_INTERVAL)
    except KeyboardInterrupt:
        print("Interrupted by user")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()
```
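To inspect or roll back what the script pushes, the Static Flow Pusher also offers list and delete operations; a minimal sketch, assuming this Floodlight build exposes the same `/wm/staticflowpusher/...` paths used by the push call above:

```python
import requests

BASE = 'http://127.0.0.1:8080'
SWITCH = '00:00:00:00:00:00:00:02'

# Hedged sketch: show the static entries currently installed on the target switch.
print(requests.get(f"{BASE}/wm/staticflowpusher/list/{SWITCH}/json").json())

# Delete one pushed entry by name (here the per-port name "redirect-flow-1"
# that the script above uses for port 1).
requests.delete(f"{BASE}/wm/staticflowpusher/json", json={"name": "redirect-flow-1"})
```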
Bandwidth test between two hosts, h1 and h8:
```bash
mininet> h1 nohup iperf -s >> log1.log 2>&1 &
mininet> h8 iperf -c 10.0.0.1 -l 1M
------------------------------------------------------------
Client connecting to 10.0.0.1, TCP port 5001
TCP window size: 85.3 KByte (default)
------------------------------------------------------------
[ 3] local 10.0.0.8 port 56288 connected with 10.0.0.1 port 5001
[ ID] Interval Transfer Bandwidth
[ 3] 0.0-10.0 sec 42.3 GBytes 36.3 Gbits/sec
mininet>
```
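The same measurement can also be driven from the Mininet Python API instead of the CLI; a minimal sketch, assuming `topo.py` from the appendix is importable from the current directory:

```python
# Hedged sketch: run the h1 <-> h8 iperf test programmatically.
from mininet.net import Mininet
from mininet.node import RemoteController
from topo import MyTopo   # assumes the appendix file is saved as topo.py

net = Mininet(topo=MyTopo(), controller=None)
net.addController('c0', controller=RemoteController, ip='127.0.0.1', port=6653)
net.start()
net.pingAll()                             # let the controller discover every host first
h1, h8 = net.get('h1', 'h8')
print(net.iperf((h1, h8), seconds=10))    # (server-side, client-side) bandwidth strings
net.stop()
```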
Execution results:


Bandwidth test:

## RYU
1. First, proxy ARP to prevent broadcast storms.
The ARP proxy from this post is used directly:
<https://www.sdnlab.com/2318.html>
2. Startup script:
```bash
sudo su
# install Docker
apt install docker-compose -y
mn -c
# start the topology
python3 topo.py
```
Mind the working directory (it is bind-mounted into the container):
```bash
docker run -p 8080:8080 \
    -p 6653:6633 \
    -v $(pwd):/app \
    -it latarc/ryu \
    /app/ryu/arp-proxy.py \
    /app/ryu/dayi-balance.py
```

3. Test


4. Code (dayi-balance.py)
```python
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub
from ryu.ofproto import ofproto_v1_3


class LoadBalancer(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self, *args, **kwargs):
        super(LoadBalancer, self).__init__(*args, **kwargs)
        self.datapaths = {}
        self.monitor_thread = hub.spawn(self._monitor)

    # Track switches connecting and disconnecting
    @set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if datapath.id not in self.datapaths:
                self.logger.debug('register datapath: %016x', datapath.id)
                self.datapaths[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                self.logger.debug('unregister datapath: %016x', datapath.id)
                del self.datapaths[datapath.id]

    # Monitor thread that periodically requests statistics
    def _monitor(self):
        while True:
            for dp in self.datapaths.values():
                self._request_stats(dp)
            hub.sleep(10)  # run every 10 seconds

    # Send a port statistics request to a switch
    def _request_stats(self, datapath):
        self.logger.debug('send stats request: %016x', datapath.id)
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser
        req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
        datapath.send_msg(req)

    # Handle port statistics replies
    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
    def _port_stats_reply_handler(self, ev):
        body = ev.msg.body
        for stat in body:
            self.logger.info(f'port {stat.port_no}: tx {stat.tx_bytes} bytes, rx {stat.rx_bytes} bytes')
            # If port 2 has received more bytes than the threshold, redirect its traffic
            if stat.port_no == 2 and stat.rx_bytes > 100000:
                self.logger.warning('port 2 is overloaded, installing a flow to redirect traffic...')
                self._redirect_traffic(ev.msg.datapath, stat.port_no)

    # Install a flow entry that redirects the port's traffic
    def _redirect_traffic(self, datapath, port_no):
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser
        # Action: forward out of port 1
        actions = [parser.OFPActionOutput(1)]
        # Match: packets arriving on port_no
        match = parser.OFPMatch(in_port=port_no)
        # FlowMod message installing the new entry
        flow_mod_msg = parser.OFPFlowMod(
            datapath=datapath,
            command=ofproto.OFPFC_ADD,
            match=match,
            instructions=[parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)],
            priority=ofproto.OFP_DEFAULT_PRIORITY
        )
        # Send the FlowMod
        datapath.send_msg(flow_mod_msg)
        self.logger.info('flow installed: redirecting traffic from port %s to port 1', port_no)
```
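One possible refinement, not part of the assignment code above: give the redirect entry an `idle_timeout` so it expires on its own once the overload subsides, instead of redirecting port 2 permanently. A sketch of the modified FlowMod:

```python
def redirect_with_timeout(datapath, in_port, out_port=1, idle_timeout=30):
    """Hedged variant of _redirect_traffic: the entry expires after 30 s of inactivity."""
    ofproto = datapath.ofproto
    parser = datapath.ofproto_parser
    actions = [parser.OFPActionOutput(out_port)]
    match = parser.OFPMatch(in_port=in_port)
    mod = parser.OFPFlowMod(
        datapath=datapath,
        command=ofproto.OFPFC_ADD,
        idle_timeout=idle_timeout,  # remove the entry once the port goes quiet
        match=match,
        instructions=[parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)],
        priority=ofproto.OFP_DEFAULT_PRIORITY,
    )
    datapath.send_msg(mod)
```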
## Appendix
### topo.py
```python
from mininet.topo import Topo
from mininet.net import Mininet
from mininet.util import dumpNodeConnections
from mininet.log import setLogLevel, info
from mininet.node import CPULimitedHost
from mininet.node import RemoteController
from mininet.link import TCLink
from mininet.cli import CLI


class MyTopo(Topo):
    def __init__(self):
        "Create custom topo."
        # initialize topology
        Topo.__init__(self)
        layer = 3  # height of each binary tree
        switch_count = 0
        host_count = 0
        # create the core routers (default count is 3)
        router = []
        count_router = 3
        for i in range(0, count_router):
            switch_count = switch_count + 1
            name = 's' + str(switch_count)
            router1 = self.addSwitch(name)
            router.append(router1)
        for i in range(0, count_router):
            for j in range(i + 1, count_router):
                self.addLink(router[i], router[j])
        # create the first tree topology
        sw1 = []
        host = []
        for i in range(0, layer):
            for j in range(0, 2 ** i):
                if not i == layer - 1:
                    switch_count = switch_count + 1
                    str1 = 's' + str(switch_count)
                    sw = self.addSwitch(str1)
                    sw1.append(sw)
                else:
                    host_count = host_count + 1
                    str1 = 'h' + str(host_count)
                    if host_count < 9:
                        address = "00:00:00:00:00:0" + str(host_count)
                    else:
                        address = "00:00:00:00:00:" + str(host_count)
                    host1 = self.addHost(str1, mac=address)
                    host.append(host1)
        count = 0
        for i in range(0, 2 ** (layer - 2) - 1):
            self.addLink(sw1[i], sw1[2 * i + 1])
            self.addLink(sw1[i], sw1[2 * i + 2])
        for i in range(2 ** (layer - 2) - 1, 2 ** (layer - 1) - 1):
            self.addLink(sw1[i], host[count])
            self.addLink(sw1[i], host[count + 1])
            count = count + 2
        # the second tree topology
        sw2 = []
        host2 = []
        for i in range(0, layer):
            for j in range(0, 2 ** i):
                if not i == layer - 1:
                    switch_count = switch_count + 1
                    str1 = 's' + str(switch_count)
                    sw = self.addSwitch(str1)
                    sw2.append(sw)
                else:
                    host_count = host_count + 1
                    str1 = 'h' + str(host_count)
                    if host_count < 9:
                        address = "00:00:00:00:00:0" + str(host_count)
                    else:
                        address = "00:00:00:00:00:" + str(host_count)
                    host1 = self.addHost(str1, mac=address)
                    host2.append(host1)
        count = 0
        for i in range(0, 2 ** (layer - 2) - 1):
            self.addLink(sw2[i], sw2[2 * i + 1])
            self.addLink(sw2[i], sw2[2 * i + 2])
        for i in range(2 ** (layer - 2) - 1, 2 ** (layer - 1) - 1):
            self.addLink(sw2[i], host2[count])
            self.addLink(sw2[i], host2[count + 1])
            count = count + 2
        self.addLink(sw1[0], router[-2])
        self.addLink(sw2[0], router[-1])


if __name__ == '__main__':
    setLogLevel('info')
    info('*** Creating network\n')
    topo = MyTopo()
    net = Mininet(topo=topo, controller=None)
    net.addController('c0', controller=RemoteController, ip='127.0.0.1', port=6653, autoSetMac=True)
    net.start()
    CLI(net)
    net.stop()
```
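The imported `dumpNodeConnections` helper is never actually called in topo.py; a small hedged sketch of how it could be used to sanity-check the wiring, assuming the file above is saved as `topo.py` and importable:

```python
from mininet.net import Mininet
from mininet.node import RemoteController
from mininet.util import dumpNodeConnections
from topo import MyTopo  # hypothetical import of the file above

net = Mininet(topo=MyTopo(), controller=None)
net.addController('c0', controller=RemoteController, ip='127.0.0.1', port=6653)
net.start()
dumpNodeConnections(net.hosts)     # which switch/port each host attaches to
dumpNodeConnections(net.switches)  # links between the core and the two trees
net.stop()
```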
### ARP proxy
From: <https://www.sdnlab.com/2318.html>
```python
# Author:muzixing
# Time:2014/10/19
#
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_3
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
from ryu.lib.packet import arp

ETHERNET = ethernet.ethernet.__name__
ETHERNET_MULTICAST = "ff:ff:ff:ff:ff:ff"
ARP = arp.arp.__name__


class ARP_PROXY_13(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self, *args, **kwargs):
        super(ARP_PROXY_13, self).__init__(*args, **kwargs)
        self.mac_to_port = {}
        self.arp_table = {}
        self.sw = {}

    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
    def switch_features_handler(self, ev):
        datapath = ev.msg.datapath
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        # install table-miss flow entry
        #
        # We specify NO BUFFER to max_len of the output action due to
        # OVS bug. At this moment, if we specify a lesser number, e.g.,
        # 128, OVS will send Packet-In with invalid buffer_id and
        # truncated packet data. In that case, we cannot output packets
        # correctly.
        match = parser.OFPMatch()
        actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,
                                          ofproto.OFPCML_NO_BUFFER)]
        self.add_flow(datapath, 0, match, actions)

    def add_flow(self, datapath, priority, match, actions):
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,
                                             actions)]
        mod = parser.OFPFlowMod(datapath=datapath, priority=priority,
                                idle_timeout=5, hard_timeout=15,
                                match=match, instructions=inst)
        datapath.send_msg(mod)

    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
    def _packet_in_handler(self, ev):
        msg = ev.msg
        datapath = msg.datapath
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser
        in_port = msg.match['in_port']

        pkt = packet.Packet(msg.data)
        eth = pkt.get_protocols(ethernet.ethernet)[0]

        dst = eth.dst
        src = eth.src
        dpid = datapath.id

        header_list = dict(
            (p.protocol_name, p) for p in pkt.protocols if type(p) != str)
        if ARP in header_list:
            self.arp_table[header_list[ARP].src_ip] = src  # ARP learning

        self.mac_to_port.setdefault(dpid, {})
        self.logger.info("packet in %s %s %s %s", dpid, src, dst, in_port)

        # learn a mac address to avoid FLOOD next time.
        self.mac_to_port[dpid][src] = in_port

        if dst in self.mac_to_port[dpid]:
            out_port = self.mac_to_port[dpid][dst]
        else:
            if self.arp_handler(header_list, datapath, in_port, msg.buffer_id):
                # 1: reply or drop;  0: flood
                print("ARP_PROXY_13")
                return None
            else:
                out_port = ofproto.OFPP_FLOOD
                print('OFPP_FLOOD')

        actions = [parser.OFPActionOutput(out_port)]

        # install a flow to avoid packet_in next time
        if out_port != ofproto.OFPP_FLOOD:
            match = parser.OFPMatch(in_port=in_port, eth_dst=dst)
            self.add_flow(datapath, 1, match, actions)

        data = None
        if msg.buffer_id == ofproto.OFP_NO_BUFFER:
            data = msg.data

        out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,
                                  in_port=in_port, actions=actions, data=data)
        datapath.send_msg(out)

    def arp_handler(self, header_list, datapath, in_port, msg_buffer_id):
        header_list = header_list
        datapath = datapath
        in_port = in_port

        if ETHERNET in header_list:
            eth_dst = header_list[ETHERNET].dst
            eth_src = header_list[ETHERNET].src

        if eth_dst == ETHERNET_MULTICAST and ARP in header_list:
            arp_dst_ip = header_list[ARP].dst_ip
            if (datapath.id, eth_src, arp_dst_ip) in self.sw:  # Break the loop
                if self.sw[(datapath.id, eth_src, arp_dst_ip)] != in_port:
                    out = datapath.ofproto_parser.OFPPacketOut(
                        datapath=datapath,
                        buffer_id=datapath.ofproto.OFP_NO_BUFFER,
                        in_port=in_port,
                        actions=[], data=None)
                    datapath.send_msg(out)
                    return True
            else:
                self.sw[(datapath.id, eth_src, arp_dst_ip)] = in_port

        if ARP in header_list:
            hwtype = header_list[ARP].hwtype
            proto = header_list[ARP].proto
            hlen = header_list[ARP].hlen
            plen = header_list[ARP].plen
            opcode = header_list[ARP].opcode

            arp_src_ip = header_list[ARP].src_ip
            arp_dst_ip = header_list[ARP].dst_ip

            actions = []

            if opcode == arp.ARP_REQUEST:
                if arp_dst_ip in self.arp_table:  # arp reply
                    actions.append(datapath.ofproto_parser.OFPActionOutput(
                        in_port))

                    ARP_Reply = packet.Packet()
                    ARP_Reply.add_protocol(ethernet.ethernet(
                        ethertype=header_list[ETHERNET].ethertype,
                        dst=eth_src,
                        src=self.arp_table[arp_dst_ip]))
                    ARP_Reply.add_protocol(arp.arp(
                        opcode=arp.ARP_REPLY,
                        src_mac=self.arp_table[arp_dst_ip],
                        src_ip=arp_dst_ip,
                        dst_mac=eth_src,
                        dst_ip=arp_src_ip))

                    ARP_Reply.serialize()

                    out = datapath.ofproto_parser.OFPPacketOut(
                        datapath=datapath,
                        buffer_id=datapath.ofproto.OFP_NO_BUFFER,
                        in_port=datapath.ofproto.OFPP_CONTROLLER,
                        actions=actions, data=ARP_Reply.data)
                    datapath.send_msg(out)
                    return True
        return False
```