
Comments (20)

jaemk commented on July 30, 2024

There's no reason other than that I haven't needed it and no one's asked for it yet. We could add an option to the macro, #[cached(rwlock)], and have it use https://docs.rs/async-rwlock/1.3.0/async_rwlock/ instead of the async-mutex. I'm not sure exactly how you're using this, but it's good to keep in mind that if your cached function will be called frequently with new values (which cause "writes"), then a rwlock may increase the latency of your readers (calls with arguments that are cached), as opposed to reads and writes being equally "not the fastest" with a regular mutex: https://blog.nelhage.com/post/rwlock-contention/

from cached.

jaemk commented on July 30, 2024

Ah... you know what... I forgot that the reason a RwLock won't work is the signature

    /// Attempt to retrieve a cached value
    fn cache_get(&mut self, k: &K) -> Option<&V>;

which needs to be able to mutate the cache to update the hit rate statistics, and in some cases (LRU and Timed) modify the internal data of the cache on reads.


jaemk commented on July 30, 2024

Since a RwLock (correctly) prevents you from mutating on reads, the only scenario where you could use a RwLock is if the backing cache is a plain HashMap with no extra features. So in order to support using a RwLock, the cached macro would either need to restrict you to using only a HashMap as the cache type, or introduce a second trait, CachedPlain, which has only

    fn cache_get(&self, k: &K) -> Option<&V>;

    fn cache_set(&mut self, k: K, v: V) -> Option<V>;

and call the differing trait methods, cached::Cached::cache_get(&mut cache, &key) vs cached::CachedPlain::cache_get(&cache, &key), depending on the macro arguments (rwlock = true).

Note, this would mean you can't use fancier cache stores like the SizedCache / TimedCache / SizedTimedCache with a #[cached(rwlock = true)] since they require mutating their data on reads.


adambezecny commented on July 30, 2024

We are actually going to use it without parameters, see below:

async fn get_new_token() -> Result<VapToken> {
    let req_body = format!(
        r#"
                {{
                    "strategy": "local",
                    "email": "{email}",
                    "password": "{password}"
                }}
                "#,
        email = VAP_AUTH.svc_acc_usr,
        password = VAP_AUTH.svc_acc_pwd,
    );

    let mut headers = HeaderMap::new();
    headers.insert("Content-Type", HeaderValue::from_str("application/json")?);

    trace!("/vapapi/authentication/v1 body {}", req_body);

    let resp = HTTP_CLIENT
        .post(&format!(
            "{}/vapapi/authentication/v1",
            VAP_AUTH.vap_base_url
        ))
        .body(req_body)
        .headers(headers)
        .send()
        .await?;

    let status = resp.status();
    debug!("status: {}", status);

    let resp = resp.text().await?; // extract body as string

    eval_status_code_reqwest!(
        status,
        reqwest::StatusCode::CREATED,
        resp,
        "error when calling  get_new_token"
    );

    Ok(serde_json::from_str(&resp)?)
}

/// Returns VAP bearer token as a string
/// Result is cached/memoized for 60*60*23 seconds, i.e. 23 hours
/// which leaves one hour safety buffer (VAP token validity is 24 hours)
/// cache only Ok values, not errors
#[cached(size = 1, time = 82800, result = true)]
pub async fn get_token() -> Result<String> {
    let new_token = get_new_token().await?;
    Ok(new_token.access_token)
}

get_token (no input params; the relevant values, a fixed URL plus service account credentials, are taken from a config file) will retrieve the token via a REST call and cache it for 23 hours. So while we will be reading very frequently, writes will be very infrequent. Hence a RwLock would make perfect sense for our use case, even if it limits the available cache store types (we don't need anything fancy here; in fact we are caching a single value).

I quickly went through the RwLock contention article pasted above, but I don't think that's our case; we do not call with different params, and readers will be extremely quick. All we need is to cache a single value for 23 hours.


jaemk commented on July 30, 2024

Ah I see. I don’t think I’ll have time to update the macro this week, so in the meantime you could emulate what the macro would be doing (and have a little more control over stopping a stampede):

disclaimer, I didn’t try compiling this

lazy_static! {
    static ref CACHE: Arc<async_rwlock::RwLock<Vec<(i64, String)>>> = Arc::new(RwLock::new(Vec::with_capacity(1)));
}

async fn get_token() -> Result<String> {
    // `now` stands in for the current unix timestamp in seconds
    {
        let store = CACHE.read().await;
        if let Some((created_ts, token)) = store.get(0) {
            if (now - created_ts) < 82800 { return Ok(token.clone()) }
        }
    }
    let mut store = CACHE.write().await;
    let already_set = store.get(0).map(|(created_ts, _)| (now - created_ts) < 82800).unwrap_or(false);
    if already_set {
        Ok(store[0].1.clone())
    } else {
        let token = get_new_token().await?;
        store.clear();
        store.push((now, token.clone()));
        Ok(token)
    }
}


adambezecny commented on July 30, 2024

Thank you James for your quick support! I will implement the workaround for now and look forward to the updated macro.

cheers,

Adam


adambezecny commented on July 30, 2024

I actually did get it to compile, like this:

lazy_static! {
    static ref CACHE: Arc<async_rwlock::RwLock<Vec<(u128, String)>>> =
        Arc::new(RwLock::new(Vec::with_capacity(1)));
}

fn time_now() -> u128 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs() as u128 // seconds, so the `< 82800` comparisons below really mean 23 hours
}

async fn get_token() -> Result<String> {
    {
        let token = CACHE.read().await;
        if let Some((created_ts, token)) = token.get(0) {
            if (time_now() - *created_ts) < 82800 {
                return Ok(String::from(token));
            }
        }
    }
    let mut store = CACHE.write().await;
    let already_set = store
        .get(0)
        .map(|(created_ts, _)| (time_now() - created_ts) < 82800)
        .unwrap_or(false);
    if already_set {
        return Ok(store[0].1.clone());
    } else {
        let token = get_new_token().await?;
        store.clear();
        store.push((time_now(), token.access_token.to_owned()));
        Ok(token.access_token)
    }
}

It seems to be working, but I think get_token could actually be reduced to this:

async fn get_token() -> Result<String> {
    {
        let token = CACHE.read().await;
        if let Some((created_ts, token)) = token.get(0) {
            if (time_now() - *created_ts) < 82800 {
                return Ok(String::from(token));
            }
        }
    }
    let mut store = CACHE.write().await;
    let token = get_new_token().await?;
    store.clear();
    store.push((time_now(), token.access_token.to_owned()));
    Ok(token.access_token)
}

Also, it seems (I don't have hard data to support this, just a subjective feeling from initial testing) that tokio's RwLock is much faster:

    static ref CACHE: Arc<tokio::sync::RwLock<Vec<(u128, String)>>> =
        Arc::new(tokio::sync::RwLock::new(Vec::with_capacity(1)));


jaemk commented on July 30, 2024

Nice. Yeah, the already_set bit isn't required, but it prevents multiple concurrent calls from invoking get_new_token when the token is missing or expired. Without it, multiple callers may see that it's expired, all queue up for write access, and then each fetch a new token even if another caller just did so.

I haven't compared async_rwlock vs tokio, but that may be the case!


NobodyXu commented on July 30, 2024

Maybe consider using dashmap?
It is a high-performance hash map designed for concurrency.

I would say it actually fits quite well with caching.


NobodyXu commented on July 30, 2024

There's also another crate, left_right, that provides better concurrency than RwLock<HashMap>.

It is designed to make reads scale linearly and not be blocked by writes, at the cost of slower writes, which IMHO fits the pattern of caching, where a cache hit should be as fast as possible while a cache miss can be slower.

Edit:

It seems left_right suggests using its high-level wrapper evmap instead of left_right directly.

Edit2:

concread seems to provide data structures that work similarly to evmap, but in addition to a HashMap it also provides a BTreeMap and an Adaptive Replacement Cache; the latter might be more interesting than the HashMap.


adambezecny commented on July 30, 2024

hi,

any progress on this issue? Is it possible to implement the RwLock-based cache as discussed?


jaemk commented on July 30, 2024

@adambezecny I'll take another look at this tonight and add a #[once] macro that implements the RwLock (only caching a single value) version we were chatting about a while ago


jaemk commented on July 30, 2024

@adambezecny A #[once] macro is now available in 0.26.0 0e36dbd


adambezecny commented on July 30, 2024

hi James,

so to cache the token for 23 hours I should use:

#[once(size = 1, time = 82800, result = true, sync_writes = true)]
pub async fn get_token() -> Result<String> {
    let new_token = get_new_token().await?;
    Ok(new_token.access_token)
}

This will use a RwLock, i.e. all subsequent invocations within the 23-hour interval will allow parallel reads (without taking a write lock). Is that correct? Do I understand correctly that it is sync_writes = true which causes only write operations to be synchronized/locked? I.e. without sync_writes = true a Mutex is used, and with sync_writes = true a RwLock is used? Correct?

regards,

Adam


jaemk commented on July 30, 2024

@adambezecny

once will always use a RwLock, but when sync_writes = true it will try to acquire the write-lock before executing the function that returns the value to cache (and will check if a valid cached value exists before executing) to prevent concurrent calls from executing the same logic. When sync_writes = false (default), the write-lock is acquired after executing, only wrapping the setting of the cache.

once also implies only one cached value, so there's no size parameter for this one. The arguments you pass to the function are only used when there's no cached value or the cache has expired, and are otherwise ignored.

Concurrent calls will only do a RwLock::read until the cached value has "expired" (then acquiring a write-lock), so reads should be parallel for the majority of calls.


adambezecny commented on July 30, 2024

OK, got it. So sync_writes = true basically does the same as this section of the manual code discussed above:

    let already_set = store
        .get(0)
        .map(|(created_ts, _)| (time_now() - created_ts) < 82800)
        .unwrap_or(false);
    if already_set {
        return Ok(store[0].1.clone());
    } else { ...

i.e. when there is no cached value and we need to take the write lock (plus retrieve the token and cache it), we make sure we do it only once, while the remaining callers get the already-cached value. So the final code:

#[once(time = 82800, result = true, sync_writes = true)]
pub async fn get_token() -> Result<String> {
    let new_token = get_new_token().await?;
    Ok(new_token.access_token)
}


jaemk commented on July 30, 2024

Yes, correct!


adambezecny commented on July 30, 2024

hi James,

with the following dependencies:

[dependencies]
tokio = { version = "1.7.1", default-features = false, features = ["macros", "time", "rt-multi-thread", "signal", "fs", "rt"] }
cached = "0.26.0"

[dev-dependencies]
criterion = { version = "0.3.5", features = ["async_tokio", "html_reports"] }

[[bench]]
name = "benchmarks"
harness = false

I ran these criterion benchmarks:

use criterion::{criterion_group, criterion_main, Criterion};
use cached::proc_macro::once;
use lazy_static::lazy_static;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use va_voice_gateway_rs::errors::Result;
use va_voice_gateway_rs::nlp::vap::{VapToken, VapTokenAuth, VapTokenUser};

lazy_static! {
    static ref CACHE: Arc<tokio::sync::RwLock<Vec<(u128, String)>>> =
        Arc::new(tokio::sync::RwLock::new(Vec::with_capacity(1)));
}

/// dummy function for getting new token. instead of doing respective http call
/// it simply returns hardcoded value.
async fn get_new_token() -> Result<VapToken> {
    Ok(VapToken {
        access_token: "123456".to_owned(),
        authentication: VapTokenAuth {
            strategy: "local".to_owned(),
        },
        user: VapTokenUser {
            user_id: "usr123".to_owned(),
            email: "[email protected]".to_owned(),
            description: "some desc".to_owned(),
            allowed_services: vec!["SvcA".to_owned(), "SvcB".to_owned()],
        },
    })
}

fn time_now() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs() as u128 // seconds, so the `< 82800` comparisons below really mean 23 hours
}

#[once(time = 82800, result = true, sync_writes = true)]
async fn get_token() -> Result<String> {
    let new_token = get_new_token().await?;
    Ok(new_token.access_token)
}

async fn get_token_manual() -> Result<String> {
    {
        let token = CACHE.read().await;
        if let Some((created_ts, token)) = token.get(0) {
            if (time_now() - *created_ts) < 82800 {
                return Ok(String::from(token));
            }
        }
    }
    let mut store = CACHE.write().await;
    // this is needed to prevent multiple writers from getting the lock
    // one by one and retrieving token multiple times
    let already_set = store
        .get(0)
        .map(|(created_ts, _)| (time_now() - created_ts) < 82800)
        .unwrap_or(false);
    if already_set {
        Ok(store[0].1.clone())
    } else {
        let token = get_new_token().await?;
        store.clear();
        store.push((time_now(), token.access_token.to_owned()));
        Ok(token.access_token)
    }
}

pub fn benchmark_suite(c: &mut Criterion) {
    // create the tokio runtime to be used for the benchmarks
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    c.bench_function("get_token", |b| {
        b.to_async(&rt).iter(|| async {
            get_token().await.unwrap();
        })
    });

    c.bench_function("get_token_manual", |b| {
        b.to_async(&rt).iter(|| async {
            get_token_manual().await.unwrap();
        })
    });
}

criterion_group!(benches, benchmark_suite);
criterion_main!(benches);

Results:

get_token               time:   [219.76 ns 223.59 ns 227.74 ns]
                        change: [-1.7765% +0.3829% +2.4452%] (p = 0.73 > 0.05)
                        No change in performance detected.
Found 5 outliers among 100 measurements (5.00%)
  4 (4.00%) high mild
  1 (1.00%) high severe

get_token_manual        time:   [199.78 ns 202.23 ns 204.91 ns]
                        change: [-2.9261% -0.7982% +1.5430%] (p = 0.48 > 0.05)
                        No change in performance detected.
Found 9 outliers among 100 measurements (9.00%)
  4 (4.00%) high mild
  5 (5.00%) high severe

So performance is more or less the same as (slightly worse than) with the manual implementation.


jaemk commented on July 30, 2024

I think the difference might be because the macros expand to use async_std's async_mutex and async_rwlock, whereas the manual version you have is using tokio::sync::RwLock.


vext01 commented on July 30, 2024

Sorry to break the flow.

I need an RwLocked hashmap, and I just found your crate.

The docs kind of imply this is already possible:

[cached] macros are thread-safe with the backing function-cache wrapped in a mutex/rwlock

Alas it's not. So just registering interest. Will try to use a dashmap for now.

