1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use crate::{
  bundle::{Unbundler, UnbundlerStatus},
  config::StatsConfig,
};
use actix_rt::Runtime;
use actix_web::dev::ServerHandle;
use actix_web::web::Data;
use actix_web::{middleware::Logger, App, HttpResponse, HttpServer, Responder};
use collective::thread::{self, handle::ThreadHandle};
use mpsc::{RecvError, SendError, Sender};
use serde::Serialize;
use snafu::{ResultExt, Snafu};
use std::{
  path::PathBuf,
  sync::{mpsc, Arc},
  thread::Thread,
};

/// Errors that can occur while starting or running the stats HTTP server.
// NOTE(review): the per-variant notes below are plain `//` comments on purpose;
// snafu may use variant doc comments as the `Display` message, so `///` here
// could change runtime output — confirm against the snafu version in use.
#[derive(Debug, Snafu)]
pub enum Error {
  // The server thread could not send the `ServerHandle` back to the caller.
  ChannelSend { source: SendError<ServerHandle> },
  // The caller could not receive the `ServerHandle` from the server thread.
  ChannelReceive { source: RecvError },
  // Binding the HTTP server to the configured address failed.
  Bind { source: std::io::Error },
  // Driving the HTTP server to completion returned an I/O error.
  SystemRun { source: std::io::Error },
}

/// Result alias for this module; the error type defaults to [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;

/// Application data registered on each actix `App` and read by the handlers.
struct State {
  /// Shared handle to the unbundler whose status the endpoints report.
  unbundler: Arc<Unbundler>,
}

/// HTTP server exposing the unbundler's status (`/`) and health (`/health`).
pub struct StatsServer {
  /// Server settings; `address` is the socket address the server binds to.
  config: StatsConfig,
  /// Shared handle to the unbundler that is handed to every request handler.
  unbundler: Arc<Unbundler>,
}

impl StatsServer {
  /// Creates a stats server from its configuration and the unbundler to report on.
  pub fn new(config: StatsConfig, unbundler: Arc<Unbundler>) -> Self {
    StatsServer { config, unbundler }
  }

  /// Starts the HTTP server on a thread created through `thread::handle::spawn`.
  ///
  /// The spawned thread builds its own actix runtime, binds the server to
  /// `config.address`, sends the server's [`ServerHandle`] back over a channel
  /// and then blocks until the server finishes.
  ///
  /// Returns the [`ServerHandle`] (for stopping the server) together with the
  /// handle of the thread that runs it.
  ///
  /// Note that this waits for the handle with a blocking `recv`, so the calling
  /// thread is blocked until the server thread has either sent the handle or
  /// dropped its sender.
  ///
  /// # Errors
  ///
  /// Returns [`Error::ChannelReceive`] when the server thread drops its sender
  /// without sending a handle, which is what happens when runtime creation or
  /// binding fails inside the thread. Those underlying failures
  /// ([`Error::SystemRun`], [`Error::Bind`], [`Error::ChannelSend`]) are the
  /// result value of the spawned thread's closure.
  pub async fn spawn(
    self,
    notify_sender: Sender<Thread>,
  ) -> Result<(ServerHandle, ThreadHandle<Result<()>>)> {
    let (tx, rx) = mpsc::channel();

    let thread_handle = thread::handle::spawn(notify_sender, move || {
      // Runtime creation is fallible I/O: report it as an error like every
      // other step in this closure instead of panicking the thread.
      let rt = Runtime::new().context(SystemRun)?;

      let unbundler = Arc::clone(&self.unbundler);
      let server_address = self.config.address;

      // The factory closure is invoked once per worker, so each worker gets
      // its own `Data<State>` pointing at the same shared unbundler.
      let srv = HttpServer::new(move || {
        App::new()
          .wrap(Logger::default())
          .route("/", actix_web::web::get().to(get_status))
          .route("/health", actix_web::web::get().to(get_health))
          .app_data(Data::new(State {
            unbundler: Arc::clone(&unbundler),
          }))
      })
      .bind(server_address)
      .context(Bind)?
      .shutdown_timeout(60)
      .run();

      // Hand the control handle to the caller before blocking on the server.
      tx.send(srv.handle()).context(ChannelSend)?;

      rt.block_on(async { srv.await }).context(SystemRun)?;

      Ok(())
    });

    Ok((rx.recv().context(ChannelReceive)?, thread_handle))
  }
}

/// JSON payload returned by the `/` status endpoint.
#[derive(Serialize)]
struct GetStatusResponse {
  /// Status reported by `Unbundler::get_status`.
  status: UnbundlerStatus,
  /// Value reported by `Unbundler::get_serve_dir`; may be absent.
  serve_dir: Option<PathBuf>,
}

/// Queries the unbundler for its status and serve directory and renders both
/// as a JSON string.
///
/// Any failure from the unbundler or from serialization is returned as an
/// `anyhow` error.
async fn try_get_status(data: actix_web::web::Data<State>) -> anyhow::Result<String> {
  let unbundler = &data.unbundler;

  // Fields are evaluated in order: status first, then the serve directory.
  let payload = GetStatusResponse {
    status: unbundler.get_status().await?,
    serve_dir: unbundler.get_serve_dir().await?,
  };

  let json = serde_json::to_string(&payload)?;
  Ok(json)
}

/// Handler for `GET /`: responds with the unbundler status as JSON.
///
/// Replies `200 OK` with the JSON document produced by `try_get_status`, or
/// `500 Internal Server Error` with a fixed text body when it fails.
async fn get_status(data: actix_web::web::Data<State>) -> impl Responder {
  match try_get_status(data).await {
    // The body is a JSON document, so label it as such rather than relying on
    // the default content type chosen for a plain `String` body.
    Ok(response) => HttpResponse::Ok()
      .content_type("application/json")
      .body(response),
    Err(_) => HttpResponse::InternalServerError().body("Internal Server Error"),
  }
}

async fn get_health(data: actix_web::web::Data<State>) -> impl Responder {
  match data.unbundler.get_status().await {
    Ok(status) => match status {
      UnbundlerStatus::Ready | UnbundlerStatus::Polling => HttpResponse::Ok().body("Ready"),
      _ => HttpResponse::ServiceUnavailable().body("Not Ready"),
    },
    Err(_) => HttpResponse::InternalServerError().body("Internal Server Error"),
  }
}