1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use crate::{
bundle::{Unbundler, UnbundlerStatus},
config::StatsConfig,
};
use actix_rt::Runtime;
use actix_web::dev::ServerHandle;
use actix_web::web::Data;
use actix_web::{middleware::Logger, App, HttpResponse, HttpServer, Responder};
use collective::thread::{self, handle::ThreadHandle};
use mpsc::{RecvError, SendError, Sender};
use serde::Serialize;
use snafu::{ResultExt, Snafu};
use std::{
path::PathBuf,
sync::{mpsc, Arc},
thread::Thread,
};
#[derive(Debug, Snafu)]
pub enum Error {
ChannelSend { source: SendError<ServerHandle> },
ChannelReceive { source: RecvError },
Bind { source: std::io::Error },
SystemRun { source: std::io::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
struct State {
unbundler: Arc<Unbundler>,
}
pub struct StatsServer {
config: StatsConfig,
unbundler: Arc<Unbundler>,
}
impl StatsServer {
pub fn new(config: StatsConfig, unbundler: Arc<Unbundler>) -> Self {
StatsServer { config, unbundler }
}
pub async fn spawn(
self,
notify_sender: Sender<Thread>,
) -> Result<(ServerHandle, ThreadHandle<Result<()>>)> {
let (tx, rx) = mpsc::channel();
let thread_handle = thread::handle::spawn(notify_sender, move || {
let rt = Runtime::new().unwrap();
let unbundler = self.unbundler.clone();
let server_address = self.config.address;
let srv = HttpServer::new(move || {
App::new()
.wrap(Logger::default())
.route("/", actix_web::web::get().to(get_status))
.route("/health", actix_web::web::get().to(get_health))
.app_data(Data::new(State {
unbundler: Arc::clone(&unbundler),
}))
})
.bind(server_address)
.context(Bind)?
.shutdown_timeout(60)
.run();
tx.send(srv.handle()).context(ChannelSend)?;
rt.block_on(async { srv.await }).context(SystemRun)?;
Ok(())
});
Ok((rx.recv().context(ChannelReceive)?, thread_handle))
}
}
#[derive(Serialize)]
struct GetStatusResponse {
status: UnbundlerStatus,
serve_dir: Option<PathBuf>,
}
async fn try_get_status(data: actix_web::web::Data<State>) -> anyhow::Result<String> {
let status = data.unbundler.get_status().await?;
let serve_dir = data.unbundler.get_serve_dir().await?;
Ok(serde_json::to_string(&GetStatusResponse {
status,
serve_dir,
})?)
}
async fn get_status(data: actix_web::web::Data<State>) -> impl Responder {
let result = try_get_status(data).await;
match result {
Ok(response) => HttpResponse::Ok().body(response),
Err(_) => HttpResponse::InternalServerError().body("Internal Server Error"),
}
}
async fn get_health(data: actix_web::web::Data<State>) -> impl Responder {
match data.unbundler.get_status().await {
Ok(status) => match status {
UnbundlerStatus::Ready | UnbundlerStatus::Polling => HttpResponse::Ok().body("Ready"),
_ => HttpResponse::ServiceUnavailable().body("Not Ready"),
},
Err(_) => HttpResponse::InternalServerError().body("Internal Server Error"),
}
}