Projet

Général

Profil

Wiki » server.py

Emil Abutalibov, 02/03/2026 10:47

 
#!/usr/bin/env python3
"""
Serveur HTTP pour Raspberry Pi - Upload FreeCAD vers clés USB
Supporte le portail captif (redirection automatique) et la copie vers USB
"""

from http.server import BaseHTTPRequestHandler, HTTPServer
import os
import shutil
import cgi
import json
import subprocess
import glob

# ─── Configuration ────────────────────────────────────────────────────────────
PORT = 80 # TCP port the HTTP server is bound to in run()
UPLOAD_DIR = "/tmp/uploads" # Temporary folder where a file is stored before being copied to USB
SERVER_IP = "192.168.4.1" # IP address of the Raspberry Pi (hotspot); target of captive-portal redirects

# ─── Détection des clés USB ───────────────────────────────────────────────────

def get_usb_drives():
    """Return the mounted USB filesystems with their path and free space.

    Block devices are enumerated with ``lsblk -J``. lsblk reports the
    transport (``tran``) on the whole-disk node only, while the mount point
    usually lives on a child partition, so the "is USB" flag is inherited
    from parent to children while walking the tree.

    If lsblk cannot be run or its output cannot be parsed, mounted
    directories matching ``/media/pi/*`` and ``/mnt/usb*`` are used instead.

    Returns:
        list[dict]: one dict per mounted USB filesystem with the keys
        ``name``, ``path``, ``device``, ``size``, ``free_gb`` and
        ``total_gb``. Empty when nothing is found.
    """
    drives = []
    # System mount points that must never be offered as a copy target, even
    # when the Pi itself boots from a USB device. Newer Raspberry Pi OS
    # releases mount the boot partition at /boot/firmware.
    system_mounts = ('/', '/boot', '/boot/firmware')

    def add_drive(name, path, device, size):
        # Register one mount point; skip duplicates and unreadable mounts.
        if any(known['path'] == path for known in drives):
            return
        try:
            stat = shutil.disk_usage(path)
        except OSError as e:
            # e.g. "[SWAP]" or a key pulled out during the scan: ignore this
            # entry only instead of aborting the whole detection.
            print(f"[USB] Erreur lecture {path}: {e}")
            return
        drives.append({
            'name': name,
            'path': path,
            'device': device,
            'size': size,
            'free_gb': round(stat.free / (1024**3), 2),
            'total_gb': round(stat.total / (1024**3), 2),
        })

    def parse_block(device, parent_is_usb=False):
        # A partition counts as USB when it or any ancestor has tran == 'usb'.
        is_usb = parent_is_usb or device.get('tran') == 'usb'
        mp = device.get('mountpoint')
        if is_usb and mp and mp not in system_mounts:
            add_drive(
                device.get('label') or device['name'],
                mp,
                device['name'],
                device.get('size', '?'),
            )
        # Recurse into partitions.
        for child in device.get('children', []):
            parse_block(child, is_usb)

    try:
        result = subprocess.run(
            ['lsblk', '-J', '-o', 'NAME,MOUNTPOINT,TRAN,SIZE,FSTYPE,LABEL'],
            capture_output=True, text=True
        )
        data = json.loads(result.stdout)
        for device in data.get('blockdevices', []):
            parse_block(device)
    except Exception as e:
        print(f"[USB] Erreur détection: {e}")
        # Fallback: look for mounted directories under /media/pi or /mnt.
        for path in glob.glob('/media/pi/*') + glob.glob('/mnt/usb*'):
            if os.path.ismount(path):
                add_drive(os.path.basename(path), path, '?', '?')
    return drives


def copy_to_usb_drives(file_path, filename):
    """Copy one file into the ``FreeCAD_Files`` folder of every USB drive.

    Args:
        file_path: path of the source file to copy.
        filename: name given to the copy inside ``FreeCAD_Files``.

    Returns:
        list[dict]: one report per drive with ``drive``, ``path`` and
        ``success`` (plus ``error`` on failure). When no drive is detected,
        a single failed report is returned.
    """
    targets = get_usb_drives()
    report = []

    # Guard clause: nothing to copy to.
    if not targets:
        return [{"drive": "Aucune clé USB", "success": False, "error": "Aucune clé USB détectée"}]

    for usb in targets:
        folder = os.path.join(usb['path'], 'FreeCAD_Files')
        destination = os.path.join(folder, filename)
        try:
            os.makedirs(folder, exist_ok=True)
            shutil.copy2(file_path, destination)
            outcome = {
                "drive": usb['name'],
                "path": destination,
                "success": True
            }
            report.append(outcome)
            print(f"[USB] ✓ Copié vers {destination}")
        except Exception as exc:
            # A failure on one drive must not prevent copying to the others.
            failure = {
                "drive": usb['name'],
                "path": usb['path'],
                "success": False,
                "error": str(exc)
            }
            report.append(failure)
            print(f"[USB] ✗ Erreur copie vers {usb['path']}: {exc}")
    return report


# ─── Pages HTML ───────────────────────────────────────────────────────────────

def load_html():
    """Return the raw bytes of the ``index.html`` stored next to this script.

    Raises:
        FileNotFoundError: if ``index.html`` does not exist in that folder.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(here, 'index.html'), 'rb') as page:
        content = page.read()
    return content


# ─── Gestionnaire HTTP ────────────────────────────────────────────────────────

class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler for the upload page, the USB listing API and uploads.

    GET requests that look like OS connectivity probes, or whose ``Host``
    header names neither ``SERVER_IP`` nor ``localhost``, are redirected to
    ``http://SERVER_IP/`` (captive-portal behaviour). ``POST /upload``
    accepts ``multipart/form-data`` with one or more ``files[]`` fields and
    copies each file to the detected USB drives.
    """

    # Connectivity-check URLs requested by operating systems and browsers.
    CAPTIVE_PATHS = frozenset({
        '/generate_204',         # Android
        '/gen_204',              # Android Chrome
        '/hotspot-detect.html',  # Apple iOS/macOS
        '/ncsi.txt',             # Windows
        '/connecttest.txt',      # Windows 10+
        '/redirect',             # Misc.
        '/success.txt',          # Firefox
        '/canonical.html',       # Ubuntu
    })

    def log_message(self, format, *args):
        """Print request log lines to stdout with an ``[HTTP]`` prefix."""
        print(f"[HTTP] {self.address_string()} - {format % args}")

    # ── Captive portal: intercept every non-local request ──
    def is_captive_probe(self):
        """Return True when the request path (query string ignored) is a
        known connectivity-check URL (Android/iOS/Windows/...)."""
        return self.path.split('?')[0] in self.CAPTIVE_PATHS

    def do_GET(self):
        """Serve the index page and the ``/api/usb`` JSON endpoint.

        Captive probes and requests addressed to another host are answered
        with a 302 redirect to ``http://SERVER_IP/``; unknown paths get 404.
        """
        host = self.headers.get('Host', '')
        # ── Captive portal ──
        # Redirect when the request was not addressed to our own IP.
        if self.is_captive_probe() or (host and SERVER_IP not in host and 'localhost' not in host):
            self.send_response(302)
            self.send_header('Location', f'http://{SERVER_IP}/')
            self.end_headers()
            return

        path = self.path.split('?')[0]

        if path == '/' or path == '/index.html':
            try:
                html = load_html()
                self.send_response(200)
                self.send_header('Content-type', 'text/html; charset=utf-8')
                self.end_headers()
                self.wfile.write(html)
            except FileNotFoundError:
                self.send_error(404, "index.html introuvable")

        elif path == '/api/usb':
            # JSON API: list of the available USB drives.
            drives = get_usb_drives()
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps(drives).encode())

        else:
            self.send_error(404, "Page non trouvée")

    def do_POST(self):
        """Dispatch ``POST /upload``; every other path is answered with 404."""
        # Ignore the query string, consistently with do_GET.
        if self.path.split('?')[0] == '/upload':
            self._handle_upload()
        else:
            self.send_error(404)

    def _handle_upload(self):
        """Store each uploaded ``files[]`` part and copy it to the USB drives.

        Replies with JSON: 400 when the body is not ``multipart/form-data``,
        500 when the form cannot be parsed, otherwise 200 with ``uploaded``,
        ``errors`` and the current ``usb_drives``. ``success`` is False when
        at least one file could not be saved temporarily.
        """
        # Reject anything that is not a multipart form up front: FieldStorage
        # parses other bodies into a non-indexable object, and the lookup of
        # 'files[]' below would then raise TypeError with no reply sent.
        content_type = self.headers.get('Content-Type') or ''
        if not content_type.lower().startswith('multipart/form-data'):
            self._json_response(400, {"success": False, "error": "Requête multipart/form-data attendue"})
            return

        # NOTE(review): the cgi module is deprecated (PEP 594) and removed in
        # Python 3.13; this parsing must be replaced before upgrading.
        try:
            form = cgi.FieldStorage(
                fp=self.rfile,
                headers=self.headers,
                environ={'REQUEST_METHOD': 'POST',
                         'CONTENT_TYPE': content_type}
            )
        except Exception as e:
            self._json_response(500, {"success": False, "error": f"Erreur lecture formulaire: {e}"})
            return

        os.makedirs(UPLOAD_DIR, exist_ok=True)

        uploaded = []
        errors = []

        # A single file comes back as one item, several files as a list.
        items = form['files[]'] if 'files[]' in form else []
        if not isinstance(items, list):
            items = [items]

        for item in items:
            if not item.filename:
                continue

            # Keep only the last path component; backslashes are normalised
            # so that Windows-style client paths are stripped as well.
            filename = os.path.basename(item.filename.replace('\\', '/'))
            if filename in ('', '.', '..'):
                errors.append({"file": item.filename, "error": "Nom de fichier invalide"})
                continue
            tmp_path = os.path.join(UPLOAD_DIR, filename)

            # Temporary save.
            try:
                with open(tmp_path, 'wb') as f:
                    shutil.copyfileobj(item.file, f, length=131072)
            except (OSError, ValueError) as e:
                errors.append({"file": filename, "error": f"Erreur sauvegarde: {e}"})
                # Do not leave a partially written file behind.
                self._remove_temp_file(tmp_path)
                continue

            # Copy to the USB drives, then always drop the temporary file.
            try:
                usb_results = copy_to_usb_drives(tmp_path, filename)
            finally:
                self._remove_temp_file(tmp_path)

            uploaded.append({
                "file": filename,
                "usb_results": usb_results
            })

        response = {
            "success": len(errors) == 0,
            "uploaded": uploaded,
            "errors": errors,
            "usb_drives": get_usb_drives()
        }

        self._json_response(200, response)

    @staticmethod
    def _remove_temp_file(path):
        """Delete a temporary upload; a missing file is not an error."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except (OSError, ValueError) as e:
            # Best effort: report the problem but never fail the request.
            print(f"[HTTP] Suppression impossible de {path}: {e}")

    def _json_response(self, code, data):
        """Send ``data`` as a UTF-8 JSON body with status ``code``."""
        body = json.dumps(data, ensure_ascii=False).encode('utf-8')
        self.send_response(code)
        self.send_header('Content-type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)


# ─── Lancement ────────────────────────────────────────────────────────────────

def run():
    """Create the upload folder, print a start-up banner and serve forever.

    The server listens on every interface on ``PORT``. The USB drives
    detected at start-up are listed on stdout before requests are served.
    """
    os.makedirs(UPLOAD_DIR, exist_ok=True)
    httpd = HTTPServer(('', PORT), RequestHandler)

    banner = (
        "╔══════════════════════════════════════╗",
        f"║ Serveur FreeCAD USB - Port {PORT} ║",
        f"║ Adresse: http://{SERVER_IP} ║",
        f"║ Uploads temp: {UPLOAD_DIR} ║",
        "╚══════════════════════════════════════╝",
    )
    for line in banner:
        print(line)

    detected = get_usb_drives()
    if not detected:
        print("[USB] Aucune clé USB détectée au démarrage")
    else:
        print(f"[USB] {len(detected)} clé(s) USB détectée(s):")
        for usb in detected:
            print(f" • {usb['name']} ({usb['path']}) - {usb['free_gb']} GB libre")
    print()
    httpd.serve_forever()

# Start the server only when this file is executed as a script, not on import.
if __name__ == '__main__':
    run()
(1-1/3)