120 lines
4.2 KiB
Python
120 lines
4.2 KiB
Python
import json
|
||
import logging
|
||
import time
|
||
import uuid
|
||
import oss2
|
||
from aliyunsdkcore.client import AcsClient
|
||
from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
|
||
# Import the Tingwu request classes for API version 2023-09-30. The module
# path is version-specific, so it may need adjusting if the installed
# aliyunsdktingwu package ships a different API version.
try:
    from aliyunsdktingwu.request.v20230930 import (
        CreateTaskRequest,
        GetTaskInfoRequest,
    )
except ImportError:
    # Keep this module importable (the OSS upload path does not need the
    # Tingwu SDK), but do not fail silently: without these names the
    # transcription methods below cannot build their requests.
    logging.getLogger(__name__).warning(
        "aliyunsdktingwu.request.v20230930 could not be imported; "
        "Tingwu task creation and lookup will be unavailable."
    )
|
||
|
||
from django.conf import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class AliyunTingwuService:
    """Thin wrapper around Aliyun OSS uploads and Tingwu transcription tasks.

    Credentials and endpoints are read from Django ``settings``. When part of
    the configuration is missing, the corresponding client attribute
    (``bucket`` or ``client``) is set to ``None`` and a warning is logged;
    the methods that need it then raise ``RuntimeError``.
    """

    # Default lifetime of the signed download URL: 3 hours, in seconds.
    SIGNED_URL_TTL_SECONDS = 3600 * 3

    def __init__(self):
        """Read configuration from settings and build the OSS and Tingwu clients."""
        self.access_key_id = settings.ALIYUN_ACCESS_KEY_ID
        self.access_key_secret = settings.ALIYUN_ACCESS_KEY_SECRET
        self.oss_bucket_name = settings.ALIYUN_OSS_BUCKET_NAME
        self.oss_endpoint = settings.ALIYUN_OSS_ENDPOINT
        self.tingwu_app_key = settings.ALIYUN_TINGWU_APP_KEY
        # Original author's note: the Tingwu service is mainly hosted in Beijing.
        self.region_id = "cn-beijing"

        has_credentials = bool(self.access_key_id and self.access_key_secret)

        # OSS bucket handle. The bucket name is checked as well, so that a
        # missing name takes the same warn-and-disable path as the other
        # settings instead of being passed on to oss2.Bucket.
        if has_credentials and self.oss_endpoint and self.oss_bucket_name:
            auth = oss2.Auth(self.access_key_id, self.access_key_secret)
            self.bucket = oss2.Bucket(auth, self.oss_endpoint, self.oss_bucket_name)
        else:
            self.bucket = None
            logger.warning("Aliyun OSS configuration missing.")

        # Tingwu API client.
        if has_credentials:
            self.client = AcsClient(
                self.access_key_id,
                self.access_key_secret,
                self.region_id,
            )
        else:
            self.client = None
            logger.warning("Aliyun AccessKey configuration missing.")

    def upload_to_oss(self, file_obj, file_name, expires=SIGNED_URL_TTL_SECONDS):
        """Upload a file to OSS and return a signed GET URL for it.

        Args:
            file_obj: Content to upload; per the original comment this should
                be an open file object or a byte stream.
            file_name: Object key to store the content under.
            expires: Lifetime of the signed URL in seconds (default 3 hours).

        Returns:
            Whatever ``bucket.sign_url`` returns for the uploaded object.

        Raises:
            RuntimeError: If the OSS bucket was not initialised.
            Exception: Any error from the upload or signing call is logged
                and re-raised unchanged.
        """
        if not self.bucket:
            raise RuntimeError("OSS Client not initialized")

        try:
            self.bucket.put_object(file_name, file_obj)
            return self.bucket.sign_url('GET', file_name, expires)
        except Exception as e:
            logger.exception("OSS Upload failed: %s", e)
            raise

    def create_transcription_task(self, file_url, language="cn"):
        """Create a Tingwu transcription task for the given file URL.

        Args:
            file_url: URL of the media file, sent as ``Input.FileUrl``.
            language: Value sent as ``Input.SourceLanguage``.

        Returns:
            The service response decoded from JSON.

        Raises:
            RuntimeError: If the Tingwu client was not initialised.
            ClientException, ServerException: SDK errors are logged and
                re-raised unchanged.
        """
        if not self.client:
            raise RuntimeError("Tingwu Client not initialized")

        request = CreateTaskRequest.CreateTaskRequest()

        # Original author's note: query parameters are used for compatibility
        # across Aliyun SDK versions.
        # NOTE(review): confirm against the Tingwu CreateTask API reference
        # that AppKey/Input/Parameters are accepted as query parameters rather
        # than in the request body.
        request.add_query_param('AppKey', self.tingwu_app_key)

        # Input section; a fresh random TaskKey is generated for every task.
        input_param = {
            "FileUrl": file_url,
            "SourceLanguage": language,
            "TaskKey": str(uuid.uuid4()),
        }
        request.add_query_param('Input', json.dumps(input_param))

        # Parameters section: enable auto chapters and summarization.
        parameters = {
            "Transcoding": {
                "TargetAudioFormat": "mp3",
            },
            "AutoChaptersEnabled": True,
            "SummarizationEnabled": True,
        }
        request.add_query_param('Parameters', json.dumps(parameters))

        try:
            response = self.client.do_action_with_exception(request)
            return json.loads(response)
        except (ClientException, ServerException) as e:
            logger.exception("Tingwu CreateTask failed: %s", e)
            raise

    def get_task_info(self, task_id):
        """Query the status and result of a Tingwu task.

        Args:
            task_id: Identifier of the task to look up.

        Returns:
            The service response decoded from JSON.

        Raises:
            RuntimeError: If the Tingwu client was not initialised.
            ClientException, ServerException: SDK errors are logged and
                re-raised unchanged.
        """
        if not self.client:
            raise RuntimeError("Tingwu Client not initialized")

        request = GetTaskInfoRequest.GetTaskInfoRequest()
        request.set_TaskId(task_id)

        try:
            response = self.client.do_action_with_exception(request)
            return json.loads(response)
        except (ClientException, ServerException) as e:
            logger.exception("Tingwu GetTaskInfo failed: %s", e)
            raise
|