127 lines
7.2 KiB
Python
127 lines
7.2 KiB
Python
# !/usr/bin/python
|
||
# -*- coding: utf-8 -*-
|
||
import unittest,allure,ddt,time,json,requests,datetime
|
||
from datetime import datetime,timedelta
|
||
from configs.globalObj import LOGGER
|
||
from configs.globalParams import *
|
||
from YiMao.config.ApiInfo_web import *
|
||
from commons.FileHandler import ReadExcel,Txt
|
||
from commons.PgSqlLib import PGSQL
|
||
from commons.AssertLib import _assert_result
|
||
ExlFilePath = GlobalPath['YMDataPath'] + '/YMHT_ApiTestCase_0.0.1.xlsx'
|
||
ExcelContent = ReadExcel().get_data(FilePath=ExlFilePath, SheetName='05-优惠券活动')
|
||
|
||
|
||
@ddt.ddt
|
||
class Test_CouponPromotion_Module(unittest.TestCase):
|
||
def setUp(self) -> None:
|
||
self.timestamp = int(time.time())
|
||
|
||
@allure.story("管理后台:优惠券活动模块")
|
||
@ddt.data(*ExcelContent)
|
||
def test_coupon_promotion(self, data):
|
||
'''@Date:: 2024/5/20
|
||
@Desc::
|
||
管理后台 -> 优惠券活动模块接口
|
||
'''
|
||
LOGGER.info(f"---用例data信息为:{data}")
|
||
''' step01- 前置数据、接口信息描述等 '''
|
||
allure.dynamic.title(data['CaseName'])
|
||
allure.dynamic.description("描述:优惠券活动 模块接口自动化测试")
|
||
|
||
caseModule_c = data['BusinessModule'] # 用例编号
|
||
caseNO_c = data['NO.'] # 用例编号
|
||
caseName_c = data['CaseName'] # 用例名称
|
||
requestType_c = str(data['RequestType']) # 请求方式
|
||
expStaCode_c = data['ExpStatusCode'] # 预期的状态码
|
||
expRespJson_c = eval(data['RespJson']) # 预期的返回体
|
||
LOGGER.info(f">>> 开始执行->{caseModule_c}接口自动化测试 <<<\n")
|
||
LOGGER.info(f"---用例编号为:{caseNO_c}")
|
||
LOGGER.info(f"---用例名称为:{caseName_c}")
|
||
LOGGER.info(f"---请求方式为:{requestType_c}")
|
||
# LOGGER.info(f"---请求Header为:{HEADERS}")
|
||
LOGGER.info(f"---预期接口返回码为:{expStaCode_c}")
|
||
LOGGER.info(f"---预期返回体为:{expRespJson_c}\n")
|
||
|
||
''' step02- 请求接口 '''
|
||
if requestType_c == "POST":
|
||
# 新建、查询
|
||
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
|
||
LOGGER.info(f"---请求接口为:{api_url}")
|
||
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
|
||
LOGGER.info(f"---请求体为:{payload_c}\n")
|
||
response = requests.request(method=requestType_c, url=api_url, headers=HEADERS, data=payload_c)
|
||
rspJson = response.json()
|
||
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
|
||
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
|
||
if caseModule_c == "新建优惠券活动":
|
||
if "存为草稿" in caseName_c:
|
||
CouponActID = rspJson["data"]['id']
|
||
writeResult = Txt().append_write_txt("05_CouponDrawID.txt", str(CouponActID))
|
||
if not writeResult:
|
||
LOGGER.error(f"---写入 优惠券活动(新建)ID 失败!")
|
||
elif "复制" in caseName_c:
|
||
CouponActID = rspJson["data"]['id']
|
||
writeResult = Txt().append_write_txt("05_CopyCouponActID.txt", str(CouponActID))
|
||
if not writeResult:
|
||
LOGGER.error(f"---写入 优惠券活动(新建)ID 失败!")
|
||
else:
|
||
CouponActID = rspJson["data"]['id']
|
||
writeResult = Txt().append_write_txt("05_CreateCouponActID.txt", str(CouponActID))
|
||
if not writeResult:
|
||
LOGGER.error(f"---写入 优惠券活动(草稿)ID 失败!")
|
||
elif caseName_c == "修改并发布优惠券活动":
|
||
ActIDList = (Txt().read_txt("05_CreateCouponActID.txt"))
|
||
LOGGER.info(f"---新建并发布的 优惠券活动ID list为:{ActIDList}")
|
||
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
|
||
last_api_url = api_url.replace("{CouponID}", ActIDList[0])
|
||
LOGGER.info(f"---请求接口为:{last_api_url}")
|
||
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
|
||
LOGGER.info(f"---请求体为:{payload_c}\n")
|
||
time.sleep(0.5)
|
||
response = requests.request(method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
|
||
rspJson = response.json()
|
||
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
|
||
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
|
||
else:
|
||
if requestType_c == "PUT":
|
||
ActIDList = (Txt().read_txt("05_CreateCouponActID.txt"))
|
||
LOGGER.info(f"---新建并发布的 优惠券活动ID list为:{ActIDList}")
|
||
else:
|
||
ActIDList = (Txt().read_txt("05_CopyCouponActID.txt") +
|
||
Txt().read_txt("05_CouponDrawID.txt"))
|
||
LOGGER.info(f"---存为草稿&复制的 优惠券活动ID list为:{ActIDList}")
|
||
for i in range(len(ActIDList)):
|
||
# 暂停、重启、结束
|
||
ActId = ActIDList[i]
|
||
if ActId not in [None, '']:
|
||
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
|
||
last_api_url = api_url.replace("{CouponID}", str(ActId))
|
||
LOGGER.info(f"---请求接口为:{last_api_url}")
|
||
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
|
||
LOGGER.info(f"---请求体为:{payload_c}\n")
|
||
time.sleep(0.5)
|
||
response = requests.request(method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
|
||
rspJson = response.json()
|
||
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
|
||
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
|
||
''' step03- 断言处理 '''
|
||
assert _assert_result(rspJson, expRespJson_c, LOGGER) == True, AssertionError(f"---断言失败!")
|
||
LOGGER.info(f">>> {caseModule_c}接口请求 & 断言执行结束 <<<\n\n")
|
||
|
||
@classmethod
|
||
def tearDownClass(cls):
|
||
LOGGER.info(f">>> 优惠券活动模块接口自动化测试所有用例执行完毕 <<<\n")
|
||
LOGGER.info(f">>> 开始执行清理-优惠券活动模块-测试数据 <<<\n")
|
||
ActIDList = (Txt().read_txt("05_CreateCouponActID.txt") +
|
||
Txt().read_txt("05_CouponDrawID.txt") +
|
||
Txt().read_txt("05_CopyCouponActID.txt")) #
|
||
LOGGER.info(f"---待删除的优惠券活动ID list为:{ActIDList}")
|
||
res = PGSQL().del_coupon_activity_data(ActIDList) #
|
||
LOGGER.info(f"---清除自动化测试数据执行结果为:{res}")
|
||
LOGGER.info(f">>> 清理-优惠券活动模块-测试数据完毕 <<<\n")
|
||
|
||
if __name__ == '__main__':
|
||
unittest.main(verbosity=2)
|
||
|