From f41fd0136746b50e74e75a229539a3eb459097f9 Mon Sep 17 00:00:00 2001 From: jeremygan2021 Date: Wed, 11 Mar 2026 20:41:49 +0800 Subject: [PATCH] tingwu_new --- .../commands/poll_transcription_results.py | 63 +++++++ backend/ai_services/services.py | 145 ++++++++++++++++ backend/ai_services/views.py | 162 +----------------- 3 files changed, 210 insertions(+), 160 deletions(-) create mode 100644 backend/ai_services/management/commands/poll_transcription_results.py diff --git a/backend/ai_services/management/commands/poll_transcription_results.py b/backend/ai_services/management/commands/poll_transcription_results.py new file mode 100644 index 0000000..16d2620 --- /dev/null +++ b/backend/ai_services/management/commands/poll_transcription_results.py @@ -0,0 +1,63 @@ +import time +import logging +from django.core.management.base import BaseCommand +from ai_services.models import TranscriptionTask +from ai_services.services import AliyunTingwuService + +logger = logging.getLogger(__name__) + +class Command(BaseCommand): + help = 'Polls Aliyun Tingwu for transcription results every 10 seconds' + + def handle(self, *args, **options): + self.stdout.write(self.style.SUCCESS('Starting polling service...')) + service = AliyunTingwuService() + + while True: + try: + # Find tasks that are PENDING or PROCESSING + # Include PENDING because create() might set it to PENDING initially + # though usually it sets to PROCESSING if task_id is obtained. + # Just in case. + tasks = TranscriptionTask.objects.filter( + status__in=[TranscriptionTask.Status.PENDING, TranscriptionTask.Status.PROCESSING] + ).exclude(task_id__isnull=True).exclude(task_id='') + + count = tasks.count() + if count > 0: + self.stdout.write(f'Found {count} pending/processing tasks.') + + for task in tasks: + self.stdout.write(f'Checking task {task.task_id} (Status: {task.status})...') + try: + result = service.get_task_info(task.task_id) + + # Store old status to check for changes + old_status = task.status + + service.parse_and_update_task(task, result) + + # Re-fetch or check updated object + if task.status != old_status: + if task.status == TranscriptionTask.Status.SUCCEEDED: + self.stdout.write(self.style.SUCCESS(f'Task {task.task_id} SUCCEEDED')) + elif task.status == TranscriptionTask.Status.FAILED: + self.stdout.write(self.style.ERROR(f'Task {task.task_id} FAILED: {task.error_message}')) + else: + # Still processing + pass + + except Exception as e: + logger.error(f"Error checking task {task.task_id}: {e}") + self.stdout.write(self.style.ERROR(f"Error checking task {task.task_id}: {e}")) + + # Wait for 10 seconds + time.sleep(10) + + except KeyboardInterrupt: + self.stdout.write(self.style.SUCCESS('Stopping polling service...')) + break + except Exception as e: + logger.error(f"Polling loop error: {e}") + self.stdout.write(self.style.ERROR(f"Polling loop error: {e}")) + time.sleep(10) diff --git a/backend/ai_services/services.py b/backend/ai_services/services.py index 4fe8645..3237c3b 100644 --- a/backend/ai_services/services.py +++ b/backend/ai_services/services.py @@ -150,3 +150,148 @@ class AliyunTingwuService: except (ClientException, ServerException) as e: logger.error(f"Tingwu GetTaskInfo failed: {e}") raise e + + def parse_and_update_task(self, task, result): + """ + 解析听悟结果并更新任务 + :param task: TranscriptionTask 实例 + :param result: get_task_info 返回的完整 JSON (或 Data 部分) + """ + # 1. 提取 Data 对象 + if isinstance(result, dict): + data_obj = result.get('Data', result) + else: + data_obj = result + + if not isinstance(data_obj, dict): + logger.error(f"Unexpected data format: {type(data_obj)}") + return + + # 2. 更新状态 + task_status = data_obj.get('TaskStatus') or data_obj.get('Status') + if task_status in ['COMPLETE', 'COMPLETED', 'SUCCEEDED']: + task.status = 'SUCCEEDED' # 使用字符串引用,避免导入模型循环引用 + elif task_status == 'FAILED': + task.status = 'FAILED' + task.error_message = data_obj.get('TaskStatusText', data_obj.get('Message', 'Unknown error')) + task.save() + return + else: + # 仍在处理中,不更新内容 + return + + # 3. 解析结果 + task_result = data_obj.get('Result', {}) + + # --- A. 处理逐字稿 (Transcription) --- + transcription_data = task_result.get('Transcription', {}) + + # 处理 URL 下载 + if isinstance(transcription_data, str) and transcription_data.startswith('http'): + try: + import requests + t_resp = requests.get(transcription_data) + if t_resp.status_code == 200: + transcription_data = t_resp.json() + except Exception as e: + logger.error(f"Download transcription failed: {e}") + transcription_data = {} + elif isinstance(transcription_data, dict) and 'TranscriptionUrl' in transcription_data: + try: + import requests + t_resp = requests.get(transcription_data['TranscriptionUrl']) + if t_resp.status_code == 200: + transcription_data = t_resp.json() + except Exception as e: + logger.error(f"Download transcription url failed: {e}") + + # 保存原始数据 + task.transcription_data = transcription_data + + # 提取文本 + # 结构: {"Transcription": {"Paragraphs": [{"Words": [{"Text": "..."}]}]}} + # 或直接 {"Paragraphs": ...} + content_source = transcription_data + if 'Transcription' in content_source and isinstance(content_source['Transcription'], dict): + content_source = content_source['Transcription'] + + paragraphs = content_source.get('Paragraphs', []) + full_text_lines = [] + + if paragraphs and isinstance(paragraphs, list): + for p in paragraphs: + # 尝试从 Words 中提取 + words = p.get('Words', []) + if words: + line_text = "".join([str(w.get('Text', '')) for w in words]) + full_text_lines.append(line_text) + # 兼容旧结构或直接 Text + elif 'Text' in p: + full_text_lines.append(p['Text']) + + if full_text_lines: + task.transcription = "\n".join(full_text_lines) + + # --- B. 处理 AI 总结 (Summarization) --- + summarization = task_result.get('Summarization', {}) + + # 处理 URL 下载 + if isinstance(summarization, str) and summarization.startswith('http'): + try: + import requests + s_resp = requests.get(summarization) + if s_resp.status_code == 200: + summarization = s_resp.json() + except Exception as e: + logger.error(f"Download summarization failed: {e}") + summarization = {} + + # 保存原始数据 + task.summary_data = summarization + + # 提取文本 (MindMapSummary) + # 结构: {"MindMapSummary": [{"Title": "...", "Topic": [...]}]} + summary_text = [] + + def parse_mindmap_topic(topic_list, level=0): + indent = " " * level + for topic in topic_list: + title = topic.get('Title', '') + if title: + summary_text.append(f"{indent}- {title}") + + sub_topics = topic.get('Topic', []) + if sub_topics: + parse_mindmap_topic(sub_topics, level + 1) + + if 'MindMapSummary' in summarization: + parse_mindmap_topic(summarization['MindMapSummary']) + elif 'Text' in summarization: + summary_text.append(summarization['Text']) + elif 'Headline' in summarization: + summary_text.append(summarization['Headline']) + + if summary_text: + task.summary = "\n".join(summary_text) + + # --- C. 处理章节 (AutoChapters) --- + auto_chapters = task_result.get('AutoChapters', []) + + # 处理 URL 下载 + if isinstance(auto_chapters, str) and auto_chapters.startswith('http'): + try: + import requests + ac_resp = requests.get(auto_chapters) + if ac_resp.status_code == 200: + auto_chapters = ac_resp.json() + except Exception as e: + logger.error(f"Download auto chapters failed: {e}") + auto_chapters = [] + + # 保存原始数据 + task.auto_chapters_data = auto_chapters + + # (可选) 将章节信息追加到 summary 或 evaluation 中,或者仅保存 raw data + # 根据用户需求,这里主要保存到 model 的 auto_chapters_data 字段 (已在 models.py 定义) + + task.save() diff --git a/backend/ai_services/views.py b/backend/ai_services/views.py index ee2b479..0be4ca4 100644 --- a/backend/ai_services/views.py +++ b/backend/ai_services/views.py @@ -192,166 +192,8 @@ class TranscriptionTaskViewSet(viewsets.ModelViewSet): logger.error(f"Unexpected response format: {type(data_obj)} - {data_obj}") return Response({'error': f"Unexpected response format: {type(data_obj)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - task_status = data_obj.get('TaskStatus') - - # 兼容其他状态字段名 - if not task_status: - task_status = data_obj.get('Status') - - if task_status == 'COMPLETE' or task_status == 'COMPLETED' or task_status == 'SUCCEEDED': - task.status = TranscriptionTask.Status.SUCCEEDED - - # 解析结果 - task_result = data_obj.get('Result', {}) - logger.info(f"Task result keys: {task_result.keys()}") - - # 提取逐字稿 - transcription_data = task_result.get('Transcription', {}) - logger.info(f"Raw transcription data type: {type(transcription_data)}") - - # 如果是 URL (字符串),尝试下载内容 - if isinstance(transcription_data, str) and transcription_data.startswith('http'): - try: - import requests - logger.info(f"Downloading transcription from {transcription_data}") - t_resp = requests.get(transcription_data) - if t_resp.status_code == 200: - transcription_data = t_resp.json() - logger.info(f"Downloaded transcription keys: {transcription_data.keys() if isinstance(transcription_data, dict) else 'Not a dict'}") - # 保存原始数据 - task.transcription_data = transcription_data - else: - logger.warning(f"Failed to download transcription: {t_resp.status_code}") - transcription_data = {} - except Exception as e: - logger.error(f"Error downloading transcription: {e}") - transcription_data = {} - elif isinstance(transcription_data, dict) and 'TranscriptionUrl' in transcription_data: - # 有些情况 Transcription 还是对象,但内容在 Url 字段 - try: - import requests - url = transcription_data['TranscriptionUrl'] - logger.info(f"Downloading transcription from {url}") - t_resp = requests.get(url) - if t_resp.status_code == 200: - transcription_data = t_resp.json() - logger.info(f"Downloaded transcription keys: {transcription_data.keys() if isinstance(transcription_data, dict) else 'Not a dict'}") - # 保存原始数据 - task.transcription_data = transcription_data - except Exception as e: - logger.error(f"Error downloading transcription nested url: {e}") - - if isinstance(transcription_data, dict): - # 确定包含实际内容的字典源 - content_source = transcription_data - - # 关键修复: - # 阿里云返回的 JSON 可能是 {"Transcription": {"Sentences": ...}} 也可能是 {"Sentences": ...} - # 之前的逻辑虽然尝试了 content_source = transcription_data['Transcription'],但如果 key 不存在会报错 - # 且如果是 {"TaskId": "...", "Transcription": {"Sentences": ...}} 这种结构,需要先剥离外层 - - # 尝试找到真正的 sentences/paragraphs 所在的字典 - # 优先查找 'Transcription' 键,如果它对应的是字典,那么数据很可能在里面 - if 'Transcription' in content_source and isinstance(content_source['Transcription'], dict): - content_source = content_source['Transcription'] - logger.info(f"Drilled down to nested 'Transcription' key. Keys: {content_source.keys()}") - - # 尝试提取 Sentences - sentences = content_source.get('Sentences', []) - - # 尝试提取 Paragraphs - paragraphs_data = content_source.get('Paragraphs', []) - - if sentences: - full_text = " ".join([s.get('Text', '') for s in sentences]) - task.transcription = full_text - elif paragraphs_data: - # 处理 Paragraphs - para_list = [] - if isinstance(paragraphs_data, dict): - # 有时结构是 {"Paragraphs": {"Paragraphs": [...]}} 或者 {"Paragraphs": [...]} - para_list = paragraphs_data.get('Paragraphs', []) - if not para_list and isinstance(paragraphs_data, list): - para_list = paragraphs_data - elif isinstance(paragraphs_data, list): - para_list = paragraphs_data - - if para_list: - texts = [] - for p in para_list: - if 'Text' in p: - texts.append(p['Text']) - elif 'Sentences' in p: - for s in p['Sentences']: - if 'Text' in s: - texts.append(s['Text']) - task.transcription = "\n".join(texts) - logger.info(f"Extracted {len(texts)} paragraphs") - else: - logger.warning(f"Paragraphs found but failed to extract list. Type: {type(paragraphs_data)}") - else: - logger.warning(f"Could not find Sentences or Paragraphs in content source. Keys: {content_source.keys()}") - - # 提取总结 - # 总结结果结构可能因配置不同而异,这里尝试获取摘要 - summarization = task_result.get('Summarization', {}) - - # 如果是 URL (字符串),尝试下载内容 - if isinstance(summarization, str) and summarization.startswith('http'): - try: - import requests - logger.info(f"Downloading summarization from {summarization}") - s_resp = requests.get(summarization) - if s_resp.status_code == 200: - summarization = s_resp.json() - # 保存原始数据 - task.summary_data = summarization - else: - logger.warning(f"Failed to download summarization: {s_resp.status_code}") - summarization = {} - except Exception as e: - logger.error(f"Error downloading summarization: {e}") - summarization = {} - - # 听悟的总结通常在 Summarization.Text 或类似字段 - # 如果是章节摘要,可能在 Chapters 中 - # 假设是全文摘要 - if 'Text' in summarization: - task.summary = summarization['Text'] - elif 'Headline' in summarization: - task.summary = summarization['Headline'] - else: - # 尝试从章节摘要中提取 - chapters = task_result.get('Chapters', []) - # 处理 AutoChapters - auto_chapters = task_result.get('AutoChapters', {}) - if isinstance(auto_chapters, str) and auto_chapters.startswith('http'): - try: - import requests - logger.info(f"Downloading auto chapters from {auto_chapters}") - ac_resp = requests.get(auto_chapters) - if ac_resp.status_code == 200: - auto_chapters = ac_resp.json() - task.auto_chapters_data = auto_chapters - except Exception as e: - logger.error(f"Error downloading auto chapters: {e}") - - summary_parts = [] - for chapter in chapters: - if 'Headline' in chapter: - summary_parts.append(chapter['Headline']) - if 'Summary' in chapter: - summary_parts.append(chapter['Summary']) - task.summary = "\n".join(summary_parts) - - task.save() - - elif task_status == 'FAILED': - task.status = TranscriptionTask.Status.FAILED - task.error_message = data_obj.get('TaskStatusText', result.get('Message', 'Unknown error')) - task.save() - - # 其他状态 (PENDING, RUNNING) 不做更改 + # 调用 Service 进行解析和更新 + service.parse_and_update_task(task, result) serializer = self.get_serializer(task) return Response(serializer.data)