auto_test_dev/YiMao/businessFunc/ServiceApiLib.py

201 lines
11 KiB
Python

# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Author:: Arthur Wu
# @Date:: 2024/11/15-13:10
# @Description::
import requests,json, time, logging
from configs.globalParams import *
from commons.SignatureYM import SignatureYM
from commons.FileHandler import Txt
class YMServiceApi(object):
def __init__(self):
self.Domain = "https://api.test.yimaogo.com/"
self.headerss = SignatureYM().return_headers()
def publish_ad(self, ADDetailList, MarketAndStoreDetail):
logging.info("========== [发布广告] ==========")
def __publish(ADDetail, MarketAndStoreDetail):
timeStamp = str(int(time.time()))
PublicParams = {
"status": 4, "name": "Auto"+timeStamp,
"agencyId": 3, "agencyName": "洪家班", "advertiserId": 3,
"advertiserName": "阿宝传媒", "adsUse": 1, "customerTag": 3,
"putRangeCycle": "[7,6,5,1,2,3,4]",
"addType": 1, "pricingType": 1, "standardPrice": "1", "minPrice": "1",
"putStart": GlobalParams["todayDate"], "putEnd": GlobalParams["todayDate"],
"putStartTime": "00:00:00", "putEndTime": "23:00:00"
}
payload = json.dumps(PublicParams | ADDetail | MarketAndStoreDetail)
url = self.Domain + "admin/ads"
response = requests.request("POST", url, headers=self.headerss, data=payload)
logging.info(f"-----------接口返回状态码:{response.status_code}")
logging.info(f"-----------接口返回数据:{response.json()}\n\n")
for adDetail in ADDetailList:
__publish(adDetail, MarketAndStoreDetail)
time.sleep(0.5)
def add_promotional_product_whitelist(self, GoodsInfo, MarketAndStoreInfo):
logging.info("========== [新增促销白名单] ==========")
payload = json.dumps({
"goodsType": 3,
"dataScope": 1,
"categoryCode": "",
"categoryName": "",
"barcode": GoodsInfo["GoodsInputcode"],
"goodsName": GoodsInfo["GoodsName"],
"storeId": MarketAndStoreInfo["StoreId"],
"marketId": MarketAndStoreInfo["MarketId"]
})
url = self.Domain + "admin/goods/promotion"
response = requests.request("POST", url, headers=self.headerss, data=payload)
logging.info(f"-----------接口返回状态码:{response.status_code}")
logging.info(f"-----------接口返回数据:{response.json()}\n\n")
if response.status_code == 200:
time.sleep(0.5)
rspJson = response.json()
goodsWhiteListID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("ScenarioTest_GoodsWhiteListID.txt", str(goodsWhiteListID))
if not writeResult:
logging.error(f"---写入 新建促销商品白名单ID 失败!")
return response.json()
def delete_whitelist_goods(self):
logging.info("========== [删除促销白名单商品] ==========")
GoodsWhiteListIDList = Txt().read_txt("ScenarioTest_GoodsWhiteListID.txt")
for i in range(len(GoodsWhiteListIDList)):
if GoodsWhiteListIDList[i] not in [None, '']:
url = self.Domain + "admin/goods/promotion/{GoodsWhiteListID}".replace(
"{GoodsWhiteListID}", str(GoodsWhiteListIDList[i]))
logging.info(f"---删除促销白名单商品接口URL为:: {url}")
response = requests.request(method="DELETE", url=url, headers=HEADERS, data={})
logging.info(f"---接口返回状态码为:: {response.status_code}")
logging.info(f"---接口返回体为:: {response.json()}\n")
def add_weight_to_whitelist(self, GoodsInfo, MarketAndStoreInfo):
logging.info("========== [新增重量放通白名单] ==========")
payload = json.dumps({
"goodsType": 4,
"content": GoodsInfo["GoodsInputcode"],
"type": 1,
"dataScope": 1,
"reason": "自动化测试之场景测试",
"storeId": MarketAndStoreInfo["StoreId"],
"marketId": MarketAndStoreInfo["MarketId"]
})
url = self.Domain + "admin/goods/weight/promotion"
response = requests.request("POST", url, headers=self.headerss, data=payload)
logging.info(f"-----------接口返回状态码:{response.status_code}")
logging.info(f"-----------接口返回数据:{response.json()}\n\n")
if response.status_code == 200:
time.sleep(0.5)
rspJson = response.json()
weightWhiteListID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("ScenarioTest_WeightWhiteListID.txt", str(weightWhiteListID))
if not writeResult:
logging.error(f"---写入 新建重量放通白名单ID 失败!")
return response.json()
def delete_weight_whitelist_goods(self):
logging.info("========== [删除重量放通白名单商品] ==========")
WeightWhiteListIDList = Txt().read_txt("ScenarioTest_WeightWhiteListID.txt")
for i in range(len(WeightWhiteListIDList)):
if WeightWhiteListIDList[i] not in [None, '']:
url = self.Domain + "admin/goods/weight/promotion/{WeightWhiteListID}".replace(
"{WeightWhiteListID}", str(WeightWhiteListIDList[i]))
logging.info(f"---删除重量放通白名单商品接口URL为:: {url}")
response = requests.request(method="DELETE", url=url, headers=self.headerss, data={})
logging.info(f"---接口返回状态码为:: {response.status_code}")
logging.info(f"---接口返回体为:: {response.json()}\n\n")
def approved_abnormal_products(self, MarketAndStoreInfo):
logging.info("========== [审核通过异常商品] ==========")
auth_token_b = self.headerss["Authorization"]
print(f"-----------headerss:: \n{self.headerss}\n")
print(f"-----------auth_token_b:: \n{auth_token_b}\n")
print(f"-----------auth_token_b:: \n{type(auth_token_b)}\n")
# 1-查询
url = self.Domain + "admin/check_goods/list"
get_abnormal_list_payload = json.dumps({
"page": 1,
"limit": 10,
"query": {
"order": {"createdAt": "desc"},
"where": {
"marketId": {"eq": MarketAndStoreInfo["putMarketId"]},
"storeId": {"eq": MarketAndStoreInfo["putStoreId"][0]},
"sessionState": {"in": [1, 2, 3, 4, 5, 6, 7]},
"state": {"eq": 0}
}
}
})
abnormal_list_resp = requests.request("POST", url,
headers=self.headerss, data=get_abnormal_list_payload)
print(f"-----------查询异常待处理商品返回结果为:: \n{abnormal_list_resp.json()}\n\n")
logging.info(f"-----------接口返回状态码:{abnormal_list_resp.status_code}")
logging.info(f"-----------接口返回数据:{abnormal_list_resp.json()}\n\n")
if abnormal_list_resp.json()["data"]["data"] != []:
abnormal_id = abnormal_list_resp.json()["data"]["data"][0]["id"]
num_val = abnormal_list_resp.json()["data"]["data"][0]["num"]
# 2-审核通过
# url2 = self.Domain + (
url2 = ('https://api.test.yimaogo.com/' + (
"admin/check_goods/change/state/"
"{abnormal_id}?state=1&num={Num}&update=true&promotion=false"
# "{abnormal_id}"
).replace("{abnormal_id}", str(abnormal_id)).
replace("{Num}", str(num_val)))
# print(f"-----------headers:: \n{self.headerss}\n")
print(f"-----------url2:: \n{url2}\n")
try:
auth_token_b = self.headerss["Authorization"]
auth_token_c = 'Bearer eyJhbGciOiJSUzI1NiIsImtpZCI6ImNlcnQtYnVpbHQtaW4iLCJ0eXAiOiJKV1QifQ.eyJhdWQiOlsiOTU1Nzk3NjVlNGZlOTFhZjk5NjIiXSwiYXZhdGFyIjoiaHR0cHM6Ly9zdGF0aWMtbGVnYWN5LmRpbmd0YWxrLmNvbS9tZWRpYS9sQURQRDRQdkgxQ0Z6R1BOQWd6TkFlQV80ODBfNTI0LmpwZyIsImRpc3BsYXlOYW1lIjoi5ZC05Zu95b69IiwiZW1haWwiOiJ3dWd1b2h1aUBpZWVtb28uY29tIiwiZXhwIjoxNzMyODQzODQxLCJpYXQiOjE3MzIyMzkwNDEsImlkIjoiUTJwcVJlVjNQRUViaVM5OGlTaVA4ZUVDZ2lFaUUiLCJpc3MiOiJodHRwczovL2F1dGgueWltYW9nby5jb20iLCJqdGkiOiJhZG1pbi9iZjk0MDFhMi1mZDIxLTRjODgtOWZkMS0yNGVlZWM0MjdmMmYiLCJuYW1lIjoiMDI5NiIsIm5iZiI6MTczMjIzOTA0MSwibm9uY2UiOiIiLCJvd25lciI6ImllZW1vbyIsInNjb3BlIjoicHJvZmlsZSIsInN1YiI6IlEycHFSZVYzUEVFYmlTOThpU2lQOGVFQ2dpRWlFIiwidGFnIjoiIiwidG9rZW5UeXBlIjoiYWNjZXNzLXRva2VuIn0.MEXmbdk3QOitECq7K4qG5BOy0K7pecJxTQVNrJq8a5VllNLTmfnsCWs_UswiF4-hrf7KvMj71TKI3xcMLPZDPTYee4LKi3npXrSHtuF9yWef9xY3841KEDHUc_RwNMlqFukVWIH3c-TL3M5XVqvY_ccqxPVG00ArRuXemkFsF7YoOsBzqMBR5oZdTw4vdZBjv07vXPvX-D8oQ1AR2Ib4jZxifBCwGB00GqFrQaNKGTuzaEo_nAXiZ3Xd6GBO5GywBIZ6-60J9PSf4VTZzKdxM16DoQoRrZUzLgxVmuxYo1g9OrQt3FQEPH2GwiPRRNezZKOCJOkU5MY-ZbxAizK5OkFn96ozbkiTKDqlUfkPASZWWX2XgVTLdaoRVltBIGDI03OzOJgiN4ogmz4uOyfeAxmgNPYyFoCclS3rKyCAWNPthY4R5R-AmkZAr1Lpu2NkSlxpCAiDLuzITE2POaG2XFGpAfn2OYeSJWxJhx2twOSFwu4zG4-yGD_AiQKXuxpYpLQbFfIL3d3sJ9dM8Wom4nTdh4rQDWVT0ed1Pc77NxSqMWlAviszaGU_V9deY-JqmtOwvw4p7lmAmr4yeB_-7fdKTXfGyEj9zy863Mzj9zZlPVr-ZB6RjvgqI8YK8A1g-ggjRL5OFOwVt2SP59qrAkUfpgAQXj1vN_ias1LhlJU',
print(f"-----------auth_token_c:: \n{auth_token_b}\n")
print(f"-----------auth_token_c:: \n{type(auth_token_b)}\n")
# payload = ""
# headers = {
# 'Authorization': str(auth_token_c[0]),
# 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
# 'authority': 'api.test.yimaogo.com',
# 'Accept': '*/*',
# 'Origin': 'https://dashboard.test.yimaogo.com',
# 'Referer': 'https://dashboard.test.yimaogo.com/',
# 'sec-ch-ua': '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
# 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
# 'Date': 'Fri, 22 Nov 2024 07:28:39 GMT',
# 'server': 'APISIX/3.8.0',
# 'Content-Type': 'application/json',
# 'Host': 'api.test.yimaogo.com',
# 'Connection': 'keep-alive'
# }
# time.sleep(1)
# response = requests.request("GET", url2, headers=headers, data=payload)
# response.raise_for_status() # 检查请求是否成功
# logging.info(f"-----------接口返回状态码:{response.status_code}")
# logging.info(f"-----------接口返回数据:{response.json()}\n\n")
# print(f"-----------接口返回数据:\n{response.json()}\n\n")
except requests.exceptions.RequestException as e:
logging.error(f"请求失败: {e}")
# if response.json()["msg"] == '成功':
# logging.info("---审核通过异常商品成功!")
# else:
# logging.error("---审核通过异常商品失败!")
# else:
# logging.info("---未查询到待核验的异常商品!")
if __name__ == '__main__':
yms = YMServiceApi()
GoodsInfo = {"GoodsInputcode": "6924743915848"}
MarketAndStoreDetails = {"putMarketId": 50, "putStoreId": ["69"]}
yms.approved_abnormal_products(MarketAndStoreDetails)