1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use crate::bundle::packager::{BundlePackager, Error, Result};
use async_compression::tokio::write::GzipEncoder;
use async_trait::async_trait;
use futures_util::TryStreamExt;
use rusoto_core::RusotoError;
use rusoto_s3::{PutObjectError, PutObjectRequest, S3Client, StreamingBody, S3};
use snafu::{ResultExt, Snafu};
use std::path::PathBuf;
use tokio::{
fs::{File, OpenOptions},
io::AsyncWriteExt,
};
use tokio_tar::Builder as TarBuilder;
use tokio_util::codec::{BytesCodec, FramedRead};
/// Failures that can occur inside the S3 bundle packager.
///
/// Values of this type are converted into the crate-level packager `Error`
/// before they leave this module.
#[derive(Debug, Snafu)]
pub enum InternalError {
    /// The S3 `PutObject` request failed.
    S3PutObjectError {
        /// Underlying error reported by rusoto.
        source: RusotoError<PutObjectError>,
    },
    /// A local I/O operation failed.
    IOError {
        /// Underlying I/O error.
        source: std::io::Error,
    },
}
impl From<InternalError> for Error {
fn from(err: InternalError) -> Self {
Error::InternalBundlePackagerError {
source: Box::new(err),
}
}
}
/// Bundle packager that uploads the generated archive to an S3 bucket.
pub struct S3BundlePackager {
    /// S3 client used to issue the upload request.
    client: S3Client,
    /// Name of the bucket the bundle is uploaded to.
    bucket: String,
    /// Object key under which the bundle is stored.
    object_name: String,
}
impl S3BundlePackager {
    /// Creates a packager that uploads to `bucket` under the key `object_name`.
    ///
    /// The S3 client is configured with a custom region built from `region`
    /// and `endpoint`, and with static credentials built from `access_key`
    /// and `secret_key`.
    ///
    /// # Panics
    ///
    /// Panics if the rusoto HTTP client cannot be constructed.
    pub fn new(
        access_key: String,
        secret_key: String,
        endpoint: String,
        bucket: String,
        region: String,
        object_name: String,
    ) -> S3BundlePackager {
        let http_client = rusoto_core::HttpClient::new().unwrap();
        let provider = rusoto_credential::StaticProvider::new_minimal(access_key, secret_key);
        let custom_region = rusoto_core::Region::Custom {
            name: region,
            endpoint,
        };

        Self {
            client: S3Client::new_with(http_client, provider, custom_region),
            bucket,
            object_name,
        }
    }
}
#[async_trait(?Send)]
impl BundlePackager for S3BundlePackager {
async fn generate(&self, path: PathBuf) -> Result<()> {
let tar_file = OpenOptions::new()
.create(true)
.write(true)
.read(true)
.open("bundle.tar.gz")
.await
.context(IOError)?;
let tar_gz_file = File::create("bundle.tar").await.context(IOError)?;
let mut builder = TarBuilder::new(tar_file);
builder.append_dir_all(".", path).await.context(IOError)?;
let mut tar_file = builder.into_inner().await.context(IOError)?;
let mut encoder = GzipEncoder::new(tar_gz_file);
tokio::io::copy(&mut tar_file, &mut encoder)
.await
.context(IOError)?;
encoder.shutdown().await.context(IOError)?;
let tar_gz_file = File::open("bundle.tar.gz").await.context(IOError)?;
let final_stream_of_bytes = FramedRead::new(tar_gz_file, BytesCodec::new());
let body = StreamingBody::new(final_stream_of_bytes.map_ok(|x| x.freeze()));
log::info!(
"S3BundlePackager: Uploading to {} in {}",
&self.object_name,
&self.bucket
);
let request = PutObjectRequest {
bucket: self.bucket.clone(),
key: self.object_name.clone(),
body: Some(body),
..Default::default()
};
let response = self
.client
.put_object(request)
.await
.context(S3PutObjectError)?;
log::info!(
"S3BundlePackager: Bundle uploaded. ETag: {}",
response.e_tag.unwrap_or_else(|| "N/A".to_owned())
);
log::info!("S3BundlePackager: Removing temporary files...");
tokio::fs::remove_file("bundle.tar")
.await
.context(IOError)?;
tokio::fs::remove_file("bundle.tar.gz")
.await
.context(IOError)?;
Ok(())
}
}