"""Small Flask service that drives an ``mpv`` process for music and sound effects.

Background music is played by a single managed ``mpv`` instance which is
controlled through mpv's JSON IPC socket; sound effects are played by
short-lived, unmanaged ``mpv`` processes.
"""

import asyncio
import json
import os
import re
import signal
import subprocess
import tempfile
from asyncio.streams import StreamReader, StreamWriter
from pathlib import Path
from typing import Any, LiteralString, Mapping

from flask import Flask, abort, render_template, request, send_file, jsonify
from werkzeug.utils import secure_filename

# Pidfile of the managed mpv instance; its presence at startup means the
# previous run did not shut down cleanly.
mpv_pidfile = Path(tempfile.gettempdir()).joinpath("laas_mpv.pidfile")
# Unix socket on which the managed mpv instance exposes its JSON IPC.
mpv_socket = Path(tempfile.gettempdir()).joinpath("mpvsocket")
music_path = Path(os.environ.get("MUSIC_PATH", "./music"))
fx_path = Path(os.environ.get("FX_PATH", "./fx"))
# Handle of the managed mpv instance, or None when nothing was started.
mpv_process: subprocess.Popen | None = None
app = Flask(__name__)
# Volume (0-100) passed to every newly spawned mpv process.
volume: float = 50

# Anything matching this is handed to mpv as a URL instead of a local file.
_URL_PATTERN = re.compile(r"^https?://.*")
# Seconds to wait for mpv to exit after SIGTERM before killing it.
_STOP_TIMEOUT = 5.0


def get_path(base: Path, file: LiteralString | str | Path):
    """Resolve ``file`` below ``base`` and refuse anything that escapes it.

    Args:
        base: Directory the result must be located in.
        file: Path relative to ``base`` (typically user supplied).

    Returns:
        The fully resolved path.

    Raises:
        ValueError: If the resolved path is not inside ``base``.
    """
    # Both sides of the containment check must be resolved; comparing an
    # absolute path against a relative base (e.g. "./music") always fails.
    root = base.resolve()
    candidate = root.joinpath(file).resolve()
    candidate.relative_to(root)
    return candidate


def _remove_stale_socket() -> None:
    """Remove whatever currently occupies the mpv IPC socket path."""
    if mpv_socket.is_dir():
        mpv_socket.rmdir()
    elif mpv_socket.exists():
        mpv_socket.unlink()


def cleanup_unclean():
    """Clean up leftovers (mpv process, pidfile, socket) of an unclean shutdown."""
    if mpv_pidfile.is_file():
        print("Unclean shutdown detected")
        try:
            mpv_pid = int(mpv_pidfile.read_text().strip())
            print(f"Killing mpv process with pid {mpv_pid}")
            os.kill(mpv_pid, signal.SIGTERM)
        except (ValueError, OverflowError, OSError) as err:
            # Best effort: a garbled pidfile or an already dead process is fine.
            print(f"Could not kill stale mpv process: {err}")
        mpv_pidfile.unlink(missing_ok=True)
    _remove_stale_socket()


def shutdown(status: int = 0):
    """Stop mpv and terminate the interpreter immediately with ``status``."""
    mpv_stop()
    os._exit(status)


def sigh(signum: int, _frame: Any):
    """Signal handler: exit with 0 on SIGTERM, otherwise with ``128 + signum``."""
    status = 0 if signum == signal.SIGTERM else 128 + signum
    shutdown(status)


def mpv_spawn(args: list[str]):
    """Spawn a silent, audio-only mpv process at the current volume.

    Args:
        args: Additional command line arguments (options and/or media).

    Returns:
        The ``subprocess.Popen`` handle of the new process.
    """
    return subprocess.Popen(
        [
            "mpv",
            "--no-video",
            f"--volume={volume}",
            "--terminal=no",
            *args,
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )


async def mpv_start(args: list[str]):
    """Start the managed mpv instance and wait until its IPC socket appears.

    Returns early if mpv exits before creating the socket, so a failing mpv
    cannot block the caller forever; callers can detect this through
    ``mpv_running()``.
    """
    global mpv_process
    process = mpv_spawn([f"--input-ipc-server={mpv_socket}", *args])
    mpv_process = process
    mpv_pidfile.write_text(str(process.pid))
    while not mpv_socket.exists() and process.poll() is None:
        await asyncio.sleep(0.01)


async def playback_start():
    """Start shuffled playback of the whole music directory."""
    return await mpv_start(
        args=[
            "--shuffle",
            # "--loop-playlist",
            str(music_path),
        ]
    )


def mpv_stop():
    """Terminate the managed mpv instance (if any) and remove its pidfile."""
    global mpv_process
    process = mpv_process
    if process is not None:
        process.terminate()
        try:
            process.wait(timeout=_STOP_TIMEOUT)
        except subprocess.TimeoutExpired:
            # mpv ignored SIGTERM; do not let shutdown hang on it.
            process.kill()
            process.wait()
        mpv_process = None
    mpv_pidfile.unlink(missing_ok=True)


def mpv_running():
    """Return True if the managed mpv instance exists and has not exited."""
    process = mpv_process
    return process is not None and process.poll() is None


async def mpv_socket_open() -> tuple[StreamReader, StreamWriter] | None:
    """Connect to the IPC socket of the managed mpv instance.

    Returns:
        A ``(reader, writer)`` pair, or None if mpv is not running. If mpv is
        running but the socket cannot be opened within one second, mpv is
        stopped, the socket path is removed and None is returned.
    """

    async def socket_open_helper():
        while True:
            try:
                return await asyncio.open_unix_connection(mpv_socket)
            except (ConnectionRefusedError, FileNotFoundError):
                # Socket is not (yet) accepting connections; retry until the
                # surrounding timeout expires.
                await asyncio.sleep(0.05)

    if mpv_running():
        try:
            return await asyncio.wait_for(socket_open_helper(), timeout=1)
        except asyncio.TimeoutError:
            mpv_stop()
            _remove_stale_socket()
    return None


async def _close_writer(writer: StreamWriter) -> None:
    """Close an IPC connection and wait until it is fully closed."""
    writer.close()
    await writer.wait_closed()


async def mpv_socket_command(
    reader: StreamReader, writer: StreamWriter, command: dict[str, Any]
) -> Mapping[str, Any] | None:
    """Send one JSON IPC command to mpv and return its reply.

    Args:
        reader: Reader half of the IPC connection.
        writer: Writer half of the IPC connection.
        command: JSON-serialisable command object.

    Returns:
        The decoded reply, or None if no reply arrived within 0.1 seconds or
        the connection was closed.
    """

    async def command_helper() -> Mapping[str, Any] | None:
        writer.write((json.dumps(command) + "\n").encode())
        await writer.drain()
        while True:
            line = await reader.readline()
            if not line:
                # EOF: mpv closed the connection without replying.
                return None
            try:
                message = json.loads(line.decode())
            except ValueError:
                # Undecodable line; keep waiting for the actual reply.
                continue
            # mpv interleaves asynchronous {"event": ...} messages with
            # command replies on the same connection; skip those.
            if isinstance(message, dict) and "event" not in message:
                return message

    try:
        return await asyncio.wait_for(command_helper(), timeout=0.1)
    except (asyncio.TimeoutError, ConnectionError):
        return None


def sizeof_fmt(num, suffix="B"):
    """Format a byte count using binary prefixes, e.g. ``1536`` -> ``1.5KiB``."""
    for unit in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
        if abs(num) < 1024.0:
            return f"{num:3.1f}{unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f}Yi{suffix}"


def _size_or_zero(path: Path) -> int:
    """Return the size of ``path`` in bytes, or 0 if it cannot be stat'ed."""
    try:
        return path.stat().st_size
    except OSError:
        return 0


def _entry_size(entry: Path) -> int:
    """Return a file's size, or the summed size of a directory's direct children."""
    if not entry.is_dir():
        return _size_or_zero(entry)
    try:
        children = list(entry.iterdir())
    except OSError:
        return 0
    return sum(_size_or_zero(child) for child in children)


def _resolve_playback_uri(
    base: Path, filename_or_url: str
) -> tuple[str | None, int]:
    """Turn a request target into something mpv can play.

    Returns:
        ``(uri, 200)`` on success, ``(None, 403)`` if the file would be
        outside ``base`` and ``(None, 404)`` if it does not exist.
    """
    if _URL_PATTERN.match(filename_or_url):
        return filename_or_url, 200
    try:
        file_path = get_path(base, filename_or_url)
    except ValueError:
        return None, 403
    if not file_path.exists():
        return None, 404
    return str(file_path), 200


@app.route("/", methods=["GET"])
def route_interface():
    """Render the player page."""
    return render_template("player.html", playing=mpv_running())


@app.route("/", methods=["POST"])
async def route_toggle():
    """Toggle background playback and render the player page."""
    if mpv_running():
        print("Stopping Audio Playback..")
        mpv_stop()
    else:
        print("Starting Audio Playback..")
        await playback_start()
    return render_template("player.html", playing=mpv_running())


@app.route("/files/<path:path>", methods=["GET"])
@app.route("/files")
def filemgr(path=""):
    """Serve a file from the music directory or render a directory listing.

    Responds with 403 for paths escaping the music directory and 404 for
    paths that do not exist.
    """
    try:
        full_path = get_path(music_path, path)
    except ValueError:
        abort(403)
    print(full_path)
    if full_path.is_file():
        return send_file(full_path)
    if not full_path.is_dir():
        abort(404)
    # Sorted so the listing is stable between requests.
    files = [
        [entry.name, sizeof_fmt(_entry_size(entry))]
        for entry in sorted(full_path.iterdir())
    ]
    return render_template("filemanager.html", files=files, path=path)


@app.route("/upload")
def upload():
    """Render the upload form."""
    return render_template("upload.html")


@app.route("/api/start", methods=["POST"])
async def api_start():
    """Start background playback; 400 if it is already running."""
    if mpv_running():
        return jsonify("Laas is already lofi'ing"), 400
    await playback_start()
    return jsonify("ok"), 200


@app.route("/api/stop", methods=["POST"])
def api_stop():
    """Stop background playback; 400 if nothing is playing."""
    if not mpv_running():
        return jsonify("You cant stop when theres no playback, womp womp!"), 400
    mpv_stop()
    return jsonify("ok"), 200


@app.route("/api/status", methods=["GET"])
def api_status():
    """Return whether background playback is running as a JSON boolean."""
    return jsonify(mpv_running())


@app.route("/api/nowplaying", methods=["GET"])
async def api_nowplaying():
    """Return playback status and, if playing, metadata of the current track."""
    # Entries are either the mpv property name (also used as the response
    # key) or a (mpv property, response key) pair.
    properties = [
        "filename",
        ("metadata/by-key/artist", "artist"),
        ("metadata/by-key/album", "album"),
        ("media-title", "title"),
        ("playback-time", "position"),
        "duration",
    ]
    response: dict[str, Any] = {"status": "stopped"}
    sock = await mpv_socket_open()
    if sock is not None:
        reader, writer = sock
        try:
            response["status"] = "playing"
            for entry in properties:
                prop, key = entry if isinstance(entry, tuple) else (entry, entry)
                reply_json = await mpv_socket_command(
                    reader, writer, {"command": ["get_property", prop]}
                )
                if reply_json is None:
                    break
                response[key] = reply_json.get("data", "Unknown")
        finally:
            await _close_writer(writer)
    return jsonify(response)


@app.route("/api/play", methods=["POST"], defaults={"filename_or_url": None})
@app.route("/api/play/<path:filename_or_url>", methods=["POST"])
async def api_play_track(
    filename_or_url: str | None = None,
    error_str: str = "Could not play file '{filename}'",
):
    """Replace background playback with a single file or http(s) URL.

    The target is taken from the URL path or, if absent, from the request
    body. ``error_str`` is the error message template; it may contain a
    ``{filename}`` placeholder.
    """
    if filename_or_url is None:
        filename_or_url = request.get_data().decode()
    playback_uri, status = _resolve_playback_uri(music_path, filename_or_url)
    if playback_uri is None:
        return jsonify(error_str.format(filename=filename_or_url)), status
    if mpv_running():
        mpv_stop()
    await mpv_start([playback_uri])
    if not mpv_running():
        return jsonify(error_str.format(filename=filename_or_url)), 500
    return jsonify("ok")


@app.route("/api/isengard", methods=["POST"])
async def api_isengard():
    """Play ``isengard.mp3`` from the music directory."""
    return await api_play_track(
        "isengard.mp3", error_str="Could not take the hobbits to Isengard"
    )


@app.route("/api/fx", methods=["POST"], defaults={"filename_or_url": None})
@app.route("/api/fx/<path:filename_or_url>", methods=["POST"])
async def api_play_fx(
    filename_or_url: str | None = None,
    error_str: str = "Could not play file '{filename}'",
):
    """Play a sound effect (file or http(s) URL) on top of the background music.

    The target is taken from the URL path or, if absent, from the request
    body. ``error_str`` is the error message template; it may contain a
    ``{filename}`` placeholder.
    """
    if filename_or_url is None:
        filename_or_url = request.get_data().decode()
    playback_uri, status = _resolve_playback_uri(fx_path, filename_or_url)
    if playback_uri is None:
        return jsonify(error_str.format(filename=filename_or_url)), status
    fx_process = mpv_spawn([playback_uri])
    # Give mpv a moment so an immediate failure can be reported to the client.
    await asyncio.sleep(0.1)
    if fx_process.poll() not in [None, 0]:
        return jsonify(error_str.format(filename=filename_or_url)), 500
    return jsonify("ok")


@app.route("/api/volume", methods=["GET", "PUT"])
async def api_volume():
    """Get the volume, or set it (PUT, body is a number between 0 and 100)."""
    global volume
    if request.method == "PUT":
        try:
            requested = float(request.get_data().decode())
        except ValueError:
            return jsonify("bad volume"), 400
        # Also rejects NaN, for which every comparison is false.
        if not 0 <= requested <= 100:
            return jsonify("bad volume"), 400
        # Only store the value once it is known to be valid.
        volume = requested
        sock = await mpv_socket_open()
        if sock is not None:
            reader, writer = sock
            try:
                await mpv_socket_command(
                    reader, writer, {"command": ["set", "volume", str(volume)]}
                )
            finally:
                await _close_writer(writer)
    return jsonify(volume)


@app.route("/api/skip", methods=["POST"])
async def api_skip():
    """Skip to the next playlist entry if background playback is running."""
    sock = await mpv_socket_open()
    if sock is not None:
        reader, writer = sock
        try:
            await mpv_socket_command(reader, writer, {"command": ["playlist-next"]})
        finally:
            await _close_writer(writer)
    return "ok"


@app.route("/api/upload", methods=["POST"])
def upload_api():
    """Store an uploaded file in the music or fx directory.

    Form fields: ``file`` (the upload), ``type`` (``music`` or ``fx``) and
    optionally ``filename`` (target name; defaults to the upload's own name).
    """
    filename = request.form.get("filename")
    upload_type = request.form.get("type")
    if "file" not in request.files:
        abort(400, "You need to provide a file")
    file = request.files["file"]
    if file.filename == "" or filename == "":
        abort(400, "You need to provide a file")
    if filename is None:
        # No explicit target name given; fall back to the uploaded file's name.
        filename = file.filename or ""
    filename = secure_filename(filename)
    if not filename:
        # Nothing usable is left after sanitising (e.g. the name was "../").
        abort(400, "You need to provide a file")
    if upload_type == "music":
        target_dir = music_path
    elif upload_type == "fx":
        target_dir = fx_path
    else:
        abort(400, "Invalid file type")
    target_dir.mkdir(parents=True, exist_ok=True)
    file.save(os.path.join(target_dir, filename))
    return f"file uploaded as {filename}"


signal.signal(signal.SIGTERM, sigh)
signal.signal(signal.SIGINT, sigh)
cleanup_unclean()
music_path.mkdir(parents=True, exist_ok=True)

if __name__ == "__main__":
    try:
        app.run(host="::", port=1337)
    finally:
        mpv_stop()