273 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Author:: Arthur Wu
# @Date:: 2024/11/15-13:10
# @Description::
import requests,json, time, logging
from configs.globalParams import *
from commons.SignatureYM import SignatureYM
from commons.FileHandler import Txt
class YMServiceApi(object):
def __init__(self):
self.Domain = "https://api.test.yimaogo.com/"
self.headerss = SignatureYM().return_headers()
def publish_ad(self, ADDetailList, MarketAndStoreDetail):
logging.info("========== [发布广告] ==========")
def __publish(ADDetail, MarketAndStoreDetail):
timeStamp = str(int(time.time()))
PublicParams = {
"status": 4, "name": "Auto"+timeStamp,
"agencyId": 3, "agencyName": "洪家班", "advertiserId": 3,
"advertiserName": "阿宝传媒", "adsUse": 1, "customerTag": 3,
"putRangeCycle": "[7,6,5,1,2,3,4]",
"addType": 1, "pricingType": 1, "standardPrice": "1", "minPrice": "1",
"putStart": GlobalParams["todayDate"], "putEnd": GlobalParams["todayDate"],
"putStartTime": "00:00:00", "putEndTime": "23:00:00"
}
payload = json.dumps(PublicParams | ADDetail | MarketAndStoreDetail)
url = self.Domain + "admin/ads"
response = requests.request("POST", url, headers=self.headerss, data=payload)
logging.info(f"-----------接口返回状态码:{response.status_code}")
logging.info(f"-----------接口返回数据:{response.json()}\n\n")
for adDetail in ADDetailList:
__publish(adDetail, MarketAndStoreDetail)
time.sleep(0.5)
def add_promotional_product_whitelist(self, GoodsInfo, MarketAndStoreInfo):
logging.info("========== [新增促销白名单] ==========")
payload = json.dumps({
"goodsType": 3,
"dataScope": 1,
"categoryCode": "",
"categoryName": "",
"barcode": GoodsInfo["GoodsInputcode"],
"goodsName": GoodsInfo["GoodsName"],
"storeId": MarketAndStoreInfo["StoreId"],
"marketId": MarketAndStoreInfo["MarketId"]
})
url = self.Domain + "admin/goods/promotion"
response = requests.request("POST", url, headers=self.headerss, data=payload)
logging.info(f"-----------接口返回状态码:{response.status_code}")
logging.info(f"-----------接口返回数据:{response.json()}\n\n")
if response.status_code == 200:
time.sleep(0.5)
rspJson = response.json()
goodsWhiteListID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("ScenarioTest_GoodsWhiteListID.txt", str(goodsWhiteListID))
if not writeResult:
logging.error(f"---写入 新建促销商品白名单ID 失败!")
return response.json()
def delete_whitelist_goods(self):
logging.info("========== [删除促销白名单商品] ==========")
GoodsWhiteListIDList = Txt().read_txt("ScenarioTest_GoodsWhiteListID.txt")
for i in range(len(GoodsWhiteListIDList)):
if GoodsWhiteListIDList[i] not in [None, '']:
url = self.Domain + "admin/goods/promotion/{GoodsWhiteListID}".replace(
"{GoodsWhiteListID}", str(GoodsWhiteListIDList[i]))
logging.info(f"---删除促销白名单商品接口URL为:: {url}")
response = requests.request(method="DELETE", url=url, headers=HEADERS, data={})
logging.info(f"---接口返回状态码为:: {response.status_code}")
logging.info(f"---接口返回体为:: {response.json()}\n")
def add_weight_to_whitelist(self, GoodsInfo, MarketAndStoreInfo):
logging.info("========== [新增重量放通白名单] ==========")
payload = json.dumps({
"goodsType": 4,
"content": GoodsInfo["GoodsInputcode"],
"type": 1,
"dataScope": 1,
"reason": "自动化测试之场景测试",
"storeId": MarketAndStoreInfo["StoreId"],
"marketId": MarketAndStoreInfo["MarketId"]
})
url = self.Domain + "admin/goods/weight/promotion"
response = requests.request("POST", url, headers=self.headerss, data=payload)
logging.info(f"-----------接口返回状态码:{response.status_code}")
logging.info(f"-----------接口返回数据:{response.json()}\n\n")
if response.status_code == 200:
time.sleep(0.5)
rspJson = response.json()
weightWhiteListID = rspJson["data"]['id']
writeResult = Txt().append_write_txt("ScenarioTest_WeightWhiteListID.txt", str(weightWhiteListID))
if not writeResult:
logging.error(f"---写入 新建重量放通白名单ID 失败!")
return response.json()
def delete_weight_whitelist_goods(self):
logging.info("========== [删除重量放通白名单商品] ==========")
WeightWhiteListIDList = Txt().read_txt("ScenarioTest_WeightWhiteListID.txt")
for i in range(len(WeightWhiteListIDList)):
if WeightWhiteListIDList[i] not in [None, '']:
url = self.Domain + "admin/goods/weight/promotion/{WeightWhiteListID}".replace(
"{WeightWhiteListID}", str(WeightWhiteListIDList[i]))
logging.info(f"---删除重量放通白名单商品接口URL为:: {url}")
response = requests.request(method="DELETE", url=url, headers=self.headerss, data={})
logging.info(f"---接口返回状态码为:: {response.status_code}")
logging.info(f"---接口返回体为:: {response.json()}\n\n")
def get_markets_of_this_cart(self):
logging.info("========== [获取购物车的商超信息] ==========")
url = self.Domain + "dmin/cart/list"
payload = json.dumps({
"page": 1,
"limit": 10,
"query": {
"order": {
"createdAt": "desc"
},
"where": {
"mac": {
"include": "b8:2d:28:04:c7:5c"
}
}
}
})
response = requests.request("POST", url, headers=self.headerss, data=payload)
logging.info(f"-----------接口返回状态码:{response.status_code}")
logging.info(f"-----------接口返回数据:{response.json()}\n\n")
return response.json()
def update_the_store_to_which_the_shopping_cart_belongs(self, CartId, CartMac, NewStoreId):
'''
NewStoreInfo={
"CartMac": "b8:2d:28:04:c7:5c",
"StoreId": 69,
}
购物车MAC地址"mac": "b8:2d:28:04:c7:5c",
亿猫超市-国秀广场:"storeId": 9017,
武商-梦时代:"storeId": 69,
家家悦-青岛乐客:"storeId": 9022,
中百-中百测试店:"storeId": 68,
永辉-龙湖天街:"storeId": 9010,
:param NewStoreInfo:
:return:
'''
logging.info("========== [更新购物车 所属门店 信息] ==========")
payload = json.dumps({
"id": int(CartId),
"mac": CartMac, #
"storeId": int(NewStoreId),
"cartModelId": 5,
"serialNum": "autoTest",
"storeCartNo": "ieemooTest_009",
"rfid": "autoTest",
"status": 1,
"motherboardType": "3568",
"activationDate": "2024-08-01"
})
url = self.Domain + f"admin/cart/{int(CartId)}"
response = requests.request("PUT", url, headers=self.headerss, data=payload)
logging.info(f"-----------接口返回状态码:{response.status_code}")
logging.info(f"-----------接口返回数据:{response.json()}\n\n")
return response.json()
def approved_abnormal_products(self, MarketAndStoreInfo):
logging.info("========== [审核通过异常商品] ==========")
auth_token_b = self.headerss["Authorization"]
print(f"-----------headerss:: \n{self.headerss}\n")
print(f"-----------auth_token_b:: \n{auth_token_b}\n")
print(f"-----------auth_token_b:: \n{type(auth_token_b)}\n")
# 1-查询
url = self.Domain + "admin/check_goods/list"
get_abnormal_list_payload = json.dumps({
"page": 1,
"limit": 10,
"query": {
"order": {"createdAt": "desc"},
"where": {
"marketId": {"eq": MarketAndStoreInfo["putMarketId"]},
"storeId": {"eq": MarketAndStoreInfo["putStoreId"][0]},
"sessionState": {"in": [1, 2, 3, 4, 5, 6, 7]},
"state": {"eq": 0}
}
}
})
abnormal_list_resp = requests.request("POST", url,
headers=self.headerss, data=get_abnormal_list_payload)
print(f"-----------查询异常待处理商品返回结果为:: \n{abnormal_list_resp.json()}\n\n")
logging.info(f"-----------接口返回状态码:{abnormal_list_resp.status_code}")
logging.info(f"-----------接口返回数据:{abnormal_list_resp.json()}\n\n")
if abnormal_list_resp.json()["data"]["data"] != []:
abnormal_id = abnormal_list_resp.json()["data"]["data"][0]["id"]
num_val = abnormal_list_resp.json()["data"]["data"][0]["num"]
# 2-审核通过
# url2 = self.Domain + (
url2 = ('https://api.test.yimaogo.com/' + (
"admin/check_goods/change/state/"
"{abnormal_id}?state=1&num={Num}&update=true&promotion=false"
# "{abnormal_id}"
).replace("{abnormal_id}", str(abnormal_id)).
replace("{Num}", str(num_val)))
# print(f"-----------headers:: \n{self.headerss}\n")
print(f"-----------url2:: \n{url2}\n")
try:
auth_token_b = self.headerss["Authorization"]
auth_token_c = 'Bearer eyJhbGciOiJSUzI1NiIsImtpZCI6ImNlcnQtYnVpbHQtaW4iLCJ0eXAiOiJKV1QifQ.eyJhdWQiOlsiOTU1Nzk3NjVlNGZlOTFhZjk5NjIiXSwiYXZhdGFyIjoiaHR0cHM6Ly9zdGF0aWMtbGVnYWN5LmRpbmd0YWxrLmNvbS9tZWRpYS9sQURQRDRQdkgxQ0Z6R1BOQWd6TkFlQV80ODBfNTI0LmpwZyIsImRpc3BsYXlOYW1lIjoi5ZC05Zu95b69IiwiZW1haWwiOiJ3dWd1b2h1aUBpZWVtb28uY29tIiwiZXhwIjoxNzMyODQzODQxLCJpYXQiOjE3MzIyMzkwNDEsImlkIjoiUTJwcVJlVjNQRUViaVM5OGlTaVA4ZUVDZ2lFaUUiLCJpc3MiOiJodHRwczovL2F1dGgueWltYW9nby5jb20iLCJqdGkiOiJhZG1pbi9iZjk0MDFhMi1mZDIxLTRjODgtOWZkMS0yNGVlZWM0MjdmMmYiLCJuYW1lIjoiMDI5NiIsIm5iZiI6MTczMjIzOTA0MSwibm9uY2UiOiIiLCJvd25lciI6ImllZW1vbyIsInNjb3BlIjoicHJvZmlsZSIsInN1YiI6IlEycHFSZVYzUEVFYmlTOThpU2lQOGVFQ2dpRWlFIiwidGFnIjoiIiwidG9rZW5UeXBlIjoiYWNjZXNzLXRva2VuIn0.MEXmbdk3QOitECq7K4qG5BOy0K7pecJxTQVNrJq8a5VllNLTmfnsCWs_UswiF4-hrf7KvMj71TKI3xcMLPZDPTYee4LKi3npXrSHtuF9yWef9xY3841KEDHUc_RwNMlqFukVWIH3c-TL3M5XVqvY_ccqxPVG00ArRuXemkFsF7YoOsBzqMBR5oZdTw4vdZBjv07vXPvX-D8oQ1AR2Ib4jZxifBCwGB00GqFrQaNKGTuzaEo_nAXiZ3Xd6GBO5GywBIZ6-60J9PSf4VTZzKdxM16DoQoRrZUzLgxVmuxYo1g9OrQt3FQEPH2GwiPRRNezZKOCJOkU5MY-ZbxAizK5OkFn96ozbkiTKDqlUfkPASZWWX2XgVTLdaoRVltBIGDI03OzOJgiN4ogmz4uOyfeAxmgNPYyFoCclS3rKyCAWNPthY4R5R-AmkZAr1Lpu2NkSlxpCAiDLuzITE2POaG2XFGpAfn2OYeSJWxJhx2twOSFwu4zG4-yGD_AiQKXuxpYpLQbFfIL3d3sJ9dM8Wom4nTdh4rQDWVT0ed1Pc77NxSqMWlAviszaGU_V9deY-JqmtOwvw4p7lmAmr4yeB_-7fdKTXfGyEj9zy863Mzj9zZlPVr-ZB6RjvgqI8YK8A1g-ggjRL5OFOwVt2SP59qrAkUfpgAQXj1vN_ias1LhlJU',
print(f"-----------auth_token_c:: \n{auth_token_b}\n")
print(f"-----------auth_token_c:: \n{type(auth_token_b)}\n")
# payload = ""
# headers = {
# 'Authorization': str(auth_token_c[0]),
# 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
# 'authority': 'api.test.yimaogo.com',
# 'Accept': '*/*',
# 'Origin': 'https://dashboard.test.yimaogo.com',
# 'Referer': 'https://dashboard.test.yimaogo.com/',
# 'sec-ch-ua': '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
# 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
# 'Date': 'Fri, 22 Nov 2024 07:28:39 GMT',
# 'server': 'APISIX/3.8.0',
# 'Content-Type': 'application/json',
# 'Host': 'api.test.yimaogo.com',
# 'Connection': 'keep-alive'
# }
# time.sleep(1)
# response = requests.request("GET", url2, headers=headers, data=payload)
# response.raise_for_status() # 检查请求是否成功
# logging.info(f"-----------接口返回状态码:{response.status_code}")
# logging.info(f"-----------接口返回数据:{response.json()}\n\n")
# print(f"-----------接口返回数据:\n{response.json()}\n\n")
except requests.exceptions.RequestException as e:
logging.error(f"请求失败: {e}")
# if response.json()["msg"] == '成功':
# logging.info("---审核通过异常商品成功!")
# else:
# logging.error("---审核通过异常商品失败!")
# else:
# logging.info("---未查询到待核验的异常商品!")
if __name__ == '__main__':
'''
1、购物车ID
170:f7:54:07:a6:c0 - 1214
1b8:2d:28:04:c7:5c - 1213
{'code': 2007, 'msg': '设备的mac,sn,rfid不允许重复', 'data': None}
亿猫超市-国秀广场-storeId: 9017,
武商-梦时代-storeId: 69,
中百-中百测试店-storeId: 68,
永辉-龙湖天街-storeId: 9010,
家家悦-青岛乐客-storeId: 9022,
'''
yms = YMServiceApi()
GoodsInfo = {"GoodsInputcode": "6924743915848"}
MarketAndStoreDetails = {"putMarketId": 50, "putStoreId": ["69"]}
CartId = 1214
CartMac = "70:f7:54:07:a6:c0"
NewStoreId = 68
resp = yms.update_the_store_to_which_the_shopping_cart_belongs(CartId, CartMac, NewStoreId)
print(f"-----------更新购物车所属门店信息返回结果为:: \n{resp}\n\n")