Files
grepo-remote/routes/auth.py
2026-04-26 22:39:52 +03:00

304 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash
from flask_login import login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from db import get_db, generate_clan_key
from datetime import datetime, timezone
auth = Blueprint('auth', __name__)
# ------------------------------------------------------------------
# Helper — resolve the User class from app (avoid circular import)
# ------------------------------------------------------------------
def _make_user(row):
from app import User
return User(row['id'], row['username'], row['clan_id'])
# ------------------------------------------------------------------
# GET/POST /auth/login
# ------------------------------------------------------------------
@auth.route('/auth/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('dashboard.index'))
error = None
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
conn = get_db()
row = conn.execute(
'SELECT id, username, password_hash, clan_id FROM users WHERE username = ?', (username,)
).fetchone()
conn.close()
if row and check_password_hash(row['password_hash'], password):
user = _make_user(row)
login_user(user, remember=True)
return redirect(url_for('dashboard.index'))
else:
error = 'Λάθος όνομα χρήστη ή κωδικός.'
return render_template('login.html', error=error)
# ------------------------------------------------------------------
# GET/POST /auth/register
# ------------------------------------------------------------------
@auth.route('/auth/register', methods=['GET', 'POST'])
def register():
if current_user.is_authenticated:
return redirect(url_for('dashboard.index'))
error = None
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
confirm = request.form.get('confirm', '')
if not username or not password:
error = 'Συμπλήρωσε όνομα χρήστη και κωδικό.'
elif password != confirm:
error = 'Οι κωδικοί δεν ταιριάζουν.'
elif len(password) < 6:
error = 'Ο κωδικός πρέπει να έχει τουλάχιστον 6 χαρακτήρες.'
else:
conn = get_db()
existing = conn.execute(
'SELECT id FROM users WHERE username = ?', (username,)
).fetchone()
if existing:
error = 'Το όνομα χρήστη χρησιμοποιείται ήδη.'
conn.close()
else:
pw_hash = generate_password_hash(password)
conn.execute(
'INSERT INTO users (username, password_hash) VALUES (?, ?)',
(username, pw_hash)
)
conn.commit()
row = conn.execute(
'SELECT id, username, password_hash, clan_id FROM users WHERE username = ?', (username,)
).fetchone()
conn.close()
user = _make_user(row)
login_user(user, remember=True)
return redirect(url_for('auth.options'))
return render_template('register.html', error=error)
# ------------------------------------------------------------------
# GET /auth/logout
# ------------------------------------------------------------------
@auth.route('/auth/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('auth.login'))
# ------------------------------------------------------------------
# GET/POST /auth/options — Clan management page
# ------------------------------------------------------------------
@auth.route('/auth/options', methods=['GET', 'POST'])
@login_required
def options():
conn = get_db()
# Load clan based on current user's clan_id
clan = None
if current_user.clan_id:
clan = conn.execute(
'SELECT * FROM clans WHERE id = ?', (current_user.clan_id,)
).fetchone()
# Fetch website admins (users belonging to this clan other than current user)
admins = []
if clan and clan['owner_id'] == current_user.id:
admins = conn.execute(
'SELECT id, username, created_at FROM users WHERE clan_id = ? AND id != ? ORDER BY created_at ASC',
(clan['id'], current_user.id)
).fetchall()
members = []
if clan:
rows = conn.execute(
'''SELECT cm.id, cm.player_id, cm.player_name, cm.joined_at, cm.features,
ts.updated_at
FROM clan_members cm
LEFT JOIN town_state ts ON ts.player_id = cm.player_id
WHERE cm.clan_id = ?
GROUP BY cm.player_id
ORDER BY cm.joined_at DESC''',
(clan['id'],)
).fetchall()
now = datetime.utcnow()
for row in rows:
is_online = False
if row['updated_at']:
try:
last_seen = datetime.fromisoformat(row['updated_at'])
if (now - last_seen).total_seconds() <= 150:
is_online = True
except Exception:
pass
members.append({
'id': row['id'],
'player_id': row['player_id'],
'player_name': row['player_name'] or 'Άγνωστος',
'joined_at': row['joined_at'][:10] if row['joined_at'] else '',
'is_online': is_online,
'feat_farm': 'farm' in (row['features'] or 'farm,admin'),
'feat_admin': 'admin' in (row['features'] or 'farm,admin'),
})
conn.close()
return render_template('options.html', clan=clan, members=members, admins=admins)
# ------------------------------------------------------------------
# POST /auth/clan/create
# ------------------------------------------------------------------
@auth.route('/auth/clan/create', methods=['POST'])
@login_required
def create_clan():
clan_name = request.form.get('clan_name', '').strip()
if not clan_name:
return redirect(url_for('auth.options'))
conn = get_db()
existing = conn.execute(
'SELECT id FROM clans WHERE owner_id = ?', (current_user.id,)
).fetchone()
if not existing:
key = generate_clan_key()
cursor = conn.execute(
'INSERT INTO clans (owner_id, name, clan_key) VALUES (?, ?, ?)',
(current_user.id, clan_name, key)
)
clan_id = cursor.lastrowid
conn.execute('UPDATE users SET clan_id = ? WHERE id = ?', (clan_id, current_user.id))
conn.commit()
# Update the current_user object dynamically to reflect the new clan_id without re-login
current_user.clan_id = clan_id
conn.close()
return redirect(url_for('auth.options'))
# ------------------------------------------------------------------
# POST /auth/clan/regenerate-key
# ------------------------------------------------------------------
@auth.route('/auth/clan/regenerate-key', methods=['POST'])
@login_required
def regenerate_key():
new_key = generate_clan_key()
conn = get_db()
conn.execute(
'UPDATE clans SET clan_key = ? WHERE owner_id = ?',
(new_key, current_user.id)
)
conn.commit()
conn.close()
return redirect(url_for('auth.options'))
@auth.route('/auth/clan/remove-member/<player_id>', methods=['POST'])
@login_required
def remove_member(player_id):
conn = get_db()
clan = conn.execute(
'SELECT id FROM clans WHERE owner_id = ?', (current_user.id,)
).fetchone()
if clan:
conn.execute(
'DELETE FROM clan_members WHERE clan_id = ? AND player_id = ?',
(clan['id'], player_id)
)
conn.commit()
conn.close()
return redirect(url_for('auth.options'))
# ------------------------------------------------------------------
# POST /auth/clan/update-features/<player_id>
# ------------------------------------------------------------------
@auth.route('/auth/clan/update-features/<player_id>', methods=['POST'])
@login_required
def update_member_features(player_id):
farm = 'farm' if request.form.get('farm') else None
admin = 'admin' if request.form.get('admin') else None
features = ','.join(f for f in [farm, admin] if f) or ''
conn = get_db()
clan = conn.execute(
'SELECT id FROM clans WHERE owner_id = ?', (current_user.id,)
).fetchone()
if clan:
conn.execute(
'UPDATE clan_members SET features = ? WHERE clan_id = ? AND player_id = ?',
(features, clan['id'], player_id)
)
conn.commit()
conn.close()
return redirect(url_for('auth.options'))
# ------------------------------------------------------------------
# POST /auth/clan/add-admin
# ------------------------------------------------------------------
@auth.route('/auth/clan/add-admin', methods=['POST'])
@login_required
def add_admin():
username = request.form.get('admin_username', '').strip()
if not username:
return redirect(url_for('auth.options'))
conn = get_db()
clan = conn.execute(
'SELECT id FROM clans WHERE owner_id = ?', (current_user.id,)
).fetchone()
if clan:
# Check if user exists
user = conn.execute('SELECT id, clan_id FROM users WHERE username = ?', (username,)).fetchone()
if user:
# If user already belongs to a clan, we could show an error, but let's just overwrite for now
# or maybe only if clan_id is NULL
conn.execute('UPDATE users SET clan_id = ? WHERE id = ?', (clan['id'], user['id']))
conn.commit()
flash(f"Ο χρήστης {username} προστέθηκε ως διαχειριστής.", "success")
else:
flash(f"Ο χρήστης {username} δεν βρέθηκε.", "error")
conn.close()
return redirect(url_for('auth.options'))
# ------------------------------------------------------------------
# POST /auth/clan/remove-admin/<admin_id>
# ------------------------------------------------------------------
@auth.route('/auth/clan/remove-admin/<int:admin_id>', methods=['POST'])
@login_required
def remove_admin(admin_id):
conn = get_db()
clan = conn.execute(
'SELECT id FROM clans WHERE owner_id = ?', (current_user.id,)
).fetchone()
if clan:
conn.execute('UPDATE users SET clan_id = NULL WHERE id = ? AND clan_id = ?', (admin_id, clan['id']))
conn.commit()
flash("Ο διαχειριστής αφαιρέθηκε.", "success")
conn.close()
return redirect(url_for('auth.options'))