Git Product home page Git Product logo

fastapi-mqtt's Introduction

fastapi-mqtt

MQTT is a lightweight publish/subscribe messaging protocol designed for M2M (machine to machine) telemetry in low bandwidth environments. Fastapi-mqtt is the client for working with MQTT.

For more information about MQTT, please refer to here: MQTT

Fastapi-mqtt wraps around gmqtt module. Gmqtt Python async client for MQTT client implementation. Module has support of MQTT version 5.0 protocol

MIT licensed GitHub stars GitHub forks GitHub issues Downloads


Documentation: FastApi-MQTT

The key feature are:

MQTT specification avaliable with help decarator methods using callbacks:

  • on_connect()

  • on_disconnect()

  • on_subscribe()

  • on_message()

  • subscribe(topic)

  • MQTT Settings available with pydantic class

  • Authentication to broker with credentials

  • unsubscribe certain topics and publish to certain topics

🔨 Installation

pip install fastapi-mqtt

🕹 Guide

from contextlib import asynccontextmanager
from typing import Any

from fastapi import FastAPI
from gmqtt import Client as MQTTClient

from fastapi_mqtt import FastMQTT, MQTTConfig

mqtt_config = MQTTConfig()
fast_mqtt = FastMQTT(config=mqtt_config)


@asynccontextmanager
async def _lifespan(_app: FastAPI):
    await fast_mqtt.mqtt_startup()
    yield
    await fast_mqtt.mqtt_shutdown()


app = FastAPI(lifespan=_lifespan)


@fast_mqtt.on_connect()
def connect(client: MQTTClient, flags: int, rc: int, properties: Any):
    client.subscribe("/mqtt")  # subscribing mqtt topic
    print("Connected: ", client, flags, rc, properties)

@fast_mqtt.subscribe("mqtt/+/temperature", "mqtt/+/humidity", qos=1)
async def home_message(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    print("temperature/humidity: ", topic, payload.decode(), qos, properties)

@fast_mqtt.on_message()
async def message(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    print("Received message: ", topic, payload.decode(), qos, properties)

@fast_mqtt.subscribe("my/mqtt/topic/#", qos=2)
async def message_to_topic_with_high_qos(
    client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any
):
    print(
        "Received message to specific topic and QoS=2: ", topic, payload.decode(), qos, properties
    )

@fast_mqtt.on_disconnect()
def disconnect(client: MQTTClient, packet, exc=None):
    print("Disconnected")

@fast_mqtt.on_subscribe()
def subscribe(client: MQTTClient, mid: int, qos: int, properties: Any):
    print("subscribed", client, mid, qos, properties)

@app.get("/test")
async def func():
    fast_mqtt.publish("/mqtt", "Hello from Fastapi")  # publishing mqtt topic
    return {"result": True, "message": "Published"}

Publish method:

async def func():
    fast_mqtt.publish("/mqtt", "Hello from Fastapi")  # publishing mqtt topic
    return {"result": True, "message": "Published"}

Subscribe method:

@fast_mqtt.on_connect()
def connect(client, flags, rc, properties):
    client.subscribe("/mqtt")  # subscribing mqtt topic
    print("Connected: ", client, flags, rc, properties)

Changing connection params

mqtt_config = MQTTConfig(
    host="mqtt.mosquito.org",
    port=1883,
    keepalive=60,
    username="username",
    password="strong_password",
)
fast_mqtt = FastMQTT(config=mqtt_config)

✅ Testing

  • Clone the repository and install it with poetry.
  • Run tests with pytest, using an external MQTT broker to connect (defaults to 'test.mosquitto.org').
  • Explore the fastapi app examples and run them with uvicorn
# (opc) Run a local mosquitto MQTT broker with docker
docker run -d --name mosquitto -p 9001:9001 -p 1883:1883 eclipse-mosquitto:1.6.15
# Set host for test broker when running pytest
TEST_BROKER_HOST=localhost pytest
# Run the example apps against local broker, with uvicorn
TEST_BROKER_HOST=localhost uvicorn examples.app:app --port 8000 --reload
TEST_BROKER_HOST=localhost uvicorn examples.ws_app.app:application --port 8000 --reload

Contributing

Fell free to open issue and send pull request.

Thanks To Contributors. Contributions of any kind are welcome!

Before you start please read CONTRIBUTING

fastapi-mqtt's People

Contributors

aabadie avatar ajstewart avatar aliyevh avatar andrewlaptev avatar azogue avatar felixonmars avatar fhpriamo avatar ftapajos avatar jthetzel avatar krappramiro avatar leon0824 avatar mblo avatar phe avatar sabuhish avatar spontanurlaub avatar turall avatar vincentto13 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

fastapi-mqtt's Issues

FastAPI Application error....ERROR: Application startup failed. Exiting. TimeoutError: [Errno 10060] Connect call failed (mqtt broker ip,port) connecting to a mqtt broker

Issue:

TimeoutError: [Errno 10060] Connect call failed ('mqtt_brokerip,mqtt_brokerport)

ERROR: Application startup failed. Exiting.
ERROR:uvicorn.error:Application startup failed. Exiting.

Time out error connecting to a mqtt broker using fastapi_mqtt library and the fastapi application is exiting.

Error Log

INFO: Started server process [24304]
INFO:uvicorn.error:Started server process [24304]
INFO: Waiting for application startup.
INFO:uvicorn.error:Waiting for application startup.
WARNING: Used broker version is 5
WARNING:uvicorn.error:Used broker version is 5
ERROR: Traceback (most recent call last):
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\starlette\routing.py", line 645, in lifespan
async with self.lifespan_context(app):
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\starlette\routing.py", line 540, in aenter
await self._router.startup()
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\starlette\routing.py", line 622, in startup
await handler()
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\fastapi_mqtt\fastmqtt.py", line 277, in startup
await self.connection()
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\fastapi_mqtt\fastmqtt.py", line 129, in connection
await self.client.connect(self.client._host,self.client._port,self.client._ssl,self.client._keepalive,version)
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\gmqtt\client.py", line 225, in connect
self._connection = await self._create_connection(
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\gmqtt\client.py", line 241, in _create_connection
connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\gmqtt\mqtt\connection.py", line 27, in create_connection
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
File "C:\Program Files\Python39\lib\asyncio\base_events.py", line 1065, in create_connection
raise exceptions[0]
File "C:\Program Files\Python39\lib\asyncio\base_events.py", line 1050, in create_connection
sock = await self._connect_sock(
File "C:\Program Files\Python39\lib\asyncio\base_events.py", line 961, in _connect_sock
await self.sock_connect(sock, address)
File "C:\Program Files\Python39\lib\asyncio\selector_events.py", line 500, in sock_connect
return await fut
File "C:\Program Files\Python39\lib\asyncio\selector_events.py", line 535, in _sock_connect_cb
raise OSError(err, f'Connect call failed {address}')
TimeoutError: [Errno 10060] Connect call failed ()

ERROR:uvicorn.error:Traceback (most recent call last):
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\starlette\routing.py", line 645, in lifespan
async with self.lifespan_context(app):
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\starlette\routing.py", line 540, in aenter
await self._router.startup()
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\starlette\routing.py", line 622, in startup
await handler()
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\fastapi_mqtt\fastmqtt.py", line 277, in startup
await self.connection()
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\fastapi_mqtt\fastmqtt.py", line 129, in connection
await self.client.connect(self.client._host,self.client._port,self.client._ssl,self.client._keepalive,version)
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\gmqtt\client.py", line 225, in connect
self._connection = await self._create_connection(
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\gmqtt\client.py", line 241, in _create_connection
connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
File "D:\python_samples\fastapi_framework\venv\lib\site-packages\gmqtt\mqtt\connection.py", line 27, in create_connection
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
File "C:\Program Files\Python39\lib\asyncio\base_events.py", line 1065, in create_connection
raise exceptions[0]
File "C:\Program Files\Python39\lib\asyncio\base_events.py", line 1050, in create_connection
sock = await self._connect_sock(
File "C:\Program Files\Python39\lib\asyncio\base_events.py", line 961, in _connect_sock
await self.sock_connect(sock, address)
File "C:\Program Files\Python39\lib\asyncio\selector_events.py", line 500, in sock_connect
return await fut
File "C:\Program Files\Python39\lib\asyncio\selector_events.py", line 535, in _sock_connect_cb
raise OSError(err, f'Connect call failed {address}')
TimeoutError: [Errno 10060] Connect call failed ()

ERROR: Application startup failed. Exiting.
ERROR:uvicorn.error:Application startup failed. Exiting.

mqtt reconnect_retries、reconnect_delay settings error;

async def __set_connetion_config(self) -> None:
    """
    The connected MQTT clients will always try to reconnect in case of lost connections.
    The number of reconnect attempts is unlimited.
    For changing this behavior, set reconnect_retries and reconnect_delay with its values.
    For more info: https://github.com/wialon/gmqtt#reconnects
    """
    if self.config.reconnect_retries:
        self.client.set_config(
            # reconnect_retries=self.config.reconnect_retries
            config=self.config
        )

    if self.config.reconnect_delay:
        # self.client.set_config(reconnect_delay=self.config.reconnect_delay)
        self.client.set_config(config=self.config)

RuntimeError: no validator found for <class 'ssl.SSLContext'>, see `arbitrary_types_allowed` in Config

class MQTTConfig(BaseModel):
File "pydantic/main.py", line 197, in pydantic.main.ModelMetaclass.new
File "pydantic/fields.py", line 506, in pydantic.fields.ModelField.infer
File "pydantic/fields.py", line 436, in pydantic.fields.ModelField.init
File "pydantic/fields.py", line 552, in pydantic.fields.ModelField.prepare
File "pydantic/fields.py", line 663, in pydantic.fields.ModelField._type_analysis
File "pydantic/fields.py", line 808, in pydantic.fields.ModelField._create_sub_type
File "pydantic/fields.py", line 436, in pydantic.fields.ModelField.init
File "pydantic/fields.py", line 557, in pydantic.fields.ModelField.prepare
File "pydantic/fields.py", line 831, in pydantic.fields.ModelField.populate_validators
File "pydantic/validators.py", line 765, in find_validators
RuntimeError: no validator found for <class 'ssl.SSLContext'>, see arbitrary_types_allowed in Config

im fix by

class MQTTConfig(BaseModel):

class Config:

arbitrary_types_allowed = True

....

api endpoint for subscribe

@sabuhish I am trying to create a api endpoint that subscribe to mqtt topic and return data when have api call to that endpoint. How can I implement that? I just see in the example for the publish, not subscribe?
Are you plan to switch to use paho client instead of gmqtt?
And I face a problem that to use mqtt with renew jwt, then use it as credential to subscribe to mqtt broker, in your code I didn't see the implementation for that?
Thanks so much for your support? If anyone want to support on implement above issues, I hope that we can co-operate to do that in case of the author does not have so much time for it.

Example code not working

Hi, I tried running the example code and got the following error:
OSError: Multiple exceptions: [Errno 61] Connect call failed ('::1', 1883, 0, 0), [Errno 61] Connect call failed ('127.0.0.1', 1883)
I'd like to convert my http API to a mqtt for better IoT support but this happens.
rant it with uvicorn main:app --reload command thx in advance

Running with hyper corn gives this error:
../fastapi_mqtt/fastmqtt.py", line 78, in init
log_info = logger
NameError: name 'logger' is not defined

Is this project dead?

There seem to be issues even running sample code - likely some changes in gunicorn. They've been outlined in other issues Is this project still being worked on?

Adding reason_code during disconnect

Hello, first I want to thank you for making it easy to use gmqtt with FastAPI!
Second is more of a wish than issue. I need a way to send reason_code during shutdown, so if possible, can you change to this?

async def mqtt_shutdown(self, reason_code=0) -> None:
    """Final disconnection for MQTT client, for lifespan shutdown."""
    await self.client.disconnect(reason_code)

Many thx
//Victor

How to feed mqtt messages from within a fastapi websocket ? (ie. proxying)

Sorry to post an issue - as it might better be a support request.

TLDR : how can I achieve the equivalent of having @mqttclient.on_subscribe() within fastapi @app.websocket() ?

I fail to understand if/how I can "locally" subscribe to topics from within my async fastapi websocket interfaces.

May be fastapi-mqtt is just not meant for this purpose.

Context is

  • parallel instances of an application of mine publish their states via MQTT, eg. engine/213/progress for the instance id 213.
  • I have a fastapi backend and multiple workers
  • I can subscribe and get notified of engine/# at the "root" level of my fast api app with @mqttclient.on_subscribe()
  • on the other hand I can feed data live to client from fastapi websocket @app.websocket() async interfaces

But I wish I subscribed and transmitted the only topics that are actually open as webockets (1 websocket = 1 subscribe, activated on demand when a client connects to fastapi websocket for a given instance, then closed when the websocket goes away)

Ie. how can my back-end proxy the MQTT state of engine/An/progress only on demand, only from within the @app.websocket("/engine/{An}/wsstatus"), and only when a web client is connected ?

Meanwhile I feel like I have to@mq.on_subscribe() to all topics at the "root" level of my app, then try to asynchronously notify the currently opened fastapi websockets if something new arrives for them. This seems ugly and I am not even sure I can do it efficiently with multiple workers (may be gunicorn's --preload would help / using redis or memcached is overkill)

Well, or I revert to a non-fastapi mqtt client instanciated in the @app.websocket() !

(PS: I don't want to use paho-mqtt native websockets, it would require me to expose the mqtt network and bypass my fastapi auth)

Multiple subscriptions with multiple workers

Hello,

I just recently switch to fastpi-mqtt for some testing. While developing with a single worker everything works fine as soon as I put the code in a docker instance I noticed that it has issues with multiple workers though. Per default tiangolo/uvicorn-gunicorn-fastapi creates 8 worker processes for me, as a result I have 8 subscriptions as well and if a change is published my on_message method is called 8 times which is not ideal. Either I am missing something or there are issues with multiple workers.

Error run fastapi-mqtt using gunicorn

I use fastapi-mqtt==0.3.0, python==3.9.7, fastapi==0.68.1
I run example application by command : uvicorn app:app --host 0.0.0.0 --port 7000 --reload
Everything is ok -> app run

But:
I run example application by command :
gunicorn app:app -k uvicorn.workers.UvicornWorker --workers=9 -b 0.0.0.0:7000

Error :
File "/home/hoanganh/smarthome-server/venv/lib/python3.9/site-packages/gmqtt/client.py", line 230, in connect
await self._connected.wait()
File "/usr/lib/python3.9/asyncio/locks.py", line 226, in wait
await fut
RuntimeError: Task <Task pending name='Task-3' coro=<LifespanOn.main() running at /home/hoanganh/smarthome-server/venv/lib/python3.9/site-packages/uvicorn/lifespan/on.py:84>> got Future attached to a different loop

[2021-09-21 20:42:13 +0700] [10419] [ERROR] Application startup failed. Exiting.
[2021-09-21 20:42:13 +0700] [10419] [INFO] Worker exiting (pid: 10419)
Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<Client._resend_qos_messages() running at /home/hoanganh/smarthome-server/venv/lib/python3.9/site-packages/gmqtt/client.py:176>>
sys:1: RuntimeWarning: coroutine 'Client._resend_qos_messages' was never awaited
[2021-09-21 20:42:14 +0700] [10418] [INFO] Shutting down: Master
[2021-09-21 20:42:14 +0700] [10418] [INFO] Reason: Worker failed to boot.

init_app doesn't seem to work with lifespans

Versions: fastapi: 0.103.2, fastapi-mqtt: 2.0.0

When I use a fastapi lifespan (e.g. in order to setup a db connection) the init_app method does not seem to work anymore. The client will not connect. I guess because the now deprecated startup and shutdown events won't get triggered when using lifespans.

Lifespan documentation: https://fastapi.tiangolo.com/advanced/events/

The init_app method for reference:

def init_app(self, app: FastAPI) -> None:  # pragma: no cover
    @app.on_event("startup")
    async def startup():
        await self.connection()

    @app.on_event("shutdown")
    async def shutdown():
        await self.client.disconnect()

Calling the connection() and disconnect() methods themselves from the lifespan then works again:

...
mqtt_config = MQTTConfig(host=settings.mqtt_host, port=settings.mqtt_port)
mqtt = FastMQTT(config=mqtt_config)

@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]:
    """Run tasks before and after the server starts.

    Specifically, we initialize the database models before the server starts
    to ensure that the database has the correct schema.
    """
    try:
        init_models()
        await mqtt.connection()

    except ConnectionRefusedError as exc:
        raise ConnectionRefusedError("Failed to connect to the database") from exc
    yield
    await mqtt.client.disconnect()

Is it an error on my part with using the lifespan? Otherwise adding a hint for this to the official documentation will help others.

Add an ability to move mqtt handlers outside main

Hi @sabuhish, thanks for the great library!

I'm wondering if it's possible to move mqtt handlers to a different package rather than putting everything into main?

Usually, when you build a production-level code, you organize it into logical packages, e.g. api, model, etc. FastAPI has a good concept of APIRouter as an example.

If I run a sample from the README within a script where FastAPI instance is created, everything works as expected. But when I try to move mqtt code somewhere else (outside main), the app is failing at the start:

ERROR:    Traceback (most recent call last):
  File "/project_path/.venv/lib/python3.8/site-packages/starlette/routing.py", line 621, in lifespan
    async with self.lifespan_context(app):
  File "/project_path/.venv/lib/python3.8/site-packages/starlette/routing.py", line 518, in __aenter__
    await self._router.startup()
  File "/project_path/.venv/lib/python3.8/site-packages/starlette/routing.py", line 598, in startup
    await handler()
  File "/project_path/app/main.py", line 77, in startup
    await mqtt_client.mqtt.connection()
  File "/project_path/.venv/lib/python3.8/site-packages/fastapi_mqtt/fastmqtt.py", line 129, in connection
    await self.client.connect(self.client._host,self.client._port,self.client._ssl,self.client._keepalive,version)
  File "/project_path/.venv/lib/python3.8/site-packages/gmqtt/client.py", line 230, in connect
    await self._connected.wait()
  File "/usr/lib/python3.8/asyncio/locks.py", line 309, in wait
    await fut
RuntimeError: Task <Task pending name='Task-3' coro=<LifespanOn.main() running at /project_path/.venv/lib/python3.8/site-packages/uvicorn/lifespan/on.py:84>> got Future <Future pending> attached to a different loop

As far as I understood, mqtt instance is running in a different event loop for some reason if it's located in the other package? Is there any workaround for that?

P.S. I run the code via uvicorn.

Deprecation Warning

FastAPI: 0.105.0
fastapi-mqtt 2.1.1

 /usr/local/lib/python3.11/site-packages/fastapi_mqtt/fastmqtt.py:235: DeprecationWarning: 
          on_event is deprecated, use lifespan event handlers instead.
  
          Read more about it in the
          [FastAPI docs for Lifespan Events](https://fastapi.tiangolo.com/advanced/events/).
          
    @app.on_event("shutdown")

Pass MQTT client to a router

Hi,

I would like to know if it is possible to pass the MQTT client initiated by init_app() to a router that is included by included_router()?

mqtt_config = MQTTConfig()
mqtt = FastMQTT(
    config=mqtt_config
)

app = FastAPI()
mqtt.init_app(app)

app.include_router(activities.router)

I would like to access the @mqtt.on_message() decorator from the activities.router routes to read the message only when a specific route is called.

Thanks for your help.

Wrapper for fastapi-mqtt

fastapi-mqtt will be used gmqtt as a wrapper on which aims to be asynchronous for MQTT connection

QOS != 0 gives exception

Hello,

When publishing with QOS of 1 or 2 I get the following stack trace

Traceback (most recent call last):
File "e:\code\python\pythontest\venv\lib\site-packages\uvicorn\protocols\http\h11_impl.py", line 396, in run_asgi
result = await app(self.scope, self.receive, self.send)
File "e:\code\python\pythontest\venv\lib\site-packages\uvicorn\middleware\proxy_headers.py", line 45, in call
return await self.app(scope, receive, send)
File "e:\code\python\pythontest\venv\lib\site-packages\fastapi\applications.py", line 199, in call
await super().call(scope, receive, send)
File "e:\code\python\pythontest\venv\lib\site-packages\starlette\applications.py", line 111, in call
await self.middleware_stack(scope, receive, send)
File "e:\code\python\pythontest\venv\lib\site-packages\starlette\middleware\errors.py", line 181, in call
raise exc from None
File "e:\code\python\pythontest\venv\lib\site-packages\starlette\middleware\errors.py", line 159, in call
await self.app(scope, receive, _send)
File "e:\code\python\pythontest\venv\lib\site-packages\starlette\exceptions.py", line 82, in call
raise exc from None
File "e:\code\python\pythontest\venv\lib\site-packages\starlette\exceptions.py", line 71, in call
await self.app(scope, receive, sender)
File "e:\code\python\pythontest\venv\lib\site-packages\starlette\routing.py", line 566, in call
await route.handle(scope, receive, send)
File "e:\code\python\pythontest\venv\lib\site-packages\starlette\routing.py", line 227, in handle
await self.app(scope, receive, send)
File "e:\code\python\pythontest\venv\lib\site-packages\starlette\routing.py", line 41, in app
response = await func(request)
File "e:\code\python\pythontest\venv\lib\site-packages\fastapi\routing.py", line 201, in app
raw_response = await run_endpoint_function(
File "e:\code\python\pythontest\venv\lib\site-packages\fastapi\routing.py", line 148, in run_endpoint_function
return await dependant.call(**values)
File ".\main.py", line 60, in func
await fast_mqtt.publish("test", "Hello from Fastapi", qos=1) #publishing mqtt topic
File "e:\code\python\pythontest\venv\lib\site-packages\fastapi_mqtt\fastmqtt.py", line 226, in publish
return await self.loop.run_in_executor(self.executor, func)
File "C:\Python38\lib\concurrent\futures\thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "e:\code\python\pythontest\venv\lib\site-packages\gmqtt\client.py", line 291, in publish
self._persistent_storage.push_message_nowait(mid, package)
File "e:\code\python\pythontest\venv\lib\site-packages\gmqtt\storage.py", line 12, in push_message_nowait
return asyncio.ensure_future(self.push_message(mid, raw_package))
File "C:\Python38\lib\asyncio\tasks.py", line 660, in ensure_future
loop = events.get_event_loop()
File "C:\Python38\lib\asyncio\events.py", line 639, in get_event_loop
raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
C:\Python38\lib\asyncio\base_events.py:1860: RuntimeWarning: coroutine 'HeapPersistentStorage.push_message' was never awaited
handle = None # Needed to break cycles when an exception occurs.


Using QOS of 0 works fine

I took the example on the site and change the config to my server and managed to reproduce it. Here it is:

from fastapi_mqtt.fastmqtt import FastMQTT
from fastapi import FastAPI
from fastapi_mqtt.config import MQQTConfig
import ssl

mqtt_config = MQQTConfig()

def config():
server_cert =
client_cert =
client_key =

context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=server_cert)
context.load_cert_chain(certfile=client_cert, keyfile=client_key)

mqtt_config.ssl = context
mqtt_config.password = <MY PASSWORD>
mqtt_config.username = <MY USERNAME>
mqtt_config.host = <MY SERVER>
mqtt_config.port = 8883

config()

fast_mqtt = FastMQTT(
config=mqtt_config
)

app = FastAPI()

fast_mqtt.init_app(app)

@fast_mqtt.on_connect()
def connect(client, flags, rc, properties):
fast_mqtt.client.subscribe("/mqtt") #subscribing mqtt topic
print("Connected: ", client, flags, rc, properties)

@fast_mqtt.subscribe("mqtt/+/temperature", "mqtt/+/humidity")
async def home_message(client, topic, payload, qos, properties):
print("temperature/humidity: ", topic, payload.decode(), qos, properties)
return 0

@fast_mqtt.on_message()
async def message(client, topic, payload, qos, properties):
print("Received message: ",topic, payload.decode(), qos, properties)
return 0

@fast_mqtt.on_disconnect()
def disconnect(client, packet, exc=None):
print("Disconnected")

@fast_mqtt.on_subscribe()
def subscribe(client, mid, qos, properties):
print("subscribed", client, mid, qos, properties)

@app.get("/")
async def func():
await fast_mqtt.publish("test", "Hello from Fastapi", qos=1) #publishing mqtt topic

return {"result": True,"message":"Published" }

Version 1.8 is broken

It seems that the method self.client has been changed to self.config for set_config and results in the below error on startup the problem is cased in line line 140, in __set_connetion_config

AttributeError: 'MQTTConfig' object has no attribute 'set_config'
Also the versioning has changed from 1.0.X to 1.8 for some reason

Will message functionality and example broken

When trying to use the last will message I found that the functionality is broken which can also be seen when running the example provided in the repo:

❯ uvicorn examples.app_will_message:app --reload
INFO:     Will watch for changes in these directories: ['/Users/user/github/fastapi-mqtt']
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [4334] using StatReload
INFO:     on_connect handler accepted
INFO:     on_message handler accepted
INFO:     on_subscribe handler accepted
INFO:     Started server process [4336]
INFO:     Waiting for application startup.
WARNING:  Used broker version is 5
ERROR:    Traceback (most recent call last):
  File "/Users/user/miniforge3/envs/fastapi-mqtt/lib/python3.10/site-packages/starlette/routing.py", line 671, in lifespan
    async with self.lifespan_context(app):
  File "/Users/user/miniforge3/envs/fastapi-mqtt/lib/python3.10/site-packages/starlette/routing.py", line 566, in __aenter__
    await self._router.startup()
  File "/Users/user/miniforge3/envs/fastapi-mqtt/lib/python3.10/site-packages/starlette/routing.py", line 648, in startup
    await handler()
  File "/Users/user/github/fastapi-mqtt/./fastapi_mqtt/fastmqtt.py", line 218, in startup
    await self.connection()
  File "/Users/user/github/fastapi-mqtt/./fastapi_mqtt/fastmqtt.py", line 124, in connection
    await self.client.connect(
  File "/Users/user/miniforge3/envs/fastapi-mqtt/lib/python3.10/site-packages/gmqtt/client.py", line 228, in connect
    await self._connection.auth(self._client_id, self._username, self._password, will_message=self._will_message,
  File "/Users/user/miniforge3/envs/fastapi-mqtt/lib/python3.10/site-packages/gmqtt/mqtt/connection.py", line 61, in auth
    await self._protocol.send_auth_package(client_id, username, password, self._clean_session,
  File "/Users/user/miniforge3/envs/fastapi-mqtt/lib/python3.10/site-packages/gmqtt/mqtt/protocol.py", line 114, in send_auth_package
    pkg = package.LoginPackageFactor.build_package(client_id, username, password, clean_session,
  File "/Users/user/miniforge3/envs/fastapi-mqtt/lib/python3.10/site-packages/gmqtt/mqtt/package.py", line 70, in build_package
    connect_flags |= 0x04 | ((will_message.qos & 0x03) << 3) | ((will_message.retain & 0x01) << 5)
TypeError: unsupported operand type(s) for &: 'str' and 'int'

Digging in I found this is because of two reasons:

doesn't recognize payload as valid json

This just keeps happening and suddenly its gone, without any changes. This only happens with this framework.
I have to solve it like this:

json.loads(str(payload.decode())[1:])

So i guess, there is some kind of character in the way, but this:

payload.decode()[0]

gives me an empty string.

WebSocket support

Is web sockets support planned for this library?
A sample on how to make it work would be highly appreciated

Error found on the example page of the document

https://sabuhish.github.io/fastapi-mqtt/example/

The mqtt variable is undefined in the file. It should be replaced with fast_mqtt

from fastapi_mqtt.fastmqtt import FastMQTT
from fastapi import FastAPI
from fastapi_mqtt.config import MQTTConfig

app = FastAPI()

mqtt_config = MQTTConfig()

fast_mqtt = FastMQTT(config=mqtt_config)

fast_mqtt.init_app(app)

# Should be fast_mqtt here
@mqtt.on_connect()
def connect(client, flags, rc, properties):
    mqtt.client.subscribe("/mqtt") #subscribing mqtt topic
    print("Connected: ", client, flags, rc, properties)

🚨 Fix GH Action for pypi publish

cc @sabuhish

pypi.org has changed their security police 🚨, and using --username ${{ secrets.PYPI_USERNAME }} --password ${{ secrets.PYPI_PASSWORD }} is not valid anymore -> https://github.com/sabuhish/fastapi-mqtt/actions/runs/7396082004/job/20120516144

HTTP Error 403: Username/Password authentication is no longer supported. Migrate to API Tokens or Trusted Publishers instead.
See https://pypi.org/help/#apitoken and https://pypi.org/help/#trusted-publishers
Access was denied to this resource.

Publish action needs to change, but also the account configuration in pypi.org ⚠️

Raising errors

Raising errors should be applicable in case of an unexpected cases.

There is a typo in the MQQTConfig class

Hi guys.

Love the work you are doing - you wanted to inform you, that you have a typo in the MQQTConfig class. This should be named MQTTConfig right? :-)

Best regards.

🐛 IndexError in topic matching algorithm

The current logic for matching topics fails with an IndexError when the pattern has more parts than the topic.

Failing examples:

  • pattern/subscription: "sport/tennis/player1", against topic "sport/tennis"
  • pattern/subscription: "sport/tennis/+", against topic "sport/tennis"

In addition, there are special cases with "$SYS" topics (from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106) which are not implemented, like:

  • pattern/subscription: "#" shouldn't match "$SYS/anything"
  • pattern/subscription: "+/monitor/Clients" shouldn't match "$SYS/monitor/Clients"

Suggestion: Update requirement to newest fastapi automatically

PS E:\xx\libs> pip install -U fastapi-mqtt --proxy=http://localhost:6666
Collecting fastapi-mqtt
  Downloading fastapi_mqtt-1.0.4-py3-none-any.whl (8.0 kB)
Collecting fastapi<0.76.0,>=0.75.1
  Using cached fastapi-0.75.2-py3-none-any.whl (54 kB)
Collecting uvicorn<0.18.0,>=0.17.6
  Downloading uvicorn-0.17.6-py3-none-any.whl (53 kB)
     ---------------------------------------- 53.6/53.6 kB 173.4 kB/s eta 0:00:00
Requirement already satisfied: gmqtt<0.7.0,>=0.6.11 in c:\python310\lib\site-packages (from fastapi-mqtt) (0.6.11)
Requirement already satisfied: pydantic<2.0.0,>=1.9.0 in c:\python310\lib\site-packages (from fastapi-mqtt) (1.9.0)
Collecting starlette==0.17.1
  Using cached starlette-0.17.1-py3-none-any.whl (58 kB)
Requirement already satisfied: anyio<4,>=3.0.0 in c:\python310\lib\site-packages (from starlette==0.17.1->fastapi<0.76.0,>=0.75.1->fastapi-mqtt) (3.5.0)
Requirement already satisfied: typing-extensions>=3.7.4.3 in c:\python310\lib\site-packages (from pydantic<2.0.0,>=1.9.0->fastapi-mqtt) (3.10.0.0)
Requirement already satisfied: click>=7.0 in c:\python310\lib\site-packages (from uvicorn<0.18.0,>=0.17.6->fastapi-mqtt) (8.0.4)
Requirement already satisfied: asgiref>=3.4.0 in c:\python310\lib\site-packages (from uvicorn<0.18.0,>=0.17.6->fastapi-mqtt) (3.5.0)
Requirement already satisfied: h11>=0.8 in c:\python310\lib\site-packages (from uvicorn<0.18.0,>=0.17.6->fastapi-mqtt) (0.12.0)
Requirement already satisfied: colorama in c:\python310\lib\site-packages (from click>=7.0->uvicorn<0.18.0,>=0.17.6->fastapi-mqtt) (0.4.4)
Requirement already satisfied: sniffio>=1.1 in c:\python310\lib\site-packages (from anyio<4,>=3.0.0->starlette==0.17.1->fastapi<0.76.0,>=0.75.1->fastapi-mqtt) (1.2.0)
Requirement already satisfied: idna>=2.8 in c:\python310\lib\site-packages (from anyio<4,>=3.0.0->starlette==0.17.1->fastapi<0.76.0,>=0.75.1->fastapi-mqtt) (3.2)
Installing collected packages: uvicorn, starlette, fastapi, fastapi-mqtt
  Attempting uninstall: uvicorn
    Found existing installation: uvicorn 0.15.0
    Uninstalling uvicorn-0.15.0:
      Successfully uninstalled uvicorn-0.15.0
  Attempting uninstall: starlette
    Found existing installation: starlette 0.19.1
    Uninstalling starlette-0.19.1:
      Successfully uninstalled starlette-0.19.1
  Attempting uninstall: fastapi
    Found existing installation: fastapi 0.77.1
    Uninstalling fastapi-0.77.1:
      Successfully uninstalled fastapi-0.77.1
Successfully installed fastapi-0.75.2 fastapi-mqtt-1.0.4 starlette-0.17.1 uvicorn-0.17.6

error Application startup failed. Exiting. when running!

Hi there. I implemented codes example but I get this error when uvicorn runs:

  File "/home/ali/Documents/first_fastapi/venv/lib/python3.9/site-packages/starlette/routing.py", line 540, in lifespan
    async for item in self.lifespan_context(app):
  File "/home/ali/Documents/first_fastapi/venv/lib/python3.9/site-packages/starlette/routing.py", line 481, in default_lifespan
    await self.startup()
  File "/home/ali/Documents/first_fastapi/venv/lib/python3.9/site-packages/starlette/routing.py", line 516, in startup
    await handler()
  File "/home/ali/Documents/first_fastapi/venv/lib/python3.9/site-packages/fastapi_mqtt/fastmqtt.py", line 276, in startup
    await self.connection()
  File "/home/ali/Documents/first_fastapi/venv/lib/python3.9/site-packages/fastapi_mqtt/fastmqtt.py", line 128, in connection
    await self.client.connect(self.client._host,self.client._port,self.client._ssl,self.client._keepalive,version)
  File "/home/ali/Documents/first_fastapi/venv/lib/python3.9/site-packages/gmqtt/client.py", line 225, in connect
    self._connection = await self._create_connection(
  File "/home/ali/Documents/first_fastapi/venv/lib/python3.9/site-packages/gmqtt/client.py", line 241, in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
  File "/home/ali/Documents/first_fastapi/venv/lib/python3.9/site-packages/gmqtt/mqtt/connection.py", line 27, in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
  File "uvloop/loop.pyx", line 2024, in create_connection
  File "uvloop/loop.pyx", line 2001, in uvloop.loop.Loop.create_connection
ConnectionRefusedError: [Errno 111] Connection refused

ERROR:    Application startup failed. Exiting.

Version 1.0.8 is broken

Hi!
During startup it showed the following error:

image

As far as I understood, in the last update, MQTTConfig was omitted in the body of the __set_connetion_config method in the arguments of the self.client.set_config method call.

I fix this in the pull request #57

Insert MQTT Messages into Database

As requested by @sabuhish I have added this here as an issue.
Here's the link to my original post.

TL;DR: I was not able to implement the insertion of MQTT messages received by fastapi-mqtt into my database.
I figured a work-around using paho-mqtt in another python script that makes a post request to the API, but of course,
it's not nearly as graceful as it would be if fastapi could do this all by itself.
i.e. Connecting -> Subscribing -> Receiving -> Insert to DB

P.S. : Thanks fastapi-mqtt devs for making this library. Your work is witnessed and appreciated!!!

Using Pydantic models for messages

Are there any plans to make this library more like FastAPI with regard to tightly defined models and being more opinionated about the default way to do things? If so, where would we start?

I'm thinking maybe:

  • Moving on_connect, on_message and so on to an advanced example rather than the main one
  • Adding a PayloadModel to the the subscribe decorator.
  • Add a decorator (@topics?) for outbound traffic, with the model as a hint in the payload field
  • Adding an error decorator, that is called when an exception is raised when constructing
  • A decorator for a Last Will and Testament function
  • A router class so the code can be nicely split up into different files, like in FastAPI
  • Integrate support for encoding/decoding Pydantic models to lightweight transport formats rather than json (bencoding, bson, protobuf, cson, asn.1 or cap'n proto?)

But I'm a noob here. Would there be any love for the models stuff? And where's best to start?

FastAPI's on_event is deprecated, use lifespan event handlers instead

FastAPI's startup and shutdown events are deprecated (https://fastapi.tiangolo.com/advanced/events/#alternative-events-deprecated).

However, they are still used

@app.on_event("startup")

To resolve this, fastapi-mqtt must use the lifespan events. (There will be no side effect on required version of FastAPI - lifespan events are supported sine 0.93.0)

Please let me know if PR support is required

Initialization fails in absence of MQTT broker

Problem Statement

The MQTT connection gracefully handles reconnects if the connection is lost at runtime, but if it is unable to establish the initial connection on startup, it hard fails.

2023-01-08 12:41:40,616 uvicorn.error  INFO  Started server process [31368]
2023-01-08 12:41:40,617 uvicorn.error  INFO  Waiting for application startup.
2023-01-08 12:41:40,617 uvicorn.error  WARNI Used broker version is 5
2023-01-08 12:41:44,708 uvicorn.error  ERROR Traceback (most recent call last):
  File "C:\Repositories\sandbox\fastapi_mqtt\.venv\lib\site-packages\starlette\routing.py", line 671, in lifespan
    async with self.lifespan_context(app):
  File "C:\Repositories\sandbox\fastapi_mqtt\.venv\lib\site-packages\starlette\routing.py", line 566, in __aenter__
    await self._router.startup()
  File "C:\Repositories\sandbox\fastapi_mqtt\.venv\lib\site-packages\starlette\routing.py", line 648, in startup
    await handler()
  File "C:\Repositories\sandbox\fastapi_mqtt\.venv\lib\site-packages\fastapi_mqtt\fastmqtt.py", line 232, in startup
    await self.connection()
  File "C:\Repositories\sandbox\fastapi_mqtt\.venv\lib\site-packages\fastapi_mqtt\fastmqtt.py", line 130, in connection
    await self.client.connect(
  File "C:\Repositories\sandbox\fastapi_mqtt\.venv\lib\site-packages\gmqtt\client.py", line 225, in connect
    self._connection = await self._create_connection(
  File "C:\Repositories\sandbox\fastapi_mqtt\.venv\lib\site-packages\gmqtt\client.py", line 241, in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
  File "C:\Repositories\sandbox\fastapi_mqtt\.venv\lib\site-packages\gmqtt\mqtt\connection.py", line 27, in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
  File "C:\Users\<REDACTED>\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1061, in create_connection
    raise exceptions[0]
  File "C:\Users\<REDACTED>\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1041, in create_connection
    sock = await self._connect_sock(
  File "C:\Users\<REDACTED>\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 955, in _connect_sock
    await self.sock_connect(sock, address)
  File "C:\Users\<REDACTED>\AppData\Local\Programs\Python\Python39\lib\asyncio\proactor_events.py", line 702, in sock_connect
    return await self._proactor.connect(sock, address)
  File "C:\Users\<REDACTED>\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 817, in _poll
    value = callback(transferred, key, ov)
  File "C:\Users\<REDACTED>\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 604, in finish_connect
    ov.getresult()
ConnectionRefusedError: [WinError 1225] The remote computer refused the network connection

Request

Are there any examples you can point to that get around this problem? Or will it require some changes to the fastapi-mqtt library itself to support transient MQTT broker availability?

Background

My use case for fastapi-mqtt is for writing applications that leverage IoT for inter-process communication; we typically have small groups of applications, often all running on the same host, that rely on a custom in-house MQTT broker service. The MQTT broker service could be shut down, such as when deploying updates, and it would be nice if the separate apps (leveraging fastapi-mqtt ) could start up while the broker is down and continue trying to connect until the broker is back up.

TL;DR: I'd love to be able to do say "just keep trying to reconnect to the MQTT broker if you can't establish connection immediately on startup".

Docs

At the initial stage documentation will be provided from README.md later on we should move docs to mkdocs

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.