Files
market_page/backend/community/views.py
jeremygan2021 9e81eaaaab forum
2026-02-12 15:02:53 +08:00

186 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from rest_framework import viewsets, status, mixins, parsers, filters
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework import serializers, permissions
from django.core.signing import TimestampSigner, BadSignature, SignatureExpired
from django.utils import timezone
from django.db import models
from drf_spectacular.utils import extend_schema
from shop.models import WeChatUser
from .models import Activity, ActivitySignup, Topic, Reply, TopicMedia, Announcement
from .serializers import ActivitySerializer, ActivitySignupSerializer, TopicSerializer, ReplySerializer, TopicMediaSerializer, AnnouncementSerializer
def get_current_wechat_user(request):
    """
    Resolve the WeChat user identified by the request's bearer token.

    The ``Authorization`` header must have the form ``Bearer <token>``, where
    the token is a ``TimestampSigner`` signature over the user's openid that
    is accepted for up to 30 days. (The original note says this mirrors the
    logic used by the shop app.)

    Returns the matching ``WeChatUser``, or ``None`` when the header is
    missing or not a bearer header, when the signature is invalid or expired,
    or when no user has that openid.
    """
    header = request.headers.get('Authorization')
    if not (header and header.startswith('Bearer ')):
        return None

    # The token is the second space-separated field of the header.
    _scheme, token, *_extra = header.split(' ')
    thirty_days = 86400 * 30
    signer = TimestampSigner()
    try:
        # The signed payload is the user's openid.
        openid = signer.unsign(token, max_age=thirty_days)
    except (BadSignature, SignatureExpired):
        return None
    return WeChatUser.objects.filter(openid=openid).first()
class ActivityViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Community activity endpoints.

    Lists and retrieves active activities (newest first) and adds two custom
    actions: signing up for an activity and listing the caller's own signups.
    """

    queryset = Activity.objects.filter(is_active=True).order_by('-created_at')
    serializer_class = ActivitySerializer

    @extend_schema(summary="报名活动")
    @action(detail=True, methods=['post'])
    def signup(self, request, pk=None):
        """
        Sign the current WeChat user up for this activity.

        Responds with 401 when the caller is not logged in, 400 when the
        caller has already signed up or the activity is full, and 201 with
        the serialized signup on success.
        """
        # Imported here so the method is self-contained with respect to the
        # module's import list.
        from django.db import transaction

        user = get_current_wechat_user(request)
        if not user:
            return Response({'error': '请先登录'}, status=status.HTTP_401_UNAUTHORIZED)

        activity = self.get_object()

        # The duplicate/capacity checks and the insert must be one atomic
        # unit; otherwise two concurrent requests can both pass the checks
        # and overbook the activity or sign the same user up twice.
        with transaction.atomic():
            # Lock the activity row so competing signups are serialized.
            activity = Activity.objects.select_for_update().get(pk=activity.pk)

            already_signed_up = ActivitySignup.objects.filter(
                activity=activity, user=user
            ).exists()
            if already_signed_up:
                return Response({'error': '您已报名该活动'}, status=status.HTTP_400_BAD_REQUEST)

            if activity.signups.count() >= activity.max_participants:
                return Response({'error': '活动名额已满'}, status=status.HTTP_400_BAD_REQUEST)

            signup = ActivitySignup.objects.create(activity=activity, user=user)

        serializer = ActivitySignupSerializer(signup)
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    @extend_schema(summary="我的报名记录")
    @action(detail=False, methods=['get'])
    def my_signups(self, request):
        """
        List the current WeChat user's signups, most recent first.

        Responds with 401 when the caller is not logged in.
        """
        user = get_current_wechat_user(request)
        if not user:
            return Response({'error': '请先登录'}, status=status.HTTP_401_UNAUTHORIZED)

        signups = ActivitySignup.objects.filter(user=user).order_by('-signup_time')
        serializer = ActivitySignupSerializer(signups, many=True)
        return Response(serializer.data)
class TopicViewSet(viewsets.ModelViewSet):
    """
    Forum topic endpoints.

    Supports search over title/content, filtering by category and pinned
    state, and ordering by creation time or view count. By default pinned
    topics come first, then newest first.

    NOTE(review): only ``create`` checks the WeChat token in this class; no
    author check is visible here for update/partial_update/destroy. Confirm
    that permissions for those actions are enforced elsewhere.
    """

    queryset = Topic.objects.all()
    serializer_class = TopicSerializer
    filter_backends = [filters.SearchFilter, filters.OrderingFilter, DjangoFilterBackend]
    search_fields = ['title', 'content']
    filterset_fields = ['category', 'is_pinned']
    ordering_fields = ['created_at', 'view_count']
    ordering = ['-is_pinned', '-created_at']

    def perform_create(self, serializer):
        """Save the new topic with the current WeChat user as its author."""
        user = get_current_wechat_user(self.request)
        # ``create`` has already rejected anonymous callers; the user is
        # looked up again here because it is needed for the save itself.
        if user:
            serializer.save(author=user)

    def create(self, request, *args, **kwargs):
        """Create a topic; responds with 401 when the caller is not logged in."""
        user = get_current_wechat_user(request)
        if not user:
            return Response({'error': '请先登录'}, status=status.HTTP_401_UNAUTHORIZED)
        return super().create(request, *args, **kwargs)

    def retrieve(self, request, *args, **kwargs):
        """Return a single topic and count the request as one view."""
        instance = self.get_object()
        # Increment inside the database (view_count = view_count + 1) rather
        # than writing back a value read earlier in Python, so concurrent
        # views do not overwrite each other's increments.
        instance.view_count = models.F('view_count') + 1
        instance.save(update_fields=['view_count'])
        # Replace the F() expression on the instance with the stored number
        # before serializing.
        instance.refresh_from_db(fields=['view_count'])
        serializer = self.get_serializer(instance)
        return Response(serializer.data)
class ReplyViewSet(viewsets.ModelViewSet):
    """
    Topic reply endpoints.

    Creating a reply requires a valid WeChat bearer token; the reply is
    stored with that user as its author.
    """

    queryset = Reply.objects.all()
    serializer_class = ReplySerializer

    def create(self, request, *args, **kwargs):
        """Create a reply; responds with 401 when the caller is not logged in."""
        if get_current_wechat_user(request):
            return super().create(request, *args, **kwargs)
        return Response({'error': '请先登录'}, status=status.HTTP_401_UNAUTHORIZED)

    def perform_create(self, serializer):
        """Save the reply with the current WeChat user as its author."""
        author = get_current_wechat_user(self.request)
        if not author:
            # Nothing is saved when no user can be resolved.
            return
        serializer.save(author=author)
import requests
class TopicMediaViewSet(viewsets.ViewSet):
    """
    Forum media upload endpoint that proxies files to an external OSS service.
    """

    permission_classes = []  # Authentication is handled inside the view.
    parser_classes = [parsers.MultiPartParser, parsers.FormParser]

    @extend_schema(summary="上传媒体文件 (返回URL用于Markdown)")
    def create(self, request, *args, **kwargs):
        """
        Forward the uploaded ``file`` to the external upload service and
        record the resulting URL as a ``TopicMedia`` row.

        Responses:
            200: ``id``, ``file`` (URL), ``media_type`` and ``created_at`` of
                 the new media record.
            400: no file was provided, or the upload service reported failure.
            401: the caller is not logged in.
            502: the upload service could not be reached, answered with a
                 non-200 status, or returned an unusable body.
        """
        user = get_current_wechat_user(request)
        if not user:
            return Response({'error': '请先登录'}, status=status.HTTP_401_UNAUTHORIZED)

        file_obj = request.FILES.get('file')
        if not file_obj:
            return Response({'error': '未提供文件'}, status=status.HTTP_400_BAD_REQUEST)

        # Forward the file to the external OSS upload service.
        upload_url = "https://data.tangledup-ai.com/upload?folder=uploads%2Fmarket%2Fforum_image"
        files = {'file': (file_obj.name, file_obj, file_obj.content_type)}

        try:
            # No explicit Content-Type header is needed; requests sets
            # multipart/form-data automatically.
            response = requests.post(upload_url, files=files, timeout=30)
        except requests.RequestException as exc:
            # Only network-level failures are handled here; unrelated errors
            # are no longer masked as upload errors.
            return Response({'error': str(exc)}, status=status.HTTP_502_BAD_GATEWAY)

        if response.status_code != 200:
            return Response(
                {'error': f'上传服务响应错误: {response.status_code}', 'detail': response.text},
                status=status.HTTP_502_BAD_GATEWAY,
            )

        try:
            data = response.json()
        except ValueError:
            # The service answered 200 but the body is not JSON.
            return Response(
                {'error': '外部服务上传失败', 'detail': response.text},
                status=status.HTTP_502_BAD_GATEWAY,
            )

        if not isinstance(data, dict) or not data.get('success'):
            return Response(
                {'error': '外部服务上传失败', 'detail': data},
                status=status.HTTP_400_BAD_REQUEST,
            )

        file_url = data.get('file_url')
        if not file_url:
            # Never store a media record without a usable URL.
            return Response(
                {'error': '外部服务上传失败', 'detail': data},
                status=status.HTTP_502_BAD_GATEWAY,
            )

        # The content type may be absent; anything that is not an image is
        # recorded as a video.
        content_type = file_obj.content_type or ''
        media_type = 'image' if 'image' in content_type else 'video'

        # The topic is associated with this media record later.
        media_obj = TopicMedia.objects.create(
            file_url=file_url,
            media_type=media_type,
        )

        # Shape the response the way the frontend expects it.
        return Response({
            'id': media_obj.id,  # Real database ID.
            'file': media_obj.file_url,
            'media_type': media_obj.media_type,
            'created_at': media_obj.created_at,
        })
class AnnouncementViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Community announcement endpoints (read-only, open to any caller).
    """

    queryset = Announcement.objects.all()
    serializer_class = AnnouncementSerializer
    permission_classes = [permissions.AllowAny]

    def get_queryset(self):
        """
        Return the announcements that are active right now.

        An announcement qualifies when it is flagged active, its start time
        is unset or not in the future, and its end time is unset or not in
        the past. Results are ordered pinned first, then by descending
        priority, then newest first.
        """
        current_time = timezone.now()

        # An unset start time means "already started".
        has_started = (
            models.Q(start_time__isnull=True) | models.Q(start_time__lte=current_time)
        )
        # An unset end time means "never ends".
        not_ended = (
            models.Q(end_time__isnull=True) | models.Q(end_time__gte=current_time)
        )

        return (
            Announcement.objects
            .filter(is_active=True)
            .filter(has_started)
            .filter(not_ended)
            .order_by('-is_pinned', '-priority', '-created_at')
        )