apache/opendal
Apache OpenDAL: access data freely.
Home Page: https://opendal.apache.org
License: Apache License 2.0
BoxedAsyncReader feels easier to understand as a reader, while BoxedAsyncRead reads like an action :)
Originally posted by @BohuTANG in datafuselabs/databend#4203 (comment)
opendal: opendal-0.1.4
branch: datafuselabs/databend#4328
docker run -d -p 9900:9000 --name minio \
-e "MINIO_ACCESS_KEY=minioadmin" \
-e "MINIO_SECRET_KEY=minioadmin" \
-v /tmp/data:/data \
-v /tmp/config:/root/.minio \
minio/minio server /data
export AWS_ACCESS_KEY_ID=minioadmin
export AWS_SECRET_ACCESS_KEY=minioadmin
export AWS_EC2_METADATA_DISABLED=true
aws --endpoint-url http://127.0.0.1:9900/ s3 mb s3://testbucket
aws --endpoint-url http://127.0.0.1:9900/ s3 cp tests/data s3://testbucket/admin/data --recursive
https://repo.databend.rs/t_ontime/create_table.sql
copy into ontime from 's3://testbucket/admin/data/ontime_200.parquet' credentials=(aws_key_id='minioadmin' aws_secret_key='minioadmin') FILE_FORMAT = (type = 'parquet');
Server logs:
2022-03-06T00:19:16.316335Z ERROR common_tracing::panic_hook: panicked at '`async fn` resumed after completion', /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/opendal-0.1.4/src/io.rs:147:45 backtrace= 0: common_tracing::panic_hook::set_panic_hook::{{closure}}
at common/tracing/src/panic_hook.rs:25:25
1: std::panicking::rust_panic_with_hook
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/panicking.rs:702:17
2: std::panicking::begin_panic_handler::{{closure}}
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/panicking.rs:586:13
3: std::sys_common::backtrace::__rust_end_short_backtrace
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/sys_common/backtrace.rs:138:18
4: rust_begin_unwind
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/panicking.rs:584:5
5: core::panicking::panic_fmt
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/panicking.rs:143:14
6: core::panicking::panic
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/panicking.rs:48:5
7: <opendal::io::Reader as futures_io::if_std::AsyncSeek>::poll_seek::{{closure}}
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/opendal-0.1.4/src/io.rs:147:45
8: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/mod.rs:91:19
9: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/future.rs:124:9
10: <opendal::io::Reader as futures_io::if_std::AsyncSeek>::poll_seek
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/opendal-0.1.4/src/io.rs:129:26
11: <&mut T as futures_io::if_std::AsyncSeek>::poll_seek
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-io-0.3.21/src/lib.rs:479:17
12: <futures_util::io::seek::Seek<S> as core::future::future::Future>::poll
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.21/src/io/seek.rs:28:9
13: parquet2::read::stream::stream_len::{{closure}}
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet2-0.10.2/src/read/stream.rs:14:50
14: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/mod.rs:91:19
15: parquet2::read::stream::read_metadata::{{closure}}
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet2-0.10.2/src/read/stream.rs:30:39
16: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/mod.rs:91:19
17: arrow2::io::parquet::read::read_metadata_async::{{closure}}
at /home/bohu/.cargo/git/checkouts/arrow2-6249446b5f8db6f7/f71124b/src/io/parquet/read/mod.rs:68:36
18: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/mod.rs:91:19
19: <tracing::instrument::Instrumented<T> as core::future::future::Future>::poll
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tracing-0.1.31/src/instrument.rs:272:9
20: <common_streams::sources::source_parquet::ParquetSource<R> as common_streams::sources::source::Source>::read::{{closure}}::{{closure}}
at common/streams/src/sources/source_parquet.rs:102:73
21: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/mod.rs:91:19
22: <common_streams::sources::source_parquet::ParquetSource<R> as common_streams::sources::source::Source>::read::{{closure}}
at common/streams/src/sources/source_parquet.rs:95:5
23: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/mod.rs:91:19
24: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/future.rs:124:9
25: <databend_query::storages::s3::s3_external_source::ExternalSource as databend_query::pipelines::new::processors::sources::async_source::AsyncSource>::generate::{{closure}}
at query/src/storages/s3/s3_external_source.rs:181:52
26: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/mod.rs:91:19
27: <databend_query::pipelines::new::processors::sources::async_source::AsyncSourcer<T> as databend_query::pipelines::new::processors::processor::Processor>::async_process::{{closure}}
at query/src/pipelines/new/processors/sources/async_source.rs:86:36
28: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/mod.rs:91:19
29: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/future.rs:124:9
30: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/future.rs:124:9
31: databend_query::pipelines::new::executor::executor_worker_context::ExecutorWorkerContext::execute_async_task::{{closure}}
at query/src/pipelines/new/executor/executor_worker_context.rs:90:48
32: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/mod.rs:91:19
33: tokio::runtime::task::core::CoreStage<T>::poll::{{closure}}
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/core.rs:161:17
34: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/loom/std/unsafe_cell.rs:14:9
35: tokio::runtime::task::core::CoreStage<T>::poll
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/core.rs:151:13
36: tokio::runtime::task::harness::poll_future::{{closure}}
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/harness.rs:467:19
37: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/panic/unwind_safe.rs:271:9
38: std::panicking::try::do_call
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/panicking.rs:492:40
39: __rust_try
40: std::panicking::try
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/panicking.rs:456:19
41: std::panic::catch_unwind
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/panic.rs:137:14
42: tokio::runtime::task::harness::poll_future
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/harness.rs:455:18
43: tokio::runtime::task::harness::Harness<T,S>::poll_inner
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/harness.rs:103:27
44: tokio::runtime::task::harness::Harness<T,S>::poll
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/harness.rs:57:15
45: tokio::runtime::task::raw::poll
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/raw.rs:128:5
46: tokio::runtime::task::raw::RawTask::poll
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/raw.rs:80:18
47: tokio::runtime::task::LocalNotified<S>::run
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/mod.rs:347:9
48: tokio::runtime::thread_pool::worker::Context::run_task::{{closure}}
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/thread_pool/worker.rs:425:13
49: tokio::coop::with_budget::{{closure}}
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/coop.rs:102:9
50: std::thread::local::LocalKey<T>::try_with
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/thread/local.rs:413:16
51: std::thread::local::LocalKey<T>::with
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/thread/local.rs:389:9
52: tokio::coop::with_budget
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/coop.rs:95:5
tokio::coop::budget
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/coop.rs:72:5
tokio::runtime::thread_pool::worker::Context::run_task
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/thread_pool/worker.rs:424:9
53: tokio::runtime::thread_pool::worker::Context::run
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/thread_pool/worker.rs:391:24
54: tokio::runtime::thread_pool::worker::run::{{closure}}
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/thread_pool/worker.rs:376:17
55: tokio::macros::scoped_tls::ScopedKey<T>::set
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/macros/scoped_tls.rs:61:9
56: tokio::runtime::thread_pool::worker::run
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/thread_pool/worker.rs:373:5
57: tokio::runtime::thread_pool::worker::Launch::launch::{{closure}}
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/thread_pool/worker.rs:352:45
58: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/blocking/task.rs:42:21
59: tokio::runtime::task::core::CoreStage<T>::poll::{{closure}}
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/core.rs:161:17
60: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/loom/std/unsafe_cell.rs:14:9
61: tokio::runtime::task::core::CoreStage<T>::poll
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/core.rs:151:13
62: tokio::runtime::task::harness::poll_future::{{closure}}
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/harness.rs:467:19
63: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/panic/unwind_safe.rs:271:9
64: std::panicking::try::do_call
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/panicking.rs:492:40
65: __rust_try
66: std::panicking::try
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/panicking.rs:456:19
67: std::panic::catch_unwind
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/panic.rs:137:14
68: tokio::runtime::task::harness::poll_future
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/harness.rs:455:18
69: tokio::runtime::task::harness::Harness<T,S>::poll_inner
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/harness.rs:103:27
70: tokio::runtime::task::harness::Harness<T,S>::poll
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/harness.rs:57:15
71: tokio::runtime::task::raw::poll
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/raw.rs:128:5
72: tokio::runtime::task::raw::RawTask::poll
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/raw.rs:80:18
73: tokio::runtime::task::UnownedTask<S>::run
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/task/mod.rs:384:9
74: tokio::runtime::blocking::pool::Task::run
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/blocking/pool.rs:91:9
75: tokio::runtime::blocking::pool::Inner::run
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/blocking/pool.rs:308:17
76: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
at /home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/blocking/pool.rs:288:17
77: std::sys_common::backtrace::__rust_begin_short_backtrace
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/sys_common/backtrace.rs:122:18
78: std::thread::Builder::spawn_unchecked_::{{closure}}::{{closure}}
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/thread/mod.rs:498:17
79: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/panic/unwind_safe.rs:271:9
80: std::panicking::try::do_call
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/panicking.rs:492:40
81: __rust_try
82: std::panicking::try
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/panicking.rs:456:19
83: std::panic::catch_unwind
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/panic.rs:137:14
84: std::thread::Builder::spawn_unchecked_::{{closure}}
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/thread/mod.rs:497:30
85: core::ops::function::FnOnce::call_once{{vtable.shim}}
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/ops/function.rs:227:5
86: <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/alloc/src/boxed.rs:1854:9
<alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/alloc/src/boxed.rs:1854:9
std::sys::unix::thread::Thread::new::thread_start
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/std/src/sys/unix/thread.rs:108:17
87: start_thread
at ./nptl/./nptl/pthread_create.c:442:8
88: __GI___clone3
at ./misc/../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
panic.file="/home/bohu/.cargo/registry/src/github.com-1ecc6299db9ec823/opendal-0.1.4/src/io.rs" panic.line=147 panic.column=45
When using OpenDAL to stat an S3 file, the error message Error::Unexpected(path.to_string()) is not useful. Can we bring in the S3 error or the OpenDAL stack trace?
Server error log:
2022-02-21T03:06:34.112245Z ERROR databend_query::servers::mysql::writers::query_result_writer: OnQuery Error: Code: 3007, displayText = dal stat https://s3.amazonaws.com/databend-external/t_ontime/t_ontime.csv error:Unexpected("https://s3.amazonaws.com/databend-external/t_ontime/t_ontime.csv").
0: common_exception::exception_code::<impl common_exception::exception::ErrorCode>::DalStatError
at common/exception/src/exception_code.rs:36:66
1: databend_query::interpreters::interpreter_copy::CopyInterpreter::get_csv_stream::{{closure}}::{{closure}}
at query/src/interpreters/interpreter_copy.rs:93:13
Allow users to initialize the storage backend the same way we do for databases:
fs:///path/to/dir
s3://s3.amazonaws.com/bucket/path/to/dir?aws_access_key_id=xxx&aws_secret_access_key=xxx&region=xxx
2022-02-15T07:04:48.380574Z WARN load_region{provider=ImdsRegionProvider { client: LazyClient { client: OnceCell { value: None }, builder: Builder { max_attempts: None, endpoint: None, mode_override: None, token_ttl: None, connect_timeout: None, read_timeout: None, config: Some(ProviderConfig { env: Env(Real), fs: Fs(Real), sleep: Some(TokioSleep), region: None }) } }, env: Env(Real) }}: aws_config::imds::region: failed to load region from IMDS err=Failed to load session token: timeout: error trying to connect: HTTP connect timeout occurred after 1s
2022-02-15T07:04:49.386606Z WARN load_region{provider=ImdsRegionProvider { client: LazyClient { client: OnceCell { value: None }, builder: Builder { max_attempts: None, endpoint: None, mode_override: None, token_ttl: None, connect_timeout: None, read_timeout: None, config: Some(ProviderConfig { env: Env(Real), fs: Fs(Real), sleep: Some(TokioSleep), region: None }) } }, env: Env(Real) }}: aws_config::imds::region: failed to load region from IMDS err=Failed to load session token: timeout: error trying to connect: HTTP connect timeout occurred after 1s
create s3 accessor
2022-02-15T07:04:50.393993Z WARN load_region{provider=ImdsRegionProvider { client: LazyClient { client: OnceCell { value: None }, builder: Builder { max_attempts: None, endpoint: None, mode_override: None, token_ttl: None, connect_timeout: None, read_timeout: None, config: Some(ProviderConfig { env: Env(Real), fs: Fs(Real), sleep: Some(TokioSleep), region: None }) } }, env: Env(Real) }}: aws_config::imds::region: failed to load region from IMDS err=Failed to load session token: timeout: error trying to connect: HTTP connect timeout occurred after 1s
2022-02-15T07:04:51.399065Z WARN load_region{provider=ImdsRegionProvider { client: LazyClient { client: OnceCell { value: None }, builder: Builder { max_attempts: None, endpoint: None, mode_override: None, token_ttl: None, connect_timeout: None, read_timeout: None, config: Some(ProviderConfig { env: Env(Real), fs: Fs(Real), sleep: Some(TokioSleep), region: None }) } }, env: Env(Real) }}: aws_config::imds::region: failed to load region from IMDS err=Failed to load session token: timeout: error trying to connect: HTTP connect timeout occurred after 1s
This makes every databend query stall for up to 3s.
The S3 SDK will not support this officially, so we need to work around it.
This note is used to track performance improvements.
Please let me know if you have any ideas.
To spawn a new task on the tokio runtime, the future can't hold references:
fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
match self {
Executor::External => tokio::spawn(future),
Executor::Global => GLOBAL_EXECUTOR.spawn(future),
Executor::Internal() => unimplemented!(),
}
}
So we have to take an owned buf, like monoio and tokio-uring:
fn read(&self, path: &str, offset: u64, buf: Vec<u8>) -> JoinHandle<Result<(usize, Vec<u8>)>> {
let (_, _, _) = (path, offset, buf);
unimplemented!()
}
fn write(&self, path: &str, buf: Vec<u8>) -> JoinHandle<Result<()>> {
let (_, _) = (path, buf);
unimplemented!()
}
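The owned-buffer pattern borrowed from monoio and tokio-uring can be sketched with std threads instead of an async runtime. This is a hedged illustration; read_owned and its signature are assumptions, not opendal's API. The key point: the spawned task must be 'static, so it takes ownership of the buffer and hands it back with the result.

```rust
use std::thread;

// Hedged sketch of the owned-buffer pattern: the task cannot borrow the
// caller's buffer, so it takes ownership and returns it with the result.
fn read_owned(data: &'static [u8], offset: usize, mut buf: Vec<u8>)
    -> thread::JoinHandle<(usize, Vec<u8>)>
{
    thread::spawn(move || {
        // Copy at most buf.len() bytes starting at `offset`.
        let n = buf.len().min(data.len().saturating_sub(offset));
        buf[..n].copy_from_slice(&data[offset..offset + n]);
        (n, buf)
    })
}

fn main() {
    // Read 5 bytes at offset 6 of an 11-byte source.
    let (n, buf) = read_owned(b"hello world", 6, vec![0u8; 5]).join().unwrap();
    assert_eq!(n, 5);
    assert_eq!(&buf[..n], b"world");
}
```

The same shape carries over to an async runtime: the JoinHandle yields (usize, Vec<u8>) so the caller gets its buffer back after the task completes.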
However, these APIs require an extra allocation and memory copy, which drops read performance by 50%.
Here is another try that keeps returning BoxedAsyncRead:
fn read(&self, path: &str, offset: u64, size: u64) -> JoinHandle<Result<BoxedAsyncRead>> {
let (_, _, _) = (path, offset, size);
unimplemented!()
}
There is no significant performance difference on regular benches, but we can see performance drops under parallel read benches.
OpenDAL focuses on IO-bound tasks, and maintaining a separate runtime may increase performance. Should opendal support a self-maintained runtime and the global runtime simultaneously?
We need to find the root cause of #86.
It's weird that get_object with size=None returns the whole object's body. Something must be wrong inside:
let mut r = o.reader();
let mut buf = vec![0; 4 * 1024 * 1024];
r.read_exact(&mut buf).await?;
We expect this to read only 4 MiB, but it doesn't.
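The expected contract can be checked against std's blocking Read as a baseline (a hedged sketch; read_prefix is a hypothetical helper, not opendal code): read_exact fills the buffer completely and consumes no more than its length from the source.

```rust
use std::io::Read;

// Baseline check with std: read_exact fills `buf` completely and consumes
// exactly buf.len() bytes from the source, never the whole object.
fn read_prefix(src: Vec<u8>, len: usize) -> (Vec<u8>, u64) {
    let mut r = std::io::Cursor::new(src);
    let mut buf = vec![0u8; len];
    r.read_exact(&mut buf).expect("source shorter than requested prefix");
    (buf, r.position())
}

fn main() {
    // 16 MiB source, but we only want the first 4 MiB.
    let (buf, pos) = read_prefix(vec![1u8; 16 * 1024 * 1024], 4 * 1024 * 1024);
    assert_eq!(buf.len(), 4 * 1024 * 1024);
    // Only 4 MiB were consumed from the reader.
    assert_eq!(pos, 4 * 1024 * 1024);
}
```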
Make ErrorKind a separate enum out of Error for error checking.
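A minimal sketch of the proposed split, assuming illustrative variant names (not opendal's final set): a copyable ErrorKind lives outside Error, so callers can match on kind() without destructuring the error itself.

```rust
use std::fmt;

// Illustrative variant names; the real set would come from the RFC.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorKind {
    ObjectNotExist,
    PermissionDenied,
    Unexpected,
}

#[derive(Debug)]
pub struct Error {
    kind: ErrorKind,
    message: String,
}

impl Error {
    pub fn new(kind: ErrorKind, message: impl Into<String>) -> Self {
        Error { kind, message: message.into() }
    }
    // Error checks become a simple equality on the kind.
    pub fn kind(&self) -> ErrorKind {
        self.kind
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}: {}", self.kind, self.message)
    }
}

fn main() {
    let err = Error::new(ErrorKind::ObjectNotExist, "stat /admin/data/");
    assert_eq!(err.kind(), ErrorKind::ObjectNotExist);
    println!("{}", err);
}
```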
export S3_STORAGE_ENDPOINT_URL=https://s3.amazonaws.com
It's buggy now.
The behavior differs between keeping S3_STORAGE_ENDPOINT_URL empty and setting S3_STORAGE_ENDPOINT_URL=https://s3.amazonaws.com/:
S3_STORAGE_ENDPOINT_URL="": the S3 SDK uses format!("s3.{}.amazonaws.com", region) to construct the correct endpoint.
S3_STORAGE_ENDPOINT_URL=https://s3.amazonaws.com/: the S3 SDK takes this input as static and can't handle the region correctly.
I'm considering removing the region option so that:
we treat https://s3.amazonaws.com/ as the default and detect the region for users.
custom endpoints like http://127.0.0.1:9000 are passed through as-is.
Originally posted by @Xuanwo in #54 (comment)
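The endpoint handling discussed above can be sketched as follows; endpoint_for is a hypothetical helper, not opendal's API. An empty endpoint falls back to the regional template, while an explicit endpoint is passed through unchanged so local setups like minio keep working.

```rust
// Hypothetical helper: normalize the user-provided endpoint.
fn endpoint_for(endpoint: &str, region: &str) -> String {
    if endpoint.is_empty() {
        // Fall back to the regional AWS template.
        format!("https://s3.{}.amazonaws.com", region)
    } else {
        // Pass explicit endpoints through, trimming a trailing slash.
        endpoint.trim_end_matches('/').to_string()
    }
}

fn main() {
    assert_eq!(endpoint_for("", "us-east-2"), "https://s3.us-east-2.amazonaws.com");
    assert_eq!(endpoint_for("https://s3.amazonaws.com/", "us-east-2"), "https://s3.amazonaws.com");
    assert_eq!(endpoint_for("http://127.0.0.1:9000", "us-east-2"), "http://127.0.0.1:9000");
}
```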
Background
The s3://repo.databend.rs/dataset/stateful/ontime.csv
ACL is Everyone (public access) on AWS S3.
If credentials are not set for OpenDAL, we will get the error:
displayText = Parse csv error at line 0, cause: unexpected: (op: read, path: /dataset/stateful/ontime.csv, source: failed to construct request: No credentials in the property bag
Enhancement
It would be better to add a test for a minio bucket with a public ACL.
The file_location is s3://databend-external/t_ontime/t_ontime.csv, and opendal stat returns Unexpected("s3://databend-external/t_ontime/t_ontime.csv").
For S3, there are three URL styles:
https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html
The last is S3://bucket-name/key-name, where the bucket name does not include the AWS Region.
In OpenDAL:
opendal::services::s3::Backend::build()
.endpoint("https://bucket-name.s3.amazonaws.com")
.bucket("bucket-name")
.credential(credential)
.finish()
If we do not set the region, it would be better to fetch it:
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLocation.html
This is used by datafuselabs/databend#3586:
copy into mytable
from 's3://bucket-name/data/files'
credentials=(aws_key_id='$AWS_ACCESS_KEY_ID' aws_secret_key='$AWS_SECRET_ACCESS_KEY')
encryption=(master_key = 'eSxX0jzYfIamtnBKOEOwq80Au6NbSgPH5r4BDDwOaO8=')
file_format = (type = csv field_delimiter = '|' skip_header = 1);
I tried using cargo bench to get the results, but I didn't get a valid output.
Maybe I am missing something?
For local fs, we can switch to the sync read/write API to improve performance.
Implement encryption for s3
Should we make the code style the same as other repos'?
Such as databend:
https://github.com/datafuselabs/databend/blob/main/rustfmt.toml
There is no standard here, but a defined style may be important for contributors to follow :)
Originally posted by @BohuTANG in #31 (comment)
Waiting for imports_granularity = "Item"
to be stable.
Bucket is admin, the file path is /admin/data//ontime_200.csv, and the OpenDAL reader returns empty results.
Should this return a file-not-found error instead?
Changing the file name to /admin/data/ontime_200.csv works well.
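One way to sidestep the double-slash pitfall is to normalize object paths before issuing requests; normalize_path here is a hypothetical helper, not part of opendal, and it is meant for object keys rather than full URLs (which contain a legitimate //).

```rust
// Hypothetical helper: collapse duplicate slashes so that
// /admin/data//ontime_200.csv and /admin/data/ontime_200.csv
// resolve to the same object key.
fn normalize_path(path: &str) -> String {
    let mut out = String::with_capacity(path.len());
    let mut prev_slash = false;
    for c in path.chars() {
        if c == '/' {
            if !prev_slash {
                out.push(c);
            }
            prev_slash = true;
        } else {
            out.push(c);
            prev_slash = false;
        }
    }
    out
}

fn main() {
    assert_eq!(normalize_path("/admin/data//ontime_200.csv"), "/admin/data/ontime_200.csv");
    assert_eq!(normalize_path("/admin/data/ontime_200.csv"), "/admin/data/ontime_200.csv");
}
```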
Needs Xuanwo/reqsign#7
For example, the following error occurs while writing block data to AWS S3; it would be better if we could provide a further description of the cause that triggered DalTransportError.
2022-02-16T14:05:49.109930Z ERROR databend_query::servers::mysql::writers::query_result_writer: OnQuery Error: Code: 3003, displayText = unexpected: (cause _b/5c1bf0208df24d97bda2cf2201956705.parquet).
0: common_exception::exception_code::<impl common_exception::exception::ErrorCode>::DalTransportError
at common/exception/src/exception_code.rs:36:66
1: databend_query::storages::fuse::io::block_writer::write_block::{{closure}}::{{closure}}
at query/src/storages/fuse/io/block_writer.rs:75:22
core::result::Result<T,E>::map_err
at /rustc/f624427f8771c00819684c783bb841bf72095704/library/core/src/result.rs:842:27
databend_query::storages::fuse::io::block_writer::write_block::{{closure}}
at query/src/storages/fuse/io/block_writer.rs:71:5
<core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/f624427f8771c00819684c783bb841bf72095704/library/core/src/future/mod.rs:84:19
databend_query::storages::fuse::io::block_stream_writer::BlockStreamWriter::write_block::{{closure}}
at query/src/storages/fuse/io/block_stream_writer.rs:121:93
<core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/f624427f8771c00819684c783bb841bf72095704/library/core/src/future/mod.rs:84:19
<databend_query::storages::fuse::io::block_stream_writer::BlockStreamWriter as
.....
This tracking issue intends to track the implementations of different underlying storage schemes. Please feel free to take any of them.
This section is used to track services that are waiting for passing integration tests.
This section is used to track services that are working on their first release.
This section is used to track services that no developers are interested in. Please feel free to take one of them.
ssh: removed in #2643
China Mobile Ecloud Elastic Object Storage (EOS): removed, no native API (only S3)
github: removed, no use cases
gitlab: removed, no use cases
rsync.net: removed, no usable API; requires ssh
For now, we use a simple data range logic:
if exist_range.includes(request_range) {
let offset = (request_range.offset - exist_range.offset) as usize;
let length = request_range.size;
let (buf, _) = buf.split_at_mut(length);
buf.copy_from_slice(&chunk.data[offset..offset + length]);
self.pos += length as u64;
Poll::Ready(Ok(length))
} else {
// request whole new
}
It's possible that the request_range is just an extension of the exist_range, especially for the parquet read pattern in databend.
We can request the data [request_range.offset, exist_range.offset) instead of [request_range.offset, request_range.offset + exist_range.size].
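The proposed optimization can be sketched as follows, assuming the cached range covers the tail of the request (Range and missing_head are illustrative names, not opendal code): only the missing head [request.offset, exist.offset) needs a new fetch.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Range {
    offset: u64,
    size: u64,
}

impl Range {
    fn end(&self) -> u64 {
        self.offset + self.size
    }
}

// Hedged sketch: when the request starts before the cached range but the
// cached range covers the rest, fetch only [request.offset, exist.offset).
fn missing_head(exist: Range, request: Range) -> Option<Range> {
    if request.offset < exist.offset && request.end() <= exist.end() {
        Some(Range {
            offset: request.offset,
            size: exist.offset - request.offset,
        })
    } else {
        None
    }
}

fn main() {
    let exist = Range { offset: 100, size: 100 };  // bytes 100..200 cached
    let request = Range { offset: 50, size: 150 }; // bytes 50..200 requested
    assert_eq!(missing_head(exist, request), Some(Range { offset: 50, size: 50 }));
    // A disjoint request falls back to a whole new fetch.
    assert_eq!(missing_head(exist, Range { offset: 300, size: 10 }), None);
}
```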
Ok, I got the problem. The callback was triggered at the end of this read, so we cannot calculate the cost of every read operation. Let me find a solution for it.
Originally posted by @Xuanwo in datafuselabs/databend#4256 (comment)
This is found from databend ontime dataset tests on AWS EC2 and S3.
How to reproduce: parallel_read_threads is 1 by default; if we set it to 4:
set parallel_read_threads=4;
All the queries cost almost the same as with no setting.
How to check:
if we check out the f9971bdf335333ffc2253b60b0842b2a3c8ca6cc commit:
commit f9971bdf335333ffc2253b60b0842b2a3c8ca6cc (HEAD -> main, origin/main, origin/HEAD)
Merge: 3c54a8416 7373fc595
Author: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Date: Thu Feb 24 02:07:08 2022 +0000
Merge pull request #4230 from youngsofun/parquet2
remove buffer in block_reader of fuse store
then setting parallel_read_threads=4 yields a significant performance improvement.
Waiting for AWS Rust SDK's support.
We can use https://github.com/rust-lang/rust-semverver to make sure our public API is not changed by mistake.
This bug was found by @zhang2014.
Reader may read unnecessary data from the underlying storage: the size of the read operation OpRead is set to None, which may cause the Backends to read all the data from the current position to EOF (worst case).
In order to make opendal ready for v0.1, we need the following work:
NOTE: we are migrating from databend's common-dal2 to opendal, so some old issues here are referred from the databend repo. All new opendal-only issues should be submitted here.
tokio::fs is just a wrapper around std::fs; we can adopt aio or io_uring to get more throughput in our pure async framework.
Open questions: should fs take this via a flag or a build-time feature flag (io_uring requires Linux 5.6)? Should it live in opendal or another new crate?
Good news is that we already have examples :D
When I use OpenDAL, I still need to dive into the code for operations like Delete.
It would be great to improve the examples by naming the files:
read.rs -- how to do read and range read
list.rs -- how to do list
delete.rs -- how to do delete
...
// endpoint:http://127.0.0.1:9900, bucket:testbucket, path:/admin/data/
let operator = Self::open(s3_endpoint, s3_bucket, aws_key_id, aws_secret_key).await?;
let mode = operator.object(path).metadata().await?.mode();
2022-03-04T12:33:56.044309Z ERROR databend_query::servers::mysql::writers::query_result_writer: OnQuery Error: Code: 3006, displayText = Object { kind: ObjectNotExist, op: "stat", path: "/admin/data/", source: NotFound
Caused by:
NotFound }.
0: common_exception::exception_code::<impl common_exception::exception::ErrorCode>::DalError
at common/exception/src/exception_code.rs:36:66
1: common_exception::exception_into::<impl core::convert::From<opendal::error::Error> for common_exception::exception::ErrorCode>::from
at common/exception/src/exception_into.rs:235:9
2: <core::result::Result<T,F> as core::ops::try_trait::FromResidual<core::result::Result<core::convert::Infallible,E>>>::from_residual
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/result.rs:2064:27
3: common_io::files::file_s3::S3File::list::{{closure}}
at common/io/src/files/file_s3.rs:87:20
4: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/mod.rs:91:19
5: databend_query::interpreters::interpreter_copy::CopyInterpreter::list_files::{{closure}}
at query/src/interpreters/interpreter_copy.rs:74:81
6: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/mod.rs:91:19
7: <databend_query::interpreters::interpreter_copy::CopyInterpreter as databend_query::interpreters::interpreter::Interpreter>::execute::{{closure}}::{{closure}}
at query/src/interpreters/interpreter_copy.rs:162:38
8: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/4ce3749235fc31d15ebd444b038a9877e8c700d7/library/core/src/future/mod.rs:91:19
9: <databend_query::interpreters::interpreter_copy::CopyInterpreter as databend_query::interpreters::interpreter::Interpreter>::execute::{{closure}}
If we want to get all the files under a path, the path may itself be a file, so we should check whether the object is a DIR or a FILE.
How do we do that?
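A minimal sketch of the stat-then-branch flow, assuming an illustrative ObjectMode enum (opendal's actual metadata API may differ): stat the object once, then either read the file directly or list the directory first.

```rust
// Illustrative types; real code would get the mode from
// operator.object(path).metadata().await?.mode().
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ObjectMode {
    File,
    Dir,
}

#[derive(Debug, PartialEq, Eq)]
enum Plan {
    // The path is a file: read it directly.
    ReadSingleFile,
    // The path is a dir: list entries first, then read each one.
    ListThenRead,
}

fn plan_for(mode: ObjectMode) -> Plan {
    match mode {
        ObjectMode::File => Plan::ReadSingleFile,
        ObjectMode::Dir => Plan::ListThenRead,
    }
}

fn main() {
    assert_eq!(plan_for(ObjectMode::Dir), Plan::ListThenRead);
    assert_eq!(plan_for(ObjectMode::File), Plan::ReadSingleFile);
}
```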
Summary
Add integration tests that target production services.
Make opendal work well with zstd, zip and so on.
For the first stage, we will focus on the read part of compression, which means:
decompress of a single compressed file like xxx.zstd, yyy.gz
unarchive of an archived file like aaa.tar, bbb.zip
It's obvious that unarchive support will depend on decompress support.
As requested by our community, we will support zip first to make sure our design is in the right direction.
(compress and archive will be gated by cargo features.) The specific API is subject to RFC.
After this feature is supported, OpenDAL users can read compressed files like the following:
let o = op.object("abc.zip");
let meta = o.stat().await?;
let r = if o.actions().decompress() {
    o.decompress()
} else {
    o.reader()
};
// Read the data as usual.
OpenDAL will help address all the problems around gz, zstd, zip, xz: everything works!
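Choosing the algorithm from the file extension could look like the following hedged sketch (CompressAlgorithm and the extension mapping are assumptions, not the RFC's final design):

```rust
// Illustrative enum; the real set would be defined by the RFC.
#[derive(Debug, PartialEq, Eq)]
enum CompressAlgorithm {
    Gzip,
    Zstd,
    Xz,
    Zip,
}

// Pick a decompress algorithm from the last path extension.
fn algorithm_from_path(path: &str) -> Option<CompressAlgorithm> {
    let ext = path.rsplit('.').next()?;
    match ext {
        "gz" => Some(CompressAlgorithm::Gzip),
        "zst" | "zstd" => Some(CompressAlgorithm::Zstd),
        "xz" => Some(CompressAlgorithm::Xz),
        "zip" => Some(CompressAlgorithm::Zip),
        _ => None,
    }
}

fn main() {
    assert_eq!(algorithm_from_path("abc.zip"), Some(CompressAlgorithm::Zip));
    assert_eq!(algorithm_from_path("ontime_200.parquet.gz"), Some(CompressAlgorithm::Gzip));
    // Unknown extensions fall back to a plain reader.
    assert_eq!(algorithm_from_path("data.csv"), None);
}
```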
One more thing: docker images, CDs and DVDs are also archived files, so we can ...