tingwu_new
All checks were successful
Deploy to Server / deploy (push) Successful in 19s

This commit is contained in:
jeremygan2021
2026-03-11 20:41:49 +08:00
parent b0aa902f89
commit f41fd01367
3 changed files with 210 additions and 160 deletions

View File

@@ -0,0 +1,63 @@
import time
import logging
from django.core.management.base import BaseCommand
from ai_services.models import TranscriptionTask
from ai_services.services import AliyunTingwuService
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'Polls Aliyun Tingwu for transcription results every 10 seconds'
def handle(self, *args, **options):
self.stdout.write(self.style.SUCCESS('Starting polling service...'))
service = AliyunTingwuService()
while True:
try:
# Find tasks that are PENDING or PROCESSING
# Include PENDING because create() might set it to PENDING initially
# though usually it sets to PROCESSING if task_id is obtained.
# Just in case.
tasks = TranscriptionTask.objects.filter(
status__in=[TranscriptionTask.Status.PENDING, TranscriptionTask.Status.PROCESSING]
).exclude(task_id__isnull=True).exclude(task_id='')
count = tasks.count()
if count > 0:
self.stdout.write(f'Found {count} pending/processing tasks.')
for task in tasks:
self.stdout.write(f'Checking task {task.task_id} (Status: {task.status})...')
try:
result = service.get_task_info(task.task_id)
# Store old status to check for changes
old_status = task.status
service.parse_and_update_task(task, result)
# Re-fetch or check updated object
if task.status != old_status:
if task.status == TranscriptionTask.Status.SUCCEEDED:
self.stdout.write(self.style.SUCCESS(f'Task {task.task_id} SUCCEEDED'))
elif task.status == TranscriptionTask.Status.FAILED:
self.stdout.write(self.style.ERROR(f'Task {task.task_id} FAILED: {task.error_message}'))
else:
# Still processing
pass
except Exception as e:
logger.error(f"Error checking task {task.task_id}: {e}")
self.stdout.write(self.style.ERROR(f"Error checking task {task.task_id}: {e}"))
# Wait for 10 seconds
time.sleep(10)
except KeyboardInterrupt:
self.stdout.write(self.style.SUCCESS('Stopping polling service...'))
break
except Exception as e:
logger.error(f"Polling loop error: {e}")
self.stdout.write(self.style.ERROR(f"Polling loop error: {e}"))
time.sleep(10)

View File

@@ -150,3 +150,148 @@ class AliyunTingwuService:
except (ClientException, ServerException) as e: except (ClientException, ServerException) as e:
logger.error(f"Tingwu GetTaskInfo failed: {e}") logger.error(f"Tingwu GetTaskInfo failed: {e}")
raise e raise e
def parse_and_update_task(self, task, result):
"""
解析听悟结果并更新任务
:param task: TranscriptionTask 实例
:param result: get_task_info 返回的完整 JSON (或 Data 部分)
"""
# 1. 提取 Data 对象
if isinstance(result, dict):
data_obj = result.get('Data', result)
else:
data_obj = result
if not isinstance(data_obj, dict):
logger.error(f"Unexpected data format: {type(data_obj)}")
return
# 2. 更新状态
task_status = data_obj.get('TaskStatus') or data_obj.get('Status')
if task_status in ['COMPLETE', 'COMPLETED', 'SUCCEEDED']:
task.status = 'SUCCEEDED' # 使用字符串引用,避免导入模型循环引用
elif task_status == 'FAILED':
task.status = 'FAILED'
task.error_message = data_obj.get('TaskStatusText', data_obj.get('Message', 'Unknown error'))
task.save()
return
else:
# 仍在处理中,不更新内容
return
# 3. 解析结果
task_result = data_obj.get('Result', {})
# --- A. 处理逐字稿 (Transcription) ---
transcription_data = task_result.get('Transcription', {})
# 处理 URL 下载
if isinstance(transcription_data, str) and transcription_data.startswith('http'):
try:
import requests
t_resp = requests.get(transcription_data)
if t_resp.status_code == 200:
transcription_data = t_resp.json()
except Exception as e:
logger.error(f"Download transcription failed: {e}")
transcription_data = {}
elif isinstance(transcription_data, dict) and 'TranscriptionUrl' in transcription_data:
try:
import requests
t_resp = requests.get(transcription_data['TranscriptionUrl'])
if t_resp.status_code == 200:
transcription_data = t_resp.json()
except Exception as e:
logger.error(f"Download transcription url failed: {e}")
# 保存原始数据
task.transcription_data = transcription_data
# 提取文本
# 结构: {"Transcription": {"Paragraphs": [{"Words": [{"Text": "..."}]}]}}
# 或直接 {"Paragraphs": ...}
content_source = transcription_data
if 'Transcription' in content_source and isinstance(content_source['Transcription'], dict):
content_source = content_source['Transcription']
paragraphs = content_source.get('Paragraphs', [])
full_text_lines = []
if paragraphs and isinstance(paragraphs, list):
for p in paragraphs:
# 尝试从 Words 中提取
words = p.get('Words', [])
if words:
line_text = "".join([str(w.get('Text', '')) for w in words])
full_text_lines.append(line_text)
# 兼容旧结构或直接 Text
elif 'Text' in p:
full_text_lines.append(p['Text'])
if full_text_lines:
task.transcription = "\n".join(full_text_lines)
# --- B. 处理 AI 总结 (Summarization) ---
summarization = task_result.get('Summarization', {})
# 处理 URL 下载
if isinstance(summarization, str) and summarization.startswith('http'):
try:
import requests
s_resp = requests.get(summarization)
if s_resp.status_code == 200:
summarization = s_resp.json()
except Exception as e:
logger.error(f"Download summarization failed: {e}")
summarization = {}
# 保存原始数据
task.summary_data = summarization
# 提取文本 (MindMapSummary)
# 结构: {"MindMapSummary": [{"Title": "...", "Topic": [...]}]}
summary_text = []
def parse_mindmap_topic(topic_list, level=0):
indent = " " * level
for topic in topic_list:
title = topic.get('Title', '')
if title:
summary_text.append(f"{indent}- {title}")
sub_topics = topic.get('Topic', [])
if sub_topics:
parse_mindmap_topic(sub_topics, level + 1)
if 'MindMapSummary' in summarization:
parse_mindmap_topic(summarization['MindMapSummary'])
elif 'Text' in summarization:
summary_text.append(summarization['Text'])
elif 'Headline' in summarization:
summary_text.append(summarization['Headline'])
if summary_text:
task.summary = "\n".join(summary_text)
# --- C. 处理章节 (AutoChapters) ---
auto_chapters = task_result.get('AutoChapters', [])
# 处理 URL 下载
if isinstance(auto_chapters, str) and auto_chapters.startswith('http'):
try:
import requests
ac_resp = requests.get(auto_chapters)
if ac_resp.status_code == 200:
auto_chapters = ac_resp.json()
except Exception as e:
logger.error(f"Download auto chapters failed: {e}")
auto_chapters = []
# 保存原始数据
task.auto_chapters_data = auto_chapters
# (可选) 将章节信息追加到 summary 或 evaluation 中,或者仅保存 raw data
# 根据用户需求,这里主要保存到 model 的 auto_chapters_data 字段 (已在 models.py 定义)
task.save()

View File

@@ -192,166 +192,8 @@ class TranscriptionTaskViewSet(viewsets.ModelViewSet):
logger.error(f"Unexpected response format: {type(data_obj)} - {data_obj}") logger.error(f"Unexpected response format: {type(data_obj)} - {data_obj}")
return Response({'error': f"Unexpected response format: {type(data_obj)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) return Response({'error': f"Unexpected response format: {type(data_obj)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
task_status = data_obj.get('TaskStatus') # 调用 Service 进行解析和更新
service.parse_and_update_task(task, result)
# 兼容其他状态字段名
if not task_status:
task_status = data_obj.get('Status')
if task_status == 'COMPLETE' or task_status == 'COMPLETED' or task_status == 'SUCCEEDED':
task.status = TranscriptionTask.Status.SUCCEEDED
# 解析结果
task_result = data_obj.get('Result', {})
logger.info(f"Task result keys: {task_result.keys()}")
# 提取逐字稿
transcription_data = task_result.get('Transcription', {})
logger.info(f"Raw transcription data type: {type(transcription_data)}")
# 如果是 URL (字符串),尝试下载内容
if isinstance(transcription_data, str) and transcription_data.startswith('http'):
try:
import requests
logger.info(f"Downloading transcription from {transcription_data}")
t_resp = requests.get(transcription_data)
if t_resp.status_code == 200:
transcription_data = t_resp.json()
logger.info(f"Downloaded transcription keys: {transcription_data.keys() if isinstance(transcription_data, dict) else 'Not a dict'}")
# 保存原始数据
task.transcription_data = transcription_data
else:
logger.warning(f"Failed to download transcription: {t_resp.status_code}")
transcription_data = {}
except Exception as e:
logger.error(f"Error downloading transcription: {e}")
transcription_data = {}
elif isinstance(transcription_data, dict) and 'TranscriptionUrl' in transcription_data:
# 有些情况 Transcription 还是对象,但内容在 Url 字段
try:
import requests
url = transcription_data['TranscriptionUrl']
logger.info(f"Downloading transcription from {url}")
t_resp = requests.get(url)
if t_resp.status_code == 200:
transcription_data = t_resp.json()
logger.info(f"Downloaded transcription keys: {transcription_data.keys() if isinstance(transcription_data, dict) else 'Not a dict'}")
# 保存原始数据
task.transcription_data = transcription_data
except Exception as e:
logger.error(f"Error downloading transcription nested url: {e}")
if isinstance(transcription_data, dict):
# 确定包含实际内容的字典源
content_source = transcription_data
# 关键修复:
# 阿里云返回的 JSON 可能是 {"Transcription": {"Sentences": ...}} 也可能是 {"Sentences": ...}
# 之前的逻辑虽然尝试了 content_source = transcription_data['Transcription'],但如果 key 不存在会报错
# 且如果是 {"TaskId": "...", "Transcription": {"Sentences": ...}} 这种结构,需要先剥离外层
# 尝试找到真正的 sentences/paragraphs 所在的字典
# 优先查找 'Transcription' 键,如果它对应的是字典,那么数据很可能在里面
if 'Transcription' in content_source and isinstance(content_source['Transcription'], dict):
content_source = content_source['Transcription']
logger.info(f"Drilled down to nested 'Transcription' key. Keys: {content_source.keys()}")
# 尝试提取 Sentences
sentences = content_source.get('Sentences', [])
# 尝试提取 Paragraphs
paragraphs_data = content_source.get('Paragraphs', [])
if sentences:
full_text = " ".join([s.get('Text', '') for s in sentences])
task.transcription = full_text
elif paragraphs_data:
# 处理 Paragraphs
para_list = []
if isinstance(paragraphs_data, dict):
# 有时结构是 {"Paragraphs": {"Paragraphs": [...]}} 或者 {"Paragraphs": [...]}
para_list = paragraphs_data.get('Paragraphs', [])
if not para_list and isinstance(paragraphs_data, list):
para_list = paragraphs_data
elif isinstance(paragraphs_data, list):
para_list = paragraphs_data
if para_list:
texts = []
for p in para_list:
if 'Text' in p:
texts.append(p['Text'])
elif 'Sentences' in p:
for s in p['Sentences']:
if 'Text' in s:
texts.append(s['Text'])
task.transcription = "\n".join(texts)
logger.info(f"Extracted {len(texts)} paragraphs")
else:
logger.warning(f"Paragraphs found but failed to extract list. Type: {type(paragraphs_data)}")
else:
logger.warning(f"Could not find Sentences or Paragraphs in content source. Keys: {content_source.keys()}")
# 提取总结
# 总结结果结构可能因配置不同而异,这里尝试获取摘要
summarization = task_result.get('Summarization', {})
# 如果是 URL (字符串),尝试下载内容
if isinstance(summarization, str) and summarization.startswith('http'):
try:
import requests
logger.info(f"Downloading summarization from {summarization}")
s_resp = requests.get(summarization)
if s_resp.status_code == 200:
summarization = s_resp.json()
# 保存原始数据
task.summary_data = summarization
else:
logger.warning(f"Failed to download summarization: {s_resp.status_code}")
summarization = {}
except Exception as e:
logger.error(f"Error downloading summarization: {e}")
summarization = {}
# 听悟的总结通常在 Summarization.Text 或类似字段
# 如果是章节摘要,可能在 Chapters 中
# 假设是全文摘要
if 'Text' in summarization:
task.summary = summarization['Text']
elif 'Headline' in summarization:
task.summary = summarization['Headline']
else:
# 尝试从章节摘要中提取
chapters = task_result.get('Chapters', [])
# 处理 AutoChapters
auto_chapters = task_result.get('AutoChapters', {})
if isinstance(auto_chapters, str) and auto_chapters.startswith('http'):
try:
import requests
logger.info(f"Downloading auto chapters from {auto_chapters}")
ac_resp = requests.get(auto_chapters)
if ac_resp.status_code == 200:
auto_chapters = ac_resp.json()
task.auto_chapters_data = auto_chapters
except Exception as e:
logger.error(f"Error downloading auto chapters: {e}")
summary_parts = []
for chapter in chapters:
if 'Headline' in chapter:
summary_parts.append(chapter['Headline'])
if 'Summary' in chapter:
summary_parts.append(chapter['Summary'])
task.summary = "\n".join(summary_parts)
task.save()
elif task_status == 'FAILED':
task.status = TranscriptionTask.Status.FAILED
task.error_message = data_obj.get('TaskStatusText', result.get('Message', 'Unknown error'))
task.save()
# 其他状态 (PENDING, RUNNING) 不做更改
serializer = self.get_serializer(task) serializer = self.get_serializer(task)
return Response(serializer.data) return Response(serializer.data)