auto_test_dev/YiMao/businessFunc/ServiceApiLib.py

240 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

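#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Author:: Arthur Wu
# @Date:: 2024/11/15-13:10
# @Description:: HTTP helpers around the YiMao admin API (ads, goods whitelists, cart, abnormal-goods review) for automated scenario tests.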
import requests,json, time, logging
from configs.globalParams import *
from commons.SignatureYM import SignatureYM
from commons.FileHandler import Txt
class YMServiceApi(object):
    """Client for the YiMao test-environment admin API used by scenario tests.

    Each method builds a JSON payload, sends it with the headers obtained at
    construction time, and logs the status code and decoded JSON body of the
    response.
    """

    def __init__(self):
        """Store the API base URL and fetch the request headers once."""
        self.Domain = "https://api.test.yimaogo.com/"
        self.headerss = SignatureYM().return_headers()

    def publish_ad(self, ADDetailList, MarketAndStoreDetail):
        """Publish one advertisement per entry of ``ADDetailList``.

        :param ADDetailList: iterable of dicts; each one is merged over the
            common ad fields built here, so its keys override those defaults.
        :param MarketAndStoreDetail: dict merged last into every payload.
        :return: None; the responses are only logged.
        """
        logging.info("========== [发布广告] ==========")
        url = self.Domain + "admin/ads"
        for ad_detail in ADDetailList:
            # Second-resolution timestamp, used to build the ad name.
            stamp = str(int(time.time()))
            common_fields = {
                "status": 4, "name": "Auto" + stamp,
                "agencyId": 3, "agencyName": "洪家班", "advertiserId": 3,
                "advertiserName": "阿宝传媒", "adsUse": 1, "customerTag": 3,
                "putRangeCycle": "[7,6,5,1,2,3,4]",
                "addType": 1, "pricingType": 1, "standardPrice": "1", "minPrice": "1",
                "putStart": GlobalParams["todayDate"], "putEnd": GlobalParams["todayDate"],
                "putStartTime": "00:00:00", "putEndTime": "23:00:00",
            }
            # Later operands win on key clashes: defaults < ad detail < market/store.
            payload = json.dumps(common_fields | ad_detail | MarketAndStoreDetail)
            response = requests.request("POST", url, headers=self.headerss, data=payload)
            logging.info(f"-----------接口返回状态码:{response.status_code}")
            logging.info(f"-----------接口返回数据:{response.json()}\n\n")
            time.sleep(0.5)

    def add_promotional_product_whitelist(self, GoodsInfo, MarketAndStoreInfo):
        """Add a goods item to the promotion whitelist and record the new ID.

        On HTTP 200 the ``data.id`` field of the response is appended to
        ``ScenarioTest_GoodsWhiteListID.txt`` so it can be deleted later.

        :param GoodsInfo: dict providing ``GoodsInputcode`` and ``GoodsName``.
        :param MarketAndStoreInfo: dict providing ``StoreId`` and ``MarketId``.
        :return: the decoded JSON body of the response.
        """
        logging.info("========== [新增促销白名单] ==========")
        payload = json.dumps({
            "goodsType": 3,
            "dataScope": 1,
            "categoryCode": "",
            "categoryName": "",
            "barcode": GoodsInfo["GoodsInputcode"],
            "goodsName": GoodsInfo["GoodsName"],
            "storeId": MarketAndStoreInfo["StoreId"],
            "marketId": MarketAndStoreInfo["MarketId"],
        })
        url = self.Domain + "admin/goods/promotion"
        response = requests.request("POST", url, headers=self.headerss, data=payload)
        logging.info(f"-----------接口返回状态码:{response.status_code}")
        # Decode the body once and reuse it for logging, ID extraction and return.
        body = response.json()
        logging.info(f"-----------接口返回数据:{body}\n\n")
        if response.status_code == 200:
            time.sleep(0.5)
            whitelist_id = body["data"]['id']
            written = Txt().append_write_txt(
                "ScenarioTest_GoodsWhiteListID.txt", str(whitelist_id))
            if not written:
                logging.error("---写入 新建促销商品白名单ID 失败!")
        return body

    def delete_whitelist_goods(self):
        """Delete every promotion-whitelist entry recorded in the ID file.

        IDs are read from ``ScenarioTest_GoodsWhiteListID.txt``; ``None`` and
        empty entries are skipped.

        :return: None; the responses are only logged.
        """
        logging.info("========== [删除促销白名单商品] ==========")
        # The return type of read_txt is not visible in this file; a falsy
        # result is treated as "nothing to delete".
        recorded_ids = Txt().read_txt("ScenarioTest_GoodsWhiteListID.txt") or []
        for whitelist_id in recorded_ids:
            if whitelist_id in (None, ''):
                continue
            url = self.Domain + "admin/goods/promotion/" + str(whitelist_id)
            logging.info(f"---删除促销白名单商品接口URL为:: {url}")
            # Use the instance headers like every other request in this class;
            # the original passed a HEADERS name that this file never defines.
            response = requests.request(
                method="DELETE", url=url, headers=self.headerss, data={})
            logging.info(f"---接口返回状态码为:: {response.status_code}")
            logging.info(f"---接口返回体为:: {response.json()}\n")

    def add_weight_to_whitelist(self, GoodsInfo, MarketAndStoreInfo):
        """Add a goods item to the weight pass-through whitelist and record the ID.

        On HTTP 200 the ``data.id`` field of the response is appended to
        ``ScenarioTest_WeightWhiteListID.txt`` so it can be deleted later.

        :param GoodsInfo: dict providing ``GoodsInputcode``.
        :param MarketAndStoreInfo: dict providing ``StoreId`` and ``MarketId``.
        :return: the decoded JSON body of the response.
        """
        logging.info("========== [新增重量放通白名单] ==========")
        payload = json.dumps({
            "goodsType": 4,
            "content": GoodsInfo["GoodsInputcode"],
            "type": 1,
            "dataScope": 1,
            "reason": "自动化测试之场景测试",
            "storeId": MarketAndStoreInfo["StoreId"],
            "marketId": MarketAndStoreInfo["MarketId"],
        })
        url = self.Domain + "admin/goods/weight/promotion"
        response = requests.request("POST", url, headers=self.headerss, data=payload)
        logging.info(f"-----------接口返回状态码:{response.status_code}")
        # Decode the body once and reuse it for logging, ID extraction and return.
        body = response.json()
        logging.info(f"-----------接口返回数据:{body}\n\n")
        if response.status_code == 200:
            time.sleep(0.5)
            whitelist_id = body["data"]['id']
            written = Txt().append_write_txt(
                "ScenarioTest_WeightWhiteListID.txt", str(whitelist_id))
            if not written:
                logging.error("---写入 新建重量放通白名单ID 失败!")
        return body

    def delete_weight_whitelist_goods(self):
        """Delete every weight-whitelist entry recorded in the ID file.

        IDs are read from ``ScenarioTest_WeightWhiteListID.txt``; ``None`` and
        empty entries are skipped.

        :return: None; the responses are only logged.
        """
        logging.info("========== [删除重量放通白名单商品] ==========")
        # The return type of read_txt is not visible in this file; a falsy
        # result is treated as "nothing to delete".
        recorded_ids = Txt().read_txt("ScenarioTest_WeightWhiteListID.txt") or []
        for whitelist_id in recorded_ids:
            if whitelist_id in (None, ''):
                continue
            url = self.Domain + "admin/goods/weight/promotion/" + str(whitelist_id)
            logging.info(f"---删除重量放通白名单商品接口URL为:: {url}")
            response = requests.request(
                method="DELETE", url=url, headers=self.headerss, data={})
            logging.info(f"---接口返回状态码为:: {response.status_code}")
            logging.info(f"---接口返回体为:: {response.json()}\n\n")

    def update_cart_info(self, NewStoreInfo):
        """Update cart 1213 with a new MAC address and store binding.

        Example argument::

            NewStoreInfo = {
                "CartMac": "b8:2d:28:04:c7:5c",
                "StoreId": 69,
            }

        Store IDs noted by the original author:
            Yimao Supermarket - Guoxiu Plaza: 9017
            Wushang - Mengshidai: 69
            Jiajiayue - Qingdao Leke: 9022
            Zhongbai - Zhongbai test store: 68
            Yonghui - Longhu Tianjie: 9010

        :param NewStoreInfo: dict providing ``CartMac`` and ``StoreId``.
        :return: the decoded JSON body of the response.
        """
        logging.info("========== [更新购物车信息] ==========")
        # The same cart ID goes into both the payload and the URL.
        cart_id = 1213
        payload = json.dumps({
            "id": cart_id,
            "mac": NewStoreInfo["CartMac"],
            "storeId": NewStoreInfo["StoreId"],
            "cartModelId": 5,
            "serialNum": "MMAT3FC10100003101023491",
            "storeCartNo": "QH97",
            "rfid": "0104e26401440143010000001",
            "status": 1,
            "motherboardType": "3568",
            "activationDate": "2024-08-01",
        })
        url = self.Domain + "admin/cart/" + str(cart_id)
        response = requests.request("PUT", url, headers=self.headerss, data=payload)
        logging.info(f"-----------接口返回状态码:{response.status_code}")
        body = response.json()
        logging.info(f"-----------接口返回数据:{body}\n\n")
        return body

    def approved_abnormal_products(self, MarketAndStoreInfo):
        """Look up the newest pending abnormal goods item and build its approval URL.

        Queries ``admin/check_goods/list`` for items with ``state == 0`` in the
        given market and store, newest first. If any are returned, the
        approval URL for the first one is built and printed.

        NOTE(review): the original had the GET request that would apply the
        approval commented out, so this method still sends only the query.
        TODO: confirm the intended request and headers, then re-enable it.

        :param MarketAndStoreInfo: dict providing ``putMarketId`` and
            ``putStoreId``; only the first element of ``putStoreId`` is used.
        :return: None.
        """
        logging.info("========== [审核通过异常商品] ==========")
        # The Authorization header is deliberately not printed or logged, and
        # the bearer token the original hard-coded here (unused) is removed.

        # 1 - query the pending abnormal goods
        url = self.Domain + "admin/check_goods/list"
        query_payload = json.dumps({
            "page": 1,
            "limit": 10,
            "query": {
                "order": {"createdAt": "desc"},
                "where": {
                    "marketId": {"eq": MarketAndStoreInfo["putMarketId"]},
                    "storeId": {"eq": MarketAndStoreInfo["putStoreId"][0]},
                    "sessionState": {"in": [1, 2, 3, 4, 5, 6, 7]},
                    "state": {"eq": 0},
                },
            },
        })
        list_resp = requests.request(
            "POST", url, headers=self.headerss, data=query_payload)
        list_body = list_resp.json()
        print(f"-----------查询异常待处理商品返回结果为:: \n{list_body}\n\n")
        logging.info(f"-----------接口返回状态码:{list_resp.status_code}")
        logging.info(f"-----------接口返回数据:{list_body}\n\n")

        pending_items = list_body["data"]["data"]
        if not pending_items:
            # Nothing is waiting for review (an empty or missing list).
            return

        # 2 - build the approval URL for the first listed item
        abnormal_id = pending_items[0]["id"]
        num_val = pending_items[0]["num"]
        url2 = (
            self.Domain
            + "admin/check_goods/change/state/"
            + f"{abnormal_id}?state=1&num={num_val}&update=true&promotion=false"
        )
        print(f"-----------url2:: \n{url2}\n")
if __name__ == '__main__':
    # Manual run against the test environment: rebind the cart to store 68.
    api_client = YMServiceApi()
    # Sample inputs for the other API methods; not used by the call below.
    GoodsInfo = dict(GoodsInputcode="6924743915848")
    MarketAndStoreDetails = dict(putMarketId=50, putStoreId=["69"])
    NewStoreInfo = dict(CartMac="b8:2d:28:04:c7:5c", StoreId=68)
    api_client.update_cart_info(NewStoreInfo)