python接口测试-项目实践(八) 完成的接口类和执行脚本
脱敏后脚本
projectapi.py: 项目接口类
# -*- coding:utf-8 -*- """ xx项目接口类 2018-11 dinghanhua """ import requests import re import pymssql #region 工具类函数 def findinfo_from_apiquerystockdetailinfo(str1,str2): """ 从str1中找第一个"str2":...后面的值 :param str1: :param str2: :return: str2对应的值 """ pattern1 = \'"\'+str2 + \'":(.*?),"\' #左右边界 result = re.search(pattern1, str1) #正则匹配 if result: result = result.group(1).replace(\'"\',\'\') return result def get_last_value_of_key(resultlist,key): \'\'\' 从二维数组取第一行的元素对应的最后一行的值 :param resultlist: :param key: :return: value \'\'\' for i in range(0,len(resultlist[0])): if key == resultlist[0][i]: #第一行中找到对应字段名的索引 result = resultlist[-1][i] return result #返回数组最后一行对应的值 def round_test(data,i=0): \'\'\' 四舍五入,解决round(7.35)=7.3的问题 :param data: :param i: 保留的位数,默认保留一位小数 :return: \'\'\' if isinstance(i,int): #i是整数 raise Exception(\'the second param must be int\') else: mi = 10**i f = data*mi - int(data*mi) if f >=0.5: res = (int(data*mi)+1)/mi elif f <=-0.5: res = (int(data*mi-1))/mi else: res = int(data*mi)/mi if i<=0: res = int(res) return res # endregion class ProjectApi: def api_querystockdetailinfo(self,stockcode): """ 请求并提取股票基本信息接口数据 :param stockcode: :return: 截取信息dict """ api = \'http://testdomain/querystockdetailinfo?stockcode={stockcode}\'.format(stockcode = stockcode) response = requests.get(api) result = response.text.replace(r\'\n\',\'\').replace(\'\\\', \'\') # 去掉特殊字符\n,\ result_dict = {\'stockcode\': stockcode} #股票名称 result_dict[\'StockName\'] = findinfo_from_apiquerystockdetailinfo(result, \'StockName\') if result_dict[\'StockName\']: #股票名称存在继续处理其他字段,否则报错并返回 # 公司概要 #剔除公司概要中“公司”“公司是”、“公司是一家”高度重复的内容 overviewvalue = result_dict[\'OverviewValue\'] = findinfo_from_apiquerystockdetailinfo(result, \'OverviewValue\') if overviewvalue.startswith(\'公司是一家\'): result_dict[\'OverviewValue\'] = overviewvalue[5:] elif overviewvalue.startswith(\'公司是\'): result_dict[\'OverviewValue\'] = overviewvalue[3:] elif overviewvalue.startswith(\'公司\'): result_dict[\'OverviewValue\'] = overviewvalue[2:] if not overviewvalue.endswith(\'。\'): #判断最后是否有句号,没有加一个 result_dict[\'OverviewValue\'] += \'。\' # 市值 typecap = findinfo_from_apiquerystockdetailinfo(result, \'TypeCap\') dictcap = {\'1\': \'巨盘\', \'2\': \'大盘\', \'3\': \'中盘\', \'4\': \'小盘\', \'5\': \'微盘\'} result_dict[\'TypeCap\'] = dictcap[typecap] # 风格 TypeStyle = result_dict[\'TypeStyle\'] = findinfo_from_apiquerystockdetailinfo(result, \'TypeStyle\') dictstyle = {\'1\': \'成长\', \'2\': \'价值\', \'3\': \'周期\', \'4\': \'题材\', \'5\': \'高价值\'} if len(TypeStyle) == 1: result_dict[\'TypeStyle\'] = dictstyle[TypeStyle] elif len(TypeStyle) >1: typestylelist = TypeStyle.split(\',\') #风格可能有多个 for t in range(len(typestylelist)): typestylelist[t] = dictstyle[typestylelist[t]] result_dict[\'TypeStyle\'] = \'、\'.join(typestylelist) # 生命周期 LifecycleValue 生命周期(单选,例:1);(1初创期、2成长期、3成熟期、4衰退期)") LifecycleValue = findinfo_from_apiquerystockdetailinfo(result, \'LifecycleValue\') dictlifecycle = {\'1\': \'初创期\', \'2\': \'成长期\', \'3\': \'成熟期\', \'4\': \'衰退期\', \'5\': \'周期底部\', \'6\': \'周期顶部\', \'7\': \'周期向下\', \'8\': \'周期向上\'} if LifecycleValue: result_dict[\'LifecycleValue\'] = dictlifecycle[LifecycleValue] # 估值 ScoreTTM 估值(分值1~5,>=4 偏低, <=2 偏高,其他适中)") ScoreTTM = findinfo_from_apiquerystockdetailinfo(result, \'ScoreTTM\') if ScoreTTM: if float(ScoreTTM) >= 4: result_dict[\'ScoreTTM\'] = \'偏低\' elif ScoreTTM and float(ScoreTTM) <= 2: result_dict[\'ScoreTTM\'] = \'偏高\' else: result_dict[\'ScoreTTM\'] = \'适中\' # 成长指数 ScoreGrowing 成长指数(分值1~5,>=4 高, <=2 低,其他中)\') ScoreGrowing = findinfo_from_apiquerystockdetailinfo(result, \'ScoreGrowing\') if ScoreGrowing: if float(ScoreGrowing) >= 4: result_dict[\'ScoreGrowing\'] = \'高\' elif float(ScoreGrowing) <= 2: result_dict[\'ScoreGrowing\'] = \'低\' else: result_dict[\'ScoreGrowing\'] = \'中\' else: result_dict[\'ScoreGrowing\']=\'\' # 盈利能力 ScoreProfit = findinfo_from_apiquerystockdetailinfo(result, \'ScoreProfit\') # \' ScoreProfit 盈利能力(分值1~5,>=4 高, <=2 低,其他中)\' ) if ScoreProfit: if float(ScoreProfit) >= 4: result_dict[\'ScoreProfit\'] = \'高\' elif float(ScoreProfit) <= 2: result_dict[\'ScoreProfit\'] = \'低\' else: result_dict[\'ScoreProfit\'] = \'中\' else: result_dict[\'ScoreProfit\']=\'\' return result_dict def api_finance(self,stockcode): """ 请求并提取财务数据 :param stockcode: :return: dict """ api = \'http://testdomain/finance?stockcode={stockcode}\'.format(stockcode = stockcode) response = requests.get(api) response.encoding = \'utf-8-sig\' result = response.json()[\'data\'][0][\'result\'] # 转化为二位数组 result_dict = {\'stockcode\': stockcode} #存储返回结果 if len(result) <3: #说明股票没数据 return result_dict # 当期报告期 result_dict[\'EndDate\'] = get_last_value_of_key(result, \'EndDate\') # 预测收益报告期 ReportPeriod = get_last_value_of_key(result, \'ReportPeriod\') dictreportperoid = {\'03/31\': \'一季度\', \'06/30\': \'上半年\', \'09/30\': \'前三季度\', \'12/31\': \'本年度\'} if ReportPeriod and ReportPeriod != result_dict[\'EndDate\']: #预测收益报告期不等于当期报告期 ReportPeriod = get_last_value_of_key(result, \'ReportPeriod\')[5:10] result_dict[\'ReportPeriod\'] = dictreportperoid[ReportPeriod] else: result_dict[\'ReportPeriod\'] = \'\' # 预测业绩情况 PerformanceType = get_last_value_of_key(result, \'PerformanceType\') result_dict[\'PerformanceType\'] = PerformanceType # 预测业绩比例 result_dict[\'PerformanceTypeRange\'] = get_last_value_of_key(result, \'PerformanceTypeRange\') # 营业收入增长率 result_dict[\'OperatingRevenueYoY\'] = get_last_value_of_key(result, \'OperatingRevenueYoY\') # 营业利润增长率 result_dict[\'NetProfitYoY\'] = get_last_value_of_key(result, \'NetProfitYoY\') # 净资产收益率 result_dict[\'ROE\'] = get_last_value_of_key(result, \'ROE\') # 毛利率 result_dict[\'SalesGrossMargin\'] = get_last_value_of_key(result, \'SalesGrossMargin\') return result_dict def api_quote(self,stockcode): """ 请求并提取PETTM :param stockcode: :return: dict """ api = \'http://testdomain/quote?stockcode={stockcode}\'.format(stockcode=stockcode) response = requests.get(api) response.encoding = \'utf-8-sig\' result = response.json()[\'data\'][0][\'result\'] # 转化为二位数组 result_dict = {\'stockcode\':stockcode} if len(result) <3: #说明股票没数据 return result_dict result_dict[\'PETTM\'] = get_last_value_of_key(result, \'PE1\') return result_dict def result_of_3sourceapi(self,stockcode): """ 拼接3个接口取出的字串 :param stockcode: :return: """ result_stockdetailinfo = self.api_querystockdetailinfo(stockcode) if result_stockdetailinfo[\'StockName\']: #如果股票名称存在,执行后续步骤 result_finance = self.api_finance(stockcode) result_quote = self.api_quote(stockcode) #显示三个接口结果 #print(result_stockdetailinfo) #print(result_finance) #print(result_quote) #空值、精度处理 OperatingRevenueYoY = round_test(float(result_finance[\'OperatingRevenueYoY\']),1) NetProfitYoY = round_test(float(result_finance[\'NetProfitYoY\']),1) if result_finance[\'ReportPeriod\'] ==\'\'\ or result_finance[\'PerformanceType\'] == \'\'\ or result_finance[\'PerformanceTypeRange\'] == \'\': ReportPeriod = PerformanceType = PerformanceTypeRange = \'\' else: ReportPeriod = \',预计\' + result_finance[\'ReportPeriod\'] PerformanceType = \'业绩\' + result_finance[\'PerformanceType\'] PerformanceTypeRange = result_finance[\'PerformanceTypeRange\'] if result_finance[\'ROE\']: ROE = \',净资产收益率:{0}%\'.format(round_test(float(result_finance[\'ROE\']),1)) else: ROE = \'\' if result_finance[\'SalesGrossMargin\']: SalesGrossMargin = \',毛利率:{0}%\'.format(round_test(float(result_finance[\'SalesGrossMargin\']),1)) else: SalesGrossMargin = \'\' result = \'{OverviewValue} {TypeCap}{TypeStyle}股,处于{LifecycleValue}。\' \ \'估值{ScoreTTM},PE(TTM):{PETTM};\' \ \'成长性{ScoreGrowing},当期营收增长:{OperatingRevenueYoY}%,当期利润增长:{NetProfitYoY}%;\' \ \'盈利能力{ScoreProfit}{ROE}{SalesGrossMargin}{ReportPeriod}{PerformanceType}{PerformanceTypeRange}。\'\ .format(OverviewValue = result_stockdetailinfo[\'OverviewValue\'], TypeCap = result_stockdetailinfo[\'TypeCap\'], TypeStyle = result_stockdetailinfo[\'TypeStyle\'], LifecycleValue = result_stockdetailinfo[\'LifecycleValue\'], ScoreTTM = result_stockdetailinfo[\'ScoreTTM\'], PETTM = round_test(float(result_quote[\'PETTM\'])), ScoreGrowing = result_stockdetailinfo[\'ScoreGrowing\'], OperatingRevenueYoY = OperatingRevenueYoY, NetProfitYoY = NetProfitYoY, ScoreProfit = result_stockdetailinfo[\'ScoreProfit\'], ROE = ROE, SalesGrossMargin=SalesGrossMargin, ReportPeriod = ReportPeriod, PerformanceType = PerformanceType, PerformanceTypeRange = PerformanceTypeRange) return result else: return \'不存在该股票数据\' def api_of_dev(self,stockcodelist,cookie,page=0,domain=\'testdomain.cn\'): """ 获取开发接口数据 :param 股票列表;cookie;domain默认线上 :return: 股票代码及数据 """ headers = {\'Cookie\':cookie} url = \'http://{domain}/getstockbypage?StockCodeList={stockcodelist}&PageIndex={page}&PageSize=10\'.format(stockcodelist = stockcodelist,domain = domain,page=page) response = requests.get(url, headers=headers) jsonstr = response.json()# 转成json,取出message message = jsonstr[\'Message\'] dict_result = {} if message: for ele in message: stockcode = ele[\'StockCode\'] content = ele[\'Content\'] # 实际结果 nickname = ele[\'NickName\'] #发布者 if nickname == \'project000\': dict_result[stockcode] = content return dict_result def compare_vs_devapi(self,stockcodelist,cookie,page=0,domain=\'testdomain.cn\'): """ 开发接口数据与接口拼接数据对比 :return: bool """ diff_list = [] # 存储不一致的股票代码 resultofdev = self.api_of_dev(stockcodelist,cookie,page,domain) #请求开发接口 if resultofdev: #如果开发结果不为空 for stock,actual in resultofdev.items(): expected = self.result_of_3sourceapi(stock) #数据源拼接结果 \'\'\'去掉pe(ttm)对比\'\'\' beginindex = actual.find(\'PE(TTM)\') endindex = actual.find(\';\', beginindex) actual_result = actual[:beginindex] + actual[endindex:] beginindex = expected.find(\'PE(TTM)\') endindex = expected.find(\';\', beginindex) expected_result = expected[:beginindex] + expected[endindex:] if actual_result != expected_result: #预期实际对比 print(stock) print(\'开发:\',actual_result) print(\'预期:\',expected_result) diff_list.append(stock) else: print(stock,\'一致(不含PETTM)\') if diff_list: #异常股票列表不为空则输出;空则提示全部一致 print(\'不一致的股票列表:\', diff_list) return False else: print(\'对比结果:数据一致\') return True else: print(\'接口没有数据\') return True def compare_vs_database(self,count=10): """ 比较数据库数据与数据源拼接字串 :param count:对比股票数量,default=10 :return:True 一致;False 不一致 """ diff_list = [] # 存储不一致的股票代码 with pymssql.connect(server=\'192.168.1.1\', user=\'sa\', password=\'sa\', database=\'test_db\') as myconnect: with myconnect.cursor(as_dict=True) as cursor: cursor.execute("""SELECT top {count} StockCode,StockName,content FROM [test_db].[dbo].[table] where NickName =\'project000\' and isvalid = 1 and IsDeleted =0 order by createtime desc""".format(count=count)) for row in cursor: stockcode = row[\'StockCode\'] actual = row[\'content\'] expected = self.result_of_3sourceapi(stockcode) # 数据源拼接结果 \'\'\'去掉pe(ttm)对比\'\'\' beginindex = actual.find(\'PE(TTM)\') endindex = actual.find(\';\', beginindex) actual_result = actual[:beginindex] + actual[endindex:] beginindex = expected.find(\'PE(TTM)\') endindex = expected.find(\';\', beginindex) expected_result = expected[:beginindex] + expected[endindex:] if actual_result != expected_result: # 预期实际对比 print(stockcode) print(\'开发:\', actual_result) print(\'预期:\', expected_result) diff_list.append(stockcode) else: print(stockcode, \'一致(不含PETTM)\') if diff_list: print(\'不一致的股票列表:\', diff_list) return False else: print(\'对比结果:数据全部一致\') return True
run_test.py 执行脚本:
# coding:utf-8 """ 接口测试执行脚本 """ import projectapi import unittest class ApiTest(unittest.TestCase): def setUp(self): self.projectapi1 = projectapi.ProjectApi() # 创建接口对象 def testcase1(self): """与开发接口比对""" stockcodelist = \'600000%2C600128%2C600146%2C600165%2C600186\' #通过抓包获取cookie cookie = \'globalid=24A85DEC-AF25-36DD-C774-ED092F705767\' testresult = self.projectapi1.compare_vs_devapi(stockcodelist,cookie) self.assertTrue(testresult) def testcase2(self): # 与数据库对比 testresult = self.projectapi1.compare_vs_database(count=10) self.assertTrue(testresult) def testcase3(self): """手工查询原数据和拼接字串""" while True: stockcode = input(\'输入股票代码: \') if stockcode.isdigit() and len(stockcode)==6 and stockcode[:2] in (\'00\',\'60\',\'30\'): print(\'数据请求中.....\') print(self.projectapi1.api_quote(stockcode)) print(self.projectapi1.api_finance(stockcode)) print(self.projectapi1.api_querystockdetailinfo(stockcode)) print(self.projectapi1.result_of_3sourceapi(stockcode)) else: print(\'股票代码有误\')
版权声明:本文为dinghanhua原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。