# -*- coding: utf-8 -*-
"""Helper for cleaning up test data in PostgreSQL through an SSH tunnel."""
import time

import psycopg2
from sshtunnel import SSHTunnelForwarder


class PGSQL():
    """PostgreSQL connection that is forwarded through an SSH tunnel.

    Creating an instance starts the tunnel and opens one connection and one
    cursor. Call :meth:`close` when finished. Note that the ``del_*`` methods
    close the instance themselves, so it cannot be reused after calling them.
    """

    # Pause (seconds) inserted before every statement issued by the del_*
    # methods. NOTE(review): the reason for the pause is not visible in this
    # file; the value is kept from the original implementation.
    _STATEMENT_DELAY = 0.5

    def __init__(self):
        """Start the SSH tunnel and connect to the database through it.

        Raises:
            Exception: whatever ``sshtunnel`` or ``psycopg2`` raises when the
                tunnel or the database connection cannot be established. The
                tunnel is shut down again if the database connection fails.
        """
        # NOTE(review): credentials are hard-coded in source; they should be
        # moved to environment variables or a secret store.
        ssh_host = 'ops.yimaogo.com'
        ssh_username = 'yimaogo_test'
        ssh_pwd = 'yimaogo!test@1234'
        ssh_port = 22

        db_host = '10.0.16.24'
        db_port = 5432
        db_dbname = 'yimaogo_test'
        db_user = 'yimaogo_test'
        db_password = 'yimaogo!test@1234'

        self.ssh_server = SSHTunnelForwarder(
            ssh_address_or_host=(ssh_host, ssh_port),
            ssh_username=ssh_username,
            ssh_password=ssh_pwd,
            remote_bind_address=(db_host, db_port),
        )
        self.ssh_server.daemon_forward_servers = False
        self.ssh_server.start()

        try:
            self.conn = psycopg2.connect(
                database=db_dbname,
                user=db_user,
                password=db_password,
                host='127.0.0.1',
                port=self.ssh_server.local_bind_port,
            )
        except Exception:
            # The forward servers are non-daemon, so a tunnel left running
            # here would outlive the failed constructor.
            self.ssh_server.close()
            raise
        self.cursor = self.conn.cursor()

    def close(self):
        """Close the cursor, the connection and the SSH tunnel.

        Each resource is released even if closing an earlier one raises.
        """
        try:
            self.cursor.close()
        finally:
            try:
                self.conn.close()
            finally:
                self.ssh_server.close()

    def select_data(self, SQL, params=None):
        """Execute a query and return all result rows.

        Args:
            SQL: The SQL text. May contain ``%s`` placeholders when *params*
                is given.
            params: Optional sequence or mapping of values bound by the
                driver to the placeholders in *SQL*.

        Returns:
            The list of rows returned by ``cursor.fetchall()``.
        """
        self.cursor.execute(SQL, params)
        return self.cursor.fetchall()

    def delete_data(self, SQL, params=None):
        """Execute a data-modifying statement and commit it.

        Args:
            SQL: The SQL text. May contain ``%s`` placeholders when *params*
                is given.
            params: Optional sequence or mapping of values bound by the
                driver to the placeholders in *SQL*.

        Raises:
            Exception: whatever the driver raises; the transaction is rolled
                back first so the connection stays usable.
        """
        try:
            self.cursor.execute(SQL, params)
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def _delete_by_id(self, IDList, delete_statements, select_statement):
        """Run every delete statement for each id, then close the instance.

        Args:
            IDList: Iterable of ids; ``None`` is treated as empty.
            delete_statements: SQL strings with one ``%s`` placeholder each,
                executed in the given order for every id.
            select_statement: SQL string with one ``%s`` placeholder,
                executed after the deletes for every id.
        """
        try:
            for record_id in IDList or ():
                params = (record_id,)
                for statement in delete_statements:
                    time.sleep(self._STATEMENT_DELAY)
                    self.delete_data(statement, params)
                time.sleep(self._STATEMENT_DELAY)
                # The rows are discarded, as in the original code.
                # NOTE(review): presumably a post-delete sanity check - confirm.
                self.select_data(select_statement, params)
        finally:
            # Release cursor, connection and tunnel on failure as well.
            self.close()

    def del_sample_activity_data(self, IDList=None):
        """Delete sample activities and their dependent rows, then close.

        Args:
            IDList: Iterable of ``sample_activity`` ids; ``None`` is treated
                as empty.

        Raises:
            Exception: whatever the driver raises for a failing statement.
        """
        # Rows referencing the activity are removed before the activity row.
        delete_statements = [
            "DELETE FROM public.sample_activity_gift WHERE activity_id=%s;",
            "DELETE FROM public.sample_record WHERE activity_id=%s;",
            "DELETE FROM public.sample_activity_store WHERE activity_id=%s;",
            "DELETE FROM public.sample_activity WHERE id=%s;",
        ]
        select_statement = 'SELECT * FROM public.sample_activity WHERE id=%s;'
        self._delete_by_id(IDList, delete_statements, select_statement)

    def del_coupon_activity_data(self, IDList=None):
        """Delete coupon settings and their distributions, then close.

        Args:
            IDList: Iterable of ``coupon_setting`` ids; ``None`` is treated
                as empty.

        Returns:
            The string ``"---[Successed]"`` on success, otherwise the
            exception instance that was caught (it is returned, not raised).
        """
        delete_statements = [
            "DELETE FROM public.coupon_setting_distribution "
            "WHERE coupon_setting_id=%s;",
            "DELETE FROM public.coupon_setting WHERE id=%s;",
        ]
        select_statement = 'SELECT * FROM public.coupon_setting WHERE id=%s;'
        try:
            self._delete_by_id(IDList, delete_statements, select_statement)
            return "---[Successed]"
        except Exception as e:
            return e