Arthur-Wu commit

This commit is contained in:
ieemoo 2024-11-11 17:16:31 +08:00
parent 0550884b67
commit fd92879ff1
36 changed files with 1505 additions and 8 deletions

18
.gitignore vendored Normal file
View File

@ -0,0 +1,18 @@
# Build and Release Folders
bin-debug/
bin-release/
[Oo]bj/
[Bb]in/
# Other files and folders
.settings/
# Executables
*.swf
*.air
*.ipa
*.apk
# Project files, i.e. `.project`, `.actionScriptProperties` and `.flexProperties`
# should NOT be excluded as they contain compiler settings and other important
# information for Eclipse / Flash Builder.

20
MAIN_YM.py Normal file
View File

@ -0,0 +1,20 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
# This is a sample Python script.
# Press Shift+F10 to execute it or replace it with your code.
# Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.
def print_hi(name):
from commons.EngineX import Engine, NotificationModule
# Use a breakpoint in the code line below to debug your script.
print(f'--- Hi, {name} ---\n') # Press Ctrl+F8 to toggle the breakpoint.
print(f"=== Start of Test Suite ===")
return_list = Engine().run_test_suite()
NotificationModule().send_msg(return_list)
print(f"=== End of Test Suite ===")
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
print_hi("YiMao Autotest")

35
README.md Normal file
View File

@ -0,0 +1,35 @@
# YiMaoAutotest
#### 介绍
亿猫自动化仓库创建于20241008
#### 软件架构
软件架构说明
1. 项目概况
后台接口自动化工程
#### 安装教程
1. xxxx
2. xxxx
3. xxxx
#### 使用说明
1. xxxx
2. xxxx
3. xxxx
#### 参与贡献
1. Fork 本仓库
2. 新建 Feat_xxx 分支
3. 提交代码
4. 新建 Pull Request
#### 特技
1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md
2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com)
3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目
4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目
5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help)
6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)

5
YiMao/__init__.py Normal file
View File

@ -0,0 +1,5 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Author:: Arthur Wu
# @Date:: 2024/5/21-15:36
# @Description::

View File

@ -1 +0,0 @@
业务方法目录

59
YiMao/config/ApiInfo.py Normal file
View File

@ -0,0 +1,59 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Author:: Arthur Wu
# @Date:: 2024/10/14-11:50
# @Description:: API接口信息
api_env = {
"test_env": "https://api.test.yimaogo.com/",
"gray_env": "https://api.yimaogo.com/",
}
ManagementPlatformApi_zh = {
"编辑超市系统参数": "admin/system/settings", # 超市管理 (MarketID可以通过查询超市接口获取)
"编辑门店系统参数": "admin/system/settings", # 门店管理 (StoreID可以通过查询门店接口获取)
"删除门店系统参数": "admin/system/settings/{StoreID}",
"新建促销商品白名单": "admin/goods/promotion",
"修改与删除促销商品白名单": "admin/goods/promotion/{GoodsWhiteListID}", # GoodsWhiteListID可以通过新建时过程存储调用删除接口时再读取
"新建重量放通白名单": "admin/goods/weight/promotion",
"修改与删除重量放通白名单": "admin/goods/weight/promotion/{WeightWhiteListID}", # WeightWhiteListID可以通过新建时过程存储其他接口使用时再读取
"新建派样活动": "admin/sample/activity", # 派样活动
"查询派样活动": "admin/sample/activity/list",
"修改与删除派样活动": "admin/sample/activity/{SampleActivityID}", # :可以通过新建时过程存储,其他接口使用时再读取
"暂停派样活动": "admin/sample/activity/change/status/{SampleActivityID}?status=3",
"重新发布派样活动": "admin/sample/activity/change/status/{SampleActivityID}?status=2",
"立即结束派样活动": "admin/sample/activity/change/status/{SampleActivityID}?status=4",
"复制派样活动": "admin/sample/activity",
"新建抽奖活动": "admin/lotteryAct", # 抽奖活动
"查询抽奖活动": "admin/lotteryAct/list",
"修改与删除抽奖活动": "admin/lotteryAct/{LotteryActID}",
"编辑抽奖活动状态": "admin/lotteryAct/modifyStatus/{LotteryActID}", # :可以通过新建时过程存储,其他接口使用时再读取
"复制抽奖活动": "admin/lotteryAct/copy/{LotteryActID}",
"新建优惠券活动": "admin/coupon/setting", # 优惠券活动
"查询优惠券活动": "admin/coupon/setting/list",
"暂停优惠券活动": "admin/coupon/setting/status/{CouponID}?status=6", # :可以通过新建时过程存储,其他接口使用时再读取
"重新开启优惠券活动": "admin/coupon/setting/status/{CouponID}?status=4",
"结束投放优惠券活动": "admin/coupon/setting/status/{CouponID}?status=7",
"修改与删除优惠券活动": "admin/coupon/setting/{CouponID}",
"复制优惠券活动": "admin/coupon/setting",
"新建广告": "admin/ads", # 广告活动
"复制广告": "admin/ads", # 广告活动
"查询广告": "admin/ads/list",
"修改与删除广告": "admin/ads/{ADsID}", # :可以通过新建时过程存储,其他接口使用时再读取
"暂停广告投放": "admin/ads/status/{ADsID}?status=6",
"重新投放广告": "admin/ads/status/{ADsID}?status=4",
"结束投放广告": "admin/ads/status/{ADsID}?status=7",
"新建版本信息": "admin/apk", # 版本管理
"查询版本信息": "admin/apk/list",
"修改与删除版本信息": "admin/apk/{VersionID}", # :可以通过新建时过程存储,其他接口使用时再读取
"新建升级任务": "admin/upgrade/settings", # 升级任务
"查询升级任务": "admin/upgrade/settings/list",
"修改与删除升级任务": "admin/upgrade/settings/{UpgradeID}", # :可以通过新建时过程存储,其他接口使用时再读取
"启禁升级任务": "admin/upgrade/settings/active/{UpgradeID}",
}

5
YiMao/config/__init__.py Normal file
View File

@ -0,0 +1,5 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Author:: Arthur Wu
# @Date:: 2024/5/21-15:36
# @Description::

View File

@ -1 +0,0 @@
# 项目级配置文件

21
YiMao/conftest.py Normal file
View File

@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
import pytest
'''@Author:: Arthur Wu
@Date:: 2024/05/21
@Description:: This is the webservices library conftest file
'''
@pytest.fixture(scope='function', autouse=True)
def init_webservices():
"""@Author:: Arthur
@Date:: 2022/05/19
@Description::
:return:
"""
print("function setup")
yield
print("function teardwon")

Binary file not shown.

Binary file not shown.

View File

@ -1 +0,0 @@
测试用例

View File

View File

View File

@ -1 +0,0 @@
自动化报告目录

View File

@ -0,0 +1,5 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Author:: Arthur Wu
# @Date:: 2024/10/21-9:21
# @Description::

View File

@ -1 +0,0 @@
项目自动化脚本目录

View File

@ -0,0 +1,61 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
import unittest,allure,ddt,time,json,requests
from configs.globalObj import LOGGER
from configs.globalParams import *
from YiMao.config.ApiInfo import *
from commons.FileHandler import ReadExcel
from commons.AssertLib import _assert_result
ExlFilePath = GlobalPath['YMDataPath'] + '/YMHT_ApiTestCase_0.0.1.xlsx'
ExcelContent = ReadExcel().get_data(FilePath=ExlFilePath, SheetName='01-超市管理')
@ddt.ddt
class Test_MarketManagementModule(unittest.TestCase):
def setUp(self) -> None:
self.timestamp = int(time.time())
@allure.story('[管理平台] 超市管理模块')
@ddt.data(*ExcelContent)
def test_market_system_parameters(self, data):
'''@Date:: 2024/5/20
@Desc::
管理后台 -> 超市管理模块接口
'''
''' step01 '''
allure.dynamic.title(data['CaseName'])
allure.dynamic.description("描述:超市管理模块接口自动化测试")
caseModule_c = data['BusinessModule'] # 用例编号
caseNO_c = data['NO.'] # 用例编号
caseName_c = data['CaseName'] # 用例名称
requestType_c = str(data['RequestType']) # 请求方式
expStaCode_c = data['ExpStatusCode'] # 预期的状态码
expRespJson_c = eval(data['RespJson']) # 预期的返回体
LOGGER.info(f">>> 开始执行->{caseModule_c}接口自动化测试 <<<\n")
LOGGER.info(f"---用例编号为:{caseNO_c}")
LOGGER.info(f"---用例名称为:{caseName_c}")
LOGGER.info(f"---请求方式为:{requestType_c}")
# LOGGER.info(f"---请求Header为{HEADERS}")
LOGGER.info(f"---预期接口返回码为:{expStaCode_c}")
LOGGER.info(f"---预期返回体为:{expRespJson_c}\n")
''' step02- 请求接口 '''
time.sleep(1)
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[data['BusinessModule']] + "/64" # 请求地址
LOGGER.info(f"---请求接口为:{api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
response = requests.request(method=requestType_c, url=api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
rspJsonFormat = json.dumps(response.json(), indent=4, ensure_ascii=False)
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {rspJsonFormat}\n")
''' step03- 断言处理 '''
assert _assert_result(rspJson, expRespJson_c, LOGGER) == True, f"---断言失败!"
LOGGER.info(f">>> {data['BusinessModule']}接口请求 & 断言执行结束 <<<\n\n")
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

@ -0,0 +1,145 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
import unittest,allure,ddt,time,json,requests
from configs.globalObj import LOGGER
from configs.globalParams import *
from YiMao.config.ApiInfo import *
from commons.FileHandler import ReadExcel,Txt
from commons.AssertLib import _assert_result
ExlFilePath = GlobalPath['YMDataPath'] + '/YMHT_ApiTestCase_0.0.1.xlsx'
ExcelContent = ReadExcel().get_data(FilePath=ExlFilePath, SheetName='02-门店管理')
@ddt.ddt
class Test_StoreManagementModule(unittest.TestCase):
def setUp(self) -> None:
self.timestamp = int(time.time())
@allure.story('[管理平台] 门店管理模块')
@ddt.data(*ExcelContent)
def test_store_management(self, data):
'''@Date:: 2024/5/20
@Desc::
管理后台 -> 门店管理模块接口
'''
''' step01获取测试用例数据 '''
allure.dynamic.title(data['CaseName'])
allure.dynamic.description("描述:门店管理模块接口自动化测试")
caseModule_c = data['BusinessModule'] # 用例编号
caseNO_c = data['NO.'] # 用例编号
caseName_c = data['CaseName'] # 用例名称
requestType_c = str(data['RequestType']) # 请求方式
expStaCode_c = data['ExpStatusCode'] # 预期的状态码
expRespJson_c = eval(data['RespJson']) # 预期的返回体
LOGGER.info(f">>> 开始执行->{caseModule_c}接口自动化测试 <<<\n")
LOGGER.info(f"---用例编号为:{caseNO_c}")
LOGGER.info(f"---用例名称为:{caseName_c}")
LOGGER.info(f"---请求方式为:{requestType_c}")
# LOGGER.info(f"---请求Header为{HEADERS}")
LOGGER.info(f"---预期接口返回码为:{expStaCode_c}")
LOGGER.info(f"---预期返回体为:{expRespJson_c}\n")
''' step02请求接口 '''
if data['BusinessModule'] == '删除门店系统参数':
time.sleep(0.5)
sysSettingID = 97
storeId = 65
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[data['BusinessModule']] # 请求地址
last_api_url = api_url.replace("{StoreID}", str(sysSettingID))
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
response = requests.request(method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
rspJsonFormat = json.dumps(response.json(), indent=4, ensure_ascii=False)
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {rspJsonFormat}\n")
elif data['BusinessModule'] == '修改与删除促销商品白名单':
GoodsWhiteListIDList = Txt().read_txt("GoodsWhiteListID.txt")
LOGGER.info(f"---促销商品白名单ID列表为{GoodsWhiteListIDList}")
if requestType_c == "DELETE":
for i in range(len(GoodsWhiteListIDList)):
if GoodsWhiteListIDList[i] not in [None, '']:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[
data['BusinessModule']] # 请求地址
last_api_url = api_url.replace("{GoodsWhiteListID}", str(GoodsWhiteListIDList[i]))
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
time.sleep(1)
response = requests.request(method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
rspJsonFormat = json.dumps(response.json(), indent=4, ensure_ascii=False)
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {rspJsonFormat}\n")
else:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[data['BusinessModule']] # 请求地址
last_api_url = api_url.replace("{GoodsWhiteListID}", str(GoodsWhiteListIDList[-2]))
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
response = requests.request(method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
rspJsonFormat = json.dumps(response.json(), indent=4, ensure_ascii=False)
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {rspJsonFormat}\n")
elif data['BusinessModule'] == '修改与删除重量放通白名单':
WeightWhiteListIDList = Txt().read_txt("WeightWhiteListID.txt")
LOGGER.info(f"---删除重量放通白名单ID列表为{WeightWhiteListIDList}")
if requestType_c == "DELETE":
for j in range(len(WeightWhiteListIDList)):
if WeightWhiteListIDList[j] not in [None, '']:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[
data['BusinessModule']] # 请求地址
last_api_url2 = api_url.replace("{WeightWhiteListID}", str(WeightWhiteListIDList[j]))
LOGGER.info(f"---请求接口为:{last_api_url2}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
time.sleep(0.5)
response = requests.request(method=requestType_c, url=last_api_url2, headers=HEADERS, data=payload_c)
rspJson = response.json()
rspJsonFormat = json.dumps(response.json(), indent=4, ensure_ascii=False)
LOGGER.info(f"---删除第 {j+1} 条数据,接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---第 {j+1} 条数据,接口返回体为:: {rspJsonFormat}\n")
else:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[data['BusinessModule']] # 请求地址
last_api_url = api_url.replace("{WeightWhiteListID}", str(WeightWhiteListIDList[0]))
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
response = requests.request(method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
rspJsonFormat = json.dumps(response.json(), indent=4, ensure_ascii=False)
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {rspJsonFormat}\n")
else:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[data['BusinessModule']] # 请求地址
LOGGER.info(f"---请求接口为:{api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
response = requests.request(method=requestType_c, url=api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
rspJsonFormat = json.dumps(response.json(), indent=4, ensure_ascii=False)
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {rspJsonFormat}\n")
if data['BusinessModule'] == '新建促销商品白名单':
time.sleep(0.5)
goodsWhiteListID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("GoodsWhiteListID.txt", str(goodsWhiteListID))
if not writeResult:
LOGGER.error(f"---写入 新建促销商品白名单ID 失败!")
elif data['BusinessModule'] == '新建重量放通白名单':
time.sleep(0.5)
weightWhiteListID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("WeightWhiteListID.txt", str(weightWhiteListID))
if not writeResult:
LOGGER.error(f"---写入 新建重量放通白名单ID 失败!")
''' step03: 断言处理 '''
assert _assert_result(rspJson, expRespJson_c, LOGGER) == True, f"---断言失败!"
LOGGER.info(f">>> {data['BusinessModule']}接口请求 & 断言执行结束 <<<\n\n")
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

@ -0,0 +1,110 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
import unittest,allure,ddt,time,json,requests,datetime
from datetime import datetime,timedelta
from configs.globalObj import LOGGER
from configs.globalParams import *
from YiMao.config.ApiInfo import *
from commons.FileHandler import ReadExcel,Txt
from commons.PgSqlLib import PGSQL
from commons.AssertLib import _assert_result
ExlFilePath = GlobalPath['YMDataPath'] + '/YMHT_ApiTestCase_0.0.1.xlsx'
ExcelContent = ReadExcel().get_data(FilePath=ExlFilePath, SheetName='03-派样活动')
@ddt.ddt
class Test_SampleDistributionActivity_Module(unittest.TestCase):
@allure.story('[管理平台] 派样活动模块')
@ddt.data(*ExcelContent)
def test_sample_distribution_activity(self, data):
'''@Date:: 2024/5/20
@Desc::
管理后台 -> 派样活动模块接口
'''
''' step01获取测试用例数据 '''
allure.dynamic.title(data['CaseName'])
allure.dynamic.description("描述:派样活动模块接口自动化测试")
caseModule_c = data['BusinessModule'] # 用例编号
caseNO_c = data['NO.'] # 用例编号
caseName_c = data['CaseName'] # 用例名称
requestType_c = str(data['RequestType']) # 请求方式
expStaCode_c = data['ExpStatusCode'] # 预期的状态码
expRespJson_c = eval(data['RespJson']) # 预期的返回体
LOGGER.info(f">>> 开始执行->{caseModule_c}接口自动化测试 <<<\n")
LOGGER.info(f"---用例编号为:{caseNO_c}")
LOGGER.info(f"---用例名称为:{caseName_c}")
LOGGER.info(f"---请求方式为:{requestType_c}")
# LOGGER.info(f"---请求Header为{HEADERS}")
LOGGER.info(f"---预期接口返回码为:{expStaCode_c}")
LOGGER.info(f"---预期返回体为:{expRespJson_c}\n")
''' step02- 请求接口
执行复制接口时存储对应 rspJson["data"]["id"]为删除接口使用
'''
if data['BusinessModule'] in ["新建派样活动", "复制派样活动", "查询派样活动"]:
# 新建、查询、复制
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[data['BusinessModule']] # 请求地址
LOGGER.info(f"---请求接口为:{api_url}")
# LOGGER.info(f"---处理前,请求体为:{data['Payload']}\n")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---处理后,请求体为:{payload_c}\n")
response = requests.request(method=requestType_c, url=api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
time.sleep(0.5)
if data['BusinessModule'] == "新建派样活动":
if "存为草稿" in caseName_c:
SampleActivityID = rspJson["data"]['id']
writeResult = Txt().append_write_txt(
"03_DraftOfActivityID.txt", str(SampleActivityID)
)
if not writeResult:
LOGGER.error(f"---写入 派样活动(新建)ID 失败!")
else:
SampleActivityID = rspJson["data"]['id']
writeResult = Txt().append_write_txt(
"03_CreateSampleActivityID.txt", str(SampleActivityID)
)
if not writeResult:
LOGGER.error(f"---写入 派样活动(草稿)ID 失败!")
elif data['BusinessModule'] == "复制派样活动":
SampleActivityID = rspJson["data"]['id']
# writeResult = Txt().append_write_txt("03_CopySampleActivityID.txt", str(SampleActivityID))
writeResult = Txt().append_write_txt(
"03_CreateSampleActivityID.txt", str(SampleActivityID)
)
if not writeResult:
LOGGER.error(f"---写入 派样活动(复制)ID 失败!")
else:
sampleActIDList = Txt().read_txt("03_CreateSampleActivityID.txt")
LOGGER.info(f"---新建并发布的派样活动ID list为{sampleActIDList}")
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[data['BusinessModule']] # 请求地址
last_api_url = api_url.replace("{SampleActivityID}", str(sampleActIDList[0]))
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
response = requests.request(method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
rspJsonFormat = json.dumps(response.json(), indent=4, ensure_ascii=False)
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {rspJsonFormat}\n")
''' step03- 断言处理 '''
assert _assert_result(rspJson, expRespJson_c, LOGGER) == True, f"---断言失败!"
LOGGER.info(f">>> {data['BusinessModule']}接口请求 & 断言执行结束 <<<\n\n")
@classmethod
def tearDownClass(cls):
LOGGER.info(f">>> 派样活动模块接口自动化测试所有用例执行完毕 <<<\n")
LOGGER.info(f">>> 开始执行清理-派样活动模块-测试数据 <<<\n")
sampActIDList = (Txt().read_txt("03_CreateSampleActivityID.txt") +
Txt().read_txt("03_DraftOfActivityID.txt"))
LOGGER.info(f"---待删除的派样活动ID list为{sampActIDList}")
PGSQL().del_sample_activity_data(sampActIDList)
LOGGER.info(f">>> 清理-派样活动模块-测试数据完毕 <<<\n")
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

@ -0,0 +1,112 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
import unittest,allure,ddt,json,requests
from configs.globalObj import LOGGER
from configs.globalParams import *
from YiMao.config.ApiInfo import *
from commons.FileHandler import ReadExcel,Txt
from commons.AssertLib import _assert_result
ExlFilePath = GlobalPath['YMDataPath'] + '/YMHT_ApiTestCase_0.0.1.xlsx'
ExcelContent = ReadExcel().get_data(FilePath=ExlFilePath, SheetName='04-抽奖活动')
@ddt.ddt
class Test_LotteryDraw_Module(unittest.TestCase):
@allure.story("[管理后台] 抽奖活动模块")
@ddt.data(*ExcelContent)
def test_lottery_draw(self, data):
'''@Date:: 2024/10/25
@Desc::
管理后台 -> 抽奖活动 模块接口
'''
''' step01- 前置数据、接口信息描述等 '''
allure.dynamic.title(data['CaseName'])
allure.dynamic.description("描述:抽奖活动 模块接口自动化测试")
caseModule_c = data['BusinessModule'] # 用例编号
caseNO_c = data['NO.'] # 用例编号
caseName_c = data['CaseName'] # 用例名称
requestType_c = str(data['RequestType']) # 请求方式
expStaCode_c = data['ExpStatusCode'] # 预期的状态码
expRespJson_c = eval(data['RespJson']) # 预期的返回体
LOGGER.info(f">>> 开始执行->{caseModule_c}接口自动化测试 <<<\n")
LOGGER.info(f"---用例编号为:{caseNO_c}")
LOGGER.info(f"---用例名称为:{caseName_c}")
LOGGER.info(f"---请求方式为:{requestType_c}")
# LOGGER.info(f"---请求Header为{HEADERS}")
LOGGER.info(f"---预期接口返回码为:{expStaCode_c}")
LOGGER.info(f"---预期返回体为:{expRespJson_c}\n")
''' step02- 请求接口 '''
rspJson = {}
if caseModule_c in ["新建抽奖活动", "查询抽奖活动"]:
# 新建、查询
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
LOGGER.info(f"---请求接口为:{api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
response = requests.request(method=requestType_c, url=api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
if caseModule_c == "新建抽奖活动":
if "存为草稿" in caseName_c:
LottreyActID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("04_LottreyDrawID.txt", str(LottreyActID))
if not writeResult:
LOGGER.error(f"---写入 抽奖活动(新建)ID 失败!")
else:
LottreyActID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("04_CreateLottreyActID.txt", str(LottreyActID))
if not writeResult:
LOGGER.error(f"---写入 抽奖活动(草稿)ID 失败!")
else:
# 编辑抽奖活动状态
if "存为草稿" in caseName_c:
lotteryActIDList = Txt().read_txt("04_LottreyDrawID.txt")
LOGGER.info(f"---新建抽奖活动 存为草稿的活动ID list为{lotteryActIDList}")
elif requestType_c == "DELETE":
lotteryActIDList = (Txt().read_txt("04_CreateLottreyActID.txt") +
Txt().read_txt("04_LottreyDrawID.txt"))
LOGGER.info(f"---新建并发布的抽奖活动ID list为{lotteryActIDList}")
else:
lotteryActIDList = Txt().read_txt("04_CreateLottreyActID.txt")
LOGGER.info(f"---新建并发布的抽奖活动ID list为{lotteryActIDList}")
if requestType_c == "DELETE" or caseModule_c == "编辑抽奖活动状态":
# 编辑抽奖活动状态、删除抽奖活动
for i in range(len(lotteryActIDList)):
lotteryId = lotteryActIDList[i]
if lotteryId not in [None, '']:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
last_api_url = api_url.replace("{LotteryActID}", str(lotteryId))
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
time.sleep(0.5)
response = requests.request(method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
else:
# 编辑抽奖活动、复制抽奖活动
lotteryId = lotteryActIDList[0]
if lotteryId not in [None, '']:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
last_api_url = api_url.replace("{LotteryActID}", str(lotteryId))
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
response = requests.request(method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
rspJsonFormat = json.dumps(response.json(), indent=4, ensure_ascii=False)
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {rspJsonFormat}\n")
''' step03- 断言处理 '''
assert _assert_result(rspJson, expRespJson_c, LOGGER) == True, AssertionError(f"---断言失败!")
LOGGER.info(f">>> {caseModule_c}接口请求 & 断言执行结束 <<<\n\n")
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

@ -0,0 +1,126 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
import unittest,allure,ddt,time,json,requests,datetime
from datetime import datetime,timedelta
from configs.globalObj import LOGGER
from configs.globalParams import *
from YiMao.config.ApiInfo import *
from commons.FileHandler import ReadExcel,Txt
from commons.PgSqlLib import PGSQL
from commons.AssertLib import _assert_result
ExlFilePath = GlobalPath['YMDataPath'] + '/YMHT_ApiTestCase_0.0.1.xlsx'
ExcelContent = ReadExcel().get_data(FilePath=ExlFilePath, SheetName='05-优惠券活动')
@ddt.ddt
class Test_CouponPromotion_Module(unittest.TestCase):
def setUp(self) -> None:
self.timestamp = int(time.time())
@allure.story("管理后台:优惠券活动模块")
@ddt.data(*ExcelContent)
def test_coupon_promotion(self, data):
'''@Date:: 2024/5/20
@Desc::
管理后台 -> 优惠券活动模块接口
'''
LOGGER.info(f"---用例data信息为{data}")
''' step01- 前置数据、接口信息描述等 '''
allure.dynamic.title(data['CaseName'])
allure.dynamic.description("描述:优惠券活动 模块接口自动化测试")
caseModule_c = data['BusinessModule'] # 用例编号
caseNO_c = data['NO.'] # 用例编号
caseName_c = data['CaseName'] # 用例名称
requestType_c = str(data['RequestType']) # 请求方式
expStaCode_c = data['ExpStatusCode'] # 预期的状态码
expRespJson_c = eval(data['RespJson']) # 预期的返回体
LOGGER.info(f">>> 开始执行->{caseModule_c}接口自动化测试 <<<\n")
LOGGER.info(f"---用例编号为:{caseNO_c}")
LOGGER.info(f"---用例名称为:{caseName_c}")
LOGGER.info(f"---请求方式为:{requestType_c}")
# LOGGER.info(f"---请求Header为{HEADERS}")
LOGGER.info(f"---预期接口返回码为:{expStaCode_c}")
LOGGER.info(f"---预期返回体为:{expRespJson_c}\n")
''' step02- 请求接口 '''
if requestType_c == "POST":
# 新建、查询
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
LOGGER.info(f"---请求接口为:{api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
response = requests.request(method=requestType_c, url=api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
if caseModule_c == "新建优惠券活动":
if "存为草稿" in caseName_c:
CouponActID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("05_CouponDrawID.txt", str(CouponActID))
if not writeResult:
LOGGER.error(f"---写入 优惠券活动(新建)ID 失败!")
elif "复制" in caseName_c:
CouponActID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("05_CopyCouponActID.txt", str(CouponActID))
if not writeResult:
LOGGER.error(f"---写入 优惠券活动(新建)ID 失败!")
else:
CouponActID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("05_CreateCouponActID.txt", str(CouponActID))
if not writeResult:
LOGGER.error(f"---写入 优惠券活动(草稿)ID 失败!")
elif caseName_c == "修改并发布优惠券活动":
ActIDList = (Txt().read_txt("05_CreateCouponActID.txt"))
LOGGER.info(f"---新建并发布的 优惠券活动ID list为{ActIDList}")
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
last_api_url = api_url.replace("{CouponID}", ActIDList[0])
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
time.sleep(0.5)
response = requests.request(method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
else:
if requestType_c == "PUT":
ActIDList = (Txt().read_txt("05_CreateCouponActID.txt"))
LOGGER.info(f"---新建并发布的 优惠券活动ID list为{ActIDList}")
else:
ActIDList = (Txt().read_txt("05_CopyCouponActID.txt") +
Txt().read_txt("05_CouponDrawID.txt"))
LOGGER.info(f"---存为草稿&复制的 优惠券活动ID list为{ActIDList}")
for i in range(len(ActIDList)):
# 暂停、重启、结束
ActId = ActIDList[i]
if ActId not in [None, '']:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
last_api_url = api_url.replace("{CouponID}", str(ActId))
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
time.sleep(0.5)
response = requests.request(method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
''' step03- 断言处理 '''
assert _assert_result(rspJson, expRespJson_c, LOGGER) == True, AssertionError(f"---断言失败!")
LOGGER.info(f">>> {caseModule_c}接口请求 & 断言执行结束 <<<\n\n")
@classmethod
def tearDownClass(cls):
LOGGER.info(f">>> 优惠券活动模块接口自动化测试所有用例执行完毕 <<<\n")
LOGGER.info(f">>> 开始执行清理-优惠券活动模块-测试数据 <<<\n")
ActIDList = (Txt().read_txt("05_CreateCouponActID.txt") +
Txt().read_txt("05_CouponDrawID.txt") +
Txt().read_txt("05_CopyCouponActID.txt")) #
LOGGER.info(f"---待删除的优惠券活动ID list为{ActIDList}")
res = PGSQL().del_coupon_activity_data(ActIDList) #
LOGGER.info(f"---清除自动化测试数据执行结果为:{res}")
LOGGER.info(f">>> 清理-优惠券活动模块-测试数据完毕 <<<\n")
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

@ -0,0 +1,112 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
import unittest,allure,ddt,time,json,requests,datetime
from datetime import datetime,timedelta
from configs.globalObj import LOGGER
from configs.globalParams import *
from YiMao.config.ApiInfo import *
from commons.FileHandler import ReadExcel,Txt
from commons.AssertLib import _assert_result
ExlFilePath = GlobalPath['YMDataPath'] + '/YMHT_ApiTestCase_0.0.1.xlsx'
ExcelContent = ReadExcel().get_data(FilePath=ExlFilePath, SheetName='06-广告管理')
@ddt.ddt
class Test_ADManagement_Module(unittest.TestCase):
@allure.story("管理后台:广告管理模块")
@ddt.data(*ExcelContent)
def test_ad_management(self, data):
'''@Date:: 2024/5/20
@Desc::
管理后台 -> 广告管理模块接口
'''
''' step01- 前置数据、接口信息描述等 '''
allure.dynamic.title(data['CaseName'])
allure.dynamic.description("描述:广告管理 模块接口自动化测试")
caseModule_c = data['BusinessModule'] # 用例编号
caseNO_c = data['NO.'] # 用例编号
caseName_c = data['CaseName'] # 用例名称
requestType_c = str(data['RequestType']) # 请求方式
expStaCode_c = data['ExpStatusCode'] # 预期的状态码
expRespJson_c = eval(data['RespJson']) # 预期的返回体
LOGGER.info(f">>> 开始执行->{caseModule_c}接口自动化测试 <<<\n")
LOGGER.info(f"---用例编号为:{caseNO_c}")
LOGGER.info(f"---用例名称为:{caseName_c}")
LOGGER.info(f"---请求方式为:{requestType_c}")
# LOGGER.info(f"---请求Header为{HEADERS}")
LOGGER.info(f"---预期接口返回码为:{expStaCode_c}")
LOGGER.info(f"---预期返回体为:{expRespJson_c}\n")
''' step02- 请求接口 '''
if requestType_c == 'POST':
# 新建、查询、复制
timeStamp = str(int(time.time()))
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
LOGGER.info(f"---请求接口为:{api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
response = requests.request(method=requestType_c, url=api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
if "查询广告" not in caseName_c:
if "草稿" in caseName_c or "复制" in caseName_c:
ActID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("06_ADDrawID.txt", str(ActID))
if not writeResult:
LOGGER.error(f"---写入 广告管理(新建)ID 失败!")
else:
ActID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("06_CreateADID.txt", str(ActID))
if not writeResult:
LOGGER.error(f"---写入 广告管理(草稿)ID 失败!")
else:
# 编辑、删除
if "修改广告" in caseName_c:
# 取一个已发布的广告ID
ActIDList = Txt().read_txt("06_CreateADID.txt")
ActID = ActIDList[0]
LOGGER.info(f"---存为草稿的 广告管理ID 为:{ActID}")
if ActID not in [None, '']:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
last_api_url = api_url.replace("{ADsID}", ActID)
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
time.sleep(0.5)
response = requests.request(method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
else:
if requestType_c == "PUT":
ActIDList = Txt().read_txt("06_CreateADID.txt")
LOGGER.info(f"---新建并发布的 广告管理ID list为{ActIDList}")
else:
ActIDList = (Txt().read_txt("06_CreateADID.txt") +
Txt().read_txt("06_ADDrawID.txt"))
LOGGER.info(f"---存为草稿&复制的 广告管理ID list为{ActIDList}")
for i in range(len(ActIDList)):
# 暂停、重启、结束、删除
ActId = ActIDList[i]
if ActId not in [None, '']:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
last_api_url = api_url.replace("{ADsID}", str(ActId))
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
time.sleep(0.5)
response = requests.request(method=requestType_c, url=last_api_url, headers=HEADERS,
data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
''' step03- 断言处理 '''
assert _assert_result(rspJson, expRespJson_c, LOGGER) == True, AssertionError(f"---断言失败!")
LOGGER.info(f">>> {caseModule_c}接口请求 & 断言执行结束 <<<\n\n")
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

@ -0,0 +1,100 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
import unittest,allure,ddt,time,json,requests,datetime
from datetime import datetime,timedelta
from configs.globalObj import LOGGER
from configs.globalParams import *
from YiMao.config.ApiInfo import *
from commons.FileHandler import ReadExcel,Txt
from commons.AssertLib import _assert_result
ExlFilePath = GlobalPath['YMDataPath'] + '/YMHT_ApiTestCase_0.0.1.xlsx'
ExcelContent = ReadExcel().get_data(FilePath=ExlFilePath, SheetName='07-版本管理')
@ddt.ddt
class Test_VersionManagement_Module(unittest.TestCase):
@allure.story("管理后台:版本管理模块")
@ddt.data(*ExcelContent)
def test_version_management(self, data):
'''@Date:: 2024/5/20
@Desc::
管理后台 -> 版本管理模块接口
'''
''' step01- 前置数据、接口信息描述等 '''
allure.dynamic.title(data['CaseName'])
allure.dynamic.description("描述:版本管理 模块接口自动化测试")
caseModule_c = data['BusinessModule'] # 用例编号
caseNO_c = data['NO.'] # 用例编号
caseName_c = data['CaseName'] # 用例名称
requestType_c = str(data['RequestType']) # 请求方式
expStaCode_c = data['ExpStatusCode'] # 预期的状态码
expRespJson_c = eval(data['RespJson']) # 预期的返回体
LOGGER.info(f">>> 开始执行->{caseModule_c}接口自动化测试 <<<\n")
LOGGER.info(f"---用例编号为:{caseNO_c}")
LOGGER.info(f"---用例名称为:{caseName_c}")
LOGGER.info(f"---请求方式为:{requestType_c}")
# LOGGER.info(f"---请求Header为{HEADERS}")
LOGGER.info(f"---预期接口返回码为:{expStaCode_c}")
LOGGER.info(f"---预期返回体为:{expRespJson_c}\n")
''' step02- 请求接口 '''
if requestType_c == 'POST':
# 新建、查询
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
LOGGER.info(f"---请求接口为:{api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
response = requests.request(method=requestType_c, url=api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
if "查询" not in caseName_c:
ActID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("07_CreateVersionManagementID.txt", str(ActID))
if not writeResult:
LOGGER.error(f"---写入 版本管理(草稿)ID 失败!")
else:
if requestType_c == "PUT":
ActIDList = Txt().read_txt("07_CreateVersionManagementID.txt")
LOGGER.info(f"---新建并发布的 版本管理ID list为{ActIDList}")
ActID = ActIDList[0]
if ActID not in [None, '']:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
last_api_url = api_url.replace("{VersionID}", ActID)
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
time.sleep(0.5)
response = requests.request(
method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
else:
ActIDList = Txt().read_txt("07_CreateVersionManagementID.txt")
LOGGER.info(f"---新建并发布的 版本管理ID list为{ActIDList}")
for i in range(len(ActIDList)):
# 暂停、重启、结束、删除
ActId = ActIDList[i]
if ActId not in [None, '']:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
last_api_url = api_url.replace("{VersionID}", str(ActId))
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
time.sleep(0.5)
response = requests.request(
method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
# ''' step03- 断言处理 '''
assert _assert_result(rspJson, expRespJson_c, LOGGER) == True, AssertionError(f"---断言失败!")
LOGGER.info(f">>> {caseModule_c}接口请求 & 断言执行结束 <<<\n\n")
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

@ -0,0 +1,106 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
import unittest,allure,ddt,time,json,requests,datetime
from datetime import datetime,timedelta
from configs.globalObj import LOGGER
from configs.globalParams import *
from YiMao.config.ApiInfo import *
from commons.FileHandler import ReadExcel,Txt
from commons.AssertLib import _assert_result
# ExlFilePath = GlobalPath['YMDataPath'] + '/YMHT_ApiTestCase_0.0.1.xlsx'
Content = ReadExcel().get_data(
FilePath=GlobalPath['YMDataPath'] + '/YMHT_ApiTestCase_0.0.1.xlsx',
SheetName='08-升级任务'
)
@ddt.ddt
class Test_UpgradeTask_Module(unittest.TestCase):
@allure.story("管理后台:升级任务模块")
@ddt.data(*Content)
def test_upgrade_task(self, data):
'''@Date:: 2024/5/20
@Desc::
管理后台 -> 升级任务模块接口
'''
''' step01- 前置数据、接口信息描述等 '''
allure.dynamic.title(data['CaseName'])
allure.dynamic.description("描述:升级任务 模块接口自动化测试")
caseModule_c = data['BusinessModule'] # 用例编号
caseNO_c = data['NO.'] # 用例编号
caseName_c = data['CaseName'] # 用例名称
requestType_c = str(data['RequestType']) # 请求方式
expStaCode_c = data['ExpStatusCode'] # 预期的状态码
expRespJson_c = eval(data['RespJson']) # 预期的返回体
LOGGER.info(f">>> 开始执行->{caseModule_c}接口自动化测试 <<<\n")
LOGGER.info(f"---用例编号为:{caseNO_c}")
LOGGER.info(f"---用例名称为:{caseName_c}")
LOGGER.info(f"---请求方式为:{requestType_c}")
# LOGGER.info(f"---请求Header为{HEADERS}")
LOGGER.info(f"---预期接口返回码为:{expStaCode_c}")
LOGGER.info(f"---预期返回体为:{expRespJson_c}\n")
''' step02- 请求接口 '''
if requestType_c == 'POST':
# 新建、查询
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
LOGGER.info(f"---请求接口为:{api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
response = requests.request(method=requestType_c, url=api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
if "查询" not in caseName_c:
ActID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("08_CreateUpgradeTaskID.txt", str(ActID))
if not writeResult:
LOGGER.error(f"---写入 升级任务(草稿)ID 失败!")
else:
if requestType_c == "PUT":
# 编辑
ActIDList = Txt().read_txt("08_CreateUpgradeTaskID.txt")
LOGGER.info(f"---新建的 升级任务ID list为{ActIDList}")
ActID = ActIDList[0]
if ActID not in [None, '']:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
last_api_url = api_url.replace("{UpgradeID}", ActID)
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
time.sleep(0.5)
response = requests.request(
method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
else:
ActIDList = Txt().read_txt("08_CreateUpgradeTaskID.txt")
LOGGER.info(f"---新建的 升级任务ID list为{ActIDList}")
for i in range(len(ActIDList)):
# 启用、禁用、删除
ActId = ActIDList[i]
if ActId not in [None, '']:
api_url = api_env[ProCfgData['ExecutionEnv']] + ManagementPlatformApi_zh[caseModule_c] # 请求地址
last_api_url = api_url.replace("{UpgradeID}", str(ActId))
LOGGER.info(f"---请求接口为:{last_api_url}")
payload_c = json.dumps(eval(data['Payload']), indent=4, ensure_ascii=False) # 请求体为json格式
LOGGER.info(f"---请求体为:{payload_c}\n")
time.sleep(0.5)
response = requests.request(
method=requestType_c, url=last_api_url, headers=HEADERS, data=payload_c)
rspJson = response.json()
LOGGER.info(f"---接口返回状态码为:: {response.status_code}")
LOGGER.info(f"---接口返回体为:: {json.dumps(response.json(), indent=4, ensure_ascii=False)}\n")
''' step03- 断言处理 '''
assert _assert_result(rspJson, expRespJson_c, LOGGER) == True, AssertionError(f"---断言失败!")
LOGGER.info(f">>> {caseModule_c}接口请求 & 断言执行结束 <<<\n\n")
if __name__ == '__main__':
unittest.main(verbosity=2)

45
commons/AssertLib.py Normal file
View File

@ -0,0 +1,45 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Author:: Arthur Wu
# @Date:: 2024/10/24-14:39
# @Description::
def _assert_result(RspJson, ExpRespJson_c, LOGGER):
try:
if RspJson["msg"] == "成功":
if ExpRespJson_c["msg"] == "成功":
FailedCase = []
if ExpRespJson_c["data"] not in ['', None, True, False, "true", "false"]:
if type(RspJson["data"]) == dict:
# 0302-校验返回体中的修改的参数值是否正确
for key, expect_value in ExpRespJson_c['data'].items():
return_value = RspJson['data'][key]
if expect_value == return_value:
LOGGER.info(f"---接口返回体字段校验成功!\n"
f"---待校验字段为:{key},预期值为:{expect_value},返回值为:{return_value}\n")
else:
LOGGER.error(f"---接口返回体字段校验失败!字段值不正确:\n"
f"---待校验字段为:{key},预期值为:{expect_value},返回值为:{return_value}\n")
FailedCase.append(f"字段-{key}-校验失败!")
if FailedCase==[]:
return True
else: return False
elif RspJson["msg"] == "失败":
if ExpRespJson_c["msg"] == "失败":
return True
else:
LOGGER.error(f"---接口请求成功! 校验失败!接口返回信息:{RspJson['msg']}")
return False
elif ExpRespJson_c["msg"] == "请求参数错误":
if RspJson["msg"] == "请求参数错误":
return True
else:
LOGGER.error(f"---接口请求成功! 校验失败!接口返回信息:{RspJson['msg']}")
return False
else:
LOGGER.error(f"---接口请求失败! 错误信息:{RspJson['msg']}")
return False
except AssertionError as e:
LOGGER.error(f"---断言失败!错误信息:{e}")
return False

View File

@ -1,8 +1,27 @@
# !/usr/bin/python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
'''@Author:: Arthur Wu import os,sys,random,string
@Date:: 2024/10/8 import time, datetime
class Common():
def __init__(self):
self.timestamp = int(time.time())
self.Datetime = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
self.today_date = datetime.date.today().strftime('%Y-%m-%d')
self.rootpath = os.path.dirname(os.path.dirname(__file__))
def return_folder(self):
ReportPath = os.path.join(self.rootpath, 'YiMao/reports', str(self.Datetime)).replace("\\", "/")
if not os.path.exists(ReportPath):
os.makedirs(ReportPath)
return ReportPath + '/'
def delete_txt_files(self):
txtPath = os.path.join(self.rootpath, 'YiMao/ProcessData')
for root, dirs, files in os.walk(txtPath):
for file_name in files:
if file_name.endswith('.txt'):
file_path = os.path.join(root, file_name)
os.remove(file_path)
print(f"已删除: {file_path}")

107
commons/EngineX.py Normal file
View File

@ -0,0 +1,107 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
'''@Author:: Arthur Wu
@Date:: 2024/10/8
'''
import os,pytest,subprocess,json,shutil,time,requests
from configs.globalObj import *
from configs.globalParams import *
class Engine():
def __init__(self):
self.CasesSuite = []
self.rootpath = os.path.dirname(os.path.dirname(__file__))
self.FolderPath = ReturnFolder
self.projectCasesPath = str(os.path.join(GlobalPath['YMCaseScriptsPath']).replace("\\", "/"))
self.ReportName = "Reports"
def __execute(self):
'''@Author:: Arthur Wu
@Date:: 2024/5/9
:return:
'''
for dirpath, dirnames, filenames in os.walk(self.projectCasesPath):
for fn in filenames:
if fn.startswith("test_"):
caseScriptPath = os.path.join(dirpath, fn).replace("\\", "/")
self.CasesSuite.append(caseScriptPath)
if 0 == len(self.CasesSuite):
raise ValueError("---[ERROR] the test suite was empty !")
else:
pytest.main(args=[*self.CasesSuite, '-vsq', '--alluredir', self.FolderPath])
subprocess.Popen(
"allure generate " + self.FolderPath + ' -o ' + self.FolderPath + "html"
+ ' --workers=2 '+'--tests-per-worker=2',
shell=True,
stdout=subprocess.PIPE).stdout.read()
LOGGER.info("---[INFO] Allure html report generate complete !")
def __move_file(self):
rootpath = (os.getcwd()).replace("\\", "/")
pathli = rootpath.split("/")
LastReportPath = ''
PathList = self.FolderPath.split("/")
try:
LastReportPath = "D:/Program Files/AutoTestReports/"
shutil.move(self.FolderPath, LastReportPath)
except Exception as e:
LOGGER.error(f"---Move the html report file error: {e}")
return os.path.join(LastReportPath, PathList[-2]).replace("\\", "/")
def __delete_pycache_dirs(self, target_dir):
for root, dirs, files in os.walk(target_dir):
for dir_name in dirs:
if dir_name == '__pycache__':
dir_path = os.path.join(root, dir_name)
shutil.rmtree(dir_path)
print(f"已删除: {dir_path}")
def __delete_txt_files(self, target_dir):
for root, dirs, files in os.walk(target_dir):
for file_name in files:
if file_name.endswith('.txt'):
file_path = os.path.join(root, file_name)
os.remove(file_path)
print(f"已删除: {file_path}")
def run_test_suite(self):
txtPath = os.path.join(self.rootpath, "YiMao/ProcessData")
self.__delete_pycache_dirs(self.rootpath)
self.__delete_txt_files(txtPath)
self.__execute()
returnList = []
lastReportPath = self.__move_file()
returnList.append(lastReportPath)
self.__delete_txt_files(txtPath)
self.__delete_pycache_dirs(self.rootpath)
return returnList
class NotificationModule():
def __init__(self):
self.__headers = {'Content-Type': 'application/json;charset=utf-8'}
datainfo = {
"Debug": "7aedbee7239870e3e653748a2889d8bf063c61efa9213c7099bd57476066dc86",
"Formal": "80b026022a28166cfc9eebaf8f6a880cc06f56a14b8803e8d67e7fb3cb05844e"
}
self.urlInfo = f'https://oapi.dingtalk.com/robot/send?access_token={datainfo["Debug"]}'
# self.urlInfo = f'https://oapi.dingtalk.com/robot/send?access_token={datainfo["Formal"]}'
self.ReportUrl = ProCfgData["ReportsURL"]
self.ExecutionEnvironment = ProCfgData["ExecutionEnv"]
def send_msg(self, ResultPath):
ReportPath = os.path.join(ResultPath[0], 'html', 'index.html').replace('\\', '/')
reportUrlpath = self.ReportUrl + ReportPath.split("AutoTestReports")[1]
my_data = {
"msgtype": "markdown",
"markdown": {
"title": "亿猫管理后台接口自动化测试",
"text": f"#### 亿猫管理后台({self.ExecutionEnvironment})环境自动化测试执行结束: \n"
f">#### [自动化测试报告链接(请点击查看)]({reportUrlpath})\n>\n"
}
}
sess = requests.session()
sess.keep_alive = False
requests.post(url=self.urlInfo, data=json.dumps(my_data), headers=self.__headers, verify=False)
sess.close()

65
commons/FileHandler.py Normal file
View File

@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
import os, xlrd, yaml
from openpyxl import load_workbook
class YamlHandler():
'''@Author:: Arthur Wu
@Date:: 2024/5/8
@Description::
'''
def read_yaml(self, YamlFilePath, encoding='utf-8'):
try:
with open(YamlFilePath, encoding=encoding) as f:
return yaml.load(f.read(), Loader=yaml.FullLoader)
except Exception as e:
print("ERROR: read yaml file error::".format(e))
return 'failed to read avata autotest config file.'
class ReadExcel(object):
'''@Author:: Arthur Wu
@Date:: 2024/5/8
@Description::
'''
def get_data(self, FilePath, SheetName="Sheet1"):
data = load_workbook(FilePath)
table = data[SheetName]
nrows = table.max_row
ncols = table.max_column
if nrows > 1:
test_data = []
for row in range(2, nrows + 1):
sub_data = {}
for colum in range(1, ncols + 1):
sub_data[table.cell(1, colum).value] = table.cell(row, colum).value
test_data.append(sub_data)
return test_data
class Txt():
'''@Author:: Arthur Wu
@Date:: 2024/5/8
@Description::
'''
def __init__(self):
rootpath = os.path.dirname(os.path.dirname(__file__))
self.record_files_path = os.path.join(rootpath, 'YiMao/ProcessData').replace("\\", "/")
def read_txt(self, FileName):
filepath = os.path.join(self.record_files_path, FileName)
AllLineValue = []
if os.path.exists(filepath):
with open(filepath, "r", encoding='utf-8') as file:
for lineval in file:
newlineval = lineval.strip()
AllLineValue.append(newlineval)
return AllLineValue
def append_write_txt(self, FileName, Message):
try:
filepath = os.path.join(self.record_files_path, FileName)
with open(filepath, "a", encoding='UTF-8') as f:
f.write(Message + '\n')
return True
except Exception as e:
print("Error: append_write_txt error::" + str(e))
return False

32
commons/Logger.py Normal file
View File

@ -0,0 +1,32 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time :
# @Author :
# @File : Logger.py
# @Software: PyCharm
import logging, os, time
from time import localtime
class Singleton:
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(Singleton, cls).__new__(cls)
return cls._instance
def __init__(self, LogPath, FileName='YMAutotest'):
def __logging(LogPath, FileName):
datetime = time.strftime("%Y%m%d%H%M%S", localtime())
filepath = f'{LogPath}{FileName}_{datetime}.log'
controlshow = logging.StreamHandler()
controlshow.setLevel(logging.INFO)
logging.basicConfig(
filename=filepath,
filemode="w",
level=logging.INFO,
format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s——: %(message)s \n',
datefmt='%Y%m%d %H:%M:%S'
)
return logging
self.logging = __logging(LogPath, FileName)

97
commons/PgSqlLib.py Normal file
View File

@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
from sshtunnel import SSHTunnelForwarder
import psycopg2
class PGSQL():
def __init__(self):
ssh_host = 'ops.yimaogo.com'
ssh_username = 'yimaogo_test'
ssh_pwd = 'yimaogo!test@1234'
ssh_port = 22
db_host = '10.0.16.24'
db_port = 5432
db_dbname = 'yimaogo_test'
db_user = 'yimaogo_test'
db_password = 'yimaogo!test@1234'
self.ssh_server = SSHTunnelForwarder(
ssh_address_or_host=(ssh_host, ssh_port),
ssh_username=ssh_username,
ssh_password=ssh_pwd,
remote_bind_address=(db_host, db_port)
)
self.ssh_server.daemon_forward_servers = False
self.ssh_server.start()
self.conn = psycopg2.connect(
database=db_dbname,
user=db_user,
password=db_password,
host='127.0.0.1',
port=self.ssh_server.local_bind_port
)
self.cursor = self.conn.cursor()
def close(self):
self.cursor.close()
self.conn.close()
self.ssh_server.close()
def select_data(self, SQL):
self.cursor.execute(SQL)
# 获取查询结果
# rows = self.cursor.fetchone()
# print(rows)
rows = self.cursor.fetchall()
# print(rows)
# for row in rows:
# print(row)
return rows
def delete_data(self, SQL):
self.cursor.execute(SQL)
self.conn.commit()
def del_sample_activity_data(self, IDList=None):
import time
print('---开始删除派样活动自动化测试脏数据')
for SampActivityID in IDList:
# for SampActivityID in range(120, 140):
print(f'---待删除ID为{SampActivityID}')
selsqlstr = f'SELECT * FROM public.sample_activity WHERE id={SampActivityID};'
delsqlstr = [
f"DELETE FROM public.sample_activity_gift WHERE activity_id={SampActivityID};",
f"DELETE FROM public.sample_record WHERE activity_id={SampActivityID};",
f"DELETE FROM public.sample_activity_store WHERE activity_id={SampActivityID};"
f"DELETE FROM public.sample_activity WHERE id={SampActivityID};"
]
for s in delsqlstr:
time.sleep(0.5)
self.delete_data(s)
time.sleep(0.5)
self.select_data(selsqlstr)
self.close()
print("---删除 派样活动 脏数据结束!\n")
def del_coupon_activity_data(self, IDList=None):
try:
import time
print('---开始删除 优惠券活动 自动化测试脏数据')
for ActivityID in IDList:
# for ActivityID in range(251, 264):
print(f'---待删除ID为{ActivityID}')
selsqlstr = f'SELECT * FROM public.coupon_setting WHERE id={ActivityID};'
delsqlstr = [
f"DELETE FROM public.coupon_setting_distribution WHERE coupon_setting_id={ActivityID};",
f"DELETE FROM public.coupon_setting WHERE id={ActivityID};",
]
for s in delsqlstr:
time.sleep(0.5)
self.delete_data(s)
time.sleep(0.5)
self.select_data(selsqlstr)
self.close()
print("---删除派样活动脏数据结束!\n")
return "---[Successed]"
except Exception as e:
print(f"---删除派样活动脏数据失败: {e}\n")
return e

32
commons/SignatureYM.py Normal file
View File

@ -0,0 +1,32 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
from authlib.integrations.requests_client import OAuth2Session
class SignatureYM():
def __init__(self):
self.client_id = "95579765e4fe91af9962"
self.client_secret = "3dcb964ede5d455f1d54623dad5e496bff468f81"
self.redirect_uri = 'https://auth.yimaogo.com/api/callback'
self.authorization_base_url = 'https://auth.yimaogo.com/api/login/oauth/authorize'
self.token_url = 'https://auth.yimaogo.com/api/login/oauth/access_token'
def __tokens(self):
oauth = OAuth2Session(self.client_id, redirect_uri=self.redirect_uri)
authorization_url, state = oauth.create_authorization_url(self.authorization_base_url)
token_dict = oauth.fetch_token(self.token_url, authorization_response={}, client_secret=self.client_secret)
return token_dict["access_token"]
def return_headers(self):
tokens = self.__tokens()
headers = {
'Authorization': tokens,
'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
'Content-Type': 'application/json',
'Accept': '*/*',
'Host': 'api.test.yimaogo.com',
'Connection': 'keep-alive'
}
return headers

8
configs/Configs.yaml Normal file
View File

@ -0,0 +1,8 @@
#############################################
############### 测试环境配置文件 ###############
#############################################
# 1-执行环境:测试环境-test_env灰度环境-gray_env
ExecutionEnv: test_env
# 2-Reports URL
ReportsURL: http://192.168.1.165

8
configs/globalObj.py Normal file
View File

@ -0,0 +1,8 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
from commons.Logger import Singleton
from commons.Common import Common as c
ReturnFolder = c().return_folder()
LOGGER = Singleton(ReturnFolder).logging
LOGGER.info("---hello world!")

36
configs/globalParams.py Normal file
View File

@ -0,0 +1,36 @@
# !/usr/bin/python
# -*- coding: utf-8 -*-
import os,datetime,time
from datetime import datetime,timedelta
from commons.FileHandler import YamlHandler as yam
from commons.SignatureYM import SignatureYM
__RootPath = os.path.dirname(os.path.dirname(__file__)).replace("\\", "/")
__YamlFilePath = os.path.join(__RootPath, "configs/Configs.yaml").replace("\\", "/")
ProCfgData = yam().read_yaml(__YamlFilePath)
HEADERS = SignatureYM().return_headers()
GlobalPath = {
"CfgPath": os.path.join(__RootPath, 'configs').replace("\\", "/"),
"YMDataPath": os.path.join(__RootPath, 'YiMao/data').replace("\\", "/"),
"YMCfgPath": os.path.join(__RootPath, 'YiMao/config').replace("\\", "/"),
"YMCaseScriptsPath": os.path.join(__RootPath, 'YiMao/scripts').replace("\\", "/"),
"YMReportPath": os.path.join(__RootPath, 'YiMao/report').replace("\\", "/")
}
GlobalParams = {
"timeStamp": int(time.time()),
"todayDate": time.strftime('%Y-%m-%d'),
"todayDateDel1": (datetime.now() + timedelta(days=-1)).strftime('%Y-%m-%d'),
"todayDateAdd1": (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d'),
"todayDateAdd2": (datetime.now() + timedelta(days=2)).strftime('%Y-%m-%d'),
"todayDateAdd3": (datetime.now() + timedelta(days=3)).strftime('%Y-%m-%d'),
"todayDateAdd4": (datetime.now() + timedelta(days=4)).strftime('%Y-%m-%d'),
"todayDateAdd5": (datetime.now() + timedelta(days=5)).strftime('%Y-%m-%d'),
}

13
pytest.ini Normal file
View File

@ -0,0 +1,13 @@
[pytest]
testpaths = YiMao/
addopts = -p no:warnings
log_cli = True
markers =
webtest: Run the Avata Open API case
hello: Run the hello case
log_level = INFO
log_cli_level = INFO
log_format = %(asctime)s %(levelname)s %(message)s
log_cli_format = %(asctime)s %(levelname)s %(message)s
log_date_format = %H:%M:%S
log_cli_date_format = %H:%M:%S