eclipse-zenoh / zenoh-backend-influxdb
Backend and Storages for zenoh using InfluxDB
License: Other
When I put data in as AppOctetStream encoding, it comes out as TextPlain.
Start influxdb2 with a bucket of "test" and start zenohd with the influxdbv2 backend. Zenoh config:
{
  plugins: {
    storage_manager: {
      backend_search_dirs: [
        ".../zenoh-backend-influxdb/target/release",
      ],
      volumes: {
        influxdb2: {
          url: "http://localhost:8086",
          private: {
            org_id: "...",
            token: "...",
          },
        },
      },
      storages: {
        "test": {
          key_expr: "stored/**",
          volume: {
            id: "influxdb2",
            db: "test",
            private: {
              org_id: "...",
              token: "...",
            },
          },
        },
      },
    },
  },
}
and then run this:
use async_std::stream::StreamExt;
use std::{sync::Arc, time::Duration};
use zenoh::prelude::r#async::*;
#[async_std::main]
async fn main() {
    let ke = "stored/test";
    let session = Arc::new(zenoh::open(config::default()).res().await.unwrap());
    let subscriber = session.declare_subscriber(ke).res().await.unwrap();
    let publisher = session.declare_publisher(ke).res().await.unwrap();
    async_std::task::spawn(async move {
        loop {
            publisher.put(vec![1, 2, 3]).res().await.unwrap();
            async_std::task::sleep(Duration::from_secs(1)).await;
        }
    });
    let sub_value = subscriber.recv_async().await.unwrap();
    let mut query_value = None;
    while query_value.is_none() {
        query_value = session
            .get(ke)
            .res()
            .await
            .unwrap()
            .stream()
            .map(|r| r.sample.unwrap())
            .next()
            .await;
    }
    assert_eq!(sub_value.encoding, query_value.unwrap().encoding);
}
Result:
thread 'main' panicked at 'assertion failed: `(left == right)`
left: `Exact(AppOctetStream)`,
right: `Exact(TextPlain)`', crates\influx_test\src\main.rs:35:5
On a query with a time filter, an InfluxDB storage returns duplicates for each reply.
Run zenohd with this conf:
plugins: {
  storage_manager: {
    volumes: {
      influxdb: {
        url: "http://localhost:8086",
      }
    },
    storages: {
      demo: {
        key_expr: "demo/example/**",
        volume: {
          id: "influxdb",
          db: "zenoh_example",
          create_db: true,
          on_closure: "drop_db",
        }
      }
    }
  }
}
Run the z_pub example for 3 seconds (i.e. 3 publications).
Run the z_get example as such: z_get -s "demo/**?_time=[..]"
Expected result: 3 replies with number 0, 1 and 2
Actual result: 9 replies, 3 for each number
Zenoh v0.7.2-rc
Make it build with Rust stable now that eclipse-zenoh/zenoh supports it.
Use case:
docker run --rm -p 8086:8086 -e INFLUXDB_HTTP_AUTH_ENABLED=true -e INFLUXDB_ADMIN_USER=admin -e INFLUXDB_ADMIN_PASSWORD=XXXXXXXX -e INFLUXDB_USER=nopriv_user -e INFLUXDB_USER_PASSWORD=YYYYYYYY influxdb:1.8
Run zenohd and create a backend using wrong credentials, e.g.:
curl -X PUT -H 'content-type:application/properties' -d "url=http://localhost:8086;username=admin;password=ZZZZZZZZ" http://localhost:8000/@/router/local/plugin/storages/backend/influxdb
Current result: the backend is created without any error.
Expected result: a warning log indicating that the backend has not been created because of authentication failure.
In the README.md, it says:
To know the Rust version you're zenohd has been built with, use the --version option. Example:
$ zenohd --version
The zenoh router v0.6.0-beta.1 built with rustc 1.64.0 (a55dd71d5 2022-09-19)
Here, zenohd has been built with the rustc version 1.64.0. Install and use this toolchain with the following command:
$ rustup default 1.64.0
And zenohd version corresponds to an un-released commit with id 1f20c86. Update the zenoh dependency in Cargo.lock with this command:
$ cargo update -p zenoh --precise 1f20c86
It's not clear to me how I'm supposed to figure out that it corresponds to 1f20c86 based on the above printout. Perhaps this should be clarified?
See eclipse-zenoh/ci#10.
It would be quite useful to support token-based authentication. This seems to be supported by the Rust API (see https://docs.rs/influxdb/latest/influxdb/struct.Client.html#method.with_token) and would work more nicely with the InfluxDB Docker images, which by default have user/password authentication disabled.
In general, the token used by the InfluxDB API is available in the INFLUXDB_TOKEN environment variable.
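As a rough illustration of the request above, the token could be read from the environment and turned into an HTTP header value. This is only a sketch using the standard library: `token_from_env` and `token_auth_header` are hypothetical helper names, not part of the plugin or of the influxdb crate, and the `Token <value>` scheme is the one accepted by the InfluxDB 2.x HTTP API.

```rust
use std::env;

/// Hypothetical helper: read the API token from the INFLUXDB_TOKEN
/// environment variable, treating an unset or blank value as "no token".
fn token_from_env() -> Option<String> {
    env::var("INFLUXDB_TOKEN")
        .ok()
        .map(|t| t.trim().to_string())
        .filter(|t| !t.is_empty())
}

/// Hypothetical helper: build the value of the `Authorization` header
/// for token-based authentication ("Token <value>").
fn token_auth_header(token: &str) -> String {
    format!("Token {}", token.trim())
}
```

A backend could call `token_from_env()` at startup and fall back to username/password only when it returns `None`; with the influxdb crate, the token itself would be passed to the `with_token` method linked above.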
It seems that influxdb library 0.6.0 introduces a regression on the plugin (see #32).
Bump influxdb crate to 0.6.0
Any platform
Creating a storage with:
curl -X PUT -H 'content-type:application/properties' -d "path_expr=/demo/example/**;db=zenoh-example;create_db" http://localhost:8000/@/router/local/plugin/storages/backend/influxdb/storage/example
leads to an error log in the router:
[2021-04-08T16:27:53Z WARN zplugin_storages::backends_mgt] zenoh error: (Failed to create new InfluxDb database 'zenoh-example' : InfluxDB encountered the following error: influxdb error: "{"error":"error parsing query: found -, expected ; at line 1, char 22"} ") at src/lib.rs:709.
There is obviously an issue with the - character in the db name.
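One possible way to avoid this parse error is to double-quote the database name when building the InfluxQL statement, since InfluxQL only accepts unquoted identifiers made of letters, digits and underscores. The sketch below is an assumption about how a fix could look, not the plugin's actual code; `quote_identifier` and `create_database_statement` are hypothetical names.

```rust
/// Hypothetical helper: wrap a name in double quotes so InfluxQL treats it
/// as a quoted identifier, escaping any embedded double quote with a backslash.
fn quote_identifier(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for c in name.chars() {
        if c == '"' {
            quoted.push('\\');
        }
        quoted.push(c);
    }
    quoted.push('"');
    quoted
}

/// Hypothetical helper: build a CREATE DATABASE statement with a quoted name.
fn create_database_statement(db: &str) -> String {
    format!("CREATE DATABASE {}", quote_identifier(db))
}
```

With this, `create_database_statement("zenoh-example")` yields `CREATE DATABASE "zenoh-example"`, which no longer trips over the `-` at char 22.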
The get function builds a call to the Flux range function using Unix timestamps. When start/stop are given as Unix timestamps they must be integers, but the code sends floats with a decimal part, which causes an error:
[2024-01-04T21:36:37Z DEBUG zenoh_backend_influxdb2] Get Some("stored/test") with Influx query:from(bucket: "test")
|> range(start: 1704404158.244, stop: 1704404197)
|> filter(fn: (r) => r._measurement == "stored/test")
|> filter(fn: (r) => r["kind"] == "PUT")
in InfluxDBv2 storage
[2024-01-04T21:36:37Z ERROR zenoh_backend_influxdb2] Couldn't get data from database test in InfluxDBv2 storage with error: HTTP request returned an error: 400 Bad Request, `{"code":"invalid","message":"error calling function \"range\" @2:48-2:94: value is not a time, got float"}`
I think this could be fixed by either
use async_std::stream::StreamExt;
use chrono::{SecondsFormat, Utc};
use std::time::Duration;
use zenoh::prelude::r#async::*;
#[async_std::main]
async fn main() {
    let mut config = config::default();
    config
        .connect
        .endpoints
        .push("tcp/localhost:7447".parse().unwrap());
    let session = zenoh::open(config).res().await.unwrap();
    let before = Utc::now();
    let mut query_value = None;
    while query_value.is_none() {
        async_std::task::sleep(Duration::from_secs(1)).await;
        let time = before.to_rfc3339_opts(SecondsFormat::Millis, true);
        println!("{}", time);
        query_value = session
            .get(format!("stored/test?_time=[{time}..]"))
            .res()
            .await
            .unwrap()
            .stream()
            .map(|r| r.sample.unwrap())
            .next()
            .await;
    }
}
Once the above connects and attempts a query, you should see the above error in the Zenoh console.
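To make the failure mode concrete, here is a minimal sketch of one way the range clause could be rendered with whole-second Unix timestamps, which Flux accepts as integers. It is an illustration under assumptions, not the backend's code: `flux_range` is a hypothetical helper, and rounding the start down and the stop up is just one choice that keeps the requested window fully covered.

```rust
/// Hypothetical helper: render a Flux `range` clause from timestamps given
/// in (possibly fractional) seconds since the Unix epoch.
/// The start is rounded down and the stop rounded up, so the integer window
/// always contains the requested one.
fn flux_range(start_secs: f64, stop_secs: f64) -> String {
    format!(
        "|> range(start: {}, stop: {})",
        start_secs.floor() as i64,
        stop_secs.ceil() as i64
    )
}
```

Because the window is widened to whole seconds, a caller that needs sub-second precision would still have to filter the returned samples against the original bounds.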
Under moderately heavy load, the InfluxDB v1 backend falls behind and eventually starts dropping samples.
See https://discordapp.com/channels/914168414178779197/1207326913186762794 for the details
Once the storage falls behind by ~1m 40s, it starts dropping samples in batches, not making PUT requests to InfluxDB for them
zenohd v0.10.1-rc-1-g15b36a0f built with rustc 1.72.0 (5680fa18f 2023-08-23)
InfluxDB 1.8.10
When using curl -X GET http://localhost:8000/\*\*, no data comes back from influxdb2 after the data has been posted. This is different from influxdb.
docker run \
-p 8086:8086 \
-v myInfluxVolume:/var/lib/influxdb2 \
influxdb:1.8
./zenohd -c influx.json5 --adminspace-permissions=rw
curl -X PUT -d '"Hello World!"' http://localhost:8000/demo/example/test
curl -X GET http://localhost:8000/\*\*
My influx.json5:
{
  plugins: {
    storage_manager: {
      volumes: {
        influxdb: {
          // URL to the InfluxDB service
          url: "http://0.0.0.0:8086",
          private: {
            token: "eCqM_gNl9fbVZE4JeQTtcLgI28vGztI2yQJUn7Kf5c4m73JaweWbgZJvydvhRVtFvAHAhftIeegCDkQVsalZZA=="
          }
        }
      },
      storages: {
        demo: {
          key_expr: "demo/example/**",
          strip_prefix: "demo/example",
          volume: {
            id: "influxdb",
            db: "test",
            create_db: true,
            on_closure: "drop_db",
            private: {
              token: "eCqM_gNl9fbVZE4JeQTtcLgI28vGztI2yQJUn7Kf5c4m73JaweWbgZJvydvhRVtFvAHAhftIeegCDkQVsalZZA=="
            }
          }
        }
      }
    },
    rest: { http_port: 8000 }
  }
}
These steps work with influxdb but fail with influxdb2. The config needs to change for influxdb, since the username and password are required for that version.
The zenoh router v0.10.0-rc built with rustc 1.72.0 (5680fa18f 2023-08-23)
patrick@pichau
OS: Arch Linux
Kernel: x86_64 Linux 6.5.9-arch2-1
Uptime: 14d 19h 34m
Packages: 2689
Shell: zsh 5.9
Resolution: 7680x2160
DE: KDE 5.111.0 / Plasma 5.27.9
WM: KWin
GTK Theme: [GTK2/3]
Icon Theme: bloom
Disk: 800G / 911G (92%)
CPU: AMD Ryzen 7 2700 Eight-Core @ 16x 3.2GHz
GPU: NVIDIA GeForce GTX 1060 6GB
RAM: 33051MiB / 64247MiB
Hi there,
Does the plugin currently support InfluxDB 2.0? The move to InfluxDB 2.0 changed several key architectures:
InfluxDB 2.0 does support legacy commands; is this something you are already using? I am just struggling to get the example working with InfluxDB 2.0.
Many thanks for your support.