dabeaz / curio
Good Curio!
License: Other
os.set_blocking is missing on Windows, which causes this call in io.py to fail:
os.set_blocking(self._fileno, False)
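One way to tolerate the missing function is to probe for it before calling it. The sketch below only illustrates that idea and is not Curio's actual fix; `try_set_nonblocking` is a hypothetical helper name.

```python
import os

def try_set_nonblocking(fileno):
    """Switch fileno to non-blocking mode when the platform allows it.

    Returns True on success and False when os.set_blocking is unavailable.
    Hypothetical helper, not part of Curio.
    """
    set_blocking = getattr(os, "set_blocking", None)
    if set_blocking is None:
        # Platform does not provide os.set_blocking; caller must cope.
        return False
    set_blocking(fileno, False)
    return True
```

A caller would still need a fallback strategy for the False case; this sketch only avoids the AttributeError.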
The reference guide doesn't seem to mention any of it beyond mentioning it.
If Curio is to support Windows, does it do so using IOCP? If so, what does it look like? Is it just an internal implementation detail hidden from view? Or would programmers want to interact with it more directly in some manner?
Just thinking out loud here....
Thinking about switching to py.test for the tests. No particular reason other than having a more concrete excuse to use and learn more about py.test. Thoughts?
This page says:
await curio._read_wait(fileobj, timeout=None)
But:
In [1]: import curio
In [2]: curio._read_wait
AttributeError: module 'curio' has no attribute '_read_wait'
It seems to be only in curio.kernel._read_wait. I suppose this is a better place for it and the bug is just in the docs, but either way, FYI :-)
(Found because I was trying to track down what happens if a timeout expires; it doesn't seem to be mentioned in the reference manual.)
The maintainer of this project (Dave) sometimes experiences a delay in responding to issues and pull requests. What's the story?
I read the source code of the Stream class in the io module. However, the exception handling in the Stream._read method confuses me. The method only performs reads, so I think it does not need to catch the WantWrite exception.
If I am wrong, please tell me. :)
The following code seems to trigger this:
import curio
async def go():
    await curio.sleep(2)
async def main():
    task1 = await curio.new_task(go())
    task2 = await curio.new_task(go())
if __name__ == '__main__':
    kernel = curio.Kernel()
    kernel.run(main())
Gives this:
Curio: Task Crash: go
Traceback (most recent call last):
File "C:\Anaconda3\lib\site-packages\curio\kernel.py", line 370, in run
getattr(self, op)(current, *args)
File "C:\Anaconda3\lib\site-packages\curio\kernel.py", line 477, in _trap_sleep
self._set_timeout(current, seconds, 'sleep')
File "C:\Anaconda3\lib\site-packages\curio\kernel.py", line 308, in _set_timeout
heapq.heappush(self._sleeping, item)
TypeError: unorderable types: Task() < Task()
Traceback (most recent call last):
File "C:\Users\roger\OneDrive\Documents\Python Scripts\delme.py", line 20, in <module>
kernel.run(main())
File "C:\Anaconda3\lib\site-packages\curio\kernel.py", line 355, in run
self._poll_for_io()
File "C:\Anaconda3\lib\site-packages\curio\kernel.py", line 334, in _poll_for_io
self._reschedule_task(task)
File "C:\Anaconda3\lib\site-packages\curio\kernel.py", line 234, in _reschedule_task
assert task.id in self._tasks, 'Task %r not in the task table' % task
AssertionError: Task Task(id=4, <coroutine object go at 0x000000B1A98EA888>, state='CRASHED') not in the task table
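The TypeError suggests that the sleep queue holds tuples whose leading elements tie, so the comparison falls through to the Task objects. That reading is an assumption based on the traceback, not on the Curio source. A common remedy for this pattern, sketched here with hypothetical names (`Task`, `push_sleeper`, `pop_sleeper`), is to put a monotonically increasing counter ahead of the task in each heap entry:

```python
import heapq
import itertools

class Task:
    """Stand-in for a task object that defines no ordering."""
    def __init__(self, name):
        self.name = name

_sequence = itertools.count()  # monotonically increasing tie-breaker

def push_sleeper(heap, deadline, task):
    # The counter is compared before the task, so two entries with the
    # same deadline never fall through to comparing Task objects.
    heapq.heappush(heap, (deadline, next(_sequence), task))

def pop_sleeper(heap):
    deadline, _, task = heapq.heappop(heap)
    return deadline, task

sleeping = []
push_sleeper(sleeping, 2.0, Task("task1"))
push_sleeper(sleeping, 2.0, Task("task2"))  # same deadline, no TypeError
```

Entries with equal deadlines then come out in insertion order.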
That way you will know if a PR will make an example out-of-date.
py.test apparently has a way to execute code examples found in Sphinx docs. That might help make sure that the docs stay up-to-date with the code.
The broken link under "Can I contribute?" was fixed in index.rst (https://github.com/dabeaz/curio/blob/master/docs/index.rst#questions-and-answers), but it is not in sync with README.rst (https://github.com/dabeaz/curio#questions-and-answers) which is still broken.
So obviously, you cannot call async methods from __del__ methods. And because of this, close methods generally have to be non-async as well. And in particular, Python enforces that the generator protocol's close method is synchronous and cannot yield:
In [5]: def f():
   ...:     try:
   ...:         yield 1
   ...:     finally:
   ...:         yield 2
   ...:
In [6]: gen = f()
In [7]: next(gen)
Out[7]: 1
In [8]: gen.close()
RuntimeError: generator ignored GeneratorExit
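The same session can be written as a plain script. This is just a restatement of the transcript above in runnable form, with the error captured instead of printed:

```python
def f():
    try:
        yield 1
    finally:
        # Yielding here ignores the GeneratorExit injected by close().
        yield 2

gen = f()
first = next(gen)

try:
    gen.close()
except RuntimeError as exc:
    close_error = str(exc)
else:
    close_error = None
```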
This doesn't come up much in the context of coroutines, because coroutines are usually iterated to exhaustion. But as soon as you have asynchronous generators then it's a real problem.
For example, imagine that we have an asynchronous WSGI-like API that wants to use an async for loop to collect up the body of an HTTP response. And suppose that we have some code that builds that body by reading from a remote socket:
from async_generator import async_generator, yield_
@async_generator
async def get_html_body_from_socket():
    sock = await do_connect(...)
    try:
        while True:
            got = await sock.recv(1024)
            if not got:
                return
            await yield_(got)
    finally:
        await sock.close()
And suppose that this iterator is not exhausted -- for example, because the HTTP client went away, so the WSGI-like server code stopped iterating over it. Then eventually either the server code or the Python runtime will call .close() on the get_html_body_from_socket coroutine, this will inject a GeneratorExit exception, and we are required to handle this exception without yielding. So the above code is... kinda wrong. You'll get away with it in practice because Curio's Socket.close() doesn't actually yield, but the easy general rule is that coroutine finally blocks should never contain await, and this violates that. And this constraint means that Curio's Socket.close() must never change in the future to yield, so we might as well make it non-async to remind us of this and enforce it.
For the same reason, I guess Socket should implement the synchronous __enter__/__exit__ protocol instead of __aenter__/__aexit__.
Possibly there are other places in curio with the same issue.
I have a PR ready to go in https://github.com/brettcannon/curio/tree/travis-osx but I'm waiting for #59 to be dealt with so there's no merge conflict.
The README page suggests that curio can work with files. Is it possible to add an example of what kind of cooperative I/O can be accomplished?
in curio/ssl.py
Consider:
import curio
async def main():
    raise Exception("My function failed")
    return "Completed"
try:
    res = curio.run(main())
except:
    print("OK")
    raise
else:
    print("uh?")
print("Should not be here...")
#Since there was no exception:
assert(res=="Completed")
The exception will be printed to screen (with the log_errors option set to True), but will not be further raised up the stack and the return value will be None.
Therefore the result I get is:
Curio: Task Crash: main
Traceback (most recent call last):
File "/home/zah/sourcecode/curio/curio/kernel.py", line 562, in run
trap = current._send(current.next_value)
File "<ipython-input-2-2eb192ef26ea>", line 3, in main
raise Exception("My function failed")
Exception: My function failed
uh?
Should not be here...
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
<ipython-input-2-2eb192ef26ea> in <module>()
14 print("Should not be here...")
15 #Since there was no exception:
---> 16 assert(res=="Completed")
AssertionError:
And I would expect to deal either with my original exception or with a TaskError.
It seems that Task.state is just storing string constants, so maybe it would be good to use an enum instead?
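As a rough illustration of the suggestion, an enum could carry the same strings as its values. `TaskState` and `is_crashed` are hypothetical names, and only the two states that appear in the reports on this page are listed:

```python
import enum

class TaskState(enum.Enum):
    # Only these two names appear in the reports on this page; a real
    # enum would list every state the kernel uses.
    READ_WAIT = "READ_WAIT"
    CRASHED = "CRASHED"

def is_crashed(state):
    # Identity comparison catches typos that a string comparison would miss.
    return state is TaskState.CRASHED
```

Keeping the string as the member value means existing reprs such as state='CRASHED' could still be produced from `state.value`.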
I noticed that, though curio has a 0.1 release on PyPI, there are no tagged releases on GitHub.
Do you plan to tag releases here? Among other things, it makes it easier to browse the code at a given release.
Looking at #70, this looks like a difficult problem. I would like a way to cleanly shut down a Kernel using pools when the user cancels the program (e.g. by KeyboardInterrupting it). The most obvious problem is in curio/workers.py, here https://github.com/dabeaz/curio/blob/master/curio/workers.py#L284: by the time the finally is executed, shutdown has already been called, setting the workers to None and thus resulting in an attribute error. Once that is worked around, with e.g.
diff --git a/curio/workers.py b/curio/workers.py
index 78895fd..d779d99 100644
--- a/curio/workers.py
+++ b/curio/workers.py
@@ -282,7 +282,8 @@ class WorkerPool(object):
                 worker = self.workercls()
                 raise
             finally:
-                self.workers.append(worker)
+                if self.workers is not None:
+                    self.workers.append(worker)
There are still some issues. If I have a simple program like:
import curio
def f():
    import time
    time.sleep(40)
async def main():
    coro = curio.run_in_process(f)
    task = await curio.spawn(coro)
kernel = curio.Kernel()
try:
    kernel.run(main())
except KeyboardInterrupt:
    print("OK")
and I run it and then press Ctrl+C, it almost works, except for some noisy multiprocessing delete action:
$ python simple.py
^COK
Process Process-1:
If I change f() to explicitly ignore the interrupt signal, it shuts down quietly:
def f():
    import time
    import signal
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    time.sleep(40)
$ python simple.py
^COK
However, with more complex code (specifically this https://github.com/NNPDF/reportengine/blob/25f7b0c4680a91b692653e24b1a0b5f8c222dc99/src/reportengine/resourcebuilder.py#L135), where more tasks are run and waited for, I get the dreaded GeneratorExit thing 4 times, once for each concurrent run_in_process. Not really sure why it behaves differently from the simple case:
Exception ignored in: <bound method Task.__del__ of Task(id=9, <coroutine object run_in_process at 0x7f6dadf60830>, state='READ_WAIT')>
Traceback (most recent call last):
File "/home/zah/sourcecode/curio/curio/task.py", line 53, in __del__
self.coro.close()
File "/home/zah/sourcecode/curio/curio/workers.py", line 108, in run_in_process
return await kernel._process_pool.apply(callable, args, kwargs)
RuntimeError: coroutine ignored GeneratorExit
Exception ignored in: <bound method Task.__del__ of Task(id=22, <coroutine object run_in_process at 0x7f6dae474728>, state='READ_WAIT')>
Traceback (most recent call last):
File "/home/zah/sourcecode/curio/curio/task.py", line 53, in __del__
self.coro.close()
File "/home/zah/sourcecode/curio/curio/workers.py", line 108, in run_in_process
return await kernel._process_pool.apply(callable, args, kwargs)
RuntimeError: coroutine ignored GeneratorExit
Exception ignored in: <bound method Task.__del__ of Task(id=24, <coroutine object run_in_process at 0x7f6dadf69258>, state='READ_WAIT')>
Traceback (most recent call last):
File "/home/zah/sourcecode/curio/curio/task.py", line 53, in __del__
self.coro.close()
File "/home/zah/sourcecode/curio/curio/workers.py", line 108, in run_in_process
return await kernel._process_pool.apply(callable, args, kwargs)
RuntimeError: coroutine ignored GeneratorExit
Exception ignored in: <bound method Task.__del__ of Task(id=25, <coroutine object run_in_process at 0x7f6dadf696d0>, state='READ_WAIT')>
Traceback (most recent call last):
File "/home/zah/sourcecode/curio/curio/task.py", line 53, in __del__
self.coro.close()
File "/home/zah/sourcecode/curio/curio/workers.py", line 108, in run_in_process
return await kernel._process_pool.apply(callable, args, kwargs)
RuntimeError: coroutine ignored GeneratorExit
I'm experimenting with a mixed tkinter + curio example. Here is the curio section:
async def generate_messages(app, ctrl, gap):
    await app.running.wait()
    async with ignore_after(DURATION):
        while app.running.is_set():
            app._window.generate_message(ctrl)
            await sleep(gap)
async def tk_runloop(app):
    await app.running.wait()
    while app.running.is_set():
        app.update()
        await sleep(0.1)
async def main():
    app = Application('curio/Tk Example')
    app.update()
    rl = await spawn(tk_runloop(app))
    m0 = await spawn(generate_messages(app, 0, 0.05))
    m1 = await spawn(generate_messages(app, 1, 0.1))
    m2 = await spawn(generate_messages(app, 2, 0.2))
    m3 = await spawn(generate_messages(app, 3, 0.4))
    await app.running.set()
    await rl.join()
if __name__ == '__main__':
    k = Kernel(with_monitor=True)
    k.run(main())
    k.run(None, shutdown=True)
This almost always does what I expect: the GUI appears, the 4 generate_messages() tasks run, the GUI is updated, then after DURATION seconds the generate_messages() tasks exit.
Occasionally only 3 of the tasks exit; the other, always m0, just keeps on going. Using the monitor's ps command I can see the 4 tasks' Timeout values decreasing, then 3 tasks disappear while the other's timeout value becomes None and it carries on executing.
Have I misunderstood the purpose and working of ignore_after() used as a context manager or could this be a problem with curio?
I asked about this in sphinx-doc/sphinx#2105 and they basically said some custom Sphinx code is needed to get the right result. There is enough linked from that issue to come up with an .. await:: directive that acts like a .. method:: declaration but is prefaced with await (a similar one for functions will also be needed). The work should probably get moved upstream so it can be used in the official Python docs.
Once this is done then proper linking can be done so that all mentioned functions and methods are linked in the docs.
I have read the channel module, and something confuses me. The send_bytes method handles the write differently depending on the length of buf.
async def send_bytes(self, buf):
    '''
    Send a buffer of bytes as a single message
    '''
    size = len(buf)
    header = struct.pack('!I', size)
    if size >= 16384:
        while header:
            try:
                nsent = os.write(self.fileno, header)
                header = header[nsent:]
            except BlockingIOError:
                await _write_wait(self.fileno)
    else:
        buf = header + bytes(buf)
    m = memoryview(buf)
    while m:
        try:
            nsent = os.write(self.fileno, m)
            m = m[nsent:]
        except BlockingIOError:
            await _write_wait(self.fileno)
I don't know why we need to do this.
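For reference, both branches appear to put the same bytes on the wire: a 4-byte big-endian length followed by the payload. A plausible reason for the split (an assumption, not confirmed by the source) is that concatenating the header with a large buffer would copy the whole payload, so large messages get the header written separately. The framing itself can be sketched with two hypothetical helpers, `frame` and `unframe`:

```python
import struct

def frame(buf):
    """Prefix buf with its length as a 4-byte big-endian unsigned int."""
    return struct.pack('!I', len(buf)) + bytes(buf)

def unframe(data):
    """Split one framed message off the front of data.

    Returns (payload, remaining_bytes).
    """
    (size,) = struct.unpack('!I', data[:4])
    return data[4:4 + size], data[4 + size:]
```

A receiver can call `unframe` repeatedly on the accumulated bytes to recover message boundaries.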
I would like to have some kind of discussion forum for curio. However, I do NOT want it to be based on a mailing list or something like Google Groups. email-based discussion doesn't work for me (plus, I'm bad enough about email already). Open to all ideas here.
The latest test run found at https://travis-ci.org/dabeaz/curio should have no warnings about references in the documentation.
Once all warnings have been eliminated then -W can be added to the doc testing command to make sure all warnings are treated as errors.
I think there is a typo in the wrapping of curio.ssl.get_server_socket. Instead of
@wraps(_ssl.wrap_socket)
it should be
@wraps(_ssl.get_server_certificate)
During style fixes with flake8 I got:
curio/io.py:297:35: F821 undefined name 'n'
curio/io.py:298:26: F821 undefined name 'n'
I have not tested it, but I think 'maxbytes' is meant instead of 'n'.
Currently the definition of new_task is as shown below. However, I don't know what the second parameter, the asterisk, means.
async def new_task(coro, *, daemon=False):
    '''
    Create a new task.
    '''
    return await _new_task(coro, daemon)
In fact, if we call the new_task function with an additional_useless_argument, just like below:
# hello.py
import curio
async def countdown(n):
    while n > 0:
        print('T-minus', n)
        await curio.sleep(1)
        n -= 1
if __name__ == '__main__':
    kernel = curio.Kernel()
    additional_useless_argument = 0
    kernel.run(countdown(10), additional_useless_argument)
the following error message is shown:
TypeError: new_task() takes 1 positional argument but 2 were given
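In a Python signature, a bare asterisk marks every parameter after it as keyword-only, which is why a second positional argument is rejected. A small stand-alone sketch (a toy synchronous stand-in, not Curio's function) shows the effect:

```python
def new_task(coro, *, daemon=False):
    """Toy stand-in with the same signature shape as the function above."""
    return (coro, daemon)

ok = new_task("coro", daemon=True)      # keyword form is accepted

try:
    new_task("coro", True)              # positional form is rejected
except TypeError as exc:
    error = str(exc)
```

So daemon can only be passed as `daemon=...`, never by position.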
Hey, great package with a nice interface.
I am trying to use curio with other packages (e.g. aioamqp) that are written using the asyncio.coroutine decorator. When I try to await from these in a coroutine being run by the curio kernel, I am getting the following error:
Curio: Task Crash: foo
Traceback (most recent call last):
File "/Users/billyshambrook/venv/lib/python3.5/site-packages/curio/kernel.py", line 365, in run
op, *args = current.coro.send(current.next_value)
File "/Users/billyshambrook/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 359, in __iter__
assert self.done(), "yield from wasn't used with future"
AssertionError: yield from wasn't used with future
The code I am trying to run is...
import asyncio
import curio
@asyncio.coroutine
def bar():
    yield from asyncio.sleep(2)
async def foo():
    await bar()
kernel = curio.Kernel()
kernel.run(foo())
Is there any way around this? Or must all coroutines use the new Python 3.5 async syntax?
Thanks,
Hello,
Would it be wise to use Curio inside a twisted server ?
Regards
Just for fun, I launched the following script with websocketd:
websocketd --port=8080 python3.5 fib.py
Then I opened a web browser with a page containing the JavaScript below, and I am now able to calculate fib(n) from JavaScript through curio.
In the web browser console enter:
sendws(20)
6765
I find being able to use Curio via Javascript very nice.
Perhaps this could be the base to create microwebservices using Curio....
from sys import stdout
import curio
from curio import run, run_in_process, tcp_server
def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n-1) + fib(n-2)
async def fibserver():
    while True:
        r = int(input())
        result = await run_in_process(fib, r)
        print(result)
        stdout.flush()
curio.run(fibserver())
And in Javascript
var ws = new WebSocket('ws://' + "168.1.6.52:8080" + "/");
function sendws(msg) {
    document.getElementById('count').innerHTML = "Waiting...";
    console.log("Waiting");
    ws.send(msg);
}
ws.onopen = function() {
    document.body.style.backgroundColor = '#cfc';
};
ws.onclose = function() {
    document.body.style.backgroundColor = null;
};
ws.onmessage = function(event) {
    document.getElementById('count').innerHTML = event.data;
};
Trivial but wanted to write it down before I forgot.
(My workaround for now is just to write
with sock.blocking() as raw_sock:
    raw_sock.shutdown(...)
which is fine but ugly.)
Curio: Task Crash: monitor
Traceback (most recent call last):
File "c:\Miniconda3\lib\site-packages\curio\kernel.py", line 365, in run
op, *args = current.coro.send(current.next_value)
File "c:\Miniconda3\lib\site-packages\curio\monitor.py", line 143, in monitor
await monrun(kernel)
File "c:\Miniconda3\lib\site-packages\curio\monitor.py", line 70, in monrun
stdin = Stream(sys.stdin.buffer.raw)
File "c:\Miniconda3\lib\site-packages\curio\io.py", line 276, in __init__
os.set_blocking(self._fileno, False)
AttributeError: module 'os' has no attribute 'set_blocking'
That way you can know if the docs will be broken by a PR.
These are advanced Linux system calls that provide file descriptors in place of the classic POSIX mechanisms: signals, pipes/sockets and sleep/select (with timeout), thus making some operations either more performant (eventfd, timerfd) or more correct (signalfd).
There are already some implementations of these on PyPI.
Discuss! :)
Hi!
Running the following "getting started" code:
import curio
async def countdown(n):
    while n > 0:
        print('T-minus', n)
        await curio.sleep(1)
        n -= 1
if __name__ == '__main__':
    curio.run(countdown(10))
results in the following output:
T-minus 10
Traceback (most recent call last):
File ".../Projects/curio_test/test.py", line 24, in <module>
curio.run(countdown(10))
File "...\VirtualEnvs\curio_test\lib\site-packages\curio\kernel.py", line 613, in run
result = kernel.run(coro, shutdown=True)
File "...\VirtualEnvs\curio_test\lib\site-packages\curio\kernel.py", line 491, in run
events = selector_select(timeout)
File "...\Python\Python35\lib\selectors.py", line 323, in select
r, w, _ = self._select(self._readers, self._writers, [], timeout)
File "...\Python\Python35\lib\selectors.py", line 314, in _select
r, w, x = select.select(r, w, w, timeout)
OSError: [WinError 10022] An invalid argument was supplied
OS: Windows 10 x64
Python: 3.5.2
I'm totally into the "we only have one method" aesthetic but I think maybe it's gone too far :-)
Or maybe it's just that I don't understand what the difference is between kernel.run(coro, shutdown=False) and kernel.run(coro, shutdown=True). The docs say "If shutdown is True, the kernel will cancel all daemonic tasks and perform a clean shutdown once all regular tasks have completed." But isn't that, like... what you always want?
But I'm confident that in the kernel.run(shutdown=False) version, calling kernel.run when what I want is for the kernel to not run, is super confusing. Surely this should be kernel.shutdown().
Or await curio.shutdown() for that matter. I don't know why this is the one function in curio that doesn't go through the trap system.
As you know I'm a big fan of timeout_after. The awesome thing about it is that it's composable, in the sense that I can treat any random coroutine as a black box and slap a timeout_after around it to impose a timeout on it, so long as the called coroutine follows some simple rules. Basically, it should be written so that when my timeout fires, it promptly cleans up and exits. Not an onerous constraint.
Except... it's hard to follow this rule in coroutines that themselves want to use timeout_after. The docs allude to some of this; here's an example where everything goes wrong because the internal coroutine can't tell whether it's the internal timeout that fired, or the outer one:
async def get_first_responding(urls):
    for url in urls:
        try:
            # per-URL timeout
            async with timeout_after(5):
                return (await curio_http.get(url)).text
        except TaskTimeout:
            print("Too slow! Trying the next")
    raise RuntimeError("everyone's too slow!")
async def main():
    # Global timeout:
    async with timeout_after(10):
        print(await get_first_responding([...]))
I suggest that the key missing feature is that when we see a timeout exception, we need to be able to figure out whether it's one that means that we have timed out and should clean up and let our caller get on with things, versus one that means a thing we called has timed out and we should recover however makes sense (e.g. by falling back to trying another server). Are we inside the timeout_after that fired, or outside?
outer code          <--- Are we here?
    timeout_after
        inner code  <--- or here?
Proposal: arrange for the inner code to see a different exception type than the outer code.
Implementation: give each timeout_after a magic cookie, and when a timeout fires, inject an instance of the "inner" exception type with the magic cookie attached. When an exception passes through a timeout_after, check if the cookies match, and if so, convert to the "outer" exception type. (And ignore_after should check the cookie, and swallow the exception iff it matches.)
When there are stacked timeouts that fire simultaneously, e.g.
async with timeout_after(5):
    # -- mark --
    async with timeout_after(10):
        ...
then it should always be the outermost timeout whose cookie is used (so the line at -- mark -- should see an "inner" exception, not an "outer" one).
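The cookie idea can be sketched with ordinary synchronous context managers. This is only a simulation of the proposal, not Curio code: the exception is raised by hand rather than injected by a kernel, and `InnerTimeout`, `OuterTimeout`, `timeout_scope` and `simulate` are all hypothetical names.

```python
import contextlib

class InnerTimeout(Exception):
    """Seen by code inside the timeout block that fired (hypothetical name)."""
    def __init__(self, cookie):
        super().__init__("timeout")
        self.cookie = cookie

class OuterTimeout(Exception):
    """Seen by code outside the timeout block that fired (hypothetical name)."""

@contextlib.contextmanager
def timeout_scope():
    cookie = object()          # unique marker for this block
    try:
        yield cookie
    except InnerTimeout as exc:
        if exc.cookie is cookie:
            # The exception is leaving the block it belongs to.
            raise OuterTimeout() from exc
        raise                  # belongs to an enclosing block; pass through

def simulate(fire_outer):
    """Fire either the outer or the inner timeout and record who saw what."""
    seen = []
    try:
        with timeout_scope() as outer:
            try:
                with timeout_scope() as inner:
                    raise InnerTimeout(outer if fire_outer else inner)
            except (InnerTimeout, OuterTimeout) as exc:
                seen.append(("mark", type(exc).__name__))
                raise
    except (InnerTimeout, OuterTimeout) as exc:
        seen.append(("caller", type(exc).__name__))
    return seen
```

When the outer timeout fires, code at the mark position sees the "inner" type and knows it should clean up; when only the inner one fires, the same position sees the "outer" type and can recover.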
Unsolved problem: I am drawing a blank on what to call these two exception types to distinguish them :-(. OperationTimeOut versus CancelledTimeOut? [Edit: or maybe ExternalTimeout / InternalTimeout? CallerTimeout / CalleeTimeout? OutsideTimeout / InsideTimeout? LocalTimeout?]
(Speaking of timeout exception names, why is it called TaskTimeout? It doesn't really have anything to do with Tasks in particular, does it? I mean, any more so than literally everything else in curio?)
The traceback of:
import curio
def f(x):
    raise ValueError("Hello")
    return x
async def main():
    await curio.run_in_process(f,5)
    return "Completed"
try:
    res = curio.run(main())
except curio.TaskError as e:
    raise
does not show the line inside the f function where the exception occurred:
ValueError Traceback (most recent call last)
/home/zah/sourcecode/curio/curio/kernel.py in run(self, coro, shutdown)
561 if current.next_exc is None:
--> 562 trap = current._send(current.next_value)
563 else:
<ipython-input-30-e4e29ac70d40> in main()
7 async def main():
----> 8 await curio.run_in_process(f,5)
9 return "Completed"
/home/zah/sourcecode/curio/curio/workers.py in run_in_process(callable, *args, **kwargs)
81
---> 82 return await kernel._process_pool.apply(callable, args, kwargs)
83
/home/zah/sourcecode/curio/curio/workers.py in apply(self, func, args, kwargs)
249 try:
--> 250 return await worker.apply(func, args, kwargs)
251 except CancelledError:
/home/zah/sourcecode/curio/curio/workers.py in apply(self, func, args, kwargs)
220 else:
--> 221 raise result
222
ValueError: Hello
The above exception was the direct cause of the following exception:
TaskError Traceback (most recent call last)
<ipython-input-30-e4e29ac70d40> in <module>()
14 except curio.TaskError as e:
15 print(type(e.__cause__))
---> 16 raise e
<ipython-input-30-e4e29ac70d40> in <module>()
11
12 try:
---> 13 res = curio.run(main())
14 except curio.TaskError as e:
15 print(type(e.__cause__))
/home/zah/sourcecode/curio/curio/kernel.py in run(coro, pdb, log_errors, with_monitor, selector)
648 kernel = Kernel(selector=selector, with_monitor=with_monitor,
649 log_errors=log_errors, pdb=pdb)
--> 650 result = kernel.run(coro, shutdown=True)
651 return result
652
/home/zah/sourcecode/curio/curio/kernel.py in run(self, coro, shutdown)
627 if maintask:
628 if maintask.next_exc:
--> 629 raise maintask.next_exc
630 else:
631 return maintask.next_value
TaskError: Task Crashed
This is shown with multiprocessing alone:
import multiprocessing
p = multiprocessing.Pool()
p.apply(f, (1,))
shows a "RemoteTraceback"
---------------------------------------------------------------------------
RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/zah/anaconda3/lib/python3.5/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "<ipython-input-30-e4e29ac70d40>", line 4, in f
raise ValueError("Hello")
ValueError: Hello
"""
The above exception was the direct cause of the following exception:
ValueError Traceback (most recent call last)
<ipython-input-31-9ccf31c9442a> in <module>()
1 import multiprocessing
2 p = multiprocessing.Pool()
----> 3 p.apply(f, (1,))
/home/zah/anaconda3/lib/python3.5/multiprocessing/pool.py in apply(self, func, args, kwds)
251 '''
252 assert self._state == RUN
--> 253 return self.apply_async(func, args, kwds).get()
254
255 def map(self, func, iterable, chunksize=None):
/home/zah/anaconda3/lib/python3.5/multiprocessing/pool.py in get(self, timeout)
606 return self._value
607 else:
--> 608 raise self._value
609
610 def _set(self, i, obj):
ValueError: Hello
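The multiprocessing output above keeps the worker-side traceback by formatting it to text in the worker and chaining it as the cause in the parent. A minimal sketch of that general technique follows; it runs in a single process for simplicity, and `RemoteTraceback` here is a local stand-in class, with `run_and_capture` and `unwrap` as hypothetical helper names rather than Curio or multiprocessing APIs.

```python
import traceback

class RemoteTraceback(Exception):
    """Carries the formatted traceback text produced on the worker side."""
    def __init__(self, tb):
        super().__init__(tb)
        self.tb = tb

    def __str__(self):
        return self.tb

def run_and_capture(func, *args):
    """Worker side: return (ok, value_or_exception, traceback_text)."""
    try:
        return (True, func(*args), None)
    except Exception as exc:
        # Format while the traceback is still available; text survives pickling.
        return (False, exc, traceback.format_exc())

def unwrap(outcome):
    """Parent side: return the value or re-raise with the remote traceback."""
    ok, value, tb_text = outcome
    if ok:
        return value
    value.__cause__ = RemoteTraceback(tb_text)
    raise value
```

The parent then shows the original frames, including the line inside f, under "The above exception was the direct cause of the following exception".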
Super kewl project :) Might be nice to have those cute Autodoc/CI/test/coverage badge thingies. The setup is pretty boilerplate and I've done it for other projects https://github.com/kbandla/dpkt (for instance). One of the nice things is the services can 'chime in' on PRs (see kbandla/dpkt#188) as an example... Makes taking contributions a bit easier/nicer..
So @dabeaz will have to register this github project with the following services [readthedocs, travis, coveralls, landscape] or some other set based on his preferences for these things. Once that's done I (or someone else) can setup the boilerplate hooks.
http://sphinx-doc.org/latest/ext/intersphinx.html
E.g., the selectors module should be automatically linked to the Python docs.
This library is pretty cool. It'd be nice to be able to match the performance of gevent.
What do you say about creating a native extension with Cython to speed the module up?
Curio: Task Crash: monitor
Traceback (most recent call last):
File "c:\Miniconda3\lib\site-packages\curio\kernel.py", line 365, in run
op, *args = current.coro.send(current.next_value)
File "c:\Miniconda3\lib\site-packages\curio\monitor.py", line 143, in monitor
await monrun(kernel)
File "c:\Miniconda3\lib\site-packages\curio\monitor.py", line 70, in monrun
stdin = Stream(sys.stdin.buffer.raw)
File "c:\Miniconda3\lib\site-packages\curio\io.py", line 276, in __init__
os.set_blocking(self._fileno, False)
AttributeError: module 'os' has no attribute 'set_blocking'
I think that adding any complex application-level protocols directly in curio (such as HTTP) is indeed not a good idea. However, it would be nice if curio could provide a stronger foundation and guidelines for implementing complex protocols. For instance, in asyncio we have Protocols and Transports. They remove the need to write lots of boilerplate code (like flow control), and they also standardize the approach, so people know how and where to start.
Not sure if I'm overlooking something, but is there a built-in way of running multiple coroutines in parallel, a la asyncio.gather()?
The Monitor class can be configured with a port number but, as far as I can see, the Kernel class or the run() convenience function cannot be so configured.
Unfortunately the default port is already in use on my machine. It would be useful to be able to use an environment variable (CURIOMONITORPORT ?) to specify the port to be used.
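The environment-variable idea could look roughly like this. It is a sketch of the request only: CURIOMONITORPORT is the name floated above, `monitor_port` is a hypothetical helper, and the default is left to the caller rather than assumed.

```python
import os

def monitor_port(default, environ=None):
    """Return the port from CURIOMONITORPORT, falling back to default."""
    environ = os.environ if environ is None else environ
    raw = environ.get("CURIOMONITORPORT")
    if raw is None:
        return default
    return int(raw)  # raises ValueError if the variable is not a number
```

Passing `environ` explicitly keeps the helper easy to test without touching the real environment.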
I have created a Discourse-based discussion forum with a section for Curio. http://forum.dabeaz.com/c/curio
My code:
# show.py
import curio
if __name__ == '__main__':
    kernel = curio.Kernel()
And it failed to run:
$ python3 show.py
Traceback (most recent call last):
File "show.py", line 2, in <module>
import curio
File "/usr/lib/python3.4/site-packages/curio/__init__.py", line 5, in <module>
from .kernel import *
File "/usr/lib/python3.4/site-packages/curio/kernel.py", line 79
async def join(self, *, timeout=None):
^
SyntaxError: invalid syntax
My Environment:
$ pip freeze
//---snip---
curio==0.1
$ python3
Python 3.4.3 (default, Jun 29 2015, 12:16:01)
[GCC 5.1.1 20150618 (Red Hat 5.1.1-4)] on linux
Does anyone have any ideas about this? Thank you
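The SyntaxError is consistent with the interpreter version shown: the async def syntax was added to the language in Python 3.5, so Python 3.4.3 cannot parse kernel.py. A small guard along these lines (a hypothetical helper, not something Curio ships) makes the requirement explicit:

```python
import sys

def supports_async_syntax(version_info=sys.version_info):
    """async def / await were added to the language in Python 3.5 (PEP 492)."""
    return tuple(version_info[:2]) >= (3, 5)
```

Such a check has to live in a file that itself avoids async def, otherwise the file fails to parse before the check can run.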
https://travis-ci.org/dabeaz/curio should not have any warnings generated when building the documentation. Once all warnings have been eliminated then -W can be added to the Travis file so that any warnings become errors.
I looked for this type of library when I found Twisted doesn't have python3.5 support. I watched a video on YouTube of you at a conference (which was humorous and very informative) and began to research to develop some concurrency for my specific purposes. In the hunt, I came across curio and after reading a little from the Docs, and long before reaching the end of the page, I knew that it was your code and decided to try to implement it.
That being said, I am setting up a UDP server with clients and was surprised that socket.DATA_GRAM isn't supported with the curio "socket wrapper"; curio.network.tcp_server is being called. I looked through the docs and did not see any mention of UDP or DATA_GRAM. Will udp_server be supported with curio.socket?
Update: based on the discussion below, I now think that the resolution is that Curio should ideally:
- provide a way to await sock.writeable() (or whatever spelling is preferred)
- set TCP_NOTSENT_LOWAT on sockets whenever possible/convenient, with a buffer size of ~8-16 KiB. This only provides full benefits for code that uses sock.writeable(), but it provides some benefits regardless.
Original report follows, though note that a bunch of my early statements are incomplete/wrong:
So I just stumbled across a rather arcane corner of the Linux/OS X socket API while trying to understand why I'm seeing weird buffering behavior in some complicated curio/kernel interaction.
As we know, calling socket.send doesn't immediately dump data onto the network; instead the kernel just sticks it into the socket's send buffer, to be trickled out to the network as and when possible. Or, if the send buffer is full, then the kernel will reject your data and tell you to try again later. (Assuming non-blocking mode of course.)
But for various reasons, it turns out that the kernel's send buffer is usually way larger than you actually want it to be, which means you can end up queueing up a huge amount of data that will take forever to trickle out, introducing latency and causing various problems.
So at least Linux and OS X have introduced the euphoniously named and terribly documented TCP_NOTSENT_LOWAT feature. Basically what it does is let you use setsockopt to tell the kernel -- hey, I know that you're willing to buffer, like, 5 MiB of data on this socket. But I don't actually want to do that. Can you only wake me up when the amount of data that's actually buffered drops below, like, 128 KiB, and I'll just top it up to there? (This is a bit of a simplification because there are some subtleties about how you budget for data that's queued to send vs. data that's been sent-but-not-yet-acked, but it's good enough to go on.)
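In Python, setting the option might look like the sketch below. It is written defensively because the `socket.TCP_NOTSENT_LOWAT` constant is not exposed on every Python version or platform, hence the getattr probe; `set_notsent_lowat` is a hypothetical helper name, not a Curio API.

```python
import socket

def set_notsent_lowat(sock, nbytes):
    """Try to set TCP_NOTSENT_LOWAT; return True if the option was applied."""
    opt = getattr(socket, "TCP_NOTSENT_LOWAT", None)
    if opt is None:
        return False           # constant not exposed by this Python/platform
    try:
        sock.setsockopt(socket.IPPROTO_TCP, opt, nbytes)
    except OSError:
        return False           # kernel rejected the option
    return True
```

For example, `set_notsent_lowat(sock, 128 * 1024)` would request the 128 KiB threshold used in the description above.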
But it turns out that TCP_NOTSENT_LOWAT only affects polling-for-writability. So you absolutely can have a socket where select and friends say "not writeable", but at the same time send is happy to let you write lots and lots of data. And this is bad, because it turns out literally the only way the kernel is willing to give you the information you need to avoid over-buffering is with that "not writeable" signal.
So if you want to avoid over-buffering, then you have to always call select-or-whatever before you call send, and only proceed if the socket is claimed to be writeable.
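Outside of curio, the "poll first, then send" rule can be sketched with plain sockets like this. The helper name send_if_writable is hypothetical, and it is only meant to show the ordering of the two calls, not a complete send loop:

```python
import select
import socket

def send_if_writable(sock, data):
    """Send only if the kernel reports sock as writable.

    Returns the number of bytes sent, or None if the socket was not
    writable. Hypothetical helper for illustration only.
    """
    # Zero timeout: ask for the current state without blocking.
    _, writable, _ = select.select([], [sock], [], 0)
    if not writable:
        return None  # caller should wait for writability and retry
    return sock.send(data)
```

A caller that gets None back would wait for writability (by whatever means the event loop provides) and then try again, rather than calling send directly.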
And unfortunately, right now, curio never does this: it always tries calling send first, and then only if that fails does it block waiting for writeability.
The solution is simple: before calling socket.send, check that the kernel thinks the socket is writeable.
The obvious way to do this would be to replace the current implementation of Socket.send with something like:
async def send(self, data, flags=0):
    while True:
        await _write_wait(self._fileno)
        try:
            return self._socket_send(data, flags)
        except WantWrite:
            pass
        except WantRead:
            await _read_wait(self._fileno)
(And similarly for the other methods. I guess sendall could usefully be rewritten in terms of send, and I'm not sure what if anything would need to be done for sendmsg and sendto. TCP_NOTSENT_LOWAT doesn't apply to UDP, so for sendto it maybe doesn't matter, but I guess it might be better to be safe? And I don't remember what sendmsg is for at all.)
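As a rough sketch of the "sendall in terms of send" idea: the loop below just keeps calling an awaitable send until everything has been accepted. FakeSocket is a made-up stand-in (not curio's Socket) that accepts a few bytes per call, and asyncio is used only as a convenient way to drive the coroutine in a self-contained example:

```python
import asyncio

class FakeSocket:
    """Hypothetical stand-in for an async socket: accepts at most
    `chunk` bytes per send() call and records what it was given."""

    def __init__(self, chunk=4):
        self.chunk = chunk
        self.received = bytearray()

    async def send(self, data, flags=0):
        await asyncio.sleep(0)  # stand-in for waiting on writability
        part = bytes(data[:self.chunk])
        self.received += part
        return len(part)

async def sendall(sock, data, flags=0):
    """Send all of data by repeatedly calling sock.send()."""
    view = memoryview(data).cast("B")  # slice without copying
    while view:
        nsent = await sock.send(view, flags)
        view = view[nsent:]
```

Written this way, any writability check that send does is applied to every chunk for free, so sendall would not need its own copy of that logic.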
The one potential downside of this strategy that I can see is that right now, send never blocks unless the write actually fails, and if we add an await _write_wait then it will generally suspend the coroutine for one "tick" before doing the actual write, even when the write could have been done without blocking. I guess this might actually be a good thing in that it could promote fairness (think of a coroutine that's constantly writing to a socket with a fast reader, so the writes always succeed and it ends up starving everyone else...), but it might have some performance implications too.
The alternative, which preserves the current semantics, would be to do a quick synchronous check up front, like:
async def send(self, data, flags=0):
    if not select.select([], [self._fileno], [], 0)[1]:
        await _write_wait(self._fileno)
    while True:
        try:
            return self._socket_send(data, flags)
        except WantWrite:
            await _write_wait(self._fileno)
        except WantRead:
            await _read_wait(self._fileno)
How I managed to confirm this for myself (Linux specific, and mostly recording this for reference, not really any need to read it):
First, in one terminal:
echo 128000 | sudo tee /proc/sys/net/ipv4/tcp_notsent_lowat
socat TCP-LISTEN:4003 STDOUT
Then, in another terminal, in a Python session:
In [1]: import socket
In [2]: import select
In [3]: s = socket.create_connection(("localhost", 4003))
In [4]: import struct
In [5]: import fcntl
In [6]: def get_send_buf_size(sock):
   ...:     return struct.unpack("I", fcntl.ioctl(sock.fileno(), 0x5411, b"\0\0\0\0"))[0]  # 0x5411 = SIOCOUTQ
   ...:
In [7]: s.setblocking(False)
Start filling up our send buffer. At first the data goes into our send buffer and then immediately drains into the other side's receive buffer, but since the other side is asleep then eventually this stops and our send buffer starts filling up:
In [8]: get_send_buf_size(s)
Out[8]: 0
In [9]: s.send(b"x" * 1000000)
Out[9]: 174760
In [10]: get_send_buf_size(s)
Out[10]: 0
In [11]: s.send(b"x" * 1000000)
Out[11]: 523864
In [12]: get_send_buf_size(s)
Out[12]: 0
In [13]: s.send(b"x" * 1000000)
Out[13]: 261932
In [14]: get_send_buf_size(s)
Out[14]: 0
In [15]: s.send(b"x" * 1000000)
Out[15]: 130966
In [16]: get_send_buf_size(s)
Out[16]: 121366
Okay, there are 121366 bytes enqueued in our send buffer. That's a little bit below the TCP_NOTSENT_LOWAT that we set, so our socket should still be writeable:
In [17]: select.select([], [s], [])
Out[17]:
([],
[<socket.socket fd=11, family=AddressFamily.AF_INET, type=2049, proto=6, laddr=('127.0.0.1', 52146), raddr=('127.0.0.1', 4003)>],
[])
Put some more data in, pushing it over the 128000 limit:
In [18]: s.send(b"x" * 10000)
Out[18]: 10000
In [19]: get_send_buf_size(s)
Out[19]: 131366
Now if we check using select, it's not writeable:
In [20]: select.select([], [s], [])
# hangs until:
KeyboardInterrupt:
BUT if we call send, then no problem, we can definitely write more data to this "non writeable" socket:
In [21]: s.send(b"x" * 100000)
Out[21]: 55483
In [22]: get_send_buf_size(s)
Out[22]: 186849