# -*- coding: utf-8 -*-
"""PostgreSQL helper that reaches the database through an SSH tunnel.

Provides small query helpers plus bulk clean-up routines that remove the
rows created by automated tests (sample activities and coupon activities).
"""
import time
from typing import Any, Iterable, List, Optional, Sequence

from sshtunnel import SSHTunnelForwarder
import psycopg2


class PGSQL():
    """PostgreSQL connection opened through an SSH port forward."""

    # Pause (seconds) inserted before every statement issued by the bulk
    # clean-up helpers; same pacing as the original implementation.
    _STATEMENT_DELAY = 0.5

    # NOTE(security): the defaults below keep the original hard-coded hosts
    # and credentials so existing ``PGSQL()`` callers keep working. Prefer
    # passing them in from a secret store / environment and removing the
    # literals from source control.
    def __init__(
        self,
        ssh_host: str = 'ops.yimaogo.com',
        ssh_username: str = 'yimaogo_test',
        ssh_pwd: str = 'yimaogo!test@1234',
        ssh_port: int = 22,
        db_host: str = '10.0.16.24',
        db_port: int = 5432,
        db_dbname: str = 'yimaogo_test',
        db_user: str = 'yimaogo_test',
        db_password: str = 'yimaogo!test@1234',
    ) -> None:
        """Start the SSH tunnel, then connect to PostgreSQL through it.

        Args:
            ssh_host: Host name of the SSH server used as the jump host.
            ssh_username: SSH login name.
            ssh_pwd: SSH password.
            ssh_port: SSH port.
            db_host: Database host as seen from the SSH server.
            db_port: Database port as seen from the SSH server.
            db_dbname: Name of the database to connect to.
            db_user: Database user.
            db_password: Database password.

        Raises:
            Exception: Whatever ``sshtunnel`` / ``psycopg2`` raise when the
                tunnel or the connection cannot be established. The tunnel
                is shut down again before the error propagates.
        """
        self.ssh_server = SSHTunnelForwarder(
            ssh_address_or_host=(ssh_host, ssh_port),
            ssh_username=ssh_username,
            ssh_password=ssh_pwd,
            remote_bind_address=(db_host, db_port),
        )
        self.ssh_server.daemon_forward_servers = False
        self.ssh_server.start()

        conn = None
        try:
            # The tunnel listens locally; connect to its bound port.
            conn = psycopg2.connect(
                database=db_dbname,
                user=db_user,
                password=db_password,
                host='127.0.0.1',
                port=self.ssh_server.local_bind_port,
            )
            cursor = conn.cursor()
        except Exception:
            # Do not leave a running tunnel (or a half-open connection)
            # behind when the database side fails.
            try:
                if conn is not None:
                    conn.close()
            finally:
                self.ssh_server.close()
            raise

        self.conn = conn
        self.cursor = cursor

    def close(self) -> None:
        """Close the cursor, the connection and the SSH tunnel.

        Every resource is released even if closing an earlier one raises;
        the first error still propagates to the caller.
        """
        try:
            self.cursor.close()
        finally:
            try:
                self.conn.close()
            finally:
                self.ssh_server.close()

    def select_data(self, SQL: str) -> List[Any]:
        """Execute ``SQL`` and return every row of the result set."""
        self.cursor.execute(SQL)
        # Fetch the complete query result.
        return self.cursor.fetchall()

    def delete_data(self, SQL: str) -> None:
        """Execute ``SQL`` and commit the transaction."""
        self.cursor.execute(SQL)
        self.conn.commit()

    def _delete_then_verify(
        self, delete_statements: Sequence[str], select_statement: str
    ) -> None:
        """Run each delete (paced by a short sleep), then the check query."""
        for statement in delete_statements:
            time.sleep(self._STATEMENT_DELAY)
            self.delete_data(statement)
        time.sleep(self._STATEMENT_DELAY)
        # The result is not inspected; the query only confirms the
        # connection is still usable after the deletes.
        self.select_data(select_statement)

    def del_sample_activity_data(self, IDList: Optional[Iterable] = None) -> None:
        """Delete sample-activity test data for every ID in ``IDList``.

        Child tables are cleared before ``public.sample_activity`` itself.
        The connection and the SSH tunnel are always closed afterwards, so
        the instance cannot be reused once this method returns or raises.

        Args:
            IDList: Activity IDs to remove. ``None`` is treated as empty.

        Raises:
            Exception: Any database error raised while deleting.
        """
        print('---开始删除派样活动自动化测试脏数据')
        try:
            for SampActivityID in (IDList or ()):
                print(f'---待删除ID为:{SampActivityID}')
                selsqlstr = f'SELECT * FROM public.sample_activity WHERE id={SampActivityID};'
                # One statement per element. The original was missing the
                # comma after the third entry, which silently fused the
                # last two DELETEs into a single string.
                delsqlstr = [
                    f"DELETE FROM public.sample_activity_gift WHERE activity_id={SampActivityID};",
                    f"DELETE FROM public.sample_record WHERE activity_id={SampActivityID};",
                    f"DELETE FROM public.sample_activity_store WHERE activity_id={SampActivityID};",
                    f"DELETE FROM public.sample_activity WHERE id={SampActivityID};",
                ]
                self._delete_then_verify(delsqlstr, selsqlstr)
        finally:
            # Release the connection and tunnel even when a delete fails.
            self.close()
        print("---删除 派样活动 脏数据结束!\n")

    def del_coupon_activity_data(self, IDList: Optional[Iterable] = None):
        """Delete coupon-activity test data for every ID in ``IDList``.

        Distribution rows are cleared before ``public.coupon_setting``.
        The connection and the SSH tunnel are always closed afterwards, so
        the instance cannot be reused once this method returns.

        Args:
            IDList: Coupon setting IDs to remove. ``None`` is treated as
                empty.

        Returns:
            The string ``"---[Successed]"`` on success, otherwise the
            exception instance that aborted the clean-up (errors are
            reported, not raised).
        """
        try:
            print('---开始删除 优惠券活动 自动化测试脏数据')
            try:
                for ActivityID in (IDList or ()):
                    print(f'---待删除ID为:{ActivityID}')
                    selsqlstr = f'SELECT * FROM public.coupon_setting WHERE id={ActivityID};'
                    delsqlstr = [
                        f"DELETE FROM public.coupon_setting_distribution WHERE coupon_setting_id={ActivityID};",
                        f"DELETE FROM public.coupon_setting WHERE id={ActivityID};",
                    ]
                    self._delete_then_verify(delsqlstr, selsqlstr)
            finally:
                # Release the connection and tunnel even when a delete fails.
                self.close()
            # Messages previously said "派样活动" (sample activity) here,
            # a copy-paste slip from the sample clean-up method.
            print("---删除 优惠券活动 脏数据结束!\n")
            return "---[Successed]"
        except Exception as e:
            print(f"---删除优惠券活动脏数据失败: {e}\n")
            return e