Files
market_page/backend/community/views.py
2026-02-12 20:50:01 +08:00

192 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from rest_framework import viewsets, status, mixins, parsers, filters
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework import serializers, permissions
from django.core.signing import TimestampSigner, BadSignature, SignatureExpired
from django.utils import timezone
from django.db import models
from drf_spectacular.utils import extend_schema
from shop.models import WeChatUser
from .models import Activity, ActivitySignup, Topic, Reply, TopicMedia, Announcement
from .serializers import ActivitySerializer, ActivitySignupSerializer, TopicSerializer, ReplySerializer, TopicMediaSerializer, AnnouncementSerializer
from .utils import get_current_wechat_user
from .permissions import IsAuthorOrReadOnly
class ActivityViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Community activity API.

    Read-only list/retrieve over activities with ``is_active=True``, ordered
    by ``created_at`` descending, plus two extra actions:

    * ``signup`` (POST, detail) - register the current WeChat user.
    * ``my_signups`` (GET, list) - list the current user's registrations.
    """
    queryset = Activity.objects.filter(is_active=True).order_by('-created_at')
    serializer_class = ActivitySerializer

    @extend_schema(summary="报名活动")
    @action(detail=True, methods=['post'])
    def signup(self, request, pk=None):
        """
        Register the current WeChat user for the activity identified by *pk*.

        Reads an optional ``signup_info`` mapping from the request body and
        checks it against the entries of ``activity.signup_form_config`` that
        are flagged ``required``.

        Returns:
            201 with the serialized signup on success.
            401 when no current user can be resolved.
            400 when the user already signed up, the activity is full,
            ``signup_info`` is not a mapping while required fields exist, or
            a required field is missing or blank.
        """
        user = get_current_wechat_user(request)
        if not user:
            return Response({'error': '请先登录'}, status=401)

        activity = self.get_object()

        # Reject duplicate registrations.
        if ActivitySignup.objects.filter(activity=activity, user=user).exists():
            return Response({'error': '您已报名该活动'}, status=400)

        # NOTE(review): this count check and the create() below are separate
        # queries, so concurrent requests could exceed max_participants.
        # Confirm whether a transaction / row lock is wanted here.
        if activity.signups.count() >= activity.max_participants:
            return Response({'error': '活动名额已满'}, status=400)

        signup_info = request.data.get('signup_info', {})

        # Only config entries that are flagged required AND carry a name can
        # be validated; malformed entries are skipped instead of raising
        # KeyError.
        required_entries = [
            entry
            for entry in (activity.signup_form_config or [])
            if entry.get('required') and 'name' in entry
        ]

        # The payload is client-controlled: a non-mapping value (for example
        # a raw string from a multipart form) would crash on ``.get`` below.
        if required_entries and not isinstance(signup_info, dict):
            return Response({'error': '报名信息格式错误'}, status=400)

        for entry in required_entries:
            field_name = entry['name']
            value = signup_info.get(field_name)
            # A field counts as missing when absent or a whitespace-only string.
            if value is None or (isinstance(value, str) and not value.strip()):
                # Prefer the human-readable label; fall back to the raw name.
                label = entry.get('label', field_name)
                return Response({'error': f'请填写: {label}'}, status=400)

        signup = ActivitySignup.objects.create(
            activity=activity,
            user=user,
            signup_info=signup_info,
        )
        serializer = ActivitySignupSerializer(signup)
        return Response(serializer.data, status=201)

    @extend_schema(summary="我的报名记录")
    @action(detail=False, methods=['get'])
    def my_signups(self, request):
        """
        List the current WeChat user's signups, newest ``signup_time`` first.

        Returns 401 when no current user can be resolved.
        """
        user = get_current_wechat_user(request)
        if not user:
            return Response({'error': '请先登录'}, status=401)

        signups = ActivitySignup.objects.filter(user=user).order_by('-signup_time')
        serializer = ActivitySignupSerializer(signups, many=True)
        return Response(serializer.data)
class TopicViewSet(viewsets.ModelViewSet):
    """
    Forum topic API.

    Full CRUD over topics guarded by ``IsAuthorOrReadOnly``. Supports search
    on ``title``/``content``, filtering on ``category``/``is_pinned`` and
    ordering on ``created_at``/``view_count``; the default ordering puts
    pinned topics first, then newest first.
    """
    queryset = Topic.objects.all()
    serializer_class = TopicSerializer
    permission_classes = [IsAuthorOrReadOnly]
    filter_backends = [filters.SearchFilter, filters.OrderingFilter, DjangoFilterBackend]
    search_fields = ['title', 'content']
    filterset_fields = ['category', 'is_pinned']
    ordering_fields = ['created_at', 'view_count']
    ordering = ['-is_pinned', '-created_at']

    def perform_create(self, serializer):
        """
        Save the new topic with the current WeChat user as its author.

        Raises:
            NotAuthenticated: when no current user can be resolved. ``create``
                normally rejects such requests first; raising here prevents a
                success response for an object that was never saved.
        """
        # Local import keeps this block self-contained; DRF is already a
        # dependency of this module.
        from rest_framework.exceptions import NotAuthenticated

        user = get_current_wechat_user(self.request)
        if not user:
            raise NotAuthenticated('请先登录')
        serializer.save(author=user)

    def create(self, request, *args, **kwargs):
        """Create a topic; respond with 401 when no current user is resolved."""
        user = get_current_wechat_user(request)
        if not user:
            return Response({'error': '请先登录'}, status=401)
        return super().create(request, *args, **kwargs)

    def retrieve(self, request, *args, **kwargs):
        """
        Return a single topic and increment its ``view_count`` by one.

        The increment is expressed with ``F()`` so the database performs the
        arithmetic; a Python-side read-modify-write would lose updates when
        two requests hit the same topic concurrently.
        """
        instance = self.get_object()
        instance.view_count = models.F('view_count') + 1
        instance.save(update_fields=['view_count'])
        # Replace the F() expression on the instance with the stored integer
        # so the serializer emits a real number.
        instance.refresh_from_db(fields=['view_count'])
        serializer = self.get_serializer(instance)
        return Response(serializer.data)
class ReplyViewSet(viewsets.ModelViewSet):
    """
    Topic reply API.

    Full CRUD over replies guarded by ``IsAuthorOrReadOnly``.
    """
    queryset = Reply.objects.all()
    serializer_class = ReplySerializer
    permission_classes = [IsAuthorOrReadOnly]

    def perform_create(self, serializer):
        """
        Save the new reply with the current WeChat user as its author.

        Raises:
            NotAuthenticated: when no current user can be resolved. ``create``
                normally rejects such requests first; raising here prevents a
                success response for an object that was never saved.
        """
        # Local import keeps this block self-contained; DRF is already a
        # dependency of this module.
        from rest_framework.exceptions import NotAuthenticated

        user = get_current_wechat_user(self.request)
        if not user:
            raise NotAuthenticated('请先登录')
        serializer.save(author=user)

    def create(self, request, *args, **kwargs):
        """Create a reply; respond with 401 when no current user is resolved."""
        user = get_current_wechat_user(request)
        if not user:
            return Response({'error': '请先登录'}, status=401)
        return super().create(request, *args, **kwargs)
import requests
class TopicMediaViewSet(viewsets.ViewSet):
    """
    Forum media upload API (proxies the file to an external OSS service).

    The upload endpoint and request timeout are class attributes so that a
    subclass can override them without touching ``create``.
    """
    permission_classes = []  # authentication is performed inside ``create``
    parser_classes = [parsers.MultiPartParser, parsers.FormParser]

    # External upload endpoint and the timeout (seconds) for the proxy call.
    upload_url = "https://data.tangledup-ai.com/upload?folder=uploads%2Fmarket%2Fforum_image"
    upload_timeout = 30

    @extend_schema(summary="上传媒体文件 (返回URL用于Markdown)")
    def create(self, request, *args, **kwargs):
        """
        Forward the uploaded ``file`` to the external service and record it.

        On success a ``TopicMedia`` row is created (its topic is associated
        later) and its id, URL, media type and creation time are returned.

        Returns:
            200 with ``id``/``file``/``media_type``/``created_at`` on success.
            401 when no current user can be resolved.
            400 when no file was provided, or the upstream response does not
            report success or carries no file URL.
            502 when the upstream service answers with a non-200 status.
            500 when the upstream request fails or returns invalid JSON.
        """
        user = get_current_wechat_user(request)
        if not user:
            return Response({'error': '请先登录'}, status=401)

        file_obj = request.FILES.get('file')
        if not file_obj:
            return Response({'error': '未提供文件'}, status=400)

        # No explicit Content-Type header is needed: requests sets
        # multipart/form-data itself when ``files`` is passed.
        files = {'file': (file_obj.name, file_obj, file_obj.content_type)}
        try:
            response = requests.post(
                self.upload_url, files=files, timeout=self.upload_timeout
            )
        except requests.RequestException as exc:
            # Network-level failure (DNS, connection, timeout, ...).
            return Response({'error': str(exc)}, status=500)

        if response.status_code != 200:
            return Response(
                {
                    'error': f'上传服务响应错误: {response.status_code}',
                    'detail': response.text,
                },
                status=502,
            )

        try:
            data = response.json()
        except ValueError as exc:
            # Body was not valid JSON.
            return Response({'error': str(exc)}, status=500)

        # Treat a non-mapping body or a success flag without a URL as a
        # failed upload rather than storing a record with no file.
        file_url = data.get('file_url') if isinstance(data, dict) else None
        if not isinstance(data, dict) or not data.get('success') or not file_url:
            return Response({'error': '外部服务上传失败', 'detail': data}, status=400)

        # content_type may be None; anything that is not an image is stored
        # as a video.
        content_type = file_obj.content_type or ''
        media_type = 'image' if 'image' in content_type else 'video'

        # The topic is associated with this record later.
        media_obj = TopicMedia.objects.create(
            file_url=file_url,
            media_type=media_type,
        )

        # Response shape expected by the frontend.
        return Response({
            'id': media_obj.id,  # real database id
            'file': media_obj.file_url,
            'media_type': media_obj.media_type,
            'created_at': media_obj.created_at,
        })
class AnnouncementViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Community announcement API.

    Read-only and open to everyone (``AllowAny``).
    """
    queryset = Announcement.objects.all()
    serializer_class = AnnouncementSerializer
    permission_classes = [permissions.AllowAny]

    def get_queryset(self):
        """
        Return the announcements that are visible right now.

        An announcement is visible when it is active, its ``start_time`` is
        unset or not in the future, and its ``end_time`` is unset or not in
        the past. Results are ordered by pinned first, then higher priority,
        then newest first.
        """
        current_time = timezone.now()

        # An unset bound means "no restriction" on that side of the window.
        has_started = (
            models.Q(start_time__isnull=True)
            | models.Q(start_time__lte=current_time)
        )
        has_not_ended = (
            models.Q(end_time__isnull=True)
            | models.Q(end_time__gte=current_time)
        )

        visible = (
            Announcement.objects
            .filter(is_active=True)
            .filter(has_started)
            .filter(has_not_ended)
        )
        return visible.order_by('-is_pinned', '-priority', '-created_at')