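// atomic_store/version_sync.rs
// `Mutex<bool>` is intentional here: the flag is paired with a `Condvar`, whose wait API needs a mutex guard.
#![allow(clippy::mutex_atomic)]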
use crate::storage_location::StorageLocation;
use crate::Result;
use std::{
sync::{Arc, Condvar, Mutex},
time::{Duration, Instant},
};
/// Tracks the last and next storage locations of a versioned resource, together
/// with a mutex-guarded flag and condition variable used to wait until the
/// current version has been resolved (updated or skipped).
#[derive(Debug)]
pub struct VersionSyncHandle {
/// Location exposed by `last_location`; overwritten with
/// `next_version_location` by `update_version`.
last_version_location: Option<StorageLocation>,
/// Location staged by `advance_next`; reset to `last_version_location` by
/// `revert_version`.
next_version_location: Option<StorageLocation>,
/// Flag plus condition variable. The flag is set to `false` by
/// `start_version` and to `true` by `update_version` / `skip_version`;
/// `wait_for_version_with_timeout` blocks until it is `true`.
version_pending: Arc<(Mutex<bool>, Condvar)>,
/// Copy of the key passed to `new`; it is stored but never read in the code
/// visible in this file.
_resource_key: String,
}
impl VersionSyncHandle {
    /// Creates a handle for the resource identified by `key`.
    ///
    /// Both the last and the next version location start out as
    /// `last_version_location`, and the "version ready" flag starts out `false`.
    pub fn new(key: &str, last_version_location: Option<StorageLocation>) -> VersionSyncHandle {
        Self {
            last_version_location,
            next_version_location: last_version_location,
            version_pending: Arc::new((Mutex::new(false), Condvar::new())),
            _resource_key: key.to_string(),
        }
    }

    /// Returns the location recorded by the most recent `update_version`, or
    /// the one passed to `new` if no update has happened yet.
    pub fn last_location(&self) -> &Option<StorageLocation> {
        &self.last_version_location
    }

    /// Marks the beginning of a new version by clearing the "version ready" flag.
    ///
    /// # Errors
    ///
    /// Propagates the error produced when the flag's mutex cannot be locked.
    pub fn start_version(&mut self) -> Result<()> {
        let (flag, _) = &*self.version_pending;
        *flag.lock()? = false;
        Ok(())
    }

    /// Stages `next_version_location` as the location that the next
    /// `update_version` call will promote.
    pub fn advance_next(&mut self, next_version_location: Option<StorageLocation>) {
        self.next_version_location = next_version_location;
    }

    /// Promotes the staged next location to be the last location, sets the
    /// "version ready" flag and wakes one waiter.
    ///
    /// Does nothing if the flag is already set.
    ///
    /// # Errors
    ///
    /// Propagates the error produced when the flag's mutex cannot be locked.
    pub fn update_version(&mut self) -> Result<()> {
        let (flag, ready_signal) = &*self.version_pending;
        let mut ready = flag.lock()?;
        if !*ready {
            self.last_version_location = self.next_version_location;
            *ready = true;
            ready_signal.notify_one();
        }
        Ok(())
    }

    /// Sets the "version ready" flag and wakes one waiter without changing the
    /// last location.
    ///
    /// Does nothing if the flag is already set.
    ///
    /// # Errors
    ///
    /// Propagates the error produced when the flag's mutex cannot be locked.
    pub fn skip_version(&mut self) -> Result<()> {
        let (flag, ready_signal) = &*self.version_pending;
        let mut ready = flag.lock()?;
        if !*ready {
            *ready = true;
            ready_signal.notify_one();
        }
        Ok(())
    }

    /// Discards the staged next location by resetting it to the last location.
    ///
    /// The flag itself is left untouched; its mutex is only held while the
    /// reset happens.
    ///
    /// # Errors
    ///
    /// Propagates the error produced when the flag's mutex cannot be locked.
    pub fn revert_version(&mut self) -> Result<()> {
        let (flag, _) = &*self.version_pending;
        let _held = flag.lock()?;
        self.next_version_location = self.last_version_location;
        Ok(())
    }

    /// Blocks until the "version ready" flag is set or `timeout` has elapsed.
    ///
    /// The timeout clock starts once the flag's mutex has been acquired.
    ///
    /// # Errors
    ///
    /// Returns `PersistenceError::TimedOut` if the flag is still unset once
    /// `timeout` has elapsed, and propagates the error produced when the
    /// mutex cannot be locked or re-acquired after waiting.
    pub fn wait_for_version_with_timeout(&self, timeout: Duration) -> Result<()> {
        let (flag, ready_signal) = &*self.version_pending;
        let mut ready = flag.lock()?;
        let started = Instant::now();
        while !*ready {
            let elapsed = started.elapsed();
            if elapsed >= timeout {
                return Err(crate::error::PersistenceError::TimedOut);
            }
            // The wait's own timed-out status is deliberately ignored: the loop
            // re-checks the flag first, so a notification that races with the
            // timeout still counts as success, and the elapsed check above
            // turns a real timeout into an error.
            let (guard, _wait_result) = ready_signal.wait_timeout(ready, timeout - elapsed)?;
            ready = guard;
        }
        Ok(())
    }
}