Cognite extractor-utils MQTT extension

The MQTT extension for Cognite extractor-utils provides a way to easily write your own extractors for systems exposing an MQTT interface.

The library is currently under development, and should not be used in production environments yet.

Overview

The MQTT extension for extractor-utils subscribes to MQTT topics, automatically deserializes each payload into user-defined DTO classes, and handles uploading the resulting data to CDF.

The only parts of the extractor a user needs to implement are:

  • Describing the payload schema using Python dataclasses
  • Implementing a mapping from the source data model to the CDF data model

As an example, consider this payload:

{
    "elements": [
        {
            "pumpId": "bridge-pump-1453",
            "startTime": "2022-02-27T12:13:00",
            "duration": 16
        },
        {
            "pumpId": "bridge-pump-254",
            "startTime": "2022-02-26T16:12:23",
            "duration": 124
        }
    ]
}

We want to make an extractor that can turn these MQTT messages into CDF events. First, we need to create some data classes describing the expected schema of the payloads:

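from dataclasses import dataclass
from typing import List

# One element of the "elements" array in the payload
@dataclass
class PumpEvent:
    pumpId: str
    startTime: str
    duration: int

# The top-level payload object
@dataclass
class PumpEventList:
    elements: List[PumpEvent]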
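
To make the deserialization step concrete, here is a minimal sketch of how a JSON payload could be turned into these data classes using only the standard library. It is not the extension's actual implementation; `from_dict` and `_convert` are hypothetical helpers written for illustration, and the sketch only handles nested dataclasses and lists.

```python
import json
from dataclasses import dataclass, fields, is_dataclass
from typing import Any, List, Type, TypeVar, get_args, get_origin, get_type_hints

T = TypeVar("T")


@dataclass
class PumpEvent:
    pumpId: str
    startTime: str
    duration: int


@dataclass
class PumpEventList:
    elements: List[PumpEvent]


def _convert(tp: Any, value: Any) -> Any:
    """Hypothetical helper: convert one parsed JSON value to the annotated type."""
    if get_origin(tp) is list:
        # List[X]: convert every item to X
        (item_type,) = get_args(tp)
        return [_convert(item_type, item) for item in value]
    if isinstance(tp, type) and is_dataclass(tp):
        # Nested dataclass: recurse
        return from_dict(tp, value)
    # Plain types (str, int, ...) are passed through unchanged
    return value


def from_dict(cls: Type[T], data: Any) -> T:
    """Hypothetical helper: build the dataclass `cls` from a parsed JSON object."""
    hints = get_type_hints(cls)
    kwargs = {f.name: _convert(hints[f.name], data[f.name]) for f in fields(cls)}
    return cls(**kwargs)


# A raw MQTT payload arrives as bytes; json.loads accepts bytes directly
raw = (
    b'{"elements": [{"pumpId": "bridge-pump-1453", '
    b'"startTime": "2022-02-27T12:13:00", "duration": 16}]}'
)
events = from_dict(PumpEventList, json.loads(raw))
print(events.elements[0].pumpId)
```

With the extension, this conversion happens before the handler is called, so the handler only ever sees typed objects such as `PumpEventList`.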

Then, we can create an MqttExtractor instance, subscribe to the appropriate topic, and convert the payload into CDF events:

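import uuid
from typing import Iterable

import arrow
from cognite.client.data_classes import Event
# Import path assumed from the package layout; check example.py if it differs
from cognite.extractorutils.mqtt.extractor import MqttExtractor

extractor = MqttExtractor(
    name="PumpMqttExtractor",
    description="Extracting pumping events from an MQTT source",
    version="1.0.0",
)

# Register a handler: each message on "mytopic" is parsed into a PumpEventList
@extractor.topic(topic="mytopic", qos=1, response_type=PumpEventList)
def subscribe_pump_events(events: PumpEventList) -> Iterable[Event]:
    external_id_prefix = MqttExtractor.get_current_config_file()

    for pump_event in events.elements:
        start_time = arrow.get(pump_event.startTime)
        end_time = start_time.shift(seconds=pump_event.duration)

        # CDF timestamps are milliseconds since the Unix epoch
        yield Event(
            external_id=f"{external_id_prefix}{pump_event.pumpId}-{uuid.uuid4()}",
            start_time=start_time.int_timestamp * 1000,
            end_time=end_time.int_timestamp * 1000,
        )

with extractor:
    extractor.run()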
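
The timestamp arithmetic in the handler can be checked in isolation. The sketch below reproduces it with the standard library instead of `arrow`; `event_window_ms` is a hypothetical helper, and it assumes that timestamps without a UTC offset are in UTC, which matches how `arrow.get` treats them.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def event_window_ms(start_time: str, duration_seconds: int) -> Tuple[int, int]:
    """Hypothetical helper: return (start, end) as milliseconds since the epoch."""
    start = datetime.fromisoformat(start_time)
    if start.tzinfo is None:
        # Assumption: naive timestamps are interpreted as UTC
        start = start.replace(tzinfo=timezone.utc)
    end = start + timedelta(seconds=duration_seconds)
    # Truncate to whole seconds, then scale to milliseconds
    return int(start.timestamp()) * 1000, int(end.timestamp()) * 1000


# First element of the example payload: a 16-second pumping event
start_ms, end_ms = event_window_ms("2022-02-27T12:13:00", 16)
print(start_ms, end_ms)
```

For the first element of the example payload, the end time is 16 000 ms after the start time.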

A demo example is provided in the example.py file.

Contributing

See the contribution guide for extractor-utils for details on contributing.
