aboutsummaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
authorOskari Timperi <oskari.timperi@iki.fi>2019-05-26 22:19:36 +0300
committerOskari Timperi <oskari.timperi@iki.fi>2019-05-26 22:19:36 +0300
commita27ff425df5475765bc99df8eddc384b395a5e13 (patch)
treec7e2c828d571ad003b91fe1bb743f76f0807aec1 /src/main.rs
parentb73a89fbdbfd607726eded2eac9fa969f9073927 (diff)
parentee547b84ddce76411b9efb25b734b89e5847cb22 (diff)
downloadruuvitag-upload-a27ff425df5475765bc99df8eddc384b395a5e13.tar.gz
ruuvitag-upload-a27ff425df5475765bc99df8eddc384b395a5e13.zip
Merge branch 'caching'
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs174
1 files changed, 160 insertions, 14 deletions
diff --git a/src/main.rs b/src/main.rs
index dabb04b..c2b94e7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,7 @@
use std::collections::HashMap;
+use std::fs;
+use std::io::{self, BufReader, Write};
+use std::path::Path;
use std::process;
use std::sync::{mpsc::channel, Arc};
use std::time::{SystemTime, UNIX_EPOCH};
@@ -18,7 +21,9 @@ use docopt;
use reqwest;
-#[derive(Serialize)]
+use directories::ProjectDirs;
+
+#[derive(Serialize, Deserialize)]
struct Measurement {
address: String,
// Unix timestamp.
@@ -73,6 +78,15 @@ structure
where ALIAS will either be the address of the sensor, or
an alias that you can define.
+If uploading measurements fails, the measurements are
+cached. The cached measurements are uploaded the next time
+ruuvitag-upload is called. Cached measurements are uploaded
+first, from oldest to newest. If uploading cached measurements
+fails, the current measurements are again cached for next time.
+This way, you won't lose any measurements. When a cached
+measurement is succesfully uploaded, the cache entry will be
+removed.
+
USAGE:
ruuvitag-upload [--url=URL] <sensor>...
@@ -144,6 +158,123 @@ fn run() -> Result<(), Error> {
.map(|(address, alias)| (address.to_string(), alias.to_string()))
.collect();
+ let measurements = collect_measurements(sensors)?;
+
+ if let Some(url) = args.flag_url {
+ let result = upload_cached_measurements(&url);
+
+ // If uploading cached measurements failed, we try to cache the latest measurements.
+ if result.is_err() {
+ eprintln!("error: {}", result.unwrap_err());
+ cache_measurements(measurements)?;
+ return Ok(());
+ }
+
+ let client = reqwest::Client::new();
+
+ let result = match client.post(&url).json(&measurements).send() {
+ Ok(response) => match response.error_for_status() {
+ Ok(response) => Ok(response),
+ Err(error) => Err(error),
+ },
+ Err(error) => Err(error),
+ };
+
+ // If uploading the latest measurements failed, we try to cache them for later uploading.
+ if result.is_err() {
+ eprintln!("error: {}", result.unwrap_err());
+ cache_measurements(measurements)?;
+ }
+ } else {
+ println!("{}", serde_json::to_string(&measurements).unwrap());
+ }
+
+ Ok(())
+}
+
+fn find_cached_measurements(cache_dir: &Path) -> Result<Vec<std::path::PathBuf>, Error> {
+ let mut result = Vec::new();
+
+ // It's OK if we don't find cached data. Other errors are not good.
+ if let Err(error) = fs::metadata(cache_dir) {
+ if error.kind() == io::ErrorKind::NotFound {
+ return Ok(result);
+ }
+ return Err(error.into());
+ }
+
+ for entry in fs::read_dir(cache_dir)? {
+ let entry = entry?;
+ let file_type = entry.file_type()?;
+ if file_type.is_file() {
+ let path = entry.path();
+ if let Some(ext) = path.extension() {
+ if ext == "json" {
+ result.push(path);
+ }
+ }
+ }
+ }
+
+ result.sort();
+
+ Ok(result)
+}
+
+fn upload_cached_measurements(url: &str) -> Result<(), Error> {
+ let paths = find_cached_measurements(&get_cache_dir()?)?;
+
+ let client = reqwest::Client::new();
+
+ for path in paths {
+ let file = fs::File::open(&path)?;
+ let reader = BufReader::new(file);
+ let measurements: HashMap<String, Measurement> = serde_json::from_reader(reader)?;
+ client
+ .post(url)
+ .json(&measurements)
+ .send()?
+ .error_for_status()?;
+ fs::remove_file(&path)?;
+ }
+
+ Ok(())
+}
+
+fn get_cache_dir() -> Result<std::path::PathBuf, Error> {
+ match ProjectDirs::from("dev", "otimperi", "ruuvitag-upload") {
+ None => Err(failure::format_err!("failed to get cache dir location")),
+ Some(dir) => Ok(dir.data_dir().to_path_buf()),
+ }
+}
+
+fn cache_measurements(measurements: HashMap<String, Measurement>) -> Result<(), Error> {
+ let mut path = get_cache_dir()?;
+
+ path.push(format!(
+ "{}.json",
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_secs()
+ ));
+
+ eprintln!("caching measurements to {}", path.display());
+
+ std::fs::create_dir_all(path.parent().unwrap())?;
+
+ let mut file = std::fs::File::create(path)?;
+
+ let json = serde_json::to_string(&measurements)?;
+
+ file.write_all(&json.into_bytes())?;
+
+ Ok(())
+}
+
+fn collect_measurements(
+ sensors: HashMap<String, String>,
+) -> Result<HashMap<String, Measurement>, Error> {
let manager = rumble::bluez::manager::Manager::new()?;
let mut adapter = manager.adapters()?.into_iter().nth(0).unwrap();
@@ -181,19 +312,7 @@ fn run() -> Result<(), Error> {
central.stop_scan()?;
- if let Some(url) = args.flag_url {
- let client = reqwest::Client::new();
-
- client
- .post(&url)
- .json(&measurements)
- .send()?
- .error_for_status()?;
- } else {
- println!("{}", serde_json::to_string(&measurements).unwrap());
- }
-
- Ok(())
+ Ok(measurements)
}
fn on_event(
@@ -235,3 +354,30 @@ fn from_manufacturer_data(data: &[u8]) -> Result<SensorValues, ParseError> {
Err(ParseError::EmptyValue)
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use assert_fs::prelude::*;
+
+ #[test]
+ fn test_find_cached_measurements() {
+ let test_dir = assert_fs::TempDir::new().unwrap();
+
+ test_dir.child("1236.json").touch().unwrap();
+ test_dir.child("1233.cmd").touch().unwrap();
+ test_dir.child("1234.json").touch().unwrap();
+ test_dir.child("1235.json").touch().unwrap();
+ test_dir.child("1238.md").touch().unwrap();
+ test_dir.child("1237.txt").touch().unwrap();
+
+ let files: Vec<String> = find_cached_measurements(test_dir.path())
+ .unwrap()
+ .iter()
+ .filter_map(|path| path.file_name())
+ .map(|file_name| file_name.to_string_lossy().into_owned())
+ .collect();
+
+ assert_eq!(files, vec!["1234.json", "1235.json", "1236.json"]);
+ }
+}