support multiple sample file directories

This is still pretty basic support. There's no config UI support for
renaming/moving the sample file directories after they are created, and no
error checking that the files are still in the expected place. I can imagine
sysadmins getting into trouble trying to change things. I hope to address at
least some of that in a follow-up change to introduce a versioning/locking
scheme that ensures databases and sample file dirs match in some way.

A bonus change that kinda got pulled along for the ride: a dialog now pops up in
the config UI while a stream is being tested. The experience was pretty bad
before; there was no indication the button had done anything at all until the
test finished, sometimes many seconds later.
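
For the curious, the shape of that fix (from the config UI code further down in this
diff) is roughly the sketch below. probe_stream is a hypothetical stand-in for the real
RTSP probe; the cursive calls (cb_sink, set_fps, add_layer/pop_layer) mirror the ones
this commit uses, so treat it as an illustration rather than the exact code.

extern crate cursive;

use cursive::Cursive;
use cursive::views;

// Hypothetical stand-in for the real work: open the RTSP URL and report what was found.
fn probe_stream(url: &str) -> Result<String, String> {
    Ok(format!("stream at {} looks fine", url))
}

fn press_test(siv: &mut Cursive, url: String) {
    // Show a dialog immediately so the button visibly did something.
    siv.add_layer(views::Dialog::text(format!("Testing {}. This may take a while.", url))
                      .title("Testing"));

    // cb_sink alone doesn't wake the event loop; tell cursive to poll as a workaround.
    siv.set_fps(5);
    let sink = siv.cb_sink().clone();
    ::std::thread::spawn(move || {
        let r = probe_stream(&url);
        sink.send(Box::new(move |siv: &mut Cursive| {
            siv.set_fps(0);   // polling is no longer necessary
            siv.pop_layer();  // remove the "Testing" dialog
            let dialog = match r {
                Ok(d) => views::Dialog::text(d).title("Stream test succeeded"),
                Err(e) => views::Dialog::text(e).title("Stream test failed"),
            };
            siv.add_layer(dialog.dismiss_button("Back"));
        })).unwrap();
    });
}

The key point is that the slow probe runs on its own thread while the UI keeps
redrawing, and the result dialog is queued back onto the UI thread through the
callback sink.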
Scott Lamb 2018-02-11 22:45:51 -08:00
parent 6f309e432f
commit 89b6bccaa3
17 changed files with 822 additions and 370 deletions

View File

@ -99,9 +99,9 @@ state:
* a SQLite database, typically <1 GiB. It should be stored on flash if
available.
* the "sample file directory", which holds the actual samples/frames of
H.264 video. This should be quite large and typically is stored on a hard
drive.
* the "sample file directories", which hold the actual samples/frames of
H.264 video. These should be quite large and are typically stored on hard
drives.
(See [schema.md](schema.md) for more information.)
@ -134,23 +134,27 @@ You can configure the system through a text-based user interface:
$ sudo -u moonfire-nvr moonfire-nvr config 2>debug-log
In the user interface, add your cameras under the "Edit cameras" dialog.
There's a "Test" button to verify your settings directly from the dialog.
In the user interface,
After the cameras look correct, go to "Edit retention" to assign disk space to
each camera. Leave a little slack (at least 100 MB per camera) between the total
limit and the filesystem capacity, even if you store nothing else on the disk.
There are several reasons this is needed:
1. add your sample file dirs under "Directories and retention".
2. add cameras under the "Cameras and streams" dialog.
There's a "Test" button to verify your settings directly from the dialog.
Be sure to assign each stream you want to capture to a sample file
directory.
3. assign disk space to your cameras back in "Directories and retention".
Leave a little slack (at least 100 MB per camera) between the total limit
and the filesystem capacity, even if you store nothing else on the disk;
see the sketch after this list for how the capacity figure is derived.
There are several reasons this is needed:
* The limit currently controls fully-written files only. There will be up
to two minutes of additional video per camera.
* The rotation happens after the limit is exceeded, not proactively.
* Moonfire NVR currently doesn't account for the unused space in the final
filesystem block at the end of each file.
* Moonfire NVR doesn't account for the space used for directory listings.
* If a file is open when it is deleted (such as if a HTTP client is
downloading it), it stays around until the file is closed. Moonfire NVR
currently doesn't account for this.
* The limit currently controls fully-written files only. There will be up
to two minutes of additional video per camera.
* The rotation happens after the limit is exceeded, not proactively.
* Moonfire NVR currently doesn't account for the unused space in the final
filesystem block at the end of each file.
* Moonfire NVR doesn't account for the space used for directory listings.
* If a file is open when it is deleted (such as if a HTTP client is
downloading it), it stays around until the file is closed. Moonfire NVR
currently doesn't account for this.
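
As referenced in step 3 above, the "filesystem capacity" the retention editor compares
limits against is derived from statvfs: block size times available blocks, plus the
bytes recordings already occupy. A minimal standalone sketch of that calculation follows
(the helper name and the direct use of the libc crate are illustrative, not code from
this repository):

extern crate libc;

use std::ffi::CString;
use std::io;

/// Hypothetical helper: estimate the filesystem capacity figure shown in the
/// retention dialog, i.e. free space plus the bytes already used by recordings.
fn estimated_fs_capacity(sample_file_dir: &str, total_used: i64) -> io::Result<i64> {
    let path = CString::new(sample_file_dir)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
    let mut stat: libc::statvfs = unsafe { ::std::mem::zeroed() };
    if unsafe { libc::statvfs(path.as_ptr(), &mut stat) } != 0 {
        return Err(io::Error::last_os_error());
    }
    // Same formula as the config UI: f_bsize * f_bavail + total_used.
    Ok(stat.f_bsize as i64 * stat.f_bavail as i64 + total_used)
}

Keeping each directory's retention limits comfortably below this estimate leaves room
for the unaccounted-for items listed above.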
When finished, start the daemon:
@ -168,7 +172,6 @@ been done for you. If not, Create
[Service]
ExecStart=/usr/local/bin/moonfire-nvr run \
--sample-file-dir=/var/lib/moonfire-nvr/sample \
--db-dir=/var/lib/moonfire-nvr/db \
--http-addr=0.0.0.0:8080
Environment=TZ=:/etc/localtime

View File

@ -197,6 +197,8 @@ The general upgrade procedure applies to this upgrade.
Version 2 adds:
* recording of sub streams (splits a new `stream` table out of `camera`)
* support for multiple sample file directories, to take advantage of
multiple hard drives (or multiple RAID volumes).
* records the RFC-6381 codec associated with a video sample entry, so that
logic for determining this is no longer needed as part of the database
layer.
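
To make the first two points concrete: the database layer now models each camera as up
to two streams (main and sub), and each stream optionally references a sample file
directory. A trimmed sketch of the structs this commit adds in src/db.rs (the full
definitions appear in the diff below); the comments are editorial:

/// Per-stream settings. A `None` sample_file_dir_id means no storage is
/// assigned to the stream, so it can't be recorded.
#[derive(Debug, Default)]
pub struct StreamChange {
    pub sample_file_dir_id: Option<i32>,
    pub rtsp_path: String,
    pub record: bool,
}

/// Information about a camera, used by `add_camera` and `update_camera`.
/// `StreamType t` is represented by `streams[t.index()]` (main, then sub).
#[derive(Debug)]
pub struct CameraChange {
    pub short_name: String,
    pub description: String,
    pub host: String,
    pub username: String,
    pub password: String,
    pub streams: [StreamChange; 2],
}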

View File

@ -36,6 +36,7 @@ use self::cursive::views;
use db;
use dir;
use error::Error;
use std::collections::BTreeMap;
use std::sync::Arc;
use stream::{self, Opener, Stream};
use super::{decode_size, encode_size};
@ -49,22 +50,33 @@ fn get_change(siv: &mut Cursive) -> db::CameraChange {
let h = siv.find_id::<views::EditView>("host").unwrap().get_content().as_str().into();
let u = siv.find_id::<views::EditView>("username").unwrap().get_content().as_str().into();
let p = siv.find_id::<views::EditView>("password").unwrap().get_content().as_str().into();
let m = siv.find_id::<views::EditView>("main_rtsp_path").unwrap().get_content().as_str().into();
let s = siv.find_id::<views::EditView>("sub_rtsp_path").unwrap().get_content().as_str().into();
db::CameraChange {
let mut c = db::CameraChange {
short_name: sn,
description: d,
host: h,
username: u,
password: p,
rtsp_paths: [m, s],
streams: Default::default(),
};
for &t in &db::ALL_STREAM_TYPES {
let p = siv.find_id::<views::EditView>(&format!("{}_rtsp_path", t.as_str()))
.unwrap().get_content().as_str().into();
let r = siv.find_id::<views::Checkbox>(&format!("{}_record", t.as_str()))
.unwrap().is_checked();
let d = *siv.find_id::<views::SelectView<Option<i32>>>(
&format!("{}_sample_file_dir", t.as_str()))
.unwrap().selection();
c.streams[t.index()] = db::StreamChange {
rtsp_path: p,
sample_file_dir_id: d,
record: r,
};
}
c
}
fn press_edit(siv: &mut Cursive, db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>,
id: Option<i32>) {
fn press_edit(siv: &mut Cursive, db: &Arc<db::Database>, id: Option<i32>) {
let change = get_change(siv);
siv.pop_layer(); // get rid of the add/edit camera dialog.
let result = {
let mut l = db.lock();
@ -79,9 +91,11 @@ fn press_edit(siv: &mut Cursive, db: &Arc<db::Database>, dir: &Arc<dir::SampleFi
.title("Error")
.dismiss_button("Abort"));
} else {
siv.pop_layer(); // get rid of the add/edit camera dialog.
// Recreate the "Edit cameras" dialog from scratch; it's easier than adding the new entry.
siv.pop_layer();
add_dialog(db, dir, siv);
top_dialog(db, siv);
}
}
@ -91,25 +105,44 @@ fn press_test_inner(url: &str) -> Result<String, Error> {
Ok(format!("{}x{} video stream", extra_data.width, extra_data.height))
}
fn press_test(siv: &mut Cursive, c: &db::CameraChange, stream: &str, path: &str) {
let url = format!("rtsp://{}:{}@{}{}", c.username, c.password, c.host, path);
let description = match press_test_inner(&url) {
Err(e) => {
siv.add_layer(
views::Dialog::text(format!("{} stream at {}:\n\n{}", stream, url, e))
.title("Stream test failed")
.dismiss_button("Back"));
return;
},
Ok(d) => d,
};
siv.add_layer(views::Dialog::text(format!("{} stream at {}:\n\n{}", stream, url, description))
.title("Stream test succeeded")
.dismiss_button("Back"));
fn press_test(siv: &mut Cursive, t: db::StreamType) {
let c = get_change(siv);
let url = format!("rtsp://{}:{}@{}{}", c.username, c.password, c.host,
c.streams[t.index()].rtsp_path);
siv.add_layer(views::Dialog::text(format!("Testing {} stream at {}. This may take a while \
on timeout or if you have a long key frame interval",
t.as_str(), url))
.title("Testing"));
// Let siv have this thread for its event loop; do the work in a background thread.
// siv.cb_sink doesn't actually wake up the event loop. Tell siv to poll, as a workaround.
siv.set_fps(5);
let sink = siv.cb_sink().clone();
::std::thread::spawn(move || {
let r = press_test_inner(&url);
sink.send(Box::new(move |siv| {
// Polling is no longer necessary.
siv.set_fps(0);
siv.pop_layer();
let description = match r {
Err(ref e) => {
siv.add_layer(
views::Dialog::text(format!("{} stream at {}:\n\n{}", t.as_str(), url, e))
.title("Stream test failed")
.dismiss_button("Back"));
return;
},
Ok(ref d) => d,
};
siv.add_layer(views::Dialog::text(
format!("{} stream at {}:\n\n{}", t.as_str(), url, description))
.title("Stream test succeeded")
.dismiss_button("Back"));
})).unwrap();
});
}
fn press_delete(siv: &mut Cursive, db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>, id: i32,
name: String, to_delete: i64) {
fn press_delete(siv: &mut Cursive, db: &Arc<db::Database>, id: i32, name: String, to_delete: i64) {
let dialog = if to_delete > 0 {
let prompt = format!("Camera {} has recorded video. Please confirm the amount \
of data to delete by typing it back:\n\n{}", name,
@ -120,50 +153,51 @@ fn press_delete(siv: &mut Cursive, db: &Arc<db::Database>, dir: &Arc<dir::Sample
.child(views::DummyView)
.child(views::EditView::new().on_submit({
let db = db.clone();
let dir = dir.clone();
move |siv, _| confirm_deletion(siv, &db, &dir, id, to_delete)
move |siv, _| confirm_deletion(siv, &db, id, to_delete)
}).with_id("confirm")))
.button("Delete", {
let db = db.clone();
let dir = dir.clone();
move |siv| confirm_deletion(siv, &db, &dir, id, to_delete)
move |siv| confirm_deletion(siv, &db, id, to_delete)
})
} else {
views::Dialog::text(format!("Delete camera {}? This camera has no recorded video.", name))
.button("Delete", {
let db = db.clone();
let dir = dir.clone();
move |s| actually_delete(s, &db, &dir, id)
move |s| actually_delete(s, &db, id)
})
}.title("Delete camera").dismiss_button("Cancel");
siv.add_layer(dialog);
}
fn confirm_deletion(siv: &mut Cursive, db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>,
id: i32, to_delete: i64) {
fn confirm_deletion(siv: &mut Cursive, db: &Arc<db::Database>, id: i32, to_delete: i64) {
let typed = siv.find_id::<views::EditView>("confirm").unwrap().get_content();
if decode_size(typed.as_str()).ok() == Some(to_delete) {
siv.pop_layer(); // deletion confirmation dialog
let mut zero_limits = Vec::new();
let mut zero_limits = BTreeMap::new();
{
let l = db.lock();
for (&stream_id, stream) in l.streams_by_id() {
if stream.camera_id == id {
zero_limits.push(dir::NewLimit {
let dir_id = match stream.sample_file_dir_id {
Some(d) => d,
None => continue,
};
let l = zero_limits.entry(dir_id).or_insert_with(|| Vec::with_capacity(2));
l.push(dir::NewLimit {
stream_id,
limit: 0,
});
}
}
}
if let Err(e) = dir::lower_retention(dir.clone(), &zero_limits) {
if let Err(e) = lower_retention(db, zero_limits) {
siv.add_layer(views::Dialog::text(format!("Unable to delete recordings: {}", e))
.title("Error")
.dismiss_button("Abort"));
return;
}
actually_delete(siv, db, dir, id);
actually_delete(siv, db, id);
} else {
siv.add_layer(views::Dialog::text("Please confirm amount.")
.title("Try again")
@ -171,8 +205,16 @@ fn confirm_deletion(siv: &mut Cursive, db: &Arc<db::Database>, dir: &Arc<dir::Sa
}
}
fn actually_delete(siv: &mut Cursive, db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>,
id: i32) {
fn lower_retention(db: &Arc<db::Database>, zero_limits: BTreeMap<i32, Vec<dir::NewLimit>>)
-> Result<(), Error> {
for (dir_id, l) in &zero_limits {
let dir = db.lock().sample_file_dirs_by_id().get(dir_id).unwrap().open()?;
dir::lower_retention(dir, db.clone(), &l)?;
}
Ok(())
}
fn actually_delete(siv: &mut Cursive, db: &Arc<db::Database>, id: i32) {
siv.pop_layer(); // get rid of the add/edit camera dialog.
let result = {
let mut l = db.lock();
@ -185,15 +227,14 @@ fn actually_delete(siv: &mut Cursive, db: &Arc<db::Database>, dir: &Arc<dir::Sam
} else {
// Recreate the "Edit cameras" dialog from scratch; it's easier than adding the new entry.
siv.pop_layer();
add_dialog(db, dir, siv);
top_dialog(db, siv);
}
}
/// Adds or updates a camera.
/// (The former if `item` is None; the latter otherwise.)
fn edit_camera_dialog(db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>, siv: &mut Cursive,
item: &Option<i32>) {
let list = views::ListView::new()
fn edit_camera_dialog(db: &Arc<db::Database>, siv: &mut Cursive, item: &Option<i32>) {
let camera_list = views::ListView::new()
.child("id", views::TextView::new(match *item {
None => "<new>".to_string(),
Some(id) => id.to_string(),
@ -203,94 +244,119 @@ fn edit_camera_dialog(db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>, siv
.child("host", views::EditView::new().with_id("host"))
.child("username", views::EditView::new().with_id("username"))
.child("password", views::EditView::new().with_id("password"))
.child("main_rtsp_path", views::LinearLayout::horizontal()
.child(views::EditView::new().with_id("main_rtsp_path").full_width())
.child(views::DummyView)
.child(views::Button::new("Test", |siv| {
let c = get_change(siv);
press_test(siv, &c, "main", &c.rtsp_paths[0])
})))
.child("sub_rtsp_path", views::LinearLayout::horizontal()
.child(views::EditView::new().with_id("sub_rtsp_path").full_width())
.child(views::DummyView)
.child(views::Button::new("Test", |siv| {
let c = get_change(siv);
press_test(siv, &c, "sub", &c.rtsp_paths[1])
})))
.min_height(8);
let layout = views::LinearLayout::vertical()
.child(list)
.min_height(6);
let mut layout = views::LinearLayout::vertical()
.child(camera_list)
.child(views::TextView::new("description"))
.child(views::TextArea::new().with_id("description").min_height(3))
.full_width();
.child(views::TextArea::new().with_id("description").min_height(3));
let dirs: Vec<_> = ::std::iter::once(("<none>".to_owned(), None))
.chain(db.lock()
.sample_file_dirs_by_id()
.iter()
.map(|(&id, d)| (d.path.as_str().to_owned(), Some(id))))
.collect();
for &type_ in &db::ALL_STREAM_TYPES {
let list = views::ListView::new()
.child("rtsp path", views::LinearLayout::horizontal()
.child(views::EditView::new()
.with_id(format!("{}_rtsp_path", type_.as_str()))
.full_width())
.child(views::DummyView)
.child(views::Button::new("Test", move |siv| press_test(siv, type_))))
.child("sample file dir",
views::SelectView::<Option<i32>>::new()
.with_all(dirs.iter().map(|d| d.clone()))
.popup()
.with_id(format!("{}_sample_file_dir", type_.as_str())))
.child("record", views::Checkbox::new().with_id(format!("{}_record", type_.as_str())))
.child("usage/capacity",
views::TextView::new("").with_id(format!("{}_usage_cap", type_.as_str())))
.min_height(4);
layout.add_child(views::DummyView);
layout.add_child(views::TextView::new(format!("{} stream", type_.as_str())));
layout.add_child(list);
}
let mut dialog = views::Dialog::around(layout);
let dialog = if let Some(camera_id) = *item {
let l = db.lock();
let camera = l.cameras_by_id().get(&camera_id).expect("missing camera");
dialog.find_id("uuid", |v: &mut views::TextView| v.set_content(camera.uuid.to_string()))
.expect("missing TextView");
let mut main_rtsp_path = "";
let mut sub_rtsp_path = "";
let mut bytes = 0;
for (_, s) in l.streams_by_id() {
if s.camera_id != camera_id { continue; }
bytes += s.sample_file_bytes;
match s.type_ {
db::StreamType::MAIN => main_rtsp_path = &s.rtsp_path,
db::StreamType::SUB => sub_rtsp_path = &s.rtsp_path,
};
for (i, sid) in camera.streams.iter().enumerate() {
let t = db::StreamType::from_index(i).unwrap();
// Find the index into dirs of the stored sample file dir.
let mut selected_dir = 0;
if let Some(s) = sid.map(|sid| l.streams_by_id().get(&sid).unwrap()) {
if let Some(id) = s.sample_file_dir_id {
for (i, &(_, d_id)) in dirs.iter().skip(1).enumerate() {
if Some(id) == d_id {
selected_dir = i + 1;
break;
}
}
}
bytes += s.sample_file_bytes;
let u = if s.retain_bytes == 0 {
"0 / 0 (0.0%)".to_owned()
} else {
format!("{} / {} ({:.1}%)", s.sample_file_bytes, s.retain_bytes,
100. * s.sample_file_bytes as f32 / s.retain_bytes as f32)
};
dialog.find_id(&format!("{}_rtsp_path", t.as_str()),
|v: &mut views::EditView| v.set_content(s.rtsp_path.to_owned()));
dialog.find_id(&format!("{}_usage_cap", t.as_str()),
|v: &mut views::TextView| v.set_content(u));
dialog.find_id(&format!("{}_record", t.as_str()),
|v: &mut views::Checkbox| v.set_checked(s.record));
}
dialog.find_id(&format!("{}_sample_file_dir", t.as_str()),
|v: &mut views::SelectView<Option<i32>>| v.set_selection(selected_dir));
}
let name = camera.short_name.clone();
for &(view_id, content) in &[("short_name", &*camera.short_name),
("host", &*camera.host),
("username", &*camera.username),
("password", &*camera.password),
("main_rtsp_path", main_rtsp_path),
("sub_rtsp_path", sub_rtsp_path)] {
("password", &*camera.password)] {
dialog.find_id(view_id, |v: &mut views::EditView| v.set_content(content.to_string()))
.expect("missing EditView");
}
for s in l.streams_by_id().values() {
if s.camera_id != camera_id { continue };
let id = match s.type_ {
db::StreamType::MAIN => "main_rtsp_path",
db::StreamType::SUB => "sub_rtsp_path",
};
dialog.find_id(id, |v: &mut views::EditView| v.set_content(s.rtsp_path.to_string()))
.expect("missing EditView");
}
dialog.find_id("description",
|v: &mut views::TextArea| v.set_content(camera.description.to_string()))
.expect("missing TextArea");
dialog.title("Edit camera")
.button("Edit", {
let db = db.clone();
let dir = dir.clone();
move |s| press_edit(s, &db, &dir, Some(camera_id))
move |s| press_edit(s, &db, Some(camera_id))
})
.button("Delete", {
let db = db.clone();
let dir = dir.clone();
move |s| press_delete(s, &db, &dir, camera_id, name.clone(), bytes)
move |s| press_delete(s, &db, camera_id, name.clone(), bytes)
})
} else {
for t in &db::ALL_STREAM_TYPES {
dialog.find_id(&format!("{}_usage_cap", t.as_str()),
|v: &mut views::TextView| v.set_content("<new>"));
}
dialog.title("Add camera")
.button("Add", {
let db = db.clone();
let dir = dir.clone();
move |s| press_edit(s, &db, &dir, None)
move |s| press_edit(s, &db, None)
})
};
siv.add_layer(dialog.dismiss_button("Cancel"));
}
pub fn add_dialog(db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>, siv: &mut Cursive) {
pub fn top_dialog(db: &Arc<db::Database>, siv: &mut Cursive) {
siv.add_layer(views::Dialog::around(
views::SelectView::new()
.on_submit({
let db = db.clone();
let dir = dir.clone();
move |siv, item| edit_camera_dialog(&db, &dir, siv, item)
move |siv, item| edit_camera_dialog(&db, siv, item)
})
.item("<new camera>".to_string(), None)
.with_all(db.lock()

View File

@ -51,7 +51,7 @@ struct Stream {
struct Model {
db: Arc<db::Database>,
dir: Arc<dir::SampleFileDir>,
dir_id: i32,
fs_capacity: i64,
total_used: i64,
total_retain: i64,
@ -106,9 +106,9 @@ fn edit_limit(model: &RefCell<Model>, siv: &mut Cursive, id: i32, content: &str)
.set_content(if new_value.is_none() { "*" } else { " " });
}
stream.retain = new_value;
info!("model.errors = {}", model.errors);
debug!("model.errors = {}", model.errors);
if (model.errors == 0) != (old_errors == 0) {
info!("toggling change state: errors={}", model.errors);
trace!("toggling change state: errors={}", model.errors);
siv.find_id::<views::Button>("change")
.unwrap()
.set_enabled(model.errors == 0);
@ -144,7 +144,11 @@ fn actually_delete(model: &RefCell<Model>, siv: &mut Cursive) {
.collect();
siv.pop_layer(); // deletion confirmation
siv.pop_layer(); // retention dialog
if let Err(e) = dir::lower_retention(model.dir.clone(), &new_limits[..]) {
let dir = {
let l = model.db.lock();
l.sample_file_dirs_by_id().get(&model.dir_id).unwrap().open().unwrap()
};
if let Err(e) = dir::lower_retention(dir, model.db.clone(), &new_limits[..]) {
siv.add_layer(views::Dialog::text(format!("Unable to delete excess video: {}", e))
.title("Error")
.dismiss_button("Abort"));
@ -179,20 +183,110 @@ fn press_change(model: &Rc<RefCell<Model>>, siv: &mut Cursive) {
.title("Confirm deletion");
siv.add_layer(dialog);
} else {
siv.screen_mut().pop_layer();
siv.pop_layer();
update_limits(&model.borrow(), siv);
}
}
pub fn add_dialog(db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>, siv: &mut Cursive) {
pub fn top_dialog(db: &Arc<db::Database>, siv: &mut Cursive) {
siv.add_layer(views::Dialog::around(
views::SelectView::new()
.on_submit({
let db = db.clone();
move |siv, item| match *item {
Some(d) => edit_dir_dialog(&db, siv, d),
None => add_dir_dialog(&db, siv),
}
})
.item("<new sample file dir>".to_string(), None)
.with_all(db.lock()
.sample_file_dirs_by_id()
.iter()
.map(|(&id, d)| (d.path.to_string(), Some(id))))
.full_width())
.dismiss_button("Done")
.title("Edit sample file directories"));
}
fn add_dir_dialog(db: &Arc<db::Database>, siv: &mut Cursive) {
siv.add_layer(
views::Dialog::around(
views::LinearLayout::vertical()
.child(views::TextView::new("path"))
.child(views::EditView::new()
.on_submit({
let db = db.clone();
move |siv, path| add_dir(&db, siv, path)
})
.with_id("path")
.fixed_width(60)))
.button("Add", {
let db = db.clone();
move |siv| {
let path = siv.find_id::<views::EditView>("path").unwrap().get_content();
add_dir(&db, siv, &path)
}
})
.button("Cancel", |siv| siv.pop_layer())
.title("Add sample file directory"));
}
fn add_dir(db: &Arc<db::Database>, siv: &mut Cursive, path: &str) {
if let Err(e) = db.lock().add_sample_file_dir(path.to_owned()) {
siv.add_layer(views::Dialog::text(format!("Unable to add path {}: {}", path, e))
.dismiss_button("Back")
.title("Error"));
return;
}
siv.pop_layer();
// Recreate the edit dialog from scratch; it's easier than adding the new entry.
siv.pop_layer();
top_dialog(db, siv);
}
fn delete_dir_dialog(db: &Arc<db::Database>, siv: &mut Cursive, dir_id: i32) {
siv.add_layer(
views::Dialog::around(
views::TextView::new("Empty (no associated streams)."))
.button("Delete", {
let db = db.clone();
move |siv| {
delete_dir(&db, siv, dir_id)
}
})
.button("Cancel", |siv| siv.pop_layer())
.title("Delete sample file directory"));
}
fn delete_dir(db: &Arc<db::Database>, siv: &mut Cursive, dir_id: i32) {
if let Err(e) = db.lock().delete_sample_file_dir(dir_id) {
siv.add_layer(views::Dialog::text(format!("Unable to delete dir id {}: {}", dir_id, e))
.dismiss_button("Back")
.title("Error"));
return;
}
siv.pop_layer();
// Recreate the edit dialog from scratch; it's easier than adding the new entry.
siv.pop_layer();
top_dialog(db, siv);
}
fn edit_dir_dialog(db: &Arc<db::Database>, siv: &mut Cursive, dir_id: i32) {
let path;
let model = {
let mut streams = BTreeMap::new();
let mut total_used = 0;
let mut total_retain = 0;
let fs_capacity;
{
let db = db.lock();
for (&id, s) in db.streams_by_id() {
let c = db.cameras_by_id().get(&s.camera_id).expect("stream without camera");
let l = db.lock();
for (&id, s) in l.streams_by_id() {
let c = l.cameras_by_id().get(&s.camera_id).expect("stream without camera");
if s.sample_file_dir_id != Some(dir_id) {
continue;
}
streams.insert(id, Stream {
label: format!("{}: {}: {}", id, c.short_name, s.type_.as_str()),
used: s.sample_file_bytes,
@ -202,11 +296,18 @@ pub fn add_dialog(db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>, siv: &m
total_used += s.sample_file_bytes;
total_retain += s.retain_bytes;
}
if streams.is_empty() {
return delete_dir_dialog(db, siv, dir_id);
}
let dir = l.sample_file_dirs_by_id().get(&dir_id).unwrap();
// TODO: go another way if open fails.
let stat = dir.open().unwrap().statfs().unwrap();
fs_capacity = stat.f_bsize as i64 * stat.f_bavail as i64 + total_used;
path = dir.path.clone();
}
let stat = dir.statfs().unwrap();
let fs_capacity = stat.f_bsize as i64 * stat.f_bavail as i64 + total_used;
Rc::new(RefCell::new(Model{
dir: dir.clone(),
Rc::new(RefCell::new(Model {
dir_id,
db: db.clone(),
fs_capacity,
total_used,
@ -217,7 +318,7 @@ pub fn add_dialog(db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>, siv: &m
};
const RECORD_WIDTH: usize = 8;
const BYTES_WIDTH: usize = 20;
const BYTES_WIDTH: usize = 22;
let mut list = views::ListView::new();
list.add_child(
@ -276,12 +377,12 @@ pub fn add_dialog(db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>, siv: &m
.child(views::DummyView.full_width());
buttons.add_child(change_button.with_id("change"));
buttons.add_child(views::DummyView);
buttons.add_child(views::Button::new("Cancel", |siv| siv.screen_mut().pop_layer()));
buttons.add_child(views::Button::new("Cancel", |siv| siv.pop_layer()));
siv.add_layer(
views::Dialog::around(
views::LinearLayout::vertical()
.child(list)
.child(views::DummyView)
.child(buttons))
.title("Edit retention"));
.title(format!("Edit retention for {}", path)));
}

View File

@ -38,7 +38,6 @@ extern crate cursive;
use self::cursive::Cursive;
use self::cursive::views;
use db;
use dir;
use error::Error;
use regex::Regex;
use std::sync::Arc;
@ -46,7 +45,7 @@ use std::fmt::Write;
use std::str::FromStr;
mod cameras;
mod retention;
mod dirs;
static USAGE: &'static str = r#"
Interactive configuration editor.
@ -61,9 +60,6 @@ Options:
--db-dir=DIR Set the directory holding the SQLite3 index database.
This is typically on a flash device.
[default: /var/lib/moonfire-nvr/db]
--sample-file-dir=DIR Set the directory holding video data.
This is typically on a hard drive.
[default: /var/lib/moonfire-nvr/sample]
"#;
static MULTIPLIERS: [(char, u64); 4] = [
@ -123,28 +119,24 @@ fn decode_size(encoded: &str) -> Result<i64, ()> {
#[derive(Debug, Deserialize)]
struct Args {
flag_db_dir: String,
flag_sample_file_dir: String,
}
pub fn run() -> Result<(), Error> {
let args: Args = super::parse_args(USAGE)?;
let (_db_dir, conn) = super::open_conn(&args.flag_db_dir, super::OpenMode::ReadWrite)?;
let db = Arc::new(db::Database::new(conn)?);
//let dir = Arc::new(dir::Fd::open(&args.flag_sample_file_dir)?);
let dir = dir::SampleFileDir::new(&args.flag_sample_file_dir, db.clone())?;
let mut siv = Cursive::new();
//siv.add_global_callback('q', |s| s.quit());
siv.add_layer(views::Dialog::around(
views::SelectView::<fn(&Arc<db::Database>, &Arc<dir::SampleFileDir>, &mut Cursive)>::new()
views::SelectView::<fn(&Arc<db::Database>, &mut Cursive)>::new()
.on_submit({
let db = db.clone();
let dir = dir.clone();
move |siv, item| item(&db, &dir, siv)
move |siv, item| item(&db, siv)
})
.item("Edit cameras".to_string(), cameras::add_dialog)
.item("Edit retention".to_string(), retention::add_dialog)
.item("Directories and retention".to_string(), dirs::top_dialog)
.item("Cameras and streams".to_string(), cameras::top_dialog)
)
.button("Quit", |siv| siv.quit())
.title("Main menu"));

View File

@ -75,7 +75,7 @@ enum OpenMode {
/// Locks and opens the database.
/// The returned `dir::Fd` holds the lock and should be kept open as long as the `Connection` is.
fn open_conn(db_dir: &str, mode: OpenMode) -> Result<(dir::Fd, rusqlite::Connection), Error> {
let dir = dir::Fd::open(db_dir)?;
let dir = dir::Fd::open(db_dir, mode == OpenMode::Create)?;
let ro = mode == OpenMode::ReadOnly;
dir.lock(if ro { libc::LOCK_SH } else { libc::LOCK_EX } | libc::LOCK_NB)
.map_err(|e| Error{description: format!("db dir {:?} already in use; can't get {} lock",

View File

@ -32,6 +32,7 @@ use clock;
use db;
use dir;
use error::Error;
use fnv::FnvHashMap;
use futures::{Future, Stream};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
@ -55,9 +56,6 @@ Options:
--db-dir=DIR Set the directory holding the SQLite3 index database.
This is typically on a flash device.
[default: /var/lib/moonfire-nvr/db]
--sample-file-dir=DIR Set the directory holding video data.
This is typically on a hard drive.
[default: /var/lib/moonfire-nvr/sample]
--ui-dir=DIR Set the directory with the user interface files (.html, .js, etc).
[default: /usr/local/lib/moonfire-nvr/ui]
--http-addr=ADDR Set the bind address for the unencrypted HTTP server.
@ -68,7 +66,6 @@ Options:
#[derive(Debug, Deserialize)]
struct Args {
flag_db_dir: String,
flag_sample_file_dir: String,
flag_http_addr: String,
flag_ui_dir: String,
flag_read_only: bool,
@ -92,48 +89,90 @@ fn resolve_zone() -> String {
p[ZONEINFO_PATH.len()..].into()
}
struct Syncer {
dir: Arc<dir::SampleFileDir>,
channel: dir::SyncerChannel,
join: thread::JoinHandle<()>,
}
pub fn run() -> Result<(), Error> {
let args: Args = super::parse_args(USAGE)?;
let (_db_dir, conn) = super::open_conn(
&args.flag_db_dir,
if args.flag_read_only { super::OpenMode::ReadOnly } else { super::OpenMode::ReadWrite })?;
let db = Arc::new(db::Database::new(conn).unwrap());
// TODO: multiple sample file dirs.
let dir = dir::SampleFileDir::new(&args.flag_sample_file_dir, db.clone()).unwrap();
info!("Database is loaded.");
let s = web::Service::new(db.clone(), dir.clone(), Some(&args.flag_ui_dir), resolve_zone())?;
let s = web::Service::new(db.clone(), Some(&args.flag_ui_dir), resolve_zone())?;
// Start a streamer for each stream.
let shutdown_streamers = Arc::new(AtomicBool::new(false));
let mut streamers = Vec::new();
let syncer = if !args.flag_read_only {
let (syncer_channel, syncer_join) = dir::start_syncer(dir.clone()).unwrap();
let syncers = if !args.flag_read_only {
let l = db.lock();
let mut dirs = FnvHashMap::with_capacity_and_hasher(
l.sample_file_dirs_by_id().len(), Default::default());
let streams = l.streams_by_id().len();
let env = streamer::Environment{
let env = streamer::Environment {
db: &db,
dir: &dir,
clocks: &clock::REAL,
opener: &*stream::FFMPEG,
shutdown: &shutdown_streamers,
};
// Create directories for streams that need them.
for stream in l.streams_by_id().values() {
if let (Some(id), true) = (stream.sample_file_dir_id, stream.record) {
dirs.entry(id).or_insert_with(|| {
let d = l.sample_file_dirs_by_id().get(&id).unwrap();
info!("Starting syncer for path {}", d.path);
d.open()
});
}
}
// Then, with the lock dropped, create syncers.
drop(l);
let mut syncers = FnvHashMap::with_capacity_and_hasher(dirs.len(), Default::default());
for (id, dir) in dirs.drain() {
let dir = dir?;
let (channel, join) = dir::start_syncer(dir.clone(), db.clone())?;
syncers.insert(id, Syncer {
dir,
channel,
join,
});
}
// Then start up streams.
let l = db.lock();
for (i, (id, stream)) in l.streams_by_id().iter().enumerate() {
if !stream.record {
continue;
}
let camera = l.cameras_by_id().get(&stream.camera_id).unwrap();
let sample_file_dir_id = match stream.sample_file_dir_id {
Some(s) => s,
None => {
warn!("Can't record stream {} ({}/{}) because it has no sample file dir",
id, camera.short_name, stream.type_.as_str());
continue;
},
};
let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / streams as i64;
let mut streamer = streamer::Streamer::new(&env, syncer_channel.clone(), *id, camera,
stream, rotate_offset_sec,
let syncer = syncers.get(&sample_file_dir_id).unwrap();
let mut streamer = streamer::Streamer::new(&env, syncer.dir.clone(),
syncer.channel.clone(), *id, camera, stream,
rotate_offset_sec,
streamer::ROTATE_INTERVAL_SEC);
info!("Starting streamer for {}", streamer.short_name());
let name = format!("s-{}", streamer.short_name());
streamers.push(thread::Builder::new().name(name).spawn(move|| {
streamer.run();
}).expect("can't create thread"));
}
Some((syncer_channel, syncer_join))
drop(l);
Some(syncers)
} else { None };
// Start the web interface.
@ -153,10 +192,11 @@ pub fn run() -> Result<(), Error> {
streamer.join().unwrap();
}
if let Some((syncer_channel, syncer_join)) = syncer {
info!("Shutting down syncer.");
drop(syncer_channel);
syncer_join.join().unwrap();
if let Some(mut ss) = syncers {
for (_, s) in ss.drain() {
drop(s.channel);
s.join.join().unwrap();
}
}
info!("Exiting.");

View File

@ -49,9 +49,8 @@ Options:
--db-dir=DIR Set the directory holding the SQLite3 index database.
This is typically on a flash device.
[default: /var/lib/moonfire-nvr/db]
--sample-file-dir=DIR Set the directory holding video data.
--sample-file-dir=DIR When upgrading from schema version 1 to 2, the sample file directory.
This is typically on a hard drive.
[default: /var/lib/moonfire-nvr/sample]
--preset-journal=MODE Resets the SQLite journal_mode to the specified mode
prior to the upgrade. The default, delete, is
recommended. off is very dangerous but may be
@ -65,15 +64,15 @@ Options:
const UPGRADE_NOTES: &'static str =
concat!("upgraded using moonfire-nvr ", env!("CARGO_PKG_VERSION"));
const UPGRADERS: [fn(&rusqlite::Transaction) -> Result<(), Error>; 2] = [
const UPGRADERS: [fn(&rusqlite::Transaction, &Args) -> Result<(), Error>; 2] = [
v0_to_v1::run,
v1_to_v2::run,
];
#[derive(Debug, Deserialize)]
struct Args {
pub struct Args {
flag_db_dir: String,
flag_sample_file_dir: String,
flag_sample_file_dir: Option<String>,
flag_preset_journal: String,
flag_no_vacuum: bool,
}
@ -105,7 +104,7 @@ pub fn run() -> Result<(), Error> {
for ver in old_ver .. db::EXPECTED_VERSION {
info!("...from version {} to version {}", ver, ver + 1);
let tx = conn.transaction()?;
UPGRADERS[ver as usize](&tx)?;
UPGRADERS[ver as usize](&tx, &args)?;
tx.execute(r#"
insert into version (id, unix_time, notes)
values (?, cast(strftime('%s', 'now') as int32), ?)

View File

@ -37,7 +37,7 @@ use rusqlite;
use std::collections::HashMap;
use strutil;
pub fn run(tx: &rusqlite::Transaction) -> Result<(), Error> {
pub fn run(tx: &rusqlite::Transaction, _args: &super::Args) -> Result<(), Error> {
// These create statements match the schema.sql when version 1 was the latest.
tx.execute_batch(r#"
alter table camera rename to old_camera;

View File

@ -33,8 +33,32 @@
use error::Error;
use rusqlite;
pub fn run(tx: &rusqlite::Transaction) -> Result<(), Error> {
pub fn run(tx: &rusqlite::Transaction, args: &super::Args) -> Result<(), Error> {
// These create statements match the schema.sql when version 2 was the latest.
tx.execute_batch(r#"
create table sample_file_dir (
id integer primary key,
path text unique not null,
uuid blob unique not null check (length(uuid) = 16)
);
"#)?;
{
let mut stmt = tx.prepare_cached(r#"
insert into sample_file_dir (path, uuid)
values (:path, :uuid)
"#)?;
let uuid = ::uuid::Uuid::new_v4();
let uuid_bytes = &uuid.as_bytes()[..];
let path = args.flag_sample_file_dir
.as_ref()
.ok_or_else(|| Error::new("--sample-file-dir required when upgrading from
schema version 1 to 2.".to_owned()))?;
stmt.execute_named(&[
(":path", &path.as_str()),
(":uuid", &uuid_bytes),
])?;
}
tx.execute_batch(r#"
alter table camera rename to old_camera;
alter table recording rename to old_recording;
@ -54,6 +78,7 @@ pub fn run(tx: &rusqlite::Transaction) -> Result<(), Error> {
create table stream (
id integer primary key,
camera_id integer not null references camera (id),
sample_file_dir_id integer references sample_file_dir (id),
type text not null check (type in ('main', 'sub')),
record integer not null check (record in (1, 0)),
rtsp_path text not null,
@ -113,29 +138,32 @@ pub fn run(tx: &rusqlite::Transaction) -> Result<(), Error> {
-- Insert main streams using the same id as the camera, to ease changing recordings.
insert into stream
select
id,
id,
old_camera.id,
old_camera.id,
sample_file_dir.id,
'main',
1,
main_rtsp_path,
retain_bytes,
next_recording_id
old_camera.main_rtsp_path,
old_camera.retain_bytes,
old_camera.next_recording_id
from
old_camera;
old_camera cross join sample_file_dir;
-- Insert sub stream (if path is non-empty) using any id.
insert into stream (camera_id, type, record, rtsp_path, retain_bytes, next_recording_id)
insert into stream (camera_id, sample_file_dir_id, type, record, rtsp_path, retain_bytes,
next_recording_id)
select
id,
old_camera.id,
sample_file_dir.id,
'sub',
0,
sub_rtsp_path,
old_camera.sub_rtsp_path,
0,
0
from
old_camera
old_camera cross join sample_file_dir
where
sub_rtsp_path != '';
old_camera.sub_rtsp_path != '';
insert into recording
select

src/db.rs (373 lines changed)
View File

@ -51,6 +51,7 @@
//! * the `Transaction` interface allows callers to batch write operations to reduce latency and
//! SSD write cycles.
use dir;
use error::{Error, ResultExt};
use fnv;
use lru_cache::LruCache;
@ -352,6 +353,26 @@ pub struct StreamDayValue {
pub duration: recording::Duration,
}
#[derive(Debug)]
pub struct SampleFileDir {
pub id: i32,
pub path: String,
pub uuid: Uuid,
dir: RefCell<Option<Arc<dir::SampleFileDir>>>,
}
impl SampleFileDir {
pub fn open(&self) -> Result<Arc<dir::SampleFileDir>, Error> {
let mut d = self.dir.borrow_mut();
if let Some(ref d) = *d {
return Ok(d.clone());
}
let dir = dir::SampleFileDir::open(&self.path)?;
*d = Some(dir.clone());
Ok(dir)
}
}
/// In-memory state about a camera.
#[derive(Debug)]
pub struct Camera {
@ -400,10 +421,13 @@ impl StreamType {
}
}
#[derive(Debug)]
pub const ALL_STREAM_TYPES: [StreamType; 2] = [StreamType::MAIN, StreamType::SUB];
#[derive(Clone, Debug)]
pub struct Stream {
pub id: i32,
pub camera_id: i32,
pub sample_file_dir_id: Option<i32>,
pub type_: StreamType,
pub rtsp_path: String,
pub retain_bytes: i64,
@ -423,6 +447,13 @@ pub struct Stream {
next_recording_id: i32,
}
#[derive(Debug, Default)]
pub struct StreamChange {
pub sample_file_dir_id: Option<i32>,
pub rtsp_path: String,
pub record: bool,
}
/// Information about a camera, used by `add_camera` and `update_camera`.
#[derive(Debug)]
pub struct CameraChange {
@ -431,7 +462,11 @@ pub struct CameraChange {
pub host: String,
pub username: String,
pub password: String,
pub rtsp_paths: [String; 2],
/// `StreamType t` is represented by `streams[t.index()]`. A default StreamChange will
/// correspond to no stream in the database, provided there are no existing recordings for that
/// stream.
pub streams: [StreamChange; 2],
}
/// Adds non-zero `delta` to the day represented by `day` in the map `m`.
@ -564,6 +599,7 @@ pub struct LockedDatabase {
/// while its underlying `rusqlite::Transaction` is borrowing `conn`.
#[derive(Debug)]
struct State {
sample_file_dirs_by_id: BTreeMap<i32, SampleFileDir>,
cameras_by_id: BTreeMap<i32, Camera>,
streams_by_id: BTreeMap<i32, Stream>,
cameras_by_uuid: BTreeMap<Uuid, i32>,
@ -878,54 +914,139 @@ impl<'a> Transaction<'a> {
}
}
struct StreamInserter<'tx> {
tx: &'tx rusqlite::Transaction<'tx>,
stmt: rusqlite::Statement<'tx>,
new_streams: BTreeMap<i32, Stream>,
/// Inserts, updates, or removes streams in the `State` object to match a set of `StreamChange`
/// structs.
struct StreamStateChanger {
sids: [Option<i32>; 2],
streams: Vec<(i32, Option<Stream>)>,
}
impl<'tx> StreamInserter<'tx> {
fn new(tx: &'tx rusqlite::Transaction) -> Result<Self, Error> {
let stmt = tx.prepare(r#"
insert into stream (camera_id, type, rtsp_path, record, retain_bytes, next_recording_id)
values (:camera_id, :type, :rtsp_path, 0, 0, 1)
"#)?;
Ok(StreamInserter {
tx,
stmt,
new_streams: BTreeMap::new(),
impl StreamStateChanger {
/// Performs the database updates (guarded by the given transaction) and returns the state
/// change to be applied on successful commit.
fn new(tx: &rusqlite::Transaction, camera_id: i32, existing: Option<&Camera>,
streams_by_id: &BTreeMap<i32, Stream>, change: &mut CameraChange)
-> Result<Self, Error> {
let mut sids = [None; 2];
let mut streams = Vec::with_capacity(2);
let existing_streams = existing.map(|e| e.streams).unwrap_or_default();
for (i, ref mut sc) in change.streams.iter_mut().enumerate() {
let mut have_data = false;
if let Some(sid) = existing_streams[i] {
let s = streams_by_id.get(&sid).unwrap();
if s.range.is_some() {
have_data = true;
if let (Some(d), false) = (s.sample_file_dir_id,
s.sample_file_dir_id == sc.sample_file_dir_id) {
return Err(Error::new(format!("can't change sample_file_dir_id \
{:?}->{:?} for non-empty stream {}",
d, sc.sample_file_dir_id, sid)));
}
}
if !have_data && sc.rtsp_path.is_empty() && sc.sample_file_dir_id.is_none() &&
!sc.record {
// Delete stream.
let mut stmt = tx.prepare_cached(r#"
delete from stream where id = ?
"#)?;
if stmt.execute(&[&sid])? != 1 {
return Err(Error::new(format!("missing stream {}", sid)));
}
streams.push((sid, None));
} else {
// Update stream.
let mut stmt = tx.prepare_cached(r#"
update stream set
rtsp_path = :rtsp_path,
record = :record,
sample_file_dir_id = :sample_file_dir_id
where
id = :id
"#)?;
let rows = stmt.execute_named(&[
(":rtsp_path", &sc.rtsp_path),
(":record", &sc.record),
(":sample_file_dir_id", &sc.sample_file_dir_id),
(":id", &sid),
])?;
if rows != 1 {
return Err(Error::new(format!("missing stream {}", sid)));
}
sids[i] = Some(sid);
let s = (*s).clone();
streams.push((sid, Some(Stream {
sample_file_dir_id: sc.sample_file_dir_id,
rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()),
record: sc.record,
..s
})));
}
} else {
if sc.rtsp_path.is_empty() && sc.sample_file_dir_id.is_none() && !sc.record {
// Do nothing; there is no record and we want to keep it that way.
continue;
}
// Insert stream.
let mut stmt = tx.prepare_cached(r#"
insert into stream (camera_id, sample_file_dir_id, type, rtsp_path, record,
retain_bytes, next_recording_id)
values (:camera_id, :sample_file_dir_id, :type, :rtsp_path, :record,
0, 1)
"#)?;
let type_ = StreamType::from_index(i).unwrap();
stmt.execute_named(&[
(":camera_id", &camera_id),
(":sample_file_dir_id", &sc.sample_file_dir_id),
(":type", &type_.as_str()),
(":rtsp_path", &sc.rtsp_path),
(":record", &sc.record),
])?;
let id = tx.last_insert_rowid() as i32;
sids[i] = Some(id);
streams.push((id, Some(Stream {
id,
type_,
camera_id,
sample_file_dir_id: sc.sample_file_dir_id,
rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()),
retain_bytes: 0,
range: None,
sample_file_bytes: 0,
duration: recording::Duration(0),
days: BTreeMap::new(),
record: sc.record,
next_recording_id: 1,
})));
}
}
Ok(StreamStateChanger {
sids,
streams,
})
}
fn add(&mut self, camera_id: i32, type_: StreamType, rtsp_path: String) -> Result<(), Error> {
self.stmt.execute_named(&[
(":camera_id", &camera_id),
(":type", &type_.as_str()),
(":rtsp_path", &rtsp_path)
])?;
let id = self.tx.last_insert_rowid() as i32;
self.new_streams.insert(id, Stream {
id,
type_,
camera_id,
rtsp_path,
retain_bytes: 0,
range: None,
sample_file_bytes: 0,
duration: recording::Duration(0),
days: BTreeMap::new(),
record: false,
next_recording_id: 1,
});
Ok(())
/// Applies the change to the given `streams_by_id`. The caller is expected to set
/// `Camera::streams` to the return value.
fn apply(mut self, streams_by_id: &mut BTreeMap<i32, Stream>) -> [Option<i32>; 2] {
for (id, mut stream) in self.streams.drain(..) {
use ::std::collections::btree_map::Entry;
match (streams_by_id.entry(id), stream) {
(Entry::Vacant(mut e), Some(new)) => { e.insert(new); },
(Entry::Vacant(_), None) => {},
(Entry::Occupied(mut e), Some(new)) => { e.insert(new); },
(Entry::Occupied(mut e), None) => { e.remove(); },
};
}
self.sids
}
fn streams(self) -> BTreeMap<i32, Stream> { self.new_streams }
}
impl LockedDatabase {
/// Returns an immutable view of the cameras by id.
pub fn cameras_by_id(&self) -> &BTreeMap<i32, Camera> { &self.state.cameras_by_id }
pub fn sample_file_dirs_by_id(&self) -> &BTreeMap<i32, SampleFileDir> {
&self.state.sample_file_dirs_by_id
}
pub fn streams_by_id(&self) -> &BTreeMap<i32, Stream> { &self.state.streams_by_id }
/// Returns an immutable view of the video sample entries.
@ -1198,6 +1319,34 @@ impl LockedDatabase {
Ok(())
}
/// Initializes the sample file dirs.
/// To be called during construction.
fn init_sample_file_dirs(&mut self) -> Result<(), Error> {
info!("Loading sample file dirs");
let mut stmt = self.conn.prepare(r#"
select
id,
path,
uuid
from
sample_file_dir;
"#)?;
let mut rows = stmt.query(&[])?;
while let Some(row) = rows.next() {
let row = row?;
let id = row.get_checked(0)?;
let uuid: FromSqlUuid = row.get_checked(2)?;
self.state.sample_file_dirs_by_id.insert(id, SampleFileDir {
id,
uuid: uuid.0,
path: row.get_checked(1)?,
dir: RefCell::new(None),
});
}
info!("Loaded {} sample file dirs", self.state.sample_file_dirs_by_id.len());
Ok(())
}
/// Initializes the cameras, but not their matching recordings.
/// To be called during construction.
fn init_cameras(&mut self) -> Result<(), Error> {
@ -1244,6 +1393,7 @@ impl LockedDatabase {
id,
type,
camera_id,
sample_file_dir_id,
rtsp_path,
retain_bytes,
next_recording_id,
@ -1259,21 +1409,24 @@ impl LockedDatabase {
let type_ = StreamType::parse(&type_).ok_or_else(
|| Error::new(format!("no such stream type {}", type_)))?;
let camera_id = row.get_checked(2)?;
let c = self.state
.cameras_by_id
.get_mut(&camera_id)
.ok_or_else(|| Error::new("missing camera".to_owned()))?;
self.state.streams_by_id.insert(id, Stream {
id,
type_,
camera_id,
rtsp_path: row.get_checked(3)?,
retain_bytes: row.get_checked(4)?,
sample_file_dir_id: row.get_checked(3)?,
rtsp_path: row.get_checked(4)?,
retain_bytes: row.get_checked(5)?,
range: None,
sample_file_bytes: 0,
duration: recording::Duration(0),
days: BTreeMap::new(),
next_recording_id: row.get_checked(5)?,
record: row.get_checked(6)?,
next_recording_id: row.get_checked(6)?,
record: row.get_checked(7)?,
});
let c = self.state.cameras_by_id.get_mut(&camera_id)
.ok_or_else(|| Error::new("missing camera".to_owned()))?;
c.streams[type_.index()] = Some(id);
}
info!("Loaded {} streams", self.state.streams_by_id.len());
@ -1324,16 +1477,61 @@ impl LockedDatabase {
Ok(id)
}
pub fn add_sample_file_dir(&mut self, path: String) -> Result<i32, Error> {
let dir = dir::SampleFileDir::create(&path)?;
let uuid = Uuid::new_v4();
let uuid_bytes = &uuid.as_bytes()[..];
let tx = self.conn.transaction()?;
{
let mut stmt = tx.prepare_cached(r#"
insert into sample_file_dir (path, uuid) values (:path, :uuid)
"#)?;
stmt.execute_named(&[
(":uuid", &uuid_bytes),
(":path", &path),
])?;
}
let id = tx.last_insert_rowid() as i32;
tx.commit()?;
self.state.sample_file_dirs_by_id.insert(id, SampleFileDir {
id,
path,
uuid,
dir: RefCell::new(Some(dir)),
});
Ok(id)
}
pub fn delete_sample_file_dir(&mut self, dir_id: i32) -> Result<(), Error> {
for (&id, s) in self.state.streams_by_id.iter() {
if s.sample_file_dir_id == Some(dir_id) {
return Err(Error::new(format!("can't delete dir referenced by stream {}", id)));
}
}
let tx = self.conn.transaction()?;
{
let mut stmt = tx.prepare_cached(r#"
delete from sample_file_dir where id = ?
"#)?;
if stmt.execute(&[&dir_id])? != 1 {
return Err(Error::new(format!("no such dir {} to remove", dir_id)));
}
}
tx.commit()?;
self.state.sample_file_dirs_by_id.remove(&dir_id).expect("sample file dir should exist!");
Ok(())
}
/// Adds a camera.
pub fn add_camera(&mut self, mut camera: CameraChange) -> Result<i32, Error> {
let uuid = Uuid::new_v4();
let uuid_bytes = &uuid.as_bytes()[..];
let tx = self.conn.transaction()?;
let mut new_streams;
let streams;
let camera_id;
{
let mut stmt = tx.prepare_cached(r#"
insert into camera (uuid, short_name, description, host, username, password)
insert into camera (uuid, short_name, description, host, username, password)
values (:uuid, :short_name, :description, :host, :username, :password)
"#)?;
stmt.execute_named(&[
@ -1345,20 +1543,11 @@ impl LockedDatabase {
(":password", &camera.password),
])?;
camera_id = tx.last_insert_rowid() as i32;
let mut inserter = StreamInserter::new(&tx)?;
for (i, ref mut rtsp_path) in camera.rtsp_paths.iter_mut().enumerate() {
if rtsp_path.is_empty() { continue; }
inserter.add(camera_id, StreamType::from_index(i).unwrap(),
mem::replace(rtsp_path, String::new()))?;
}
new_streams = inserter.streams();
streams = StreamStateChanger::new(&tx, camera_id, None, &self.state.streams_by_id,
&mut camera)?;
}
tx.commit()?;
let mut streams = [None, None];
for (&id, s) in &new_streams {
streams[s.type_.index()] = Some(id);
}
self.state.streams_by_id.append(&mut new_streams);
let streams = streams.apply(&mut self.state.streams_by_id);
self.state.cameras_by_id.insert(camera_id, Camera {
id: camera_id,
uuid,
@ -1375,41 +1564,16 @@ impl LockedDatabase {
/// Updates a camera.
pub fn update_camera(&mut self, camera_id: i32, mut camera: CameraChange) -> Result<(), Error> {
// TODO: sample_file_dir_id. disallow change when data is stored; change otherwise.
let tx = self.conn.transaction()?;
let mut new_streams;
let mut stream_rtsp_changes = BTreeMap::new();
let streams;
let c = self.state
.cameras_by_id
.get_mut(&camera_id)
.ok_or_else(|| Error::new(format!("no such camera {}", camera_id)))?;
{
let mut stream_ids = [None; 2];
let mut stream_update_stmt = tx.prepare_cached(r#"
update stream set
rtsp_path = :rtsp_path
where
id = :id
"#)?;
for (&stream_id, stream) in &self.state.streams_by_id {
if stream.camera_id != camera_id {
continue;
}
stream_ids[stream.type_.index()] = Some(stream_id);
let p = mem::replace(&mut camera.rtsp_paths[stream.type_.index()], String::new());
let rows = stream_update_stmt.execute_named(&[
(":id", &stream_id),
(":rtsp_path", &p),
])?;
if rows != 1 {
return Err(Error::new(format!("Stream {} missing from database",
stream_id)));
}
stream_rtsp_changes.insert(stream_id, p);
}
let mut inserter = StreamInserter::new(&tx)?;
for (index, id) in stream_ids.iter().enumerate() {
if id.is_none() && !camera.rtsp_paths[index].is_empty() {
inserter.add(camera_id, StreamType::from_index(index).unwrap(),
mem::replace(&mut camera.rtsp_paths[index], String::new()))?;
}
}
new_streams = inserter.streams();
streams = StreamStateChanger::new(&tx, camera_id, Some(c), &self.state.streams_by_id,
&mut camera)?;
let mut stmt = tx.prepare_cached(r#"
update camera set
short_name = :short_name,
@ -1433,21 +1597,12 @@ impl LockedDatabase {
}
}
tx.commit()?;
let c = self.state.cameras_by_id.get_mut(&camera_id).unwrap();
c.short_name = camera.short_name;
c.description = camera.description;
c.host = camera.host;
c.username = camera.username;
c.password = camera.password;
for (&id, s) in &new_streams {
c.streams[s.type_.index()] = Some(id);
}
self.state.streams_by_id.append(&mut new_streams);
for (id, p) in &mut stream_rtsp_changes {
let mut s = self.state.streams_by_id.get_mut(id)
.ok_or_else(|| Error::new(format!("stream {} missing", id)))?;
mem::swap(&mut s.rtsp_path, p);
}
c.streams = streams.apply(&mut self.state.streams_by_id);
Ok(())
}
@ -1481,6 +1636,7 @@ impl LockedDatabase {
for id in streams_to_delete {
self.state.streams_by_id.remove(&id);
}
self.state.cameras_by_id.remove(&id);
self.state.cameras_by_uuid.remove(&uuid);
return Ok(())
}
@ -1556,6 +1712,7 @@ impl Database {
let db = Database(Mutex::new(LockedDatabase{
conn: conn,
state: State {
sample_file_dirs_by_id: BTreeMap::new(),
cameras_by_id: BTreeMap::new(),
cameras_by_uuid: BTreeMap::new(),
streams_by_id: BTreeMap::new(),
@ -1567,6 +1724,7 @@ impl Database {
{
let l = &mut *db.lock();
l.init_video_sample_entries().annotate_err("init_video_sample_entries")?;
l.init_sample_file_dirs().annotate_err("init_sample_file_dirs")?;
l.init_cameras().annotate_err("init_cameras")?;
l.init_streams().annotate_err("init_streams")?;
for (&stream_id, ref mut stream) in &mut l.state.streams_by_id {
@ -1593,6 +1751,8 @@ impl Database {
#[cfg(test)]
mod tests {
extern crate tempdir;
use core::cmp::Ord;
use recording::{self, TIME_UNITS_PER_SEC};
use rusqlite::Connection;
@ -1808,15 +1968,18 @@ mod tests {
testutil::init();
let conn = setup_conn();
let db = Database::new(conn).unwrap();
let tmpdir = tempdir::TempDir::new("moonfire-nvr-test").unwrap();
let path = tmpdir.path().to_str().unwrap().to_owned();
let sample_file_dir_id = Some({ db.lock() }.add_sample_file_dir(path).unwrap());
let camera_id = { db.lock() }.add_camera(CameraChange {
short_name: "testcam".to_owned(),
description: "".to_owned(),
host: "test-camera".to_owned(),
username: "foo".to_owned(),
password: "bar".to_owned(),
rtsp_paths: [
"/main".to_owned(),
"/sub".to_owned(),
streams: [
StreamChange { sample_file_dir_id, rtsp_path: "/main".to_owned(), record: true },
StreamChange { sample_file_dir_id, rtsp_path: "/sub".to_owned(), record: true },
],
}).unwrap();
{

View File

@ -48,16 +48,14 @@ use std::sync::mpsc;
use std::thread;
use uuid::Uuid;
/// A sample file directory. This is currently a singleton in production. (Maybe in the future
/// Moonfire will be extended to support multiple directories on different spindles.)
/// A sample file directory. Typically one per physical disk drive.
///
/// If the directory is used for writing, the `start_syncer` function should be called to start
/// a background thread. This thread manages deleting files and writing new files. It synces the
/// directory and commits these operations to the database in the correct order to maintain the
/// invariants described in `design/schema.md`.
#[derive(Debug)]
pub struct SampleFileDir {
db: Arc<db::Database>,
/// The open file descriptor for the directory. The worker uses it to create files and sync the
/// directory. Other threads use it to open sample files for reading during video serving.
fd: Fd,
@ -67,6 +65,7 @@ pub struct SampleFileDir {
}
/// A file descriptor associated with a directory (not necessarily the sample file dir).
#[derive(Debug)]
pub struct Fd(libc::c_int);
impl Drop for Fd {
@ -80,9 +79,15 @@ impl Drop for Fd {
impl Fd {
/// Opens the given path as a directory.
pub fn open(path: &str) -> Result<Fd, io::Error> {
pub fn open(path: &str, mkdir: bool) -> Result<Fd, io::Error> {
let cstring = ffi::CString::new(path)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
if mkdir && unsafe { libc::mkdir(cstring.as_ptr(), 0o700) } != 0 {
let e = io::Error::last_os_error();
if e.kind() != io::ErrorKind::AlreadyExists {
return Err(e.into());
}
}
let fd = unsafe { libc::open(cstring.as_ptr(), libc::O_DIRECTORY | libc::O_RDONLY, 0) };
if fd < 0 {
return Err(io::Error::last_os_error().into());
@ -111,12 +116,19 @@ impl Fd {
}
impl SampleFileDir {
pub fn new(path: &str, db: Arc<db::Database>) -> Result<Arc<SampleFileDir>, Error> {
let fd = Fd::open(path)
pub fn open(path: &str) -> Result<Arc<SampleFileDir>, Error> {
SampleFileDir::open_self(path, false)
}
pub fn create(path: &str) -> Result<Arc<SampleFileDir>, Error> {
SampleFileDir::open_self(path, true)
}
fn open_self(path: &str, create: bool) -> Result<Arc<SampleFileDir>, Error> {
let fd = Fd::open(path, create)
.map_err(|e| Error::new(format!("unable to open sample file dir {}: {}", path, e)))?;
Ok(Arc::new(SampleFileDir{
db: db,
fd: fd,
Ok(Arc::new(SampleFileDir {
fd,
mutable: Mutex::new(SharedMutableState{
next_uuid: None,
}),
@ -135,8 +147,9 @@ impl SampleFileDir {
///
/// The new recording will continue from `prev` if specified; this should be as returned from
/// a previous `close` call.
pub fn create_writer<'a>(&self, channel: &'a SyncerChannel, prev: Option<PreviousWriter>,
camera_id: i32, video_sample_entry_id: i32)
pub fn create_writer<'a>(&self, db: &db::Database, channel: &'a SyncerChannel,
prev: Option<PreviousWriter>, camera_id: i32,
video_sample_entry_id: i32)
-> Result<Writer<'a>, Error> {
// Grab the next uuid. Typically one is cached—a sync has usually completed since the last
// writer was created, and syncs ensure `next_uuid` is filled while performing their
@ -145,8 +158,8 @@ impl SampleFileDir {
Some(u) => u,
None => {
info!("Committing extra transaction because there's no cached uuid");
let mut db = self.db.lock();
let mut tx = db.tx()?;
let mut l = db.lock();
let mut tx = l.tx()?;
let u = tx.reserve_sample_file()?;
tx.commit()?;
u
@ -206,6 +219,7 @@ impl SampleFileDir {
}
/// State shared between users of the `SampleFileDirectory` struct and the syncer.
#[derive(Debug)]
struct SharedMutableState {
next_uuid: Option<Uuid>,
}
@ -227,11 +241,15 @@ pub struct SyncerChannel(mpsc::Sender<SyncerCommand>);
/// State of the worker thread.
struct Syncer {
dir: Arc<SampleFileDir>,
db: Arc<db::Database>,
to_unlink: Vec<Uuid>,
to_mark_deleted: Vec<Uuid>,
}
/// Starts a syncer for the given sample file directory.
///
/// The lock must not be held on `db` when this is called.
///
/// There should be only one syncer per directory, or 0 if operating in read-only mode.
/// This function will perform the initial rotation synchronously, so that it is finished before
/// file writing starts. Afterward the syncing happens in a background thread.
@ -239,13 +257,14 @@ struct Syncer {
/// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and
/// a `JoinHandle` for the syncer thread. At program shutdown, all `SyncerChannel` clones should be
/// removed and then the handle joined to allow all recordings to be persisted.
pub fn start_syncer(dir: Arc<SampleFileDir>)
pub fn start_syncer(dir: Arc<SampleFileDir>, db: Arc<db::Database>)
-> Result<(SyncerChannel, thread::JoinHandle<()>), Error> {
let to_unlink = dir.db.lock().list_reserved_sample_files()?;
let to_unlink = db.lock().list_reserved_sample_files()?;
let (snd, rcv) = mpsc::channel();
let mut syncer = Syncer {
dir: dir,
to_unlink: to_unlink,
dir,
db,
to_unlink,
to_mark_deleted: Vec::new(),
};
syncer.initial_rotation()?;
@ -261,11 +280,13 @@ pub struct NewLimit {
/// Deletes recordings if necessary to fit within the given new `retain_bytes` limit.
/// Note this doesn't change the limit in the database; it only deletes files.
/// Pass a limit of 0 to delete all recordings associated with a camera.
pub fn lower_retention(dir: Arc<SampleFileDir>, limits: &[NewLimit]) -> Result<(), Error> {
let to_unlink = dir.db.lock().list_reserved_sample_files()?;
pub fn lower_retention(dir: Arc<SampleFileDir>, db: Arc<db::Database>, limits: &[NewLimit])
-> Result<(), Error> {
let to_unlink = db.lock().list_reserved_sample_files()?;
let mut syncer = Syncer {
dir: dir,
to_unlink: to_unlink,
dir,
db,
to_unlink,
to_mark_deleted: Vec::new(),
};
syncer.do_rotation(|db| {
@ -357,7 +378,7 @@ impl Syncer {
fn do_rotation<F>(&mut self, get_rows_to_delete: F) -> Result<(), Error>
where F: FnOnce(&db::LockedDatabase) -> Result<Vec<db::ListOldestSampleFilesRow>, Error> {
let to_delete = {
let mut db = self.dir.db.lock();
let mut db = self.db.lock();
let to_delete = get_rows_to_delete(&*db)?;
let mut tx = db.tx()?;
tx.delete_recordings(&to_delete)?;
@@ -374,7 +395,7 @@ impl Syncer {
}
self.dir.sync()?;
{
let mut db = self.dir.db.lock();
let mut db = self.db.lock();
let mut tx = db.tx()?;
tx.mark_sample_files_deleted(&self.to_mark_deleted)?;
tx.commit()?;
@@ -413,7 +434,7 @@ impl Syncer {
let mut to_delete = Vec::new();
let mut l = self.dir.mutable.lock().unwrap();
let mut db = self.dir.db.lock();
let mut db = self.db.lock();
let mut new_next_uuid = l.next_uuid;
{
let stream =


@@ -776,7 +776,8 @@ impl FileBuilder {
}
/// Builds the `File`, consuming the builder.
pub fn build(mut self, db: Arc<db::Database>, dir: Arc<dir::SampleFileDir>)
pub fn build(mut self, db: Arc<db::Database>,
dirs_by_stream_id: Arc<::fnv::FnvHashMap<i32, Arc<dir::SampleFileDir>>>)
-> Result<File, Error> {
let mut max_end = None;
let mut etag = hash::Hasher::new(hash::MessageDigest::sha1())?;
@@ -876,7 +877,7 @@ impl FileBuilder {
::std::time::Duration::from_secs(max_end as u64);
Ok(File(Arc::new(FileInner {
db,
dir,
dirs_by_stream_id,
segments: self.segments,
slices: self.body.slices,
buf: self.body.buf,
@@ -1418,7 +1419,7 @@ impl BodyState {
struct FileInner {
db: Arc<db::Database>,
dir: Arc<dir::SampleFileDir>,
dirs_by_stream_id: Arc<::fnv::FnvHashMap<i32, Arc<dir::SampleFileDir>>>,
segments: Vec<Segment>,
slices: Slices<Slice>,
buf: Vec<u8>,
@@ -1452,10 +1453,15 @@ impl FileInner {
fn get_video_sample_data(&self, i: usize, r: Range<u64>) -> Result<Chunk, Error> {
let s = &self.segments[i];
let uuid = {
self.db.lock().with_recording_playback(s.s.stream_id, s.s.recording_id,
|p| Ok(p.sample_file_uuid))?
let l = self.db.lock();
l.with_recording_playback(s.s.stream_id, s.s.recording_id,
|p| Ok(p.sample_file_uuid))?
};
let f = self.dir.open_sample_file(uuid)?;
let f = self.dirs_by_stream_id
.get(&s.s.stream_id)
.ok_or_else(|| Error::new(format!("{}/{}: stream not found",
s.s.stream_id, s.s.recording_id)))?
.open_sample_file(uuid)?;
let start = s.s.sample_file_range().start + r.start;
let mmap = Box::new(unsafe {
memmap::MmapOptions::new()
@@ -1525,8 +1531,6 @@ impl http_serve::Entity for File {
#[cfg(test)]
mod tests {
use byteorder::{BigEndian, ByteOrder};
use db;
use dir;
use futures::Future;
use futures::Stream as FuturesStream;
use hyper::header;
@@ -1536,7 +1540,6 @@ mod tests {
use std::fs;
use std::ops::Range;
use std::path::Path;
use std::sync::Arc;
use std::str;
use strutil;
use super::*;
@@ -1764,8 +1767,9 @@ mod tests {
let video_sample_entry_id = db.db.lock().insert_video_sample_entry(
extra_data.width, extra_data.height, extra_data.sample_entry,
extra_data.rfc6381_codec).unwrap();
let mut output = db.dir.create_writer(&db.syncer_channel, None,
TEST_STREAM_ID, video_sample_entry_id).unwrap();
let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap();
let mut output = dir.create_writer(&db.db, &db.syncer_channel, None,
TEST_STREAM_ID, video_sample_entry_id).unwrap();
// end_pts is the pts of the end of the most recent frame (start + duration).
// It's needed because dir::Writer calculates a packet's duration from its pts and the
@@ -1792,13 +1796,13 @@ mod tests {
db.syncer_channel.flush();
}
pub fn create_mp4_from_db(db: Arc<db::Database>, dir: Arc<dir::SampleFileDir>,
pub fn create_mp4_from_db(tdb: &TestDb,
skip_90k: i32, shorten_90k: i32, include_subtitles: bool) -> File {
let mut builder = FileBuilder::new(Type::Normal);
builder.include_timestamp_subtitle_track(include_subtitles);
let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
{
let db = db.lock();
let db = tdb.db.lock();
db.list_recordings_by_time(TEST_STREAM_ID, all_time, |r| {
let d = r.duration_90k;
assert!(skip_90k + shorten_90k < d);
@@ -1806,7 +1810,7 @@ mod tests {
Ok(())
}).unwrap();
}
builder.build(db, dir).unwrap()
builder.build(tdb.db.clone(), tdb.dirs_by_stream_id.clone()).unwrap()
}
fn write_mp4(mp4: &File, dir: &Path) -> String {
@@ -1879,7 +1883,7 @@ mod tests {
duration_so_far += row.duration_90k;
builder.append(&db.db.lock(), row, d_start .. d_end).unwrap();
}
builder.build(db.db.clone(), db.dir.clone()).unwrap()
builder.build(db.db.clone(), db.dirs_by_stream_id.clone()).unwrap()
}
/// Tests sample table for a simple video index of all sync frames.
@@ -2104,7 +2108,7 @@ mod tests {
testutil::init();
let db = TestDb::new();
copy_mp4_to_db(&db);
let mp4 = create_mp4_from_db(db.db.clone(), db.dir.clone(), 0, 0, false);
let mp4 = create_mp4_from_db(&db, 0, 0, false);
let new_filename = write_mp4(&mp4, db.tmpdir.path());
compare_mp4s(&new_filename, 0, 0);
@@ -2124,7 +2128,7 @@ mod tests {
testutil::init();
let db = TestDb::new();
copy_mp4_to_db(&db);
let mp4 = create_mp4_from_db(db.db.clone(), db.dir.clone(), 0, 0, true);
let mp4 = create_mp4_from_db(&db, 0, 0, true);
let new_filename = write_mp4(&mp4, db.tmpdir.path());
compare_mp4s(&new_filename, 0, 0);
@@ -2144,7 +2148,7 @@ mod tests {
testutil::init();
let db = TestDb::new();
copy_mp4_to_db(&db);
let mp4 = create_mp4_from_db(db.db.clone(), db.dir.clone(), 1, 0, false);
let mp4 = create_mp4_from_db(&db, 1, 0, false);
let new_filename = write_mp4(&mp4, db.tmpdir.path());
compare_mp4s(&new_filename, 1, 0);
@@ -2164,7 +2168,7 @@ mod tests {
testutil::init();
let db = TestDb::new();
copy_mp4_to_db(&db);
let mp4 = create_mp4_from_db(db.db.clone(), db.dir.clone(), 0, 1, false);
let mp4 = create_mp4_from_db(&db, 0, 1, false);
let new_filename = write_mp4(&mp4, db.tmpdir.path());
compare_mp4s(&new_filename, 0, 1);
@@ -2212,7 +2216,7 @@ mod bench {
fn new() -> BenchServer {
let db = TestDb::new();
testutil::add_dummy_recordings_to_db(&db.db, 60);
let mp4 = create_mp4_from_db(db.db.clone(), db.dir.clone(), 0, 0, false);
let mp4 = create_mp4_from_db(&db, 0, 0, false);
let p = mp4.0.initial_sample_byte_pos;
let (tx, rx) = ::std::sync::mpsc::channel();
::std::thread::spawn(move || {
@@ -2306,7 +2310,7 @@ mod bench {
let db = TestDb::new();
testutil::add_dummy_recordings_to_db(&db.db, 60);
b.iter(|| {
create_mp4_from_db(db.db.clone(), db.dir.clone(), 0, 0, false);
create_mp4_from_db(&db, 0, 0, false);
});
}
}
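// Illustrative sketch (not part of this change): `FileBuilder::build` now takes a map from
// stream id to that stream's sample file directory instead of a single directory, so the
// same .mp4 serving path works regardless of which directory a stream's recordings live
// on. Assumes `builder` already has its segments appended, as in the tests above.
fn illustrate_build(builder: FileBuilder, db: Arc<db::Database>,
                    dirs_by_stream_id: Arc<::fnv::FnvHashMap<i32, Arc<dir::SampleFileDir>>>)
                    -> Result<File, Error> {
    // A stream missing from the map fails with "stream not found" when its sample data is
    // first read, so every stream that can appear in `builder` needs an entry.
    builder.build(db, dirs_by_stream_id)
}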


@@ -45,6 +45,12 @@ create table version (
notes text
);
create table sample_file_dir (
id integer primary key,
path text unique not null,
uuid blob unique not null check (length(uuid) = 16)
);
create table camera (
id integer primary key,
uuid blob unique not null check (length(uuid) = 16),
@@ -69,6 +75,7 @@ create table camera (
create table stream (
id integer primary key,
camera_id integer not null references camera (id),
sample_file_dir_id integer references sample_file_dir (id),
type text not null check (type in ('main', 'sub')),
-- If record is true, the stream should start recording when moonfire
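// Illustrative sketch (not part of this change): each stream now references the directory
// holding its recordings via stream.sample_file_dir_id (nullable, so a stream can exist
// before a directory is assigned). Resolving a stream's directory path straight from
// SQLite might look like the following; the real code goes through the db module, and the
// rusqlite calls assume the crate version in use at the time.
fn illustrate_dir_path(conn: &rusqlite::Connection, stream_id: i64)
                       -> rusqlite::Result<String> {
    conn.query_row(
        "select d.path
           from stream s join sample_file_dir d on (s.sample_file_dir_id = d.id)
          where s.id = ?",
        &[&stream_id],
        |row| row.get(0))
}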


@@ -47,7 +47,6 @@ pub struct Environment<'a, 'b, C, S> where C: 'a + Clocks, S: 'a + stream::Strea
pub clocks: &'a C,
pub opener: &'a stream::Opener<S>,
pub db: &'b Arc<Database>,
pub dir: &'b Arc<dir::SampleFileDir>,
pub shutdown: &'b Arc<AtomicBool>,
}
@@ -76,7 +75,8 @@ struct WriterState<'a> {
}
impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
pub fn new<'b>(env: &Environment<'a, 'b, C, S>, syncer_channel: dir::SyncerChannel,
pub fn new<'b>(env: &Environment<'a, 'b, C, S>, dir: Arc<dir::SampleFileDir>,
syncer_channel: dir::SyncerChannel,
stream_id: i32, c: &Camera, s: &Stream, rotate_offset_sec: i64,
rotate_interval_sec: i64) -> Self {
Streamer {
@@ -84,7 +84,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
rotate_offset_sec: rotate_offset_sec,
rotate_interval_sec: rotate_interval_sec,
db: env.db.clone(),
dir: env.dir.clone(),
dir,
syncer_channel: syncer_channel,
clocks: env.clocks,
opener: env.opener,
@@ -167,8 +167,8 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
let r = r + if prev.is_none() { self.rotate_interval_sec } else { 0 };
let _t = TimerGuard::new(self.clocks, || "creating writer");
let w = self.dir.create_writer(&self.syncer_channel, prev, self.stream_id,
video_sample_entry_id)?;
let w = self.dir.create_writer(&self.db, &self.syncer_channel, prev,
self.stream_id, video_sample_entry_id)?;
WriterState{
writer: w,
rotate: r,
@@ -351,7 +351,6 @@ mod tests {
clocks: &clocks,
opener: &opener,
db: &db.db,
dir: &db.dir,
shutdown: &opener.shutdown,
};
let mut stream;
@@ -359,8 +358,9 @@ mod tests {
let l = db.db.lock();
let camera = l.cameras_by_id().get(&testutil::TEST_CAMERA_ID).unwrap();
let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap();
stream = super::Streamer::new(&env, db.syncer_channel.clone(), testutil::TEST_STREAM_ID,
camera, s, 0, 3);
let dir = db.dirs_by_stream_id.get(&testutil::TEST_STREAM_ID).unwrap().clone();
stream = super::Streamer::new(&env, dir, db.syncer_channel.clone(),
testutil::TEST_STREAM_ID, camera, s, 0, 3);
}
stream.run();
assert!(opener.streams.lock().unwrap().is_empty());


@@ -32,11 +32,12 @@ extern crate tempdir;
use db;
use dir;
use fnv::FnvHashMap;
use mylog;
use recording::{self, TIME_UNITS_PER_SEC};
use rusqlite;
use std::env;
use std::sync;
use std::sync::{self, Arc};
use std::thread;
use time;
use uuid::Uuid;
@@ -64,8 +65,8 @@ pub fn init() {
}
pub struct TestDb {
pub db: sync::Arc<db::Database>,
pub dir: sync::Arc<dir::SampleFileDir>,
pub db: Arc<db::Database>,
pub dirs_by_stream_id: Arc<FnvHashMap<i32, Arc<dir::SampleFileDir>>>,
pub syncer_channel: dir::SyncerChannel,
pub syncer_join: thread::JoinHandle<()>,
pub tmpdir: tempdir::TempDir,
@@ -80,32 +81,42 @@ impl TestDb {
let conn = rusqlite::Connection::open_in_memory().unwrap();
let schema = include_str!("schema.sql");
conn.execute_batch(schema).unwrap();
let db = sync::Arc::new(db::Database::new(conn).unwrap());
let test_camera_uuid;
let db = Arc::new(db::Database::new(conn).unwrap());
let (test_camera_uuid, sample_file_dir_id);
let path = tmpdir.path().to_str().unwrap().to_owned();
let dir;
{
let mut l = db.lock();
assert_eq!(TEST_CAMERA_ID, l.add_camera(db::CameraChange {
short_name: "test camera".to_owned(),
description: "".to_owned(),
host: "test-camera".to_owned(),
username: "foo".to_owned(),
password: "bar".to_owned(),
rtsp_paths: [
"/main".to_owned(),
"/sub".to_owned(),
],
}).unwrap());
test_camera_uuid = l.cameras_by_id().get(&TEST_CAMERA_ID).unwrap().uuid;
let mut tx = l.tx().unwrap();
tx.update_retention(TEST_STREAM_ID, true, 1048576).unwrap();
tx.commit().unwrap();
{
sample_file_dir_id = l.add_sample_file_dir(path.to_owned()).unwrap();
assert_eq!(TEST_CAMERA_ID, l.add_camera(db::CameraChange {
short_name: "test camera".to_owned(),
description: "".to_owned(),
host: "test-camera".to_owned(),
username: "foo".to_owned(),
password: "bar".to_owned(),
streams: [
db::StreamChange {
sample_file_dir_id: Some(sample_file_dir_id),
rtsp_path: "/main".to_owned(),
record: true,
},
Default::default(),
],
}).unwrap());
test_camera_uuid = l.cameras_by_id().get(&TEST_CAMERA_ID).unwrap().uuid;
let mut tx = l.tx().unwrap();
tx.update_retention(TEST_STREAM_ID, true, 1048576).unwrap();
tx.commit().unwrap();
}
dir = l.sample_file_dirs_by_id().get(&sample_file_dir_id).unwrap().open().unwrap();
}
let path = tmpdir.path().to_str().unwrap().to_owned();
let dir = dir::SampleFileDir::new(&path, db.clone()).unwrap();
let (syncer_channel, syncer_join) = dir::start_syncer(dir.clone()).unwrap();
let mut dirs_by_stream_id = FnvHashMap::default();
dirs_by_stream_id.insert(TEST_STREAM_ID, dir.clone());
let (syncer_channel, syncer_join) = dir::start_syncer(dir, db.clone()).unwrap();
TestDb {
db,
dir,
dirs_by_stream_id: Arc::new(dirs_by_stream_id),
syncer_channel,
syncer_join,
tmpdir,


@@ -35,6 +35,7 @@ use core::str::FromStr;
use db;
use dir::SampleFileDir;
use error::Error;
use fnv::FnvHashMap;
use futures::{future, stream};
use futures_cpupool;
use json;
@@ -183,7 +184,7 @@ struct UiFile {
struct ServiceInner {
db: Arc<db::Database>,
dir: Arc<SampleFileDir>,
dirs_by_stream_id: Arc<FnvHashMap<i32, Arc<SampleFileDir>>>,
ui_files: HashMap<String, UiFile>,
pool: futures_cpupool::CpuPool,
time_zone_name: String,
@@ -286,7 +287,7 @@ impl ServiceInner {
for ent in db.video_sample_entries() {
if ent.sha1 == sha1 {
builder.append_video_sample_entry(ent.clone());
let mp4 = builder.build(self.db.clone(), self.dir.clone())?;
let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())?;
return Ok(http_serve::serve(mp4, req));
}
}
@@ -381,7 +382,7 @@ impl ServiceInner {
}
};
}
let mp4 = builder.build(self.db.clone(), self.dir.clone())?;
let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())?;
Ok(http_serve::serve(mp4, req))
}
@@ -400,16 +401,31 @@ impl ServiceInner {
pub struct Service(Arc<ServiceInner>);
impl Service {
pub fn new(db: Arc<db::Database>, dir: Arc<SampleFileDir>, ui_dir: Option<&str>, zone: String)
-> Result<Self, Error> {
pub fn new(db: Arc<db::Database>, ui_dir: Option<&str>, zone: String) -> Result<Self, Error> {
let mut ui_files = HashMap::new();
if let Some(d) = ui_dir {
Service::fill_ui_files(d, &mut ui_files);
}
debug!("UI files: {:#?}", ui_files);
let dirs_by_stream_id = {
let l = db.lock();
let mut d =
FnvHashMap::with_capacity_and_hasher(l.streams_by_id().len(), Default::default());
for (&id, s) in l.streams_by_id().iter() {
let dir_id = match s.sample_file_dir_id {
Some(d) => d,
None => continue,
};
d.insert(id, l.sample_file_dirs_by_id()
.get(&dir_id)
.unwrap()
.open()?);
}
Arc::new(d)
};
Ok(Service(Arc::new(ServiceInner {
db,
dir,
dirs_by_stream_id,
ui_files,
pool: futures_cpupool::Builder::new().pool_size(1).name_prefix("static").create(),
time_zone_name: zone,
@@ -535,8 +551,7 @@ mod bench {
let (tx, rx) = ::std::sync::mpsc::channel();
::std::thread::spawn(move || {
let addr = "127.0.0.1:0".parse().unwrap();
let (db, dir) = (db.db.clone(), db.dir.clone());
let service = super::Service::new(db.clone(), dir.clone(), None, "".to_owned()).unwrap();
let service = super::Service::new(db.db.clone(), None, "".to_owned()).unwrap();
let server = hyper::server::Http::new()
.bind(&addr, move || Ok(service.clone()))
.unwrap();