1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use crate::bundle::packager::{BundlePackager, Error, Result};
use async_compression::tokio::write::GzipEncoder;
use async_trait::async_trait;
use futures_util::TryStreamExt;
use rusoto_core::RusotoError;
use rusoto_s3::{PutObjectError, PutObjectRequest, S3Client, StreamingBody, S3};
use snafu::{ResultExt, Snafu};
use std::path::PathBuf;
use tokio::{
  fs::{File, OpenOptions},
  io::AsyncWriteExt,
};
use tokio_tar::Builder as TarBuilder;
use tokio_util::codec::{BytesCodec, FramedRead};

#[derive(Snafu, Debug)]
pub enum InternalError {
  S3PutObjectError { source: RusotoError<PutObjectError> },
  IOError { source: std::io::Error },
}

impl From<InternalError> for Error {
  fn from(err: InternalError) -> Self {
    Error::InternalBundlePackagerError {
      source: Box::new(err),
    }
  }
}

pub struct S3BundlePackager {
  client: S3Client,
  bucket: String,
  object_name: String,
}

impl S3BundlePackager {
  pub fn new(
    access_key: String,
    secret_key: String,
    endpoint: String,
    bucket: String,
    region: String,
    object_name: String,
  ) -> S3BundlePackager {
    let region = rusoto_core::Region::Custom {
      name: region,
      endpoint,
    };
    let credentials = rusoto_credential::StaticProvider::new_minimal(access_key, secret_key);
    let dispatcher = rusoto_core::HttpClient::new().unwrap();
    let client = S3Client::new_with(dispatcher, credentials, region);

    S3BundlePackager {
      client,
      bucket,
      object_name,
    }
  }
}

#[async_trait(?Send)]
impl BundlePackager for S3BundlePackager {
  async fn generate(&self, path: PathBuf) -> Result<()> {
    let tar_file = OpenOptions::new()
      .create(true)
      .write(true)
      .read(true)
      .open("bundle.tar.gz")
      .await
      .context(IOError)?;
    let tar_gz_file = File::create("bundle.tar").await.context(IOError)?;

    let mut builder = TarBuilder::new(tar_file);

    builder.append_dir_all(".", path).await.context(IOError)?;
    let mut tar_file = builder.into_inner().await.context(IOError)?;

    let mut encoder = GzipEncoder::new(tar_gz_file);

    tokio::io::copy(&mut tar_file, &mut encoder)
      .await
      .context(IOError)?;

    encoder.shutdown().await.context(IOError)?;

    let tar_gz_file = File::open("bundle.tar.gz").await.context(IOError)?;

    let final_stream_of_bytes = FramedRead::new(tar_gz_file, BytesCodec::new());

    let body = StreamingBody::new(final_stream_of_bytes.map_ok(|x| x.freeze()));

    log::info!(
      "S3BundlePackager: Uploading to {} in {}",
      &self.object_name,
      &self.bucket
    );

    let request = PutObjectRequest {
      bucket: self.bucket.clone(),
      key: self.object_name.clone(),
      body: Some(body),
      ..Default::default()
    };

    let response = self
      .client
      .put_object(request)
      .await
      .context(S3PutObjectError)?;

    log::info!(
      "S3BundlePackager: Bundle uploaded. ETag: {}",
      response.e_tag.unwrap_or_else(|| "N/A".to_owned())
    );

    log::info!("S3BundlePackager: Removing temporary files...");

    tokio::fs::remove_file("bundle.tar")
      .await
      .context(IOError)?;
    tokio::fs::remove_file("bundle.tar.gz")
      .await
      .context(IOError)?;

    Ok(())
  }
}