python接口测试框架--基于page object模式

2019-04-14 18:37发布

       最近复原了之前公司的一套测试框架,并根据我们业务组的实际情况,进行一些变动和调整,之前公司业务流程比较长,多了一个业务层,做了四层封装,我们这边暂时的需求业务线没有这么长,后续可以根据需求在模板层的基础上适当扩展一层,目前在CRM1.16的接口测试阶段实现并应用了此框架~       简介:框架采用三层封装,基础层和模板层为继承关系,模板层之间相互独立,也可以互相调用,用例层相互独立。现阶段主要应用于CRM的需求,最底层(base)的封装本业务线各个需求都需要用的一些共用的方法,比如各种请求,鉴权,数据库连接封装等,主要是crm相关的业务逻辑;中间层(model)的封装是在各个接口的基础上封装请求过程,默认参数,断言以及回参结果验证,主要是各个需求的相关接口,可以每个需求新增一个;最上层(case)的封装专注于各种场景的测试以及复杂逻辑的测试,根据model适当增加;   结构图:                                               优点: 可扩展性:每一个需求相关接口编写一个model层,case层可以调用不同的model层来组合复杂场景; 可维护性:三层封装的结构,使得在接口的调用,同一个接口只有一个入口,并且实施读取swagger的接口参数,即使有接口有大的变动也可以只改一个接口的函数,就可以维护所有相关case,而且不影响其他接口调用; 兼容性:python本身的兼容性就比较强,可以对接各种平台,比如结果可以回写testlink,接口调用可以接入Jenkins,只需要个人提交代码到SVN,所有人可以在发版之前在Jenkins平台运行一遍所有的用例; 灵活性:接口用例用代码来写本来就具有很高的灵活性,case层可以采用ddt数据驱动的方式,在装饰器里添加参数,就可以快速添加多条各种场景的用例,model层也可以在接口的调用中,加入一些判断来处理调用接口的各种情况; 稳定性:model层加容错处理来处理接口调用的各种情况,case层添加setup和setdown来作为前置和后置处理器,清理会对接口测试造成影响的参数,尽量减少每个case之间的依赖性,使之独立; 可靠性:在model层加入数据的校验,测试环境可以加入数据库的校验,而不仅仅只是校验一个code和message,现网的环境可以适当放宽,不用做数据库的校验; 缺点: 需要一定的编程基础,可能导致前期的接口测试周期变长;每个人写的model风格不一,互相调用不熟悉的模块可能导致脚本容易报错; 解决办法: 适当增长接口测试周期;model层统一的接口封装的风格; 另外:框架的思路是根据UI框架的page object的模式衍生而来的 代码详情: 最底层(base)  # coding:utf8 import requests,json,re,pymysql,binascii,yaml,hashlib,time from urllib import parse class EC_base(): def post(self, url, data ,debug = False, header = {}): self.header['Content-Type'] = 'application/x-www-form-urlencoded' for key in header.keys(): self.header[key] = header[key] if self.header[key] == 'application/json': data = json.dumps(data) if debug: print(url) print(data) print(self.header) print(self.s.cookies) content = self.s.post(url, data=data, headers=self.header) if debug: print(content.text) if 'ec_csrf_token' in content.cookies: self.cookies_ec_csrf_token = content.cookies['ec_csrf_token'] self.header['ec_csrf_token'] = self.cookies_ec_csrf_token if 'XSRF-TOKEN' in content.cookies: self.CRMAPI_TOKEN = content.cookies['XSRF-TOKEN'] try: content = content.text content = json.loads(content) except: pass return content def get(self, url, params = None, debug = False, header = {}): self.header['Content-Type'] = 'application/x-www-form-urlencoded' if debug: print(url) print(self.header) print(self.s.cookies) for key in header.keys(): self.header[key] = header[key] content = self.s.get(url,params= params, headers=self.header) if debug: print(content.text) try: content = content.text return json.loads(content) except: pass return content def delete(self, url, params = None, debug = False, header = {}): self.header['Content-Type'] = 'application/x-www-form-urlencoded' if debug: print(url) print(self.header) print(self.s.cookies) for key in header.keys(): self.header[key] = header[key] content = self.s.delete(url,params= params, headers=self.header) if debug: print(content.text) try: content = content.text return json.loads(content) except: pass return content def yaml(self): return yaml.load(open("xxx/1.yaml")) def login(self): #登陆并鉴权 self.s = requests.session() self.header = {} #获取登陆信息 ...... def mysql_login(self, corp_id, db_type = 1): #数据库信连接 if db_type == 0: #BASE基础库 host, prot = 'xx.xx.xx', xxxx user = password = 'ecuser' db = pymysql.connect(host, user, password, None, prot) return db def MD5(self, str1): h1 = hashlib.md5() h1.update(str1.encode(encoding='utf-8')) return h1.hexdigest() def api_data(self): #通过swagger获取接口数据 self.urls = {} groups = [移动端","企业管理","客户端"] group_names = ["移动端","企业管理","客户端"] for i in range(len(groups)): group_str = parse.quote(groups[i]) group_name = group_names[i] self.urls[group_name] = {} a = requests.session().get('url') data = json.loads(a.text) host = "https://" + data['host'] for tags in data['tags']: self.urls[group_name][tags["name"]] = {} for paths in data["paths"].keys(): for tags in data["paths"][paths].keys(): for name in self.urls[group_name]: type= data["paths"][paths][tags] if type["tags"][0] == name: self.urls[group_name][name][type["summary"]] = host+paths if __name__ == "__main__": pass 中间层(model) from test.ec import EC_base class Field_model(EC_base): def __init__(self): self.api_data() self.login() content = self.getFieldInfo() self.groupID1 = content["data"]["groups"][0]["id"] self.name1 = content["data"]["groups"][0]["name"] self.groupID2 = content["data"]["groups"][1]["id"] self.name2 = content["data"]["groups"][1]["name"] def getFieldInfo(self): content = self.get(self.urls['企业管理']['客户字段']['字段信息列表']) return content def addGroup(self,name,code=200): #添加分组 content = self.post(self.urls['企业管理']['客户字段']['添加分组'],{"name":name}) assert content['code'] == code ,"返回报错:" + content['msg'] if code == 200: id = content['data']['id'] name = name.strip() sql = "..." self.query_data(sql, 1) return id def addField(self,name="python测试", id=0, groupId = None, type = 1,data_num = 1, isMust=1, status = 1,params =[],code = 200, funtion_type = "添加自定义字段及选项"): #添加自定义字段 header = {'Content-Type': 'application/json'} if not groupId: groupId = self.groupID2 data = {"groupId": groupId, "id": id, "isMust": isMust, "name": name, "params": params, "status": status, "type": type} content = self.post(self.urls['企业管理']['客户字段'][funtion_type], data, header=header) assert content['code'] == code, "返回报错:" + content['msg'] if code == 200: id = content['data']['id'] name = name.strip() sql = " xxxxxx" self.query_data(sql, data_num) return id def query_data(self,sql, num=None): db = self.mysql_login(self.corp_id) cursor = db.cursor() #验证分组数据 cursor.execute(sql) data = cursor.fetchall() db.close() if num: assert len(data) == num ,"测试数据与数据库对不上!查询语句:" + sql else: return data if __name__ == '__main__': pass 最上层(case) from test.main.model.Field import Field_model from ddt import ddt,data,unpack import unittest @ddt class Field_case(unittest.TestCase): @classmethod def setUpClass(self): self.model = Field_model() def setUp(self): try: content = self.getFieldInfo() for group in content["data"]["groups"]: if group['name'] == "python测试": self.model.deleteGroup(group['id']) for field in group['fields']: if field['name'] == "python测试": self.model.deleteField(field['id']) except: pass @data(-1, 1, 2, 3, 4) def test_01(self,value): #高级搜索-获取分组字段列表 self.model.searchField(value) @data("python测试", "测试10个字哈哈哈哈", "+-*%$*.,", "null", "测试 空格"," 测试空格","测试空格 ") def test_02(self, value): #【成功】添加/删除分组 id = self.model.addGroup(value) self.model.deleteGroup(id) def test_03(self): #【失败】添加分组重复 id = self.model.addGroup('测试重复A') self.model.addGroup('测试重复A',code=40102) self.model.addGroup('测试重复a', code=40102) self.model.addGroup('基本信息', code=40102) self.model.deleteGroup(id) @data(["", 4001], ['测试超10个字哈哈哈哈', 40101]) @unpack def test_04(self, value, code): #【失败】添加分组-分组名不合法 self.model.addGroup(value, code=code) if __name__ == '__main__': Field_case.main()