
mqttasgi's People

Contributors

ivanmviveros, mzitelli97, nim65s, sivulich, sootyowl, tlifschitz

mqttasgi's Issues

MQTT lost subscription when connect_max_retries is set to 0

Hello! Thanks for the project, I find it very useful.
I implemented a consumer that listens to some MQTT topics.
I noticed that if I unplug the network cable, wait more than 60 seconds, and plug it back in, the client reconnects but no more messages are received on the subscribed topics.
I placed the topic subscription in the connect event handler and the topic unsubscription in the disconnect event handler.
What happens is that when the client detects the disconnection it does not trigger the disconnect event handler, and when it connects again it does not subscribe to the topics because they are already present in the topic list.
For this reason no more messages are received after the disconnection.
A workaround is to add some logic to the connect handler that checks whether the topics were previously subscribed and, if so, unsubscribes before subscribing again.
I don't know whether this is a bug or I am simply using the software incorrectly.

I also think that adding a keepalive option to the client options would be useful. What do you think?

Attached the code of the consumer

import logging

from mqttasgi.consumers import MqttConsumer

logger = logging.getLogger(__name__)


class MQTTChannelsConsumer(MqttConsumer):
    groupName = 'frontend'
    topics = [
        {"topic": "topic1", "qos": 2},
        {"topic": "topic2", "qos": 2},
    ]
    # Class-level list, so it is shared by every instance of this consumer.
    already_subscribed_topics = []

    async def connect(self):
        logger.debug('[mqttasgi][consumer][connect] - Connected!')
        for topic in self.topics:
            if topic["topic"] in self.already_subscribed_topics:
                # Workaround: unsubscribe the topic before resubscribing
                await self.unsubscribe(topic["topic"])
            else:
                # Record each topic only once to avoid duplicate entries
                self.already_subscribed_topics.append(topic["topic"])

            await self.subscribe(topic["topic"], topic["qos"])

    async def receive(self, mqtt_message):
        # logging expects a %-style format string followed by its arguments
        logger.debug('Received a message at topic: %s', mqtt_message["topic"])
        logger.debug('With payload %s', mqtt_message["payload"])
        logger.debug('And QOS: %s', mqtt_message["qos"])

    async def disconnect(self):
        logger.debug('[mqttasgi][consumer][disconnect] - Disconnected!')
        for topic in self.topics:
            await self.unsubscribe(topic["topic"])

Thank you and have a good day.
A.
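
The workaround described in this report can be modelled without a broker. The sketch below is only an illustration: `FakeConsumer` is a hypothetical stand-in that records calls and is not part of mqttasgi. It keeps the subscribed topics in a set and, on every connect, unsubscribes any topic it believes is still subscribed before subscribing again, so a reconnect that skipped `disconnect` still ends with a fresh subscription.

```python
import asyncio


class FakeConsumer:
    """Hypothetical stand-in for a consumer; it only records the calls made."""

    topics = [{"topic": "topic1", "qos": 2}, {"topic": "topic2", "qos": 2}]

    def __init__(self):
        self.subscribed = set()   # topics this instance believes are subscribed
        self.calls = []           # ordered log of (action, topic)

    async def subscribe(self, topic, qos):
        self.calls.append(("subscribe", topic))

    async def unsubscribe(self, topic):
        self.calls.append(("unsubscribe", topic))

    async def connect(self):
        for entry in self.topics:
            name = entry["topic"]
            if name in self.subscribed:
                # Stale entry left by a connection that dropped without disconnect()
                await self.unsubscribe(name)
            await self.subscribe(name, entry["qos"])
            self.subscribed.add(name)


async def simulate_reconnect():
    consumer = FakeConsumer()
    await consumer.connect()   # first connection
    await consumer.connect()   # reconnect; disconnect() was never called
    return consumer.calls


calls = asyncio.run(simulate_reconnect())
```

Using a per-instance set rather than a class-level list also avoids sharing state between consumer instances and appending the same topic twice.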

Breaking change with paho-mqtt release 2.0

Thank you for your work on this project. 🙇

Discovered this afternoon after paho-mqtt 2 was installed in a new container build.

The docs cite a new CallbackAPIVersion parameter, which corresponds with what I'm seeing in a stack trace:

   File "/usr/local/lib/python3.11/site-packages/mqttasgi/server.py", line 37, in __init__
    self.client = mqtt.Client(client_id=self.client_id, transport=self.transport, userdata={
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 TypeError: Client.__init__() missing 1 required positional argument: 'callback_api_version'
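
One way to stay compatible with both paho-mqtt 1.x and 2.x is to pass `callback_api_version` only when the installed library defines `CallbackAPIVersion`. The sketch below is an assumption about how a caller could do this, not the fix adopted by mqttasgi. `client_kwargs` is a hypothetical helper, and the stand-in module objects exist only so the example runs without paho-mqtt installed. `VERSION1` keeps the 1.x callback signatures.

```python
from enum import Enum
from types import SimpleNamespace


def client_kwargs(mqtt_module, **kwargs):
    """Build keyword arguments for mqtt_module.Client on paho-mqtt 1.x or 2.x.

    Hypothetical helper: adds callback_api_version only if the module has it.
    """
    api_version = getattr(mqtt_module, "CallbackAPIVersion", None)
    if api_version is not None:
        # paho-mqtt >= 2.0 requires this argument; VERSION1 keeps 1.x-style callbacks
        kwargs["callback_api_version"] = api_version.VERSION1
    return kwargs


# Stand-ins for the two library generations (not the real paho modules)
class FakeCallbackAPIVersion(Enum):
    VERSION1 = 1
    VERSION2 = 2


paho_v1_like = SimpleNamespace()
paho_v2_like = SimpleNamespace(CallbackAPIVersion=FakeCallbackAPIVersion)

old_style = client_kwargs(paho_v1_like, client_id="demo", transport="tcp")
new_style = client_kwargs(paho_v2_like, client_id="demo", transport="tcp")
```

With the real library this would be used as `mqtt.Client(**client_kwargs(mqtt, client_id=..., transport=...))` after `import paho.mqtt.client as mqtt`. Pinning `paho-mqtt<2` in the requirements is the other common stopgap.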

Can't run mqttasgi in a Docker container

Hi, I was wondering whether there is any known issue with running mqttasgi in a container.
I am getting this error:

File "/usr/local/lib/python3.9/site-packages/mqttasgi/server.py", line 331, in create_application
mqtt_consumer | self.application_data[app_id]['instance'] = application(scope)
mqtt_consumer | TypeError: __call__() missing 2 required positional arguments: 'receive' and 'send'

But when I run it on my local system, everything works fine.

TypeError: __call__() missing 2 required positional arguments: 'receive' and 'send'

I am getting this error when running this command:

mqttasgi -H test.mosquitto.org -p 1883 iot_logger.asgi:application

File "/Users/xxxx/Documents/Projects/proj_test/proj/env/lib/python3.9/site-packages/mqttasgi/server.py", line 331, in create_application
self.application_data[app_id]['instance'] = application(scope)
TypeError: __call__() missing 2 required positional arguments: 'receive' and 'send'
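
This message is what Python raises when an ASGI 3 application, whose `__call__` takes `(scope, receive, send)`, is invoked the older ASGI 2 way with `scope` alone, as in the `application(scope)` line of the traceback. A likely cause, stated here as an assumption, is a mismatch between the installed Channels and mqttasgi versions. The stand-in classes below are hypothetical and use only the standard library to reproduce the mismatch.

```python
import asyncio


class Asgi3App:
    """Hypothetical ASGI 3 style application: a single async callable."""

    async def __call__(self, scope, receive, send):
        await send({"type": "demo.ack", "scope_type": scope["type"]})


def call_asgi2_style(app, scope):
    # ASGI 2 servers first call app(scope) and expect a second callable back
    return app(scope)


async def call_asgi3_style(app, scope):
    sent = []

    async def receive():
        return {"type": "demo.noop"}

    async def send(message):
        sent.append(message)

    await app(scope, receive, send)
    return sent


scope = {"type": "mqtt"}
try:
    call_asgi2_style(Asgi3App(), scope)
    error = None
except TypeError as exc:
    error = str(exc)

messages = asyncio.run(call_asgi3_style(Asgi3App(), scope))
```

Comparing the Channels and mqttasgi versions between the working and failing environments is a reasonable first check.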

Problem running app with mqttasgi

Hi,
Quick question. I have a problem running a test project. mqttasgi starts and I am getting messages from the MQTT broker, but my website is unavailable. With daphne it works fine (and the website works on the dev server too), so it looks like mqttasgi is not running my app. What could be wrong?
My asgi.py:

import os
import django
from channels.routing import ProtocolTypeRouter
from testmqtt.consumer import MyMqttConsumer
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'testmqtt.settings')
django.setup()

application = ProtocolTypeRouter({
        'http': get_asgi_application(),
        'mqtt': MyMqttConsumer.as_asgi(),
    })

in settings.py I changed only:

INSTALLED_APPS = [
    'mqttasgi',
    'main',
...
WSGI_APPLICATION = 'testmqtt.wsgi.application'
ASGI_APPLICATION = "testmqtt.asgi.application"

ImportError: cannot import name 'main' from 'mqttasgi.cli'

I followed the "how-to" steps in the README with the setup described below.

On PyCharm 2021.2.2 with the following libraries:

Python==3.7
Django==3.2.11
channels==3.0.4
mqttasgi==0.3.0
paho-mqtt==1.6.1

I have the following traceback error:

PS C:\Users\Dr.Ghonzo\PycharmProjects\testProject> cd app
PS C:\Users\Dr.Ghonzo\PycharmProjects\testProject\app> mqttasgi -H localhost -p 1883 mqtt_app.asgi:application
Traceback (most recent call last):
  File "C:\Users\Dr.Ghonzo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "C:\Users\Dr.Ghonzo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\Users\Dr.Ghonzo\PycharmProjects\testProject\env\Scripts\mqttasgi.exe\__main__.py", line 4, in <module>
ImportError: cannot import name 'main' from 'mqttasgi.cli' (c:\users\dr.ghonzo\pycharmprojects\testproject\env\lib\site-packages\mqttasgi\cli.py)
PS C:\Users\Dr.Ghonzo\PycharmProjects\testProject\app>

Error when importing models in consumers.py - Apps aren't loaded yet

Hi,
Great project.
Not sure if this is a bug or not. Your sample code works perfectly for me; however, when I try to import a model (to save MQTT messages to the DB) I get an error.

consumers.py

from mqttasgi.consumers import MqttConsumer
from .models import MQTTMessage

class MyMqttConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('#', 2)

    async def receive(self, mqtt_message):
        m = MQTTMessage()
        m.topic = mqtt_message['topic']
        m.payload = mqtt_message['payload']
        m.qos = mqtt_message['qos']
        m.save()

    async def disconnect(self):
        await self.unsubscribe('#')

Error

mqttasgi -H localhost -p 1883 iot_platform.asgi:application

Traceback (most recent call last):
  File "/home/patrick/.local/share/virtualenvs/iot_platform-NhBFDZeq/lib/python3.10/site-packages/django/utils/translation/trans_real.py", line 210, in _add_installed_apps_translations
    app_configs = reversed(apps.get_app_configs())
  File "/home/patrick/.local/share/virtualenvs/iot_platform-NhBFDZeq/lib/python3.10/site-packages/django/apps/registry.py", line 147, in get_app_configs
    self.check_apps_ready()
  File "/home/patrick/.local/share/virtualenvs/iot_platform-NhBFDZeq/lib/python3.10/site-packages/django/apps/registry.py", line 138, in check_apps_ready
    raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/patrick/.local/share/virtualenvs/iot_platform-NhBFDZeq/bin/mqttasgi", line 33, in <module>
    sys.exit(load_entry_point('mqttasgi==1.1.1', 'console_scripts', 'mqttasgi')())
  File "/home/patrick/.local/share/virtualenvs/iot_platform-NhBFDZeq/lib/python3.10/site-packages/mqttasgi/cli.py", line 41, in main
    application = get_application(args.application)
  File "/home/patrick/.local/share/virtualenvs/iot_platform-NhBFDZeq/lib/python3.10/site-packages/mqttasgi/utils.py", line 8, in get_application
    application = importlib.import_module(module_path)
  File "/usr/lib/python3.10/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/home/patrick/django/iot_platform/./iot_platform/asgi.py", line 14, in <module>
    from external_api.consumers import MyMqttConsumer
  File "/home/patrick/django/iot_platform/./external_api/consumers.py", line 6, in <module>
    class MQTTMessage(models.Model):
  File "/home/patrick/django/iot_platform/./external_api/consumers.py", line 7, in MQTTMessage
    class Meta:
  File "/home/patrick/django/iot_platform/./external_api/consumers.py", line 8, in Meta
    verbose_name = _("MQTT Message")
  File "/home/patrick/.local/share/virtualenvs/iot_platform-NhBFDZeq/lib/python3.10/site-packages/django/utils/translation/__init__.py", line 95, in gettext
    return _trans.gettext(message)
  File "/home/patrick/.local/share/virtualenvs/iot_platform-NhBFDZeq/lib/python3.10/site-packages/django/utils/translation/trans_real.py", line 374, in gettext
    _default = _default or translation(settings.LANGUAGE_CODE)
  File "/home/patrick/.local/share/virtualenvs/iot_platform-NhBFDZeq/lib/python3.10/site-packages/django/utils/translation/trans_real.py", line 287, in translation
    _translations[language] = DjangoTranslation(language)
  File "/home/patrick/.local/share/virtualenvs/iot_platform-NhBFDZeq/lib/python3.10/site-packages/django/utils/translation/trans_real.py", line 164, in __init__
    self._add_installed_apps_translations()
  File "/home/patrick/.local/share/virtualenvs/iot_platform-NhBFDZeq/lib/python3.10/site-packages/django/utils/translation/trans_real.py", line 212, in _add_installed_apps_translations
    raise AppRegistryNotReady(
django.core.exceptions.AppRegistryNotReady: The translation infrastructure cannot be initialized before the apps registry is ready. Check that you don't make non-lazy gettext calls at import time.
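
The final message points at a translation call that runs while the module is being imported, here `_("MQTT Message")` inside `Meta`. In Django the usual remedy is `gettext_lazy`, which postpones the lookup until the string is rendered. The sketch below models that deferral with the standard library only. `LazyText`, `translate`, `catalog`, and the `state` flag are hypothetical names for illustration and are not Django's implementation.

```python
class RegistryNotReady(Exception):
    """Stand-in for the error raised when translation runs too early."""


state = {"ready": False}  # flips to True once "setup" has finished
catalog = {"MQTT Message": "Mensaje MQTT"}


def translate(message):
    # Eager lookup: fails if called before setup, like a non-lazy gettext call
    if not state["ready"]:
        raise RegistryNotReady("translation used before setup")
    return catalog.get(message, message)


class LazyText:
    """Defers the lookup until the text is actually converted to str."""

    def __init__(self, message):
        self.message = message

    def __str__(self):
        return translate(self.message)


# "Import time": the eager call fails, the lazy wrapper is harmless
try:
    translate("MQTT Message")
    eager_failed = False
except RegistryNotReady:
    eager_failed = True

verbose_name = LazyText("MQTT Message")

# "After setup": the deferred lookup now succeeds
state["ready"] = True
rendered = str(verbose_name)
```

In a Django project the equivalent change would be importing `gettext_lazy as _` from `django.utils.translation` in the module that defines the model.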

Clarify Worker API usage

Hi,

I've been really happy with the basic usage of mqttasgi in my project, however I'm struggling to understand how to use the worker API.

I'm attempting to use spawn_worker so I can handle different subscriptions/topics in different consumers.

# try to spawn a worker
await self.spawn_worker(
    app_id=123,
    consumer_path="server.consumers:MQTTConsumer",
    consumer_params={"mqttTopics": ['dt/#']},
)

However, this call always produces an exception:

2022-08-24 12:42:30,687 ERROR    [mqttasgi][app][send] - Exception of 0
2022-08-24 12:42:30,687 ERROR    MQTTConsumer.__init__() got an unexpected keyword argument 'receive'
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/mqttasgi/server.py", line 309, in _application_send
    await action_map[msg['type']](app_id, msg)
  File "/usr/local/lib/python3.10/site-packages/mqttasgi/server.py", line 276, in mqttasgi_worker_spawn
    self.create_application(new_app_id, consumer_path=new_consumer_path,
  File "/usr/local/lib/python3.10/site-packages/mqttasgi/server.py", line 349, in create_application
    self.application_data[app_id]['instance'] = application(scope,
TypeError: MQTTConsumer.__init__() got an unexpected keyword argument 'receive'

I've attempted to dig into the code but I've not been able to understand where I'm going wrong here. This error also happens if I attempt to use the provided 'TestMqttConsumer' class and send it a message on the spawn topic:

[2022-08-24 12:52:47] DEBUG [mqttasgi.consumers:97] [mqttasgi][consumer][connect] - Connected!
[2022-08-24 12:52:47] DEBUG [mqttasgi.cli:200] [mqttasgi][app][subscribe] - Subscription to spawn:1
[2022-08-24 12:52:47] DEBUG [mqttasgi.cli:200] [mqttasgi][app][subscribe] - Subscription to kill:1
[2022-08-24 12:53:05] DEBUG [mqttasgi.cli:142] [mqttasgi][mqtt][receive] - Added message to queue app_ids:{0} topic:spawn
[2022-08-24 12:53:05] DEBUG [mqttasgi.consumers:110] [mqttasgi][consumer][receive] - Received spawn:b'123'
[2022-08-24 12:53:05] ERROR [mqttasgi.cli:311] [mqttasgi][app][send] - Exception of 0
[2022-08-24 12:53:05] ERROR [mqttasgi.cli:313] MqttConsumer.__init__() got an unexpected keyword argument 'receive'
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/mqttasgi/server.py", line 309, in _application_send
    await action_map[msg['type']](app_id, msg)
  File "/usr/local/lib/python3.10/site-packages/mqttasgi/server.py", line 276, in mqttasgi_worker_spawn
    self.create_application(new_app_id, consumer_path=new_consumer_path,
  File "/usr/local/lib/python3.10/site-packages/mqttasgi/server.py", line 349, in create_application
    self.application_data[app_id]['instance'] = application(scope,
TypeError: MqttConsumer.__init__() got an unexpected keyword argument 'receive'

I understand that the worker API is experimental at the moment, but can you point me in the right direction/clarify the usage of this feature?

Thanks

Issues specifying daphne arguments

Hi.

I run a project with daphne using the following command: daphne -b 0.0.0.0 config.asgi:application.

Is it possible to specify something like that in the mqttasgi command?

mqttasgi -H localhost -p 1883 my_application.asgi:application

Contact

Hello Santiago: I have been trying to contact you but have not been able to. Could you please contact me?

Issue when trying to connect to mqtt with user and password.

Hello @sivulich, I was able to connect with mqttasgi without any problem when I had no username and password, but now that I use them I am getting an error.
2022-07-01T13:22:01.572991119Z app[web.1]: 2022-07-01 13:22:01,564 DEBUG Found GEOS DLL: <CDLL '/app/.heroku/python/lib/python3.10/site-packages/Shapely.libs/libgeos_c-fdac42b1.so.1.16.0', handle 55dc13f79fb0 at 0x7f9c13663a00>, using it.
2022-07-01T13:22:01.634577656Z app[web.1]: 2022-07-01 13:22:01,634 INFO Numpy was not imported, continuing without requires()
2022-07-01T13:22:01.985655426Z app[web.1]: 2022-07-01 08:22:01,985 DEBUG Using selector: EpollSelector
2022-07-01T13:22:03.206859929Z app[web.1]: 2022-07-01 08:22:03,192 DEBUG Using selector: EpollSelector
2022-07-01T13:22:03.206903494Z app[web.1]: 2022-07-01 08:22:03,203 INFO MQTTASGI initialized. The complete MQTT ASGI protocol server.
2022-07-01T13:22:03.206908379Z app[web.1]: 2022-07-01 08:22:03,204 INFO MQTTASGI Event loops running forever, press Ctrl+C to interrupt.
2022-07-01T13:22:03.206912427Z app[web.1]: 2022-07-01 08:22:03,204 INFO pid 15: send SIGINT or SIGTERM to exit.
2022-07-01T13:22:03.431780079Z app[web.1]: 2022-07-01 08:22:03,429 ERROR Task exception was never retrieved
2022-07-01T13:22:03.431833366Z app[web.1]: future: <Task finished name='Task-2' coro=<Server.mqtt_receive_loop() done, defined at /app/.heroku/python/lib/python3.10/site-packages/mqttasgi/server.py:141> exception=gaierror(-2, 'Name or service not known')>
2022-07-01T13:22:03.431839251Z app[web.1]: Traceback (most recent call last):
2022-07-01T13:22:03.431842375Z app[web.1]: File "/app/.heroku/python/lib/python3.10/site-packages/mqttasgi/server.py", line 146, in mqtt_receive_loop
2022-07-01T13:22:03.431845961Z app[web.1]: self.client.connect(self.host, self.port)
2022-07-01T13:22:03.431848959Z app[web.1]: File "/app/.heroku/python/lib/python3.10/site-packages/paho/mqtt/client.py", line 914, in connect
2022-07-01T13:22:03.431852245Z app[web.1]: return self.reconnect()
2022-07-01T13:22:03.431855144Z app[web.1]: File "/app/.heroku/python/lib/python3.10/site-packages/paho/mqtt/client.py", line 1044, in reconnect
2022-07-01T13:22:03.431858679Z app[web.1]: sock = self._create_socket_connection()
2022-07-01T13:22:03.431861831Z app[web.1]: File "/app/.heroku/python/lib/python3.10/site-packages/paho/mqtt/client.py", line 3685, in _create_socket_connection
2022-07-01T13:22:03.431865209Z app[web.1]: return socket.create_connection(addr, timeout=self._connect_timeout, source_address=source)
2022-07-01T13:22:03.431868442Z app[web.1]: File "/app/.heroku/python/lib/python3.10/socket.py", line 824, in create_connection
2022-07-01T13:22:03.431871699Z app[web.1]: for res in getaddrinfo(host, port, 0, SOCK_STREAM):
2022-07-01T13:22:03.431874631Z app[web.1]: File "/app/.heroku/python/lib/python3.10/socket.py", line 955, in getaddrinfo
2022-07-01T13:22:03.431887711Z app[web.1]: for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
2022-07-01T13:22:03.431891019Z app[web.1]: socket.gaierror: [Errno -2] Name or service not known
2022-07-01T13:22:03.432808197Z app[web.1]: 2022-07-01 08:22:03,432 DEBUG Creating tcp connection to ('dokku-redis-redis-iotstracontech', 6379)
2022-07-01T13:22:03.439470824Z app[web.1]: 2022-07-01 08:22:03,438 DEBUG Closed 0 connection(s)
Could you please help me? I know that my host is working because I can connect to it from a tool called MQTT Explorer.

Here is how I am initializing mqttasgi:
web: mqttasgi -H mosquito-stracontech.clicker.pe -p 1883 --username=jonathan.prieto --password=F.....2022 --verbosity=2 iot_stracontech.asgi:application
And here you can see my MQTT Explorer Config:
[Screenshot of the MQTT Explorer configuration]
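
`socket.gaierror: [Errno -2] Name or service not known` is raised by the name lookup, before any username or password is sent, so the host name apparently cannot be resolved from inside the app container even though it resolves on the machine running MQTT Explorer. A small check like the following, run in the same environment as mqttasgi, can confirm that. `resolve_host` is a hypothetical helper; the `resolver` parameter exists only so the failure path can be exercised without a network.

```python
import socket


def resolve_host(host, port, resolver=socket.getaddrinfo):
    """Return (True, sorted addresses) or (False, error text) for host:port."""
    try:
        results = resolver(host, port, 0, socket.SOCK_STREAM)
    except socket.gaierror as exc:
        return False, str(exc)
    # Each result is (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP
    return True, sorted({info[4][0] for info in results})


def failing_resolver(host, port, family, socktype):
    # Simulates the lookup failure seen in the log above
    raise socket.gaierror(-2, "Name or service not known")


ok, addresses = resolve_host("127.0.0.1", 1883)
failed_ok, reason = resolve_host(
    "broker.example.invalid", 1883, resolver=failing_resolver
)
```

Calling `resolve_host` with the value passed to `-H` from inside the container shows whether the lookup itself is the problem.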

CONNECTION_ERROR

Hello, I have an error.

django==3.0
channels==3.0.5

from mqttasgi.consumers import MqttConsumer


class StepMqttConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('my/testing/topic', 2)

    async def receive(self, mqtt_message):
        print('Received a message at topic:', mqtt_message['topic'])
        print('With payload', mqtt_message['payload'])
        print('And QOS:', mqtt_message['qos'])

    async def disconnect(self):
        await self.unsubscribe('my/testing/topic')

application = ProtocolTypeRouter({
    'http': django_asgi_app,
    'websocket': AllowedHostsOriginValidator(
        # AuthMiddlewareStack(
        JwtAuthMiddlewareStack(
            URLRouter([
                path("ws/chat/<room_name>/", ChatConsumer.as_asgi()),
                path("ws/tasks/", TasksConsumer.as_asgi()),
            ])
        )
    ),
    'mqtt': StepMqttConsumer.as_asgi(),
})

medmike@wstep:~/step/step_api$ /home/medmike/step/venv/bin/mqttasgi -H localhost -p 1883 step_api.asgi:application
2023-12-22 17:02:51,716 WARNING MQTTASGI Received signal CONNECTION_ERROR, terminating
2023-12-22 17:02:51,716 WARNING MQTTASGI Waiting for all applications to close

Can you help me?
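
The log does not say why the connection failed. One thing that can be checked with the standard library alone is whether anything accepts TCP connections on the host and port passed to `-H` and `-p`. `port_is_open` is a hypothetical helper, and the throwaway listener only makes the example self-contained.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Self-contained demonstration: listen on an ephemeral local port, then probe it
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
open_port = listener.getsockname()[1]

reachable = port_is_open("127.0.0.1", open_port)
listener.close()
reachable_after_close = port_is_open("127.0.0.1", open_port)
```

For the command above the check would be `port_is_open("localhost", 1883)`; a `False` result suggests that no broker is listening there.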

MqttConsumers not given a chance to shutdown on SIGINT and SIGTERM

Currently, running MqttConsumer instances do not fully complete shutdown before the server is killed. This results in disconnect()s not being called and therefore unsubscribes not working.

I've come up with a fix for this issue that I'll be posting as a PR below this issue for discussion.
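
The intended behaviour can be sketched with plain asyncio. This is not the fix from the pull request mentioned above; `consumer`, `run_server`, and `shutdown_timeout` are hypothetical names. On a stop request the server signals every consumer and waits, with a timeout, for each one to finish its disconnect step before returning.

```python
import asyncio


async def consumer(name, stop_event, log):
    log.append(f"{name}: connect")
    await stop_event.wait()          # run until the server asks us to stop
    await asyncio.sleep(0)           # stands in for awaiting unsubscribe calls
    log.append(f"{name}: disconnect")


async def run_server(names, shutdown_timeout=5.0):
    log = []
    stop_event = asyncio.Event()
    tasks = [asyncio.create_task(consumer(n, stop_event, log)) for n in names]

    await asyncio.sleep(0)           # let every consumer reach its wait
    stop_event.set()                 # what a SIGINT/SIGTERM handler would trigger

    # Give consumers a bounded amount of time to finish their disconnect step
    done, pending = await asyncio.wait(tasks, timeout=shutdown_timeout)
    for task in pending:
        task.cancel()                # only stragglers are cancelled
    return log


log = asyncio.run(run_server(["app-0", "app-1"]))
```

The timeout keeps a misbehaving consumer from blocking shutdown indefinitely while still letting well-behaved ones unsubscribe.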

MQTTAsgi queues deleted on unit / script reset

mqttasgi uses asyncio queues for communication with applications (line 333 of server.py). These could be stored outside the running instance, for example in Redis, so that the queues persist across a reset.
This would prevent the loss of potentially irreplaceable data if the server shuts down for any reason.
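
The idea can be illustrated with a queue that writes every message to disk before it is handed to an application. The issue suggests Redis; the sketch below deliberately swaps in SQLite from the standard library so it runs without extra services. `DurableQueue` and its schema are hypothetical and are not part of mqttasgi.

```python
import os
import sqlite3
import tempfile


class DurableQueue:
    """FIFO queue whose pending messages survive a process restart."""

    def __init__(self, path):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS messages "
            "(id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT, payload BLOB)"
        )
        self.db.commit()

    def put(self, topic, payload):
        self.db.execute(
            "INSERT INTO messages (topic, payload) VALUES (?, ?)", (topic, payload)
        )
        self.db.commit()             # persisted before put() returns

    def get(self):
        row = self.db.execute(
            "SELECT id, topic, payload FROM messages ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None              # queue is empty
        self.db.execute("DELETE FROM messages WHERE id = ?", (row[0],))
        self.db.commit()
        return row[1], row[2]

    def close(self):
        self.db.close()


path = os.path.join(tempfile.mkdtemp(), "queue.db")

first_run = DurableQueue(path)
first_run.put("dt/sensor", b"21.5")
first_run.put("dt/sensor", b"21.7")
first_run.close()                    # simulate the server going down

second_run = DurableQueue(path)      # simulate the restart
recovered = [second_run.get(), second_run.get(), second_run.get()]
second_run.close()
```

Messages queued before the simulated shutdown are still available after the restart, which is the property the issue asks for.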

Support for Windows

When running on Windows:

for signame in ('SIGINT', 'SIGTERM'):
    loop.add_signal_handler(
        getattr(signal, signame),
        functools.partial(self.stop_server, signame)
    )

generates a NotImplementedError exception. This is because Windows handles signals differently than Unix systems.
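
A common way around this is to fall back to `signal.signal` when the event loop does not implement `add_signal_handler`, which is the case for asyncio loops on Windows. The sketch below is one possible approach, not the change made in mqttasgi. `install_stop_handlers` and the `register_fallback` parameter are hypothetical, the latter added only so the fallback path can be exercised on any platform.

```python
import functools
import signal


def install_stop_handlers(loop, stop_server, register_fallback=signal.signal):
    """Register stop_server for SIGINT/SIGTERM; return how each was installed."""
    installed = {}
    for signame in ("SIGINT", "SIGTERM"):
        signum = getattr(signal, signame)
        callback = functools.partial(stop_server, signame)
        try:
            loop.add_signal_handler(signum, callback)
            installed[signame] = "loop"
        except NotImplementedError:
            # Plain signal handlers run outside the loop, so hand the call
            # back to the loop in a thread-safe way.
            def handler(_signum, _frame, cb=callback):
                loop.call_soon_threadsafe(cb)

            register_fallback(signum, handler)
            installed[signame] = "signal.signal"
    return installed


class WindowsLikeLoop:
    """Stand-in for a loop without add_signal_handler support."""

    def __init__(self):
        self.scheduled = []

    def add_signal_handler(self, signum, callback):
        raise NotImplementedError

    def call_soon_threadsafe(self, callback):
        self.scheduled.append(callback)


stopped = []
registered = {}
fake_loop = WindowsLikeLoop()
result = install_stop_handlers(
    fake_loop, stopped.append, register_fallback=registered.__setitem__
)

# Simulate delivery of SIGINT: the handler schedules the callback on the loop
registered[signal.SIGINT](signal.SIGINT, None)
for scheduled_callback in fake_loop.scheduled:
    scheduled_callback()
```

On Unix the first branch is taken and behaviour is unchanged; only loops that raise `NotImplementedError` use the fallback.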
