S3 Filesystem
Home Page: http://s3fs.readthedocs.io/en/latest/
License: BSD 3-Clause "New" or "Revised" License
S3FS builds on aiobotocore to provide a convenient Python filesystem interface for S3.
View the documentation for s3fs.
import s3fs
from fastparquet import ParquetFile
from fastparquet import write
s3 = s3fs.core.S3FileSystem(key=GLOBAL_ACCESS_KEY,
                            secret=GLOBAL_SECRET_KEY)
# ,client_kwargs={'config': Config(signature_version="s3v4")})
myopen = s3.open
noop = lambda x: True
write('marionte-test/temp_parq', df, file_scheme='hive', open_with=myopen, mkdirs=noop, compression='SNAPPY')
Error:
IOError Traceback (most recent call last)
<ipython-input-91-7e826066cd11> in <module>()
----> 1 write('marionte-test/temp_parq', df, file_scheme='hive', open_with=myopen, mkdirs=noop, compression='SNAPPY')
/Users/carlosrodrigues/anaconda/envs/testenv/lib/python2.7/site-packages/fastparquet/writer.pyc in write(filename, data, row_group_offsets, compression, file_scheme, open_with, mkdirs, has_nulls, write_index, partition_on, fixed_text, append, object_encoding, times)
779 with open_with(partname, 'wb') as f2:
780 rg = make_part_file(f2, data[start:end], fmd.schema,
--> 781 compression=compression)
782 for chunk in rg.columns:
783 chunk.file_path = part
/Users/carlosrodrigues/anaconda/envs/testenv/lib/python2.7/site-packages/s3fs/core.pyc in __exit__(self, *args)
1143
1144 def __exit__(self, *args):
-> 1145 self.close()
1146
1147
/Users/carlosrodrigues/anaconda/envs/testenv/lib/python2.7/site-packages/s3fs/core.pyc in close(self)
1115 Body=self.buffer.read(), ACL=self.acl)
1116 except (ClientError, ParamValidationError) as e:
-> 1117 raise IOError('Write failed: %s' % self.path, e)
1118 self.s3.invalidate_cache(self.path)
1119 self.closed = True
IOError: [Errno Write failed: marionte-test/temp_parq/part.0.parquet] An error occurred (InvalidRequest) when calling the PutObject operation: The authorization mechanism you have provided is not supported. Please use AWS4-HMAC-SHA256.
Usually in boto3 client I would solve it by passing the region or the signature_version.
The s3fs documentation mentions config_kwargs (a dict of parameters passed to botocore.client.Config) as an argument to S3FileSystem, which would be useful for solving this problem:
http://s3fs.readthedocs.io/en/latest/api.html
However, the argument is not actually accepted. I tried to use client_kwargs instead, but it does not let me pass my own Config...
Any suggestion to solve this?
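For reference, if config_kwargs were accepted and forwarded to botocore.client.Config, the call would look something like the sketch below. This is only an illustration: the keyword name is taken from the docs quoted above, not from a working build.

import s3fs

# sketch: assumes S3FileSystem forwards config_kwargs to botocore.client.Config
s3 = s3fs.S3FileSystem(key=GLOBAL_ACCESS_KEY,        # same credentials as in the snippet above
                       secret=GLOBAL_SECRET_KEY,
                       config_kwargs={'signature_version': 's3v4'})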
I'm trying out a little word count example on a bunch of (144 to be exact) GZIP newline-delimited JSON files on S3 in a private S3 bucket, and I've been running into SSL errors like the one below. These errors are intermittent, unfortunately, and happen maybe 1/4 of the time.
Without even trying to do word counting, I get this just by counting lines:
g = (db.read_text('s3://path/to/input/*/*.gz')
.count())
g.compute()
I haven't gotten too far with tracking it down, but it seems possibly related to issues with sharing SSL connections across threads (as discussed here). It may also be related to this issue with thread safety in requests, and this more specific one in conda's s3 channel support.
Any ideas? Maybe dask needs to use different boto clients per thread? This very well might be an upstream issue with botocore, requests, etc.
Versions and such:
Traceback:
---------------------------------------------------------------------------
SSLError Traceback (most recent call last)
<ipython-input-39-db600568b11b> in <module>()
1 g = (db.read_text('s3://path/to/input/*/*.gz')
2 .count())
----> 3 g.compute()
/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/base.py in compute(self, **kwargs)
84 Extra keywords to forward to the scheduler ``get`` function.
85 """
---> 86 return compute(self, **kwargs)[0]
87
88 @classmethod
/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/base.py in compute(*args, **kwargs)
177 dsk = merge(var.dask for var in variables)
178 keys = [var._keys() for var in variables]
--> 179 results = get(dsk, keys, **kwargs)
180
181 results_iter = iter(results)
/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, **kwargs)
77 # Run
78 result = get_async(apply_async, len(pool._pool), dsk3, keys,
---> 79 queue=queue, get_id=_process_get_id, **kwargs)
80 finally:
81 if cleanup:
/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
482 _execute_task(task, data) # Re-execute locally
483 else:
--> 484 raise(remote_exception(res, tb))
485 state['cache'][key] = res
486 finish_task(dsk, key, state, results, keyorder.get)
SSLError: [SSL: DECRYPTION_FAILED_OR_BAD_RECORD_MAC] decryption failed or bad record mac (_ssl.c:1748)
Traceback
---------
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/async.py", line 267, in execute_task
result = _execute_task(task, data)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/async.py", line 248, in _execute_task
args2 = [_execute_task(a, cache) for a in args]
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/async.py", line 248, in <listcomp>
args2 = [_execute_task(a, cache) for a in args]
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/async.py", line 248, in _execute_task
args2 = [_execute_task(a, cache) for a in args]
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/async.py", line 248, in <listcomp>
args2 = [_execute_task(a, cache) for a in args]
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/async.py", line 245, in _execute_task
return [_execute_task(a, cache) for a in arg]
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/async.py", line 245, in <listcomp>
return [_execute_task(a, cache) for a in arg]
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/async.py", line 248, in _execute_task
args2 = [_execute_task(a, cache) for a in args]
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/async.py", line 248, in <listcomp>
args2 = [_execute_task(a, cache) for a in args]
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/async.py", line 249, in _execute_task
return func(*args2)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/dask/bytes/s3.py", line 103, in s3_open_file
return s3.open(path, mode='rb')
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/s3fs/core.py", line 212, in open
return S3File(self, path, mode, block_size=block_size)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/s3fs/core.py", line 680, in __init__
self.size = self.info()['Size']
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/s3fs/core.py", line 686, in info
return self.s3.info(self.path)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/s3fs/core.py", line 303, in info
files = self._lsdir(parent, refresh=refresh)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/s3fs/core.py", line 226, in _lsdir
for i in it:
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/paginate.py", line 102, in __iter__
response = self._make_request(current_kwargs)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/paginate.py", line 174, in _make_request
return self._method(**current_kwargs)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/client.py", line 262, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/client.py", line 541, in _make_api_call
operation_model, request_dict)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/endpoint.py", line 117, in make_request
return self._send_request(request_dict, operation_model)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/endpoint.py", line 146, in _send_request
success_response, exception):
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/endpoint.py", line 219, in _needs_retry
caught_exception=caught_exception)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/hooks.py", line 227, in emit
return self._emit(event_name, kwargs)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/hooks.py", line 210, in _emit
response = handler(**kwargs)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/retryhandler.py", line 183, in __call__
if self._checker(attempts, response, caught_exception):
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/retryhandler.py", line 251, in __call__
caught_exception)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/retryhandler.py", line 266, in _should_retry
return self._checker(attempt_number, response, caught_exception)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/retryhandler.py", line 314, in __call__
caught_exception)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/retryhandler.py", line 223, in __call__
attempt_number, caught_exception)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/retryhandler.py", line 356, in _check_caught_exception
raise caught_exception
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/endpoint.py", line 174, in _get_response
proxies=self.proxies, timeout=self.timeout)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/vendored/requests/sessions.py", line 605, in send
r.content
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/vendored/requests/models.py", line 750, in content
self._content = bytes().join(self.iter_content(CONTENT_CHUNK_SIZE)) or bytes()
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/vendored/requests/models.py", line 673, in generate
for chunk in self.raw.stream(chunk_size, decode_content=True):
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/vendored/requests/packages/urllib3/response.py", line 303, in stream
for line in self.read_chunked(amt, decode_content=decode_content):
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/vendored/requests/packages/urllib3/response.py", line 447, in read_chunked
self._update_chunk_length()
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/site-packages/botocore/vendored/requests/packages/urllib3/response.py", line 394, in _update_chunk_length
line = self._fp.fp.readline()
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/socket.py", line 378, in readinto
return self._sock.recv_into(b)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/ssl.py", line 748, in recv_into
return self.read(nbytes, buffer)
File "/mnt/mheilman/miniconda3/envs/3.4/lib/python3.4/ssl.py", line 620, in read
v = self._sslobj.read(len, buffer)
It would be nice to be able to set the public access control level to a bucket or file. When making a bucket with boto3 we do the following:
client.create_bucket(Bucket=test_bucket_name, ACL='public-read-write')
It's not clear how to cleanly map these out to a general chmod api though.
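For comparison, the raw boto3 calls such a feature would wrap already exist; a minimal sketch with placeholder bucket and key names:

import boto3

client = boto3.client('s3')
# create a bucket with a canned ACL
client.create_bucket(Bucket='example-bucket', ACL='public-read-write')
# upload an object with a canned ACL
client.put_object(Bucket='example-bucket', Key='data.csv', Body=b'col1,col2\n', ACL='public-read')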
Any number of "file parameters" could be included in the constructor, for use in open() and other methods by default within the instance, but overridable in the call; or there could be just the specific SSE-related ones.
Currently, the default buffer size used for an open S3File is 5MB, hard-coded.
Suggest that there should be a parameter to S3FileSystem, default_block_size=, which is then used for subsequent open calls unless otherwise specified. This allows, for example, dask to pass a buffer size using storage_options (i.e., keyword parameters in the top-level to/from functions), while leaving calls to open() unchanged.
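A sketch of what the proposed interface might look like; default_block_size is the suggestion above, not an existing parameter:

import s3fs

# proposed: a filesystem-wide default picked up by later open() calls
fs = s3fs.S3FileSystem(default_block_size=1 * 2**20)  # 1MB
with fs.open('example-bucket/big.csv', 'rb') as f:    # would use the 1MB default
    header = f.read(1024)

# dask could then forward the same value via storage_options, e.g.
# dd.read_csv('s3://example-bucket/*.csv', storage_options={'default_block_size': 1 * 2**20})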
So I create an S3FileSystem locally with my authentication. I then use this within some object that I pickle up and send to another machine. Some questions:
I was perusing the code a little more, and I wanted to bring this up, because people will run into it. In the S3Map class, you provide a __setitem__. S3 only has eventual consistency guarantees on overwrites. Anecdotally, I've heard that this can be up to 48 hours. If people start to try to use this as a true, consistent key-value store in a production setting, they're going to have a bad time. I suppose removing this is going to be out of the question, so I would at least provide a very visible warning somewhere.
Q: What data consistency model does Amazon S3 employ?
Amazon S3 buckets in all Regions provide read-after-write consistency for PUTS of new objects and eventual consistency for overwrite PUTS and DELETES.
I think that if given no information from the user, we should try anon=False first and then fall back on anon=True if we fail to connect.
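A rough sketch of that fallback logic; the helper name is illustrative, and the real change would live inside S3FileSystem's connection code:

import s3fs

def connect_with_fallback(**kwargs):
    # try credentialed access first, fall back to anonymous if that fails
    try:
        fs = s3fs.S3FileSystem(anon=False, **kwargs)
        fs.ls('')  # force a request so bad or missing credentials fail here
        return fs
    except Exception:
        return s3fs.S3FileSystem(anon=True, **kwargs)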
Nice to have: the functionality provided by http://boto3.readthedocs.org/en/latest/reference/services/s3.html#S3.Client.upload_part_copy could allow for appending to existing keys and/or multiple flushes with individual block sizes <5MB.
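A minimal sketch of the boto3 calls involved: the first part of a multipart upload copies the existing key, later parts carry the appended bytes. The control flow is illustrative only (all parts except the last must be at least 5MB, and names are placeholders):

import boto3

client = boto3.client('s3')
bucket, key = 'example-bucket', 'existing-key'

mpu = client.create_multipart_upload(Bucket=bucket, Key=key)
copied = client.upload_part_copy(Bucket=bucket, Key=key, PartNumber=1,
                                 UploadId=mpu['UploadId'],
                                 CopySource={'Bucket': bucket, 'Key': key})
appended = client.upload_part(Bucket=bucket, Key=key, PartNumber=2,
                              UploadId=mpu['UploadId'], Body=b'new data to append')
client.complete_multipart_upload(
    Bucket=bucket, Key=key, UploadId=mpu['UploadId'],
    MultipartUpload={'Parts': [
        {'PartNumber': 1, 'ETag': copied['CopyPartResult']['ETag']},
        {'PartNumber': 2, 'ETag': appended['ETag']},
    ]})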
Writing with server-side encryption with files large enough to trigger multipart upload fails:
s3 = s3fs.S3FileSystem(s3_additional_kwargs={'ServerSideEncryption': 'AES256'})
with s3.open('example-bucket/test.file', 'wb') as f:
    f.write(b'a' * 10**7)  # force multipart upload
Traceback (most recent call last):
File "/Users/del82/Documents/git/s3fs/s3fs/core.py", line 1164, in flush
Body=self.buffer.read(), Key=self.key)
File "/Users/del82/Documents/git/s3fs/s3fs/core.py", line 941, in _call_s3
return self.s3._call_s3(method, self.s3_additional_kwargs, *kwarglist, **kwargs)
File "/Users/del82/Documents/git/s3fs/s3fs/core.py", line 175, in _call_s3
return method(**additional_kwargs)
File "/Users/del82/anaconda3/envs/dask/lib/python3.6/site-packages/botocore/client.py", line 253, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/Users/del82/anaconda3/envs/dask/lib/python3.6/site-packages/botocore/client.py", line 531, in _make_api_call
api_params, operation_model, context=request_context)
File "/Users/del82/anaconda3/envs/dask/lib/python3.6/site-packages/botocore/client.py", line 586, in _convert_to_request_dict
api_params, operation_model)
File "/Users/del82/anaconda3/envs/dask/lib/python3.6/site-packages/botocore/validate.py", line 291, in serialize_to_request
raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Unknown parameter in input: "ServerSideEncryption", must be one of: Body, Bucket, ContentLength, ContentMD5, Key, PartNumber, UploadId, SSECustomerAlgorithm, SSECustomerKey, SSECustomerKeyMD5, RequestPayer
(I recognize that this may not be the preferred way to do this, but it mirrors the way that dask does it, so failing to handle this case means not being able to write large objects to S3 using SSE from dask.)
Multipart uploads use not only botocore.S3.client.put_object but also client.create_multipart_upload, client.upload_part, etc., which don't accept ServerSideEncryption as a kwarg; it is nevertheless passed to them, which results in the above exception.
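One possible fix (a sketch only, not the library's current behaviour) is to filter the additional kwargs against what each operation actually accepts, using botocore's service model:

import boto3

client = boto3.client('s3')
s3_additional_kwargs = {'ServerSideEncryption': 'AES256'}

def filter_for_operation(client, op_name, kwargs):
    # keep only the parameters that the named operation accepts, e.g. 'UploadPart' or 'PutObject'
    shape = client.meta.service_model.operation_model(op_name).input_shape
    allowed = set(shape.members) if shape is not None else set()
    return {k: v for k, v in kwargs.items() if k in allowed}

# ServerSideEncryption would be dropped for UploadPart but kept for PutObject
upload_part_kwargs = filter_for_operation(client, 'UploadPart', s3_additional_kwargs)
put_object_kwargs = filter_for_operation(client, 'PutObject', s3_additional_kwargs)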
S3 allows creation of temporary HTTP links to keys. Expose this functionality, so people can upload data and easily generate that link for it.
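The boto3 call this would wrap is generate_presigned_url; a minimal sketch with placeholder names:

import boto3

client = boto3.client('s3')
url = client.generate_presigned_url(
    'get_object',
    Params={'Bucket': 'example-bucket', 'Key': 'path/to/data.csv'},
    ExpiresIn=3600)  # link stays valid for one hour
print(url)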
Is there a regular release schedule for this project, or some planned milestone for upcoming releases? It would be nice to have a rough idea for when a new version might be published.
For context I am interested in the fix from #98. For now I am specifying the git repo in my requirements file so I'm not blocked nor in a particular hurry.
Boto3 now allows us to tag s3 objects with key/value pairs. These don't have some of the limitations that storing things in metadata have like requiring us to rewrite the file in s3 just to update the metadata which is problematic for large files.
http://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Client.put_object_tagging
http://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Client.get_object_tagging
Here are some of the limitations of tagging:
https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/allocation-tag-restrictions.html
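A minimal sketch of the two calls linked above, with placeholder names:

import boto3

client = boto3.client('s3')
# attach tags without rewriting the object
client.put_object_tagging(
    Bucket='example-bucket', Key='path/to/data.csv',
    Tagging={'TagSet': [{'Key': 'project', 'Value': 'demo'}]})
# read them back
response = client.get_object_tagging(Bucket='example-bucket', Key='path/to/data.csv')
print(response['TagSet'])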
S3 timeout when setting an attribute:
s3.setxattr('the.bucket/' + path1, a='b')
On inspection, however, the attribute was set:
s3.metadata('the.bucket/' + path1)
{'a': 'b'}
The file size is 128 MB.
Stack trace:
TypeError Traceback (most recent call last)
/home/brian/py_env2/lib/python3.5/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, **httplib_request_kw)
371 try: # Python 2.7, use buffering of HTTP responses
--> 372 httplib_response = conn.getresponse(buffering=True)
373 except TypeError: # Python 2.6 and older
TypeError: getresponse() got an unexpected keyword argument 'buffering'
During handling of the above exception, another exception occurred:
timeout Traceback (most recent call last)
/home/brian/py_env2/lib/python3.5/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, **httplib_request_kw)
373 except TypeError: # Python 2.6 and older
--> 374 httplib_response = conn.getresponse()
375 except (SocketTimeout, BaseSSLError, SocketError) as e:
/usr/lib/python3.5/http/client.py in getresponse(self)
1196 try:
-> 1197 response.begin()
1198 except ConnectionError:
/usr/lib/python3.5/http/client.py in begin(self)
296 while True:
--> 297 version, status, reason = self._read_status()
298 if status != CONTINUE:
/usr/lib/python3.5/http/client.py in _read_status(self)
257 def _read_status(self):
--> 258 line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
259 if len(line) > _MAXLINE:
/usr/lib/python3.5/socket.py in readinto(self, b)
574 try:
--> 575 return self._sock.recv_into(b)
576 except timeout:
timeout: timed out
During handling of the above exception, another exception occurred:
ReadTimeoutError Traceback (most recent call last)
/home/brian/py_env2/lib/python3.5/site-packages/botocore/vendored/requests/adapters.py in send(self, request, stream, timeout, verify, cert, proxies)
369 retries=self.max_retries,
--> 370 timeout=timeout
371 )
/home/brian/py_env2/lib/python3.5/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, **response_kw)
596 retries = retries.increment(method, url, error=e, _pool=self,
--> 597 _stacktrace=sys.exc_info()[2])
598 retries.sleep()
/home/brian/py_env2/lib/python3.5/site-packages/botocore/vendored/requests/packages/urllib3/util/retry.py in increment(self, method, url, response, error, _pool, _stacktrace)
244 if read is False:
--> 245 raise six.reraise(type(error), error, _stacktrace)
246 elif read is not None:
/home/brian/py_env2/lib/python3.5/site-packages/botocore/vendored/requests/packages/urllib3/packages/six.py in reraise(tp, value, tb)
309 raise value.with_traceback(tb)
--> 310 raise value
311
/home/brian/py_env2/lib/python3.5/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, **response_kw)
543 timeout=timeout_obj,
--> 544 body=body, headers=headers)
545
/home/brian/py_env2/lib/python3.5/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, **httplib_request_kw)
375 except (SocketTimeout, BaseSSLError, SocketError) as e:
--> 376 self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
377 raise
/home/brian/py_env2/lib/python3.5/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py in _raise_timeout(self, err, url, timeout_value)
303 if isinstance(err, SocketTimeout):
--> 304 raise ReadTimeoutError(self, url, "Read timed out. (read timeout=%s)" % timeout_value)
305
ReadTimeoutError: HTTPConnectionPool(host='172.29.34.220', port=9000): Read timed out. (read timeout=15)
During handling of the above exception, another exception occurred:
ReadTimeout Traceback (most recent call last)
in ()
----> 1 s3.setxattr('the.bucket/' + path1, a='b')
/home/brian/py_env2/lib/python3.5/site-packages/s3fs/core.py in setxattr(self, path, **kw_args)
431 Key=key,
432 Metadata=metadata,
--> 433 MetadataDirective='REPLACE')
434
435 # refresh metadata
/home/brian/py_env2/lib/python3.5/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
251 "%s() only accepts keyword arguments." % py_operation_name)
252 # The "self" in this scope is referring to the BaseClient.
--> 253 return self._make_api_call(operation_name, kwargs)
254
255 _api_call.name = str(py_operation_name)
/home/brian/py_env2/lib/python3.5/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
528 else:
529 http, parsed_response = self._endpoint.make_request(
--> 530 operation_model, request_dict)
531
532 self.meta.events.emit(
/home/brian/py_env2/lib/python3.5/site-packages/botocore/endpoint.py in make_request(self, operation_model, request_dict)
139 logger.debug("Making request for %s (verify_ssl=%s) with params: %s",
140 operation_model, self.verify, request_dict)
--> 141 return self._send_request(request_dict, operation_model)
142
143 def create_request(self, params, operation_model=None):
/home/brian/py_env2/lib/python3.5/site-packages/botocore/endpoint.py in _send_request(self, request_dict, operation_model)
168 request, operation_model, attempts)
169 while self._needs_retry(attempts, operation_model, request_dict,
--> 170 success_response, exception):
171 attempts += 1
172 # If there is a stream associated with the request, we need
/home/brian/py_env2/lib/python3.5/site-packages/botocore/endpoint.py in _needs_retry(self, attempts, operation_model, request_dict, response, caught_exception)
247 event_name, response=response, endpoint=self,
248 operation=operation_model, attempts=attempts,
--> 249 caught_exception=caught_exception, request_dict=request_dict)
250 handler_response = first_non_none_response(responses)
251 if handler_response is None:
/home/brian/py_env2/lib/python3.5/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
225 handlers.
226 """
--> 227 return self._emit(event_name, kwargs)
228
229 def emit_until_response(self, event_name, **kwargs):
/home/brian/py_env2/lib/python3.5/site-packages/botocore/hooks.py in _emit(self, event_name, kwargs, stop_on_response)
208 for handler in handlers_to_call:
209 logger.debug('Event %s: calling handler %s', event_name, handler)
--> 210 response = handler(**kwargs)
211 responses.append((handler, response))
212 if stop_on_response and response is not None:
/home/brian/py_env2/lib/python3.5/site-packages/botocore/retryhandler.py in call(self, attempts, response, caught_exception, **kwargs)
181
182 """
--> 183 if self._checker(attempts, response, caught_exception):
184 result = self._action(attempts=attempts)
185 logger.debug("Retry needed, action of: %s", result)
/home/brian/py_env2/lib/python3.5/site-packages/botocore/retryhandler.py in call(self, attempt_number, response, caught_exception)
249 def call(self, attempt_number, response, caught_exception):
250 should_retry = self._should_retry(attempt_number, response,
--> 251 caught_exception)
252 if should_retry:
253 if attempt_number >= self._max_attempts:
/home/brian/py_env2/lib/python3.5/site-packages/botocore/retryhandler.py in _should_retry(self, attempt_number, response, caught_exception)
275 # If we've exceeded the max attempts we just let the exception
276 # propogate if one has occurred.
--> 277 return self._checker(attempt_number, response, caught_exception)
278
279
/home/brian/py_env2/lib/python3.5/site-packages/botocore/retryhandler.py in call(self, attempt_number, response, caught_exception)
315 for checker in self._checkers:
316 checker_response = checker(attempt_number, response,
--> 317 caught_exception)
318 if checker_response:
319 return checker_response
/home/brian/py_env2/lib/python3.5/site-packages/botocore/retryhandler.py in call(self, attempt_number, response, caught_exception)
221 elif caught_exception is not None:
222 return self._check_caught_exception(
--> 223 attempt_number, caught_exception)
224 else:
225 raise ValueError("Both response and caught_exception are None.")
/home/brian/py_env2/lib/python3.5/site-packages/botocore/retryhandler.py in _check_caught_exception(self, attempt_number, caught_exception)
357 # the MaxAttemptsDecorator is not interested in retrying the exception
358 # then this exception just propogates out past the retry code.
--> 359 raise caught_exception
/home/brian/py_env2/lib/python3.5/site-packages/botocore/endpoint.py in _get_response(self, request, operation_model, attempts)
202 request, verify=self.verify,
203 stream=operation_model.has_streaming_output,
--> 204 proxies=self.proxies, timeout=self.timeout)
205 except ConnectionError as e:
206 # For a connection error, if it looks like it's a DNS
/home/brian/py_env2/lib/python3.5/site-packages/botocore/vendored/requests/sessions.py in send(self, request, **kwargs)
571
572 # Send the request
--> 573 r = adapter.send(request, **kwargs)
574
575 # Total elapsed time of the request (approximately)
/home/brian/py_env2/lib/python3.5/site-packages/botocore/vendored/requests/adapters.py in send(self, request, stream, timeout, verify, cert, proxies)
431 raise SSLError(e, request=request)
432 elif isinstance(e, ReadTimeoutError):
--> 433 raise ReadTimeout(e, request=request)
434 else:
435 raise
ReadTimeout: HTTPConnectionPool(host='172.29.34.220', port=9000): Read timed out. (read timeout=15)
In [7]: import s3fs
In [8]: s3fs.__version__
Out[8]: '0.1.1'
In [5]: fs = s3fs.S3FileSystem(anon=True)
In [6]: fs.open('nyqpug/asdf.csv')
---------------------------------------------------------------------------
ClientError Traceback (most recent call last)
/Users/jreback/miniconda3/envs/pandas/lib/python3.6/site-packages/s3fs/core.py in info(self, path, refresh, **kwargs)
393 out = self._call_s3(self.s3.head_object,
--> 394 kwargs, Bucket=bucket, Key=key, **self.req_kw)
395 out = {'ETag': out['ETag'], 'Key': '/'.join([bucket, key]),
/Users/jreback/miniconda3/envs/pandas/lib/python3.6/site-packages/s3fs/core.py in _call_s3(self, method, *akwarglist, **kwargs)
167 additional_kwargs.update(kwargs)
--> 168 return method(**additional_kwargs)
169
/Users/jreback/miniconda3/envs/pandas/lib/python3.6/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
252 # The "self" in this scope is referring to the BaseClient.
--> 253 return self._make_api_call(operation_name, kwargs)
254
/Users/jreback/miniconda3/envs/pandas/lib/python3.6/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
543 error_class = self.exceptions.from_code(error_code)
--> 544 raise error_class(parsed_response, operation_name)
545 else:
ClientError: An error occurred (404) when calling the HeadObject operation: Not Found
During handling of the above exception, another exception occurred:
FileNotFoundError Traceback (most recent call last)
<ipython-input-6-7628c3834aad> in <module>()
----> 1 fs.open('nyqpug/asdf.csv')
/Users/jreback/miniconda3/envs/pandas/lib/python3.6/site-packages/s3fs/core.py in open(self, path, mode, block_size, acl, fill_cache, **kwargs)
289 " and manage bytes" % (mode[0] + 'b'))
290 return S3File(self, path, mode, block_size=block_size, acl=acl,
--> 291 fill_cache=fill_cache, s3_additional_kwargs=kwargs)
292
293 def _lsdir(self, path, refresh=False):
/Users/jreback/miniconda3/envs/pandas/lib/python3.6/site-packages/s3fs/core.py in __init__(self, s3, path, mode, block_size, acl, fill_cache, s3_additional_kwargs)
927 else:
928 try:
--> 929 self.size = self.info()['Size']
930 except (ClientError, ParamValidationError):
931 raise IOError("File not accessible", path)
/Users/jreback/miniconda3/envs/pandas/lib/python3.6/site-packages/s3fs/core.py in info(self, **kwargs)
936 def info(self, **kwargs):
937 """ File information about this path """
--> 938 return self.s3.info(self.path, **kwargs)
939
940 def metadata(self, refresh=False, **kwargs):
/Users/jreback/miniconda3/envs/pandas/lib/python3.6/site-packages/s3fs/core.py in info(self, path, refresh, **kwargs)
398 return out
399 except (ClientError, ParamValidationError):
--> 400 raise FileNotFoundError(path)
401
402 _metadata_cache = {}
FileNotFoundError: nyqpug/asdf.csv
This shows a resource not closed warning on python 3.
Counterpart to fsspec/gcsfs#20
Note that for S3, the ETag is an MD5 hash for simple uploads, and an MD5 hash of concatenated MD5 hashes in the case of multi-part uploads https://stackoverflow.com/questions/6591047/etag-definition-changed-in-amazon-s3
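A sketch of reproducing that ETag locally, assuming the part size used for the upload is known (the 8MB default here is just an example):

import hashlib

def expected_etag(data, part_size=8 * 2**20):
    # single-part upload: the ETag is simply the MD5 of the body
    if len(data) <= part_size:
        return hashlib.md5(data).hexdigest()
    # multipart upload: MD5 of the concatenated per-part MD5 digests, suffixed with the part count
    digests = [hashlib.md5(data[i:i + part_size]).digest()
               for i in range(0, len(data), part_size)]
    return hashlib.md5(b''.join(digests)).hexdigest() + '-%d' % len(digests)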
I've tried the following:
import os
import s3fs
fs = s3fs.S3FileSystem()  # also tried passing use_ssl=False here but no love
# also tried passing read_timeout like this, without luck:
# fs = s3fs.S3FileSystem(config_kwargs={'read_timeout': 3600})
for f in files:  # `files` is a list of keys, defined elsewhere
    print(f'Copying bucket1/{f} to bucket2/{f}')
    fs.copy(f'bucket1/{f}', f'bucket2/{f}', config={'read_timeout': 3600})
print('Done', flush=True)
Get the following error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
~/notebook-env/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, **httplib_request_kw)
371 try: # Python 2.7, use buffering of HTTP responses
--> 372 httplib_response = conn.getresponse(buffering=True)
373 except TypeError: # Python 2.6 and older
TypeError: getresponse() got an unexpected keyword argument 'buffering'
During handling of the above exception, another exception occurred:
timeout Traceback (most recent call last)
~/notebook-env/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, **httplib_request_kw)
373 except TypeError: # Python 2.6 and older
--> 374 httplib_response = conn.getresponse()
375 except (SocketTimeout, BaseSSLError, SocketError) as e:
/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py in getresponse(self)
1330 try:
-> 1331 response.begin()
1332 except ConnectionError:
/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py in begin(self)
296 while True:
--> 297 version, status, reason = self._read_status()
298 if status != CONTINUE:
/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py in _read_status(self)
257 def _read_status(self):
--> 258 line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
259 if len(line) > _MAXLINE:
/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/socket.py in readinto(self, b)
585 try:
--> 586 return self._sock.recv_into(b)
587 except timeout:
timeout: timed out
During handling of the above exception, another exception occurred:
ReadTimeoutError Traceback (most recent call last)
~/notebook-env/lib/python3.6/site-packages/botocore/vendored/requests/adapters.py in send(self, request, stream, timeout, verify, cert, proxies)
369 retries=self.max_retries,
--> 370 timeout=timeout
371 )
~/notebook-env/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, **response_kw)
596 retries = retries.increment(method, url, error=e, _pool=self,
--> 597 _stacktrace=sys.exc_info()[2])
598 retries.sleep()
~/notebook-env/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/util/retry.py in increment(self, method, url, response, error, _pool, _stacktrace)
244 if read is False:
--> 245 raise six.reraise(type(error), error, _stacktrace)
246 elif read is not None:
~/notebook-env/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/packages/six.py in reraise(tp, value, tb)
309 raise value.with_traceback(tb)
--> 310 raise value
311
~/notebook-env/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, **response_kw)
543 timeout=timeout_obj,
--> 544 body=body, headers=headers)
545
~/notebook-env/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, **httplib_request_kw)
375 except (SocketTimeout, BaseSSLError, SocketError) as e:
--> 376 self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
377 raise
~/notebook-env/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py in _raise_timeout(self, err, url, timeout_value)
303 if isinstance(err, SocketTimeout):
--> 304 raise ReadTimeoutError(self, url, "Read timed out. (read timeout=%s)" % timeout_value)
305
ReadTimeoutError: HTTPConnectionPool(host='s3.amazonaws.com', port=80): Read timed out. (read timeout=15)
During handling of the above exception, another exception occurred:
ReadTimeout Traceback (most recent call last)
<ipython-input-3-0844ed2d9b82> in <module>()
3 for f in files:
4 print(f'bucket1/{f} to bucket2/{f}')
----> 5 fs.copy(f'bucket1/{f}', f'bucket2/{f}')
6 print('Done', flush=True)
~/notebook-env/lib/python3.6/site-packages/s3fs/core.py in copy(self, path1, path2, **kwargs)
686 self.s3.copy_object,
687 kwargs,
--> 688 Bucket=buc2, Key=key2, CopySource='/'.join([buc1, key1])
689 )
690 except (ClientError, ParamValidationError):
~/notebook-env/lib/python3.6/site-packages/s3fs/core.py in _call_s3(self, method, *akwarglist, **kwargs)
166 # Add the normal kwargs in
167 additional_kwargs.update(kwargs)
--> 168 return method(**additional_kwargs)
169
170 @classmethod
~/notebook-env/lib/python3.6/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
308 "%s() only accepts keyword arguments." % py_operation_name)
309 # The "self" in this scope is referring to the BaseClient.
--> 310 return self._make_api_call(operation_name, kwargs)
311
312 _api_call.__name__ = str(py_operation_name)
~/notebook-env/lib/python3.6/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
584 else:
585 http, parsed_response = self._endpoint.make_request(
--> 586 operation_model, request_dict)
587
588 self.meta.events.emit(
~/notebook-env/lib/python3.6/site-packages/botocore/endpoint.py in make_request(self, operation_model, request_dict)
139 logger.debug("Making request for %s (verify_ssl=%s) with params: %s",
140 operation_model, self.verify, request_dict)
--> 141 return self._send_request(request_dict, operation_model)
142
143 def create_request(self, params, operation_model=None):
~/notebook-env/lib/python3.6/site-packages/botocore/endpoint.py in _send_request(self, request_dict, operation_model)
168 request, operation_model, attempts)
169 while self._needs_retry(attempts, operation_model, request_dict,
--> 170 success_response, exception):
171 attempts += 1
172 # If there is a stream associated with the request, we need
~/notebook-env/lib/python3.6/site-packages/botocore/endpoint.py in _needs_retry(self, attempts, operation_model, request_dict, response, caught_exception)
247 event_name, response=response, endpoint=self,
248 operation=operation_model, attempts=attempts,
--> 249 caught_exception=caught_exception, request_dict=request_dict)
250 handler_response = first_non_none_response(responses)
251 if handler_response is None:
~/notebook-env/lib/python3.6/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
225 handlers.
226 """
--> 227 return self._emit(event_name, kwargs)
228
229 def emit_until_response(self, event_name, **kwargs):
~/notebook-env/lib/python3.6/site-packages/botocore/hooks.py in _emit(self, event_name, kwargs, stop_on_response)
208 for handler in handlers_to_call:
209 logger.debug('Event %s: calling handler %s', event_name, handler)
--> 210 response = handler(**kwargs)
211 responses.append((handler, response))
212 if stop_on_response and response is not None:
~/notebook-env/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempts, response, caught_exception, **kwargs)
181
182 """
--> 183 if self._checker(attempts, response, caught_exception):
184 result = self._action(attempts=attempts)
185 logger.debug("Retry needed, action of: %s", result)
~/notebook-env/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
249 def __call__(self, attempt_number, response, caught_exception):
250 should_retry = self._should_retry(attempt_number, response,
--> 251 caught_exception)
252 if should_retry:
253 if attempt_number >= self._max_attempts:
~/notebook-env/lib/python3.6/site-packages/botocore/retryhandler.py in _should_retry(self, attempt_number, response, caught_exception)
275 # If we've exceeded the max attempts we just let the exception
276 # propogate if one has occurred.
--> 277 return self._checker(attempt_number, response, caught_exception)
278
279
~/notebook-env/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
315 for checker in self._checkers:
316 checker_response = checker(attempt_number, response,
--> 317 caught_exception)
318 if checker_response:
319 return checker_response
~/notebook-env/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
221 elif caught_exception is not None:
222 return self._check_caught_exception(
--> 223 attempt_number, caught_exception)
224 else:
225 raise ValueError("Both response and caught_exception are None.")
~/notebook-env/lib/python3.6/site-packages/botocore/retryhandler.py in _check_caught_exception(self, attempt_number, caught_exception)
357 # the MaxAttemptsDecorator is not interested in retrying the exception
358 # then this exception just propogates out past the retry code.
--> 359 raise caught_exception
~/notebook-env/lib/python3.6/site-packages/botocore/endpoint.py in _get_response(self, request, operation_model, attempts)
202 request, verify=self.verify,
203 stream=operation_model.has_streaming_output,
--> 204 proxies=self.proxies, timeout=self.timeout)
205 except ConnectionError as e:
206 # For a connection error, if it looks like it's a DNS
~/notebook-env/lib/python3.6/site-packages/botocore/vendored/requests/sessions.py in send(self, request, **kwargs)
571
572 # Send the request
--> 573 r = adapter.send(request, **kwargs)
574
575 # Total elapsed time of the request (approximately)
~/notebook-env/lib/python3.6/site-packages/botocore/vendored/requests/adapters.py in send(self, request, stream, timeout, verify, cert, proxies)
431 raise SSLError(e, request=request)
432 elif isinstance(e, ReadTimeoutError):
--> 433 raise ReadTimeout(e, request=request)
434 else:
435 raise
ReadTimeout: HTTPConnectionPool(host='s3.amazonaws.com', port=80): Read timed out. (read timeout=15)
It seems like however I pass the read_timeout, it doesn't take effect. Just to verify that I could make the error go away by changing the read_timeout value, I went ahead and modified the code of core.py (lines 205 and 210) to hardcode a longer read_timeout, as follows:
if self.anon:
    from botocore import UNSIGNED
    conf = Config(connect_timeout=self.connect_timeout,
                  read_timeout=3600,
                  signature_version=UNSIGNED, **self.config_kwargs)
    self.session = boto3.Session(**self.kwargs)
else:
    conf = Config(connect_timeout=self.connect_timeout,
                  read_timeout=3600,
                  **self.config_kwargs)
And doing this did fix the timeout error.
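Until the timeout is configurable through s3fs, one workaround (a sketch that bypasses s3fs for the copy itself) is to build a boto3 client with an explicit Config:

import boto3
from botocore.client import Config

client = boto3.client('s3', config=Config(connect_timeout=60, read_timeout=3600))
files = ['path/one.csv', 'path/two.csv']  # keys to copy, as in the snippet above
for f in files:
    client.copy_object(Bucket='bucket2', Key=f,
                       CopySource={'Bucket': 'bucket1', 'Key': f})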
Boto3 and Google Cloud Storage have an unfortunate history of not working together, which is a shame, because I'd really like to use Dask on Google hardware and this seems like it would be the cheapest way to do it.
There is a somewhat promising stackoverflow answer here: http://stackoverflow.com/questions/34089858/copy-file-from-gcs-to-s3-in-boto3 (see the final edit of the first answer). This, combined with #69, might provide a solution?
cc @quasiben
Methods like touch, open, mkdir that create things should have a keyword argument to specify the ACL (at least one of the canned ones, http://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl).
Optionally, there should be another method to change the ACL of something already existing. The access pattern is different enough from posix that trying to emulate chmod probably does not make sense.
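For the change-ACL-on-existing-objects part, the relevant boto3 calls would be put_object_acl and put_bucket_acl; a sketch with placeholder names:

import boto3

client = boto3.client('s3')
# change the canned ACL of an existing object
client.put_object_acl(Bucket='example-bucket', Key='path/to/file', ACL='public-read')
# or of a whole bucket
client.put_bucket_acl(Bucket='example-bucket', ACL='private')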
Clearing many keys from a single bucket has become a somewhat frequent operation. Currently we do this by cycling through all of the keys one-by-one and removing them. Is there a bulk remove-many or remove-bucket operation in boto3 that we can do instead ?
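boto3 does expose a batch delete (delete_objects, capped at 1000 keys per request); a sketch of what a bulk remove could build on:

import boto3

client = boto3.client('s3')

def bulk_delete(bucket, keys):
    # delete_objects accepts at most 1000 keys per call
    for i in range(0, len(keys), 1000):
        batch = [{'Key': k} for k in keys[i:i + 1000]]
        client.delete_objects(Bucket=bucket, Delete={'Objects': batch})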
When using S3Map to write data into S3, currently the list of keys is cached the first time .keys() is called, so any subsequent items added via __setitem__ do not appear in the list of keys. The underlying call to s3.walk() offers a refresh option, so it would be possible to ensure a fresh list of keys is retrieved after the map has been modified.
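A sketch of what the caller-side workaround amounts to, assuming walk's refresh keyword behaves as described above:

import s3fs

s3 = s3fs.S3FileSystem()
# after items are added via __setitem__, force a fresh listing instead of the cached one
fresh_keys = s3.walk('example-bucket/example-prefix', refresh=True)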
The following message sometimes displays, have not yet determined the conditions that cause it:
Exception ignored in: <bound method S3File.__del__ of <S3File test/temp>>
Traceback (most recent call last):
File "/Users/mdurant/Downloads/s3fs/s3fs/core.py", line 580, in __del__
self.close()
File "/Users/mdurant/Downloads/s3fs/s3fs/core.py", line 574, in close
UploadId=self.mpu['UploadId'], MultipartUpload=part_info)
File "/Users/mdurant/Documents/anaconda/lib/python3.4/site-packages/botocore/client.py", line 310, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/Users/mdurant/Documents/anaconda/lib/python3.4/site-packages/botocore/client.py", line 407, in _make_api_call
raise ClientError(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (AllAccessDisabled) when calling the CompleteMultipartUpload operation: All access to this object has been disabled
This path only gets executed in write-mode.
It'd be nice to support ** in globs as python 3.5 does.
This doesn't seem particularly important, especially with walk, but I thought I'd mention it as something that'd be nice to have. I ran into a situation where it'd be slightly easier to use a glob with ** than multiple *s or walk.
This could be at odds with your goals, so feel free to just shoot it down!
Pandas' use-case for s3fs is at the key/object level, and S3File taking an instance of S3FileSystem I think just causes headaches we'd have to work around (mainly permission-level stuff). Part of that is being worked on here.
A complementary approach would be to have S3File take a boto3 s3 connection instead of a S3FileSystem. A quick glance through the code showed that every use of S3File.s3 was only to get at the connection, except for S3File.close, which invalidated the bucket cache.
I'm going to play with a couple approaches today and see what breaks. Do you see any hope for this happening?
Also, the docstring for S3File says that the first arg should be a boto3 connection. I'm hoping we can make that true :)
I'm unable to read a CSV file that I know exists. It looks like it has to do with ls not finding the file, because there are many files above it, and list_objects returns a limit of 1000 objects.
In [1]: import s3fs
In [2]: fs = s3fs.S3FileSystem(anon=False)
In [3]: df = pd.DataFrame(np.arange(20).reshape(-1, 2))
In [4]: with fs.open('vehcuity/a/b.csv', 'wb') as f:
...: f.write(df.to_string().encode('utf8'))
...:
In [5]: import boto3
In [7]: client = boto3.client('s3')
In [8]: obj = client.get_object(Bucket='vehcuity', Key='test.csv') # it exists
In [10]: with fs.open('vehcuity/a/b.csv', 'rb') as f:
...: print(f.read())
... #OSError, file wasn't found
Looking at the traceback,
ipdb> len(self._ls(path))
1000
It seems that list_objects only fetches the first 1000 keys. Per http://boto3.readthedocs.org/en/latest/reference/services/s3.html#S3.Client.list_objects, they suggest:
You can use the request parameters as selection criteria to return a subset of the objects in a bucket.
As a partial solution, we could filter on the subdirectory with Prefix='a/'. That could still fail if the 'a' "directory" has more than 1,000 keys in it.
The latest release of boto3 breaks s3fs.
Until this gets a proper fix, this PR pins the boto3 version: #62
I have some AWS credentials for which I can't list buckets. I almost never have bucket level permissions, but I do have permissions on prefixes. So something like
fs = s3fs.S3FileSystem(profile_name='user')
fs.ls('s3://home/sseabold/prefix')
Won't work because I can't list any buckets (S3Client.list_buckets fails), and I also don't generally have permissions at the bucket level.
Similar issues to those here [1, 2] for the default (old) boto behavior.
[1] conda/conda#2126
[2] blaze/odo#448
A colleague (Danielle Fisher) found a bug in s3fs when appending to files larger than 5 MB. At line 734 of core.py, a path argument is passed to the boto upload_part_copy function. If this path has an s3:// prefix, then an append will fail with the following error:
ClientError: An error occurred (NoSuchBucket) when calling the UploadPartCopy operation: The specified bucket does not exist
If the path does not have the s3:// prefix, then the append works just fine.
This code snippet will trigger the bug:
import s3fs

path = 's3://bucket/prefix/key'
fs = s3fs.S3FileSystem()

with fs.open(path, 'wb') as f:
    f.write(b'a' * (10 * 2 ** 20))

with fs.open(path, 'ab') as f:
    f.write(b'b' * (10 * 2 ** 20))
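The fix presumably amounts to stripping the protocol before the path reaches the boto call; a sketch of that normalisation (the helper name is made up):

def strip_protocol(path):
    # 's3://bucket/prefix/key' -> 'bucket/prefix/key', so CopySource gets a bare bucket/key
    return path[len('s3://'):] if path.startswith('s3://') else path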
>>> s3.ls('')
['temp']
>>> s3.rm('temp')
>>> s3.ls('')
['temp']
(should be [])
How do I override the .ls pagination max of 1000? The default options for S3FileSystem don't take any special parameters, and the **kwargs do not get passed from _ls to _lsdir. Also, _lsdir should yield, so that we can continue to iterate over the paginator.
@mrocklin, it looks like your recent issue with S3 buckets was not related to your permissions/credentials.
When running this code, it fails and says the S3 bucket doesn't exist:
>>> from s3fs import S3Map, S3FileSystem
>>> s3 = S3FileSystem(anon=True)
>>> dc = S3Map('dask-zarr-data/ecmwf/compressed/2014/t2m', s3=s3)
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-9-91cfae4d9187> in <module>()
1 from s3fs import S3Map, S3FileSystem
2 s3 = S3FileSystem(anon=True)
----> 3 dc = S3Map('dask-zarr-data', s3=s3)
4
5 from zarr import Array
/opt/anaconda/lib/python2.7/site-packages/s3fs/mapping.pyc in __init__(self, root, s3, check, create)
42 self.s3.mkdir(bucket)
43 elif not self.s3.exists(bucket):
---> 44 raise ValueError("Bucket %s does not exist."
45 " Create bucket with the ``create=True`` keyword")
46
ValueError: Bucket %s does not exist. Create bucket with the ``create=True`` keyword
However, I can inspect the contents of the bucket:
>>> s3.ls('dask-zarr-data')
[u'dask-zarr-data/ecmwf']
It seems like the problem is with the S3FileSystem.exists check here in s3fs/mapping.py when only an S3 bucket name is provided:
>>> s3.exists('dask-zarr-data/ecmwf')
True
>>> s3.exists('dask-zarr-data')
False
>>> s3.exists('dask-zarr-data/')
False
The implementation of walk in s3fs returns a flat list of all paths encountered when traversing the tree. In contrast, os.walk from the standard library returns an iterator of (dirname, dirs, files), where dirname is the current directory prefix in the traversal, and files and dirs are the names (relative, not absolute) of all files and directories respectively.
This difference in behavior is confusing for those familiar with the standard os.walk method, and also makes providing the interface used by pyarrow (which does match the standard library) tricky.
Pyarrow has created a wrapper class that provides the proper method interface, see here. Is there any reason not to just upstream this? We'd potentially need to figure out a good deprecation path, as the current behavior may be used by users.
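A rough sketch of such a wrapper over the current flat output; it assumes walk returns full key paths and simply regroups them, so the grouping logic is illustrative rather than pyarrow's actual code:

from collections import defaultdict
import posixpath

def walk_triples(fs, root):
    """Yield (dirname, dirs, files) from s3fs's flat walk(), loosely like os.walk."""
    root = root.rstrip('/')
    dirs_by_parent = defaultdict(set)
    files_by_parent = defaultdict(list)
    for path in fs.walk(root):
        parent, name = posixpath.split(path)
        files_by_parent[parent].append(name)
        # record every implied intermediate directory up to the root
        while parent and len(parent) > len(root):
            grandparent, dirname = posixpath.split(parent)
            dirs_by_parent[grandparent].add(dirname)
            parent = grandparent
    for dirname in sorted(set(dirs_by_parent) | set(files_by_parent)):
        yield dirname, sorted(dirs_by_parent[dirname]), sorted(files_by_parent[dirname])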
Passing the argument s3_additional_kwargs according to the documentation, like:
s3 = s3fs.S3FileSystem(s3_additional_kwargs={'ServerSideEncryption': 'AES256'})
is not valid. Server-side encryption needs to be part of botocore.config for the s3 client, like so:
sess = boto3.Session()
conf = botocore.config.Config(s3={'ServerSideEncryption': "AES256"})
client = sess.client('s3', config=conf)
s3 = s3fs.S3FileSystem(key=ACCESS_KEY, secret=SECRET_KEY)
I cannot check for any file using the following methods:
s3.cat(path='s3://test/test.txt')
s3.ls(path='test/')
s3.ls(path='test')
FileNotFoundError Traceback (most recent call last)
in ()
----> 1 s3.cat(path='s3://mariont-test/test.txt')
/Users/carlosrodrigues/anaconda/envs/testenv/lib/python2.7/site-packages/s3fs/core.pyc in cat(self, path)
527 def cat(self, path):
528 """ Returns contents of file """
--> 529 with self.open(path, 'rb') as f:
530 return f.read()
531
/Users/carlosrodrigues/anaconda/envs/testenv/lib/python2.7/site-packages/s3fs/core.pyc in open(self, path, mode, block_size, acl, fill_cache)
256 " and manage bytes" % (mode[0] + 'b'))
257 return S3File(self, path, mode, block_size=block_size, acl=acl,
--> 258 fill_cache=fill_cache)
259
260 def _lsdir(self, path, refresh=False):
/Users/carlosrodrigues/anaconda/envs/testenv/lib/python2.7/site-packages/s3fs/core.pyc in __init__(self, s3, path, mode, block_size, acl, fill_cache)
853 else:
854 try:
--> 855 self.size = self.info()['Size']
856 except (ClientError, ParamValidationError):
857 raise IOError("File not accessible", path)
/Users/carlosrodrigues/anaconda/envs/testenv/lib/python2.7/site-packages/s3fs/core.pyc in info(self)
859 def info(self):
860 """ File information about this path """
--> 861 return self.s3.info(self.path)
862
863 def metadata(self, refresh=False):
/Users/carlosrodrigues/anaconda/envs/testenv/lib/python2.7/site-packages/s3fs/core.pyc in info(self, path, refresh)
363 return out
364 except (ClientError, ParamValidationError):
--> 365 raise FileNotFoundError(path)
366
367 _metadata_cache = {}
FileNotFoundError: mariont-test/test.txt
Every method I call ends up here. Any suggestion on how to fix it?
(PS: I can connect to S3 with boto3.client and put files, for example, so it is a bit strange.)
It would be convenient to treat an S3 bucket as a MutableMapping.
This might look something like the following MutableMapping around a local directory: https://github.com/mrocklin/zict/blob/master/zict/file.py
It would be useful for hosting NDArrays on S3 using an upcoming Zarr refactor: https://github.com/alimanfoo/zarr/pull/22
@martindurant does this interest you?
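A rough sketch of what such a mapping could look like on top of S3FileSystem; the key layout and (lack of) error handling are illustrative only:

from collections.abc import MutableMapping
import s3fs

class S3Dict(MutableMapping):
    """Minimal dict-like view onto the keys under an S3 prefix."""

    def __init__(self, root, s3=None):
        self.root = root.rstrip('/')
        self.s3 = s3 if s3 is not None else s3fs.S3FileSystem()

    def _path(self, key):
        return self.root + '/' + key

    def __getitem__(self, key):
        with self.s3.open(self._path(key), 'rb') as f:
            return f.read()

    def __setitem__(self, key, value):
        with self.s3.open(self._path(key), 'wb') as f:
            f.write(value)

    def __delitem__(self, key):
        self.s3.rm(self._path(key))

    def __iter__(self):
        # walk returns full paths; strip the root prefix to recover the keys
        for path in self.s3.walk(self.root):
            yield path[len(self.root) + 1:]

    def __len__(self):
        return sum(1 for _ in self)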
This is currently on _ls but not available through ls or glob.
list(f) or f.readlines() iterate forever. We must have been relying on the calling function (e.g., pandas' read_csv) to quit when getting no more data, or maybe all were reading binary only.
In s3, the concepts of a bucket and a key are separate. In the current design of s3fs, both are used together to define a full path. This poses a number of complications:
- For a path like s3://bucket/path/to/file, traditional uri schemes would have bucket be the host, and the remainder be the path. In dask we've ended up special casing s3fs and gcfs when handling this parsing, which is unfortunate. If the bucket was part of the filesystem, this special casing would no longer be necessary.
- The implementation of touch (https://github.com/dask/s3fs/blob/master/s3fs/core.py#L793-L808) is split depending on if the path is just a bucket or a full key. Buckets have different semantics, when in a filesystem all objects should have the same semantics. See glob for another example (https://github.com/dask/s3fs/blob/master/s3fs/core.py#L548-L549). You can't glob across buckets, because that's all of s3.
Since s3fs is already in use, we can't perform a breaking change like this without some deprecation cycle. I'm not actually sure if we need to break the old behavior though, but supporting both might make the code more complicated. Proposal:
- Add a bucket=None kwarg to the __init__ of S3FileSystem. The default is the current behavior.
- If bucket is specified, it becomes the bucket in all specified paths.
Since s3fs currently works with paths prefixed with s3://, I'm not sure how that should be handled. IMO s3fs shouldn't accept paths specified with s3:// (and should instead accept plain paths as if they were local paths), but that may be hard to deprecate cleanly. Perhaps if the filesystem has a bucket specified, error on paths that start with s3://.
If deprecation of the existing behavior is desired, a warning could be raised on __init__ whenever bucket is None, and eventually the existing behavior phased out.
Anyway, this is just one person's opinion. Figured it'd be worth bringing up to get some feedback.
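To make the proposal concrete, usage might look something like this (an entirely hypothetical API, per the bucket= kwarg suggested above):

import s3fs

# hypothetical: bind the filesystem to one bucket, paths then look like local paths
fs = s3fs.S3FileSystem(bucket='example-bucket')
fs.ls('path/to/dir')  # would list keys under example-bucket/path/to/dir
with fs.open('path/to/file.csv', 'rb') as f:
    data = f.read()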
Can we cut a release with the serverside encryption params?
I apologize if this feature already exists, but I did not find it!
boto3 supports the profile_name keyword, where the appropriate profile is specified in ~/.aws/credentials.
session = boto3.Session(profile_name='some-profile')
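If S3FileSystem simply forwarded that keyword to the underlying session, usage could look like this (the keyword on the s3fs side is hypothetical):

import s3fs

# hypothetical: profile selection forwarded to boto3.Session(profile_name=...)
fs = s3fs.S3FileSystem(profile_name='some-profile')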
We have an s3-like server that is hosted on a service other than amazon. We need to change the 'endpoint_url' param. We are pretty comfortable using Boto and would like S3FS to simply work within that configuration. I imagine an initialization like the following:
session = boto3.Session(profile_name='my_profile')
s3 = session.client('s3', endpoint_url='https://some_unique_endpoint.org')
s3_filesys = S3FileSystem(s3)
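An alternative shape for the same request, assuming S3FileSystem forwards keyword arguments to the underlying client (the client_kwargs route and the profile keyword are assumptions here):

import s3fs

# hypothetical: pass the custom endpoint through to the boto client
s3_filesys = s3fs.S3FileSystem(profile_name='my_profile',
                               client_kwargs={'endpoint_url': 'https://some_unique_endpoint.org'})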
The caching scheme is designed for a process interacting with only some limited part of a massive file. If a huge file is opened and read beginning to end, eventually the whole thing will be cached in memory, which may not be possible. Should there be a "streaming mode" where, although we can still read ahead, data we have already passed gets discarded?
Will you consider adding a zipping method in the future?
I am running into a situation where I can zip locally and upload, but I would prefer a zip function that works directly with keys, one that consumes as little local memory or local disk space as possible, because those are limited resources.
How do I get involved with this project? I have been using it a lot lately.
I'm running into the following error with rmdir. The info method says "Gets details only for a specific key, directories/buckets cannot be used with info." However, the rmdir function on line 641 says if (key and self.info(path)['Size'] == 0) or not key:. As one might expect, this results in File "/opt/conda/lib/python3.6/site-packages/s3fs/core.py", line 402, in info raise FileNotFoundError(path) FileNotFoundError: <my path redacted>.
Concretize the required outputs of ls, depending on whether the bucket exists, and filling in implied directories.
e.g.,
s3.ls('nonexistent')
-> error
s3.walk('test')
['test/file1', 'test/dir/file2']
s3.ls('test')
['test/file1', 'test/dir']
Should walk/glob also pick up implied directories?
I'm using s3fs and fastparquet to write parquet files to s3. I've configured presto to read from s3 using hive external table.
The problem is that Presto will read from the file while fastparquet is still writing it, so it fails, saying the parquet file is invalid. To work around this problem, I write to a temporary path first. Let's say I'm supposed to write to:
filename = 'bucket_name/account_type/yr=2017/mn=10/dt=8/19/de86d8ed-7447-420f-9f25-799412e377adparquet.json'
# let's write to temp file
tmp_file = filename.replace('account_type', 'tmp-account_type')
fastparquet.write(tmp_file, df, open_with=opener)
fs.mv(tmp_file, filename)
But even in this case it looks like Presto sometimes, though rarely, reads an incomplete file. How is this possible? How can we make this atomic/isolated with s3fs?
How can I mount an S3 bucket just like s3fs-fuse (https://github.com/s3fs-fuse/s3fs-fuse) does, and then operate on the objects like local files? Does this project support this functionality?
Replication of functionality in fsspec/gcsfs#53