import os import uuid from datetime import datetime from typing import Optional, BinaryIO import oss2 from oss2.exceptions import OssError, NoSuchBucket, NoSuchKey from config import settings class OSSService: """阿里云OSS服务类""" def __init__(self): """初始化OSS客户端""" if not all([settings.oss_access_key_id, settings.oss_access_key_secret, settings.oss_bucket_name]): raise ValueError("OSS配置不完整,请检查环境变量") # 创建认证对象 self.auth = oss2.Auth(settings.oss_access_key_id, settings.oss_access_key_secret) # 创建Bucket对象 self.bucket = oss2.Bucket(self.auth, settings.oss_endpoint, settings.oss_bucket_name) # 验证bucket是否存在 try: self.bucket.get_bucket_info() except NoSuchBucket: raise ValueError(f"Bucket '{settings.oss_bucket_name}' 不存在") def generate_object_key(self, original_filename: str, folder: str = "uploads") -> str: """生成OSS对象键名""" # 获取文件扩展名 _, ext = os.path.splitext(original_filename) # 生成唯一文件名 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") unique_id = str(uuid.uuid4())[:8] filename = f"{timestamp}_{unique_id}{ext}" # 返回完整的对象键 return f"{folder}/{filename}" def upload_file(self, file_content: bytes, object_key: str, content_type: str = None) -> dict: """ 上传文件到OSS Args: file_content: 文件内容 object_key: OSS对象键 content_type: 文件MIME类型 Returns: 包含上传结果的字典 """ try: # 设置文件头信息 headers = {} if content_type: headers['Content-Type'] = content_type # 上传文件 result = self.bucket.put_object(object_key, file_content, headers=headers) # 构造文件URL file_url = f"https://{settings.oss_bucket_name}.{settings.oss_endpoint.replace('http://', '').replace('https://', '')}/{object_key}" return { "success": True, "message": "文件上传成功", "object_key": object_key, "file_url": file_url, "etag": result.etag, "request_id": result.request_id } except OssError as e: return { "success": False, "message": f"上传失败: {e}", "error_code": e.code if hasattr(e, 'code') else 'Unknown' } def delete_file(self, object_key: str) -> dict: """ 删除OSS文件 Args: object_key: OSS对象键 Returns: 删除结果字典 """ try: # 首先检查文件是否存在 try: self.bucket.head_object(object_key) except NoSuchKey: return { "success": False, "message": f"文件不存在: {object_key}", "error_code": "NoSuchKey" } # 文件存在,执行删除操作 result = self.bucket.delete_object(object_key) return { "success": True, "message": "文件删除成功", "object_key": object_key, "request_id": result.request_id } except OssError as e: return { "success": False, "message": f"删除失败: {e}", "error_code": e.code if hasattr(e, 'code') else 'Unknown' } def get_file_info(self, object_key: str) -> dict: """ 获取文件信息 Args: object_key: OSS对象键 Returns: 文件信息字典 """ try: head_result = self.bucket.head_object(object_key) return { "success": True, "object_key": object_key, "size": head_result.content_length, "last_modified": head_result.last_modified, "content_type": head_result.content_type, "etag": head_result.etag } except NoSuchKey: return { "success": False, "message": "文件不存在" } except OssError as e: return { "success": False, "message": f"获取文件信息失败: {e}" } def list_files(self, prefix: str = "", max_keys: int = 100) -> dict: """ 列出文件 Args: prefix: 对象键前缀 max_keys: 最大返回数量 Returns: 文件列表字典 """ try: files = [] for obj in oss2.ObjectIterator(self.bucket, prefix=prefix, max_keys=max_keys): # 安全地处理 last_modified 字段 last_modified_str = None if obj.last_modified: if hasattr(obj.last_modified, 'isoformat'): # 如果是 datetime 对象 last_modified_str = obj.last_modified.isoformat() elif isinstance(obj.last_modified, (int, float)): # 如果是时间戳,转换为 datetime 然后格式化 last_modified_str = datetime.fromtimestamp(obj.last_modified).isoformat() else: # 其他情况,尝试转换为字符串 last_modified_str = str(obj.last_modified) files.append({ "key": obj.key, "size": obj.size, "last_modified": last_modified_str, "etag": obj.etag }) return { "success": True, "files": files, "count": len(files) } except OssError as e: return { "success": False, "message": f"列出文件失败: {e}" } # 全局OSS服务实例 oss_service = OSSService()