[WIP] Refactor player lifetime

This commit is contained in:
Xiretza 2025-03-03 22:32:12 +00:00
parent 3577401935
commit 9d039dd839
4 changed files with 88 additions and 39 deletions

1
Cargo.lock generated
View file

@ -464,6 +464,7 @@ dependencies = [
"clap",
"color-eyre",
"command-fds",
"futures-util",
"serde",
"serde_json",
"thiserror 2.0.11",

View file

@ -8,6 +8,7 @@ axum = "0.8.1"
clap = { version = "4.5.31", features = ["derive"] }
color-eyre = { version = "0.6.3", features = ["capture-spantrace"] }
command-fds = { version = "0.3.0", features = ["tokio"] }
futures-util = { version = "0.3.31", default-features = false }
serde = { version = "1.0.218", features = ["derive"] }
serde_json = "1.0.139"
thiserror = "2.0.11"

View file

@ -4,7 +4,7 @@ use axum::{Json, Router, extract::State, http::StatusCode, response::IntoRespons
use clap::Parser;
use serde::Serialize;
use thiserror::Error;
use tokio::sync::{Mutex, MutexGuard};
use tokio::sync::{Mutex, MutexGuard, oneshot};
use tracing::{Level, debug, info, instrument, warn};
use tracing_subscriber::layer::SubscriberExt;
@ -12,7 +12,7 @@ mod mpv;
#[derive(Debug, Clone)]
struct AppState {
player: Arc<Mutex<Option<mpv::Mpv>>>,
player: Arc<Mutex<mpv::Mpv>>,
}
#[derive(Debug, Error)]
@ -22,10 +22,30 @@ enum StopError {
}
impl AppState {
pub fn new() -> Self {
Self {
player: Arc::new(Mutex::new(None)),
}
pub fn new() -> Result<Self, mpv::SpawnError> {
Ok(Self {
player: Self::spawn_player()?,
})
}
fn spawn_player() -> Result<Arc<Mutex<mpv::Mpv>>, mpv::SpawnError> {
// We need a reference to the Mpv mutex in the death handler. Arc::try_new_cyclic would be nice
let (sender, receiver) = oneshot::channel::<Arc<Mutex<mpv::Mpv>>>();
let on_death = async move |error| {
warn!(%error, "Player has encountered an error");
let Ok(player) = receiver.try_recv() else {
return;
};
let mut player = player.lock().await;
*player = Self::spawn_player()
};
let player = Arc::new(Mutex::new(mpv::Mpv::spawn(Some(on_death))));
let Ok(()) = sender.send(player) else {
warn!("Player died before we could hand it to the death handler");
};
Ok(player)
}
#[instrument(skip(self))]
@ -50,7 +70,7 @@ impl AppState {
// ugly, but borrow checker gets mad about the &mut lifetimes otherwise
if player.is_none() {
debug!("Spawning new player instance");
*player = Some(mpv::Mpv::spawn()?);
*player = Some();
} else {
debug!("Using existing player instance");
}

View file

@ -1,4 +1,5 @@
use command_fds::CommandFdExt as _;
use futures_util::FutureExt;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
@ -15,8 +16,9 @@ use tokio::{
io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader},
net::unix::OwnedWriteHalf,
process::{Child, Command},
select,
sync::{Mutex, broadcast, oneshot},
task::JoinError,
task::{AbortHandle, JoinError, JoinHandle},
time::timeout,
};
use tokio_util::task::AbortOnDropHandle;
@ -215,17 +217,25 @@ pub(crate) enum PlayerError {
#[derive(Debug)]
pub(crate) struct Mpv {
player: Child,
/// May be held by the `death_watcher`
/// Option because it needs to be separated from the rest of `self` during `exit()`.
player: Option<Arc<Mutex<Child>>>,
ipc_write: OwnedWriteHalf,
next_request_id: u32,
requests: Arc<Mutex<RequestMap>>,
event_sender: broadcast::Sender<IpcEvent>,
/// May be moved into the `death_watcher`
rx_task: Option<AbortOnDropHandle<Result<Infallible, IpcHandlerError>>>,
death_watcher: Option<AbortOnDropHandle<()>>,
}
impl Mpv {
#[instrument]
pub(crate) fn spawn() -> Result<Self, SpawnError> {
#[instrument(skip(on_death))]
pub(crate) fn spawn<F, Fut>(on_death: Option<F>) -> Result<Self, SpawnError>
where
F: FnOnce(PlayerError) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let (our_stream, child_stream) =
tokio::net::UnixStream::pair().map_err(SpawnError::IpcSocket)?;
let child_stream = OwnedFd::from(child_stream.into_std().unwrap());
@ -245,7 +255,7 @@ impl Mpv {
.stderr(Stdio::inherit())
.preserved_fds(vec![child_stream]);
info!(?command, "Spawning player");
let player = command.spawn().map_err(SpawnError::Command)?;
let player = Arc::new(Mutex::new(command.spawn().map_err(SpawnError::Command)?));
let (ipc_read, ipc_write) = our_stream.into_split();
@ -270,21 +280,60 @@ impl Mpv {
}
})
};
let rx_task = Some(AbortOnDropHandle::new(rx_task));
let mut rx_task = Some(AbortOnDropHandle::new(rx_task));
let mut death_watcher = None;
if let Some(on_death) = on_death {
let rx_task = rx_task.take().unwrap();
let player = Arc::clone(&player);
let wait_for_death = async move {
let mut player = player.lock().await;
// wait for either the IPC task to exit or the child to die
select! {
ret = rx_task => match ret {
Ok(Err(err)) => PlayerError::IpcHandlerDied(err),
Err(err) => PlayerError::JoinIpcHandler(err),
},
ret = player.wait() => match ret {
Ok(err) => PlayerError::ChildExited(err),
Err(err) => PlayerError::Wait(err),
},
}
};
death_watcher = Some(AbortOnDropHandle::new(tokio::task::spawn(
wait_for_death.then(on_death),
)));
}
Ok(Self {
player,
player: Some(player),
ipc_write,
next_request_id: 0,
requests,
event_sender,
rx_task,
death_watcher,
})
}
#[instrument(skip(self))]
pub async fn exit(mut self) -> Result<(), ExitError> {
if let Ok(Some(_)) = self.player.try_wait() {
// first things first, abort the death watcher, since this is an expected exit
if let Some(death_watcher) = self.death_watcher.take() {
death_watcher.abort();
let _ = death_watcher.await;
}
let player = self
.player
.take()
.expect("player is only taken during exit(), should still be there");
let mut player = player
.try_lock()
.expect("death watch has been cancelled, player should not be locked anymore");
if let Ok(Some(_)) = player.try_wait() {
debug!("Child has already exited");
return Ok(());
}
@ -294,7 +343,7 @@ impl Mpv {
Err(error) => {
warn!(?error, "Failed to stop process via IPC");
}
Ok(_) => match timeout(Duration::from_millis(500), self.player.wait()).await {
Ok(_) => match timeout(Duration::from_millis(500), player.wait()).await {
Ok(Ok(_)) => return Ok(()),
Ok(Err(error)) => warn!(?error, "Failed to wait for child to exit"),
Err(_) => warn!("Timed out waiting for child to exit"),
@ -302,33 +351,11 @@ impl Mpv {
}
info!("Killing child with SIGKILL");
self.player.kill().await.map_err(ExitError::Kill)?;
player.kill().await.map_err(ExitError::Kill)?;
Ok(())
}
pub async fn check_health(&mut self) -> Result<(), PlayerError> {
if let Some(exit_status) = self.player.try_wait().map_err(PlayerError::Wait)? {
return Err(PlayerError::ChildExited(exit_status));
}
let Some(rx_task) = self.rx_task.as_ref() else {
return Err(PlayerError::IpcHandlerGone);
};
if rx_task.is_finished() {
let Err(e) = self
.rx_task
.take()
.unwrap()
.await
.map_err(PlayerError::JoinIpcHandler)?;
Err(PlayerError::IpcHandlerDied(e))
} else {
Ok(())
}
}
fn next_request_id(&mut self) -> RequestId {
let ret = RequestId(self.next_request_id);
self.next_request_id += 1;