Table of Contents

ionpy – Kamera & Video Stream Roadmap

USB-Cams, Netz-Cams, Mobile Cam & Handy-Sensoren

Erstellt: 2026-02-27
Status: In Planung
Abhängigkeiten: Keine – unabhängig von Wizard und PWA MVP


Architektur-Übersicht

┌─────────────────────────────────────────────────────┐
│                  Quellen                            │
│  📱 Handy-Cam   🔌 USB-Cam   🌐 IP-Cam (RTSP)      │
└──────────┬──────────┬──────────┬────────────────────┘
           │  WS      │  OpenCV  │  OpenCV/ffmpeg
           ▼          ▼          ▼
┌─────────────────────────────────────────────────────┐
│          web/camera.py – Frame Store                │
│  _frame_store { device_id → bytes (JPEG) }          │
│  _frame_subscribers { device_id → [Queue, ...] }    │
└────────────────────┬────────────────────────────────┘
                     │
           ┌─────────┴──────────┐
           ▼                    ▼
  GET /api/camera/           GET /api/camera/
  snapshot/{id}              mjpeg/{id}
  (Einzelbild)               (MJPEG Stream)
           │                    │
           ▼                    ▼
  Desktop Widget          Desktop Widget
  (img + Refresh)         (img src=mjpeg)
  Mobile Snapshot         Browser / VLC / ffmpeg

Design-Prinzip: Alle Quellen landen im selben _frame_store. Alle Consumers lesen aus demselben MJPEG-Endpoint. Quellen und Consumers kennen sich nicht.


Neue Abhängigkeiten

# requirements.txt Ergänzungen:
opencv-python-headless    # USB + RTSP Cams (headless = kein GUI)
                          # Oder: opencv-python wenn GUI gewünscht

Hinweis: Kein ffmpeg Binary nötig – OpenCV bringt eigene Codec-Unterstützung mit. MJPEG-Streaming ist pure Python.


PHASE 0: Backend Fundament

(~1.5 Tage)

0.1 Frame Store & MJPEG Router

Datei: web/camera.py (NEU)

Hinweis für KI: Der _frame_store und _frame_subscribers sind globale Dicts – das ist bewusst simpel gehalten. Bei mehreren Worker-Prozessen (z.B. Gunicorn) müsste man Redis verwenden, aber für Single-Process FastAPI/Uvicorn ist das absolut ausreichend.

"""
web/camera.py
Zentrales Kamera-Backend für ionpy.
Empfängt Frames von beliebigen Quellen und stellt sie
als MJPEG-Stream und Snapshot bereit.
"""
import asyncio
import time
import logging
from pathlib import Path
from typing import Optional
 
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, UploadFile, Form
from fastapi.responses import StreamingResponse, JSONResponse
 
logger = logging.getLogger("Web.Camera")
router = APIRouter(prefix="/api/camera", tags=["Camera"])
 
# =========================================================================
# GLOBALER FRAME STORE
# =========================================================================
# Letzter JPEG-Frame pro device_id
_frame_store: dict[str, bytes] = {}
 
# Unix-Timestamp des letzten Frames pro device_id
_frame_timestamps: dict[str, float] = {}
 
# Aktive MJPEG-Consumer pro device_id (Queue bekommt neue Frames)
_frame_subscribers: dict[str, list[asyncio.Queue]] = {}
 
# Kamera-Metadaten (Name, Beschreibung, etc.)
_camera_meta: dict[str, dict] = {}
 
 
def _notify_subscribers(device_id: str, frame: bytes):
    """Neuen Frame an alle wartenden MJPEG-Consumer senden."""
    for q in _frame_subscribers.get(device_id, []):
        try:
            q.put_nowait(frame)
        except asyncio.QueueFull:
            pass  # Langsamer Consumer – Frame überspringen
 
 
def register_camera(device_id: str, name: str = None,
                    source_type: str = "unknown"):
    """Kamera im System anmelden (optional, für /list Endpoint)."""
    _camera_meta[device_id] = {
        "device_id":   device_id,
        "name":        name or device_id,
        "source_type": source_type,  # "mobile", "usb", "rtsp", "http"
        "registered":  time.time()
    }
 
 
# =========================================================================
# FRAME EMPFANG – Handy via POST
# =========================================================================
@router.post("/frame")
async def receive_frame(
    device_id: str = Form(...),
    image:     UploadFile = None
):
    """
    Empfängt einen JPEG-Frame von der Mobile PWA.
    Die PWA sendet alle N Sekunden ein Standbild.
    """
    if not image:
        return JSONResponse(status_code=400,
                           content={"error": "Kein Bild erhalten"})
 
    data = await image.read()
 
    # Größe sanity check (max 5 MB)
    if len(data) > 5 * 1024 * 1024:
        return JSONResponse(status_code=413,
                           content={"error": "Frame zu groß (max 5MB)"})
 
    _frame_store[device_id]      = data
    _frame_timestamps[device_id] = time.time()
    _notify_subscribers(device_id, data)
 
    if device_id not in _camera_meta:
        register_camera(device_id, source_type="mobile")
 
    return {"status": "ok", "size_bytes": len(data)}
 
 
# =========================================================================
# FRAME EMPFANG – Handy via WebSocket (Binary Stream)
# =========================================================================
@router.websocket("/stream/{device_id}")
async def camera_websocket_stream(
    websocket: WebSocket,
    device_id: str
):
    """
    Empfängt kontinuierliche JPEG-Frames via WebSocket.
    Für Mobile-Cam Echtzeit-Streaming (10 FPS).
    Binary-Protokoll: Jede Nachricht = ein vollständiger JPEG-Frame.
    """
    await websocket.accept()
    logger.info(f"Kamera-Stream verbunden: {device_id}")
 
    if device_id not in _camera_meta:
        register_camera(device_id, source_type="mobile_ws")
 
    try:
        while True:
            data = await websocket.receive_bytes()
 
            # Größen-Check
            if len(data) > 5 * 1024 * 1024:
                logger.warning(f"Frame zu groß von {device_id}: {len(data)} bytes")
                continue
 
            _frame_store[device_id]      = data
            _frame_timestamps[device_id] = time.time()
            _notify_subscribers(device_id, data)
 
    except WebSocketDisconnect:
        logger.info(f"Kamera-Stream getrennt: {device_id}")
        # Letzten Frame für ~30s behalten, dann als inaktiv markieren
    except Exception as e:
        logger.error(f"Kamera WS Fehler ({device_id}): {e}")
 
 
# =========================================================================
# CONSUMER – Einzelbild
# =========================================================================
@router.get("/snapshot/{device_id}")
async def get_snapshot(device_id: str):
    """
    Gibt den aktuell gespeicherten Frame als JPEG zurück.
    Ideal für: gelegentliche Aktualisierung, Thumbnails.
    """
    frame = _frame_store.get(device_id)
    if not frame:
        return JSONResponse(status_code=404,
                           content={"error": "Kein Frame verfügbar",
                                    "device_id": device_id})
 
    ts = _frame_timestamps.get(device_id, 0)
    return StreamingResponse(
        iter([frame]),
        media_type="image/jpeg",
        headers={
            "Cache-Control":    "no-cache, no-store",
            "X-Frame-Age-Ms":   str(int((time.time() - ts) * 1000)),
            "X-Frame-Timestamp": str(ts)
        }
    )
 
 
# =========================================================================
# CONSUMER – MJPEG Live Stream
# =========================================================================
@router.get("/mjpeg/{device_id}")
async def mjpeg_stream(device_id: str):
    """
    MJPEG-Stream für Browser, VLC, ffmpeg, OpenCV.
    Einfach als <img src="/api/camera/mjpeg/device_id"> verwenden –
    kein JavaScript nötig!
 
    Kompatibel mit:
      - Browser: <img>, <video> (limitiert)
      - Python: cv2.VideoCapture("http://host/api/camera/mjpeg/id")
      - ffmpeg: ffmpeg -i http://host/api/camera/mjpeg/id ...
      - VLC: Netzwerk-Stream öffnen
    """
    # Queue für diesen Consumer registrieren
    q: asyncio.Queue = asyncio.Queue(maxsize=10)
 
    if device_id not in _frame_subscribers:
        _frame_subscribers[device_id] = []
    _frame_subscribers[device_id].append(q)
 
    # Sofort den letzten bekannten Frame senden (kein Warten auf nächsten)
    if device_id in _frame_store:
        await q.put(_frame_store[device_id])
 
    async def generate():
        try:
            while True:
                try:
                    # Auf nächsten Frame warten (Timeout = Keepalive)
                    frame = await asyncio.wait_for(q.get(), timeout=15.0)
                    yield (
                        b'--ionpy_frame\r\n'
                        b'Content-Type: image/jpeg\r\n'
                        b'Content-Length: ' + str(len(frame)).encode() + b'\r\n'
                        b'\r\n'
                        + frame +
                        b'\r\n'
                    )
                except asyncio.TimeoutError:
                    # Keepalive: leeren Kommentar senden
                    yield b'--ionpy_frame\r\nContent-Type: text/plain\r\n\r\n\r\n'
 
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.debug(f"MJPEG Stream ({device_id}) beendet: {e}")
        finally:
            # Consumer abmelden – wichtig gegen Memory Leak!
            if device_id in _frame_subscribers:
                try:
                    _frame_subscribers[device_id].remove(q)
                except ValueError:
                    pass
 
    return StreamingResponse(
        generate(),
        media_type="multipart/x-mixed-replace; boundary=ionpy_frame",
        headers={"Cache-Control": "no-cache"}
    )
 
 
# =========================================================================
# VERWALTUNG
# =========================================================================
@router.get("/list")
async def list_cameras():
    """Alle bekannten Kamera-Quellen mit Status."""
    now = time.time()
    result = []
 
    all_ids = set(_frame_store.keys()) | set(_camera_meta.keys())
 
    for device_id in all_ids:
        ts      = _frame_timestamps.get(device_id, 0)
        age_sec = round(now - ts, 1) if ts > 0 else None
        active  = age_sec is not None and age_sec < 10
 
        meta = _camera_meta.get(device_id, {})
        result.append({
            "device_id":       device_id,
            "name":            meta.get("name", device_id),
            "source_type":     meta.get("source_type", "unknown"),
            "active":          active,
            "last_frame_age":  age_sec,
            "frame_size":      len(_frame_store.get(device_id, b"")),
            "subscribers":     len(_frame_subscribers.get(device_id, [])),
            "snapshot_url":    f"/api/camera/snapshot/{device_id}",
            "mjpeg_url":       f"/api/camera/mjpeg/{device_id}",
            "stream_ws_url":   f"ws://[host]/api/camera/stream/{device_id}"
        })
 
    return sorted(result, key=lambda x: x["device_id"])
 
 
@router.delete("/camera/{device_id}")
async def remove_camera(device_id: str):
    """Kamera aus dem System entfernen (löscht Frame und Meta)."""
    _frame_store.pop(device_id, None)
    _frame_timestamps.pop(device_id, None)
    _frame_subscribers.pop(device_id, None)
    _camera_meta.pop(device_id, None)
    return {"status": "ok", "device_id": device_id}

0.2 Router in server.py einbinden

Datei: web/server.py

# In create_app() nach den bestehenden Routern:
from web.camera import router as camera_router
app.include_router(camera_router)

PHASE 1: Server-seitige Kameras (USB & RTSP)

(~2 Tage)

1.1 ServerCamera Klasse

Datei: devices/drivers/camera/server_camera.py (NEU)

Hinweis für KI:

Prozessende automatisch stirbt