# Source path: auto_test_dev/YiMao/businessFunc/ServiceApiLib.py
# Listing header: 119 lines, 5.9 KiB, Python

# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Author:: Arthur Wu
# @Date:: 2024/11/15-13:10
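# @Description:: Client helpers for the YiMao admin test API (ad publishing; promotion and weight whitelist add/delete)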
import requests,json, time, logging
from configs.globalParams import *
from commons.SignatureYM import SignatureYM
from commons.FileHandler import Txt
class YMServiceApi(object):
    """Client for the YiMao test-environment admin API.

    Wraps the ad-publishing endpoint and the promotion / weight whitelist
    endpoints. Every request goes to ``self.Domain`` with the headers returned
    by ``SignatureYM().return_headers()``. IDs of whitelist entries created
    here are appended to text files through ``Txt`` so that the matching
    delete methods can remove those entries later.
    """

    # Files that carry created whitelist IDs from the add calls to the delete calls.
    _GOODS_ID_FILE = "ScenarioTest_GoodsWhiteListID.txt"
    _WEIGHT_ID_FILE = "ScenarioTest_WeightWhiteListID.txt"

    def __init__(self):
        # Base URL of the test environment; endpoint paths are appended to it.
        self.Domain = "https://api.test.yimaogo.com/"
        # Headers sent with every request (presumably signed auth headers;
        # confirm against SignatureYM).
        self.headerss = SignatureYM().return_headers()

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _response_body(response):
        """Return the decoded JSON body, or the raw text when it is not JSON.

        ``response.json()`` raises a ``ValueError`` subclass on a non-JSON
        body (for example an HTML error page); falling back to the text keeps
        the response loggable instead of aborting the call.
        """
        try:
            return response.json()
        except ValueError:
            return response.text

    @staticmethod
    def _extract_id(body):
        """Return ``body["data"]["id"]``, or ``None`` when the body lacks it.

        Args:
            body: Decoded response body; anything that is not a dict holding
                a dict under ``"data"`` yields ``None``.
        """
        if not isinstance(body, dict):
            return None
        data = body.get("data")
        if not isinstance(data, dict):
            return None
        return data.get("id")

    def _post_json(self, endpoint, payload):
        """POST ``payload`` as JSON text to ``endpoint`` and log the outcome.

        Args:
            endpoint: Path appended to ``self.Domain``.
            payload: JSON-serialisable dict.

        Returns:
            Tuple ``(response, body)`` where ``body`` is the decoded JSON, or
            the raw text if the response was not JSON.
        """
        url = self.Domain + endpoint
        response = requests.request(
            "POST", url, headers=self.headerss, data=json.dumps(payload))
        body = self._response_body(response)
        logging.info(f"-----------接口返回状态码:{response.status_code}")
        logging.info(f"-----------接口返回数据:{body}\n\n")
        return response, body

    def _post_and_record_id(self, endpoint, payload, id_file, write_failed_msg):
        """POST ``payload`` and, on HTTP 200, append the created ID to ``id_file``.

        Args:
            endpoint: Path appended to ``self.Domain``.
            payload: JSON-serialisable dict.
            id_file: File name handed to ``Txt().append_write_txt``.
            write_failed_msg: Error logged when the append returns a falsy value.
        """
        response, body = self._post_json(endpoint, payload)
        if response.status_code != 200:
            return
        time.sleep(0.5)
        created_id = self._extract_id(body)
        if created_id is None:
            # An HTTP 200 can still carry a body without data.id; there is
            # nothing to record in that case.
            logging.error(f"---接口返回体中未找到 data.id,跳过ID记录:: {body}")
            return
        if not Txt().append_write_txt(id_file, str(created_id)):
            logging.error(write_failed_msg)

    def _delete_recorded_ids(self, id_file, endpoint, label, trailer):
        """Send one DELETE per non-empty ID read from ``id_file``.

        Args:
            id_file: File name handed to ``Txt().read_txt``.
            endpoint: Path appended to ``self.Domain``; the ID is appended after a ``/``.
            label: Text inserted into the URL log line.
            trailer: Newline suffix appended to the body log line.
        """
        recorded_ids = Txt().read_txt(id_file)
        # ``or []`` guards against a reader that yields None when there is
        # nothing to read (assumption about Txt.read_txt; harmless otherwise).
        for recorded_id in recorded_ids or []:
            if recorded_id in (None, ''):
                continue
            url = self.Domain + endpoint + "/" + str(recorded_id)
            logging.info(f"---删除{label}接口URL为:: {url}")
            response = requests.request(
                method="DELETE", url=url, headers=self.headerss, data={})
            logging.info(f"---接口返回状态码为:: {response.status_code}")
            logging.info(f"---接口返回体为:: {self._response_body(response)}{trailer}")

    # ------------------------------------------------------------------
    # 1. Advertising module
    # ------------------------------------------------------------------
    def _publish_single_ad(self, ADDetail, MarketAndStoreDetail):
        """Build the payload for one ad and POST it to ``admin/ads``."""
        time_stamp = str(int(time.time()))
        # Default fields; ADDetail and then MarketAndStoreDetail are merged
        # over them, so later dicts win on duplicate keys.
        public_params = {
            "status": 4, "name": "Auto" + time_stamp,
            "agencyId": 3, "agencyName": "洪家班", "advertiserId": 3,
            "advertiserName": "阿宝传媒", "adsUse": 1, "customerTag": 3,
            "putRangeCycle": "[7,6,5,1,2,3,4]",
            "addType": 1, "pricingType": 1, "standardPrice": "1", "minPrice": "1",
            "putStart": GlobalParams["todayDate"], "putEnd": GlobalParams["todayDate"],
            "putStartTime": "00:00:00", "putEndTime": "23:00:00",
        }
        self._post_json("admin/ads", public_params | ADDetail | MarketAndStoreDetail)

    def publish_ad(self, ADDetailList, MarketAndStoreDetail):
        """Publish one ad per entry of ``ADDetailList``.

        Args:
            ADDetailList: Iterable of dicts, each merged over the default ad fields.
            MarketAndStoreDetail: Dict merged last into every ad payload.
        """
        logging.info("========== [发布广告] ==========")
        for ad_detail in ADDetailList:
            self._publish_single_ad(ad_detail, MarketAndStoreDetail)
            # Short pause between consecutive publish calls.
            time.sleep(0.5)

    # ------------------------------------------------------------------
    # Promotion whitelist
    # ------------------------------------------------------------------
    def add_promotional_product_whitelist(self, GoodsInfo, MarketAndStoreInfo):
        """Add a product to the promotion whitelist and record the new entry's ID.

        Args:
            GoodsInfo: Dict providing ``"GoodsInputcode"`` and ``"GoodsName"``.
            MarketAndStoreInfo: Dict providing ``"StoreId"`` and ``"MarketId"``.
        """
        logging.info("========== [新增促销白名单] ==========")
        payload = {
            "goodsType": 3,
            "dataScope": 1,
            "categoryCode": "",
            "categoryName": "",
            "barcode": GoodsInfo["GoodsInputcode"],
            "goodsName": GoodsInfo["GoodsName"],
            "storeId": MarketAndStoreInfo["StoreId"],
            "marketId": MarketAndStoreInfo["MarketId"],
        }
        self._post_and_record_id(
            "admin/goods/promotion", payload, self._GOODS_ID_FILE,
            "---写入 新建促销商品白名单ID 失败!")

    def delete_whitelist_goods(self):
        """Delete every promotion-whitelist entry whose ID was recorded on creation."""
        logging.info("========== [删除促销白名单商品] ==========")
        self._delete_recorded_ids(
            self._GOODS_ID_FILE, "admin/goods/promotion", "促销白名单商品", "\n")

    # ------------------------------------------------------------------
    # Weight pass-through whitelist
    # ------------------------------------------------------------------
    def add_weight_to_whitelist(self, GoodsInfo, MarketAndStoreInfo):
        """Add a product to the weight whitelist and record the new entry's ID.

        Args:
            GoodsInfo: Dict providing ``"GoodsInputcode"``.
            MarketAndStoreInfo: Dict providing ``"StoreId"`` and ``"MarketId"``.
        """
        logging.info("========== [新增重量放通白名单] ==========")
        payload = {
            "goodsType": 4,
            "content": GoodsInfo["GoodsInputcode"],
            "type": 1,
            "dataScope": 1,
            "reason": "自动化测试之场景测试",
            "storeId": MarketAndStoreInfo["StoreId"],
            "marketId": MarketAndStoreInfo["MarketId"],
        }
        self._post_and_record_id(
            "admin/goods/weight/promotion", payload, self._WEIGHT_ID_FILE,
            "---写入 新建重量放通白名单ID 失败!")

    def delete_weight_whitelist_goods(self):
        """Delete every weight-whitelist entry whose ID was recorded on creation."""
        logging.info("========== [删除重量放通白名单商品] ==========")
        self._delete_recorded_ids(
            self._WEIGHT_ID_FILE, "admin/goods/weight/promotion",
            "重量放通白名单商品", "\n\n")
if __name__ == '__main__':
    # Manual smoke run against the test environment: create one weight
    # whitelist entry, wait, then delete every recorded weight whitelist entry.
    service = YMServiceApi()
    service.add_weight_to_whitelist(
        {"GoodsInputcode": "6924743915848"},
        {"MarketId": 50, "StoreId": 29},
    )
    time.sleep(10)
    service.delete_weight_whitelist_goods()