Comments (5)
Hello, @a14n!
Unfortunately, we can't use aio-pika RPC way. The main goal of FastStream RPC is do not create a special queue to consume responses. Thus, FastStream uses RMQ Direct Reply-to feature to send RPC requests. But, this mechanism has a limitation - you can't send multiple requests at the same time from one channel-connection pair. So, we can't send RPC requests concurrently until #975 is not released.
Although, RPC is not a correct case for RMQ: we already discussed it in #1252.
If you want to consume persistent request-reply stream, please create persistent subscriber and use reply_to
header instead of RPC. If you want to match replies with original requests, you can use smth like the following snippet (from Discord)
from asyncio import Future
from contextlib import asynccontextmanager
from typing import Annotated, Any
from uuid import uuid4
from faststream import Context, FastStream, context
from faststream.rabbit import RabbitBroker
@asynccontextmanager
async def lifespan():
context.set_global("replies_container", {})
yield
RepliesContainer = Annotated[dict[str, Future[Any]], Context("replies_container")]
broker = RabbitBroker()
app = FastStream(broker, lifespan=lifespan)
@broker.subscriber("replies")
def handle_reply(
data,
container: RepliesContainer,
cor_id: str = Context("message.correlation_id"),
):
if (future := container.pop(cor_id, None)) and not future.done():
future.set_result(data)
async def custom_rpc(
broker: NatsBroker,
msg: Any,
subject: str,
container: RepliesContainer,
) -> Any:
cor_id = uuid4().hex
container[cor_id] = result_future = Future()
await broker.publish(msg, subject, correlation_id=cor_id)
return await result_future
# Emulation
@broker.subscriber("in")
@broker.publisher("replies")
async def handle_request(data):
"""Your hardcoded not-FastStream service."""
return data
@app.after_startup
async def t(container: RepliesContainer):
response = await custom_rpc(broker, "test", "in", container)
print(response)
This code is the same with aio-pika example thing, so it should solve your problem
from faststream.
Thanks for your quick reply.
from faststream.
@broker.subscriber("replies")
Hello
If there are multiple producers, and multiple producers are subscribed to replies, and if messages sent by consumers to replies are received by other producers (consumers acting as replicas), then the producer who published the task will not receive the message.
from faststream.
producer code:
import os
import random
from faststream.rabbit import RabbitBroker
from faststream import Context, FastStream, context
from fastapi import FastAPI
import uvicorn
from contextlib import asynccontextmanager
from asyncio import Future
from typing import Annotated, Any
import uuid
RepliesContainer = Annotated[dict[str, Future[Any]], Context("replies_container")]
broker = RabbitBroker()
@asynccontextmanager
async def fastapi_lifespan(fastapi_app: FastAPI):
context.set_global("replies_container", {})
await broker.start()
yield
await broker.close()
fastapi_app = FastAPI(lifespan=fastapi_lifespan)
@broker.subscriber("replies")
def handle_reply(
data,
container: RepliesContainer,
cor_id: str = Context("message.correlation_id"),
):
print('handle_reply receives message', data, cor_id, os.getpid())
if (future := container.pop(cor_id, None)) and not future.done():
future.set_result(data)
async def custom_rpc(
broker: RabbitBroker,
msg: Any,
subject: str,
container: RepliesContainer,
) -> Any:
cor_id = uuid.uuid4().hex
container[cor_id] = result_future = Future()
print('Publish task', msg, cor_id, os.getpid())
await broker.publish(msg, subject, correlation_id=cor_id)
print('Publish task success', msg, cor_id, os.getpid())
return await result_future
@fastapi_app.post('/push_task')
async def push_task():
msg = 'test' + str(random.randint(100, 999))
container = context.get('replies_container')
response = await custom_rpc(broker, msg, "in", container)
print('response received', msg, response, os.getpid())
Consumer code:
from faststream import Context, FastStream, context
from faststream.rabbit import RabbitBroker
from asyncio import Future
from contextlib import asynccontextmanager
from typing import Annotated, Any
@asynccontextmanager
async def lifespan():
context.set_global("replies_container", {})
yield
RepliesContainer = Annotated[dict[str, Future[Any]], Context("replies_container")]
broker = RabbitBroker()
app = FastStream(broker, lifespan=lifespan)
@broker.subscriber("in")
@broker.publisher("replies")
async def handle_request(data):
"""Your hardcoded not-FastStream service."""
print('received task', data)
return data
log:
Publish task test482 cc99276685bd4ac19227680b44743231 9744
Publish task success test482 cc99276685bd4ac19227680b44743231 9744
INFO: Application startup complete.
2024-04-03 11:18:44,145 INFO - default | replies | 1b9d8205b0 - Received
handle_reply receives message test482 cc99276685bd4ac19227680b44743231 9744
response received test482 test482 9744
INFO: 127.0.0.1:16378 - "POST /push_task HTTP/1.1" 200 OK
2024-04-03 11:18:44,155 INFO - default | replies | 1b9d8205b0 - Processed
Publish task test225 46555d6651c943d5a835f10e88a79789 9744
Publish task success test225 46555d6651c943d5a835f10e88a79789 9744
2024-04-03 11:18:47,772 INFO - default | replies | 95783da1b1 - Received
response received test225 46555d6651c943d5a835f10e88a79789 14688
2024-04-03 11:18:47,780 INFO - default | replies | 95783da1b1 - Processed
When the process with pid 9744 publishes a task, and the response happens to be received by 9744, then the response is normal. However, if the task published by 9744 is received by 14688, the response cannot be obtained.
Are there any good ideas to solve this problem?
from faststream.
Are there any good ideas to solve this problem?
Already solved, the queue is generated based on the pid, and the consumer adds messages to the specified queue.
@broker.subscriber("in")
async def handle_request(data: dict, cor_id: str = Context("message.correlation_id")):
"""Your hardcoded not-FastStream service."""
print('收到任务', data, cor_id)
reply_queue = data.pop('reply_queue', None)
await broker.publish(message=data, queue=reply_queue, correlation_id=cor_id)
from faststream.
Related Issues (20)
- RFE: NATS same subject subscription not registering handlers HOT 1
- Implement explicit methods annotations in Kafka and confluent brokers (Refer RabbitBroker) HOT 1
- Feature: use subscribe in KafkaBroker to create handlers with explicit options propagation
- Bug: Warning about deprecated features when using latest RabbitMQ HOT 2
- Feature: Allow passing kwargs to underlying broker library HOT 2
- Bug: rpc calls cannot be executed asynchronously HOT 1
- Feature: Deferred initialization of topic names HOT 1
- Bug: After Closing a RabbitBroker(), others RabbitBroker() can't publish anymore HOT 1
- Create docs according to design (image)
- Add dev containers (for different brokers and for all brokers)
- Bug: AsyncConfluentProducer / AsyncConfluentConsumer are not that async
- Bug: Reload option sometimes triggers TypeError: not all arguments converted during string formatting HOT 1
- ActiveMQ support HOT 3
- Bug: Deserialization of list[Model[T]] not working in v5 HOT 3
- Bug:<RedisBroker> failed to connect to Redis instance by <url> HOT 3
- Bug: unbound error on base middleware handler HOT 4
- Bug: Publisher Direct Usage HOT 4
- Feature: more stronger unique RPC subject names
- Bug: Router Prefix breaks Path() from properly retrieving the subject token defined.
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from faststream.