Files
sam3_local/human_analysis_service.py
2026-02-18 14:38:12 +08:00

473 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import uuid
import time
import requests
import numpy as np
import json
import torch
import cv2
import ast
import re
from PIL import Image
from dashscope import MultiModalConversation
# Configuration (keep in sync with fastAPI_tarot.py, or pass values in as parameters).
# The constants here can be adjusted as needed, or supplied by the main file.
QWEN_MODEL = 'qwen-vl-max'
def load_image_from_url(url: str) -> Image.Image:
    """
    Download an image from a URL and return it as an RGB PIL image.

    Args:
        url: Address of the image to fetch.

    Returns:
        The decoded image, converted to RGB mode.

    Raises:
        Exception: If the request fails, the server answers with an error
            status, or the payload cannot be decoded as an image. The
            underlying error is chained as ``__cause__``.
    """
    import io  # local import: only this function needs an in-memory buffer

    try:
        headers = {'User-Agent': 'Mozilla/5.0'}
        # The context manager releases the connection even when a later step fails.
        with requests.get(url, headers=headers, stream=True, timeout=10) as response:
            response.raise_for_status()
            # Read via ``content`` instead of ``raw``: ``raw`` exposes the
            # undecoded byte stream, so a gzip/deflate-encoded response would
            # be handed to PIL still compressed.
            payload = response.content
        return Image.open(io.BytesIO(payload)).convert("RGB")
    except Exception as e:
        raise Exception(f"无法下载图片: {str(e)}") from e
def crop_head_with_padding(image: Image.Image, box, padding_ratio=0.1) -> Image.Image:
    """
    Crop ``image`` to a bounding box that has been enlarged by a relative margin.

    Args:
        image: Source image; its ``size`` is read as ``(width, height)``.
        box: Four values in ``[x1, y1, x2, y2]`` order.
        padding_ratio: Fraction of the box width/height added on each side.

    Returns:
        The result of ``image.crop`` on the padded box, clamped so it never
        extends past the image borders.
    """
    max_x, max_y = image.size
    left, top, right, bottom = box

    # Margins are proportional to the box's own extent.
    margin_x = (right - left) * padding_ratio
    margin_y = (bottom - top) * padding_ratio

    # Grow the box, truncate to integers, then clamp to the image area.
    region = (
        max(0, int(left - margin_x)),
        max(0, int(top - margin_y)),
        min(max_x, int(right + margin_x)),
        min(max_y, int(bottom + margin_y)),
    )
    return image.crop(region)
def create_highlighted_visualization(image: Image.Image, masks, output_path: str):
    """
    Write a visualization in which masked regions keep their original
    pixels while the rest of the image is dimmed to 30% brightness.

    Args:
        image: Source PIL image.
            NOTE(review): the 3-channel mask built below presumes a
            3-channel (e.g. RGB) image - confirm with callers.
        masks: Sized collection of masks. Each item is either a
            ``torch.Tensor`` or an object exposing ``squeeze()`` (such as a
            numpy array). Non-boolean masks are thresholded at 0.5.
        output_path: Destination path passed to ``Image.save``.
    """
    pixels = np.array(image)
    frame_shape = pixels.shape[:2]  # (rows, cols) of the image
    dimmed = (pixels * 0.3).astype(np.uint8)

    def _as_bool_plane(raw_mask):
        """Turn one mask into a boolean 2-D array matching the image size."""
        if isinstance(raw_mask, torch.Tensor):
            plane = raw_mask.cpu().numpy().squeeze()
        else:
            plane = raw_mask.squeeze()
        # Still more than two dimensions after squeezing: keep the first slice.
        if plane.ndim > 2:
            plane = plane[0]
        # Probabilities / floats / ints become booleans via a 0.5 threshold.
        if plane.dtype != bool:
            plane = plane > 0.5
        # Masks produced at another resolution are resized to the image
        # (cv2 expects the target size as (width, height)).
        if plane.shape != frame_shape:
            resized = cv2.resize(
                plane.astype(np.uint8),
                (pixels.shape[1], pixels.shape[0]),
                interpolation=cv2.INTER_NEAREST,
            )
            plane = resized.astype(bool)
        return plane

    if len(masks) > 0:
        # Union of every mask.
        union = np.zeros(frame_shape, dtype=bool)
        for raw_mask in masks:
            union = np.logical_or(union, _as_bool_plane(raw_mask))
        # Replicate the union across three channels, then pick original
        # pixels inside the mask and dimmed pixels outside it.
        union_3ch = np.repeat(union[..., np.newaxis], 3, axis=-1)
        composite = np.where(union_3ch, pixels, dimmed)
    else:
        # Nothing to highlight: the whole frame stays dimmed.
        composite = dimmed

    Image.fromarray(composite).save(output_path)
def extract_json_from_response(text: str) -> dict:
    """
    Extract a JSON object from free-form model output.

    The candidate payload is located in this order:
      1. the contents of a Markdown ```json ... ``` fence,
      2. otherwise the span from the first ``{`` to the last ``}``,
      3. otherwise the whole stripped text.

    The candidate is parsed with ``json.loads`` first; if that fails,
    ``ast.literal_eval`` is tried so Python-style dicts with single quotes
    are accepted as well.

    Args:
        text: Raw text that is expected to contain a JSON object.

    Returns:
        The parsed object as a ``dict``.

    Raises:
        ValueError: If ``text`` is not a string, if neither parser accepts
            the candidate payload, or if the parsed value is not a dict.
    """
    # Reject non-string input up front so every failure surfaces as ValueError.
    if not isinstance(text, str):
        raise ValueError(
            f"Could not parse JSON: expected str, got {type(text).__name__}"
        )

    # 1. Locate the candidate payload.
    fenced = re.search(r'```json\s*(.*?)\s*```', text, re.DOTALL)
    if fenced:
        clean_text = fenced.group(1).strip()
    else:
        braced = re.search(r'\{.*\}', text, re.DOTALL)
        clean_text = braced.group(0).strip() if braced else text.strip()

    # 2. Strict JSON first, 3. then Python literal syntax (single quotes).
    try:
        parsed = json.loads(clean_text)
    except (ValueError, RecursionError) as e1:
        try:
            # literal_eval only evaluates literals, never arbitrary code.
            parsed = ast.literal_eval(clean_text)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e2:
            # 4. Both parsers failed.
            raise ValueError(
                f"Could not parse JSON: {e1} | {e2} | Content: {text[:100]}..."
            ) from e2

    # Callers index the result by key, so anything but a dict is a parse failure.
    if not isinstance(parsed, dict):
        raise ValueError(
            f"Could not parse JSON: expected an object, got "
            f"{type(parsed).__name__} | Content: {text[:100]}..."
        )
    return parsed
def analyze_demographics_with_qwen(image_path: str, model_name: str = 'qwen-vl-max', prompt_template: str = None) -> dict:
    """
    Ask a Qwen-VL model to estimate the gender and age of the person in an image.

    Args:
        image_path: Path of the image file; converted to an absolute
            ``file://`` URL before being sent.
        model_name: Model identifier passed to ``MultiModalConversation.call``.
        prompt_template: Optional prompt text; when empty or ``None`` the
            built-in prompt is used.

    Returns:
        On success, the parsed model answer plus a ``model_used`` key.
        If the answer cannot be parsed, a dict with ``raw_analysis``,
        ``error`` and ``model_used``. On an API or unexpected failure, a
        dict with only ``error``. This function does not raise.
    """
    # Built-in prompt used when the caller supplies none.
    default_prompt = """请仔细观察这张图片中的人物头部/面部特写:
1. 识别性别 (Gender):男性/女性
2. 预估年龄 (Age):请给出一个合理的年龄范围,例如 "25-30岁"
3. 简要描述:发型、发色、是否有眼镜等显著特征。
请以 JSON 格式返回,包含 'gender', 'age', 'description' 字段。
不要包含 Markdown 标记。"""

    try:
        # The image is referenced through an absolute file:// URL.
        file_url = f"file://{os.path.abspath(image_path)}"
        user_message = {
            "role": "user",
            "content": [
                {"image": file_url},
                {"text": prompt_template or default_prompt},
            ],
        }

        response = MultiModalConversation.call(model=model_name, messages=[user_message])

        # Guard clause: anything but HTTP 200 is reported as an API error.
        if response.status_code != 200:
            return {"error": f"API Error: {response.code} - {response.message}"}

        content = response.output.choices[0].message.content[0]['text']
        try:
            parsed = extract_json_from_response(content)
            parsed["model_used"] = model_name
        except Exception as e:
            # Keep the raw answer so the caller can still inspect it.
            print(f"JSON Parse Error in face analysis: {e}")
            return {"raw_analysis": content, "error": str(e), "model_used": model_name}
        return parsed
    except Exception as e:
        return {"error": f"分析失败: {str(e)}"}
import asyncio
def process_face_segmentation_and_analysis(
    processor,
    image: Image.Image,
    prompt: str = "head",
    output_base_dir: str = "static/results",
    qwen_model: str = "qwen-vl-max",
    analysis_prompt: str = None
) -> dict:
    """
    Segment targets, crop each detection, and analyze the crops concurrently.

    Steps:
      1. Run the processor with a text prompt (default "head").
      2. Save a highlighted visualization and one padded crop per box.
      3. Analyze every crop with Qwen-VL on a thread pool, so the function
         stays synchronous and does not depend on an asyncio event loop.
      4. Return the combined result.

    NOTE: a later definition with the same name in this module rebinds the
    name at import time.

    Args:
        processor: Object providing ``set_image(image)`` and
            ``set_text_prompt(state=..., prompt=...)``; the latter must return
            a mapping with ``masks``, ``boxes`` and ``scores``.
        image: Input image.
        prompt: Text prompt for segmentation.
        output_base_dir: Directory under which a per-request folder is created.
        qwen_model: Model name forwarded to the analysis call.
        analysis_prompt: Optional custom analysis prompt.

    Returns:
        A dict with ``status``, ``message``, ``detected_count`` and
        ``results``; when something was detected it also carries
        ``request_id``, ``full_visualization`` and ``scores``.
    """
    from concurrent.futures import ThreadPoolExecutor

    # 1. Segmentation (synchronous).
    inference_state = processor.set_image(image)
    output = processor.set_text_prompt(state=inference_state, prompt=prompt)
    masks, boxes, scores = output["masks"], output["boxes"], output["scores"]
    detected_count = len(masks)
    if detected_count == 0:
        return {
            "status": "success",
            "message": "未检测到目标",
            "detected_count": 0,
            "results": []
        }

    # Per-request output directory.
    request_id = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
    output_dir = os.path.join(output_base_dir, request_id)
    os.makedirs(output_dir, exist_ok=True)

    # Visualization is best-effort: a failure is logged and reported as None.
    vis_filename = f"seg_{uuid.uuid4().hex}.jpg"
    vis_path = os.path.join(output_dir, vis_filename)
    try:
        create_highlighted_visualization(image, masks, vis_path)
        full_vis_relative_path = f"results/{request_id}/{vis_filename}"
    except Exception as e:
        print(f"可视化生成失败: {e}")
        full_vis_relative_path = None

    # Normalize boxes.
    if isinstance(boxes, torch.Tensor):
        boxes_np = boxes.detach().cpu().numpy()
    else:
        boxes_np = boxes

    # Normalize scores to a flat list; this handles 0-dim tensors, numpy
    # arrays, tuples and scalars, while plain lists pass through untouched.
    if isinstance(scores, torch.Tensor):
        scores_list = scores.detach().cpu().reshape(-1).tolist()
    elif isinstance(scores, list):
        scores_list = scores
    else:
        scores_list = np.asarray(scores, dtype=float).reshape(-1).tolist()

    # 2. Crop and save every detection.
    crop_paths = []
    results = []
    for i, box in enumerate(boxes_np):
        cropped_img = crop_head_with_padding(image, box, padding_ratio=0.1)
        filename = f"face_{i}.jpg"
        save_path = os.path.join(output_dir, filename)
        cropped_img.save(save_path)
        crop_paths.append(save_path)
        results.append({
            "filename": filename,
            "relative_path": f"results/{request_id}/{filename}",
            "score": float(scores_list[i]) if i < len(scores_list) else 0.0
        })

    # 3. Analyze crops in parallel; pool.map keeps the input order.
    def _analyze(path):
        return analyze_demographics_with_qwen(path, qwen_model, analysis_prompt)

    if crop_paths:
        with ThreadPoolExecutor(max_workers=min(32, len(crop_paths))) as pool:
            analysis_results = list(pool.map(_analyze, crop_paths))
    else:
        analysis_results = []

    for item, analysis in zip(results, analysis_results):
        item["analysis"] = analysis

    # 4. Combined result.
    return {
        "status": "success",
        "message": f"成功检测并分析 {detected_count} 个人脸",
        "detected_count": detected_count,
        "request_id": request_id,
        "full_visualization": full_vis_relative_path,
        "scores": scores_list,
        "results": results
    }
async def process_face_segmentation_and_analysis_async(
    processor,
    image: Image.Image,
    prompt: str = "head",
    output_base_dir: str = "static/results",
    qwen_model: str = "qwen-vl-max",
    analysis_prompt: str = None
) -> dict:
    """
    Async variant: segment targets, crop each detection, and analyze all
    crops concurrently on the event loop's default executor.

    The processor calls, visualization and crop saving are executed inline
    (synchronously) inside this coroutine; only the Qwen-VL analysis calls
    are offloaded to worker threads.

    Args:
        processor: Object providing ``set_image(image)`` and
            ``set_text_prompt(state=..., prompt=...)``; the latter must return
            a mapping with ``masks``, ``boxes`` and ``scores``.
        image: Input image.
        prompt: Text prompt for segmentation.
        output_base_dir: Directory under which a per-request folder is created.
        qwen_model: Model name forwarded to the analysis call.
        analysis_prompt: Optional custom analysis prompt.

    Returns:
        A dict with ``status``, ``message``, ``detected_count`` and
        ``results``; when something was detected it also carries
        ``request_id``, ``full_visualization`` and ``scores``.
    """
    # 1. Segmentation.
    inference_state = processor.set_image(image)
    output = processor.set_text_prompt(state=inference_state, prompt=prompt)
    masks, boxes, scores = output["masks"], output["boxes"], output["scores"]
    detected_count = len(masks)
    if detected_count == 0:
        return {
            "status": "success",
            "message": "未检测到目标",
            "detected_count": 0,
            "results": []
        }

    # Per-request output directory.
    request_id = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
    output_dir = os.path.join(output_base_dir, request_id)
    os.makedirs(output_dir, exist_ok=True)

    # Visualization is best-effort: a failure is logged and reported as None.
    vis_filename = f"seg_{uuid.uuid4().hex}.jpg"
    vis_path = os.path.join(output_dir, vis_filename)
    try:
        create_highlighted_visualization(image, masks, vis_path)
        full_vis_relative_path = f"results/{request_id}/{vis_filename}"
    except Exception as e:
        print(f"可视化生成失败: {e}")
        full_vis_relative_path = None

    # Normalize boxes.
    if isinstance(boxes, torch.Tensor):
        boxes_np = boxes.detach().cpu().numpy()
    else:
        boxes_np = boxes

    # Normalize scores to a flat list; this handles 0-dim tensors, numpy
    # arrays, tuples and scalars, while plain lists pass through untouched.
    if isinstance(scores, torch.Tensor):
        scores_list = scores.detach().cpu().reshape(-1).tolist()
    elif isinstance(scores, list):
        scores_list = scores
    else:
        scores_list = np.asarray(scores, dtype=float).reshape(-1).tolist()

    # Inside a coroutine the running loop is the one to schedule work on.
    loop = asyncio.get_running_loop()
    tasks = []
    results = []
    for i, box in enumerate(boxes_np):
        # 2. Crop and save.
        cropped_img = crop_head_with_padding(image, box, padding_ratio=0.1)
        filename = f"face_{i}.jpg"
        save_path = os.path.join(output_dir, filename)
        cropped_img.save(save_path)
        # 3. Schedule the blocking analysis call on the default executor.
        tasks.append(
            loop.run_in_executor(
                None,
                analyze_demographics_with_qwen,
                save_path,
                qwen_model,
                analysis_prompt
            )
        )
        results.append({
            "filename": filename,
            "relative_path": f"results/{request_id}/{filename}",
            "score": float(scores_list[i]) if i < len(scores_list) else 0.0
        })

    # gather() returns results in the order the tasks were passed in.
    analysis_results = await asyncio.gather(*tasks) if tasks else []
    for item, analysis in zip(results, analysis_results):
        item["analysis"] = analysis

    return {
        "status": "success",
        "message": f"成功检测并分析 {detected_count} 个人脸",
        "detected_count": detected_count,
        "request_id": request_id,
        "full_visualization": full_vis_relative_path,
        "scores": scores_list,
        "results": results
    }
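# Keep the old synchronous interface for compatibility with other potential callers; note that its implementation may cause problems if it runs inside an event loop.
# Since the main concern is fastAPI_tarot.py, process_face_segmentation_and_analysis could simply be replaced,
# or reduced to a thin wrapper.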
def process_face_segmentation_and_analysis(
    processor,
    image: Image.Image,
    prompt: str = "head",
    output_base_dir: str = "static/results",
    qwen_model: str = "qwen-vl-max",
    analysis_prompt: str = None
) -> dict:
    """
    Synchronous version (kept for compatibility): segment targets, crop each
    detection, and analyze the crops one after another.

    Note: every step here is blocking, so calling this from inside an async
    event loop blocks that loop for the whole duration. Use
    ``process_face_segmentation_and_analysis_async`` when concurrent
    analysis is needed.

    Args:
        processor: Object providing ``set_image(image)`` and
            ``set_text_prompt(state=..., prompt=...)``; the latter must return
            a mapping with ``masks``, ``boxes`` and ``scores``.
        image: Input image.
        prompt: Text prompt for segmentation.
        output_base_dir: Directory under which a per-request folder is created.
        qwen_model: Model name forwarded to the analysis call.
        analysis_prompt: Optional custom analysis prompt.

    Returns:
        A dict with ``status``, ``message``, ``detected_count`` and
        ``results``; when something was detected it also carries
        ``request_id``, ``full_visualization`` and ``scores``.
    """
    # 1. Segmentation.
    inference_state = processor.set_image(image)
    output = processor.set_text_prompt(state=inference_state, prompt=prompt)
    masks, boxes, scores = output["masks"], output["boxes"], output["scores"]
    detected_count = len(masks)
    if detected_count == 0:
        return {
            "status": "success",
            "message": "未检测到目标",
            "detected_count": 0,
            "results": []
        }

    # Per-request output directory.
    request_id = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
    output_dir = os.path.join(output_base_dir, request_id)
    os.makedirs(output_dir, exist_ok=True)

    # Visualization is best-effort: a failure is logged and reported as None.
    vis_filename = f"seg_{uuid.uuid4().hex}.jpg"
    vis_path = os.path.join(output_dir, vis_filename)
    try:
        create_highlighted_visualization(image, masks, vis_path)
        full_vis_relative_path = f"results/{request_id}/{vis_filename}"
    except Exception as e:
        print(f"可视化生成失败: {e}")
        full_vis_relative_path = None

    # Normalize boxes.
    if isinstance(boxes, torch.Tensor):
        boxes_np = boxes.detach().cpu().numpy()
    else:
        boxes_np = boxes

    # Normalize scores to a flat list; this handles 0-dim tensors, numpy
    # arrays, tuples and scalars, while plain lists pass through untouched.
    if isinstance(scores, torch.Tensor):
        scores_list = scores.detach().cpu().reshape(-1).tolist()
    elif isinstance(scores, list):
        scores_list = scores
    else:
        scores_list = np.asarray(scores, dtype=float).reshape(-1).tolist()

    results = []
    for i, box in enumerate(boxes_np):
        # 2. Crop and save.
        cropped_img = crop_head_with_padding(image, box, padding_ratio=0.1)
        filename = f"face_{i}.jpg"
        save_path = os.path.join(output_dir, filename)
        cropped_img.save(save_path)
        # 3. Blocking analysis call, one crop at a time.
        analysis = analyze_demographics_with_qwen(
            save_path, model_name=qwen_model, prompt_template=analysis_prompt
        )
        results.append({
            "filename": filename,
            "relative_path": f"results/{request_id}/{filename}",
            "analysis": analysis,
            "score": float(scores_list[i]) if i < len(scores_list) else 0.0
        })

    return {
        "status": "success",
        "message": f"成功检测并分析 {detected_count} 个人脸",
        "detected_count": detected_count,
        "request_id": request_id,
        "full_visualization": full_vis_relative_path,
        "scores": scores_list,
        "results": results
    }