Compare commits

...
Sign in to create a new pull request.

12 commits
main ... rust

Author SHA1 Message Date
9d039dd839 [WIP] Refactor player lifetime 2025-03-04 17:35:18 +00:00
3577401935 Redo event stream
Subscribing to a broadcast ahead of time is so much simpler than passing
a channel along with the command and making the IPC handler look inside
event contents.
2025-03-03 19:16:38 +00:00
19f903aa5c Add more common event types 2025-03-03 17:21:17 +00:00
e13282d5dd Wait for song to start playing successfully 2025-03-03 17:20:56 +00:00
189d911fd6 Move play() method into Mpv 2025-03-03 17:18:36 +00:00
00c772c711 mpv: track current playlist_entry_id 2025-03-03 17:18:36 +00:00
204d36238b Get mpv off the terminal 2025-03-03 17:18:36 +00:00
0fd860cf00 Adjust tracing levels 2025-03-03 17:18:36 +00:00
2b4f4785aa Handle IPC messages in background task 2025-03-03 17:18:36 +00:00
39b2670529 always keep mpv running in background 2025-03-03 17:18:36 +00:00
23a28936e5 Refactor mpv handler to separate module 2025-03-03 17:18:36 +00:00
4fe7df6036 Rewrite it in Rust (WIP) 2025-03-03 17:18:36 +00:00
5 changed files with 1872 additions and 0 deletions

5
.gitignore vendored
View file

@ -1,2 +1,7 @@
music/
__pycache__/
# Added by cargo
/target

1183
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

19
Cargo.toml Normal file
View file

@ -0,0 +1,19 @@
[package]
name = "laas"
version = "0.1.0"
edition = "2024"
[dependencies]
axum = "0.8.1"
clap = { version = "4.5.31", features = ["derive"] }
color-eyre = { version = "0.6.3", features = ["capture-spantrace"] }
command-fds = { version = "0.3.0", features = ["tokio"] }
futures-util = { version = "0.3.31", default-features = false }
serde = { version = "1.0.218", features = ["derive"] }
serde_json = "1.0.139"
thiserror = "2.0.11"
tokio = { version = "1.43.0", features = ["full"] }
tokio-util = { version = "0.7.13", features = ["rt"] }
tracing = "0.1.41"
tracing-error = "0.2.1"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }

185
src/main.rs Normal file
View file

@ -0,0 +1,185 @@
use std::{error::Error, fmt::Debug, path::Path, sync::Arc};
use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post};
use clap::Parser;
use serde::Serialize;
use thiserror::Error;
use tokio::sync::{Mutex, MutexGuard, oneshot};
use tracing::{Level, debug, info, instrument, warn};
use tracing_subscriber::layer::SubscriberExt;
mod mpv;
#[derive(Debug, Clone)]
struct AppState {
player: Arc<Mutex<mpv::Mpv>>,
}
#[derive(Debug, Error)]
enum StopError {
#[error("failed to communicate via IPC")]
Ipc(#[from] mpv::IpcError),
}
impl AppState {
pub fn new() -> Result<Self, mpv::SpawnError> {
Ok(Self {
player: Self::spawn_player()?,
})
}
fn spawn_player() -> Result<Arc<Mutex<mpv::Mpv>>, mpv::SpawnError> {
// We need a reference to the Mpv mutex in the death handler. Arc::try_new_cyclic would be nice
let (sender, receiver) = oneshot::channel::<Arc<Mutex<mpv::Mpv>>>();
let on_death = async move |error| {
warn!(%error, "Player has encountered an error");
let Ok(player) = receiver.try_recv() else {
return;
};
let mut player = player.lock().await;
*player = Self::spawn_player()
};
let player = Arc::new(Mutex::new(mpv::Mpv::spawn(Some(on_death))));
let Ok(()) = sender.send(player) else {
warn!("Player died before we could hand it to the death handler");
};
Ok(player)
}
#[instrument(skip(self))]
async fn get_player(&self) -> MutexGuard<Option<mpv::Mpv>> {
let mut player = self.player.lock().await;
if let Some(p) = player.as_mut() {
if let Err(error) = p.check_health().await {
warn!(%error, "Player has encountered an error, dropping");
let _ = player.take().unwrap().exit().await;
}
}
player
}
#[instrument(skip(self))]
pub async fn play<P: AsRef<Path> + Debug>(&self, file: P) -> Result<(), mpv::PlayError> {
let file = file.as_ref();
let mut player = self.get_player().await;
// ugly, but borrow checker gets mad about the &mut lifetimes otherwise
if player.is_none() {
debug!("Spawning new player instance");
*player = Some();
} else {
debug!("Using existing player instance");
}
let player = player.as_mut().unwrap();
player.play(file).await?;
Ok(())
}
#[instrument(skip(self))]
async fn stop(&self) -> Result<(), StopError> {
let mut player = self.player.lock().await;
let Some(player) = player.as_mut() else {
debug!("No child currently active");
return Ok(());
};
player.send_ipc(&["stop".to_owned()]).await?;
Ok(())
}
}
struct JsonError<E> {
inner: E,
}
impl<E: Error> IntoResponse for JsonError<E> {
fn into_response(self) -> axum::response::Response {
#[derive(Serialize)]
struct Response {
error_stack: Vec<String>,
}
let mut error_stack = Vec::new();
let mut e: &dyn Error = &self.inner;
loop {
error_stack.push(e.to_string());
let Some(next) = e.source() else { break };
e = next;
}
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(Response { error_stack }),
)
.into_response()
}
}
impl<E> From<E> for JsonError<E> {
fn from(value: E) -> Self {
Self { inner: value }
}
}
async fn start(State(state): State<AppState>) -> Result<Json<String>, JsonError<mpv::PlayError>> {
state.play("/tmp/0118999.opus").await?;
Ok(Json("ok".to_owned()))
}
async fn stop(State(state): State<AppState>) -> Result<Json<String>, JsonError<StopError>> {
state.stop().await?;
Ok(Json("ok".to_owned()))
}
#[derive(Parser)]
struct Args {
/// Make log output more verbose. Can be specified multiple times.
#[arg(short, long, action = clap::ArgAction::Count)]
verbose: u8,
}
#[tokio::main]
async fn main() -> color_eyre::Result<()> {
let args = Args::parse();
color_eyre::install()?;
let filter = tracing_subscriber::EnvFilter::builder()
.with_default_directive(
match args.verbose {
0 => Level::ERROR,
1 => Level::INFO,
2 => Level::DEBUG,
_ => Level::TRACE,
}
.into(),
)
.from_env()?;
let subscriber = tracing_subscriber::Registry::default()
.with(tracing_error::ErrorLayer::default())
.with(filter)
.with(tracing_subscriber::fmt::Layer::new());
tracing::subscriber::set_global_default(subscriber)?;
let app = Router::new()
.route("/api/start", post(start))
.route("/api/stop", post(stop))
.with_state(AppState::new());
let listener = tokio::net::TcpListener::bind("[::]:8080").await?;
info!("Starting server");
axum::serve(listener, app).await?;
Ok(())
}

480
src/mpv.rs Normal file
View file

@ -0,0 +1,480 @@
use command_fds::CommandFdExt as _;
use futures_util::FutureExt;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
convert::Infallible,
fmt::Debug,
os::fd::{AsRawFd as _, OwnedFd},
path::{Path, PathBuf},
process::{ExitStatus, Stdio},
sync::Arc,
time::Duration,
};
use thiserror::Error;
use tokio::{
io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader},
net::unix::OwnedWriteHalf,
process::{Child, Command},
select,
sync::{Mutex, broadcast, oneshot},
task::{AbortHandle, JoinError, JoinHandle},
time::timeout,
};
use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, error, info, instrument, trace, warn};
#[derive(Debug, Error)]
pub(crate) enum SpawnError {
#[error("failed to open IPC socket")]
IpcSocket(#[source] std::io::Error),
#[error("failed to spawn player command")]
Command(#[source] std::io::Error),
}
#[derive(Debug, Error)]
pub(crate) enum ExitError {
#[error("failed to kill process")]
Kill(#[source] std::io::Error),
}
#[derive(Debug, Error)]
pub(crate) enum PlayError {
#[error("failed to create new player instance")]
Spawn(#[from] SpawnError),
#[error("invalid path: {0}")]
InvalidPath(PathBuf),
#[error("failed to communicate via IPC")]
Ipc(#[from] IpcError),
#[error("file was not reported as loaded in time")]
LoadTimedOut,
#[error("event stream ended unexpectedly")]
EndOfEventStream,
#[error("failed to load file: {reason}, {}", file_error.as_deref().unwrap_or("[unknown]"))]
LoadFailed {
reason: String,
file_error: Option<String>,
},
#[error("invalid IPC response")]
InvalidIpcResponse(#[source] serde_json::Error),
}
#[derive(Debug, Error)]
pub(crate) enum IpcHandlerError {
#[error("failed to read from IPC stream")]
IoRead(#[source] std::io::Error),
#[error("IPC stream ended unexpectedly")]
EndOfStream,
}
#[derive(Debug, Error)]
pub(crate) enum IpcError {
#[error("failed to serialize IPC request")]
Serialize(#[source] serde_json::Error),
#[error("failed to write to IPC stream")]
IoWrite(#[source] std::io::Error),
#[error("timed out waiting for IPC response")]
ResponseTimedOut,
#[error("receiver task died")]
ReceiverDied(#[from] oneshot::error::RecvError),
#[error(transparent)]
ErrorResponse(#[from] ErrorResponse),
}
#[derive(Debug, Error, Deserialize)]
#[error("IPC returned error response: {0}")]
pub(crate) struct ErrorResponse(pub String);
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
struct RequestId(u32);
type RequestMap = HashMap<RequestId, oneshot::Sender<Result<serde_json::Value, ErrorResponse>>>;
#[derive(Debug, Deserialize)]
#[serde(tag = "error", rename_all = "lowercase")]
enum CommandResponse {
Success {
data: serde_json::Value,
request_id: RequestId,
},
#[serde(untagged)]
Error {
error: ErrorResponse,
request_id: RequestId,
},
}
#[derive(Debug, Deserialize, Clone)]
#[serde(tag = "event", rename_all = "kebab-case")]
enum IpcEvent {
StartFile {
playlist_entry_id: u64,
},
EndFile {
playlist_entry_id: u64,
playlist_insert_id: Option<u64>,
reason: String,
file_error: Option<String>,
},
FileLoaded,
AudioReconfig,
PlaybackRestart,
/// Deprecated.
Idle,
#[serde(untagged)]
#[allow(unused)]
Unknown {
event: String,
#[serde(flatten)]
args: HashMap<String, serde_json::Value>,
},
}
struct IpcHandler {
requests: Arc<Mutex<RequestMap>>,
event_sender: broadcast::Sender<IpcEvent>,
}
impl IpcHandler {
pub fn new(
requests: Arc<Mutex<RequestMap>>,
event_sender: broadcast::Sender<IpcEvent>,
) -> Self {
Self {
requests,
event_sender,
}
}
#[instrument(skip(self))]
pub async fn handle_message(&mut self, message: &str) -> Result<(), serde_json::Error> {
trace!("Got IPC message");
#[derive(Deserialize)]
#[serde(untagged)]
pub(crate) enum IpcMessage {
Event(IpcEvent),
Response(CommandResponse),
}
let response = serde_json::from_str::<IpcMessage>(message)?;
match response {
IpcMessage::Response(response) => {
self.handle_command_response(response).await;
Ok(())
}
IpcMessage::Event(event) => {
self.handle_event(event).await;
Ok(())
}
}
}
#[instrument(skip(self))]
async fn handle_command_response(&mut self, response: CommandResponse) {
trace!("Got command response");
let (CommandResponse::Success { request_id, .. }
| CommandResponse::Error { request_id, .. }) = &response;
let Some(response_sender) = self.requests.lock().await.remove(request_id) else {
error!("No active request for response");
return;
};
let response = match response {
CommandResponse::Success { data, .. } => Ok(data),
CommandResponse::Error { error, .. } => Err(error),
};
if response_sender.send(response).is_err() {
debug!("Failed to send to command response channel, probably timed out");
}
}
#[instrument(skip(self))]
async fn handle_event(&mut self, event: IpcEvent) {
trace!("Got event");
// don't care if there are no receivers
let _ = self.event_sender.send(event);
}
}
#[derive(Debug, Error)]
pub(crate) enum PlayerError {
#[error("failed to wait on child process")]
Wait(#[source] std::io::Error),
#[error("child process exited with code {0}")]
ChildExited(ExitStatus),
#[error("IPC handler is gone")]
IpcHandlerGone,
#[error("failed to join on IPC handler")]
JoinIpcHandler(#[source] JoinError),
#[error(transparent)]
IpcHandlerDied(#[from] IpcHandlerError),
}
#[derive(Debug)]
pub(crate) struct Mpv {
/// May be held by the `death_watcher`
/// Option because it needs to be separated from the rest of `self` during `exit()`.
player: Option<Arc<Mutex<Child>>>,
ipc_write: OwnedWriteHalf,
next_request_id: u32,
requests: Arc<Mutex<RequestMap>>,
event_sender: broadcast::Sender<IpcEvent>,
/// May be moved into the `death_watcher`
rx_task: Option<AbortOnDropHandle<Result<Infallible, IpcHandlerError>>>,
death_watcher: Option<AbortOnDropHandle<()>>,
}
impl Mpv {
#[instrument(skip(on_death))]
pub(crate) fn spawn<F, Fut>(on_death: Option<F>) -> Result<Self, SpawnError>
where
F: FnOnce(PlayerError) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let (our_stream, child_stream) =
tokio::net::UnixStream::pair().map_err(SpawnError::IpcSocket)?;
let child_stream = OwnedFd::from(child_stream.into_std().unwrap());
let mut command = Command::new("mpv");
command
.arg("--idle")
.arg("--msg-level=all=warn")
.arg("--input-terminal=no")
.arg(format!(
"--input-ipc-client=fd://{}",
child_stream.as_raw_fd()
))
.arg("--no-video")
.stdin(Stdio::null())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.preserved_fds(vec![child_stream]);
info!(?command, "Spawning player");
let player = Arc::new(Mutex::new(command.spawn().map_err(SpawnError::Command)?));
let (ipc_read, ipc_write) = our_stream.into_split();
let requests = Arc::new(Mutex::new(HashMap::new()));
let (event_sender, _event_receiver) = broadcast::channel(5);
let rx_task = {
let requests = Arc::clone(&requests);
let event_sender = event_sender.clone();
tokio::task::spawn(async move {
let mut handler = IpcHandler::new(requests, event_sender);
let mut ipc_read = BufReader::new(ipc_read).lines();
loop {
let message = ipc_read
.next_line()
.await
.map_err(IpcHandlerError::IoRead)?
.ok_or(IpcHandlerError::EndOfStream)?;
if let Err(error) = handler.handle_message(&message).await {
warn!(%error, "Failed to handle IPC message");
};
}
})
};
let mut rx_task = Some(AbortOnDropHandle::new(rx_task));
let mut death_watcher = None;
if let Some(on_death) = on_death {
let rx_task = rx_task.take().unwrap();
let player = Arc::clone(&player);
let wait_for_death = async move {
let mut player = player.lock().await;
// wait for either the IPC task to exit or the child to die
select! {
ret = rx_task => match ret {
Ok(Err(err)) => PlayerError::IpcHandlerDied(err),
Err(err) => PlayerError::JoinIpcHandler(err),
},
ret = player.wait() => match ret {
Ok(err) => PlayerError::ChildExited(err),
Err(err) => PlayerError::Wait(err),
},
}
};
death_watcher = Some(AbortOnDropHandle::new(tokio::task::spawn(
wait_for_death.then(on_death),
)));
}
Ok(Self {
player: Some(player),
ipc_write,
next_request_id: 0,
requests,
event_sender,
rx_task,
death_watcher,
})
}
#[instrument(skip(self))]
pub async fn exit(mut self) -> Result<(), ExitError> {
// first things first, abort the death watcher, since this is an expected exit
if let Some(death_watcher) = self.death_watcher.take() {
death_watcher.abort();
let _ = death_watcher.await;
}
let player = self
.player
.take()
.expect("player is only taken during exit(), should still be there");
let mut player = player
.try_lock()
.expect("death watch has been cancelled, player should not be locked anymore");
if let Ok(Some(_)) = player.try_wait() {
debug!("Child has already exited");
return Ok(());
}
debug!("Trying to stop child via IPC");
match self.send_ipc(&["quit".to_owned()]).await {
Err(error) => {
warn!(?error, "Failed to stop process via IPC");
}
Ok(_) => match timeout(Duration::from_millis(500), player.wait()).await {
Ok(Ok(_)) => return Ok(()),
Ok(Err(error)) => warn!(?error, "Failed to wait for child to exit"),
Err(_) => warn!("Timed out waiting for child to exit"),
},
}
info!("Killing child with SIGKILL");
player.kill().await.map_err(ExitError::Kill)?;
Ok(())
}
fn next_request_id(&mut self) -> RequestId {
let ret = RequestId(self.next_request_id);
self.next_request_id += 1;
ret
}
#[instrument(skip(self))]
pub async fn send_ipc(&mut self, command: &[String]) -> Result<serde_json::Value, IpcError> {
let request_id = self.next_request_id();
#[derive(Serialize)]
pub(crate) struct IpcRequest<'a> {
command: &'a [String],
request_id: RequestId,
r#async: bool,
}
let serialized = serde_json::to_string(&IpcRequest {
command,
request_id,
r#async: true,
})
.map_err(IpcError::Serialize)?;
// register a channel for our request ID to get the response
let (response_sender, response_receiver) = oneshot::channel();
self.requests
.lock()
.await
.insert(request_id, response_sender);
debug!(serialized, "Sending IPC command");
self.ipc_write
.write_all(serialized.as_bytes())
.await
.map_err(IpcError::IoWrite)?;
self.ipc_write
.write_all(b"\n")
.await
.map_err(IpcError::IoWrite)?;
let response = timeout(Duration::from_millis(100), response_receiver)
.await
.map_err(|_e| IpcError::ResponseTimedOut)???;
Ok(response)
}
#[instrument(skip(self))]
pub async fn play<P: AsRef<Path> + Debug>(&mut self, file: P) -> Result<(), PlayError> {
let file = file.as_ref();
let mut event_receiver = self.event_sender.subscribe();
let response = self
.send_ipc(&[
"loadfile".to_owned(),
file.to_str()
.ok_or_else(|| PlayError::InvalidPath(file.to_owned()))?
.to_owned(),
])
.await?;
#[derive(Debug, Deserialize)]
struct ResponseData {
playlist_entry_id: u64,
}
let response: ResponseData =
serde_json::from_value(response).map_err(PlayError::InvalidIpcResponse)?;
let mut current_entry_id = response.playlist_entry_id;
timeout(Duration::from_millis(100), async move {
loop {
let event = match event_receiver.recv().await {
Ok(event) => event,
Err(broadcast::error::RecvError::Closed) => {
return Err(PlayError::EndOfEventStream);
}
Err(broadcast::error::RecvError::Lagged(_)) => {
warn!("Failed to keep up with event stream");
continue;
}
};
match event {
IpcEvent::StartFile { playlist_entry_id } => {
if playlist_entry_id != current_entry_id {
debug!(playlist_entry_id, "Incorrect playlist_entry_id, ignoring");
continue;
}
debug!(playlist_entry_id, "File has been added to playlist");
}
IpcEvent::FileLoaded => {
debug!("File has been loaded successfully");
return Ok(());
}
IpcEvent::EndFile {
reason,
file_error,
playlist_entry_id,
playlist_insert_id,
} => {
if playlist_entry_id != current_entry_id {
debug!(playlist_entry_id, "Incorrect playlist_entry_id, ignoring");
continue;
}
if let Some(insert_id) = playlist_insert_id {
debug!(insert_id, "File has been replaced with new entry");
current_entry_id = insert_id;
} else {
warn!(reason, file_error, "Failed to play file");
return Err(PlayError::LoadFailed { reason, file_error });
}
}
_ => {}
}
}
})
.await
.map_err(|_| PlayError::LoadTimedOut)?
}
}