接口自动化框架搭建:unittest+HTMLTestRunner
本次主要尝试搭建接口自动化框架,基于 unittest+HTMLTestRunner
框架主要模块:
config: 存放配置文件
lib: 封装了一些接口前置函数:处理各种事物
log: 存放生成的日志文件
report: 放置生成的html测试报告
suite: 套件运行器
testcase: 存放测试用例
util: 封装了一些公共函数(例如封装了日志模块,操作mysql函数,tool工具类等)
剩下的就看代码吧:
import configparser
import os
from hashlib import md5

from APITestUnittest.util.client.httpclient import HttpClient


class ShowAPI(object):
    """Client for the showapi platform endpoints."""

    API_URL = "https://route.showapi.com"

    def __init__(self, showapi_appid=None, secret_key=None):
        """Store the credentials used to sign requests.

        :param showapi_appid: service ID; when None it is read from option
            ``SHOWAPI_APPID`` in section ``[showapi]`` of ``config/env.ini``.
        :param secret_key: service secret; when None it is read from option
            ``SECRET_ID`` in the same section.
        :raises configparser.Error: if a value has to come from the config
            file and the section/option is missing.
        """
        self.showapi_appid = showapi_appid
        self.secret_key = secret_key
        if showapi_appid is None or secret_key is None:
            # Only touch the config file when a credential is actually missing.
            config = configparser.ConfigParser()
            config_dir = os.path.join(
                os.path.dirname(os.path.dirname(__file__)), "config"
            )
            config.read(os.path.join(config_dir, "env.ini"), encoding="utf-8")
            if showapi_appid is None:
                self.showapi_appid = config.get("showapi", "SHOWAPI_APPID")
            if secret_key is None:
                self.secret_key = config.get("showapi", "SECRET_ID")

    def gen_signature(self, params=None):
        """Build the request signature.

        The keys are sorted, every ``key`` + ``value`` pair is concatenated,
        the secret key is appended, and the UTF-8 bytes are hashed with MD5.

        :param params: request parameters; None is treated as no parameters.
        :return: hex digest of the MD5 signature.
        """
        params = params or {}
        pieces = [str(key) + str(params[key]) for key in sorted(params)]
        pieces.append(self.secret_key)
        return md5("".join(pieces).encode("utf-8")).hexdigest()

    def send(self, path, method, params):
        """Sign ``params`` and send them to ``API_URL/path`` as form data.

        :param path: endpoint path appended to ``API_URL``.
        :param method: HTTP method name.
        :param params: request parameters (dict or None); not modified.
        :return: the response returned by ``HttpClient.send_request``, or
            None when the call failed (the error is printed).
        """
        # Work on a copy: writing the appid/signature into the caller's dict
        # would make a second call sign the stale "showapi_sign" value too.
        payload = dict(params or {})
        payload.pop("showapi_sign", None)
        payload["showapi_appid"] = self.showapi_appid
        payload["showapi_sign"] = self.gen_signature(payload)
        httpclient = None
        try:
            httpclient = HttpClient()
            url = self.API_URL + '/' + path
            return httpclient.send_request(
                method=method, url=url, params_type="form", data=payload
            )
        except Exception as e:
            # Best-effort by design: report the failure and return None.
            print("调用showapi接口失败", str(e))
            return None
        finally:
            # Close the session on the failure path as well.
            if httpclient is not None:
                httpclient.close_session()
"""
Thin wrapper around requests.

Goals:
1. Give every API call one uniform entry point, so data-driven tests can be
   added later.
2. Keep the test cases short and tidy.
"""
import requests
import json
from APITestUnittest.util import LogHandler


class HttpClient(object):
    """HTTP client that sends every request through one requests session.

    eg: httpclient = HttpClient()
        response = httpclient(method, url, data=data)
        response = httpclient.send_request(method, url, data=data)

    Pass ``data`` by keyword: the third positional argument is
    ``params_type``, not the payload.
    """

    log = LogHandler.LogHandler().setLog()

    # Methods whose payload is sent in the request body.
    _BODY_METHODS = ("POST", "PUT", "DELETE")

    def __init__(self):
        self.session = requests.session()

    def send_request(self, method, url, params_type="form", data=None, **kwargs):
        """Send one request and return the response.

        :param method: HTTP method, case-insensitive; GET, POST, PUT and
            DELETE are supported.
        :param url: target URL.
        :param params_type: "form" sends ``data`` form-encoded; any other
            value sends it as JSON. Ignored for GET, where ``data`` becomes
            the query string.
        :param data: payload as a dict, or a JSON string that is decoded
            into one first.
        :param kwargs: passed through to ``Session.request``.
        :return: the response object returned by ``Session.request``.
        :raises ValueError: if ``method`` is not one of the supported methods.
        """
        self.log.info("正在进行{0}请求,请求地址:{1},请求参数:{2}".format(method, url, data))
        method = method.upper()
        params_type = params_type.upper()
        # A string payload is assumed to be JSON text and is turned into a dict.
        if isinstance(data, str):
            data = json.loads(data)
        if method == "GET":
            return self.session.request(method=method, url=url, params=data, **kwargs)
        if method in self._BODY_METHODS:
            # Form payloads go in ``data=``; everything else goes in ``json=``.
            payload_key = "data" if params_type == "FORM" else "json"
            return self.session.request(
                method=method, url=url, **{payload_key: data}, **kwargs
            )
        raise ValueError('request method "{}" error'.format(method))

    def __call__(self, method, url, params_type="form", data=None, **kwargs):
        """Shortcut for :meth:`send_request` with the same arguments."""
        return self.send_request(method, url, params_type, data, **kwargs)

    def close_session(self):
        """Close the underlying session."""
        self.session.close()
"""
Database access: helper methods wrapping the common MySQL operations.
"""
import pymysql
from APITestUnittest.util.LogHandler import LogHandler


class Connect_Mysql(object):
    """Open-per-call MySQL helper.

    Every query/edit method opens a fresh connection, runs one statement and
    closes the connection again, also when the statement fails. Errors are
    printed rather than raised.
    """

    conn = None
    cursor = None
    log = LogHandler().setLog()

    def __init__(self, host, username, password, db, charset="utf8", port=3306):
        self.host = host
        self.username = username
        self.password = password
        self.charset = charset
        self.db = db
        self.port = port

    def connect(self):
        """Open the connection and create a cursor.

        :return: None on success; the caught exception instance on failure
            (it is returned, not raised).
        """
        try:
            self.conn = pymysql.connect(host=self.host,
                                        port=self.port,
                                        user=self.username,
                                        password=self.password,
                                        charset=self.charset,
                                        db=self.db)
            self.cursor = self.conn.cursor()
        except Exception as e:
            return e
        return None

    def close(self):
        """Close the cursor and the connection; a no-op for whatever is not open."""
        try:
            if self.cursor is not None:
                self.cursor.close()
        finally:
            self.cursor = None
            if self.conn is not None:
                try:
                    self.conn.close()
                finally:
                    self.conn = None

    def __open(self):
        # Turn the error returned by connect() into a raised one so the real
        # connection failure is reported instead of a later AttributeError.
        error = self.connect()
        if error is not None:
            raise error

    def __release(self):
        # Closing must never hide the result of the statement that just ran.
        try:
            self.close()
        except Exception as e:
            print(e)

    @staticmethod
    def __args(parmas):
        # With an empty tuple PyMySQL still %-formats the SQL text, so a
        # literal "%" (e.g. LIKE 'abc%') would raise; None skips formatting.
        return parmas if parmas else None

    def __query(self, sql, parmas, fetch_all):
        # Run one SELECT and return its rows, or None when nothing came back.
        result = None
        try:
            self.__open()
            self.cursor.execute(sql, self.__args(parmas))
            result = self.cursor.fetchall() if fetch_all else self.cursor.fetchone()
        except Exception as e:
            print(e)
        finally:
            self.__release()
        # Empty result sets are reported as None.
        return result if result else None

    def get_one(self, sql, parmas=()):
        """Return the first row of the query, or None if there is none or it failed."""
        return self.__query(sql, parmas, fetch_all=False)

    def get_all(self, sql, parmas=()):
        """Return all rows of the query, or None if there are none or it failed."""
        return self.__query(sql, parmas, fetch_all=True)

    def __edit(self, sql, parmas, success_msg=None):
        # Execute and commit one write statement. Returns the count reported
        # by execute(), or 0 on failure. The success message is logged only
        # after the commit went through.
        count = 0
        try:
            self.__open()
            affected = self.cursor.execute(sql, self.__args(parmas))
            self.conn.commit()
            count = affected
            if success_msg:
                self.log.info(success_msg)
        except Exception as e:
            print(e)
        finally:
            self.__release()
        return count

    def insert(self, sql, parmas=()):
        """Run an INSERT statement and return the affected row count."""
        return self.__edit(sql, parmas, f"{sql}插入成功")

    def update(self, sql, parmas=()):
        """Run an UPDATE statement and return the affected row count."""
        return self.__edit(sql, parmas, f"{sql}修改成功")

    def delete(self, sql, parmas=()):
        """Run a DELETE statement and return the affected row count."""
        return self.__edit(sql, parmas, f"删除语句{sql}删除成功")
"""
Logging helper class.
"""
import logging
import os
from APITestUnittest.suites.RunCasesSuite import SuitRunner


class LogHandler():
    """Provide one shared logger that writes to a log file and to the console."""

    # The log file is named after the date string computed by the suite runner.
    __log_name = SuitRunner.logname
    # One logger object shared by every LogHandler instance.
    logger = logging.getLogger(__name__)
    logger.setLevel(level=logging.INFO)

    def setLog(self):
        """Attach the file and console handlers once and return the logger.

        The handlers are only added while the logger has none, so calling
        this repeatedly does not duplicate output.

        :return: the shared ``logging.Logger``.
        """
        if not self.logger.handlers:
            # Anchor the log directory to the package instead of the current
            # working directory, and create it when it does not exist yet.
            package_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            log_dir = os.path.join(package_root, "log")
            os.makedirs(log_dir, exist_ok=True)
            log_path = os.path.join(log_dir, f"{self.__log_name}.log")

            file_handler = logging.FileHandler(filename=log_path, encoding='utf-8')
            console_handler = logging.StreamHandler()

            formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
            file_handler.setFormatter(formatter)
            console_handler.setFormatter(formatter)

            self.logger.addHandler(file_handler)
            self.logger.addHandler(console_handler)
        return self.logger
"""
Suite runner: discovers the test cases, runs them and writes the HTML report.
"""

import configparser
import os
import time
import unittest
from HTMLTestRunner import HTMLTestRunner


class SuitRunner(object):
    """Discover test cases and run them through HTMLTestRunner."""

    # Date stamp taken once, when the class body is first executed. It is
    # assembled from the struct_time fields; the result is the same
    # zero-padded "YYYY年MM月DD日" text and contains no colon.
    _today = time.localtime()
    logname = f"{_today.tm_year}年{_today.tm_mon:02d}月{_today.tm_mday:02d}日"
    __reportname = logname
    runner = None

    def __init__(self):
        self.config = configparser.ConfigParser()
        config_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config")
        self.config.read(os.path.join(config_dir, 'env.ini'), encoding='utf-8')
        self.report_name = self.__reportname + '.' + 'html'

    def get_report(self, case_dir="../testcase/wuye/", pattern="Test*.py", **kwargs):
        """Run the discovered cases and write the HTML report.

        The report directory, project name and description come from section
        ``[report]`` of ``config/env.ini``.

        :param case_dir: directory to discover test files in.
        :param pattern: file-name pattern of the test files.
        :param kwargs: accepted but not used.
        :return: None
        """
        discover = unittest.defaultTestLoader.discover(start_dir=case_dir, pattern=pattern)

        report_dir = self.config.get("report", "report_dir")
        name = self.config.get("report", "project_name")
        description = self.config.get("report", "description")
        title = f"{name}接口自动化测试报告"

        # makedirs also creates missing parent directories; an empty
        # report_dir means "current directory" and needs no creation.
        if report_dir:
            os.makedirs(report_dir, exist_ok=True)
        # join() supplies the separator when report_dir has no trailing slash.
        filename = os.path.join(report_dir, self.report_name)

        with open(filename, "wb") as file:
            runner = HTMLTestRunner(stream=file, title=title, description=description)
            runner.run(discover)
"""
Company API tests: creating, modifying and deleting a company.
"""
import os
import unittest
import configparser
import warnings
import jsonpath
from APITestUnittest.testcase.wuye.PcGetToken import GetToken
from APITestUnittest.util.ConnectMysql import Connect_Mysql
from APITestUnittest.util.client.httpclient import HttpClient


class Company(unittest.TestCase):
    """Company endpoint tests.

    The cases depend on each other (02-05 use the company created by 01) and
    rely on unittest running them in name order.
    """

    config = configparser.ConfigParser()
    PATH = os.path.dirname(os.path.dirname(__file__))
    config_dir = os.path.join(os.path.dirname(PATH), 'config')
    config.read(os.path.join(config_dir, "env.ini"), encoding="utf-8")
    URL = config.get("wuye", "host")
    db = Connect_Mysql(host='******', username='*****', password='******', db='******')

    @classmethod
    def setUpClass(cls):
        warnings.simplefilter('ignore', ResourceWarning)
        cls.api = HttpClient()
        cls.header = {
            "Authorization": "Bearer" + " " + GetToken().get_token()
        }

    @classmethod
    def tearDownClass(cls) -> None:
        try:
            # Remove all test data. The LIKE pattern is passed as a parameter:
            # a literal "%" in the SQL text breaks the driver's %-formatting.
            cls.db.delete(
                "delete from sys_company where name like %s and is_delete='0'",
                ("测试使用%",),
            )
        finally:
            # Release the HTTP session opened in setUpClass.
            cls.api.close_session()

    def _post_json(self, path, payload):
        # POST ``payload`` as JSON to URL + path with the auth header.
        return self.api.send_request(method='post', url=self.URL + path, headers=self.header,
                                     params_type='json', data=payload)

    # Create a company.
    def test_company_01(self, path="********"):
        parmas = {"name": "测试使用公司", "abbreviation": "简称", "alias": "test", "code": "16",
                  "parentId": "2", "type": "COMPANY"}
        ret = self._post_json(path, parmas)
        body = ret.json()
        self.assertEqual(body['code'], "0")
        self.assertEqual(body['data'], True)

    # Create a company whose name duplicates an existing one.
    def test_company_02(self, path="********"):
        parmas = {"name": "测试使用公司", "abbreviation": "简称", "alias": "test", "code": "17",
                  "parentId": "2", "type": "COMPANY"}
        ret = self._post_json(path, parmas)
        print(ret.text)
        # NOTE(review): the query filters on alias='简称' while the payload sends
        # alias "test" and abbreviation "简称" - confirm the column mapping.
        result = self.db.get_all("select * from sys_company where name='测试使用公司' and alias='简称' and is_delete='1'")
        self.assertIsNone(result)
        body = ret.json()
        self.assertEqual(body['code'], "-1")
        self.assertEqual(jsonpath.jsonpath(body, "$..message")[0], "该名字已存在")

    # Create a company whose abbreviation duplicates an existing one.
    def test_company_03(self, path="*******"):
        parmas = {"name": "测试使用公司1简称", "abbreviation": "简称", "alias": "test", "code": "18",
                  "parentId": "2", "type": "COMPANY"}
        ret = self._post_json(path, parmas)
        print(ret.text)
        # Verify the duplicate was not stored. The name is taken from the
        # payload so the check looks for the record that was actually posted.
        # NOTE(review): alias='简称' vs. payload alias "test" - confirm the column mapping.
        result = self.db.get_all("select * from sys_company where name=%s and alias='简称'",
                                 (parmas["name"],))
        self.assertIsNone(result)
        body = ret.json()
        self.assertEqual(body['code'], "-1")
        self.assertEqual(jsonpath.jsonpath(body, "$..message")[0], "该简称已存在")

    # Create a company whose code duplicates an existing one.
    def test_company_04(self, path="*****"):
        parmas = {"name": "测试使用公司2", "abbreviation": "简称1", "alias": "test", "code": "16",
                  "parentId": "2", "type": "COMPANY"}
        ret = self._post_json(path, parmas)
        print(ret.text)
        # Verify the company with the duplicate code was not stored.
        result = self.db.get_all("select * from sys_company where name='测试使用公司2' and code='16'")
        self.assertIsNone(result)
        body = ret.json()
        self.assertEqual(body['code'], "-1")
        self.assertEqual(jsonpath.jsonpath(body, "$..message")[0], "该code已存在")

    # Delete a company.
    def test_company_05(self, path="*********"):
        url = self.URL + path
        row = self.db.get_one("select id from sys_company where name='测试使用公司' and is_delete='1'")
        # get_one returns the whole row (a tuple) or None; send only the id value.
        self.assertIsNotNone(row, "company to delete was not found in the database")
        parmas = {
            "id": row[0]
        }
        ret = self.api.send_request(method='post', url=url, headers=self.header, params_type='form', data=parmas)
        body = ret.json()
        self.assertEqual(body['code'], "0")
        self.assertEqual(body['data'], True)


if __name__ == '__main__':
    unittest.main()