from obs import ObsClient import obs import sys import os from tools.config import cfg import requests import argparse class uploadVideos: def __init__(self): self.obsClient, self.headers = self.InitObsClient() def InitObsClient(self): # 创建ObsClient实例 obsClient = ObsClient( access_key_id=cfg.obs_access_key_id, # 配置访问密钥ID secret_access_key=cfg.obs_secret_access_key, # 配置访问密钥 server=cfg.obs_server) # 配置服务器地址 headers = obs.SetObjectMetadataHeader() # 设置对象元数据头 headers.cacheControl = "no-cache" # 设置缓存控制为不缓存 return obsClient, headers # 返回ObsClient实例和元数据头 def upload(self, video_name, video_dir=None): # 检查是否已初始化ObsClient实例 if not self.obsClient: raise ValueError("请先初始化ObsClient实例") class uploadResult: def __init__(self, video_path=None, squenceId=None): self.video_path = video_path self.squenceId = squenceId uploadRes = uploadResult() # 解析视频名称获取相关信息 information = video_name.split('.')[0].split('_') action_category = cfg.action_type[information[-1]] # 动作类别 camera_id = cfg.camera_id[information[-3]] # 摄像头ID recognize_result = cfg.recognize_result[information[0]] # 识别结果 time = information[1].split('-')[0] # 时间 squenceId = information[1] # 构建OSS对象键 objectkey = os.path.join(cfg.obs_root_dir, time, action_category, camera_id, recognize_result, video_name) if video_dir is None: file_path = os.sep.join([cfg.save_videos_dir, squenceId, video_name]) # 本地文件路径 else: file_path = os.sep.join([video_dir, squenceId, video_name]) # 上传文件到OSS resp = self.obsClient.putFile(cfg.obs_bucketName, objectkey, file_path) uploadRes.video_path = resp['body']['objectUrl'] uploadRes.squenceId = squenceId os.remove(file_path) return uploadRes def get_information(self, video_squence, camera_type): """获取视频信息""" videos_path = [] videos_dir = os.sep.join([cfg.save_videos_dir, video_squence]) for video_name in os.listdir(videos_dir): if video_squence in video_name: if camera_type == video_name.split('_')[-3]: # 摄像头位置ID videos_path.append(self.upload(video_name).video_path) elif camera_type == '2': videos_path.append(self.upload(video_name).video_path) return {"videos_path": videos_path} @staticmethod def exception_action(queueImgs): """删除视频""" # 解析视频名称获取相关信息 video_squence = queueImgs['videoIds'].split(',')[0] for video_name in os.listdir(cfg.save_videos_dir): if video_squence in video_name: os.rename(os.sep.join([cfg.save_videos_dir, video_name]), os.sep.join([cfg.save_videos_dir, '03_' + video_name])) class VideoUploader: @staticmethod def read_config_file(file_path): """安全地读取配置文件""" try: with open(file_path, 'r') as file: lines = file.readlines() except IOError as e: print(f"读取配置文件错误: {e}") return [] return [line.strip() for line in lines if line.strip() != ''] @staticmethod def upload_videos_for_ids(video_ids, video_dir): """根据ID上传视频""" tempdata = [] print('----------->', video_dir) for root, dirs, files in os.walk(video_dir): for name in files: print(name) name_s = name.split('.')[0] # 避免重复分割 parts = name_s.split('_') if len(parts) < 7: continue for data in video_ids: if parts[-1] == data['action'] and (parts[-3] == data['type'] or data['type'] == '2'): try: upload_rs = uploadVideos().upload(name, video_dir) if upload_rs: video_path = upload_rs.video_path sequence_id = upload_rs.squenceId tempdata.append({ "squenceId": sequence_id, "video_path": [video_path] }) break # 找到匹配项即跳出循环 except Exception as e: print(f"上传视频 {name} 时出错: {e}") return tempdata @staticmethod def timedUpload(rootPth = '/home/lc/project/ieemoo'): """定时上传视频""" storidPth = os.sep.join([rootPth, 'tools', 'storeId.txt']) save_videos_dir = os.sep.join([rootPth, 'videos']) config_lines = VideoUploader.read_config_file(storidPth) if not config_lines: print("未找到有效配置。") print('配置行 --- >', config_lines) soreid_list = [{"storeId": line} for line in config_lines] try: rep = requests.post(url=cfg.get_config_url, data=soreid_list[0]) rep.raise_for_status() # 检查响应状态 video_ids = rep.json().get('data', []) except requests.RequestException as e: print(f"获取配置信息失败: {e}") if video_ids: tempdata = VideoUploader.upload_videos_for_ids(video_ids, save_videos_dir) if tempdata: tmpdata = {'videosPth': str(tempdata)} try: requests.post(url=cfg.push_url, data=tmpdata) print("推送数据成功") except requests.RequestException as e: print(f"推送数据失败: {e}") else: tmpdata = {'videosPth': str([])} try: requests.post(url=cfg.push_url, data=tmpdata) except requests.RequestException as e: print(f"空数据推送失败: {e}") # print(tmpdata) if __name__ == '__main__': VideoUploader.timedUpload()