# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Author:: Arthur Wu
# @Date:: 2024/11/15-13:10
# @Description:: Client for the admin endpoints under api.test.yimaogo.com
#                that the automated scenario tests call.
import json
import logging
import time

import requests

from configs.globalParams import *
from commons.SignatureYM import SignatureYM
from commons.FileHandler import Txt


class YMServiceApi(object):
    """Wrapper around the admin HTTP endpoints used by the scenario tests.

    Every request is sent to ``self.Domain`` with the headers produced by
    ``SignatureYM().return_headers()``.
    """

    # Seconds to wait on a request before ``requests`` raises a Timeout,
    # so a stalled server cannot hang the whole test run.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.Domain = "https://api.test.yimaogo.com/"
        self.headerss = SignatureYM().return_headers()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _send(self, method, path, payload=None):
        """Send one request, log status code and JSON body, return both.

        :param method: HTTP verb, e.g. ``"POST"``.
        :param path: Path appended to ``self.Domain``.
        :param payload: Request body (already serialised), or ``None``.
        :return: ``(response, body)`` where ``body`` is the decoded JSON.
        :raises requests.exceptions.RequestException: on transport errors,
            timeouts, or a response body that is not valid JSON.
        """
        response = requests.request(
            method,
            self.Domain + path,
            headers=self.headerss,
            data=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        logging.info(f"-----------接口返回状态码:{response.status_code}")
        # Decode once and reuse; the body is both logged and returned.
        body = response.json()
        logging.info(f"-----------接口返回数据:{body}\n\n")
        return response, body

    @staticmethod
    def _save_created_id(response, body, file_name, write_failed_message):
        """Append ``body["data"]["id"]`` to *file_name* after an HTTP 200.

        Nothing is written for other status codes. A body without a usable
        ``data.id`` is logged instead of raising.
        NOTE(review): the sample error kept in the ``__main__`` section shows
        a body with ``'data': None``; whether such errors arrive with HTTP 200
        is not visible here, hence the defensive check.
        """
        if response.status_code != 200:
            return
        time.sleep(0.5)
        data = body.get("data") if isinstance(body, dict) else None
        if not isinstance(data, dict) or "id" not in data:
            logging.error(f"---response carries no data.id, nothing recorded: {body}")
            return
        if not Txt().append_write_txt(file_name, str(data["id"])):
            logging.error(write_failed_message)

    def _delete_recorded_ids(self, file_name, path, label, body_log_tail):
        """Issue one DELETE per non-blank ID stored in *file_name*.

        :param file_name: Text file previously filled by ``_save_created_id``.
        :param path: Endpoint path; the ID is appended after a ``/``.
        :param label: Text inserted into the URL log message.
        :param body_log_tail: Trailing newlines of the body log message.
        """
        # Tolerate a reader that yields nothing for a missing/empty file.
        recorded_ids = Txt().read_txt(file_name) or []
        for raw_id in recorded_ids:
            if raw_id is None:
                continue
            # Strip so a trailing line break never ends up inside the URL.
            record_id = str(raw_id).strip()
            if not record_id:
                continue
            url = f"{self.Domain}{path}/{record_id}"
            logging.info(f"---删除{label}接口URL为:: {url}")
            response = requests.request(
                method="DELETE",
                url=url,
                headers=self.headerss,
                data={},
                timeout=self.REQUEST_TIMEOUT,
            )
            logging.info(f"---接口返回状态码为:: {response.status_code}")
            logging.info(f"---接口返回体为:: {response.json()}{body_log_tail}")

    def _publish_one(self, ADDetail, MarketAndStoreDetail):
        """POST a single ad built from the fixed defaults plus the overrides.

        Keys in *ADDetail* and then *MarketAndStoreDetail* take precedence
        over the defaults (dict union, right-most wins).
        """
        timeStamp = str(int(time.time()))
        PublicParams = {
            "status": 4,
            "name": "Auto" + timeStamp,
            "agencyId": 3,
            "agencyName": "洪家班",
            "advertiserId": 3,
            "advertiserName": "阿宝传媒",
            "adsUse": 1,
            "customerTag": 3,
            "putRangeCycle": "[7,6,5,1,2,3,4]",
            "addType": 1,
            "pricingType": 1,
            "standardPrice": "1",
            "minPrice": "1",
            "putStart": GlobalParams["todayDate"],
            "putEnd": GlobalParams["todayDate"],
            "putStartTime": "00:00:00",
            "putEndTime": "23:00:00",
        }
        payload = json.dumps(PublicParams | ADDetail | MarketAndStoreDetail)
        self._send("POST", "admin/ads", payload)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def publish_ad(self, ADDetailList, MarketAndStoreDetail):
        """Publish every ad in *ADDetailList*, pausing 0.5 s after each one.

        :param ADDetailList: Iterable of dicts with per-ad fields.
        :param MarketAndStoreDetail: Dict merged into every ad payload.
        """
        logging.info("========== [发布广告] ==========")
        for ad_detail in ADDetailList:
            self._publish_one(ad_detail, MarketAndStoreDetail)
            time.sleep(0.5)

    def add_promotional_product_whitelist(self, GoodsInfo, MarketAndStoreInfo):
        """Create a promotion whitelist entry and record the new ID.

        On HTTP 200 the returned ``data.id`` is appended to
        ``ScenarioTest_GoodsWhiteListID.txt`` so ``delete_whitelist_goods``
        can remove it later.

        :param GoodsInfo: Dict providing ``GoodsInputcode`` and ``GoodsName``.
        :param MarketAndStoreInfo: Dict providing ``StoreId`` and ``MarketId``.
        :return: Decoded JSON body of the response.
        """
        logging.info("========== [新增促销白名单] ==========")
        payload = json.dumps({
            "goodsType": 3,
            "dataScope": 1,
            "categoryCode": "",
            "categoryName": "",
            "barcode": GoodsInfo["GoodsInputcode"],
            "goodsName": GoodsInfo["GoodsName"],
            "storeId": MarketAndStoreInfo["StoreId"],
            "marketId": MarketAndStoreInfo["MarketId"],
        })
        response, body = self._send("POST", "admin/goods/promotion", payload)
        self._save_created_id(
            response,
            body,
            "ScenarioTest_GoodsWhiteListID.txt",
            "---写入 新建促销商品白名单ID 失败!",
        )
        return body

    def delete_whitelist_goods(self):
        """Delete every promotion whitelist entry recorded in
        ``ScenarioTest_GoodsWhiteListID.txt``.

        Uses the signed instance headers, like every other method here.
        """
        logging.info("========== [删除促销白名单商品] ==========")
        self._delete_recorded_ids(
            "ScenarioTest_GoodsWhiteListID.txt",
            "admin/goods/promotion",
            "促销白名单商品",
            "\n",
        )

    def add_weight_to_whitelist(self, GoodsInfo, MarketAndStoreInfo):
        """Create a weight-bypass whitelist entry and record the new ID.

        On HTTP 200 the returned ``data.id`` is appended to
        ``ScenarioTest_WeightWhiteListID.txt`` so
        ``delete_weight_whitelist_goods`` can remove it later.

        :param GoodsInfo: Dict providing ``GoodsInputcode``.
        :param MarketAndStoreInfo: Dict providing ``StoreId`` and ``MarketId``.
        :return: Decoded JSON body of the response.
        """
        logging.info("========== [新增重量放通白名单] ==========")
        payload = json.dumps({
            "goodsType": 4,
            "content": GoodsInfo["GoodsInputcode"],
            "type": 1,
            "dataScope": 1,
            "reason": "自动化测试之场景测试",
            "storeId": MarketAndStoreInfo["StoreId"],
            "marketId": MarketAndStoreInfo["MarketId"],
        })
        response, body = self._send("POST", "admin/goods/weight/promotion", payload)
        self._save_created_id(
            response,
            body,
            "ScenarioTest_WeightWhiteListID.txt",
            "---写入 新建重量放通白名单ID 失败!",
        )
        return body

    def delete_weight_whitelist_goods(self):
        """Delete every weight-bypass whitelist entry recorded in
        ``ScenarioTest_WeightWhiteListID.txt``.
        """
        logging.info("========== [删除重量放通白名单商品] ==========")
        self._delete_recorded_ids(
            "ScenarioTest_WeightWhiteListID.txt",
            "admin/goods/weight/promotion",
            "重量放通白名单商品",
            "\n\n",
        )

    def get_markets_of_this_cart(self, CartMac="b8:2d:28:04:c7:5c"):
        """Query the cart list filtered by MAC address, newest first.

        :param CartMac: MAC to filter on; defaults to the cart that was
            previously hard-coded, so existing callers are unaffected.
        :return: Decoded JSON body of the response.
        """
        logging.info("========== [获取购物车的商超信息] ==========")
        payload = json.dumps({
            "page": 1,
            "limit": 10,
            "query": {
                "order": {"createdAt": "desc"},
                "where": {"mac": {"include": CartMac}},
            },
        })
        # The path previously read "dmin/cart/list"; all other endpoints
        # live under "admin/".
        _, body = self._send("POST", "admin/cart/list", payload)
        return body

    def update_the_store_to_which_the_shopping_cart_belongs(self, CartId, CartMac, NewStoreId):
        """PUT the cart record identified by *CartId*.

        :param CartId: Cart ID; converted with ``int()`` for URL and payload.
        :param CartMac: MAC address written into the payload.
        :param NewStoreId: Target store ID. NOTE(review): currently NOT sent;
            the ``storeId`` field was already disabled in the payload and
            is kept that way here. Confirm whether it should be re-enabled.
        :return: Decoded JSON body of the response.
        """
        logging.info("========== [更新购物车 所属门店 信息] ==========")
        cart_id = int(CartId)
        payload = json.dumps({
            "id": cart_id,
            "mac": CartMac,
            # "storeId": int(NewStoreId),  # disabled in the original, see docstring
            "cartModelId": 5,
            "serialNum": "autoTest",
            "storeCartNo": "ieemooTest_009",
            "rfid": "autoTest",
            "status": 1,
            "motherboardType": "3568",
            "activationDate": "2024-08-01",
        })
        _, body = self._send("PUT", f"admin/cart/{cart_id}", payload)
        return body

    def approved_abnormal_products(self, MarketAndStoreInfo):
        """Look up pending abnormal goods and build the approval URL.

        Step 1 queries ``admin/check_goods/list`` for records with
        ``state == 0`` in the given market/store. Step 2 builds the
        approval URL for the first record and prints it.

        NOTE(review): the approval request itself is not sent; it was
        commented out in the original and remains disabled. The credentials
        that were hard-coded and printed for debugging have been removed.

        :param MarketAndStoreInfo: Dict providing ``putMarketId`` and
            ``putStoreId`` (a sequence; only its first element is used).
        :return: ``None``.
        """
        logging.info("========== [审核通过异常商品] ==========")

        # 1 - query pending records
        query_payload = json.dumps({
            "page": 1,
            "limit": 10,
            "query": {
                "order": {"createdAt": "desc"},
                "where": {
                    "marketId": {"eq": MarketAndStoreInfo["putMarketId"]},
                    "storeId": {"eq": MarketAndStoreInfo["putStoreId"][0]},
                    "sessionState": {"in": [1, 2, 3, 4, 5, 6, 7]},
                    "state": {"eq": 0},
                },
            },
        })
        abnormal_list_resp = requests.request(
            "POST",
            self.Domain + "admin/check_goods/list",
            headers=self.headerss,
            data=query_payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        listing = abnormal_list_resp.json()
        print(f"-----------查询异常待处理商品返回结果为:: \n{listing}\n\n")
        logging.info(f"-----------接口返回状态码:{abnormal_list_resp.status_code}")
        logging.info(f"-----------接口返回数据:{listing}\n\n")

        # Guard against a missing/None "data" level instead of raising.
        outer = listing.get("data") if isinstance(listing, dict) else None
        records = outer.get("data") if isinstance(outer, dict) else None
        if not records:
            return

        # 2 - build the approval URL for the newest record
        first = records[0]
        url2 = (
            f"{self.Domain}admin/check_goods/change/state/"
            f"{first['id']}?state=1&num={first['num']}&update=true&promotion=false"
        )
        print(f"-----------url2:: \n{url2}\n")


if __name__ == '__main__':
    # Reference data for manual runs:
    #   Cart IDs by MAC:
    #     70:f7:54:07:a6:c0 -> 1214
    #     b8:2d:28:04:c7:5c -> 1213
    #   Sample error body (device mac / sn / rfid must not be duplicated):
    #     {'code': 2007, 'msg': '设备的mac,sn,rfid不允许重复', 'data': None}
    #   Store IDs:
    #     Yimao Supermarket - Guoxiu Plaza : 9017
    #     Wushang - Mengshidai             : 69
    #     Zhongbai - Zhongbai test store   : 68
    #     Yonghui - Longhu Tianjie         : 9010
    #     Jiajiayue - Qingdao Leke         : 9022
    yms = YMServiceApi()
    GoodsInfo = {"GoodsInputcode": "6924743915848"}
    MarketAndStoreDetails = {"putMarketId": 50, "putStoreId": ["69"]}
    CartId = 1214
    CartMac = "70:f7:54:07:a6:c0"
    NewStoreId = 68

    resp = yms.update_the_store_to_which_the_shopping_cart_belongs(CartId, CartMac, NewStoreId)
    print(f"-----------更新购物车所属门店信息返回结果为:: \n{resp}\n\n")