脱敏后脚本

 

projectapi.py: 项目接口类

# -*- coding:utf-8 -*-
"""
xx项目接口类
2018-11
dinghanhua
"""

import requests
import re
import pymssql


#region 工具类函数
def findinfo_from_apiquerystockdetailinfo(str1,str2):
    """
    从str1中找第一个"str2":...后面的值
    :param str1:
    :param str2:
    :return: str2对应的值
    """
    pattern1 = \'"\'+str2 + \'":(.*?),"\' #左右边界
    result = re.search(pattern1, str1) #正则匹配
    if result:
        result = result.group(1).replace(\'"\',\'\')
    return result


def get_last_value_of_key(resultlist,key):
    \'\'\'
    从二维数组取第一行的元素对应的最后一行的值
    :param resultlist:
    :param key:
    :return: value
    \'\'\'
    for i in range(0,len(resultlist[0])):
        if key == resultlist[0][i]:   #第一行中找到对应字段名的索引
            result = resultlist[-1][i]
            return result #返回数组最后一行对应的值


def round_test(data,i=0):
    \'\'\'
    四舍五入,解决round(7.35)=7.3的问题
    :param data:
    :param i: 保留的位数,默认保留一位小数
    :return:
    \'\'\'
    if isinstance(i,int): #i是整数
        raise Exception(\'the second param must be int\')
    else:
        mi = 10**i
        f = data*mi - int(data*mi)
        if f >=0.5:
            res = (int(data*mi)+1)/mi
        elif f <=-0.5:
            res = (int(data*mi-1))/mi
        else:
            res = int(data*mi)/mi
        if i<=0:
            res = int(res)
    return res
# endregion

class ProjectApi:
    def api_querystockdetailinfo(self,stockcode):
        """
        请求并提取股票基本信息接口数据
        :param stockcode:
        :return: 截取信息dict
        """
        api = \'http://testdomain/querystockdetailinfo?stockcode={stockcode}\'.format(stockcode = stockcode)
        response = requests.get(api)
        result = response.text.replace(r\'\n\',\'\').replace(\'\\\', \'\')  # 去掉特殊字符\n,\
        result_dict = {\'stockcode\': stockcode}

        #股票名称
        result_dict[\'StockName\'] = findinfo_from_apiquerystockdetailinfo(result, \'StockName\')

        if result_dict[\'StockName\']: #股票名称存在继续处理其他字段,否则报错并返回

            # 公司概要 #剔除公司概要中“公司”“公司是”、“公司是一家”高度重复的内容
            overviewvalue = result_dict[\'OverviewValue\'] = findinfo_from_apiquerystockdetailinfo(result, \'OverviewValue\')

            if overviewvalue.startswith(\'公司是一家\'):
                result_dict[\'OverviewValue\'] = overviewvalue[5:]
            elif overviewvalue.startswith(\'公司是\'):
                result_dict[\'OverviewValue\'] = overviewvalue[3:]
            elif overviewvalue.startswith(\'公司\'):
                result_dict[\'OverviewValue\'] = overviewvalue[2:]

            if not overviewvalue.endswith(\'\'): #判断最后是否有句号,没有加一个
                result_dict[\'OverviewValue\'] += \'\'

            # 市值
            typecap = findinfo_from_apiquerystockdetailinfo(result, \'TypeCap\')
            dictcap = {\'1\': \'巨盘\', \'2\': \'大盘\', \'3\': \'中盘\', \'4\': \'小盘\', \'5\': \'微盘\'}
            result_dict[\'TypeCap\'] = dictcap[typecap]

            # 风格
            TypeStyle = result_dict[\'TypeStyle\'] = findinfo_from_apiquerystockdetailinfo(result, \'TypeStyle\')
            dictstyle = {\'1\': \'成长\', \'2\': \'价值\', \'3\': \'周期\', \'4\': \'题材\', \'5\': \'高价值\'}
            if len(TypeStyle) == 1:
                result_dict[\'TypeStyle\'] = dictstyle[TypeStyle]
            elif len(TypeStyle) >1:
                typestylelist = TypeStyle.split(\',\')  #风格可能有多个
                for t in range(len(typestylelist)):
                        typestylelist[t] = dictstyle[typestylelist[t]]
                result_dict[\'TypeStyle\'] = \'\'.join(typestylelist)

            # 生命周期 LifecycleValue 生命周期(单选,例:1);(1初创期、2成长期、3成熟期、4衰退期)")
            LifecycleValue = findinfo_from_apiquerystockdetailinfo(result, \'LifecycleValue\')
            dictlifecycle = {\'1\': \'初创期\', \'2\': \'成长期\', \'3\': \'成熟期\', \'4\': \'衰退期\', \'5\': \'周期底部\', \'6\': \'周期顶部\', \'7\': \'周期向下\', \'8\': \'周期向上\'}
            if LifecycleValue:
                result_dict[\'LifecycleValue\'] = dictlifecycle[LifecycleValue]

            # 估值 ScoreTTM 估值(分值1~5,>=4 偏低, <=2 偏高,其他适中)")
            ScoreTTM = findinfo_from_apiquerystockdetailinfo(result, \'ScoreTTM\')
            if ScoreTTM:
                if float(ScoreTTM) >= 4:
                    result_dict[\'ScoreTTM\'] = \'偏低\'
                elif ScoreTTM and float(ScoreTTM) <= 2:
                    result_dict[\'ScoreTTM\'] = \'偏高\'
                else:
                    result_dict[\'ScoreTTM\'] = \'适中\'

            # 成长指数 ScoreGrowing  成长指数(分值1~5,>=4 高, <=2 低,其他中)\')
            ScoreGrowing = findinfo_from_apiquerystockdetailinfo(result, \'ScoreGrowing\')
            if ScoreGrowing:
                if float(ScoreGrowing) >= 4:
                    result_dict[\'ScoreGrowing\'] = \'\'
                elif float(ScoreGrowing) <= 2:
                    result_dict[\'ScoreGrowing\'] = \'\'
                else:
                    result_dict[\'ScoreGrowing\'] = \'\'
            else:
                result_dict[\'ScoreGrowing\']=\'\'

            # 盈利能力
            ScoreProfit = findinfo_from_apiquerystockdetailinfo(result, \'ScoreProfit\')  # \'   ScoreProfit  盈利能力(分值1~5,>=4 高, <=2 低,其他中)\' )
            if ScoreProfit:
                if float(ScoreProfit) >= 4:
                    result_dict[\'ScoreProfit\'] = \'\'
                elif float(ScoreProfit) <= 2:
                    result_dict[\'ScoreProfit\'] = \'\'
                else:
                    result_dict[\'ScoreProfit\'] = \'\'
            else:
                result_dict[\'ScoreProfit\']=\'\'

        return result_dict


    def api_finance(self,stockcode):
        """
        请求并提取财务数据
        :param stockcode:
        :return: dict
        """
        api = \'http://testdomain/finance?stockcode={stockcode}\'.format(stockcode = stockcode)
        response = requests.get(api)
        response.encoding = \'utf-8-sig\'
        result = response.json()[\'data\'][0][\'result\']  # 转化为二位数组
        result_dict = {\'stockcode\': stockcode} #存储返回结果

        if len(result) <3: #说明股票没数据
            return result_dict

        # 当期报告期
        result_dict[\'EndDate\'] = get_last_value_of_key(result, \'EndDate\')
        # 预测收益报告期
        ReportPeriod = get_last_value_of_key(result, \'ReportPeriod\')
        dictreportperoid = {\'03/31\': \'一季度\', \'06/30\': \'上半年\', \'09/30\': \'前三季度\', \'12/31\': \'本年度\'}
        if ReportPeriod and ReportPeriod != result_dict[\'EndDate\']: #预测收益报告期不等于当期报告期
            ReportPeriod = get_last_value_of_key(result, \'ReportPeriod\')[5:10]
            result_dict[\'ReportPeriod\'] = dictreportperoid[ReportPeriod]
        else:
            result_dict[\'ReportPeriod\'] = \'\'
        # 预测业绩情况
        PerformanceType = get_last_value_of_key(result, \'PerformanceType\')
        result_dict[\'PerformanceType\'] = PerformanceType
        # 预测业绩比例
        result_dict[\'PerformanceTypeRange\'] = get_last_value_of_key(result, \'PerformanceTypeRange\')
        # 营业收入增长率
        result_dict[\'OperatingRevenueYoY\'] = get_last_value_of_key(result, \'OperatingRevenueYoY\')
        # 营业利润增长率
        result_dict[\'NetProfitYoY\'] = get_last_value_of_key(result, \'NetProfitYoY\')
        # 净资产收益率
        result_dict[\'ROE\'] = get_last_value_of_key(result, \'ROE\')
        # 毛利率
        result_dict[\'SalesGrossMargin\'] =  get_last_value_of_key(result, \'SalesGrossMargin\')

        return result_dict


    def api_quote(self,stockcode):
        """
        请求并提取PETTM
        :param stockcode:
        :return: dict
        """

        api = \'http://testdomain/quote?stockcode={stockcode}\'.format(stockcode=stockcode)
        response = requests.get(api)
        response.encoding = \'utf-8-sig\'
        result = response.json()[\'data\'][0][\'result\']  # 转化为二位数组
        result_dict = {\'stockcode\':stockcode}
        if len(result) <3: #说明股票没数据
            return result_dict
        result_dict[\'PETTM\'] = get_last_value_of_key(result, \'PE1\')
        return result_dict

    def result_of_3sourceapi(self,stockcode):
        """
        拼接3个接口取出的字串
        :param stockcode:
        :return:
        """
        result_stockdetailinfo = self.api_querystockdetailinfo(stockcode)
        if result_stockdetailinfo[\'StockName\']: #如果股票名称存在,执行后续步骤
            result_finance = self.api_finance(stockcode)
            result_quote = self.api_quote(stockcode)

            #显示三个接口结果
            #print(result_stockdetailinfo)
            #print(result_finance)
            #print(result_quote)

            #空值、精度处理
            OperatingRevenueYoY = round_test(float(result_finance[\'OperatingRevenueYoY\']),1)
            NetProfitYoY = round_test(float(result_finance[\'NetProfitYoY\']),1)

            if result_finance[\'ReportPeriod\'] ==\'\'\
                    or result_finance[\'PerformanceType\'] == \'\'\
                    or result_finance[\'PerformanceTypeRange\'] == \'\':
                ReportPeriod  = PerformanceType = PerformanceTypeRange = \'\'
            else:
                ReportPeriod = \',预计\' + result_finance[\'ReportPeriod\']
                PerformanceType = \'业绩\' + result_finance[\'PerformanceType\']
                PerformanceTypeRange = result_finance[\'PerformanceTypeRange\']

            if result_finance[\'ROE\']:
                ROE = \',净资产收益率:{0}%\'.format(round_test(float(result_finance[\'ROE\']),1))
            else:
                ROE = \'\'

            if result_finance[\'SalesGrossMargin\']:
                SalesGrossMargin = \',毛利率:{0}%\'.format(round_test(float(result_finance[\'SalesGrossMargin\']),1))
            else:
                SalesGrossMargin = \'\'


            result = \'{OverviewValue} {TypeCap}{TypeStyle}股,处于{LifecycleValue}。\' \
                     \'估值{ScoreTTM},PE(TTM):{PETTM};\' \
                     \'成长性{ScoreGrowing},当期营收增长:{OperatingRevenueYoY}%,当期利润增长:{NetProfitYoY}%;\' \
                     \'盈利能力{ScoreProfit}{ROE}{SalesGrossMargin}{ReportPeriod}{PerformanceType}{PerformanceTypeRange}。\'\
                .format(OverviewValue = result_stockdetailinfo[\'OverviewValue\'],
                        TypeCap = result_stockdetailinfo[\'TypeCap\'],
                        TypeStyle = result_stockdetailinfo[\'TypeStyle\'],
                        LifecycleValue = result_stockdetailinfo[\'LifecycleValue\'],
                        ScoreTTM = result_stockdetailinfo[\'ScoreTTM\'],
                        PETTM = round_test(float(result_quote[\'PETTM\'])),
                        ScoreGrowing = result_stockdetailinfo[\'ScoreGrowing\'],
                        OperatingRevenueYoY = OperatingRevenueYoY,
                        NetProfitYoY = NetProfitYoY,
                        ScoreProfit = result_stockdetailinfo[\'ScoreProfit\'],
                        ROE = ROE,
                        SalesGrossMargin=SalesGrossMargin,
                        ReportPeriod = ReportPeriod,
                        PerformanceType = PerformanceType,
                        PerformanceTypeRange = PerformanceTypeRange)

            return result
        else:
            return  \'不存在该股票数据\'



    def api_of_dev(self,stockcodelist,cookie,page=0,domain=\'testdomain.cn\'):
        """
        获取开发接口数据
        :param 股票列表;cookie;domain默认线上
        :return: 股票代码及数据
        """
        headers = {\'Cookie\':cookie}
        url = \'http://{domain}/getstockbypage?StockCodeList={stockcodelist}&PageIndex={page}&PageSize=10\'.format(stockcodelist = stockcodelist,domain = domain,page=page)

        response = requests.get(url, headers=headers)
        jsonstr = response.json()# 转成json,取出message
        message = jsonstr[\'Message\']
        dict_result = {}

        if message:
            for ele in message:
                 stockcode = ele[\'StockCode\']
                 content = ele[\'Content\']  # 实际结果
                 nickname = ele[\'NickName\'] #发布者
                 if nickname == \'project000\':
                    dict_result[stockcode] = content

        return dict_result

    def compare_vs_devapi(self,stockcodelist,cookie,page=0,domain=\'testdomain.cn\'):
        """
        开发接口数据与接口拼接数据对比
        :return: bool
        """
        diff_list = []  # 存储不一致的股票代码

        resultofdev = self.api_of_dev(stockcodelist,cookie,page,domain) #请求开发接口
        if resultofdev: #如果开发结果不为空
            for stock,actual in resultofdev.items():
                expected = self.result_of_3sourceapi(stock) #数据源拼接结果

                \'\'\'去掉pe(ttm)对比\'\'\'
                beginindex = actual.find(\'PE(TTM)\')
                endindex = actual.find(\'\', beginindex)
                actual_result = actual[:beginindex] + actual[endindex:]

                beginindex = expected.find(\'PE(TTM)\')
                endindex = expected.find(\'\', beginindex)
                expected_result = expected[:beginindex] + expected[endindex:]

                if actual_result != expected_result: #预期实际对比
                    print(stock)
                    print(\'开发:\',actual_result)
                    print(\'预期:\',expected_result)
                    diff_list.append(stock)
                else:
                    print(stock,\'一致(不含PETTM)\')

            if diff_list: #异常股票列表不为空则输出;空则提示全部一致
                print(\'不一致的股票列表:\', diff_list)
                return False
            else:
                print(\'对比结果:数据一致\')
                return True
        else:
            print(\'接口没有数据\')
            return True

    def compare_vs_database(self,count=10):
        """
        比较数据库数据与数据源拼接字串
        :param count:对比股票数量,default=10
        :return:True 一致;False 不一致
        """

        diff_list = []  # 存储不一致的股票代码

        with pymssql.connect(server=\'192.168.1.1\', user=\'sa\', password=\'sa\',
                             database=\'test_db\') as myconnect:
            with  myconnect.cursor(as_dict=True) as cursor:
                cursor.execute("""SELECT top {count} StockCode,StockName,content 
                FROM [test_db].[dbo].[table] 
                where NickName =\'project000\' and isvalid = 1 and IsDeleted =0 order by createtime desc""".format(count=count))
                for row in cursor:
                    stockcode = row[\'StockCode\']
                    actual = row[\'content\']
                    expected = self.result_of_3sourceapi(stockcode)  # 数据源拼接结果

                    \'\'\'去掉pe(ttm)对比\'\'\'
                    beginindex = actual.find(\'PE(TTM)\')
                    endindex = actual.find(\'\', beginindex)
                    actual_result = actual[:beginindex] + actual[endindex:]
                    beginindex = expected.find(\'PE(TTM)\')
                    endindex = expected.find(\'\', beginindex)
                    expected_result = expected[:beginindex] + expected[endindex:]

                    if actual_result != expected_result:  # 预期实际对比
                        print(stockcode)
                        print(\'开发:\', actual_result)
                        print(\'预期:\', expected_result)
                        diff_list.append(stockcode)
                    else:
                        print(stockcode, \'一致(不含PETTM)\')

        if diff_list:
            print(\'不一致的股票列表:\', diff_list)
            return False
        else:
            print(\'对比结果:数据全部一致\')
            return True

 

 

run_test.py 执行脚本:

# coding:utf-8
"""
接口测试执行脚本
"""

import projectapi
import unittest


class ApiTest(unittest.TestCase):

    def setUp(self):
        self.projectapi1 = projectapi.ProjectApi() # 创建接口对象

    def testcase1(self):
        """与开发接口比对"""
        stockcodelist = \'600000%2C600128%2C600146%2C600165%2C600186\'
        #通过抓包获取cookie
        cookie = \'globalid=24A85DEC-AF25-36DD-C774-ED092F705767\'
    
        testresult = self.projectapi1.compare_vs_devapi(stockcodelist,cookie)
        self.assertTrue(testresult)

    def testcase2(self):
        # 与数据库对比
        testresult = self.projectapi1.compare_vs_database(count=10)
        self.assertTrue(testresult)

    def testcase3(self):
        """手工查询原数据和拼接字串"""
        while True:
            stockcode = input(\'输入股票代码: \')
            if stockcode.isdigit() and len(stockcode)==6 and stockcode[:2] in (\'00\',\'60\',\'30\'):
                print(\'数据请求中.....\')
                print(self.projectapi1.api_quote(stockcode))
                print(self.projectapi1.api_finance(stockcode))
                print(self.projectapi1.api_querystockdetailinfo(stockcode))
                print(self.projectapi1.result_of_3sourceapi(stockcode))
            else:
                print(\'股票代码有误\')

 

版权声明:本文为dinghanhua原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/dinghanhua/p/10376705.html