198 lines
6.5 KiB
Python
198 lines
6.5 KiB
Python
import os
|
||
import uuid
|
||
from datetime import datetime
|
||
from typing import Optional, BinaryIO
|
||
import oss2
|
||
from oss2.exceptions import OssError, NoSuchBucket, NoSuchKey
|
||
from config import settings
|
||
|
||
|
||
class OSSService:
|
||
"""阿里云OSS服务类"""
|
||
|
||
def __init__(self):
|
||
"""初始化OSS客户端"""
|
||
if not all([settings.oss_access_key_id, settings.oss_access_key_secret, settings.oss_bucket_name]):
|
||
raise ValueError("OSS配置不完整,请检查环境变量")
|
||
|
||
# 创建认证对象
|
||
self.auth = oss2.Auth(settings.oss_access_key_id, settings.oss_access_key_secret)
|
||
|
||
# 创建Bucket对象
|
||
self.bucket = oss2.Bucket(self.auth, settings.oss_endpoint, settings.oss_bucket_name)
|
||
|
||
# 验证bucket是否存在
|
||
try:
|
||
self.bucket.get_bucket_info()
|
||
except NoSuchBucket:
|
||
raise ValueError(f"Bucket '{settings.oss_bucket_name}' 不存在")
|
||
|
||
def generate_object_key(self, original_filename: str, folder: str = "uploads") -> str:
|
||
"""生成OSS对象键名"""
|
||
# 获取文件扩展名
|
||
_, ext = os.path.splitext(original_filename)
|
||
|
||
# 生成唯一文件名
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
unique_id = str(uuid.uuid4())[:8]
|
||
filename = f"{timestamp}_{unique_id}{ext}"
|
||
|
||
# 返回完整的对象键
|
||
return f"{folder}/{filename}"
|
||
|
||
def upload_file(self, file_content: bytes, object_key: str, content_type: str = None) -> dict:
|
||
"""
|
||
上传文件到OSS
|
||
|
||
Args:
|
||
file_content: 文件内容
|
||
object_key: OSS对象键
|
||
content_type: 文件MIME类型
|
||
|
||
Returns:
|
||
包含上传结果的字典
|
||
"""
|
||
try:
|
||
# 设置文件头信息
|
||
headers = {}
|
||
if content_type:
|
||
headers['Content-Type'] = content_type
|
||
|
||
# 上传文件
|
||
result = self.bucket.put_object(object_key, file_content, headers=headers)
|
||
|
||
# 构造文件URL
|
||
file_url = f"https://{settings.oss_bucket_name}.{settings.oss_endpoint.replace('http://', '').replace('https://', '')}/{object_key}"
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "文件上传成功",
|
||
"object_key": object_key,
|
||
"file_url": file_url,
|
||
"etag": result.etag,
|
||
"request_id": result.request_id
|
||
}
|
||
|
||
except OssError as e:
|
||
return {
|
||
"success": False,
|
||
"message": f"上传失败: {e}",
|
||
"error_code": e.code if hasattr(e, 'code') else 'Unknown'
|
||
}
|
||
|
||
def delete_file(self, object_key: str) -> dict:
|
||
"""
|
||
删除OSS文件
|
||
|
||
Args:
|
||
object_key: OSS对象键
|
||
|
||
Returns:
|
||
删除结果字典
|
||
"""
|
||
try:
|
||
# 首先检查文件是否存在
|
||
try:
|
||
self.bucket.head_object(object_key)
|
||
except NoSuchKey:
|
||
return {
|
||
"success": False,
|
||
"message": f"文件不存在: {object_key}",
|
||
"error_code": "NoSuchKey"
|
||
}
|
||
|
||
# 文件存在,执行删除操作
|
||
result = self.bucket.delete_object(object_key)
|
||
return {
|
||
"success": True,
|
||
"message": "文件删除成功",
|
||
"object_key": object_key,
|
||
"request_id": result.request_id
|
||
}
|
||
except OssError as e:
|
||
return {
|
||
"success": False,
|
||
"message": f"删除失败: {e}",
|
||
"error_code": e.code if hasattr(e, 'code') else 'Unknown'
|
||
}
|
||
|
||
def get_file_info(self, object_key: str) -> dict:
|
||
"""
|
||
获取文件信息
|
||
|
||
Args:
|
||
object_key: OSS对象键
|
||
|
||
Returns:
|
||
文件信息字典
|
||
"""
|
||
try:
|
||
head_result = self.bucket.head_object(object_key)
|
||
|
||
return {
|
||
"success": True,
|
||
"object_key": object_key,
|
||
"size": head_result.content_length,
|
||
"last_modified": head_result.last_modified,
|
||
"content_type": head_result.content_type,
|
||
"etag": head_result.etag
|
||
}
|
||
except NoSuchKey:
|
||
return {
|
||
"success": False,
|
||
"message": "文件不存在"
|
||
}
|
||
except OssError as e:
|
||
return {
|
||
"success": False,
|
||
"message": f"获取文件信息失败: {e}"
|
||
}
|
||
|
||
def list_files(self, prefix: str = "", max_keys: int = 100) -> dict:
|
||
"""
|
||
列出文件
|
||
|
||
Args:
|
||
prefix: 对象键前缀
|
||
max_keys: 最大返回数量
|
||
|
||
Returns:
|
||
文件列表字典
|
||
"""
|
||
try:
|
||
files = []
|
||
for obj in oss2.ObjectIterator(self.bucket, prefix=prefix, max_keys=max_keys):
|
||
# 安全地处理 last_modified 字段
|
||
last_modified_str = None
|
||
if obj.last_modified:
|
||
if hasattr(obj.last_modified, 'isoformat'):
|
||
# 如果是 datetime 对象
|
||
last_modified_str = obj.last_modified.isoformat()
|
||
elif isinstance(obj.last_modified, (int, float)):
|
||
# 如果是时间戳,转换为 datetime 然后格式化
|
||
last_modified_str = datetime.fromtimestamp(obj.last_modified).isoformat()
|
||
else:
|
||
# 其他情况,尝试转换为字符串
|
||
last_modified_str = str(obj.last_modified)
|
||
|
||
files.append({
|
||
"key": obj.key,
|
||
"size": obj.size,
|
||
"last_modified": last_modified_str,
|
||
"etag": obj.etag
|
||
})
|
||
|
||
return {
|
||
"success": True,
|
||
"files": files,
|
||
"count": len(files)
|
||
}
|
||
except OssError as e:
|
||
return {
|
||
"success": False,
|
||
"message": f"列出文件失败: {e}"
|
||
}
|
||
|
||
|
||
# 全局OSS服务实例
|
||
oss_service = OSSService() |