Git Product home page Git Product logo

Comments (5)

Lancetnik avatar Lancetnik commented on June 11, 2024

Hello, @a14n!

Unfortunately, we can't use aio-pika RPC way. The main goal of FastStream RPC is do not create a special queue to consume responses. Thus, FastStream uses RMQ Direct Reply-to feature to send RPC requests. But, this mechanism has a limitation - you can't send multiple requests at the same time from one channel-connection pair. So, we can't send RPC requests concurrently until #975 is not released.

Although, RPC is not a correct case for RMQ: we already discussed it in #1252.

If you want to consume persistent request-reply stream, please create persistent subscriber and use reply_to header instead of RPC. If you want to match replies with original requests, you can use smth like the following snippet (from Discord)

from asyncio import Future
from contextlib import asynccontextmanager
from typing import Annotated, Any
from uuid import uuid4

from faststream import Context, FastStream, context
from faststream.rabbit import RabbitBroker


@asynccontextmanager
async def lifespan():
    context.set_global("replies_container", {})
    yield


RepliesContainer = Annotated[dict[str, Future[Any]], Context("replies_container")]

broker = RabbitBroker()
app = FastStream(broker, lifespan=lifespan)


@broker.subscriber("replies")
def handle_reply(
    data,
    container: RepliesContainer,
    cor_id: str = Context("message.correlation_id"),
):
    if (future := container.pop(cor_id, None)) and not future.done():
        future.set_result(data)


async def custom_rpc(
    broker: NatsBroker,
    msg: Any,
    subject: str,
    container: RepliesContainer,
) -> Any:
    cor_id = uuid4().hex
    container[cor_id] = result_future = Future()
    await broker.publish(msg, subject, correlation_id=cor_id)
    return await result_future


# Emulation

@broker.subscriber("in")
@broker.publisher("replies")
async def handle_request(data):
    """Your hardcoded not-FastStream service."""
    return data


@app.after_startup
async def t(container: RepliesContainer):
    response = await custom_rpc(broker, "test", "in", container)
    print(response)

This code is the same with aio-pika example thing, so it should solve your problem

from faststream.

a14n avatar a14n commented on June 11, 2024

Thanks for your quick reply.

from faststream.

v2boardbot avatar v2boardbot commented on June 11, 2024
@broker.subscriber("replies")

Hello

If there are multiple producers, and multiple producers are subscribed to replies, and if messages sent by consumers to replies are received by other producers (consumers acting as replicas), then the producer who published the task will not receive the message.

from faststream.

v2boardbot avatar v2boardbot commented on June 11, 2024

producer code:

import os
import random

from faststream.rabbit import RabbitBroker
from faststream import Context, FastStream, context
from fastapi import FastAPI
import uvicorn

from contextlib import asynccontextmanager
from asyncio import Future
from typing import Annotated, Any
import uuid


RepliesContainer = Annotated[dict[str, Future[Any]], Context("replies_container")]
broker = RabbitBroker()


@asynccontextmanager
async def fastapi_lifespan(fastapi_app: FastAPI):
    context.set_global("replies_container", {})
    await broker.start()
    yield
    await broker.close()


fastapi_app = FastAPI(lifespan=fastapi_lifespan)


@broker.subscriber("replies")
def handle_reply(
    data,
    container: RepliesContainer,
    cor_id: str = Context("message.correlation_id"),
):
    print('handle_reply receives message', data, cor_id, os.getpid())
    if (future := container.pop(cor_id, None)) and not future.done():
        future.set_result(data)


async def custom_rpc(
    broker: RabbitBroker,
    msg: Any,
    subject: str,
    container: RepliesContainer,
) -> Any:
    cor_id = uuid.uuid4().hex
    container[cor_id] = result_future = Future()
    print('Publish task', msg, cor_id, os.getpid())
    await broker.publish(msg, subject, correlation_id=cor_id)
    print('Publish task success', msg, cor_id, os.getpid())
    return await result_future


@fastapi_app.post('/push_task')
async def push_task():
    msg = 'test' + str(random.randint(100, 999))
    container = context.get('replies_container')
    response = await custom_rpc(broker, msg, "in", container)
    print('response received', msg, response, os.getpid())

Consumer code:

from faststream import Context, FastStream, context
from faststream.rabbit import RabbitBroker

from asyncio import Future
from contextlib import asynccontextmanager
from typing import Annotated, Any

@asynccontextmanager
async def lifespan():
    context.set_global("replies_container", {})
    yield

RepliesContainer = Annotated[dict[str, Future[Any]], Context("replies_container")]

broker = RabbitBroker()
app = FastStream(broker, lifespan=lifespan)


@broker.subscriber("in")
@broker.publisher("replies")
async def handle_request(data):
    """Your hardcoded not-FastStream service."""
    print('received task', data)
    return data

log:

Publish task test482 cc99276685bd4ac19227680b44743231 9744
Publish task success test482 cc99276685bd4ac19227680b44743231 9744
INFO:     Application startup complete.
2024-04-03 11:18:44,145 INFO     - default | replies | 1b9d8205b0 - Received
handle_reply receives message test482 cc99276685bd4ac19227680b44743231 9744
response received test482 test482 9744
INFO:     127.0.0.1:16378 - "POST /push_task HTTP/1.1" 200 OK
2024-04-03 11:18:44,155 INFO     - default | replies | 1b9d8205b0 - Processed
Publish task test225 46555d6651c943d5a835f10e88a79789 9744
Publish task success test225 46555d6651c943d5a835f10e88a79789 9744
2024-04-03 11:18:47,772 INFO     - default | replies | 95783da1b1 - Received
response received test225 46555d6651c943d5a835f10e88a79789 14688
2024-04-03 11:18:47,780 INFO     - default | replies | 95783da1b1 - Processed

When the process with pid 9744 publishes a task, and the response happens to be received by 9744, then the response is normal. However, if the task published by 9744 is received by 14688, the response cannot be obtained.

Are there any good ideas to solve this problem?

from faststream.

v2boardbot avatar v2boardbot commented on June 11, 2024

Are there any good ideas to solve this problem?

Already solved, the queue is generated based on the pid, and the consumer adds messages to the specified queue.

@broker.subscriber("in")
async def handle_request(data: dict, cor_id: str = Context("message.correlation_id")):
    """Your hardcoded not-FastStream service."""
    print('收到任务', data, cor_id)
    reply_queue = data.pop('reply_queue', None)
    await broker.publish(message=data, queue=reply_queue, correlation_id=cor_id)

from faststream.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.