add SSE live update

This commit is contained in:
2026-02-02 09:50:53 +08:00
parent 0e06463490
commit dd62f28be1

View File

@@ -1,13 +1,16 @@
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel from pydantic import BaseModel
from typing import List, Dict, Optional from typing import List, Dict, Optional
import os import os
import sys import sys
import json
import asyncio
from pathlib import Path from pathlib import Path
from dotenv import load_dotenv from dotenv import load_dotenv
import time
load_dotenv() load_dotenv()
@@ -119,6 +122,81 @@ async def health():
return {"status": "healthy", "db_connected": conv_store is not None} return {"status": "healthy", "db_connected": conv_store is not None}
@app.get("/api/events")
async def stream_events():
    """Server-Sent Events endpoint for live updates.

    Polls the ``messages`` table every ``check_interval`` seconds and emits
    one SSE ``data:`` frame per change. Payloads (JSON):

      - ``{"type": "conversation_new", "conversation": {...}}`` — a
        conversation_id not seen on the previous poll.
      - ``{"type": "conversation_updated", "conversation": {...}}`` — the
        conversation's MAX(created_at) advanced since the previous poll.
      - ``{"type": "conversation_deleted", "conversation_id": ...}`` — a
        conversation present on the previous poll is now absent.
      - ``{"type": "error", "message": ...}`` — a polling failure; the
        stream stays open and retries after the interval.

    Raises:
        HTTPException(500): when the DB store or CONN_STR is not configured.
    """
    if conv_store is None:
        raise HTTPException(status_code=500, detail="Database connection not configured")

    import psycopg

    conn_str = os.environ.get("CONN_STR")
    if not conn_str:
        raise HTTPException(status_code=500, detail="CONN_STR not set")

    def _poll_state():
        """Blocking snapshot of per-conversation message counts.

        psycopg here is the synchronous driver; this helper is run via
        asyncio.to_thread so the connect/execute/fetch cycle does not
        block the event loop (and every other request) while streaming.
        """
        with psycopg.connect(conn_str) as conn:
            with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
                cur.execute("""
                    SELECT
                        conversation_id,
                        COUNT(*) as message_count,
                        MAX(created_at) as last_updated
                    FROM messages
                    GROUP BY conversation_id
                    ORDER BY last_updated DESC
                """)
                return cur.fetchall()

    async def event_generator():
        last_check = {}
        check_interval = 2.0  # Check every 2 seconds

        while True:
            try:
                # Run the blocking DB work off the event loop; the original
                # inline connect/execute stalled all concurrent requests.
                results = await asyncio.to_thread(_poll_state)

                current_state = {}
                for row in results:
                    conv_id = row["conversation_id"]
                    last_updated = row["last_updated"]
                    message_count = row["message_count"]

                    current_state[conv_id] = {
                        "message_count": message_count,
                        "last_updated": last_updated.isoformat() if last_updated else None,
                        "timestamp": last_updated.timestamp() if last_updated else 0
                    }

                    # Check if this conversation is new or updated
                    if conv_id not in last_check:
                        # New conversation
                        yield f"data: {json.dumps({'type': 'conversation_new', 'conversation': {'conversation_id': conv_id, 'message_count': message_count, 'last_updated': current_state[conv_id]['last_updated']}})}\n\n"
                    elif last_check[conv_id]["timestamp"] < current_state[conv_id]["timestamp"]:
                        # Updated conversation (new messages)
                        yield f"data: {json.dumps({'type': 'conversation_updated', 'conversation': {'conversation_id': conv_id, 'message_count': message_count, 'last_updated': current_state[conv_id]['last_updated']}})}\n\n"

                # Check for deleted conversations (seen last poll, gone now)
                for conv_id in last_check:
                    if conv_id not in current_state:
                        yield f"data: {json.dumps({'type': 'conversation_deleted', 'conversation_id': conv_id})}\n\n"

                last_check = current_state
                await asyncio.sleep(check_interval)
            except Exception as e:
                # Best-effort: report the failure to the client and keep
                # polling rather than tearing down the stream.  A client
                # disconnect raises CancelledError (BaseException), which
                # deliberately propagates and ends the generator.
                yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
                await asyncio.sleep(check_interval)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable buffering for nginx
        }
    )
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn
uvicorn.run( uvicorn.run(