sms 更新

This commit is contained in:
jeremygan2021
2026-01-13 21:15:14 +08:00
parent d14f4029f2
commit f85cded5d9
16 changed files with 2520 additions and 0 deletions

View File

@@ -0,0 +1,29 @@
# Large files
*.tar
*.tar.gz
*.zip
*.rar
*.7z
*.iso
*.bin
*.exe
*.dll
*.so
*.dylib
# Python
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
env/
venv/
.env
# OS
.DS_Store
Thumbs.db
# Project specific
uploads/

View File

@@ -0,0 +1,47 @@
# Add Dynamic Agent API (Type=1) with Database Integration
I will implement the Dynamic Agent feature by integrating it with the existing `agent_cards` system. Dynamic Agents will be distinguished by `type=1` and will store their extended data (videos, knowledge base) in a new linked table.
## 1. Database Schema Extension
I will create a new table `dynamic_agent_details` to store the specialized data for dynamic agents.
* **Table Name**: `dynamic_agent_details`
* **Columns**:
* `id`: `SERIAL PRIMARY KEY`
* `card_id`: `INTEGER` (Foreign Key linking to `agent_cards.card_id`)
* `videos`: `JSONB` (Stores list of `{url, emotion}`, min 1, max 7)
* `kb_id`: `VARCHAR` (Knowledge Base ID)
* `kb_config`: `JSONB` (Additional Knowledge Base fields)
* `created_at`: `TIMESTAMP`
## 2. Code Implementation (`main.py`)
### Pydantic Models
I will define:
* `VideoItem`: `{url: str, emotion: str}`
* `DynamicAgentCreate`: Inherits/Includes fields from `AgentCard` (name, avatar, etc.) PLUS:
* `videos`: List[VideoItem] (validated length 1-7)
* `kb_id`: str
* `kb_config`: dict
### API Endpoints
1. **Create Dynamic Agent**
* `POST /dynamic_agent/`
* **Logic**:
1. Insert the base agent info into `agent_cards` with `type=1` and `RETURNING card_id`.
2. Insert the extended info (videos, kb details) into `dynamic_agent_details` using the returned `card_id`.
3. Return the new `card_id`.
2. **Get Dynamic Agent Details**
* `GET /dynamic_agent/{card_id}`
* **Logic**:
1. Fetch base info from `agent_cards`.
2. Fetch extended info from `dynamic_agent_details`.
3. Return combined result.
### Helper Functions
* `init_dynamic_agent_db()`: Checks/Creates the `dynamic_agent_details` table on startup.
## 3. Verification
* Create a dynamic agent via the new API.
* Verify it appears in the `agent_cards` table (as type 1).
* Verify extended data is in `dynamic_agent_details`.

View File

@@ -0,0 +1,43 @@
# Implement Tool Management for Dynamic Agents
I will implement a system to manage "Tools" (Function Calls) and associate them with Dynamic Agents, allowing the frontend to easily configure which tools an agent can use.
## 1. Database Schema Expansion
I will create two new tables in PostgreSQL:
* `tools`: Stores reusable tool definitions (templates).
* `tool_id`: `SERIAL PRIMARY KEY`
* `name`: `VARCHAR` (Unique, e.g., "get_weather")
* `description`: `TEXT` (For LLM understanding)
* `parameters`: `JSONB` (JSON Schema for arguments)
* `created_at`: `TIMESTAMP`
* `agent_tools`: Maps agents to tools (Many-to-Many).
* `agent_card_id`: `INTEGER` (FK)
* `tool_id`: `INTEGER` (FK)
* `config`: `JSONB` (Optional, for agent-specific tool config if needed)
## 2. API Implementation (`main.py`)
### Pydantic Models
* `ToolCreate`: Validation for creating new tool definitions.
* `ToolResponse`: API response format.
* `AgentToolAssignment`: For linking tools to agents.
### New Endpoints
* **Tool Management (CRUD)**:
* `POST /tools/`: Create a new tool definition.
* `GET /tools/`: List all available tools.
* `PUT /tools/{tool_id}`: Update a tool.
* `DELETE /tools/{tool_id}`: Delete a tool.
* **Agent Tool Association**:
* `POST /agent/{card_id}/tools`: Assign a list of tools to an agent.
* `GET /agent/{card_id}/tools`: Get all tools assigned to an agent.
* `DELETE /agent/{card_id}/tools/{tool_id}`: Remove a tool from an agent.
### Update Existing Logic
* **Modify `GET /dynamic_agent/{card_id}`**:
* Update the query to automatically fetch and include the list of associated tools in the agent details response. This ensures the frontend gets everything in one call.
## 3. Verification
* Create a "Weather Query" tool definition.
* Assign it to a Dynamic Agent.
* Verify the agent details endpoint returns the tool info correctly.

View File

@@ -0,0 +1,30 @@
FROM python:3.12
WORKDIR /app
COPY requirements.txt .
COPY .env .
RUN pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple
COPY . .
ARG API_KEY
ARG DB_HOST
ARG DB_PORT
ARG DB_NAME
ARG DB_USER
ARG DB_PASSWORD
ARG DB_SSLMODE
ENV API_KEY=$API_KEY
ENV DB_HOST=$DB_HOST
ENV DB_PORT=$DB_PORT
ENV DB_NAME=$DB_NAME
ENV DB_USER=$DB_USER
ENV DB_PASSWORD=$DB_PASSWORD
ENV DB_SSLMODE=$DB_SSLMODE
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "9090"]
EXPOSE 9090

View File

@@ -0,0 +1,216 @@
# TangledupAI API 数据库服务
这是一个基于 FastAPI 构建的后端 API 服务,为 TangledupAI 提供用户管理、Agent 卡片管理、对话管理和文件存储等功能。
## 功能特性
- **用户管理**: 用户注册、查询、更新
- **Agent 管理**: 创建、查询、更新、删除 Agent 卡片
- **对话管理**: 创建对话、保存消息
- **短信服务**: 发送验证码短信
- **文件存储**: 基于阿里云 OSS 的文件上传、删除、查询
- **API 安全**: 基于 API Key 的身份验证
## 技术栈
- **后端框架**: FastAPI
- **数据库**: PostgreSQL
- **对象存储**: 阿里云 OSS
- **短信服务**: 阿里云短信服务
- **部署**: Docker & Docker Compose
## 项目结构
```
API_database/
├── main.py # 主应用程序入口
├── config.py # 配置管理
├── models.py # 数据模型
├── oss_service.py # 阿里云 OSS 服务
├── sms.py # 短信服务
├── utils.py # 工具函数
├── requirements.txt # Python 依赖
├── Dockerfile # Docker 镜像构建文件
├── docker-compose.yml # Docker Compose 配置
└── .env # 环境变量配置
```
## 安装与运行
### 环境要求
- Python 3.12+
- PostgreSQL 数据库
- 阿里云 OSS 账号
- 阿里云短信服务账号
### 本地开发
1. 克隆仓库
```bash
git clone <repository-url>
cd API_database
```
2. 安装依赖
```bash
pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple
```
3. 配置环境变量
创建 `.env` 文件并配置以下变量:
```env
API_KEY=your_api_key
DB_HOST=your_db_host
DB_PORT=your_db_port
DB_NAME=your_db_name
DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_SSLMODE=disable
# 阿里云 OSS 配置
OSS_ACCESS_KEY_ID=your_oss_access_key_id
OSS_ACCESS_KEY_SECRET=your_oss_access_key_secret
OSS_ENDPOINT=https://oss-cn-shanghai.aliyuncs.com
OSS_BUCKET_NAME=your_bucket_name
```
4. 启动应用
```bash
uvicorn main:app --reload --host 0.0.0.0 --port 9090
```
### Docker 部署
1. 使用 Docker Compose
```bash
# 设置环境变量
export API_KEY=your_api_key
export DB_HOST=your_db_host
export DB_PORT=your_db_port
export DB_NAME=your_db_name
export DB_USER=your_db_user
export DB_PASSWORD=your_db_password
export DB_SSLMODE=disable
# 启动服务
docker-compose up -d
```
2. 或者直接使用 Docker
```bash
docker build -t tangledup-api .
docker run -d -p 9090:9090 \
-e API_KEY=your_api_key \
-e DB_HOST=your_db_host \
-e DB_PORT=your_db_port \
-e DB_NAME=your_db_name \
-e DB_USER=your_db_user \
-e DB_PASSWORD=your_db_password \
-e DB_SSLMODE=disable \
tangledup-api
```
## API 文档
启动服务后,可以通过以下地址访问 API 文档:
- Swagger UI: http://localhost:9090/docs
- ReDoc: http://localhost:9090/redoc
## API 端点
### 用户管理
- `POST /register/` - 用户注册
- `GET /user/{phone}` - 获取用户信息
- `PUT /user/{phone}` - 更新用户信息
### Agent 管理
- `POST /new_agent/` - 创建 Agent 卡片
- `GET /agents/{phone}` - 获取用户的所有 Agent 卡片
- `GET /agent_id/{agent_id}` - 根据 ID 获取 Agent 卡片
- `PUT /agent/{phone_number}/{agent_name}` - 更新 Agent 卡片
- `PUT /agent_id/{agent_id}` - 根据 ID 更新 Agent 卡片
- `DELETE /agent/{phone_number}/{agent_name}` - 删除 Agent 卡片
- `DELETE /agent_id/{agent_id}` - 根据 ID 删除 Agent 卡片
### 对话管理
- `POST /conversations/` - 创建新对话
- `POST /messages/` - 保存消息
### 文件上传
- `POST /upload_image/` - 上传图片
- `POST /upload` - 上传文件到 OSS
### 短信服务
- `POST /api/send-sms` - 发送短信
- `GET /api/sms-records` - 获取短信记录
### OSS 服务
- `POST /test/` - 测试 OSS 连接
## 身份验证
所有 API 端点都需要在请求头中包含有效的 API Key
```
x-api-key: 123tangledup-ai
```
## 数据库表结构
### users 表
- phone_number: 手机号码 (主键)
- user_name: 用户名
- user_details: 用户详情
- avatar: 头像URL
- email: 邮箱
- points: 积分
- create_date: 创建时间
### agent_cards 表
- card_id: 卡片ID (主键)
- phone_number: 用户手机号码
- card_info: 卡片信息
- agent_avatar_url: Agent头像URL
- agent_prompt: Agent提示
- agent_name: Agent名称
- is_publish: 是否发布
- create_date: 创建时间
- voice_type: 语音类型
- temperature: 温度参数
### conversations 表
- conversation_id: 对话ID (主键)
- user_phone: 用户手机号码
- visitor_key: 访客密钥
- agent_card_id: Agent卡片ID
### messages 表
- message_id: 消息ID (主键)
- conversation_id: 对话ID
- sender: 发送者 (user/agent)
- content: 消息内容
- order: 消息顺序
## 贡献指南
1. Fork 本仓库
2. 创建您的特性分支 (`git checkout -b feature/AmazingFeature`)
3. 提交您的更改 (`git commit -m 'Add some AmazingFeature'`)
4. 推送到分支 (`git push origin feature/AmazingFeature`)
5. 打开一个 Pull Request
## 许可证
本项目采用 MIT 许可证 - 查看 [LICENSE](LICENSE) 文件了解详情。
## 联系方式
如有问题或建议,请联系项目维护者。

View File

@@ -0,0 +1,45 @@
import os
from typing import Optional
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
"""应用配置类"""
# API密钥配置
api_key: str = "123tangledup-ai"
# SMS配置
sms_template_code: str = "SMS_465420786"
sms_sign_name: str = "叠加态科技"
# 数据库配置
db_host: str = "121.43.104.161"
db_port: str = "6432"
db_name: str = "call_data"
db_user: str = "tangledup-ai"
db_password: str = "123tangledup-ai"
db_sslmode: str = "disable"
# 阿里云OSS配置
oss_access_key_id: str = "LTAI5tE62GW8MKyoEaotzxXk"
oss_access_key_secret: str = "Zdzqo1fgj57DxxioXOotNKhJdSfVQW"
oss_endpoint: str = "https://oss-cn-shanghai.aliyuncs.com"
oss_bucket_name: str = "tangledup-ai-staging"
oss_internal_endpoint: str = "https://oss-cn-shanghai-internal.aliyuncs.com"
# FastAPI配置
app_host: str = os.getenv("APP_HOST", "0.0.0.0")
app_port: int = int(os.getenv("APP_PORT", "8000"))
debug: bool = os.getenv("DEBUG", "True").lower() == "true"
# 文件上传配置
max_file_size: int = 1024 * 1024 * 1024 # 1GB
allowed_extensions: list = ['.jpg', '.jpeg', '.png', '.gif', '.pdf', '.txt', '.doc', '.docx', '.mp3', '.mp4', '.wav', '.webm']
class Config:
env_file = ".env"
# 全局设置实例
settings = Settings()

View File

@@ -0,0 +1,29 @@
version: '3.8'
services:
api:
build:
context: .
dockerfile: Dockerfile
args:
- API_KEY
- DB_HOST
- DB_PORT
- DB_NAME
- DB_USER
- DB_PASSWORD
- DB_SSLMODE
environment:
- API_KEY=${API_KEY}
- DB_HOST=${DB_HOST}
- DB_PORT=${DB_PORT}
- DB_NAME=${DB_NAME}
- DB_USER=${DB_USER}
- DB_PASSWORD=${DB_PASSWORD}
- DB_SSLMODE=${DB_SSLMODE}
ports:
- "9090:9090"
volumes:
- .:/app
# - /mnt/server/userImage/call_avator:/mnt/server/userImage/call_avator
command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "9090"]
restart: always

View File

@@ -0,0 +1,301 @@
#!/bin/bash
# =============================================================================
# Docker 镜像构建和部署自动化脚本
# # chmod u+x docker_deply.sh
# 用法:
# ./docker_deply.sh # 完整构建和部署流程 (默认AMD64架构)
# ./docker_deply.sh -amd # 构建和部署AMD64架构
# ./docker_deply.sh -arm # 构建和部署ARM64架构
# ./docker_depla.sh -upload # 仅上传已存在的tar文件并部署
# ./docker_deply.sh -upload -amd # 仅上传已存在的AMD64架构tar文件并部署
# ./docker_deply.sh -upload -arm # 仅上传已存在的ARM64架构tar文件并部署
# =============================================================================
# 配置变量 - 请根据实际情况修改
SERVER_HOST="121.43.104.161" # 服务器IP地址
SERVER_USER="root" # 服务器用户名
SERVER_PASSWORD="123quant-speed" # 服务器密码
SERVER_PORT="22" # SSH端口默认22
IMAGE_NAME="api_database-api" # Docker镜像名称
IMAGE_TAG="latest" # Docker镜像标签
CONTAINER_NAME="api_database-api" # 容器名称
LOCAL_PORT="9090" # 本地端口
CONTAINER_PORT="9090" # 容器端口
TAR_FILE="${IMAGE_NAME}-${IMAGE_TAG}.tar" # 压缩包文件名
# 架构相关变量
PLATFORM="linux/amd64" # 默认架构
ARCH_SUFFIX="" # 架构后缀用于区分不同架构的tar文件
# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# 日志函数
log_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
log_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
log_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# 检查依赖
check_dependencies() {
log_info "检查依赖..."
if ! command -v docker &> /dev/null; then
log_error "Docker 未安装,请先安装 Docker"
exit 1
fi
if ! command -v sshpass &> /dev/null; then
log_error "sshpass 未安装,请先安装 sshpass"
log_info "macOS: brew install sshpass"
log_info "Ubuntu: sudo apt-get install sshpass"
exit 1
fi
log_success "依赖检查完成"
}
# 解析命令行参数
parse_arguments() {
while [[ $# -gt 0 ]]; do
case $1 in
-amd)
PLATFORM="linux/amd64"
ARCH_SUFFIX="-amd64"
log_info "设置目标架构为 AMD64"
shift
;;
-arm)
PLATFORM="linux/arm64"
ARCH_SUFFIX="-arm64"
log_info "设置目标架构为 ARM64"
shift
;;
-upload)
UPLOAD_ONLY=true
log_info "设置为仅上传模式"
shift
;;
*)
log_error "未知参数: $1"
log_info "支持的参数: -amd, -arm, -upload"
exit 1
;;
esac
done
# 更新TAR_FILE名包含架构后缀
TAR_FILE="${IMAGE_NAME}-${IMAGE_TAG}${ARCH_SUFFIX}.tar"
log_info "镜像文件名: ${TAR_FILE}"
}
# 构建Docker镜像
build_image() {
log_info "开始构建 Docker 镜像..."
# 检查是否存在旧的tar文件
if [ -f "$TAR_FILE" ]; then
log_warning "发现旧的tar文件正在删除..."
rm -f "$TAR_FILE"
fi
# 构建镜像并导出为tar文件
docker buildx build --platform $PLATFORM -t "${IMAGE_NAME}:${IMAGE_TAG}" --output type=docker,dest="./${TAR_FILE}" .
if [ $? -eq 0 ]; then
log_success "Docker 镜像构建完成: ${TAR_FILE}"
else
log_error "Docker 镜像构建失败"
exit 1
fi
}
# 上传文件到服务器
upload_to_server() {
log_info "上传文件到服务器..."
sshpass -p "$SERVER_PASSWORD" scp -P "$SERVER_PORT" -o StrictHostKeyChecking=no "$TAR_FILE" "${SERVER_USER}@${SERVER_HOST}:/tmp/"
if [ $? -eq 0 ]; then
log_success "文件上传成功"
else
log_error "文件上传失败"
exit 1
fi
}
# 在服务器上部署
deploy_on_server() {
log_info "在服务器上部署..."
sshpass -p "$SERVER_PASSWORD" ssh -p "$SERVER_PORT" -o StrictHostKeyChecking=no "${SERVER_USER}@${SERVER_HOST}" << EOF
set -e
echo "[INFO] 开始服务器端部署..."
# 检查并停止现有容器
if docker ps -a --format 'table {{.Names}}' | grep -q "^${CONTAINER_NAME}$"; then
echo "[INFO] 发现现有容器 ${CONTAINER_NAME},正在停止并删除..."
docker stop ${CONTAINER_NAME} || true
docker rm ${CONTAINER_NAME} || true
fi
# 检查是否有其他容器占用目标端口(处理容器名称不一致但端口冲突的情况)
# 注意:这里使用 \$ 转义,确保命令在服务器端执行而不是本地
CONFLICT_CONTAINER_ID=\$(docker ps --format "{{.ID}} {{.Ports}}" | grep ":${LOCAL_PORT}->" | awk '{print \$1}')
if [ ! -z "\$CONFLICT_CONTAINER_ID" ]; then
echo "[WARNING] 发现端口 ${LOCAL_PORT} 被其他容器 (\$CONFLICT_CONTAINER_ID) 占用,正在强制停止并删除..."
docker stop \$CONFLICT_CONTAINER_ID || true
docker rm \$CONFLICT_CONTAINER_ID || true
fi
# 检查并删除现有镜像
if sudo docker images --format 'table {{.Repository}}:{{.Tag}}' | grep -q "^${IMAGE_NAME}:${IMAGE_TAG}$"; then
echo "[INFO] 发现现有镜像 ${IMAGE_NAME}:${IMAGE_TAG},正在删除..."
sudo docker rmi ${IMAGE_NAME}:${IMAGE_TAG} || true
fi
# 加载新镜像
echo "[INFO] 加载新镜像..."
sudo docker load -i /tmp/${TAR_FILE}
# 验证镜像是否加载成功
if sudo docker images | grep -q "${IMAGE_NAME}"; then
echo "[SUCCESS] 镜像加载成功"
else
echo "[ERROR] 镜像加载失败"
exit 1
fi
# 运行新容器
echo "[INFO] 启动新容器..."
sudo docker run -d -p ${LOCAL_PORT}:${CONTAINER_PORT} --name ${CONTAINER_NAME} ${IMAGE_NAME}:${IMAGE_TAG}
# 验证容器是否启动成功
if sudo docker ps | grep -q "${CONTAINER_NAME}"; then
echo "[SUCCESS] 容器启动成功"
echo "[INFO] 容器状态:"
sudo docker ps | grep "${CONTAINER_NAME}"
else
echo "[ERROR] 容器启动失败"
echo "[INFO] 查看容器日志:"
sudo docker logs ${CONTAINER_NAME}
exit 1
fi
# 清理临时文件
echo "[INFO] 清理临时文件..."
rm -f /tmp/${TAR_FILE}
echo "[SUCCESS] 部署完成!"
echo "[INFO] 应用访问地址: http://${SERVER_HOST}:${LOCAL_PORT}"
EOF
if [ $? -eq 0 ]; then
log_success "服务器部署完成"
else
log_error "服务器部署失败"
exit 1
fi
}
# 清理本地文件
cleanup_local() {
log_info "清理本地临时文件..."
if [ -f "$TAR_FILE" ]; then
rm -f "$TAR_FILE"
log_success "本地临时文件已清理"
fi
}
# 显示部署信息
show_deployment_info() {
log_success "部署完成!"
echo ""
echo "=========================================="
echo "部署信息:"
echo "=========================================="
echo "服务器地址: ${SERVER_HOST}"
echo "应用端口: ${LOCAL_PORT}"
echo "访问地址: http://${SERVER_HOST}:${LOCAL_PORT}"
echo "容器名称: ${CONTAINER_NAME}"
echo "镜像名称: ${IMAGE_NAME}:${IMAGE_TAG}"
echo "目标架构: ${PLATFORM}"
echo "镜像文件: ${TAR_FILE}"
echo "=========================================="
echo ""
log_info "如需查看容器日志,请在服务器上运行: sudo docker logs ${CONTAINER_NAME}"
log_info "如需停止容器,请在服务器上运行: sudo docker stop ${CONTAINER_NAME}"
}
# 主函数
main() {
echo "=========================================="
echo "Docker 镜像构建和部署自动化脚本"
echo "=========================================="
echo ""
# 解析命令行参数
UPLOAD_ONLY=false
parse_arguments "$@"
# 检查配置
if [ "$SERVER_HOST" = "your-server-ip" ] || [ "$SERVER_PASSWORD" = "your-password" ]; then
log_error "请先配置脚本顶部的服务器信息"
log_info "需要修改的变量:"
log_info " - SERVER_HOST: 服务器IP地址"
log_info " - SERVER_USER: 服务器用户名"
log_info " - SERVER_PASSWORD: 服务器密码"
exit 1
fi
# 检查是否是上传模式
if [ "$UPLOAD_ONLY" = true ]; then
log_info "检测到 -upload 参数,跳过构建步骤"
# 检查tar文件是否存在
if [ ! -f "$TAR_FILE" ]; then
log_error "未找到tar文件: $TAR_FILE"
log_info "请先运行脚本构建镜像或确保tar文件存在"
exit 1
fi
log_success "找到tar文件: $TAR_FILE"
# 执行上传和部署流程
upload_to_server
deploy_on_server
cleanup_local
show_deployment_info
else
# 执行完整的部署流程
check_dependencies
build_image
upload_to_server
deploy_on_server
cleanup_local
show_deployment_info
fi
}
# 脚本入口
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
main "$@"
fi

View File

@@ -0,0 +1,6 @@
nohup uvicorn main:app --reload --port 9090 > output.log 2>&1 &
uvicorn main:app --host 0.0.0.0 --port 9090
uvicorn main:app --host 0.0.0.0 --port 9090 --reload

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,57 @@
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime
class FileUploadResponse(BaseModel):
"""文件上传响应模型"""
success: bool
message: str
object_key: Optional[str] = None
file_url: Optional[str] = None
etag: Optional[str] = None
request_id: Optional[str] = None
error_code: Optional[str] = None
class FileInfo(BaseModel):
"""文件信息模型"""
success: bool
object_key: Optional[str] = None
size: Optional[int] = None
last_modified: Optional[datetime] = None
content_type: Optional[str] = None
etag: Optional[str] = None
message: Optional[str] = None
class FileListItem(BaseModel):
"""文件列表项模型"""
key: str
size: int
last_modified: Optional[str] = None
etag: str
class FileListResponse(BaseModel):
"""文件列表响应模型"""
success: bool
files: Optional[List[FileListItem]] = None
count: Optional[int] = None
message: Optional[str] = None
class DeleteFileResponse(BaseModel):
"""删除文件响应模型"""
success: bool
message: str
object_key: Optional[str] = None
request_id: Optional[str] = None
error_code: Optional[str] = None
class HealthCheck(BaseModel):
"""健康检查响应模型"""
status: str
message: str
timestamp: datetime

View File

@@ -0,0 +1,198 @@
import os
import uuid
from datetime import datetime
from typing import Optional, BinaryIO
import oss2
from oss2.exceptions import OssError, NoSuchBucket, NoSuchKey
from config import settings
class OSSService:
"""阿里云OSS服务类"""
def __init__(self):
"""初始化OSS客户端"""
if not all([settings.oss_access_key_id, settings.oss_access_key_secret, settings.oss_bucket_name]):
raise ValueError("OSS配置不完整请检查环境变量")
# 创建认证对象
self.auth = oss2.Auth(settings.oss_access_key_id, settings.oss_access_key_secret)
# 创建Bucket对象
self.bucket = oss2.Bucket(self.auth, settings.oss_endpoint, settings.oss_bucket_name)
# 验证bucket是否存在
try:
self.bucket.get_bucket_info()
except NoSuchBucket:
raise ValueError(f"Bucket '{settings.oss_bucket_name}' 不存在")
def generate_object_key(self, original_filename: str, folder: str = "uploads") -> str:
"""生成OSS对象键名"""
# 获取文件扩展名
_, ext = os.path.splitext(original_filename)
# 生成唯一文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
unique_id = str(uuid.uuid4())[:8]
filename = f"{timestamp}_{unique_id}{ext}"
# 返回完整的对象键
return f"{folder}/{filename}"
def upload_file(self, file_content: bytes, object_key: str, content_type: str = None) -> dict:
"""
上传文件到OSS
Args:
file_content: 文件内容
object_key: OSS对象键
content_type: 文件MIME类型
Returns:
包含上传结果的字典
"""
try:
# 设置文件头信息
headers = {}
if content_type:
headers['Content-Type'] = content_type
# 上传文件
result = self.bucket.put_object(object_key, file_content, headers=headers)
# 构造文件URL
file_url = f"https://{settings.oss_bucket_name}.{settings.oss_endpoint.replace('http://', '').replace('https://', '')}/{object_key}"
return {
"success": True,
"message": "文件上传成功",
"object_key": object_key,
"file_url": file_url,
"etag": result.etag,
"request_id": result.request_id
}
except OssError as e:
return {
"success": False,
"message": f"上传失败: {e}",
"error_code": e.code if hasattr(e, 'code') else 'Unknown'
}
def delete_file(self, object_key: str) -> dict:
"""
删除OSS文件
Args:
object_key: OSS对象键
Returns:
删除结果字典
"""
try:
# 首先检查文件是否存在
try:
self.bucket.head_object(object_key)
except NoSuchKey:
return {
"success": False,
"message": f"文件不存在: {object_key}",
"error_code": "NoSuchKey"
}
# 文件存在,执行删除操作
result = self.bucket.delete_object(object_key)
return {
"success": True,
"message": "文件删除成功",
"object_key": object_key,
"request_id": result.request_id
}
except OssError as e:
return {
"success": False,
"message": f"删除失败: {e}",
"error_code": e.code if hasattr(e, 'code') else 'Unknown'
}
def get_file_info(self, object_key: str) -> dict:
"""
获取文件信息
Args:
object_key: OSS对象键
Returns:
文件信息字典
"""
try:
head_result = self.bucket.head_object(object_key)
return {
"success": True,
"object_key": object_key,
"size": head_result.content_length,
"last_modified": head_result.last_modified,
"content_type": head_result.content_type,
"etag": head_result.etag
}
except NoSuchKey:
return {
"success": False,
"message": "文件不存在"
}
except OssError as e:
return {
"success": False,
"message": f"获取文件信息失败: {e}"
}
def list_files(self, prefix: str = "", max_keys: int = 100) -> dict:
"""
列出文件
Args:
prefix: 对象键前缀
max_keys: 最大返回数量
Returns:
文件列表字典
"""
try:
files = []
for obj in oss2.ObjectIterator(self.bucket, prefix=prefix, max_keys=max_keys):
# 安全地处理 last_modified 字段
last_modified_str = None
if obj.last_modified:
if hasattr(obj.last_modified, 'isoformat'):
# 如果是 datetime 对象
last_modified_str = obj.last_modified.isoformat()
elif isinstance(obj.last_modified, (int, float)):
# 如果是时间戳,转换为 datetime 然后格式化
last_modified_str = datetime.fromtimestamp(obj.last_modified).isoformat()
else:
# 其他情况,尝试转换为字符串
last_modified_str = str(obj.last_modified)
files.append({
"key": obj.key,
"size": obj.size,
"last_modified": last_modified_str,
"etag": obj.etag
})
return {
"success": True,
"files": files,
"count": len(files)
}
except OssError as e:
return {
"success": False,
"message": f"列出文件失败: {e}"
}
# 全局OSS服务实例
oss_service = OSSService()

View File

@@ -0,0 +1,15 @@
fastapi
uvicorn
pydantic
psycopg2-binary
python-multipart
alibabacloud_dysmsapi20170525
alibabacloud_tea_openapi
alibabacloud_darabonba_env
alibabacloud_tea_util
alibabacloud_tea_console
alibabacloud_darabonba_string
alibabacloud_darabonba_time
oss2==2.18.3
pydantic-settings==2.9.1
aiofiles==23.2.0

View File

@@ -0,0 +1,41 @@
#!/usr/bin/env python3
"""
阿里云OSS文件上传API服务启动脚本
"""
import os
import sys
import uvicorn
from main import app
from config import settings
def main():
"""启动应用"""
print("🚀 启动阿里云OSS文件上传API服务...")
print(f"📍 服务地址: http://{settings.app_host}:{settings.app_port}")
print(f"📚 API文档: http://{settings.app_host}:{settings.app_port}/docs")
print(f"🔧 OSS端点: {settings.oss_endpoint}")
print(f"🪣 Bucket: {settings.oss_bucket_name}")
print("-" * 50)
# 检查必要的环境变量
if not settings.oss_access_key_id or not settings.oss_access_key_secret:
print("❌ 错误: 请设置OSS访问密钥环境变量:")
print(" export OSS_ACCESS_KEY_ID=your_access_key_id")
print(" export OSS_ACCESS_KEY_SECRET=your_access_key_secret")
print(" export OSS_BUCKET_NAME=your_bucket_name")
sys.exit(1)
# 启动服务
uvicorn.run(
app,
host=settings.app_host,
port=settings.app_port,
reload=settings.debug,
log_level="info"
)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,221 @@
# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
import sys
import os
import json
from typing import List
from alibabacloud_dysmsapi20170525.client import Client as DysmsapiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_darabonba_env.client import Client as EnvClient
from alibabacloud_dysmsapi20170525 import models as dysmsapi_models
from alibabacloud_tea_util.client import Client as UtilClient
from alibabacloud_tea_console.client import Client as ConsoleClient
from alibabacloud_darabonba_string.client import Client as StringClient
from alibabacloud_darabonba_time.client import Client as TimeClient
from config import settings
class SMS:
def __init__(self):
self.AccessKey_ID = "LTAI5tEWmH3robZUGHiWYVnP"
self.AccessKey_Secret = "7LAVepzjXcnEobqlx7WPrvHaFYYEU3"
@staticmethod
def create_client(
access_key_id: str,
access_key_secret: str,
) -> DysmsapiClient:
"""
使用AK&SK初始化账号Client
"""
config = open_api_models.Config()
config.access_key_id = access_key_id
config.access_key_secret = access_key_secret
return DysmsapiClient(config)
@staticmethod
def main(self,
phone_number: str,
template_param: str,
template_code: str = settings.sms_template_code,
sign_name: str = settings.sms_sign_name,
) -> dict:
try:
if self.AccessKey_ID is None or self.AccessKey_Secret is None:
ConsoleClient.log("警告:使用默认的访问密钥,建议通过环境变量设置您自己的访问密钥")
client = SMS.create_client(self.AccessKey_ID, self.AccessKey_Secret)
# 设置默认值
phone_number = phone_number
# sign_name is passed as argument
# template_code is passed as argument
param_code = f"{{\"code\":\"{template_param}\"}}"
ConsoleClient.log(f"正在尝试发送短信... 模板代码: {template_code}, 签名: {sign_name}")
# 1.发送短信
send_req = dysmsapi_models.SendSmsRequest(
phone_numbers=phone_number,
sign_name=sign_name,
template_code=template_code,
template_param=param_code
)
try:
send_resp = client.send_sms(send_req)
code = send_resp.body.code
if not UtilClient.equal_string(code, 'OK'):
error_msg = f'发送失败,错误代码: {code}, 错误信息: {send_resp.body.message}'
ConsoleClient.log(error_msg)
return {"success": False, "error_message": error_msg}
ConsoleClient.log(f'短信发送成功业务ID: {send_resp.body.biz_id}')
biz_id = send_resp.body.biz_id
# 2. 等待 10 秒后查询结果
ConsoleClient.log("等待10秒后查询发送状态...")
UtilClient.sleep(10000)
# 3.查询结果
phone_nums = StringClient.split(phone_number, ',', -1)
details = []
for phone_num in phone_nums:
query_req = dysmsapi_models.QuerySendDetailsRequest(
phone_number=UtilClient.assert_as_string(phone_num),
biz_id=biz_id,
send_date=TimeClient.format('yyyyMMdd'),
page_size=10,
current_page=1
)
query_resp = client.query_send_details(query_req)
dtos = query_resp.body.sms_send_detail_dtos.sms_send_detail_dto
# 打印结果
for dto in dtos:
detail = {
"phone_num": dto.phone_num,
"send_status": dto.send_status,
"receive_date": dto.receive_date,
"err_code": dto.err_code
}
details.append(detail)
if UtilClient.equal_string(f'{dto.send_status}', '3'):
ConsoleClient.log(f'{dto.phone_num} 发送成功,接收时间: {dto.receive_date}')
elif UtilClient.equal_string(f'{dto.send_status}', '2'):
ConsoleClient.log(f'{dto.phone_num} 发送失败')
else:
ConsoleClient.log(f'{dto.phone_num} 正在发送中...')
return {"success": True, "biz_id": biz_id, "details": details}
except Exception as e:
error_msg = str(e)
if "NoPermission" in error_msg or "403" in error_msg:
ConsoleClient.log("权限错误:当前账号没有发送短信的权限")
# ... (keep existing log messages if possible, but for brevity I might shorten)
elif "InvalidAccessKeyId" in error_msg:
ConsoleClient.log("错误AccessKeyId无效")
elif "SignatureDoesNotMatch" in error_msg:
ConsoleClient.log("错误:签名不匹配")
else:
ConsoleClient.log(f"发生错误: {error_msg}")
return {"success": False, "error_message": error_msg}
except Exception as e:
ConsoleClient.log(f"程序运行错误: {str(e)}")
return {"success": False, "error_message": str(e)}
@staticmethod
async def main_async(self,
phone_number: str,
template_param: str,
template_code: str = settings.sms_template_code,
sign_name: str = settings.sms_sign_name,
) -> None:
try:
client = SMS.create_client(self.AccessKey_ID, self.AccessKey_Secret)
# 设置默认值
phone_number = phone_number
# sign_name is passed as argument
# template_code is passed as argument
template_param = f"{{\"code\":\"{template_param}\"}}"
# 如果提供了命令行参数,则使用命令行参数
ConsoleClient.log(f"正在尝试发送短信... 模板代码: {template_code}, 签名: {sign_name}")
# 1.发送短信
send_req = dysmsapi_models.SendSmsRequest(
phone_numbers=phone_number,
sign_name=sign_name,
template_code=template_code,
template_param=template_param
)
try:
send_resp = await client.send_sms_async(send_req)
code = send_resp.body.code
if not UtilClient.equal_string(code, 'OK'):
ConsoleClient.log(f'发送失败,错误代码: {code}, 错误信息: {send_resp.body.message}')
return
ConsoleClient.log(f'短信发送成功业务ID: {send_resp.body.biz_id}')
biz_id = send_resp.body.biz_id
# 2. 等待 10 秒后查询结果
ConsoleClient.log("等待10秒后查询发送状态...")
await UtilClient.sleep_async(10000)
# 3.查询结果
phone_nums = StringClient.split(phone_number, ',', -1)
for phone_num in phone_nums:
query_req = dysmsapi_models.QuerySendDetailsRequest(
phone_number=UtilClient.assert_as_string(phone_num),
biz_id=biz_id,
send_date=TimeClient.format('yyyyMMdd'),
page_size=10,
current_page=1
)
query_resp = await client.query_send_details_async(query_req)
dtos = query_resp.body.sms_send_detail_dtos.sms_send_detail_dto
# 打印结果
for dto in dtos:
if UtilClient.equal_string(f'{dto.send_status}', '3'):
ConsoleClient.log(f'{dto.phone_num} 发送成功,接收时间: {dto.receive_date}')
elif UtilClient.equal_string(f'{dto.send_status}', '2'):
ConsoleClient.log(f'{dto.phone_num} 发送失败')
else:
ConsoleClient.log(f'{dto.phone_num} 正在发送中...')
except Exception as e:
error_msg = str(e)
if "NoPermission" in error_msg or "403" in error_msg:
ConsoleClient.log("权限错误:当前账号没有发送短信的权限")
ConsoleClient.log("可能的原因:")
ConsoleClient.log("1. 访问密钥无效或已过期")
ConsoleClient.log("2. 账号未开通短信服务或权限不足")
ConsoleClient.log("3. 如果使用子账号,需要主账号授予相应权限")
ConsoleClient.log("解决方法:")
ConsoleClient.log("1. 登录阿里云控制台,检查账号短信服务开通状态")
ConsoleClient.log("2. 确认访问密钥是否有效")
ConsoleClient.log("3. 如需帮助,请联系阿里云客服")
elif "InvalidAccessKeyId" in error_msg:
ConsoleClient.log("错误AccessKeyId无效")
ConsoleClient.log("请检查您的AccessKeyId是否正确")
elif "SignatureDoesNotMatch" in error_msg:
ConsoleClient.log("错误:签名不匹配")
ConsoleClient.log("请检查您的AccessKeySecret是否正确")
else:
ConsoleClient.log(f"发生错误: {error_msg}")
except Exception as e:
ConsoleClient.log(f"程序运行错误: {str(e)}")
if __name__ == '__main__':
SMS = SMS()
SMS.main(self=SMS, phone_number="18585164448", template_param="6666")

View File

@@ -0,0 +1,105 @@
import os
import mimetypes
from typing import Tuple
from fastapi import UploadFile, HTTPException
from config import settings
def validate_file(file: UploadFile) -> Tuple[bool, str]:
"""
验证上传的文件
Args:
file: FastAPI上传文件对象
Returns:
(是否通过验证, 错误信息)
"""
# 检查文件名
if not file.filename:
return False, "文件名不能为空"
# 检查文件大小
if hasattr(file, 'size') and file.size > 1024 * 1024 * 1024: # 1GB = 1024^3 bytes
return False, f"文件大小超过限制 (1GB)"
# 检查文件扩展名
_, ext = os.path.splitext(file.filename.lower())
if ext not in [e.lower() for e in settings.allowed_extensions]:
return False, f"不支持的文件类型。支持的类型: {', '.join(settings.allowed_extensions)}"
return True, ""
def get_content_type(filename: str) -> str:
"""
根据文件名获取MIME类型
Args:
filename: 文件名
Returns:
MIME类型字符串
"""
content_type, _ = mimetypes.guess_type(filename)
return content_type or "application/octet-stream"
def format_file_size(size_bytes: int) -> str:
"""
格式化文件大小
Args:
size_bytes: 文件大小(字节)
Returns:
格式化后的文件大小字符串
"""
if size_bytes == 0:
return "0B"
size_names = ["B", "KB", "MB", "GB", "TB"]
i = 0
size = float(size_bytes)
while size >= 1024.0 and i < len(size_names) - 1:
size /= 1024.0
i += 1
return f"{size:.2f}{size_names[i]}"
def extract_object_key_from_url(file_url: str) -> str:
"""
从文件URL中提取对象键
Args:
file_url: 文件URL
Returns:
对象键
"""
# 假设URL格式为: https://bucket.endpoint.com/object_key
try:
return file_url.split('/')[-1]
except:
return ""
def sanitize_filename(filename: str) -> str:
"""
清理文件名,移除特殊字符
Args:
filename: 原始文件名
Returns:
清理后的文件名
"""
# 移除或替换特殊字符
invalid_chars = '<>:"/\\|?*'
for char in invalid_chars:
filename = filename.replace(char, '_')
return filename.strip()