From dd62f28be1a32db9a7032d2c73e5ed693aba23a2 Mon Sep 17 00:00:00 2001 From: goulustis Date: Mon, 2 Feb 2026 09:50:53 +0800 Subject: [PATCH] add SSE live update --- fastapi_server/server_viewer.py | 80 ++++++++++++++++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/fastapi_server/server_viewer.py b/fastapi_server/server_viewer.py index f41130f..d1f6d90 100644 --- a/fastapi_server/server_viewer.py +++ b/fastapi_server/server_viewer.py @@ -1,13 +1,16 @@ from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import HTMLResponse, JSONResponse +from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from typing import List, Dict, Optional import os import sys +import json +import asyncio from pathlib import Path from dotenv import load_dotenv +import time load_dotenv() @@ -119,6 +122,81 @@ async def health(): return {"status": "healthy", "db_connected": conv_store is not None} +@app.get("/api/events") +async def stream_events(): + """Server-Sent Events endpoint for live updates""" + if conv_store is None: + raise HTTPException(status_code=500, detail="Database connection not configured") + + import psycopg + conn_str = os.environ.get("CONN_STR") + if not conn_str: + raise HTTPException(status_code=500, detail="CONN_STR not set") + + async def event_generator(): + last_check = {} + check_interval = 2.0 # Check every 2 seconds + + while True: + try: + with psycopg.connect(conn_str) as conn: + with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: + # Get current state of all conversations + cur.execute(""" + SELECT + conversation_id, + COUNT(*) as message_count, + MAX(created_at) as last_updated + FROM messages + GROUP BY conversation_id + ORDER BY last_updated DESC + """) + results = cur.fetchall() + + current_state = {} + for row in results: + conv_id = row["conversation_id"] + last_updated = row["last_updated"] + message_count = row["message_count"] + + current_state[conv_id] = { + "message_count": message_count, + "last_updated": last_updated.isoformat() if last_updated else None, + "timestamp": last_updated.timestamp() if last_updated else 0 + } + + # Check if this conversation is new or updated + if conv_id not in last_check: + # New conversation + yield f"data: {json.dumps({'type': 'conversation_new', 'conversation': {'conversation_id': conv_id, 'message_count': message_count, 'last_updated': current_state[conv_id]['last_updated']}})}\n\n" + elif last_check[conv_id]["timestamp"] < current_state[conv_id]["timestamp"]: + # Updated conversation (new messages) + yield f"data: {json.dumps({'type': 'conversation_updated', 'conversation': {'conversation_id': conv_id, 'message_count': message_count, 'last_updated': current_state[conv_id]['last_updated']}})}\n\n" + + # Check for deleted conversations + for conv_id in last_check: + if conv_id not in current_state: + yield f"data: {json.dumps({'type': 'conversation_deleted', 'conversation_id': conv_id})}\n\n" + + last_check = current_state + + await asyncio.sleep(check_interval) + + except Exception as e: + yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" + await asyncio.sleep(check_interval) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # Disable buffering for nginx + } + ) + + if __name__ == "__main__": import uvicorn uvicorn.run(