|
#!/usr/bin/env python3
|
|
"""
|
|
Serveur HTTP pour Raspberry Pi - Upload FreeCAD vers clés USB
|
|
Supporte le portail captif (redirection automatique) et la copie vers USB
|
|
"""
|
|
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
import os
|
|
import shutil
|
|
import cgi
|
|
import json
|
|
import subprocess
|
|
import glob
|
|
|
|
# ─── Configuration ────────────────────────────────────────────────────────────
|
|
PORT = 80
|
|
UPLOAD_DIR = "/tmp/uploads" # Dossier temporaire avant copie USB
|
|
SERVER_IP = "192.168.4.1" # IP du Raspberry Pi (hotspot)
|
|
|
|
# ─── Détection des clés USB ───────────────────────────────────────────────────
|
|
|
|
def get_usb_drives():
|
|
"""Retourne la liste des clés USB montées avec leur chemin et espace disponible."""
|
|
drives = []
|
|
try:
|
|
# Cherche les périphériques montés dans /media ou /mnt
|
|
result = subprocess.run(
|
|
['lsblk', '-J', '-o', 'NAME,MOUNTPOINT,TRAN,SIZE,FSTYPE,LABEL'],
|
|
capture_output=True, text=True
|
|
)
|
|
data = json.loads(result.stdout)
|
|
|
|
def parse_block(device):
|
|
# Cherche les périphériques USB montés
|
|
if device.get('tran') == 'usb' and device.get('mountpoint'):
|
|
mp = device['mountpoint']
|
|
if mp and mp not in ['/', '/boot']:
|
|
stat = shutil.disk_usage(mp)
|
|
drives.append({
|
|
'name': device.get('label') or device['name'],
|
|
'path': mp,
|
|
'device': device['name'],
|
|
'size': device.get('size', '?'),
|
|
'free_gb': round(stat.free / (1024**3), 2),
|
|
'total_gb': round(stat.total / (1024**3), 2),
|
|
})
|
|
# Recurse sur les partitions
|
|
for child in device.get('children', []):
|
|
parse_block(child)
|
|
|
|
for device in data.get('blockdevices', []):
|
|
parse_block(device)
|
|
|
|
except Exception as e:
|
|
print(f"[USB] Erreur détection: {e}")
|
|
# Fallback: cherche dans /media/pi ou /mnt
|
|
for path in glob.glob('/media/pi/*') + glob.glob('/mnt/usb*'):
|
|
if os.path.ismount(path):
|
|
try:
|
|
stat = shutil.disk_usage(path)
|
|
drives.append({
|
|
'name': os.path.basename(path),
|
|
'path': path,
|
|
'device': '?',
|
|
'size': '?',
|
|
'free_gb': round(stat.free / (1024**3), 2),
|
|
'total_gb': round(stat.total / (1024**3), 2),
|
|
})
|
|
except:
|
|
pass
|
|
|
|
return drives
|
|
|
|
|
|
def copy_to_usb_drives(file_path, filename):
|
|
"""Copie un fichier vers toutes les clés USB disponibles."""
|
|
drives = get_usb_drives()
|
|
results = []
|
|
|
|
if not drives:
|
|
return [{"drive": "Aucune clé USB", "success": False, "error": "Aucune clé USB détectée"}]
|
|
|
|
for drive in drives:
|
|
dest_dir = os.path.join(drive['path'], 'FreeCAD_Files')
|
|
dest_path = os.path.join(dest_dir, filename)
|
|
try:
|
|
os.makedirs(dest_dir, exist_ok=True)
|
|
shutil.copy2(file_path, dest_path)
|
|
results.append({
|
|
"drive": drive['name'],
|
|
"path": dest_path,
|
|
"success": True
|
|
})
|
|
print(f"[USB] ✓ Copié vers {dest_path}")
|
|
except Exception as e:
|
|
results.append({
|
|
"drive": drive['name'],
|
|
"path": drive['path'],
|
|
"success": False,
|
|
"error": str(e)
|
|
})
|
|
print(f"[USB] ✗ Erreur copie vers {drive['path']}: {e}")
|
|
|
|
return results
|
|
|
|
|
|
# ─── Pages HTML ───────────────────────────────────────────────────────────────
|
|
|
|
def load_html():
|
|
"""Charge index.html depuis le même dossier que ce script."""
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
html_path = os.path.join(script_dir, 'index.html')
|
|
with open(html_path, 'rb') as f:
|
|
return f.read()
|
|
|
|
|
|
# ─── Gestionnaire HTTP ────────────────────────────────────────────────────────
|
|
|
|
class RequestHandler(BaseHTTPRequestHandler):
|
|
|
|
def log_message(self, format, *args):
|
|
print(f"[HTTP] {self.address_string()} - {format % args}")
|
|
|
|
# ── Portail captif: intercepte toutes les requêtes non-locales ──
|
|
def is_captive_probe(self):
|
|
"""Détecte les requêtes de vérification de connectivité (Android/iOS/Windows)."""
|
|
captive_paths = [
|
|
'/generate_204', # Android
|
|
'/gen_204', # Android Chrome
|
|
'/hotspot-detect.html', # Apple iOS/macOS
|
|
'/ncsi.txt', # Windows
|
|
'/connecttest.txt', # Windows 10+
|
|
'/redirect', # Divers
|
|
'/success.txt', # Firefox
|
|
'/canonical.html', # Ubuntu
|
|
]
|
|
return self.path.split('?')[0] in captive_paths
|
|
|
|
def do_GET(self):
|
|
host = self.headers.get('Host', '')
|
|
|
|
# ── Portail captif ──────────────────────────────────────────
|
|
# Si la requête ne vient pas de notre IP, on redirige
|
|
if self.is_captive_probe() or (host and SERVER_IP not in host and 'localhost' not in host):
|
|
self.send_response(302)
|
|
self.send_header('Location', f'http://{SERVER_IP}/')
|
|
self.end_headers()
|
|
return
|
|
|
|
path = self.path.split('?')[0]
|
|
|
|
if path == '/' or path == '/index.html':
|
|
try:
|
|
html = load_html()
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'text/html; charset=utf-8')
|
|
self.end_headers()
|
|
self.wfile.write(html)
|
|
except FileNotFoundError:
|
|
self.send_error(404, "index.html introuvable")
|
|
|
|
elif path == '/api/usb':
|
|
# API JSON: liste des clés USB disponibles
|
|
drives = get_usb_drives()
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'application/json')
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps(drives).encode())
|
|
|
|
else:
|
|
self.send_error(404, "Page non trouvée")
|
|
|
|
def do_POST(self):
|
|
if self.path == '/upload':
|
|
self._handle_upload()
|
|
else:
|
|
self.send_error(404)
|
|
|
|
def _handle_upload(self):
|
|
try:
|
|
form = cgi.FieldStorage(
|
|
fp=self.rfile,
|
|
headers=self.headers,
|
|
environ={'REQUEST_METHOD': 'POST',
|
|
'CONTENT_TYPE': self.headers['Content-Type']}
|
|
)
|
|
except Exception as e:
|
|
self._json_response(500, {"success": False, "error": f"Erreur lecture formulaire: {e}"})
|
|
return
|
|
|
|
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
|
|
|
uploaded = []
|
|
errors = []
|
|
|
|
# Traite chaque fichier uploadé
|
|
items = form['files[]'] if 'files[]' in form else []
|
|
if not isinstance(items, list):
|
|
items = [items]
|
|
|
|
for item in items:
|
|
if not item.filename:
|
|
continue
|
|
|
|
filename = os.path.basename(item.filename)
|
|
tmp_path = os.path.join(UPLOAD_DIR, filename)
|
|
|
|
# Sauvegarde temporaire
|
|
try:
|
|
with open(tmp_path, 'wb') as f:
|
|
shutil.copyfileobj(item.file, f, length=131072)
|
|
except Exception as e:
|
|
errors.append({"file": filename, "error": f"Erreur sauvegarde: {e}"})
|
|
continue
|
|
|
|
# Copie vers les clés USB
|
|
usb_results = copy_to_usb_drives(tmp_path, filename)
|
|
|
|
# Supprime le fichier temporaire
|
|
try:
|
|
os.remove(tmp_path)
|
|
except:
|
|
pass
|
|
|
|
uploaded.append({
|
|
"file": filename,
|
|
"usb_results": usb_results
|
|
})
|
|
|
|
response = {
|
|
"success": len(errors) == 0,
|
|
"uploaded": uploaded,
|
|
"errors": errors,
|
|
"usb_drives": get_usb_drives()
|
|
}
|
|
|
|
self._json_response(200, response)
|
|
|
|
def _json_response(self, code, data):
|
|
body = json.dumps(data, ensure_ascii=False).encode('utf-8')
|
|
self.send_response(code)
|
|
self.send_header('Content-type', 'application/json; charset=utf-8')
|
|
self.send_header('Content-Length', len(body))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
|
|
# ─── Lancement ────────────────────────────────────────────────────────────────
|
|
|
|
def run():
|
|
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
|
server_address = ('', PORT)
|
|
httpd = HTTPServer(server_address, RequestHandler)
|
|
print(f"╔══════════════════════════════════════╗")
|
|
print(f"║ Serveur FreeCAD USB - Port {PORT} ║")
|
|
print(f"║ Adresse: http://{SERVER_IP} ║")
|
|
print(f"║ Uploads temp: {UPLOAD_DIR} ║")
|
|
print(f"╚══════════════════════════════════════╝")
|
|
drives = get_usb_drives()
|
|
if drives:
|
|
print(f"[USB] {len(drives)} clé(s) USB détectée(s):")
|
|
for d in drives:
|
|
print(f" • {d['name']} ({d['path']}) - {d['free_gb']} GB libre")
|
|
else:
|
|
print("[USB] Aucune clé USB détectée au démarrage")
|
|
print()
|
|
httpd.serve_forever()
|
|
|
|
if __name__ == '__main__':
|
|
run()
|