# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Author:: Arthur Wu
# @Date:: 2024/11/15-13:10
# @Description::
"""Client for the admin endpoints of the YM test API used by scenario tests.

Covers publishing ads and creating/deleting entries in the promotional and
weight pass-through goods whitelists. IDs of created whitelist entries are
appended to text files so the matching delete methods can clean them up.
"""
import json
import logging
import time

import requests

from configs.globalParams import *
from commons.SignatureYM import SignatureYM
from commons.FileHandler import Txt


class YMServiceApi(object):
    """Sends signed requests to the admin API rooted at ``Domain``."""

    # Seconds to wait on the server. requests waits forever when no timeout
    # is passed, which would hang a whole test run on one stalled call.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.Domain = "https://api.test.yimaogo.com/"
        self.headerss = SignatureYM().return_headers()

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    def _request(self, method, url, data):
        """Send one request with the signed instance headers and a timeout."""
        return requests.request(
            method=method,
            url=url,
            headers=self.headerss,
            data=data,
            timeout=self.REQUEST_TIMEOUT,
        )

    @staticmethod
    def _body_for_log(response):
        """Return the decoded JSON body, or the raw text if it is not JSON.

        ``response.json()`` raises a ``ValueError`` subclass on an empty or
        non-JSON body; logging must not abort the caller's loop because of it.
        """
        try:
            return response.json()
        except ValueError:
            return response.text

    @staticmethod
    def _record_created_id(body, id_file, write_failed_message):
        """Append ``body["data"]["id"]`` to ``id_file`` for later cleanup.

        Logs an error instead of raising when the body carries no id, so the
        caller still receives the response body to inspect.
        """
        data = body.get("data") if isinstance(body, dict) else None
        created_id = data.get("id") if isinstance(data, dict) else None
        if created_id is None:
            logging.error(f"---接口返回体中缺少 data.id,无法记录ID: {body}")
            return
        if not Txt().append_write_txt(id_file, str(created_id)):
            logging.error(write_failed_message)

    def _create_and_record(self, path, payload, id_file, write_failed_message):
        """POST ``payload`` to ``path``; on HTTP 200 record the new entry id.

        Returns:
            The decoded JSON response body.

        Raises:
            ValueError: if the response body is not valid JSON (unchanged
                from the original behaviour, since the body is the return
                value).
        """
        url = self.Domain + path
        response = self._request("POST", url, json.dumps(payload))
        logging.info(f"-----------接口返回状态码:{response.status_code}")
        body = response.json()  # parsed once and reused below
        logging.info(f"-----------接口返回数据:{body}\n\n")
        if response.status_code == 200:
            time.sleep(0.5)
            self._record_created_id(body, id_file, write_failed_message)
        return body

    # ------------------------------------------------------------------
    # 1 - Advertising module
    # ------------------------------------------------------------------
    def _publish_single_ad(self, ADDetail, MarketAndStoreDetail):
        """POST one ad built from the fixed defaults plus the given details.

        Keys in ``ADDetail`` override the defaults, and keys in
        ``MarketAndStoreDetail`` override both (right-most dict wins).
        """
        timeStamp = str(int(time.time()))
        PublicParams = {
            "status": 4,
            "name": "Auto" + timeStamp,
            "agencyId": 3,
            "agencyName": "洪家班",
            "advertiserId": 3,
            "advertiserName": "阿宝传媒",
            "adsUse": 1,
            "customerTag": 3,
            "putRangeCycle": "[7,6,5,1,2,3,4]",
            "addType": 1,
            "pricingType": 1,
            "standardPrice": "1",
            "minPrice": "1",
            "putStart": GlobalParams["todayDate"],
            "putEnd": GlobalParams["todayDate"],
            "putStartTime": "00:00:00",
            "putEndTime": "23:00:00",
        }
        payload = json.dumps(PublicParams | ADDetail | MarketAndStoreDetail)
        url = self.Domain + "admin/ads"
        response = self._request("POST", url, payload)
        logging.info(f"-----------接口返回状态码:{response.status_code}")
        logging.info(f"-----------接口返回数据:{self._body_for_log(response)}\n\n")

    def publish_ad(self, ADDetailList, MarketAndStoreDetail):
        """Publish every ad in ``ADDetailList``, pausing 0.5 s between ads.

        Args:
            ADDetailList: iterable of dicts with per-ad fields.
            MarketAndStoreDetail: dict merged into every ad payload.
        """
        logging.info("========== [发布广告] ==========")
        for adDetail in ADDetailList:
            self._publish_single_ad(adDetail, MarketAndStoreDetail)
            time.sleep(0.5)

    def add_promotional_product_whitelist(self, GoodsInfo, MarketAndStoreInfo):
        """Add a goods item to the promotional whitelist.

        Args:
            GoodsInfo: dict providing ``GoodsInputcode`` and ``GoodsName``.
            MarketAndStoreInfo: dict providing ``StoreId`` and ``MarketId``.

        Returns:
            The decoded JSON response body.
        """
        logging.info("========== [新增促销白名单] ==========")
        payload = {
            "goodsType": 3,
            "dataScope": 1,
            "categoryCode": "",
            "categoryName": "",
            "barcode": GoodsInfo["GoodsInputcode"],
            "goodsName": GoodsInfo["GoodsName"],
            "storeId": MarketAndStoreInfo["StoreId"],
            "marketId": MarketAndStoreInfo["MarketId"],
        }
        return self._create_and_record(
            "admin/goods/promotion",
            payload,
            "ScenarioTest_GoodsWhiteListID.txt",
            f"---写入 新建促销商品白名单ID 失败!",
        )

    def delete_whitelist_goods(self):
        """Delete every promotional whitelist entry recorded in the ID file."""
        logging.info("========== [删除促销白名单商品] ==========")
        # read_txt's failure value is not visible here; a falsy result is
        # treated as "nothing recorded".
        GoodsWhiteListIDList = Txt().read_txt("ScenarioTest_GoodsWhiteListID.txt") or []
        for goodsWhiteListID in GoodsWhiteListIDList:
            if goodsWhiteListID in (None, ''):
                continue
            url = f"{self.Domain}admin/goods/promotion/{goodsWhiteListID}"
            logging.info(f"---删除促销白名单商品接口URL为:: {url}")
            # Uses the signed instance headers like every other call. The
            # original passed a global HEADERS that this file does not define
            # (it could only have come from the star import).
            response = self._request("DELETE", url, {})
            logging.info(f"---接口返回状态码为:: {response.status_code}")
            logging.info(f"---接口返回体为:: {self._body_for_log(response)}\n")

    def add_weight_to_whitelist(self, GoodsInfo, MarketAndStoreInfo):
        """Add a goods item to the weight pass-through whitelist.

        Args:
            GoodsInfo: dict providing ``GoodsInputcode``.
            MarketAndStoreInfo: dict providing ``StoreId`` and ``MarketId``.

        Returns:
            The decoded JSON response body.
        """
        logging.info("========== [新增重量放通白名单] ==========")
        payload = {
            "goodsType": 4,
            "content": GoodsInfo["GoodsInputcode"],
            "type": 1,
            "dataScope": 1,
            "reason": "自动化测试之场景测试",
            "storeId": MarketAndStoreInfo["StoreId"],
            "marketId": MarketAndStoreInfo["MarketId"],
        }
        return self._create_and_record(
            "admin/goods/weight/promotion",
            payload,
            "ScenarioTest_WeightWhiteListID.txt",
            f"---写入 新建重量放通白名单ID 失败!",
        )

    def delete_weight_whitelist_goods(self):
        """Delete every weight whitelist entry recorded in the ID file."""
        logging.info("========== [删除重量放通白名单商品] ==========")
        # read_txt's failure value is not visible here; a falsy result is
        # treated as "nothing recorded".
        WeightWhiteListIDList = Txt().read_txt("ScenarioTest_WeightWhiteListID.txt") or []
        for weightWhiteListID in WeightWhiteListIDList:
            if weightWhiteListID in (None, ''):
                continue
            url = f"{self.Domain}admin/goods/weight/promotion/{weightWhiteListID}"
            logging.info(f"---删除重量放通白名单商品接口URL为:: {url}")
            response = self._request("DELETE", url, {})
            logging.info(f"---接口返回状态码为:: {response.status_code}")
            logging.info(f"---接口返回体为:: {self._body_for_log(response)}\n\n")


if __name__ == '__main__':
    # Manual smoke run: create one weight-whitelist entry, wait, then clean up.
    yms = YMServiceApi()
    GoodsInfo = {"GoodsInputcode": "6924743915848"}
    MarketAndStoreInfo = {"MarketId": 50, "StoreId": 29}
    yms.add_weight_to_whitelist(GoodsInfo, MarketAndStoreInfo)
    time.sleep(10)
    yms.delete_weight_whitelist_goods()