feat(media): retention policies
This commit is contained in:
@@ -1,16 +1,18 @@
|
||||
use std::{collections::BTreeMap, ops::Range, slice::Split};
|
||||
|
||||
use bytesize::ByteSize;
|
||||
use ruma::{api::client::error::ErrorKind, OwnedServerName, ServerName, UserId};
|
||||
use sha2::{digest::Output, Sha256};
|
||||
use tracing::error;
|
||||
|
||||
use crate::{
|
||||
config::{MediaRetentionConfig, MediaRetentionScope},
|
||||
database::KeyValueDatabase,
|
||||
service::{
|
||||
self,
|
||||
media::{BlockedMediaInfo, DbFileMeta},
|
||||
media::{BlockedMediaInfo, Data as _, DbFileMeta, MediaType},
|
||||
},
|
||||
utils, Error, Result,
|
||||
services, utils, Error, Result,
|
||||
};
|
||||
|
||||
impl service::media::Data for KeyValueDatabase {
|
||||
@@ -773,6 +775,140 @@ impl service::media::Data for KeyValueDatabase {
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
fn files_to_delete(
|
||||
&self,
|
||||
sha256_digest: &[u8],
|
||||
retention: &MediaRetentionConfig,
|
||||
media_type: MediaType,
|
||||
new_size: u64,
|
||||
) -> Result<Vec<Result<String>>> {
|
||||
// If the file already exists, no space needs to be cleared
|
||||
if self.filehash_metadata.get(sha256_digest)?.is_some() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let scoped_space = |scope| retention.scoped.get(&scope).and_then(|policy| policy.space);
|
||||
|
||||
let mut files_to_delete = Vec::new();
|
||||
|
||||
if media_type.is_thumb() {
|
||||
if let Some(mut f) = self.purge_if_necessary(
|
||||
scoped_space(MediaRetentionScope::Thumbnail),
|
||||
|k| self.file_is_thumb(k),
|
||||
&new_size,
|
||||
) {
|
||||
files_to_delete.append(&mut f);
|
||||
}
|
||||
}
|
||||
|
||||
match media_type {
|
||||
MediaType::LocalMedia { thumbnail: _ } => {
|
||||
if let Some(mut f) = self.purge_if_necessary(
|
||||
scoped_space(MediaRetentionScope::Local),
|
||||
|k| self.file_is_local(k).unwrap_or(true),
|
||||
&new_size,
|
||||
) {
|
||||
files_to_delete.append(&mut f);
|
||||
}
|
||||
}
|
||||
MediaType::RemoteMedia { thumbnail: _ } => {
|
||||
if let Some(mut f) = self.purge_if_necessary(
|
||||
scoped_space(MediaRetentionScope::Remote),
|
||||
|k| !self.file_is_local(k).unwrap_or(true),
|
||||
&new_size,
|
||||
) {
|
||||
files_to_delete.append(&mut f);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(mut f) = self.purge_if_necessary(retention.global_space, |_| true, &new_size) {
|
||||
files_to_delete.append(&mut f);
|
||||
}
|
||||
|
||||
Ok(files_to_delete)
|
||||
}
|
||||
|
||||
fn cleanup_time_retention(&self, retention: &MediaRetentionConfig) -> Vec<Result<String>> {
|
||||
let now = utils::secs_since_unix_epoch();
|
||||
|
||||
let should_be_deleted = |k: &[u8], metadata: &FilehashMetadata| {
|
||||
let check_policy = |retention_scope| {
|
||||
if let Some(scoped_retention) = retention.scoped.get(&retention_scope) {
|
||||
if let Some(created_policy) = scoped_retention.created {
|
||||
if now - metadata.creation(k)? > created_policy.as_secs() {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(accessed_policy) = scoped_retention.accessed {
|
||||
if now - metadata.last_access(k)? > accessed_policy.as_secs() {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
};
|
||||
|
||||
if self.file_is_thumb(k) && check_policy(MediaRetentionScope::Thumbnail)? {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
if self.file_is_local(k)? {
|
||||
check_policy(MediaRetentionScope::Local)
|
||||
} else {
|
||||
check_policy(MediaRetentionScope::Remote)
|
||||
}
|
||||
};
|
||||
|
||||
let mut files_to_delete = Vec::new();
|
||||
let mut errors_and_hashes = Vec::new();
|
||||
|
||||
for (k, v) in self.filehash_metadata.iter() {
|
||||
match should_be_deleted(&k, &FilehashMetadata::from_vec(v)) {
|
||||
Ok(true) => files_to_delete.push(k),
|
||||
Ok(false) => (),
|
||||
Err(e) => errors_and_hashes.push(Err(e)),
|
||||
}
|
||||
}
|
||||
|
||||
errors_and_hashes.append(&mut self.purge(files_to_delete));
|
||||
|
||||
errors_and_hashes
|
||||
}
|
||||
|
||||
fn update_last_accessed(&self, server_name: &ServerName, media_id: &str) -> Result<()> {
|
||||
let mut key = server_name.as_bytes().to_vec();
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(media_id.as_bytes());
|
||||
|
||||
if let Some(mut meta) = self.servernamemediaid_metadata.get(&key)? {
|
||||
meta.truncate(32);
|
||||
let sha256_digest = meta;
|
||||
|
||||
self.update_last_accessed_filehash(&sha256_digest)
|
||||
} else {
|
||||
// File was probably deleted just as we were fetching it, so nothing to do
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn update_last_accessed_filehash(&self, sha256_digest: &[u8]) -> Result<()> {
|
||||
if let Some(mut metadata) = self
|
||||
.filehash_metadata
|
||||
.get(sha256_digest)?
|
||||
.map(FilehashMetadata::from_vec)
|
||||
{
|
||||
metadata.update_last_access();
|
||||
|
||||
self.filehash_metadata
|
||||
.insert(sha256_digest, metadata.value())
|
||||
} else {
|
||||
// File was probably deleted just as we were fetching it, so nothing to do
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl KeyValueDatabase {
|
||||
@@ -930,6 +1066,119 @@ impl KeyValueDatabase {
|
||||
|
||||
self.filehash_metadata.remove(&sha256_digest)
|
||||
}
|
||||
|
||||
fn file_is_local(&self, k: &[u8]) -> Result<bool> {
|
||||
for (k, _) in self.filehash_servername_mediaid.scan_prefix(k.to_vec()) {
|
||||
let mut parts = k
|
||||
.get(32..)
|
||||
.map(|k| k.split(|&b| b == 0xff))
|
||||
.ok_or_else(|| {
|
||||
Error::bad_database("Invalid format of key in filehash_servername_mediaid")
|
||||
})?;
|
||||
|
||||
let Some(server_name) = parts.next() else {
|
||||
return Err(Error::bad_database(
|
||||
"Invalid format of key in filehash_servername_mediaid",
|
||||
));
|
||||
};
|
||||
|
||||
if utils::string_from_bytes(server_name).map_err(|_| {
|
||||
Error::bad_database("Invalid UTF-8 servername in filehash_servername_mediaid")
|
||||
})? == services().globals.server_name().as_str()
|
||||
{
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
fn file_is_thumb(&self, k: &[u8]) -> bool {
|
||||
self.filehash_thumbnailid
|
||||
.scan_prefix(k.to_vec())
|
||||
.next()
|
||||
.is_some()
|
||||
&& self
|
||||
.filehash_servername_mediaid
|
||||
.scan_prefix(k.to_vec())
|
||||
.next()
|
||||
.is_none()
|
||||
}
|
||||
|
||||
fn purge_if_necessary(
|
||||
&self,
|
||||
space: Option<ByteSize>,
|
||||
filter: impl Fn(&[u8]) -> bool,
|
||||
new_size: &u64,
|
||||
) -> Option<Vec<Result<String>>> {
|
||||
if let Some(space) = space {
|
||||
let mut candidate_files_to_delete = Vec::new();
|
||||
let mut errors_and_hashes = Vec::new();
|
||||
let mut total_size = 0;
|
||||
|
||||
let parse_value = |k: Vec<u8>, v: &FilehashMetadata| {
|
||||
let last_access = v.last_access(&k)?;
|
||||
let size = v.size(&k)?;
|
||||
Ok((k, last_access, size))
|
||||
};
|
||||
|
||||
for (k, v) in self.filehash_metadata.iter().filter(|(k, _)| filter(k)) {
|
||||
match parse_value(k, &FilehashMetadata::from_vec(v)) {
|
||||
Ok(x) => {
|
||||
total_size += x.2;
|
||||
candidate_files_to_delete.push(x)
|
||||
}
|
||||
Err(e) => errors_and_hashes.push(Err(e)),
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(required_to_delete) = (total_size + *new_size).checked_sub(space.as_u64()) {
|
||||
candidate_files_to_delete.sort_by_key(|(_, last_access, _)| *last_access);
|
||||
candidate_files_to_delete.reverse();
|
||||
|
||||
let mut size_sum = 0;
|
||||
let mut take = candidate_files_to_delete.len();
|
||||
|
||||
for (i, (_, _, file_size)) in candidate_files_to_delete.iter().enumerate() {
|
||||
size_sum += file_size;
|
||||
if size_sum >= required_to_delete {
|
||||
take = i + 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
errors_and_hashes.append(
|
||||
&mut self.purge(
|
||||
candidate_files_to_delete
|
||||
.into_iter()
|
||||
.take(take)
|
||||
.map(|(hash, _, _)| hash)
|
||||
.collect(),
|
||||
),
|
||||
);
|
||||
|
||||
Some(errors_and_hashes)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn purge(&self, hashes: Vec<Vec<u8>>) -> Vec<Result<String>> {
|
||||
hashes
|
||||
.into_iter()
|
||||
.map(|sha256_digest| {
|
||||
let sha256_hex = hex::encode(&sha256_digest);
|
||||
let is_blocked = self.is_blocked_filehash(&sha256_digest)?;
|
||||
|
||||
self.purge_filehash(sha256_digest, is_blocked)?;
|
||||
|
||||
Ok(sha256_hex)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_metadata(value: &[u8]) -> Result<DbFileMeta> {
|
||||
@@ -994,6 +1243,12 @@ impl FilehashMetadata {
|
||||
Self { value: vec }
|
||||
}
|
||||
|
||||
pub fn update_last_access(&mut self) {
|
||||
let now = utils::secs_since_unix_epoch().to_be_bytes();
|
||||
self.value.truncate(16);
|
||||
self.value.extend_from_slice(&now);
|
||||
}
|
||||
|
||||
pub fn value(&self) -> &[u8] {
|
||||
&self.value
|
||||
}
|
||||
@@ -1025,6 +1280,15 @@ impl FilehashMetadata {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn size(&self, sha256_digest: &[u8]) -> Result<u64> {
|
||||
self.get_u64_val(
|
||||
0..8,
|
||||
"file size",
|
||||
sha256_digest,
|
||||
"Invalid file size in filehash_metadata",
|
||||
)
|
||||
}
|
||||
|
||||
pub fn creation(&self, sha256_digest: &[u8]) -> Result<u64> {
|
||||
self.get_u64_val(
|
||||
8..16,
|
||||
@@ -1033,4 +1297,13 @@ impl FilehashMetadata {
|
||||
"Invalid creation time in filehash_metadata",
|
||||
)
|
||||
}
|
||||
|
||||
pub fn last_access(&self, sha256_digest: &[u8]) -> Result<u64> {
|
||||
self.get_u64_val(
|
||||
16..24,
|
||||
"last access time",
|
||||
sha256_digest,
|
||||
"Invalid last access time in filehash_metadata",
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1103,6 +1103,8 @@ impl KeyValueDatabase {
|
||||
|
||||
services().sending.start_handler();
|
||||
|
||||
services().media.start_time_retention_checker();
|
||||
|
||||
Self::start_cleanup_task().await;
|
||||
if services().globals.allow_check_for_updates() {
|
||||
Self::start_check_for_updates_task();
|
||||
|
||||
Reference in New Issue
Block a user