215 lines
7.6 KiB
Python
215 lines
7.6 KiB
Python
"""
|
|
routes/tracker.py — Live Tracker Blueprint
|
|
Handles:
|
|
POST /api/<world_id>/movements (Tampermonkey pushes movement data)
|
|
GET /api/<world_id>/movements/<player_id> (Dashboard initial load)
|
|
GET /api/<world_id>/movements/<player_id>/stream (SSE push stream)
|
|
|
|
All data is isolated per player_id + world_id.
|
|
"""
|
|
|
|
from flask import Blueprint, request, jsonify, Response, stream_with_context
|
|
from db import get_db
|
|
from datetime import datetime
|
|
import json
|
|
import queue
|
|
import threading
|
|
import logging
|
|
|
|
_log = logging.getLogger(__name__)
|
|
|
|
tracker = Blueprint('tracker', __name__)
|
|
|
|
# ----------------------------------------------------------------
|
|
# In-memory SSE subscriber registry
|
|
# Key: "<world_id>:<player_id>" Value: list of queue.Queue()
|
|
# One Queue per open dashboard tab. Thread-safe via a lock.
|
|
# ----------------------------------------------------------------
|
|
_subscribers = {}
|
|
_sub_lock = threading.Lock()
|
|
|
|
|
|
def _sub_key(player_id, world_id):
|
|
return f"{world_id}:{player_id}"
|
|
|
|
|
|
def _notify(player_id, world_id, payload):
|
|
"""Push a JSON payload to all SSE subscribers for this player+world."""
|
|
key = _sub_key(player_id, world_id)
|
|
data = json.dumps(payload)
|
|
with _sub_lock:
|
|
for q in _subscribers.get(key, []):
|
|
try:
|
|
q.put_nowait(data)
|
|
except queue.Full:
|
|
pass # slow consumer — drop silently
|
|
|
|
|
|
# ----------------------------------------------------------------
|
|
# Helper: read clan from X-Clan-Key header (same as api.py)
|
|
# ----------------------------------------------------------------
|
|
def _get_clan_from_request():
|
|
key = request.headers.get('X-Clan-Key', '').strip()
|
|
if not key:
|
|
return None
|
|
conn = get_db()
|
|
clan = conn.execute('SELECT * FROM clans WHERE clan_key = ?', (key,)).fetchone()
|
|
conn.close()
|
|
return clan
|
|
|
|
|
|
# ----------------------------------------------------------------
|
|
# POST /api/<world_id>/movements
|
|
# Tampermonkey sends the current movement snapshot.
|
|
# Requires X-Clan-Key header (same as all other bot endpoints).
|
|
# Body: { player_id, world_id, movements: [{...}, ...] }
|
|
# ----------------------------------------------------------------
|
|
@tracker.route('/api/<world_id>/movements', methods=['POST'])
|
|
def receive_movements(world_id):
|
|
clan = _get_clan_from_request()
|
|
if not clan:
|
|
return jsonify({'error': 'Unauthorized'}), 403
|
|
|
|
data = request.get_json(silent=True)
|
|
if not data:
|
|
return jsonify({'error': 'no data'}), 400
|
|
|
|
player_id = str(data.get('player_id', '')).strip()
|
|
movements = data.get('movements', [])
|
|
|
|
if not player_id:
|
|
return jsonify({'error': 'missing player_id'}), 400
|
|
|
|
# Normalise world_id — trust the URL param, not the body
|
|
world_id = world_id.strip()
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
now = datetime.utcnow().isoformat()
|
|
|
|
# 1. Upsert each movement (UNIQUE on player_id+world_id+command_id)
|
|
for m in movements:
|
|
cmd_id = str(m.get('id', '')).strip()
|
|
if not cmd_id:
|
|
continue
|
|
c.execute('''
|
|
INSERT INTO movements
|
|
(player_id, world_id, command_id, cmd_type,
|
|
origin_town, origin_player, target_town, target_player,
|
|
arrival_at, raw_data, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(player_id, world_id, command_id) DO UPDATE SET
|
|
cmd_type = excluded.cmd_type,
|
|
origin_town = excluded.origin_town,
|
|
origin_player = excluded.origin_player,
|
|
target_town = excluded.target_town,
|
|
target_player = excluded.target_player,
|
|
arrival_at = excluded.arrival_at,
|
|
raw_data = excluded.raw_data,
|
|
updated_at = excluded.updated_at
|
|
''', (
|
|
player_id, world_id, cmd_id,
|
|
m.get('type', 'unknown'),
|
|
m.get('origin_town'), m.get('origin_player'),
|
|
m.get('target_town'), m.get('target_player'),
|
|
m.get('arrival_at'),
|
|
json.dumps(m),
|
|
now
|
|
))
|
|
|
|
# 2. Purge stale entries: movements that are no longer in the snapshot
|
|
# (the game already resolved them). We use command_ids sent in this batch.
|
|
live_ids = [str(m.get('id', '')) for m in movements if m.get('id')]
|
|
if live_ids:
|
|
placeholders = ','.join('?' * len(live_ids))
|
|
c.execute(f'''
|
|
DELETE FROM movements
|
|
WHERE player_id = ? AND world_id = ?
|
|
AND command_id NOT IN ({placeholders})
|
|
''', [player_id, world_id] + live_ids)
|
|
else:
|
|
# Empty snapshot → all movements resolved, clear the table for this player+world
|
|
c.execute(
|
|
'DELETE FROM movements WHERE player_id = ? AND world_id = ?',
|
|
(player_id, world_id)
|
|
)
|
|
|
|
conn.commit()
|
|
|
|
# 3. Read back the current state and notify SSE subscribers immediately
|
|
rows = c.execute('''
|
|
SELECT command_id, cmd_type, origin_town, origin_player,
|
|
target_town, target_player, arrival_at
|
|
FROM movements
|
|
WHERE player_id = ? AND world_id = ?
|
|
ORDER BY arrival_at ASC
|
|
''', (player_id, world_id)).fetchall()
|
|
conn.close()
|
|
|
|
result = [dict(r) for r in rows]
|
|
_notify(player_id, world_id, {'movements': result})
|
|
|
|
return jsonify({'ok': True, 'stored': len(result)})
|
|
|
|
|
|
# ----------------------------------------------------------------
|
|
# GET /api/<world_id>/movements/<player_id>
|
|
# Dashboard initial load — returns current snapshot from DB.
|
|
# ----------------------------------------------------------------
|
|
@tracker.route('/api/<world_id>/movements/<player_id>', methods=['GET'])
|
|
def get_movements(world_id, player_id):
|
|
conn = get_db()
|
|
rows = conn.execute('''
|
|
SELECT command_id, cmd_type, origin_town, origin_player,
|
|
target_town, target_player, arrival_at
|
|
FROM movements
|
|
WHERE player_id = ? AND world_id = ?
|
|
ORDER BY arrival_at ASC
|
|
''', (player_id, world_id)).fetchall()
|
|
conn.close()
|
|
return jsonify({'movements': [dict(r) for r in rows]})
|
|
|
|
|
|
# ----------------------------------------------------------------
|
|
# GET /api/<world_id>/movements/<player_id>/stream
|
|
# SSE endpoint — keeps connection open, pushes updates to dashboard.
|
|
# Each connected dashboard tab gets its own Queue.
|
|
# ----------------------------------------------------------------
|
|
@tracker.route('/api/<world_id>/movements/<player_id>/stream', methods=['GET'])
|
|
def stream_movements(world_id, player_id):
|
|
key = _sub_key(player_id, world_id)
|
|
q = queue.Queue(maxsize=20)
|
|
|
|
with _sub_lock:
|
|
_subscribers.setdefault(key, []).append(q)
|
|
|
|
def generate():
|
|
# Send a comment immediately so the browser confirms the connection
|
|
yield ': connected\n\n'
|
|
try:
|
|
while True:
|
|
try:
|
|
data = q.get(timeout=30)
|
|
yield f'data: {data}\n\n'
|
|
except queue.Empty:
|
|
# Heartbeat — keeps the connection alive through proxies/firewalls
|
|
yield ': heartbeat\n\n'
|
|
except GeneratorExit:
|
|
pass
|
|
finally:
|
|
with _sub_lock:
|
|
subs = _subscribers.get(key, [])
|
|
if q in subs:
|
|
subs.remove(q)
|
|
if not subs:
|
|
_subscribers.pop(key, None)
|
|
|
|
return Response(
|
|
stream_with_context(generate()),
|
|
content_type='text/event-stream',
|
|
headers={
|
|
'Cache-Control': 'no-cache',
|
|
'X-Accel-Buffering': 'no', # disables nginx buffering (important!)
|
|
}
|
|
)
|