Files
market_page/backend/ai_services/views.py
jeremygan2021 7612c09571
All checks were successful
Deploy to Server / deploy (push) Successful in 1m8s
tingwu_new
2026-03-11 20:46:25 +08:00

249 lines
9.9 KiB
Python

import logging
import uuid
from rest_framework import viewsets, status
from rest_framework.decorators import action, api_view, permission_classes, parser_classes
from rest_framework.response import Response
from rest_framework.parsers import MultiPartParser, FormParser, JSONParser
from rest_framework.permissions import AllowAny
from django.conf import settings
from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes
from .models import TranscriptionTask, AIEvaluation
from .serializers import TranscriptionTaskSerializer, TranscriptionUploadSerializer, AIEvaluationSerializer
from .services import AliyunTingwuService
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
@api_view(['POST'])
@permission_classes([AllowAny])
def tingwu_callback(request):
    """
    Handle callback notifications sent by Aliyun Tingwu.

    Two kinds of message are recognised:

    1. Connectivity tests, shaped like
       ``{"Code": "0", "Data": {"Test": "..."}, "Message": "success", ...}``.
       These are acknowledged immediately.
    2. Task notifications carrying top-level ``TaskId`` and ``Status`` keys.
       A ``FAILED`` status marks the matching ``TranscriptionTask`` as failed;
       a ``COMPLETE`` status is only logged, and the result is fetched later
       through the ``refresh_status`` action.

    Returns:
        ``200 {'message': 'success'}`` for every JSON-object payload (even when
        the task is unknown or processing fails, so the sender does not see an
        error), and ``400`` when the body is not a JSON object.

    NOTE(review): the endpoint is open (``AllowAny``) and the payload is not
    authenticated here; confirm whether Tingwu offers a signature to verify.
    NOTE(review): the real notification layout (top-level vs. nested under
    ``Data``, ``COMPLETE`` vs. ``COMPLETED``) should be confirmed against the
    vendor documentation; this handler only reads the top-level keys.
    """
    data = request.data
    logger.info(f"收到听悟回调: {data}")

    # A JSON body may legally be a list or a scalar; `.get` would then raise
    # AttributeError and surface as an HTTP 500.
    if not isinstance(data, dict):
        logger.warning("Tingwu callback payload is not a JSON object: %s", type(data).__name__)
        return Response({'error': 'Invalid callback payload'}, status=status.HTTP_400_BAD_REQUEST)

    # 1. Connectivity test. `Data` may be null or a string, so verify it is a
    #    mapping before looking for the `Test` key.
    payload = data.get('Data')
    if isinstance(payload, dict) and 'Test' in payload:
        logger.info("收到听悟连通性测试请求")
        return Response({'message': 'success'}, status=status.HTTP_200_OK)

    # 2. Task notification.
    task_id = data.get('TaskId')
    task_status = data.get('Status')

    if task_id:
        try:
            task = TranscriptionTask.objects.filter(task_id=task_id).first()
            if task is None:
                logger.warning(f"回调收到未知任务ID: {task_id}")
            elif task_status == 'COMPLETE':
                # The transcript itself is pulled on the next refresh_status
                # call; fetching it here could exceed the callback timeout.
                logger.info(f"任务 {task_id} 完成,等待下一次查询刷新")
            elif task_status == 'FAILED':
                task.status = TranscriptionTask.Status.FAILED
                # Fall back to the default text when StatusText is missing,
                # null, or empty.
                task.error_message = data.get('StatusText') or 'Callback reported failure'
                task.save()
        except Exception as e:
            # Best effort: still acknowledge the callback, but keep the traceback.
            logger.exception("处理回调异常: %s", e)

    return Response({'message': 'success'}, status=status.HTTP_200_OK)
class TranscriptionTaskViewSet(viewsets.ModelViewSet):
    """
    Endpoints for transcription tasks.

    On top of the standard ``ModelViewSet`` routes this class:

    * overrides ``create`` to upload audio (or accept a URL) and start a
      Tingwu transcription task;
    * adds ``evaluate`` (POST, detail) to run an AI evaluation on a task;
    * adds ``refresh_status`` (GET, detail) to pull the latest task state
      from Tingwu.
    """

    queryset = TranscriptionTask.objects.all()
    serializer_class = TranscriptionTaskSerializer
    # Default parsers target file uploads; actions that take JSON override this.
    parser_classes = (MultiPartParser, FormParser)

    @staticmethod
    def _build_oss_key(filename):
        """
        Return a unique object key under ``transcription/`` for *filename*.

        The original extension is kept when the name has one; a name without
        a dot (or ending in a dot) yields a key with no extension instead of
        reusing the whole filename as the extension.
        """
        _, dot, extension = (filename or '').rpartition('.')
        unique_id = uuid.uuid4()
        if dot and extension:
            return f"transcription/{unique_id}.{extension}"
        return f"transcription/{unique_id}"

    @extend_schema(
        request={
            'multipart/form-data': {
                'type': 'object',
                'properties': {
                    'file': {
                        'type': 'string',
                        'format': 'binary'
                    },
                    'file_url': {
                        'type': 'string',
                        'description': '音频文件的URL地址'
                    }
                }
            }
        },
        responses={201: TranscriptionTaskSerializer}
    )
    def create(self, request, *args, **kwargs):
        """
        Upload an audio file (or accept its URL) and create a Tingwu task.

        Request fields:
            file: uploaded audio file (optional if ``file_url`` is given).
            file_url: URL of an audio file (used only when no file is sent).

        Returns:
            201 with the serialized task on success;
            400 when neither ``file`` nor ``file_url`` is provided;
            503 when the Aliyun service reports no bucket or client;
            500 when the upload, the DB write, or the Tingwu call fails.
        """
        file_obj = request.FILES.get('file')
        file_url = request.data.get('file_url')

        if not file_obj and not file_url:
            return Response({'error': '请提供文件或文件URL'}, status=status.HTTP_400_BAD_REQUEST)

        service = AliyunTingwuService()
        if not service.bucket or not service.client:
            return Response({'error': '阿里云服务未配置'}, status=status.HTTP_503_SERVICE_UNAVAILABLE)

        try:
            if file_obj:
                # 1. Upload the file to OSS under a collision-free key.
                oss_url = service.upload_to_oss(file_obj, self._build_oss_key(file_obj.name))
            else:
                # Use the caller-supplied URL as-is.
                oss_url = file_url

            # 2. Persist the task before calling Tingwu so failures are recorded.
            task_record = TranscriptionTask.objects.create(
                file_url=oss_url,
                status=TranscriptionTask.Status.PENDING
            )

            # 3. Ask Tingwu to create the transcription task.
            try:
                tingwu_response = service.create_transcription_task(oss_url)

                # The TaskId is normally nested ({"Data": {"TaskId": ...}}),
                # but a flat response is tolerated as well.
                if 'Data' in tingwu_response and isinstance(tingwu_response['Data'], dict):
                    task_id = tingwu_response['Data'].get('TaskId')
                else:
                    task_id = tingwu_response.get('TaskId')

                if not task_id:
                    task_record.status = TranscriptionTask.Status.FAILED
                    task_record.error_message = "未能获取 TaskId"
                    task_record.save()
                    return Response({'error': '未能获取 TaskId'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

                task_record.task_id = task_id
                task_record.status = TranscriptionTask.Status.PROCESSING
                task_record.save()
            except Exception as e:
                task_record.status = TranscriptionTask.Status.FAILED
                task_record.error_message = str(e)
                task_record.save()
                logger.exception("创建听悟任务失败: %s", e)
                return Response({'error': f"创建听悟任务失败: {str(e)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

            serializer = self.get_serializer(task_record)
            return Response(serializer.data, status=status.HTTP_201_CREATED)

        except Exception as e:
            logger.exception("处理上传请求失败: %s", e)
            return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    # @extend_schema must sit ABOVE @action: DRF's `action` assigns
    # `func.kwargs = kwargs`, which would discard the schema override that
    # `extend_schema` stores there if `action` were applied last.
    # JSONParser is enabled for this action because it is documented as taking
    # an application/json body, which the class-level parsers would reject.
    @extend_schema(
        request={
            'application/json': {
                'type': 'object',
                'properties': {
                    'model_selection': {'type': 'string', 'description': '模型选择'},
                    'prompt': {'type': 'string', 'description': '评分提示词'},
                }
            }
        },
        responses={200: AIEvaluationSerializer}
    )
    @action(detail=True, methods=['post'], parser_classes=[JSONParser, MultiPartParser, FormParser])
    def evaluate(self, request, pk=None):
        """
        Trigger an AI evaluation for this task.

        Request fields (both optional):
            model_selection: stored on the evaluation when non-empty.
            prompt: stored on the evaluation when non-empty.

        Returns:
            200 with the serialized ``AIEvaluation``;
            500 with an ``error`` message when the evaluation service raises.
        """
        task = self.get_object()

        # 1. Fetch the evaluation for this task, creating it on first use.
        evaluation, _created = AIEvaluation.objects.get_or_create(task=task)

        # 2. Apply any configuration overrides supplied in the request.
        model_selection = request.data.get('model_selection')
        prompt = request.data.get('prompt')

        if model_selection:
            evaluation.model_selection = model_selection
        if prompt:
            evaluation.prompt = prompt
        if model_selection or prompt:
            evaluation.save()

        # 3. Run the evaluation. The import stays local, as in the original
        #    code (presumably to avoid an import cycle; verify before moving).
        from .bailian_service import BailianService
        service = BailianService()
        try:
            service.evaluate_task(evaluation)
        except Exception as e:
            # Mirror the error handling of the other endpoints instead of
            # letting the exception escape as an unstructured 500.
            logger.exception("AI evaluation failed for task %s: %s", task.pk, e)
            return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        serializer = AIEvaluationSerializer(evaluation)
        return Response(serializer.data)

    # @extend_schema is placed above @action for the same reason as on `evaluate`.
    @extend_schema(
        parameters=[
            OpenApiParameter("id", OpenApiTypes.UUID, OpenApiParameter.PATH, description="Task ID"),
        ],
        responses={200: TranscriptionTaskSerializer}
    )
    @action(detail=True, methods=['get'])
    def refresh_status(self, request, pk=None):
        """
        Refresh the task state from Tingwu and return the task.

        A task already in ``SUCCEEDED`` or ``FAILED`` is returned unchanged,
        except that a ``SUCCEEDED`` task with an empty transcription is
        refreshed again.

        Returns:
            200 with the serialized task;
            400 when the task has no Tingwu ``task_id``;
            500 when the Tingwu response is not a JSON object or the call fails.
        """
        import json

        task = self.get_object()

        is_final = task.status in [TranscriptionTask.Status.SUCCEEDED, TranscriptionTask.Status.FAILED]
        missing_transcript = (
            task.status == TranscriptionTask.Status.SUCCEEDED and not task.transcription
        )
        if is_final and not missing_transcript:
            serializer = self.get_serializer(task)
            return Response(serializer.data)

        if not task.task_id:
            return Response({'error': '任务ID不存在'}, status=status.HTTP_400_BAD_REQUEST)

        service = AliyunTingwuService()
        try:
            result = service.get_task_info(task.task_id)

            # The SDK sometimes returns a JSON string that needs a second parse.
            if isinstance(result, str):
                try:
                    result = json.loads(result)
                except ValueError:
                    # Not JSON; the format check below reports it.
                    logger.warning("Tingwu task info is a non-JSON string")

            # Expected shape: {"Data": {"TaskStatus": "...", "Result": ...}}.
            # Fall back to the top-level object when `Data` is absent or is
            # not itself a mapping (flatter response).
            data_obj = result.get('Data', result) if isinstance(result, dict) else result
            if not isinstance(data_obj, dict):
                data_obj = result

            if not isinstance(data_obj, dict):
                logger.error(f"Unexpected response format: {type(data_obj)} - {data_obj}")
                return Response({'error': f"Unexpected response format: {type(data_obj)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

            # The service receives the full response, not just the `Data` part.
            service.parse_and_update_task(task, result)

            serializer = self.get_serializer(task)
            return Response(serializer.data)

        except Exception as e:
            logger.exception("刷新任务状态失败: %s", e)
            return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)