网络测试仪,网络流量测试,案例分享(编号1)Python+Json调用Xena Valkyrie硬件实现网络流量性能测试,可用于验证Performance,Latency,Jitter指标,能灵活适配Pair,Rotate,Ful-Mesh等多种拓扑环境
硬件:XENA Valkyrie 或 Vantage主机,测试板卡不限,本方法适用于其100M~400G所有速率端口
环境配置:Python 3
实现功能:
1.控制流量仪进行流量测试,预定配置的流量发送,报文统计,丢包率,延迟等信息的统计
2.单个Python脚本配合不同的JSON对象文件,来实现不同端口数量,不同拓扑模型下的流量测试
3.在发包测试前及运行过程中都对发包端口的链路状态进行监控100M/1G/2.5G/5G/10G
4.嵌入ARP探测,获取DUT的MAC信息,并记录在测试日志中
5.统计端口流量的Tx,Rx信息,丢包率,延迟信息
设计思路:main文件控制主流程,嵌入Statistics模块进行数据统计,CreateClass模块进行命令预定义
运行效果:
主要实现代码:
1 import threading, time, json, random, queue, types, sys, socket
2 from binascii import hexlify
3 from TestUtilsL23 import XenaScriptTools
4 from CreateClass import StreamCreate
5 from Statistics import CollectTestresult
6
7 def BuildDict(ports, configvalue, watching, watching_2):
8 #Generate ports Dictionary
9 a = 1
10 for port in ports:
11 ip_address_b = configvalue[\'IP\'][0:10]
12 ip_address_c = ip_address_b + str(a)
13 mac_a = configvalue[\'mac\'][0:10]
14 if a > 9:
15 mac_address = mac_a + str(a)
16 else:
17 mac_address = mac_a + \'0\' + str(a)
18 ip_address_a = hexlify(socket.inet_aton(ip_address_c)).decode()
19 mac_ip = [str(mac_address), str(ip_address_a)]
20 watching[port] = mac_ip
21 a += 1
22
23 watching_2[\'type_eth\'] = \'FFFF\',
24 watching_2[\'type_vlan\'] = \'8100\',
25 watching_2[\'type_ip\'] = \'08004500006A000000007F11BA2D\',
26 watching_2[\'type_udp\'] = \'00000000\',
27 watching_2[\'type_tcp\'] = \'000000000000000050000000E5A40000\',
28 watching_2[\'tcpid\'] = [hex(int(configvalue[\'tcpPort\'][0]))[2:].zfill(4) + hex(int(configvalue[\'tcpPort\'][1]))[2:].zfill(4)]
29
30
31 def PrintMes(ports, configvalue, ip_address):
32 localtime = time.asctime( time.localtime(time.time()) )
33 print("\n##############################################")
34 print(" 测试配置 ")
35 print("##############################################\n")
36 print("测试时间 : " + str(localtime))
37 print("设备IP地址 : " + ip_address)
38 print("测试端口 : " + str(ports))
39 print("测试模式 : " + configvalue[\'Stype\'])
40 print("持续时间 : " + configvalue[\'testtime\'])
41 print("报文类型 : " + configvalue[\'header_type\'])
42 print("报文长度 : " + configvalue[\'packet\'])
43 if configvalue[\'Stype\'] == \'Aggregation\':
44 print("上行VLAN : " + str(configvalue[\'uvlan\']))
45 print("下行VLAN : " + str(configvalue[\'uvlan\']))
46 print("WAN速率 : " + str(configvalue[\'wanrate\']))
47 print("LAN速率 : " + str(configvalue[\'lanrate\']))
48 else:
49 print("VLAN : " + str(configvalue[\'uvlan\']))
50 print("速率设置 : " + str(configvalue[\'wanrate\']))
51 print("端口速率 : " + str(configvalue[\'portrate\']))
52 print("学习时间 : " + configvalue[\'learntime\'])
53 print("丢包率设置 : " + configvalue[\'threshold\'])
54 if configvalue[\'snlearnenable\'] == \'1\':
55 print("自动学习SN : Enable")
56 else:
57 print("自动学习SN : Disable")
58 if configvalue[\'portspeedcheck\'] == \'1\':
59 print("端口速率检测 : Enable" )
60 else:
61 print("端口速率检测 : Disable" )
62
63
64 def Maclearning(xm, ports, configvalue):
65 if configvalue[\'snlearnenable\'] == \'1\':
66 xm.SendExpectOK(ports [1] + " PS_CREATE [0]")
67 xm.SendExpectOK(ports [1] + " ps_tpldid [0] 0")
68 xm.SendExpectOK(ports [1] + " PS_PACKETLENGTH [0] FIXED 70 1518")
69 xm.SendExpectOK(ports [1] + " PS_HEADERPROTOCOL [0] ETHERNET ARP")
70 learnIP_hex = hexlify(socket.inet_aton(configvalue[\'learnIP\'])).decode()
71 xm.SendExpectOK(ports [1] + " PS_PACKETHEADER [0] 0xFFFFFFFFFFFF00000000000108060001080006040001000000000001C0A80164FFFFFFFFFFFF" + learnIP_hex)
72 xm.SendExpectOK(ports [1] + " PS_RATEPPS [0] 2")
73 xm.SendExpectOK(ports [1] + " PS_ENABLE [0] on")
74 xm.SendExpectOK(ports [1] + " P_TRAFFIC ON")
75 xm.SendExpectOK(ports [1] + " P_CAPTURE ON")
76 time.sleep(1)
77 xm.SendExpectOK(ports [1] + " P_CAPTURE OFF")
78 xm.SendExpectOK(ports [1] + " P_TRAFFIC OFF")
79 SN = xm.Send(ports[1] + " PC_PACKET [0] ?")
80 SN = SN.split(\' \')[-1]
81 xm.SendExpectOK(ports [1] + " PS_DELETE [0]")
82 Serial = str(SN)
83 if SN == \'<BADINDEX>\':
84 print ("\nTest result :Failed;[ARP learning failed!!]")
85 sys.exit()
86 print ("DUT SN : " + str(Serial[16:28]))
87
88
89 def PortSpeed(xm, ports, PSC, configvalue):
90 if configvalue[\'PRcheck\'] == \'1\':
91 xm.PortSyncCheck(ports)
92 i = 0
93 for port in ports:
94 PS = xm.Send(port + \' p_speed ?\')
95 PS = PS.split(\' \')[-1]
96 PSC.append(str(PS))
97 if configvalue[\'portspeedcheck\'] == \'1\':
98 if str(PS) != configvalue[\'portrate\'][i]:
99 print (\'Test Result:Failed;[\' + port + \' 端口速率不匹配,请检查配置...]\')
100 sys.exit()
101 i += 1
102
103
104 def runtest(xm, ports, configvalue):
105 xm.PortTrafficStart(ports)
106 time.sleep(float(configvalue[\'learntime\']))
107 xm.PortTrafficStop(ports)
108 time.sleep(0.5)
109 xm.Statisticsclear(ports)
110 time.sleep(0.5)
111 xm.Porttimelimit(ports, configvalue[\'testtime\'])
112 time.sleep(0.2)
113 xm.PortTrafficStart(ports)
114 count = 0
115 print ("\n开始打流,请耐心等候, 测试时间约为" + configvalue[\'testtime\'] + "秒......\n")
116 while True:
117 print (">>>>>>>>>", end=\'\')
118 xm.Send(ports[0] + " P_TRAFFIC ?")
119 sys.stdout.flush()
120 count += 1
121 time.sleep(1)
122 if count > int(configvalue[\'testtime\']):
123 print (\'\n\n测试结束,正在收集测试数据......\')
124 break
125 print()
126 xm.PortTrafficStop(ports)
127 time.sleep(1)
128
129
130 def main(argv):
131 a = sys.argv
132 configvalue = {}
133 filename = str(a[1])
134 with open(filename,\'rb\') as f:
135 configs=json.loads(f.read())
136
137 ip_address = configs.get(\'ip_address\')
138 ports = configs.get(\'ports\', \'\')
139 configvalue[\'uvlan\'] = configs.get(\'uvlan\', \'\')
140 configvalue[\'dvlan\'] = configs.get(\'dvlan\', \'\')
141 configvalue[\'tcpPort\'] = configs.get(\'tcpUdpPort\', \'\')
142 configvalue[\'packet\'] = configs.get(\'packet\', \'\')
143 configvalue[\'testtime\'] = configs.get(\'testtime\', \'\')
144 configvalue[\'threshold\'] = configs.get(\'threshold\', \'\')
145 configvalue[\'header_type\'] = configs.get(\'headertype\', \'\')
146 configvalue[\'learntime\'] = configs.get(\'learntime\', \'\')
147 configvalue[\'payload\'] = configs.get(\'payload\', \'\')
148 configvalue[\'wanrate\'] = configs.get(\'wanrate\', \'\')
149 configvalue[\'lanrate\'] = configs.get(\'lanrate\', \'\')
150 configvalue[\'portrate\'] = configs.get(\'portrate\', \'\')
151 configvalue[\'PRcheck\'] = configs.get(\'PRcheck\', \'\')
152 configvalue[\'IP\'] = configs.get(\'IP\', \'\')
153 configvalue[\'mac\'] = configs.get(\'mac\', \'\')
154 configvalue[\'snlearnenable\'] = configs.get(\'snlernenable\', \'\')
155 configvalue[\'learnIP\'] = configs.get(\'learnIP\', \'\')
156 configvalue[\'Stype\'] = configs.get(\'Stype\', \'\')
157 configvalue[\'portspeedcheck\'] = configs.get(\'PScheck\', \'\')
158
159 PSC = []
160 FCS = []
161 watching = {}
162 watching_2 = {}
163 testresult = {}
164
165 PrintMes(ports, configvalue, ip_address)
166
167 xm = XenaScriptTools(ip_address)
168 xm.LogonSetOwner("xena", "python_test_1")
169 xm.PortRelinquish(ports)
170 xm.PortReserve(ports)
171 xm.PortTrafficStop(ports)
172 xm.StreamsDelete(ports)
173 sc = StreamCreate(xm, ports, watching, watching_2, configvalue)
174 cr = CollectTestresult(xm, ports, PSC, configvalue, testresult, FCS)
175
176
177 BuildDict(ports, configvalue, watching, watching_2)
178 PortSpeed(xm, ports, PSC, configvalue)
179 if configvalue[\'snlearnenable\'] == \'1\':
180 Maclearning(xm, ports, configvalue)
181 if configvalue[\'Stype\'] == \'Loopback\':
182 sc.Loopback(xm, ports, watching, watching_2, configvalue)
183 runtest(xm, ports, configvalue)
184 cr.LoopbackStatistics(xm, ports, testresult)
185 if configvalue[\'Stype\'] == \'Eachother\':
186 sc.Eachother(xm, ports, watching, watching_2, configvalue)
187 runtest(xm, ports, configvalue)
188 cr.EachotherStatistics(xm, ports, testresult)
189 if configvalue[\'Stype\'] == \'Aggregation\':
190 sc.Aggregation(xm, ports, watching, watching_2, configvalue)
191 runtest(xm, ports, configvalue)
192 cr.AggregationStatistics(xm, ports, testresult)
193 cr.FCSGetValue(xm, ports, FCS)
194 cr.OutputStatistics(ports, PSC, configvalue, testresult, FCS)
195 xm.PortRelease(ports)
196
197 if __name__ == \'__main__\':
198 sys.exit(main(sys.argv))
JSON对象配置文件
{
"##常用配置修改":"",
"ports": ["0/0","0/1","0/2","0/3","0/4","0/5"],
"packet": "FIXED 64 1518",
"testtime": "10",
"##注意:dvlan和lanrate只有在Aggregation模式生效,其他模式仅需uvlan和wanrate":"",
"##其中vlan设置为-1则表示不添加vlan":"",
"uvlan": "-1",
"dvlan": "-1",
"wanrate": ["100", "100", "100","100", "100", "100"],
"lanrate": ["21", "22", "23","24"],
"portrate": ["1000","1000","1000","1000","1000","1000"],
"##Stype为测试模式,分别可以配置为:Loopback, Eachother, Aggregation":"",
"##其中Loopback为环回测试,Eachother为两两互打测试,Aggregation为汇聚测试":"",
"Stype":"Eachother",
"##headertype为报文类型,分别可以配置为:TCP, UDP, IP, Ethernet":"",
"headertype": "Ethernet",
"##threshold为丢包率设置":"",
"threshold": "0",
"##不常用配置修改":"",
"##发送学习报文的时间":"",
"learntime": "3",
"##payload类型分别有: Pattern, Random":"",
"payload": "Pattern",
"##PRcheck为使能端口状态检测,1为开启,0为关闭;使能后端口为连接的时候会报错":"",
"PRcheck": "1",
"##PScheck为检测端口速率是否匹配,1为开启,0为关闭":"",
"PScheck":"1",
"tcpUdpPort": ["1024", "2048"],
"IP": "192.168.163.1",
"mac": "000000033333",
"##学习DUT的MAC地址":"",
"snlernenable": "0",
"learnIP": "192.168.2.1",
"##测试仪IP地址":"",
"ip_address": "192.168.1.200"
}