from obs import ObsClient import obs from utils.config import cfg from qcloud_cos import CosConfig,CosS3Client from qcloud_cos.cos_exception import CosClientError, CosServiceError from minio import Minio from minio.error import S3Error def obsInit(): print('start init obs <<') obsClient = ObsClient( access_key_id=cfg.obs_access_key_id, secret_access_key=cfg.obs_secret_access_key, server=cfg.obs_server) bucketName = cfg.obs_bucketName headers = obs.SetObjectMetadataHeader() headers.cacheControl = "no-cache" return obsClient, headers def cosInit(): print('start init cos <<') secret_id = cfg.cos_secret_id secret_key = cfg.cos_secret_key region = cfg.cos_region config = CosConfig(Region=region, Secret_id=secret_id, Secret_key=secret_key) client = CosS3Client(config) return client def s3Init(): print('start init s3 <<') secret_id = cfg.s3_secret_id secret_key = cfg.s3_secret_key region = cfg.s3_region client = Minio(region, secure=False, access_key=secret_id, secret_key=secret_key) return client class up_load: def __init__(self): if cfg.cos: self.cosclient = cosInit() if cfg.obs: self.obsclient, self.headers = obsInit() if cfg.s3: self.s3client = s3Init() def upLoad(self, key=None, datapath=None): # 使用高级接口断点续传,失败重试时不会上传已成功的分块(这里重试10次) response = 'get re' if cfg.cos: for i in range(0, 10): try: response = self.cosclient.upload_file( Bucket=cfg.cos_Bucket, Key=key, LocalFilePath=datapath) break except CosClientError or CosServiceError as e: print(e) if cfg.obs: response = self.obsclient.putFile(cfg.obs_bucketName, key, datapath) if cfg.s3: found = self.s3client.bucket_exists(cfg.s3_Bucket) if not found: self.s3client.make_bucket(cfg.s3_Bucket) self.s3client.fput_object(cfg.obs_bucketName, key, datapath) return response