Fix: Move project files to root directory
This commit is contained in:
105
utils.py
Normal file
105
utils.py
Normal file
@@ -0,0 +1,105 @@
|
||||
import os
|
||||
import mimetypes
|
||||
from typing import Tuple
|
||||
from fastapi import UploadFile, HTTPException
|
||||
from config import settings
|
||||
|
||||
|
||||
def validate_file(file: UploadFile) -> Tuple[bool, str]:
|
||||
"""
|
||||
验证上传的文件
|
||||
|
||||
Args:
|
||||
file: FastAPI上传文件对象
|
||||
|
||||
Returns:
|
||||
(是否通过验证, 错误信息)
|
||||
"""
|
||||
# 检查文件名
|
||||
if not file.filename:
|
||||
return False, "文件名不能为空"
|
||||
|
||||
# 检查文件大小
|
||||
if hasattr(file, 'size') and file.size > 1024 * 1024 * 1024: # 1GB = 1024^3 bytes
|
||||
return False, f"文件大小超过限制 (1GB)"
|
||||
|
||||
|
||||
# 检查文件扩展名
|
||||
_, ext = os.path.splitext(file.filename.lower())
|
||||
if ext not in [e.lower() for e in settings.allowed_extensions]:
|
||||
return False, f"不支持的文件类型。支持的类型: {', '.join(settings.allowed_extensions)}"
|
||||
|
||||
return True, ""
|
||||
|
||||
|
||||
def get_content_type(filename: str) -> str:
|
||||
"""
|
||||
根据文件名获取MIME类型
|
||||
|
||||
Args:
|
||||
filename: 文件名
|
||||
|
||||
Returns:
|
||||
MIME类型字符串
|
||||
"""
|
||||
content_type, _ = mimetypes.guess_type(filename)
|
||||
return content_type or "application/octet-stream"
|
||||
|
||||
|
||||
def format_file_size(size_bytes: int) -> str:
|
||||
"""
|
||||
格式化文件大小
|
||||
|
||||
Args:
|
||||
size_bytes: 文件大小(字节)
|
||||
|
||||
Returns:
|
||||
格式化后的文件大小字符串
|
||||
"""
|
||||
if size_bytes == 0:
|
||||
return "0B"
|
||||
|
||||
size_names = ["B", "KB", "MB", "GB", "TB"]
|
||||
i = 0
|
||||
size = float(size_bytes)
|
||||
|
||||
while size >= 1024.0 and i < len(size_names) - 1:
|
||||
size /= 1024.0
|
||||
i += 1
|
||||
|
||||
return f"{size:.2f}{size_names[i]}"
|
||||
|
||||
|
||||
def extract_object_key_from_url(file_url: str) -> str:
|
||||
"""
|
||||
从文件URL中提取对象键
|
||||
|
||||
Args:
|
||||
file_url: 文件URL
|
||||
|
||||
Returns:
|
||||
对象键
|
||||
"""
|
||||
# 假设URL格式为: https://bucket.endpoint.com/object_key
|
||||
try:
|
||||
return file_url.split('/')[-1]
|
||||
except:
|
||||
return ""
|
||||
|
||||
|
||||
def sanitize_filename(filename: str) -> str:
|
||||
"""
|
||||
清理文件名,移除特殊字符
|
||||
|
||||
Args:
|
||||
filename: 原始文件名
|
||||
|
||||
Returns:
|
||||
清理后的文件名
|
||||
"""
|
||||
# 移除或替换特殊字符
|
||||
invalid_chars = '<>:"/\\|?*'
|
||||
for char in invalid_chars:
|
||||
filename = filename.replace(char, '_')
|
||||
|
||||
return filename.strip()
|
||||
Reference in New Issue
Block a user