硬件:XENA Valkyrie 或 Vantage主机,测试板卡不限,本方法适用于其100M~400G所有速率端口

环境配置:Python 3

实现功能:

1.控制流量仪进行流量测试,预定配置的流量发送,报文统计,丢包率,延迟等信息的统计

2.单个Python脚本配合不同的JSON对象文件,来实现不同端口数量,不同拓扑模型下的流量测试

3.在发包测试前及运行过程中,都对发包端口(100M/1G/2.5G/5G/10G)的链路状态进行监控

4.嵌入ARP探测,获取DUT的MAC信息,并记录在测试日志中

5.统计端口流量的Tx,Rx信息,丢包率,延迟信息

设计思路:main文件控制主流程,嵌入Statistics模块进行数据统计,CreateClass模块进行命令预定义

运行效果:

 

 

 

 

主要实现代码:

  1 import threading, time, json, random, queue, types, sys, socket
  2 from binascii import hexlify
  3 from TestUtilsL23 import XenaScriptTools
  4 from CreateClass import StreamCreate
  5 from Statistics import CollectTestresult
  6 
  7 def BuildDict(ports, configvalue, watching, watching_2):
  8     #Generate ports Dictionary
  9     a = 1 
 10     for port in ports:
 11         ip_address_b = configvalue[\'IP\'][0:10]
 12         ip_address_c = ip_address_b + str(a)
 13         mac_a = configvalue[\'mac\'][0:10]
 14         if a > 9:
 15             mac_address = mac_a + str(a)
 16         else:
 17             mac_address = mac_a + \'0\' + str(a)
 18         ip_address_a = hexlify(socket.inet_aton(ip_address_c)).decode()
 19         mac_ip = [str(mac_address), str(ip_address_a)]
 20         watching[port] = mac_ip
 21         a += 1
 22 
 23     watching_2[\'type_eth\'] = \'FFFF\',
 24     watching_2[\'type_vlan\'] = \'8100\',
 25     watching_2[\'type_ip\'] = \'08004500006A000000007F11BA2D\',
 26     watching_2[\'type_udp\'] = \'00000000\',
 27     watching_2[\'type_tcp\'] = \'000000000000000050000000E5A40000\',
 28     watching_2[\'tcpid\'] = [hex(int(configvalue[\'tcpPort\'][0]))[2:].zfill(4) + hex(int(configvalue[\'tcpPort\'][1]))[2:].zfill(4)]
 29 
 30 
 31 def PrintMes(ports, configvalue, ip_address):
 32     localtime = time.asctime( time.localtime(time.time()) )
 33     print("\n##############################################")
 34     print("                   测试配置                   ")
 35     print("##############################################\n")
 36     print("测试时间     :  " + str(localtime))
 37     print("设备IP地址   :  " + ip_address)
 38     print("测试端口     :  " + str(ports))
 39     print("测试模式     :  " + configvalue[\'Stype\'])
 40     print("持续时间     :  " + configvalue[\'testtime\'])
 41     print("报文类型     :  " + configvalue[\'header_type\'])
 42     print("报文长度     :  " + configvalue[\'packet\'])
 43     if configvalue[\'Stype\'] == \'Aggregation\':
 44         print("上行VLAN     :  " + str(configvalue[\'uvlan\']))
 45         print("下行VLAN     :  " + str(configvalue[\'uvlan\']))
 46         print("WAN速率      :  " + str(configvalue[\'wanrate\']))
 47         print("LAN速率      :  " + str(configvalue[\'lanrate\']))
 48     else:
 49         print("VLAN         :  " + str(configvalue[\'uvlan\']))
 50         print("速率设置     :  " + str(configvalue[\'wanrate\']))
 51     print("端口速率     :  " + str(configvalue[\'portrate\']))
 52     print("学习时间     :  " + configvalue[\'learntime\'])
 53     print("丢包率设置   :  " + configvalue[\'threshold\'])
 54     if configvalue[\'snlearnenable\'] == \'1\':
 55         print("自动学习SN   :  Enable")
 56     else:
 57         print("自动学习SN   :  Disable")
 58     if configvalue[\'portspeedcheck\'] == \'1\':
 59         print("端口速率检测 :  Enable" )
 60     else:
 61         print("端口速率检测 :  Disable" )
 62 
 63 
 64 def Maclearning(xm, ports, configvalue):
 65     if configvalue[\'snlearnenable\'] == \'1\':
 66         xm.SendExpectOK(ports [1] + " PS_CREATE [0]")
 67         xm.SendExpectOK(ports [1] + " ps_tpldid [0] 0")
 68         xm.SendExpectOK(ports [1] + " PS_PACKETLENGTH [0] FIXED 70 1518")
 69         xm.SendExpectOK(ports [1] + " PS_HEADERPROTOCOL [0] ETHERNET ARP")
 70         learnIP_hex = hexlify(socket.inet_aton(configvalue[\'learnIP\'])).decode()
 71         xm.SendExpectOK(ports [1] + " PS_PACKETHEADER [0] 0xFFFFFFFFFFFF00000000000108060001080006040001000000000001C0A80164FFFFFFFFFFFF" + learnIP_hex)
 72         xm.SendExpectOK(ports [1] + " PS_RATEPPS [0] 2")
 73         xm.SendExpectOK(ports [1] + " PS_ENABLE [0] on")
 74         xm.SendExpectOK(ports [1] + " P_TRAFFIC ON")
 75         xm.SendExpectOK(ports [1] + " P_CAPTURE ON")
 76         time.sleep(1)
 77         xm.SendExpectOK(ports [1] + " P_CAPTURE OFF")
 78         xm.SendExpectOK(ports [1] + " P_TRAFFIC OFF")
 79         SN = xm.Send(ports[1] + " PC_PACKET [0] ?")
 80         SN = SN.split(\'  \')[-1]
 81         xm.SendExpectOK(ports [1] + " PS_DELETE [0]")
 82         Serial = str(SN)
 83         if SN == \'<BADINDEX>\':
 84             print ("\nTest result :Failed;[ARP learning failed!!]")
 85             sys.exit()
 86         print ("DUT SN       :  " + str(Serial[16:28]))
 87 
 88 
 89 def PortSpeed(xm, ports, PSC, configvalue):
 90     if configvalue[\'PRcheck\'] == \'1\':
 91         xm.PortSyncCheck(ports)
 92     i = 0
 93     for port in ports:
 94         PS = xm.Send(port + \' p_speed ?\')
 95         PS = PS.split(\'  \')[-1]
 96         PSC.append(str(PS))
 97         if configvalue[\'portspeedcheck\'] == \'1\':
 98             if str(PS) != configvalue[\'portrate\'][i]:
 99                 print (\'Test Result:Failed;[\' + port + \' 端口速率不匹配,请检查配置...]\')
100                 sys.exit()
101         i += 1
102 
103 
def runtest(xm, ports, configvalue):
    """Run one learning pass followed by the timed traffic test.

    Sequence: start traffic for ``learntime`` seconds, stop it, clear the
    statistics, set the port time limit to ``testtime``, start traffic again
    and poll the first port once per second while printing a progress bar,
    then stop traffic.

    Args:
        xm: Tester connection providing ``PortTrafficStart``,
            ``PortTrafficStop``, ``Statisticsclear``, ``Porttimelimit`` and
            ``Send``.
        ports: Port identifiers; ``ports[0]`` is the one polled.
        configvalue: Test configuration; reads ``learntime`` and ``testtime``
            (both strings).
    """
    # Learning pass: its counters are discarded by the clear below.
    xm.PortTrafficStart(ports)
    time.sleep(float(configvalue['learntime']))
    xm.PortTrafficStop(ports)
    time.sleep(0.5)

    xm.Statisticsclear(ports)
    time.sleep(0.5)

    xm.Porttimelimit(ports, configvalue['testtime'])
    time.sleep(0.2)

    xm.PortTrafficStart(ports)
    print("\n开始打流,请耐心等候, 测试时间约为" + configvalue['testtime'] + "秒......\n")

    # One progress tick (and one query to the tester) per second; the loop
    # ends after the tick that pushes the counter past testtime.
    ticks = 0
    finished = False
    while not finished:
        print(">>>>>>>>>", end='')
        xm.Send(ports[0] + " P_TRAFFIC ?")
        sys.stdout.flush()
        ticks += 1
        time.sleep(1)
        finished = ticks > int(configvalue['testtime'])
    print('\n\n测试结束,正在收集测试数据......')

    print()
    xm.PortTrafficStop(ports)
    time.sleep(1)
128 
129 
def main(argv):
    """Run one traffic test described by a JSON configuration file.

    Args:
        argv: Command-line argument list; ``argv[1]`` is the path of the JSON
            configuration file.

    Returns:
        0 when the test sequence completed, 1 when ``Stype`` is not one of
        the supported modes, 2 when no configuration path was given.

    The ports are reserved on the tester for the duration of the run and are
    released again even when a step fails or calls ``sys.exit``.
    """
    if len(argv) < 2:
        program = argv[0] if argv else 'main.py'
        print("Usage: " + str(program) + " <config.json>")
        return 2

    filename = str(argv[1])
    with open(filename, 'rb') as f:
        configs = json.loads(f.read())

    ip_address = configs.get('ip_address')
    ports = configs.get('ports', '')

    # Internal name -> key in the JSON file (a few of them differ).
    key_map = {
        'uvlan': 'uvlan',
        'dvlan': 'dvlan',
        'tcpPort': 'tcpUdpPort',
        'packet': 'packet',
        'testtime': 'testtime',
        'threshold': 'threshold',
        'header_type': 'headertype',
        'learntime': 'learntime',
        'payload': 'payload',
        'wanrate': 'wanrate',
        'lanrate': 'lanrate',
        'portrate': 'portrate',
        'PRcheck': 'PRcheck',
        'IP': 'IP',
        'mac': 'mac',
        'learnIP': 'learnIP',
        'Stype': 'Stype',
        'portspeedcheck': 'PScheck',
    }
    configvalue = {name: configs.get(key, '') for name, key in key_map.items()}
    # Existing files spell this key "snlernenable"; accept the correct
    # spelling too and prefer it when both are present.
    configvalue['snlearnenable'] = configs.get(
        'snlearnenable', configs.get('snlernenable', ''))

    PSC = []
    FCS = []
    watching = {}
    watching_2 = {}
    testresult = {}

    PrintMes(ports, configvalue, ip_address)

    # Reject an unknown mode before touching the tester; otherwise no traffic
    # would run and the statistics step would work on empty results.
    stype = configvalue['Stype']
    if stype not in ('Loopback', 'Eachother', 'Aggregation'):
        print("Test Result:Failed;[unsupported Stype: " + str(stype) + "]")
        return 1

    xm = XenaScriptTools(ip_address)
    xm.LogonSetOwner("xena", "python_test_1")
    xm.PortRelinquish(ports)
    xm.PortReserve(ports)
    try:
        xm.PortTrafficStop(ports)
        xm.StreamsDelete(ports)
        sc = StreamCreate(xm, ports, watching, watching_2, configvalue)
        cr = CollectTestresult(xm, ports, PSC, configvalue, testresult, FCS)

        BuildDict(ports, configvalue, watching, watching_2)
        PortSpeed(xm, ports, PSC, configvalue)
        if configvalue['snlearnenable'] == '1':
            Maclearning(xm, ports, configvalue)

        # Each mode has a stream builder named after it and a matching
        # "<mode>Statistics" collector.
        build_streams = getattr(sc, stype)
        collect = getattr(cr, stype + 'Statistics')
        build_streams(xm, ports, watching, watching_2, configvalue)
        runtest(xm, ports, configvalue)
        collect(xm, ports, testresult)

        cr.FCSGetValue(xm, ports, FCS)
        cr.OutputStatistics(ports, PSC, configvalue, testresult, FCS)
    finally:
        # Always hand the ports back, also on failure or sys.exit().
        xm.PortRelease(ports)
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))

 

JSON对象配置文件

{
"##常用配置修改":"",

"ports": ["0/0","0/1","0/2","0/3","0/4","0/5"],
"packet": "FIXED 64 1518",
"testtime": "10",

"##注意:dvlan和lanrate只有在Aggregation模式生效,其他模式仅需uvlan和wanrate":"",
"##其中vlan设置为-1则表示不添加vlan":"",
"uvlan": "-1",
"dvlan": "-1",
"wanrate": ["100", "100", "100","100", "100", "100"],
"lanrate": ["21", "22", "23","24"],

"portrate": ["1000","1000","1000","1000","1000","1000"],

"##Stype为测试模式,分别可以配置为:Loopback, Eachother, Aggregation":"",
"##其中Loopback为环回测试,Eachother为两两互打测试,Aggregation为汇聚测试":"",
"Stype":"Eachother",

"##headertype为报文类型,分别可以配置为:TCP, UDP, IP, Ethernet":"",
"headertype": "Ethernet",

"##threshold为丢包率设置":"",
"threshold": "0",




"##不常用配置修改":"",

"##发送学习报文的时间":"",
"learntime": "3",

"##payload类型分别有: Pattern, Random":"",
"payload": "Pattern",

"##PRcheck为使能端口状态检测,1为开启,0为关闭;使能后端口未连接的时候会报错":"",
"PRcheck": "1",

"##PScheck为检测端口速率是否匹配,1为开启,0为关闭":"",
"PScheck":"1",

"tcpUdpPort": ["1024", "2048"],
"IP": "192.168.163.1",
"mac": "000000033333",

"##学习DUT的MAC地址":"",
"snlernenable": "0",
"learnIP": "192.168.2.1",

"##测试仪IP地址":"",
"ip_address": "192.168.1.200"
}

版权声明:本文为xena原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/xena/p/11579033.html