121 lines
6.0 KiB
Python
121 lines
6.0 KiB
Python
# !/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
# @Author:: Arthur Wu
|
|
# @Date:: 2024/11/15-13:10
|
|
# @Description::
|
|
import requests,json, time, logging
|
|
from configs.globalParams import *
|
|
from commons.SignatureYM import SignatureYM
|
|
from commons.FileHandler import Txt
|
|
|
|
|
|
class YMServiceApi(object):
|
|
def __init__(self):
|
|
self.Domain = "https://api.test.yimaogo.com/"
|
|
self.headerss = SignatureYM().return_headers()
|
|
|
|
''' 1-广告模块 '''
|
|
def publish_ad(self, ADDetailList, MarketAndStoreDetail):
|
|
logging.info("========== [发布广告] ==========")
|
|
def __publish(ADDetail, MarketAndStoreDetail):
|
|
timeStamp = str(int(time.time()))
|
|
PublicParams = {
|
|
"status": 4, "name": "Auto"+timeStamp,
|
|
"agencyId": 3, "agencyName": "洪家班", "advertiserId": 3,
|
|
"advertiserName": "阿宝传媒", "adsUse": 1, "customerTag": 3,
|
|
"putRangeCycle": "[7,6,5,1,2,3,4]",
|
|
"addType": 1, "pricingType": 1, "standardPrice": "1", "minPrice": "1",
|
|
"putStart": GlobalParams["todayDate"], "putEnd": GlobalParams["todayDate"],
|
|
"putStartTime": "00:00:00", "putEndTime": "23:00:00"
|
|
}
|
|
payload = json.dumps(PublicParams | ADDetail | MarketAndStoreDetail)
|
|
url = self.Domain + "admin/ads"
|
|
response = requests.request("POST", url, headers=self.headerss, data=payload)
|
|
logging.info(f"-----------接口返回状态码:{response.status_code}")
|
|
logging.info(f"-----------接口返回数据:{response.json()}\n\n")
|
|
|
|
for adDetail in ADDetailList:
|
|
__publish(adDetail, MarketAndStoreDetail)
|
|
time.sleep(0.5)
|
|
|
|
def add_promotional_product_whitelist(self, GoodsInfo, MarketAndStoreInfo):
|
|
logging.info("========== [新增促销白名单] ==========")
|
|
payload = json.dumps({
|
|
"goodsType": 3,
|
|
"dataScope": 1,
|
|
"categoryCode": "",
|
|
"categoryName": "",
|
|
"barcode": GoodsInfo["GoodsInputcode"],
|
|
"goodsName": GoodsInfo["GoodsName"],
|
|
"storeId": MarketAndStoreInfo["StoreId"],
|
|
"marketId": MarketAndStoreInfo["MarketId"]
|
|
})
|
|
url = self.Domain + "admin/goods/promotion"
|
|
response = requests.request("POST", url, headers=self.headerss, data=payload)
|
|
logging.info(f"-----------接口返回状态码:{response.status_code}")
|
|
logging.info(f"-----------接口返回数据:{response.json()}\n\n")
|
|
if response.status_code == 200:
|
|
time.sleep(0.5)
|
|
rspJson = response.json()
|
|
goodsWhiteListID = rspJson["data"]['id']
|
|
writeResult = Txt().append_write_txt("ScenarioTest_GoodsWhiteListID.txt", str(goodsWhiteListID))
|
|
if not writeResult:
|
|
logging.error(f"---写入 新建促销商品白名单ID 失败!")
|
|
return response.json()
|
|
|
|
def delete_whitelist_goods(self):
|
|
logging.info("========== [删除促销白名单商品] ==========")
|
|
GoodsWhiteListIDList = Txt().read_txt("ScenarioTest_GoodsWhiteListID.txt")
|
|
for i in range(len(GoodsWhiteListIDList)):
|
|
if GoodsWhiteListIDList[i] not in [None, '']:
|
|
url = self.Domain + "admin/goods/promotion/{GoodsWhiteListID}".replace(
|
|
"{GoodsWhiteListID}", str(GoodsWhiteListIDList[i]))
|
|
logging.info(f"---删除促销白名单商品接口URL为:: {url}")
|
|
response = requests.request(method="DELETE", url=url, headers=HEADERS, data={})
|
|
logging.info(f"---接口返回状态码为:: {response.status_code}")
|
|
logging.info(f"---接口返回体为:: {response.json()}\n")
|
|
|
|
def add_weight_to_whitelist(self, GoodsInfo, MarketAndStoreInfo):
|
|
logging.info("========== [新增重量放通白名单] ==========")
|
|
payload = json.dumps({
|
|
"goodsType": 4,
|
|
"content": GoodsInfo["GoodsInputcode"],
|
|
"type": 1,
|
|
"dataScope": 1,
|
|
"reason": "自动化测试之场景测试",
|
|
"storeId": MarketAndStoreInfo["StoreId"],
|
|
"marketId": MarketAndStoreInfo["MarketId"]
|
|
})
|
|
url = self.Domain + "admin/goods/weight/promotion"
|
|
response = requests.request("POST", url, headers=self.headerss, data=payload)
|
|
logging.info(f"-----------接口返回状态码:{response.status_code}")
|
|
logging.info(f"-----------接口返回数据:{response.json()}\n\n")
|
|
if response.status_code == 200:
|
|
time.sleep(0.5)
|
|
rspJson = response.json()
|
|
weightWhiteListID = rspJson["data"]['id']
|
|
writeResult = Txt().append_write_txt("ScenarioTest_WeightWhiteListID.txt", str(weightWhiteListID))
|
|
if not writeResult:
|
|
logging.error(f"---写入 新建重量放通白名单ID 失败!")
|
|
return response.json()
|
|
|
|
def delete_weight_whitelist_goods(self):
|
|
logging.info("========== [删除重量放通白名单商品] ==========")
|
|
WeightWhiteListIDList = Txt().read_txt("ScenarioTest_WeightWhiteListID.txt")
|
|
for i in range(len(WeightWhiteListIDList)):
|
|
if WeightWhiteListIDList[i] not in [None, '']:
|
|
url = self.Domain + "admin/goods/weight/promotion/{WeightWhiteListID}".replace(
|
|
"{WeightWhiteListID}", str(WeightWhiteListIDList[i]))
|
|
logging.info(f"---删除重量放通白名单商品接口URL为:: {url}")
|
|
response = requests.request(method="DELETE", url=url, headers=self.headerss, data={})
|
|
logging.info(f"---接口返回状态码为:: {response.status_code}")
|
|
logging.info(f"---接口返回体为:: {response.json()}\n\n")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
yms = YMServiceApi()
|
|
GoodsInfo = {"GoodsInputcode": "6924743915848"}
|
|
MarketAndStoreInfo = {"MarketId": 50, "StoreId": 29}
|
|
yms.add_weight_to_whitelist(GoodsInfo, MarketAndStoreInfo)
|
|
time.sleep(10)
|
|
yms.delete_weight_whitelist_goods() |