aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOskari Timperi <oskari.timperi@iki.fi>2019-04-13 11:12:38 +0300
committerOskari Timperi <oskari.timperi@iki.fi>2019-04-13 11:12:38 +0300
commit5172d16e4be2d6dcfaba882d50ffa82224ebc507 (patch)
tree4065340ca8b8870db0e8261743760befa60c81f2 /src
downloadruuvitag-upload-5172d16e4be2d6dcfaba882d50ffa82224ebc507.tar.gz
ruuvitag-upload-5172d16e4be2d6dcfaba882d50ffa82224ebc507.zip
Initial commit
Diffstat (limited to 'src')
-rw-r--r--src/main.rs269
1 files changed, 269 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..7d22e14
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,269 @@
+use std::collections::HashMap;
+use std::process;
+use std::sync::{Arc, Condvar, Mutex};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use rumble;
+use rumble::api::{BDAddr, Central, CentralEvent, Peripheral};
+use rumble::bluez::adapter::ConnectedAdapter;
+
+use failure::Error;
+
+use ruuvi_sensor_protocol::{ParseError, SensorValues};
+
+use serde::{Deserialize, Serialize};
+use serde_json;
+
+use docopt;
+
+use reqwest;
+
+#[derive(Serialize)]
+struct Measurement {
+ address: String,
+ // Unix timestamp.
+ timestamp: u64,
+ // Relative humidity, percent.
+ humidity: Option<f64>,
+ // Temperature, Celcius.
+ temperature: Option<f64>,
+ // Pressure, kPa.
+ pressure: Option<f64>,
+ // Battery potential, volts.
+ battery_potential: Option<f64>,
+}
+
+impl Measurement {
+ fn new(address: BDAddr, values: SensorValues) -> Measurement {
+ Measurement {
+ address: format!("{}", address),
+ timestamp: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_secs(),
+ humidity: values.humidity.map(|x| f64::from(x) / 10000.0),
+ temperature: values.temperature.map(|x| f64::from(x) / 1000.0),
+ pressure: values.pressure.map(|x| f64::from(x) / 1000.0),
+ battery_potential: values.battery_potential.map(|x| f64::from(x) / 1000.0),
+ }
+ }
+}
+
+const USAGE: &'static str = "
+ruuvitag-upload
+
+A tool for collecting a set of ruuvitag sensor measurements
+and uploading them for further processing.
+
+The measurements are formatted as JSON with the following
+structure
+
+ {
+ \"<ALIAS>\": {
+ \"address\": \"XX:XX:XX:XX:XX:XX\",
+ \"timestamp\": <seconds since unix epoch>,
+ \"humidity\": <0-100%>,
+ \"pressure\": <kPa>,
+ \"temperature\": <Celcius>,
+ \"battery_potential\": <volts>
+ },
+ ...
+ }
+
+where ALIAS will either be the address of the sensor, or
+an alias that you can define.
+
+USAGE:
+
+ ruuvitag-upload [--url=URL] <sensor>...
+ ruuvitag-upload -h | --help
+ ruuvitag-upload --version
+
+ARGUMENTS:
+
+ <sensor>...
+
+ A sensor address and optionally a human-readable
+ alias. You can either specify the address as
+ XX:XX:XX:XX:XX:XX or you can attach a human-
+ readable alias to the address
+ XX:XX:XX:XX:XX:XX=mysensor.
+
+OPTIONS:
+
+ -u URL, --url=URL
+
+ Where the measurements are uploaded to. If you don't
+ specify this, the measurements are written to stdout.
+
+ -h, --help
+
+ Show this message.
+
+ --version
+
+ Show the version number.
+";
+
+#[derive(Deserialize)]
+struct Args {
+ arg_sensor: Vec<String>,
+ flag_url: Option<String>,
+}
+
+fn parse_sensor(s: &str) -> (&str, &str) {
+ let mut it = s.split('=');
+ let address = it.next().unwrap();
+ let alias = if let Some(s) = it.next() { s } else { address };
+ (address, alias)
+}
+
+fn main() {
+ if let Err(e) = run() {
+ eprintln!("error: {}", e);
+ process::exit(1);
+ }
+}
+
+fn run() -> Result<(), Error> {
+ let version = format!(
+ "{}.{}.{}",
+ env!("CARGO_PKG_VERSION_MAJOR"),
+ env!("CARGO_PKG_VERSION_MINOR"),
+ env!("CARGO_PKG_VERSION_PATCH")
+ );
+
+ let args: Args = docopt::Docopt::new(USAGE)
+ .and_then(|d| d.help(true).version(Some(version)).deserialize())
+ .unwrap_or_else(|e| e.exit());
+
+ let sensors: HashMap<String, String> = args
+ .arg_sensor
+ .iter()
+ .map(|x| parse_sensor(x))
+ .map(|(address, alias)| (address.to_string(), alias.to_string()))
+ .collect();
+ let sensors = Arc::new(sensors);
+ let sensors_clone = sensors.clone();
+
+ let manager = rumble::bluez::manager::Manager::new()?;
+
+ let mut adapter = manager.adapters()?.into_iter().nth(0).unwrap();
+
+ adapter = manager.down(&adapter)?;
+ adapter = manager.up(&adapter)?;
+
+ let central = Arc::new(adapter.connect()?);
+
+ central.active(false);
+ central.filter_duplicates(false);
+
+ let central_clone = central.clone();
+
+ let measurements: Arc<Mutex<HashMap<String, Measurement>>> =
+ Arc::new(Mutex::new(HashMap::new()));
+ let measurements_clone = measurements.clone();
+
+ let event = Arc::new((Mutex::new(false), Condvar::new()));
+ let event_clone = event.clone();
+
+ central.on_event(Box::new(move |event| {
+ if let Some(result) = on_event(&central_clone, event) {
+ match result {
+ Ok(measurement) => {
+ if let Some(alias) = sensors_clone.get(&measurement.address) {
+ let mut measurements = measurements_clone.lock().unwrap();
+ measurements.insert(alias.clone(), measurement);
+ if measurements.len() == sensors_clone.len() {
+ let &(ref lock, ref cv) = &*event_clone;
+ let mut done = lock.lock().unwrap();
+ *done = true;
+ cv.notify_one();
+ }
+ }
+ }
+ Err(_) => (),
+ }
+ }
+ }));
+
+ loop {
+ central.start_scan()?;
+
+ {
+ let &(ref lock, ref cv) = &*event;
+ let mut signalled = lock.lock().unwrap();
+ while !*signalled {
+ let result = cv.wait_timeout(signalled, Duration::from_secs(60)).unwrap();
+ signalled = result.0;
+ if result.1.timed_out() {
+ break;
+ }
+ }
+ }
+
+ central.stop_scan()?;
+
+ {
+ let &(ref lock, _) = &*event;
+ let signalled = lock.lock().unwrap();
+ if *signalled {
+ break;
+ }
+ }
+ }
+
+ if let Some(url) = args.flag_url {
+ let client = reqwest::Client::new();
+
+ client
+ .post(&url)
+ .json(&*measurements.lock().unwrap())
+ .send()?
+ .error_for_status()?;
+ } else {
+ println!("{}", serde_json::to_string(&*measurements.lock().unwrap()).unwrap());
+ }
+
+ Ok(())
+}
+
+fn on_event(
+ central: &ConnectedAdapter,
+ event: CentralEvent,
+) -> Option<Result<Measurement, ParseError>> {
+ match event {
+ CentralEvent::DeviceDiscovered(addr) => on_event_with_address(central, addr),
+ CentralEvent::DeviceUpdated(addr) => on_event_with_address(central, addr),
+ _ => None,
+ }
+}
+
+fn on_event_with_address(
+ central: &ConnectedAdapter,
+ address: BDAddr,
+) -> Option<Result<Measurement, ParseError>> {
+ match central.peripheral(address) {
+ Some(peripheral) => match to_sensor_value(peripheral) {
+ Ok(values) => Some(Ok(Measurement::new(address, values))),
+ Err(e) => Some(Err(e)),
+ },
+ None => None,
+ }
+}
+
+fn to_sensor_value<T: Peripheral>(peripheral: T) -> Result<SensorValues, ParseError> {
+ match peripheral.properties().manufacturer_data {
+ Some(data) => from_manufacturer_data(&data),
+ None => Err(ParseError::EmptyValue),
+ }
+}
+
+fn from_manufacturer_data(data: &[u8]) -> Result<SensorValues, ParseError> {
+ if data.len() > 2 {
+ let id = u16::from(data[0]) + (u16::from(data[1]) << 8);
+ SensorValues::from_manufacturer_specific_data(id, &data[2..])
+ } else {
+ Err(ParseError::EmptyValue)
+ }
+}