aio-libs / aiocassandra
Simple threaded cassandra wrapper for asyncio
Home Page: https://pypi.python.org/pypi/aiocassandra
License: MIT License
After calling __aenter__, the paginator starts fetching pages into memory no matter how quickly rows are consumed from it. So if the consumer iterates over rows much more slowly than the paginator fetches them, the rows deque keeps growing and causes large memory consumption.
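A minimal sketch of the backpressure this report asks for, assuming a bounded asyncio.Queue in place of an unbounded deque (this is not how aiocassandra's paginator is implemented). The producer's put() waits whenever the queue is full, so fetching can never run more than maxsize rows ahead of the consumer. fetch_pages, consume and main are hypothetical stand-ins; the real paginator is fed from driver callback threads, so an actual fix would also need a thread-safe hand-off (for example asyncio.run_coroutine_threadsafe) and would more likely pause between pages than between rows.

```python
import asyncio


async def fetch_pages(queue, pages):
    """Producer: hypothetical stand-in for a paginator pulling pages from Cassandra."""
    for page in pages:
        for row in page:
            # put() blocks while the queue is full, so fetching pauses
            # until the consumer catches up (backpressure).
            await queue.put(row)
    await queue.put(None)  # sentinel: no more rows


async def consume(queue, seen_sizes):
    """Slow consumer that records how many rows were buffered before each get()."""
    rows = []
    while True:
        seen_sizes.append(queue.qsize())
        row = await queue.get()
        if row is None:
            return rows
        await asyncio.sleep(0)  # simulate per-row processing
        rows.append(row)


async def main(pages, maxsize):
    queue = asyncio.Queue(maxsize=maxsize)  # bounded buffer instead of a deque
    sizes = []
    producer = asyncio.ensure_future(fetch_pages(queue, pages))
    rows = await consume(queue, sizes)
    await producer
    return rows, max(sizes)


rows, peak = asyncio.run(main([[1, 2, 3], [4, 5, 6], [7, 8]], maxsize=2))
```

However fast the producer is, at most maxsize rows sit in the buffer at any moment.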
Hi there,
I am new to asyncio. Recently I wrote a web application with aiohttp; during a request, I need to read from and write to Cassandra. I use aiocassandra as the driver, and in the test environment everything works. But in production, when throughput goes up (around 2k-3k QPS at peak), I get these errors very frequently (50-80 per 15 minutes):
{'status': 'failed', 'content': None, 'errorCode': 'Internal Server Error', 'errorMessage': 'Traceback (most recent call last):\n File "/app_libs/common/web/middlewares.py", line 74, in error_middleware\n response = await handler(request)\n File "/app_libs/common/web/middlewares.py", line 26, in request_trace_middleware\n response = await handler(request)\n File "/app_libs/common/web/middlewares.py", line 59, in track_time_middleware\n response = await handler(request)\n File "/app/routes/collecting.py", line 40, in webcollect\n matchresult = await integration_data(request.app, request_data, config_data, cass_flag=True, kafka_flag=True)\n File "/app/services/integration.py", line 34, in integration_data\n result = await integration_event(app, request_data, configs, **kwargs)\n File "/app/services/integration.py", line 62, in integration_event\n user_data.online_data[\'page\'], app[\'cassandra\'], profile_id, user_data.device_id)\n File "/app/services/integration.py", line 282, in check_pt_id\n \'event\', \'device_id\', device_id, profile_id)\n File "/app/libs/core.py", line 179, in cassandra_read\n result.extend(await query(table, pk_str, pk_value))\n File "/app/libs/core.py", line 168, in query\n res = await session.execute_future(cql, (pk_v, ))\n File "/usr/local/lib/python3.7/site-packages/aiocassandra.py", line 153, in execute_future\n return await asyncio_fut\nconcurrent.futures._base.CancelledError\n'}
At first, I did the Cassandra reads after the request had finished, running them in an asyncio task. When I got these errors, I wondered whether they were caused by the aiohttp framework, since aiohttp may cancel tasks bound to a request once the request is done. So I used aiojobs to make sure those tasks would be awaited even after the response was returned. But the problem is still there. I also tried moving the Cassandra operations into the request lifecycle, out of the post-request background asyncio tasks, but that did not solve the problem either. I have searched a lot on the internet but found nothing really helpful.
I notice that this driver wraps the DataStax driver's async queries. I am not sure whether I am using it correctly or whether there really is an issue. I don't know what to do about this: change the thread/process executor, or go back to the DataStax non-asyncio version? Any advice?
Thank you very much. (By the way, I also use aiokafka in the same environment and have not seen these errors there; and again, when throughput is low, everything works.)
++++
python 3.7.4
aiohttp==3.6.0
aiocassandra==2.0.1
cassandra-driver==3.18.0
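If the CancelledError comes from the caller's task being cancelled (for example, aiohttp cancelling a handler when the client disconnects), one possible mitigation is to run the Cassandra call as its own task and await it through asyncio.shield(), so only the awaiting coroutine is cancelled while the query keeps running. This is a sketch under that assumption, not a confirmed diagnosis of this report; fake_query is a hypothetical stand-in for await session.execute_future(cql, params), and handler is a hypothetical request handler.

```python
import asyncio


async def fake_query(log):
    """Hypothetical stand-in for `await session.execute_future(cql, params)`."""
    await asyncio.sleep(0.05)
    log.append("query finished")
    return ["row"]


async def handler(log, background):
    # Run the query as its own task and keep a strong reference to it.
    task = asyncio.ensure_future(fake_query(log))
    background.add(task)
    # shield(): cancelling *this* coroutine raises CancelledError here,
    # but the inner task keeps running to completion.
    return await asyncio.shield(task)


async def main():
    log, background = [], set()
    request = asyncio.ensure_future(handler(log, background))
    await asyncio.sleep(0.01)
    request.cancel()  # simulate the framework cancelling the request handler
    try:
        await request
    except asyncio.CancelledError:
        log.append("handler cancelled")
    await asyncio.gather(*background)  # the shielded query still completes
    return log


log = asyncio.run(main())
```

Shielding only protects the query from outside cancellation; if the error has another source (timeouts, executor saturation), this will not help.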
Hi,
The objective of my sample code is asynchronous writes and reads with aiocassandra. In both cases the volume could be millions of rows, but it fails, especially when I select and loop over a large set of data, say 100,000 rows.
It also gives me:
1.
cqlsh> select * from test.dummytbl;
u'key' | u'col1' | u'col2'
--------+---------+---------
(0 rows)
Failed to format value u'key' : 'unicode' object has no attribute 'formatter'
Failed to format value u'col1' : 'unicode' object has no attribute 'formatter'
1 more decoding errors suppressed.
Current time - End - session : 1551876594703
2019-03-06 18:19:55,200 [DEBUG] aiocassandra: Paginator is closed, cleared in-memory 0 records
[root@vm-9 cassandra_poc]# cqlsh
Connection error: ('Unable to connect to any servers', {'127.0.0.1': error(111, "Tried connecting to [('127.0.0.1', 9042)]. Last error: Connection refused")})
Your help is really appreciated.
I have attached the file here.
Regards,
Dwarika
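For writing very large numbers of rows, one common pattern (a sketch, not aiocassandra's API) is to cap the number of in-flight requests with a fixed pool of worker coroutines pulling from a shared iterator, rather than scheduling millions of coroutines at once. fake_insert is a hypothetical stand-in for await session.execute_future(prepared, params), and the limit of 50 is an arbitrary example value.

```python
import asyncio


async def fake_insert(row, stats):
    """Hypothetical stand-in for `await session.execute_future(prepared, row)`."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.001)  # pretend network round trip
    stats["in_flight"] -= 1
    stats["done"] += 1


async def insert_all(rows, concurrency):
    stats = {"in_flight": 0, "peak": 0, "done": 0}
    it = iter(rows)  # shared by all workers; each row is taken exactly once

    async def worker():
        for row in it:
            await fake_insert(row, stats)

    # Only `concurrency` coroutines exist, so at most that many requests are
    # outstanding, however many rows there are.
    await asyncio.gather(*(worker() for _ in range(concurrency)))
    return stats


stats = asyncio.run(insert_all(range(1000), concurrency=50))
```

Because rows are pulled lazily from the iterator, the same code works for a generator that streams millions of rows without materializing them first.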
Do it like this:
REPO_NAME={{ YOUR_REPO_NAME }}
travis encrypt -r "aio-libs/${REPO_NAME}" --api-endpoint 'https://api.travis-ci.com/'
Ref: https://github.com/orgs/aio-libs/teams/admins/discussions/9
It's erroring out during the dpkg install of something Java-related: https://travis-ci.com/aio-libs/aiocassandra/jobs/150197735#L469
I haven't found any methods for it.
Can it be used with cqlengine objects?
Could you give some examples of insert queries?
Something like this:
executable_query = cassandra.prepare("""INSERT INTO units (date, marks, user_id) VALUES (%s, %s, %s)""", (date,marks,user_id,))
await cassandra.execute_future(executable_query)
or this:
executable_query = cassandra.prepare("""INSERT INTO units (date, marks, user_id) VALUES (%s, %s, %s)""")
await cassandra.execute_future(executable_query, (date,marks,user_id,))
fails with the error:
cassandra.protocol.SyntaxException: <Error from server: code=2000 [Syntax error in CQL query] message="line 1:49 no viable alternative at character '%'">
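The server rejects this because cassandra-driver's prepared statements use ? (or named :name) bind markers; %s placeholders are only understood for simple, non-prepared queries, which the driver fills in client-side. Session.prepare() also does not take the bound values, so they belong in the execute call. A minimal sketch, assuming session is a cassandra-driver Session wrapped with aiosession(...) so that it has execute_future, as used elsewhere in this thread; prepare_insert, insert_unit and insert_unit_simple are hypothetical helper names, and the simple variant assumes execute_future forwards its arguments to Session.execute_async.

```python
# Prepared statement: bind markers are "?", not "%s".
INSERT_UNIT_CQL = "INSERT INTO units (date, marks, user_id) VALUES (?, ?, ?)"

# Simple (non-prepared) statement: here "%s" is correct.
SIMPLE_INSERT_CQL = "INSERT INTO units (date, marks, user_id) VALUES (%s, %s, %s)"


def prepare_insert(session):
    """Prepare once (a blocking round trip) and reuse the statement afterwards."""
    return session.prepare(INSERT_UNIT_CQL)


async def insert_unit(session, prepared, date, marks, user_id):
    # Values are bound at execution time, as a tuple.
    return await session.execute_future(prepared, (date, marks, user_id))


async def insert_unit_simple(session, date, marks, user_id):
    # Non-prepared alternative: the driver substitutes %s client-side.
    return await session.execute_future(SIMPLE_INSERT_CQL, (date, marks, user_id))
```

Preparing once at startup and reusing the statement avoids a round trip per insert.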
There seems to be a bug in the Travis config: https://github.com/aio-libs/aiocassandra/blob/master/.travis.yml#L39
The deploy section uses the system-provided Python 2.7, whose SNI module is broken, so it fails hard.
Hello,
there is a problem with this line (line 77 in commit 3ffd252):
The __aexit__ coroutine does not set self._finish_event correctly. Because of the way aiocassandra uses threading, to correct this we have to call self._loop.call_soon_threadsafe(self._finish_event.set).
This leads to problems when a task working through results of the pagination is suddenly cancelled.
Would it be possible to fix this, or should I make a custom fork for my use?
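Why the proposed fix matters: asyncio.Event is not thread-safe, so calling set() from a thread other than the one running the loop may fail to wake the waiting coroutine. A minimal, self-contained sketch of the pattern (worker and main are illustrative names, not aiocassandra code): the thread hands the set() call to the loop with loop.call_soon_threadsafe, which also wakes the loop up.

```python
import asyncio
import threading


def worker(loop, done):
    # Runs in a plain thread: don't call done.set() here directly,
    # schedule it on the event loop's own thread instead.
    loop.call_soon_threadsafe(done.set)


async def main():
    loop = asyncio.get_running_loop()
    done = asyncio.Event()
    t = threading.Thread(target=worker, args=(loop, done))
    t.start()
    await asyncio.wait_for(done.wait(), timeout=1.0)  # woken via the loop
    t.join()
    return done.is_set()


result = asyncio.run(main())
```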
I'm considering this library for a project. Thank you for creating and maintaining it.
Is there a reason why it uses a threadpool? The Cassandra driver is already async and has a callback interface. Wouldn't it be more performant to create an interface between the Cassandra driver and asyncio futures?
I could probably write this myself, as it should only be a handful of lines of code. But your pagination context manager is something I'd love to take advantage of.
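The callback-based approach the question describes could look roughly like this; it is an assumption about how such an adapter might be written, not aiocassandra's implementation. It relies on cassandra-driver's ResponseFuture.add_callbacks(callback, errback), whose callbacks run on the driver's I/O thread, so results are handed to the loop with call_soon_threadsafe. For paged queries the callback only receives the current page, so a pagination layer would still be needed on top. wrap_response_future is a hypothetical name, and FakeResponseFuture is a demo stand-in for a real ResponseFuture.

```python
import asyncio
import threading


def wrap_response_future(response_future, loop):
    """Return an asyncio.Future that resolves when a callback-style future does.

    Assumes `response_future` has add_callbacks(callback, errback), as
    cassandra-driver's ResponseFuture does; callbacks may fire on another thread.
    """
    aio_fut = loop.create_future()

    def _set_result(result):
        if not aio_fut.done():  # the awaiting side may have been cancelled
            aio_fut.set_result(result)

    def _set_exception(exc):
        if not aio_fut.done():
            aio_fut.set_exception(exc)

    # These lambdas run on the driver's thread: never touch aio_fut there.
    response_future.add_callbacks(
        lambda result: loop.call_soon_threadsafe(_set_result, result),
        lambda exc: loop.call_soon_threadsafe(_set_exception, exc),
    )
    return aio_fut


class FakeResponseFuture:
    """Demo stand-in that completes from a background thread."""

    def __init__(self, result=None, error=None):
        self.result, self.error = result, error

    def add_callbacks(self, callback, errback):
        def run():
            if self.error is not None:
                errback(self.error)
            else:
                callback(self.result)

        threading.Thread(target=run).start()


async def demo():
    loop = asyncio.get_running_loop()
    rows = await wrap_response_future(FakeResponseFuture(result=[("a", 1)]), loop)
    error = None
    try:
        await wrap_response_future(FakeResponseFuture(error=ValueError("boom")), loop)
    except ValueError as exc:
        error = str(exc)
    return rows, error


rows, error = asyncio.run(demo())
```

No thread pool is involved: the only cross-thread step is call_soon_threadsafe.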
I use uvloop instead of the loop from asyncio.get_event_loop, and aiocassandra does not work.
The error is "attached to a different loop".
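A frequent cause of this error (not confirmed for this report) is that a loop-bound object is created on one loop, for example via asyncio.get_event_loop() at import time before the uvloop policy is installed, and then awaited from a task running on another loop. The standard-library sketch below reproduces the error and shows the usual remedy: get the loop with asyncio.get_running_loop() inside a coroutine and pass that loop on (for instance as aiosession(session, loop=loop)). With uvloop, install its policy (uvloop.install() or asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())) before any loop is created. reproduce_mismatch and uses_running_loop are illustrative names.

```python
import asyncio


def reproduce_mismatch():
    """Await a future bound to one loop from a task running on another loop."""
    other_loop = asyncio.new_event_loop()  # e.g. a loop grabbed at import time
    foreign = other_loop.create_future()  # bound to other_loop

    async def uses_foreign():
        await foreign  # this task runs on the loop created by asyncio.run()

    try:
        asyncio.run(uses_foreign())
    except RuntimeError as exc:
        return str(exc)
    finally:
        other_loop.close()
    return ""


async def uses_running_loop():
    # Fix: take loop-bound objects from the loop that is actually running.
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    loop.call_soon(fut.set_result, "ok")
    return await fut


message = reproduce_mismatch()
result = asyncio.run(uses_running_loop())
```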
I would like to use Sanic, an Async Python web server (https://github.com/channelcat/sanic), with Aiocassandra. But I cannot figure out how to successfully deploy it with multiple workers.
Here is a part of my code:
from aiocassandra import aiosession
from cassandra.cluster import Cluster
from sanic import Sanic

app = Sanic(__name__)

cluster = Cluster()
cassandra_session = cluster.connect()

# http://sanic.readthedocs.io/en/latest/sanic/middleware.html?highlight=listener#listeners
@app.listener('before_server_start')
def init_cassandra_session(app, loop):
    aiosession(cassandra_session, loop=loop)

@app.route('/')
def my_route(request):
    # Cassandra query and return result
    ...

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, debug=True, workers=4)
When I set more than one worker, the query seems to get stuck and never returns any results.
But when I change the number of workers to 1, the server works fine.
Is there anything I did wrong here?