
purerpc's People

Contributors

banana-string-theory, belm0, frepond, ledmonster, penguin138, standy66, stereobutter, vincentvanlaer


purerpc's Issues

[Discussion] Make integrations with asyncio projects more friendly

Right now purerpc uses the curio event loop, which is really pleasant to use (thx @dabeaz) but limits interoperability with asyncio projects. Curio was chosen in 2017 mainly because of this post by @njsmith. As of 2019, some things have changed:

  1. Python 3.7 brought a decent upgrade to asyncio, and some problems mentioned in the blog post were fixed: e.g. StreamWriter now has a wait_closed() method, and there is now an asyncio.get_running_loop() function, among other new features. As asyncio evolves and becomes more async/await-native, migrating to it becomes possible, especially given that almost all event loop logic in purerpc is abstracted away in grpc_socket.py. This is one of the paths forward, and we would no longer have to think about asyncio interoperability, but there are caveats: asyncio still lacks curio's wonderful TaskGroups and its thread and process pools.
  2. curio.bridge.AsyncioLoop may be used to bridge together two worlds: asyncio's and curio's. I don't know whether or not there are performance implications and caveats with this approach, but we get to keep awesome curio functionality. We used this bridge internally with aiopg and it worked fine (cc @penguin138). But we need to make this bridge transparent to the user (maybe some tornado docs may help), so curio and asyncio work together in handlers:
async def some_purerpc_handler(requests):
    async for request in requests:
        await asyncio.sleep(1)
        await aiohttp_fetch(request.url)
  3. We can look at Nathaniel's trio library, which improves on what David has built in curio. There is also the trio-asyncio bridge, but I've never heard of anyone using it.

ClientStubStreamStream uses yield inside a task group

See

async def call_aiter(self, message_aiter, metadata):
    stream = await self._stream_fn(metadata=metadata)
    async with anyio.create_task_group() as task_group:
        task_group.start_soon(send_multiple_messages_client, stream, message_aiter)
        async for value in stream_to_async_iterator(stream):
            yield value

This causes internal corruption in trio.

I think there are two potential solutions:

  • Don't run the sending stream side of the client when yielding. One way to accomplish this is to cancel the sending side whenever a message is received from the server. This does require some form of cancel safety of the sending stream (meaning that if send_message is canceled, the message must not be sent). An implementation would then look something like this:
import anyio
from contextlib import aclosing  # Python 3.10+

NO_MESSAGE_WAITING = object()


class ClientStubStreamStream(ClientStub):
    async def call_aiter(self, message_aiter, metadata):
        # If send_multiple is cancelled while trying to send a message, that message will be saved in this variable
        current_message = NO_MESSAGE_WAITING

        async def send_multiple():
            nonlocal current_message

            while True:
                try:
                    # Check if we got cancelled while trying to send a message
                    if current_message is NO_MESSAGE_WAITING:
                        current_message = await message_aiter.__anext__()
                except StopAsyncIteration:
                    return
                await stream.send_message(current_message)  # This must be cancel safe
                current_message = NO_MESSAGE_WAITING

        stream = await self._stream_fn(metadata=metadata)
        async with aclosing(message_aiter) as message_aiter:
            try:
                while True:
                    async with anyio.create_task_group() as task_group:
                        task_group.start_soon(send_multiple)
                        try:
                            value = await stream_to_async_iterator(stream).__anext__()
                        except StopAsyncIteration:
                            return
                        task_group.cancel_scope.cancel()
                    # Outside the task group
                    yield value
            finally:
                stream.close()
  • Wrap the generator in a context manager, like trio-utils provides. This would be a significant API change.

I prefer the second option, as it is significantly cleaner and has fewer sharp edges regarding cancel safety.
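The second option can be illustrated with a minimal sketch that uses only the standard library. Everything here is an assumption for illustration: `FakeStream` and `open_stream_stream` are hypothetical names, the stream simply echoes what it is sent, and a plain asyncio task stands in for the anyio task group. The point is only that the caller iterates inside an `async with` body, so no async generator yields while the sending task is running.

```python
import asyncio
import contextlib


class FakeStream:
    """Hypothetical stand-in for a purerpc stream: echoes every sent message."""

    def __init__(self):
        self._queue = asyncio.Queue()

    async def send_message(self, message):
        await self._queue.put(message)

    async def close_send(self):
        await self._queue.put(None)  # None marks the end of the stream

    async def receive(self):
        while True:
            message = await self._queue.get()
            if message is None:
                return
            yield message


@contextlib.asynccontextmanager
async def open_stream_stream(stream, message_aiter):
    """Run the sending side for exactly as long as the `async with` body runs."""

    async def send_all():
        async for message in message_aiter:
            await stream.send_message(message)
        await stream.close_send()

    sender = asyncio.create_task(send_all())
    try:
        yield stream.receive()
    finally:
        sender.cancel()  # no-op if the sender already finished
        with contextlib.suppress(asyncio.CancelledError):
            await sender  # re-raises a sender failure instead of discarding it


async def main():
    async def messages():
        for i in range(3):
            yield i

    received = []
    async with open_stream_stream(FakeStream(), messages()) as responses:
        async for value in responses:
            received.append(value)
    return received
```

The cost is the API change mentioned above: callers write `async with ... as responses:` instead of iterating over the call directly.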

Support creating channels on unix sockets

I need to read/write from a unix socket in my rpc client.

I had a little dig around and it seems like the current implementation here assumes a tcp stream.

https://github.com/standy66/purerpc/blob/5faa35f2cdf5b36d1398a1aa6a9e681c5344060f/src/purerpc/client.py#L23

Seems like anyio does support unix sockets:

https://anyio.readthedocs.io/en/stable/networking.html#working-with-unix-sockets

I am not sure how you imagine this to work in the current insecure_channel API? Or would it be another API method?

If you provide some direction, I can try to submit the patch. Or otherwise, help test it.

Thanks for this rad library ✋

FYI they pushed a fix for this in the gRPC implementation over at grpc/grpc#5701.

Is multi-process serving supported?

My code:

server = Server(50055, num_processes=5)

server.add_service(JobEngine().service)

server.serve(backend="asyncio")

Exception:
TypeError: __init__() got an unexpected keyword argument 'num_processes'

I don't understand why the example code presents this as working:

  • misc/greeter/failing_server.py
  • misc/greeter/main_pingpong_servicer.py

Could you tell me why?

environment:
purerpc==0.5.2
python == 3.6

Crash when message is larger than max

When a message is larger than the maximum, we crash instead of reporting the error to the peer:

[2019-03-06 09:53:54,561 - root - ERROR]:  Got exception in main dispatch loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/purerpc/server.py", line 173, in __call__
    async for stream in self.grpc_socket.listen():
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpc_socket.py", line 228, in listen
    async for stream in self._listen():
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpc_socket.py", line 194, in _listen
    events = self._grpc_connection.receive_data(data)
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpclib/connection.py", line 181, in receive_data
    grpc_events.extend(self._data_received(event))
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpclib/connection.py", line 110, in _data_received
    event.flow_controlled_length)
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpclib/buffers.py", line 78, in data_received
    self._process_new_messages()
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpclib/buffers.py", line 116, in _process_new_messages
    result, flow_controlled_length = self._parse_one_message()
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpclib/buffers.py", line 103, in _parse_one_message
    raise MessageTooLargeError("Received message larger than max: "
purerpc.grpclib.exceptions.MessageTooLargeError: Received message larger than max: 4440990 > 4194304
[2019-03-06 09:53:54,563 - curio.kernel - ERROR]:  Task Crash: Task(id=293457, name='ConnectionHandler.request_received', state='TERMINATED')
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/curio/kernel.py", line 736, in _run_coro
    trap = current._send(current.next_value)
  File "/usr/local/lib/python3.6/site-packages/curio/task.py", line 167, in _task_runner
    return await coro
  File "/usr/local/lib/python3.6/site-packages/purerpc/server.py", line 119, in request_received
    await stream.start_response()
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpc_proto.py", line 37, in start_response
    custom_metadata)
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpc_socket.py", line 153, in start_response
    await self._socket.flush()
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpc_socket.py", line 42, in flush
    await self._socket.sendall(data)
  File "/usr/local/lib/python3.6/site-packages/curio/io.py", line 179, in sendall
    nsent = self._socket_send(buffer, flags)
OSError: [Errno 9] Bad file descriptor
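For illustration, here is a minimal sketch of turning the size check into a status that could be reported to the peer rather than an unhandled exception. It relies on the gRPC wire format, where each message is prefixed by a 1-byte compressed flag and a 4-byte big-endian length. The function names are hypothetical, the exception class is a local stand-in for the one in the traceback, and choosing RESOURCE_EXHAUSTED (code 8) as the status to send is an assumption about the desired behaviour, not a description of the purerpc code.

```python
import struct

MAX_MESSAGE_LENGTH = 4 * 1024 * 1024  # 4194304, the limit shown in the log above
STATUS_OK = 0
STATUS_RESOURCE_EXHAUSTED = 8  # gRPC status code


class MessageTooLargeError(Exception):
    """Local stand-in for purerpc.grpclib.exceptions.MessageTooLargeError."""


def parse_message_length(header, max_length=MAX_MESSAGE_LENGTH):
    """Read the 5-byte gRPC message prefix and enforce the size limit."""
    _compressed_flag, length = struct.unpack(">BI", header[:5])
    if length > max_length:
        raise MessageTooLargeError(
            "Received message larger than max: {} > {}".format(length, max_length)
        )
    return length


def status_for_header(header):
    """Hypothetical helper: map the parse result to a status for the peer."""
    try:
        parse_message_length(header)
    except MessageTooLargeError:
        return STATUS_RESOURCE_EXHAUSTED
    return STATUS_OK
```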

BREAKING CHANGE: Stable API for 1.0 release

We need to fix the API for 1.0 release. Unfortunately, there are going to be some breaking changes ;)
Here are some of them:

  • The stream_stream client call (and maybe stream_unary) should really be an async context manager: there is an implicit thread of execution running that feeds the internal send buffer with data from the async iterable. If an exception occurs when sending the data (e.g. the server expects 3 messages, but the client sends 4), this exception should not be discarded. This is in contrast to Google's official grpc implementation, which in the example above will silently discard the exception. I do not think this is reasonable.
  • #4: I think I am going to make the optional request argument a mandatory context argument.
  • We need to expose a stream-like API for those who do not need a separate execution thread for stream_stream calls. This may break something, but I am not sure.
  • Maybe we should change generated files suffix from _grpc.py to something more self-describing, like _purerpc.py

Overall, I think the API should be as close as it can get to grpcio modulo the corner cases described above.

Review TODOs

Find and evaluate remaining TODOs in the code.

dictConfig in misc/greeter is excessive

It doesn't make sense to have the copy-pasted verbose logging configuration in main.py, main_pingpong.py and main_pingpong_servicer.py, which has the same effect as

logging.basicConfig(format="[%(asctime)s - %(name)s - %(levelname)s]:  %(message)s", level=logging.WARNING)

test_util requires forge dependency

If purerpc.test_utils is part of the public API, then its dependencies need to be represented in setup.py install_requires.

test_utils depends on forge package:

    from purerpc.test_utils import run_purerpc_service_in_process, purerpc_channel
.venv/lib/python3.7/site-packages/purerpc/test_utils.py:21: in <module>
    import forge
E   ModuleNotFoundError: No module named 'forge'

yet forge is only listed in tests_require:
https://github.com/standy66/purerpc/blob/5faa35f2cdf5b36d1398a1aa6a9e681c5344060f/setup.py#L76-L84

(tests_require is deprecated, and in any case was only for specifying the dependencies needed to run the package's tests)

If test_utils is rarely used or has heavy dependencies, then perhaps add a corresponding "test_utils" build variant using the extras_require option of setuptools.
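A minimal sketch of what such an extra could look like in setup.py. The extra name "test_utils" comes from the suggestion above; the distribution name "python-forge" is an assumption about which PyPI package provides `import forge` and should be checked against the existing tests_require list.

```python
# Hypothetical fragment of setup.py, not the project's actual configuration.
EXTRAS_REQUIRE = {
    # Assumption: "python-forge" is the distribution that provides `import forge`.
    "test_utils": ["python-forge"],
}

# The dictionary would then be passed to setuptools:
# setup(..., extras_require=EXTRAS_REQUIRE)
```

Users who need purerpc.test_utils would then install the package as purerpc[test_utils].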

Docs needed

For all public interfaces and maybe even explaining internal details to show that gRPC is really a very simple protocol on top of HTTP/2

Broken Pipe on long running Service Stub

When trying to call a method of a long-running service, a BrokenPipeError is raised because
the connection that was used to send data was broken and was not re-established.
For example, for the following service defined in my_service.proto:

syntax = "proto3";
service MyService {
    rpc MyMethod (InputMessage) returns (OutputMessage);
}

and the following python script my_script.py:

import curio
from purerpc import Channel
from my_service_grpc import MyServiceStub
from my_service_pb2 import InputMessage


async def main():
    channel = Channel("localhost", 8000)
    my_service_stub = MyServiceStub(channel)
    request = InputMessage()
    response = await my_service_stub.MyMethod(request)

if __name__ == "__main__":
    curio.run(main)

we have the following traceback:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/purerpc/server.py", line 132, in request_received
    await call_server_stream_stream(method_fn, stream)
  File "/usr/local/lib/python3.6/site-packages/purerpc/wrappers.py", line 74, in call_server_stream_stream
    await send_multiple_messages_server(stream, func(input_message_stream))
  File "/usr/local/lib/python3.6/site-packages/purerpc/wrappers.py", line 33, in send_multiple_messages_server
    async for message in tmp:
  File "/usr/local/lib/python3.6/site-packages/my_script.py", line 11, in MyMethod
    response = await my_service_stub.MyMethod(request)
  File "/usr/local/lib/python3.6/site-packages/purerpc/wrappers.py", line 84, in __call__
    stream = await self._stream_fn()
  File "/usr/local/lib/python3.6/site-packages/purerpc/client.py", line 35, in rpc
    self.channel.port))
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpc_proto.py", line 48, in start_request
    content_type_suffix if content_type_suffix else "+proto", custom_metadata
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpc_socket.py", line 231, in start_request
    await self._socket.flush()
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpc_socket.py", line 32, in flush
    await self._socket.sendall(data)
  File "/usr/local/lib/python3.6/site-packages/curio/io.py", line 179, in sendall
    nsent = self._socket_send(buffer, flags)
BrokenPipeError: [Errno 32] Broken pipe

ExceptionGroup support

Since anyio 4.0, all task groups raise ExceptionGroups by default. We need to check whether that causes us to miss exceptions, and whether we are passing exception groups to users where we do not intend to raise one.

I did a quick scan for where we use task groups in the main code, assuming that no other anyio functions raise exception groups:

task_group = await self.enter_async_context(anyio.create_task_group())

task_group = await self.enter_async_context(anyio.create_task_group())

async def __call__(self, message_aiter, *, metadata=None):
    stream = await self._stream_fn(metadata=metadata)
    async with anyio.create_task_group() as task_group:
        task_group.start_soon(send_multiple_messages_client, stream, message_aiter)
        return await extract_message_from_singleton_stream(stream)

async def call_aiter(self, message_aiter, metadata):
    stream = await self._stream_fn(metadata=metadata)
    async with anyio.create_task_group() as task_group:
        task_group.start_soon(send_multiple_messages_client, stream, message_aiter)
        yield stream_to_async_iterator(stream)

async with GRPCProtoSocket(self.config, stream_) as grpc_socket:
    # TODO: resource usage warning
    # TODO: TaskGroup() uses a lot of memory if the connection is kept for a long time
    # TODO: do we really need it here?
    async with anyio.create_task_group() as task_group:
        async for stream in grpc_socket.listen():
            task_group.start_soon(self.request_received, stream)

The other uses of task groups are in tests and some sample scripts. While those should probably be double-checked, the ones above are the most important.
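One possible way to avoid handing users a group that wraps a single error is to unwrap it at the API boundary. The sketch below is an assumption about how that could look, not existing purerpc code: `unwrap_single_exception` is a hypothetical helper, and it uses duck typing on the `exceptions` attribute so that it works with both the built-in groups of Python 3.11+ and a backport.

```python
def unwrap_single_exception(exc):
    """Peel exception groups for as long as each one wraps exactly one exception.

    Groups holding several exceptions are returned unchanged, so that no
    error is silently dropped.
    """
    while True:
        inner = getattr(exc, "exceptions", None)
        if not isinstance(inner, (tuple, list)) or len(inner) != 1:
            return exc
        exc = inner[0]


# Possible use around a task group (comment only, since it needs anyio):
#
#     try:
#         async with anyio.create_task_group() as task_group:
#             ...
#     except BaseExceptionGroup as group:  # Python 3.11+ or a backport
#         raise unwrap_single_exception(group)
```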

TLS

This seems to be fairly easy. Curio already has wrappers around the ssl module in the standard library. An SSLContext (and, in the case of an HTTP/2 client, the server hostname and ALPN protocol list) needs to be passed down to curio.open_connection and curio.tcp_server/curio.run_server.
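A minimal sketch of building such contexts with the standard library ssl module, advertising HTTP/2 ("h2") via ALPN. The function names are hypothetical, and how the resulting context is handed to curio is deliberately left out, since the exact keyword arguments should be checked against the curio documentation.

```python
import ssl


def make_client_ssl_context():
    """Client side: verify the server certificate and offer HTTP/2 via ALPN."""
    context = ssl.create_default_context()  # certificate and hostname checks on
    context.set_alpn_protocols(["h2"])  # gRPC runs over HTTP/2
    return context


def make_server_ssl_context(certfile, keyfile):
    """Server side: present a certificate and accept HTTP/2 via ALPN."""
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(certfile, keyfile)
    context.set_alpn_protocols(["h2"])
    return context
```

On the client, the server hostname is still needed at connection time so that the certificate can be matched against it.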

maintenance tasks

The purerpc repo was transferred to python-trio. Here are the short-term tasks I'm intending, with the goal of pushing a maintenance release to pypi for active dependees that may be having problems with the old anyio 1.x dependency of this package:

  • note on README crediting original author, explain that the project is in maintenance mode (keep things working for active dependees), point to https://github.com/grpc/grpc for people needing asyncio only
  • misc: doc updates, TODOs I noted while walking through the code, prune unused imports
  • set up CI (travis-ci.org is defunct, use github actions), pin requirements for reproducible tests
  • upgrade to anyio 3.x. It does imply that curio support will be dropped. Related to the upgrade, Server.serve() will be strongly discouraged in favor of a newly-added Server.serve_async(). PR #33
  • drop Python 3.5 and 3.6 support - they are past end of life, and it's a large burden to the implementation verbosity and performance (async_generator shim, etc.)
  • remove runtime patching of anyio.run(). The patch existed to support backend="uvloop" (run() already supports the backend-specific use_uvloop option, which is the correct way) and the PURERPC_BACKEND environment variable (to be replaced by something like purerpc_run()).
  • tag release and push to pypi

tests fail on Python 3.8 (macOS)

e.g.

_____ ERROR at setup of test_grpc_client_large_payload[purerpc_echo_port] ______

cm_gen = <function run_purerpc_service_in_process.<locals>.target_fn at 0x10c175310>

    @contextlib.contextmanager
    def _run_context_manager_generator_in_process(cm_gen):
        parent_conn, child_conn = multiprocessing.Pipe(duplex=False)
        target_fn = _wrap_gen_in_process(child_conn)(cm_gen)

        process = multiprocessing.Process(target=target_fn)
>       process.start()

src/purerpc/test_utils.py:97:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/process.py:121: in start
    self._popen = self._Popen(self)
../../.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/context.py:224: in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
../../.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/context.py:284: in _Popen
    return Popen(process_obj)
../../.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/popen_spawn_posix.py:32: in __init__
    super().__init__(process_obj)
../../.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/popen_fork.py:19: in __init__
    self._launch(process_obj)
../../.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/popen_spawn_posix.py:47: in _launch
    reduction.dump(process_obj, fp)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

obj = <Process name='Process-1' parent=73643 initial>
file = <_io.BytesIO object at 0x10c1ddf40>, protocol = None

    def dump(obj, file, protocol=None):
        '''Replacement for pickle.dump() using ForkingPickler.'''
>       ForkingPickler(file, protocol).dump(obj)
E       AttributeError: Can't pickle local object 'run_purerpc_service_in_process.<locals>.target_fn'

../../.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/reduction.py:60: AttributeError
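The failure is consistent with Python 3.8 changing the default multiprocessing start method on macOS from fork to spawn, which requires the process target to be picklable. The sketch below reproduces the distinction in isolation; all names are hypothetical and it is not the test_utils code. A function defined inside another function cannot be pickled, while a module-level function combined with functools.partial is the usual picklable alternative.

```python
import functools
import pickle


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, as spawn would require."""
    try:
        pickle.dumps(obj)
    except (AttributeError, pickle.PicklingError, TypeError):
        return False
    return True


def make_local_target():
    """Mimics the failing pattern: the target is a closure inside a function."""

    def target_fn():
        return "serving"

    return target_fn


def serve(port):
    """Module-level target: pickled by reference to its qualified name."""
    return "serving on {}".format(port)


# Arguments are bound with functools.partial instead of a closure.
module_level_target = functools.partial(serve, 50055)
```

Under these assumptions, passing something like `module_level_target` to multiprocessing.Process avoids the pickling error, at the cost of restructuring helpers that currently build their target as a nested function.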

How do I have a long-lasting client?

I am writing an async app (using Trio). I have an Application class with various async "handle_x_event" methods that get scheduled in the Trio nursery. Some such events need to trigger a gRPC call.

Currently, this only works if I create the channel and stub right when I need to use it:

class Application:
    def __init__(self):
        # various setup omitted
        ...

    async def handle_keystrokes(self):
        # handle some keys
        ...

    async def handle_document_updates(self):
        # handle document changes
        ...

    async def handle_screen_updates(self):
        async for _ in self.screen_update.events():
            logging.debug("sending screen update")
            req = self.make_request()
            logging.debug("sending {!r}".format(req))
            async with purerpc.insecure_channel('127.0.0.1', TABULA_PORT) as rpc_channel:
                stub = imprimare_grpc.TabulaStub(rpc_channel)
                response = await stub.UpdateDisplay(req)
            logging.debug(response)

async def main():
    async with trio.open_nursery() as nursery:
        application = Application()
        nursery.start_soon(application.handle_keystrokes)
        nursery.start_soon(application.handle_document_updates)
        nursery.start_soon(application.handle_screen_updates)

If I try to create the rpc_channel or the stub in my main function and pass it in to the Application class for use, then the await stub.UpdateDisplay(req) call never returns. There's no error message; that task just silently waits forever, and the server does not receive my request.

I want a long-lasting client connection. How can I set one up?

Give access to request headers and ability to respond with custom headers

Need to address 4 use cases:

  1. Client wants to send additional request headers to server
  2. Client wants to read all of the response headers or trailers
  3. Server wants to read all of the request headers
  4. Server wants to send additional response headers or trailers to client

In gRPC, methods differ by cardinality: a call may be unary in both directions (UNARY_UNARY), client-side streaming (STREAM_UNARY), server-side streaming (UNARY_STREAM), or bidirectional streaming (STREAM_STREAM). Keeping that in mind, some of the cases listed above are far more difficult to design than others.
