Compare commits
12 commits
Author | SHA1 | Date | |
---|---|---|---|
9d039dd839 | |||
3577401935 | |||
19f903aa5c | |||
e13282d5dd | |||
189d911fd6 | |||
00c772c711 | |||
204d36238b | |||
0fd860cf00 | |||
2b4f4785aa | |||
39b2670529 | |||
23a28936e5 | |||
4fe7df6036 |
5 changed files with 1872 additions and 0 deletions
5
.gitignore
vendored
5
.gitignore
vendored
|
@ -1,2 +1,7 @@
|
|||
music/
|
||||
__pycache__/
|
||||
|
||||
|
||||
# Added by cargo
|
||||
|
||||
/target
|
||||
|
|
1183
Cargo.lock
generated
Normal file
1183
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
19
Cargo.toml
Normal file
19
Cargo.toml
Normal file
|
@ -0,0 +1,19 @@
|
|||
[package]
|
||||
name = "laas"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
axum = "0.8.1"
|
||||
clap = { version = "4.5.31", features = ["derive"] }
|
||||
color-eyre = { version = "0.6.3", features = ["capture-spantrace"] }
|
||||
command-fds = { version = "0.3.0", features = ["tokio"] }
|
||||
futures-util = { version = "0.3.31", default-features = false }
|
||||
serde = { version = "1.0.218", features = ["derive"] }
|
||||
serde_json = "1.0.139"
|
||||
thiserror = "2.0.11"
|
||||
tokio = { version = "1.43.0", features = ["full"] }
|
||||
tokio-util = { version = "0.7.13", features = ["rt"] }
|
||||
tracing = "0.1.41"
|
||||
tracing-error = "0.2.1"
|
||||
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
|
185
src/main.rs
Normal file
185
src/main.rs
Normal file
|
@ -0,0 +1,185 @@
|
|||
use std::{error::Error, fmt::Debug, path::Path, sync::Arc};
|
||||
|
||||
use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post};
|
||||
use clap::Parser;
|
||||
use serde::Serialize;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::{Mutex, MutexGuard, oneshot};
|
||||
use tracing::{Level, debug, info, instrument, warn};
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
|
||||
mod mpv;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct AppState {
|
||||
player: Arc<Mutex<mpv::Mpv>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
enum StopError {
|
||||
#[error("failed to communicate via IPC")]
|
||||
Ipc(#[from] mpv::IpcError),
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub fn new() -> Result<Self, mpv::SpawnError> {
|
||||
Ok(Self {
|
||||
player: Self::spawn_player()?,
|
||||
})
|
||||
}
|
||||
|
||||
fn spawn_player() -> Result<Arc<Mutex<mpv::Mpv>>, mpv::SpawnError> {
|
||||
// We need a reference to the Mpv mutex in the death handler. Arc::try_new_cyclic would be nice
|
||||
let (sender, receiver) = oneshot::channel::<Arc<Mutex<mpv::Mpv>>>();
|
||||
let on_death = async move |error| {
|
||||
warn!(%error, "Player has encountered an error");
|
||||
let Ok(player) = receiver.try_recv() else {
|
||||
return;
|
||||
};
|
||||
let mut player = player.lock().await;
|
||||
*player = Self::spawn_player()
|
||||
};
|
||||
|
||||
let player = Arc::new(Mutex::new(mpv::Mpv::spawn(Some(on_death))));
|
||||
let Ok(()) = sender.send(player) else {
|
||||
warn!("Player died before we could hand it to the death handler");
|
||||
};
|
||||
|
||||
Ok(player)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
async fn get_player(&self) -> MutexGuard<Option<mpv::Mpv>> {
|
||||
let mut player = self.player.lock().await;
|
||||
|
||||
if let Some(p) = player.as_mut() {
|
||||
if let Err(error) = p.check_health().await {
|
||||
warn!(%error, "Player has encountered an error, dropping");
|
||||
let _ = player.take().unwrap().exit().await;
|
||||
}
|
||||
}
|
||||
|
||||
player
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn play<P: AsRef<Path> + Debug>(&self, file: P) -> Result<(), mpv::PlayError> {
|
||||
let file = file.as_ref();
|
||||
let mut player = self.get_player().await;
|
||||
|
||||
// ugly, but borrow checker gets mad about the &mut lifetimes otherwise
|
||||
if player.is_none() {
|
||||
debug!("Spawning new player instance");
|
||||
*player = Some();
|
||||
} else {
|
||||
debug!("Using existing player instance");
|
||||
}
|
||||
let player = player.as_mut().unwrap();
|
||||
|
||||
player.play(file).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
async fn stop(&self) -> Result<(), StopError> {
|
||||
let mut player = self.player.lock().await;
|
||||
let Some(player) = player.as_mut() else {
|
||||
debug!("No child currently active");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
player.send_ipc(&["stop".to_owned()]).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct JsonError<E> {
|
||||
inner: E,
|
||||
}
|
||||
|
||||
impl<E: Error> IntoResponse for JsonError<E> {
|
||||
fn into_response(self) -> axum::response::Response {
|
||||
#[derive(Serialize)]
|
||||
struct Response {
|
||||
error_stack: Vec<String>,
|
||||
}
|
||||
|
||||
let mut error_stack = Vec::new();
|
||||
let mut e: &dyn Error = &self.inner;
|
||||
loop {
|
||||
error_stack.push(e.to_string());
|
||||
let Some(next) = e.source() else { break };
|
||||
e = next;
|
||||
}
|
||||
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(Response { error_stack }),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> From<E> for JsonError<E> {
|
||||
fn from(value: E) -> Self {
|
||||
Self { inner: value }
|
||||
}
|
||||
}
|
||||
|
||||
async fn start(State(state): State<AppState>) -> Result<Json<String>, JsonError<mpv::PlayError>> {
|
||||
state.play("/tmp/0118999.opus").await?;
|
||||
|
||||
Ok(Json("ok".to_owned()))
|
||||
}
|
||||
|
||||
async fn stop(State(state): State<AppState>) -> Result<Json<String>, JsonError<StopError>> {
|
||||
state.stop().await?;
|
||||
|
||||
Ok(Json("ok".to_owned()))
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
struct Args {
|
||||
/// Make log output more verbose. Can be specified multiple times.
|
||||
#[arg(short, long, action = clap::ArgAction::Count)]
|
||||
verbose: u8,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> color_eyre::Result<()> {
|
||||
let args = Args::parse();
|
||||
|
||||
color_eyre::install()?;
|
||||
|
||||
let filter = tracing_subscriber::EnvFilter::builder()
|
||||
.with_default_directive(
|
||||
match args.verbose {
|
||||
0 => Level::ERROR,
|
||||
1 => Level::INFO,
|
||||
2 => Level::DEBUG,
|
||||
_ => Level::TRACE,
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.from_env()?;
|
||||
|
||||
let subscriber = tracing_subscriber::Registry::default()
|
||||
.with(tracing_error::ErrorLayer::default())
|
||||
.with(filter)
|
||||
.with(tracing_subscriber::fmt::Layer::new());
|
||||
tracing::subscriber::set_global_default(subscriber)?;
|
||||
|
||||
let app = Router::new()
|
||||
.route("/api/start", post(start))
|
||||
.route("/api/stop", post(stop))
|
||||
.with_state(AppState::new());
|
||||
|
||||
let listener = tokio::net::TcpListener::bind("[::]:8080").await?;
|
||||
|
||||
info!("Starting server");
|
||||
axum::serve(listener, app).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
480
src/mpv.rs
Normal file
480
src/mpv.rs
Normal file
|
@ -0,0 +1,480 @@
|
|||
use command_fds::CommandFdExt as _;
|
||||
use futures_util::FutureExt;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
convert::Infallible,
|
||||
fmt::Debug,
|
||||
os::fd::{AsRawFd as _, OwnedFd},
|
||||
path::{Path, PathBuf},
|
||||
process::{ExitStatus, Stdio},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use thiserror::Error;
|
||||
use tokio::{
|
||||
io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader},
|
||||
net::unix::OwnedWriteHalf,
|
||||
process::{Child, Command},
|
||||
select,
|
||||
sync::{Mutex, broadcast, oneshot},
|
||||
task::{AbortHandle, JoinError, JoinHandle},
|
||||
time::timeout,
|
||||
};
|
||||
use tokio_util::task::AbortOnDropHandle;
|
||||
use tracing::{debug, error, info, instrument, trace, warn};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum SpawnError {
|
||||
#[error("failed to open IPC socket")]
|
||||
IpcSocket(#[source] std::io::Error),
|
||||
#[error("failed to spawn player command")]
|
||||
Command(#[source] std::io::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum ExitError {
|
||||
#[error("failed to kill process")]
|
||||
Kill(#[source] std::io::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum PlayError {
|
||||
#[error("failed to create new player instance")]
|
||||
Spawn(#[from] SpawnError),
|
||||
#[error("invalid path: {0}")]
|
||||
InvalidPath(PathBuf),
|
||||
#[error("failed to communicate via IPC")]
|
||||
Ipc(#[from] IpcError),
|
||||
#[error("file was not reported as loaded in time")]
|
||||
LoadTimedOut,
|
||||
#[error("event stream ended unexpectedly")]
|
||||
EndOfEventStream,
|
||||
#[error("failed to load file: {reason}, {}", file_error.as_deref().unwrap_or("[unknown]"))]
|
||||
LoadFailed {
|
||||
reason: String,
|
||||
file_error: Option<String>,
|
||||
},
|
||||
#[error("invalid IPC response")]
|
||||
InvalidIpcResponse(#[source] serde_json::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum IpcHandlerError {
|
||||
#[error("failed to read from IPC stream")]
|
||||
IoRead(#[source] std::io::Error),
|
||||
#[error("IPC stream ended unexpectedly")]
|
||||
EndOfStream,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum IpcError {
|
||||
#[error("failed to serialize IPC request")]
|
||||
Serialize(#[source] serde_json::Error),
|
||||
#[error("failed to write to IPC stream")]
|
||||
IoWrite(#[source] std::io::Error),
|
||||
#[error("timed out waiting for IPC response")]
|
||||
ResponseTimedOut,
|
||||
#[error("receiver task died")]
|
||||
ReceiverDied(#[from] oneshot::error::RecvError),
|
||||
#[error(transparent)]
|
||||
ErrorResponse(#[from] ErrorResponse),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error, Deserialize)]
|
||||
#[error("IPC returned error response: {0}")]
|
||||
pub(crate) struct ErrorResponse(pub String);
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
||||
struct RequestId(u32);
|
||||
|
||||
type RequestMap = HashMap<RequestId, oneshot::Sender<Result<serde_json::Value, ErrorResponse>>>;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(tag = "error", rename_all = "lowercase")]
|
||||
enum CommandResponse {
|
||||
Success {
|
||||
data: serde_json::Value,
|
||||
request_id: RequestId,
|
||||
},
|
||||
#[serde(untagged)]
|
||||
Error {
|
||||
error: ErrorResponse,
|
||||
request_id: RequestId,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
#[serde(tag = "event", rename_all = "kebab-case")]
|
||||
enum IpcEvent {
|
||||
StartFile {
|
||||
playlist_entry_id: u64,
|
||||
},
|
||||
EndFile {
|
||||
playlist_entry_id: u64,
|
||||
playlist_insert_id: Option<u64>,
|
||||
reason: String,
|
||||
file_error: Option<String>,
|
||||
},
|
||||
FileLoaded,
|
||||
AudioReconfig,
|
||||
PlaybackRestart,
|
||||
/// Deprecated.
|
||||
Idle,
|
||||
#[serde(untagged)]
|
||||
#[allow(unused)]
|
||||
Unknown {
|
||||
event: String,
|
||||
#[serde(flatten)]
|
||||
args: HashMap<String, serde_json::Value>,
|
||||
},
|
||||
}
|
||||
|
||||
struct IpcHandler {
|
||||
requests: Arc<Mutex<RequestMap>>,
|
||||
event_sender: broadcast::Sender<IpcEvent>,
|
||||
}
|
||||
|
||||
impl IpcHandler {
|
||||
pub fn new(
|
||||
requests: Arc<Mutex<RequestMap>>,
|
||||
event_sender: broadcast::Sender<IpcEvent>,
|
||||
) -> Self {
|
||||
Self {
|
||||
requests,
|
||||
event_sender,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn handle_message(&mut self, message: &str) -> Result<(), serde_json::Error> {
|
||||
trace!("Got IPC message");
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub(crate) enum IpcMessage {
|
||||
Event(IpcEvent),
|
||||
Response(CommandResponse),
|
||||
}
|
||||
|
||||
let response = serde_json::from_str::<IpcMessage>(message)?;
|
||||
|
||||
match response {
|
||||
IpcMessage::Response(response) => {
|
||||
self.handle_command_response(response).await;
|
||||
Ok(())
|
||||
}
|
||||
IpcMessage::Event(event) => {
|
||||
self.handle_event(event).await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
async fn handle_command_response(&mut self, response: CommandResponse) {
|
||||
trace!("Got command response");
|
||||
|
||||
let (CommandResponse::Success { request_id, .. }
|
||||
| CommandResponse::Error { request_id, .. }) = &response;
|
||||
|
||||
let Some(response_sender) = self.requests.lock().await.remove(request_id) else {
|
||||
error!("No active request for response");
|
||||
return;
|
||||
};
|
||||
|
||||
let response = match response {
|
||||
CommandResponse::Success { data, .. } => Ok(data),
|
||||
CommandResponse::Error { error, .. } => Err(error),
|
||||
};
|
||||
if response_sender.send(response).is_err() {
|
||||
debug!("Failed to send to command response channel, probably timed out");
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
async fn handle_event(&mut self, event: IpcEvent) {
|
||||
trace!("Got event");
|
||||
// don't care if there are no receivers
|
||||
let _ = self.event_sender.send(event);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum PlayerError {
|
||||
#[error("failed to wait on child process")]
|
||||
Wait(#[source] std::io::Error),
|
||||
#[error("child process exited with code {0}")]
|
||||
ChildExited(ExitStatus),
|
||||
|
||||
#[error("IPC handler is gone")]
|
||||
IpcHandlerGone,
|
||||
#[error("failed to join on IPC handler")]
|
||||
JoinIpcHandler(#[source] JoinError),
|
||||
#[error(transparent)]
|
||||
IpcHandlerDied(#[from] IpcHandlerError),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Mpv {
|
||||
/// May be held by the `death_watcher`
|
||||
/// Option because it needs to be separated from the rest of `self` during `exit()`.
|
||||
player: Option<Arc<Mutex<Child>>>,
|
||||
ipc_write: OwnedWriteHalf,
|
||||
next_request_id: u32,
|
||||
requests: Arc<Mutex<RequestMap>>,
|
||||
event_sender: broadcast::Sender<IpcEvent>,
|
||||
/// May be moved into the `death_watcher`
|
||||
rx_task: Option<AbortOnDropHandle<Result<Infallible, IpcHandlerError>>>,
|
||||
death_watcher: Option<AbortOnDropHandle<()>>,
|
||||
}
|
||||
|
||||
impl Mpv {
|
||||
#[instrument(skip(on_death))]
|
||||
pub(crate) fn spawn<F, Fut>(on_death: Option<F>) -> Result<Self, SpawnError>
|
||||
where
|
||||
F: FnOnce(PlayerError) -> Fut + Send + 'static,
|
||||
Fut: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
let (our_stream, child_stream) =
|
||||
tokio::net::UnixStream::pair().map_err(SpawnError::IpcSocket)?;
|
||||
let child_stream = OwnedFd::from(child_stream.into_std().unwrap());
|
||||
|
||||
let mut command = Command::new("mpv");
|
||||
command
|
||||
.arg("--idle")
|
||||
.arg("--msg-level=all=warn")
|
||||
.arg("--input-terminal=no")
|
||||
.arg(format!(
|
||||
"--input-ipc-client=fd://{}",
|
||||
child_stream.as_raw_fd()
|
||||
))
|
||||
.arg("--no-video")
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::inherit())
|
||||
.stderr(Stdio::inherit())
|
||||
.preserved_fds(vec![child_stream]);
|
||||
info!(?command, "Spawning player");
|
||||
let player = Arc::new(Mutex::new(command.spawn().map_err(SpawnError::Command)?));
|
||||
|
||||
let (ipc_read, ipc_write) = our_stream.into_split();
|
||||
|
||||
let requests = Arc::new(Mutex::new(HashMap::new()));
|
||||
let (event_sender, _event_receiver) = broadcast::channel(5);
|
||||
let rx_task = {
|
||||
let requests = Arc::clone(&requests);
|
||||
let event_sender = event_sender.clone();
|
||||
tokio::task::spawn(async move {
|
||||
let mut handler = IpcHandler::new(requests, event_sender);
|
||||
let mut ipc_read = BufReader::new(ipc_read).lines();
|
||||
loop {
|
||||
let message = ipc_read
|
||||
.next_line()
|
||||
.await
|
||||
.map_err(IpcHandlerError::IoRead)?
|
||||
.ok_or(IpcHandlerError::EndOfStream)?;
|
||||
|
||||
if let Err(error) = handler.handle_message(&message).await {
|
||||
warn!(%error, "Failed to handle IPC message");
|
||||
};
|
||||
}
|
||||
})
|
||||
};
|
||||
let mut rx_task = Some(AbortOnDropHandle::new(rx_task));
|
||||
let mut death_watcher = None;
|
||||
if let Some(on_death) = on_death {
|
||||
let rx_task = rx_task.take().unwrap();
|
||||
let player = Arc::clone(&player);
|
||||
|
||||
let wait_for_death = async move {
|
||||
let mut player = player.lock().await;
|
||||
// wait for either the IPC task to exit or the child to die
|
||||
select! {
|
||||
ret = rx_task => match ret {
|
||||
Ok(Err(err)) => PlayerError::IpcHandlerDied(err),
|
||||
Err(err) => PlayerError::JoinIpcHandler(err),
|
||||
},
|
||||
ret = player.wait() => match ret {
|
||||
Ok(err) => PlayerError::ChildExited(err),
|
||||
Err(err) => PlayerError::Wait(err),
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
death_watcher = Some(AbortOnDropHandle::new(tokio::task::spawn(
|
||||
wait_for_death.then(on_death),
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
player: Some(player),
|
||||
ipc_write,
|
||||
next_request_id: 0,
|
||||
requests,
|
||||
event_sender,
|
||||
rx_task,
|
||||
death_watcher,
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn exit(mut self) -> Result<(), ExitError> {
|
||||
// first things first, abort the death watcher, since this is an expected exit
|
||||
if let Some(death_watcher) = self.death_watcher.take() {
|
||||
death_watcher.abort();
|
||||
let _ = death_watcher.await;
|
||||
}
|
||||
|
||||
let player = self
|
||||
.player
|
||||
.take()
|
||||
.expect("player is only taken during exit(), should still be there");
|
||||
let mut player = player
|
||||
.try_lock()
|
||||
.expect("death watch has been cancelled, player should not be locked anymore");
|
||||
|
||||
if let Ok(Some(_)) = player.try_wait() {
|
||||
debug!("Child has already exited");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
debug!("Trying to stop child via IPC");
|
||||
match self.send_ipc(&["quit".to_owned()]).await {
|
||||
Err(error) => {
|
||||
warn!(?error, "Failed to stop process via IPC");
|
||||
}
|
||||
Ok(_) => match timeout(Duration::from_millis(500), player.wait()).await {
|
||||
Ok(Ok(_)) => return Ok(()),
|
||||
Ok(Err(error)) => warn!(?error, "Failed to wait for child to exit"),
|
||||
Err(_) => warn!("Timed out waiting for child to exit"),
|
||||
},
|
||||
}
|
||||
|
||||
info!("Killing child with SIGKILL");
|
||||
player.kill().await.map_err(ExitError::Kill)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn next_request_id(&mut self) -> RequestId {
|
||||
let ret = RequestId(self.next_request_id);
|
||||
self.next_request_id += 1;
|
||||
ret
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn send_ipc(&mut self, command: &[String]) -> Result<serde_json::Value, IpcError> {
|
||||
let request_id = self.next_request_id();
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct IpcRequest<'a> {
|
||||
command: &'a [String],
|
||||
request_id: RequestId,
|
||||
r#async: bool,
|
||||
}
|
||||
let serialized = serde_json::to_string(&IpcRequest {
|
||||
command,
|
||||
request_id,
|
||||
r#async: true,
|
||||
})
|
||||
.map_err(IpcError::Serialize)?;
|
||||
|
||||
// register a channel for our request ID to get the response
|
||||
let (response_sender, response_receiver) = oneshot::channel();
|
||||
self.requests
|
||||
.lock()
|
||||
.await
|
||||
.insert(request_id, response_sender);
|
||||
|
||||
debug!(serialized, "Sending IPC command");
|
||||
self.ipc_write
|
||||
.write_all(serialized.as_bytes())
|
||||
.await
|
||||
.map_err(IpcError::IoWrite)?;
|
||||
self.ipc_write
|
||||
.write_all(b"\n")
|
||||
.await
|
||||
.map_err(IpcError::IoWrite)?;
|
||||
|
||||
let response = timeout(Duration::from_millis(100), response_receiver)
|
||||
.await
|
||||
.map_err(|_e| IpcError::ResponseTimedOut)???;
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn play<P: AsRef<Path> + Debug>(&mut self, file: P) -> Result<(), PlayError> {
|
||||
let file = file.as_ref();
|
||||
|
||||
let mut event_receiver = self.event_sender.subscribe();
|
||||
|
||||
let response = self
|
||||
.send_ipc(&[
|
||||
"loadfile".to_owned(),
|
||||
file.to_str()
|
||||
.ok_or_else(|| PlayError::InvalidPath(file.to_owned()))?
|
||||
.to_owned(),
|
||||
])
|
||||
.await?;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ResponseData {
|
||||
playlist_entry_id: u64,
|
||||
}
|
||||
let response: ResponseData =
|
||||
serde_json::from_value(response).map_err(PlayError::InvalidIpcResponse)?;
|
||||
|
||||
let mut current_entry_id = response.playlist_entry_id;
|
||||
timeout(Duration::from_millis(100), async move {
|
||||
loop {
|
||||
let event = match event_receiver.recv().await {
|
||||
Ok(event) => event,
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
return Err(PlayError::EndOfEventStream);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(_)) => {
|
||||
warn!("Failed to keep up with event stream");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match event {
|
||||
IpcEvent::StartFile { playlist_entry_id } => {
|
||||
if playlist_entry_id != current_entry_id {
|
||||
debug!(playlist_entry_id, "Incorrect playlist_entry_id, ignoring");
|
||||
continue;
|
||||
}
|
||||
debug!(playlist_entry_id, "File has been added to playlist");
|
||||
}
|
||||
IpcEvent::FileLoaded => {
|
||||
debug!("File has been loaded successfully");
|
||||
return Ok(());
|
||||
}
|
||||
IpcEvent::EndFile {
|
||||
reason,
|
||||
file_error,
|
||||
playlist_entry_id,
|
||||
playlist_insert_id,
|
||||
} => {
|
||||
if playlist_entry_id != current_entry_id {
|
||||
debug!(playlist_entry_id, "Incorrect playlist_entry_id, ignoring");
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(insert_id) = playlist_insert_id {
|
||||
debug!(insert_id, "File has been replaced with new entry");
|
||||
current_entry_id = insert_id;
|
||||
} else {
|
||||
warn!(reason, file_error, "Failed to play file");
|
||||
return Err(PlayError::LoadFailed { reason, file_error });
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|_| PlayError::LoadTimedOut)?
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue