There's no reason other than that I haven't needed it and no one's asked for it yet. We could add an option to the macro, #[cached(rwlock)], and have it use https://docs.rs/async-rwlock/1.3.0/async_rwlock/ instead of the async-mutex. I'm not sure exactly how you're using this, but it's good to keep in mind that if your cached function will be called frequently with new values (which cause "writes"), a rwlock may increase the latency of your readers (calls with arguments that are already cached), as opposed to reads and writes being equally "not the fastest" with a regular mutex: https://blog.nelhage.com/post/rwlock-contention/
Ah... you know what... I forgot: the reason a RwLock won't work is because of this signature
/// Attempt to retrieve a cached value
fn cache_get(&mut self, k: &K) -> Option<&V>;
which needs to be able to mutate the cache to update the hit-rate statistics and, in some cases (LRU and Timed), to modify the internal data of the cache on reads.
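For illustration, here's a minimal stand-alone sketch (CountingCache is a made-up type, not the crate's internals) of why even plain hit/miss statistics force `&mut self` on reads, which is exactly what a RwLock read-guard cannot provide:

```rust
use std::collections::HashMap;

// Made-up illustration type (not the crate's internals): keeping
// hit/miss statistics forces `cache_get` to take `&mut self`,
// which a RwLock read-guard cannot provide.
struct CountingCache {
    map: HashMap<String, String>,
    hits: u64,
    misses: u64,
}

impl CountingCache {
    fn new() -> Self {
        CountingCache { map: HashMap::new(), hits: 0, misses: 0 }
    }

    fn cache_set(&mut self, k: String, v: String) -> Option<String> {
        self.map.insert(k, v)
    }

    // Reading mutates the statistics, so `&self` is not enough.
    fn cache_get(&mut self, k: &str) -> Option<&String> {
        match self.map.get(k) {
            Some(v) => {
                self.hits += 1;
                Some(v)
            }
            None => {
                self.misses += 1;
                None
            }
        }
    }
}

fn main() {
    let mut c = CountingCache::new();
    assert!(c.cache_get("a").is_none()); // miss, mutates `misses`
    let _ = c.cache_set("a".to_string(), "1".to_string());
    assert_eq!(c.cache_get("a").cloned(), Some("1".to_string())); // hit, mutates `hits`
    assert_eq!((c.hits, c.misses), (1, 1));
}
```

An LRU or timed store has the same shape: a read has to touch the recency list or timestamps, so it is a write as far as the lock is concerned.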
Since a RwLock (correctly) prevents you from mutating on reads, the only scenario where you could use a RwLock is if the backing cache is a plain HashMap with no extra features. So in order to support using a RwLock, the cached macro would either need to restrict you to only using a HashMap as the cache-type, or introduce a second trait CachedPlain which has only

fn cache_get(&self, k: &K) -> Option<&V>;
fn cache_set(&mut self, k: K, v: V) -> Option<V>;

and call the differing trait methods, cached::Cached::cache_get(&mut cache, &key) vs cached::CachedPlain::cache_get(&cache, &key), depending on the macro arguments (rwlock = true).
Note, this would mean you can't use fancier cache stores like the SizedCache / TimedCache / SizedTimedCache with a #[cached(rwlock = true)] since they require mutating their data on reads.
we are actually going to use it without parameters, see below:
async fn get_new_token() -> Result<VapToken> {
    let req_body = format!(
        r#"
        {{
            "strategy": "local",
            "email": "{email}",
            "password": "{password}"
        }}
        "#,
        email = VAP_AUTH.svc_acc_usr,
        password = VAP_AUTH.svc_acc_pwd,
    );
    let mut headers = HeaderMap::new();
    headers.insert("Content-Type", HeaderValue::from_str("application/json")?);
    trace!("/vapapi/authentication/v1 body {}", req_body);
    let resp = HTTP_CLIENT
        .post(&format!(
            "{}/vapapi/authentication/v1",
            VAP_AUTH.vap_base_url
        ))
        .body(req_body)
        .headers(headers)
        .send()
        .await?;
    let status = resp.status();
    debug!("status: {}", status);
    let resp = resp.text().await?; // extract body as string
    eval_status_code_reqwest!(
        status,
        reqwest::StatusCode::CREATED,
        resp,
        "error when calling get_new_token"
    );
    Ok(serde_json::from_str(&resp)?)
}
/// Returns VAP bearer token as a string.
/// The result is cached/memoized for 60*60*23 seconds, i.e. 23 hours,
/// which leaves a one-hour safety buffer (VAP token validity is 24 hours).
/// Only Ok values are cached, not errors.
#[cached(size = 1, time = 82800, result = true)]
pub async fn get_token() -> Result<String> {
    let new_token = get_new_token().await?;
    Ok(new_token.access_token)
}
get_token (no input params; the relevant values, a fixed URL plus service account credentials, are taken from a config file) will retrieve the token via a REST call and cache it for 23 hours. So while we will be reading very frequently, writes will be very infrequent. Hence a RwLock would make perfect sense for our use case, even if this limits us in available cache store types (we don't need anything fancy here; in fact we are caching a single value).
I quickly went through the RwLock contention article pasted above, but I don't think that is our case: we do not call with different params, and readers will be extremely quick. All we need is to cache a single value for 23 hours.
Ah I see. I don’t think I’ll have time to update the macro this week, so in the meantime you could emulate what the macro would be doing (and have a little more control over stopping a stampede):
disclaimer, I didn’t try compiling this
lazy_static! {
    static ref CACHE: Arc<async_rwlock::RwLock<Vec<(i64, String)>>> =
        Arc::new(RwLock::new(Vec::with_capacity(1)));
}
async fn get_token() -> Result<String> {
    {
        // `now` stands for the current unix timestamp in seconds
        let store = CACHE.read().await;
        if let Some((created_ts, token)) = store.get(0) {
            if (now - created_ts) < 82800 {
                return Ok(token.clone());
            }
        }
    }
    let mut store = CACHE.write().await;
    let already_set = store
        .get(0)
        .map(|(created_ts, _)| (now - created_ts) < 82800)
        .unwrap_or(false);
    if already_set {
        Ok(store[0].1.clone())
    } else {
        let token = get_new_token().await?;
        store.clear();
        store.push((now, token.clone()));
        Ok(token)
    }
}
Thank you James for your quick support! I will implement the workaround for now and look forward to the updated macro.
cheers,
Adam
I did actually compile like this:
lazy_static! {
    static ref CACHE: Arc<async_rwlock::RwLock<Vec<(u128, String)>>> =
        Arc::new(RwLock::new(Vec::with_capacity(1)));
}

fn time_now() -> u128 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_millis()
}
async fn get_token() -> Result<String> {
    {
        let token = CACHE.read().await;
        if let Some((created_ts, token)) = token.get(0) {
            // time_now() is in milliseconds, so compare against 23 hours in ms
            if (time_now() - *created_ts) < 82_800_000 {
                return Ok(String::from(token));
            }
        }
    }
    let mut store = CACHE.write().await;
    let already_set = store
        .get(0)
        .map(|(created_ts, _)| (time_now() - created_ts) < 82_800_000)
        .unwrap_or(false);
    if already_set {
        return Ok(store[0].1.clone());
    } else {
        let token = get_new_token().await?;
        store.clear();
        store.push((time_now(), token.access_token.to_owned()));
        Ok(token.access_token)
    }
}
seems to be working, but I think get_token could actually be reduced to this:
async fn get_token() -> Result<String> {
    {
        let token = CACHE.read().await;
        if let Some((created_ts, token)) = token.get(0) {
            // time_now() is in milliseconds, so compare against 23 hours in ms
            if (time_now() - *created_ts) < 82_800_000 {
                return Ok(String::from(token));
            }
        }
    }
    let mut store = CACHE.write().await;
    let token = get_new_token().await?;
    store.clear();
    store.push((time_now(), token.access_token.to_owned()));
    Ok(token.access_token)
}
Also it seems (I don't have hard data to support this, just a subjective feeling from initial testing) that the tokio RwLock is much faster:
static ref CACHE: Arc<tokio::sync::RwLock<Vec<(u128, String)>>> =
    Arc::new(tokio::sync::RwLock::new(Vec::with_capacity(1)));
Nice. Yeah, the already_set bit isn't required, but it prevents multiple concurrent calls from calling get_new_token when the token is missing or expired. Without it, multiple callers may all see that it's expired, queue up for write access, and then each fetch a new token even if another caller just did so.
I haven't compared async_rwlock vs tokio, but that may be the case!
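The effect of that already_set re-check can be demonstrated with a synchronous stand-in (std::sync::RwLock instead of the async lock; all names here are illustrative): with the re-check under the write-lock, eight concurrent callers trigger exactly one fetch.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
use std::thread;

static FETCHES: AtomicUsize = AtomicUsize::new(0);
static CACHE: RwLock<Option<String>> = RwLock::new(None);

// Stand-in for the expensive token fetch; counts how often it runs.
fn get_new_token() -> String {
    FETCHES.fetch_add(1, Ordering::SeqCst);
    "token".to_string()
}

fn get_token() -> String {
    // Fast path: shared read lock.
    if let Some(t) = CACHE.read().unwrap().as_ref() {
        return t.clone();
    }
    let mut store = CACHE.write().unwrap();
    // The re-check: another caller may have filled the cache while
    // we were waiting for the write lock.
    if let Some(t) = store.as_ref() {
        return t.clone();
    }
    let t = get_new_token();
    *store = Some(t.clone());
    t
}

fn main() {
    let handles: Vec<_> = (0..8).map(|_| thread::spawn(get_token)).collect();
    for h in handles {
        assert_eq!(h.join().unwrap(), "token");
    }
    // Without the re-check this could be anywhere from 1 to 8.
    assert_eq!(FETCHES.load(Ordering::SeqCst), 1);
}
```

Whichever caller wins the write lock fills the cache; every later writer sees the value on the re-check and returns without fetching.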
Maybe consider using dashmap?
It is a high-performance hash map designed for concurrency.
I would say it actually fits caching quite well.
There's also another crate, left_right, that provides better concurrency than RwLock<HashMap>.
It is designed to make reading scale linearly, never blocked by writes, at the cost of slower writes. IMHO that fits the pattern of caching, where a cache hit should be as fast as possible while a cache miss can be slower.
Edit:
It seems left_right suggests using its high-level wrapper evmap instead of left_right directly.
Edit 2:
concread seems to provide data structures that work similarly to evmap, but in addition to a HashMap it also provides a BTreeMap and an Adaptive Replacement Cache; the latter might be more interesting than a HashMap.
hi,
any progress on this issue? Is it possible to implement this RwLock-based cache as discussed?
@adambezecny I'll take another look at this tonight and add a #[once] macro that implements the RwLock (only caching a single value) version we were chatting about a while ago
@adambezecny A #[once] macro is now available in 0.26.0: 0e36dbd
hi James,
so to cache the token for 23 hours I should use:
#[once(size = 1, time = 82800, result = true, sync_writes = true)]
pub async fn get_token() -> Result<String> {
    let new_token = get_new_token().await?;
    Ok(new_token.access_token)
}
This will use a RwLock, i.e. all subsequent invocations within the 23-hour interval will allow parallel reads (without taking the write lock). Is that correct? And do I understand correctly that it is sync_writes = true which causes only write operations to be synchronized/locked? I.e. without sync_writes = true a Mutex is used, and with sync_writes = true a RwLock is used? Correct?
regards,
Adam
once will always use a RwLock, but when sync_writes = true it will try to acquire the write-lock before executing the function that produces the value to cache (and will check whether a valid cached value exists before executing) to prevent concurrent calls from running the same logic. When sync_writes = false (the default), the write-lock is acquired after executing, wrapping only the setting of the cache.
once also implies only one cached value, so there's no size parameter for this one. The arguments you pass to the function are only used when there's no cached value or the cache has expired, and are otherwise ignored.
Concurrent calls will only do a RwLock::read until the cached value has "expired" (then acquiring a write-lock), so reads should be parallel for the majority of calls.
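Roughly, the difference between the two modes can be sketched synchronously like this (an illustrative sketch only, not the actual macro expansion; std::sync::RwLock stands in for the async lock and compute for the function body):

```rust
use std::sync::RwLock;
use std::time::{Duration, Instant};

static CACHE: RwLock<Option<(Instant, String)>> = RwLock::new(None);
const TTL: Duration = Duration::from_secs(82800); // 23 hours

// Stand-in for the real function body (the expensive part).
fn compute() -> String {
    "token".to_string()
}

// sync_writes = true: take the write-lock *before* computing and
// re-check the cache under it, so concurrent misses compute at most once.
fn get_sync() -> String {
    if let Some((ts, v)) = CACHE.read().unwrap().as_ref() {
        if ts.elapsed() < TTL {
            return v.clone();
        }
    }
    let mut store = CACHE.write().unwrap();
    if let Some((ts, v)) = store.as_ref() {
        if ts.elapsed() < TTL {
            return v.clone();
        }
    }
    let v = compute();
    *store = Some((Instant::now(), v.clone()));
    v
}

// sync_writes = false (default): compute *outside* the lock, then take
// the write-lock only to store the result; concurrent misses may each
// run `compute`.
fn get_unsync() -> String {
    if let Some((ts, v)) = CACHE.read().unwrap().as_ref() {
        if ts.elapsed() < TTL {
            return v.clone();
        }
    }
    let v = compute();
    *CACHE.write().unwrap() = Some((Instant::now(), v.clone()));
    v
}

fn main() {
    assert_eq!(get_unsync(), "token"); // miss: fills the cache
    assert_eq!(get_sync(), "token");   // hit: served via the read path
}
```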
ok, got it. So sync_writes = true basically does the same as the above-discussed section of the manual code:

let already_set = store
    .get(0)
    .map(|(created_ts, _)| (time_now() - created_ts) < 82_800_000)
    .unwrap_or(false);
if already_set {
    return Ok(store[0].1.clone());
} else {...

i.e. when there is no cached value and we need to take the write lock (+ retrieve the token and cache it), we make sure we do it only once, while the remaining callers get the already-cached value. So the final code:
#[once(time = 82800, result = true, sync_writes = true)]
pub async fn get_token() -> Result<String> {
    let new_token = get_new_token().await?;
    Ok(new_token.access_token)
}
Yes, correct!
hi James,
with the following dependencies:
[dependencies]
tokio = { version = "1.7.1", default-features = false, features = ["macros", "time", "rt-multi-thread", "signal", "fs", "rt"] }
cached = "0.26.0"
[dev-dependencies]
criterion = { version = "0.3.5", features = ["async_tokio", "html_reports"] }
[[bench]]
name = "benchmarks"
harness = false
I ran these criterion benchmarks:
use criterion::{criterion_group, criterion_main, Criterion};
use cached::proc_macro::once;
use lazy_static::lazy_static;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use va_voice_gateway_rs::errors::Result;
use va_voice_gateway_rs::nlp::vap::{VapToken, VapTokenAuth, VapTokenUser};
lazy_static! {
    static ref CACHE: Arc<tokio::sync::RwLock<Vec<(u128, String)>>> =
        Arc::new(tokio::sync::RwLock::new(Vec::with_capacity(1)));
}
/// dummy function for getting a new token. instead of doing the respective http call
/// it simply returns a hardcoded value.
async fn get_new_token() -> Result<VapToken> {
    Ok(VapToken {
        access_token: "123456".to_owned(),
        authentication: VapTokenAuth {
            strategy: "local".to_owned(),
        },
        user: VapTokenUser {
            user_id: "usr123".to_owned(),
            email: "[email protected]".to_owned(),
            description: "some desc".to_owned(),
            allowed_services: vec!["SvcA".to_owned(), "SvcB".to_owned()],
        },
    })
}
fn time_now() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis()
}
#[once(time = 82800, result = true, sync_writes = true)]
async fn get_token() -> Result<String> {
    let new_token = get_new_token().await?;
    Ok(new_token.access_token)
}
async fn get_token_manual() -> Result<String> {
    {
        let token = CACHE.read().await;
        if let Some((created_ts, token)) = token.get(0) {
            // time_now() is in milliseconds, so compare against 23 hours in ms
            if (time_now() - *created_ts) < 82_800_000 {
                return Ok(String::from(token));
            }
        }
    }
    let mut store = CACHE.write().await;
    // this is needed to prevent multiple writers from getting the lock
    // one by one and retrieving the token multiple times
    let already_set = store
        .get(0)
        .map(|(created_ts, _)| (time_now() - created_ts) < 82_800_000)
        .unwrap_or(false);
    if already_set {
        Ok(store[0].1.clone())
    } else {
        let token = get_new_token().await?;
        store.clear();
        store.push((time_now(), token.access_token.to_owned()));
        Ok(token.access_token)
    }
}
pub fn benchmark_suite(c: &mut Criterion) {
    // create the tokio runtime to be used for the benchmarks
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    c.bench_function("get_token", |b| {
        b.to_async(&rt).iter(|| async {
            get_token().await.unwrap();
        })
    });
    c.bench_function("get_token_manual", |b| {
        b.to_async(&rt).iter(|| async {
            get_token_manual().await.unwrap();
        })
    });
}

criterion_group!(benches, benchmark_suite);
criterion_main!(benches);
Results:
get_token time: [219.76 ns 223.59 ns 227.74 ns]
change: [-1.7765% +0.3829% +2.4452%] (p = 0.73 > 0.05)
No change in performance detected.
Found 5 outliers among 100 measurements (5.00%)
4 (4.00%) high mild
1 (1.00%) high severe
get_token_manual time: [199.78 ns 202.23 ns 204.91 ns]
change: [-2.9261% -0.7982% +1.5430%] (p = 0.48 > 0.05)
No change in performance detected.
Found 9 outliers among 100 measurements (9.00%)
4 (4.00%) high mild
5 (5.00%) high severe
So performance is more or less the same (slightly worse) as with the manual implementation.
I think the difference might be because the macros expand to use async_std's async_mutex and async_rwlock, whereas your manual version is using tokio::sync::RwLock
Sorry to break the flow.
I need a RwLock-ed hashmap, and I just found your crate.
The docs kind of imply this is already possible:
[cached] macros are thread-safe with the backing function-cache wrapped in a mutex/rwlock
Alas, it's not. So I'm just registering interest. I'll try to use a dashmap for now.