Files
market_page/backend/ai_services/services.py
jeremygan2021 926a9e7b5f
All checks were successful
Deploy to Server / deploy (push) Successful in 5s
tingwu_new
2026-03-11 15:10:40 +08:00

148 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import logging
import time
import uuid
import oss2
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
# 尝试导入最新的 API 版本,如果有问题可能需要调整
try:
from aliyunsdktingwu.request.v20230930 import CreateTaskRequest, GetTaskInfoRequest
except ImportError:
# Fallback or error handling if version differs
pass
from django.conf import settings
logger = logging.getLogger(__name__)
class AliyunTingwuService:
def __init__(self):
self.access_key_id = settings.ALIYUN_ACCESS_KEY_ID
self.access_key_secret = settings.ALIYUN_ACCESS_KEY_SECRET
self.oss_bucket_name = settings.ALIYUN_OSS_BUCKET_NAME
self.oss_endpoint = settings.ALIYUN_OSS_ENDPOINT
self.tingwu_app_key = settings.ALIYUN_TINGWU_APP_KEY
self.region_id = "cn-shanghai" # 听悟服务区域根据文档应与OSS区域一致或者使用 'cn-beijing'
# 初始化 OSS Bucket
if self.access_key_id and self.access_key_secret and self.oss_endpoint:
auth = oss2.Auth(self.access_key_id, self.access_key_secret)
self.bucket = oss2.Bucket(auth, self.oss_endpoint, self.oss_bucket_name)
else:
self.bucket = None
logger.warning("Aliyun OSS configuration missing.")
# 初始化听悟 Client
if self.access_key_id and self.access_key_secret:
self.client = AcsClient(
self.access_key_id,
self.access_key_secret,
self.region_id
)
# 显式添加听悟服务的 Endpoint 映射,解决 EndpointResolvingError
# 听悟 API 的服务接入点通常是 tingwu.cn-beijing.aliyuncs.com
# 但新版听悟 API (tingwu.aliyuncs.com) 可能不同,需根据实际情况添加
# 这里添加一个通用的 Endpoint 映射
try:
# 尝试为 tingwu 产品设置 Endpoint
# 注意听悟服务主要部署在北京Endpoint 通常为 tingwu.cn-beijing.aliyuncs.com
# 如果您的服务在上海,也可能需要连接到北京的接入点
self.client.add_endpoint(self.region_id, "tingwu", "tingwu.cn-beijing.aliyuncs.com")
except Exception as e:
logger.warning(f"Failed to add endpoint: {e}")
else:
self.client = None
logger.warning("Aliyun AccessKey configuration missing.")
def upload_to_oss(self, file_obj, file_name):
"""
上传文件到 OSS 并返回带签名的 URL (有效期 3 小时)
"""
if not self.bucket:
raise Exception("OSS Client not initialized")
try:
# 上传文件
# file_obj 应该是打开的文件对象或字节流
self.bucket.put_object(file_name, file_obj)
# 生成签名 URL有效期 3 小时 (3600 * 3)
url = self.bucket.sign_url('GET', file_name, 3600 * 3)
return url
except Exception as e:
logger.error(f"OSS Upload failed: {e}")
raise e
def create_transcription_task(self, file_url, language="cn"):
"""
创建听悟转写任务
"""
if not self.client:
raise Exception("Tingwu Client not initialized")
request = CreateTaskRequest.CreateTaskRequest()
# 针对阿里云 SDK 不同版本的兼容性处理
# "type" 参数是听悟 API (ROA 风格) 的必填项,用于指定任务类型
# 根据官方文档,离线任务的 type 通常就是 'offline'
request.add_query_param('type', 'offline')
# 构造请求体 (Body)
# 根据听悟 API 文档AppKey, Input, Parameters 应位于 JSON Body 中
# 而不是 Query Parameter
body = {
"AppKey": self.tingwu_app_key,
"Input": {
"FileUrl": file_url,
"SourceLanguage": language,
"TaskKey": str(uuid.uuid4())
},
"Parameters": {
"Transcoding": {
"TargetAudioFormat": "mp3"
},
"AutoChaptersEnabled": True,
"SummarizationEnabled": True,
"Summarization": {
"Types": ["Paragraph", "Conversational", "QuestionsAnswering", "MindMap"]
}
}
}
# 设置 Body 内容
request.set_content(json.dumps(body))
request.add_header('Content-Type', 'application/json')
# 强制设置 Endpoint避免 SDK.EndpointResolvingError
# 听悟目前主要服务点在北京
request.set_endpoint("tingwu.cn-beijing.aliyuncs.com")
# 显式设置 Method 为 PUT
request.set_method('PUT')
try:
response = self.client.do_action_with_exception(request)
return json.loads(response)
except (ClientException, ServerException) as e:
logger.error(f"Tingwu CreateTask failed: {e}")
raise e
def get_task_info(self, task_id):
"""
查询任务状态和结果
"""
if not self.client:
raise Exception("Tingwu Client not initialized")
request = GetTaskInfoRequest.GetTaskInfoRequest()
request.set_TaskId(task_id)
try:
response = self.client.do_action_with_exception(request)
return json.loads(response)
except (ClientException, ServerException) as e:
logger.error(f"Tingwu GetTaskInfo failed: {e}")
raise e