gcn-kafka-python's Introduction


GCN Kafka Client for Python

This is the official Python client for the General Coordinates Network (GCN). It is a very lightweight wrapper around confluent-kafka-python.

To Install

Run this command to install with pip:

pip install gcn-kafka

or this command to install with conda:

conda install -c conda-forge gcn-kafka

To Use

Create a consumer.

from gcn_kafka import Consumer
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in')

List all topics:

print(consumer.list_topics().topics)

Subscribe to topics and receive alerts:

consumer.subscribe(['gcn.classic.text.FERMI_GBM_FIN_POS',
                    'gcn.classic.text.LVC_INITIAL'])
while True:
    for message in consumer.consume(timeout=1):
        if message.error():
            print(message.error())
            continue
        print(message.value())

The timeout argument to consume(), given in seconds, allows the program to exit quickly once it has reached the end of the existing message buffer. This is useful if you just want to recover an older message from the stream. timeout also makes the while True loop interruptible via the standard Ctrl-C key sequence, which consume() otherwise ignores while it blocks.
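For example, here is a minimal sketch (not part of the official examples) that reuses the consumer above to drain whatever is already buffered and then exit, instead of waiting forever for new alerts:

# Read in batches until the buffer is exhausted, then stop.
# The batch size of 100 is an arbitrary choice for this sketch.
while True:
    messages = consumer.consume(100, timeout=1)
    if not messages:
        break
    for message in messages:
        if message.error():
            print(message.error())
            continue
        print(message.value())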

Testing and Development Kafka Clusters

GCN has three Kafka clusters: production, testing, and an internal development deployment. Use the optional domain keyword argument to select which broker to connect to.

# Production (default)
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

# Testing
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in',
                    domain='test.gcn.nasa.gov')

# Development (internal)
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in',
                    domain='dev.gcn.nasa.gov')

FAQ

How can I keep track of the last read message when restarting a client?

A key feature of Kafka consumer clients is the ability to perform persistent tracking of which messages have been read. This allows clients to recover missed messages after a restart by beginning at the earliest unread message rather than the next available message from the stream. To enable this feature, set a client group ID in the configuration dictionary argument of the Consumer class, and change the auto offset reset option to 'earliest'. Once this is done, every new client with the given group ID will begin reading the specified topic at the earliest unread message. When doing this, it is recommended to turn OFF the auto commit feature, because it can lose track of the last read message if the client crashes before the auto commit interval (5 seconds by default) elapses. Manually committing messages (i.e. storing the state of the last read message) as they are read is the most robust method for tracking the last read message.

Example code:

from gcn_kafka import Consumer

config = {'group.id': 'my group name',
          'auto.offset.reset': 'earliest',
          'enable.auto.commit': False}

consumer = Consumer(config=config,
                    client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

topics = ['gcn.classic.voevent.FERMI_GBM_SUBTHRESH']
consumer.subscribe(topics)

while True:
    for message in consumer.consume(timeout=1):
        print(message.value())
        consumer.commit(message)

How can I read messages beginning at the earliest available messages for a given stream?

You can begin reading a given topic stream from the earliest message that is present in the stream buffer by setting the Group ID to an empty string and applying the ‘earliest’ setting for the auto offset reset option in the configuration dictionary argument for the Consumer class. This feature allows the user to scan for older messages for testing purposes or to recover messages that may have been missed due to a crash or network outage. Just keep in mind that the stream buffers are finite in size. They currently hold messages from the past few days.

Example code:

from gcn_kafka import Consumer

config = {'auto.offset.reset': 'earliest'}

consumer = Consumer(config=config,
                    client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

topics = ['gcn.classic.voevent.INTEGRAL_SPIACS']
consumer.subscribe(topics)

while True:
    for message in consumer.consume(timeout=1):
        print(message.value())

How can I search for messages occurring within a given date range?

To search for messages in a given date range, you can use the offsets_for_times() function from the Consumer class to get the message offsets for the desired date range. You can then assign the starting offset to the Consumer and read the desired number of messages. When doing so, keep in mind that the stream buffers are finite in size. It is not possible to recover messages prior to the start of the stream buffer. The GCN stream buffers are currently set to hold messages from the past few days.

Example code:

import datetime
from gcn_kafka import Consumer
from confluent_kafka import TopicPartition

consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

# get messages occurring 3 days ago
timestamp1 = int((datetime.datetime.now() - datetime.timedelta(days=3)).timestamp() * 1000)
timestamp2 = timestamp1 + 86400000 # +1 day

topic = 'gcn.classic.voevent.INTEGRAL_SPIACS'
start = consumer.offsets_for_times(
    [TopicPartition(topic, 0, timestamp1)])
end = consumer.offsets_for_times(
    [TopicPartition(topic, 0, timestamp2)])

consumer.assign(start)
for message in consumer.consume(end[0].offset - start[0].offset, timeout=1):
    print(message.value())

Known Issues

confluent-kafka-python

If you use confluent-kafka-python v2.1.0 or v2.1.1 with librdkafka v2.1.1, you will encounter a segmentation fault when subscribed to unavailable topics.

Please refer to the confluent-kafka-python GitHub issue for updates.
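In the meantime, one possible way to avoid the trigger condition (a sketch, not a verified fix) is to subscribe only to topics that the broker actually reports as available, using list_topics() as shown earlier:

# Untested workaround sketch: filter the subscription list against the
# broker metadata so that no unavailable topic is ever subscribed to.
available = set(consumer.list_topics().topics)
wanted = ['gcn.classic.text.FERMI_GBM_FIN_POS',
          'gcn.classic.text.LVC_INITIAL']
consumer.subscribe([topic for topic in wanted if topic in available])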

gcn-kafka-python's People

Contributors

courey, dakota002, joshuarwood, lpsinger, pre-commit-ci[bot], titodalcanton


gcn-kafka-python's Issues

consumer segfaulting when connecting to test.gcn.nasa.gov

Hi

Was trying to listen in on some testing messages I'm trying to generate and send from icecube.* but am having trouble with simply listening to the test instance.

This code:

from gcn_kafka import Consumer

consumer = Consumer(client_id='XXX',
                    client_secret='XXX',
                    domain='test.gcn.nasa.gov')

# Subscribe to topics and receive alerts
consumer.subscribe(['gcn.classic.text.ICECUBE_ASTROTRACK_BRONZE',
                    'gcn.classic.text.ICECUBE_ASTROTRACK_GOLD',
                    'gcn.classic.text.ICECUBE_CASCADE'])
while True:
    for message in consumer.consume(timeout=1):
        value = message.value()
        print(value)

(when using credentials from test.gcn.nasa.gov)
segfaults (backtrace attached)

Similar code, without the domain setting and with client_* credentials from gcn.nasa.gov, has worked fine.

This is gcn-kafka 0.3.0, using Homebrew Python (macOS 13.3.1, Python 3.11.3).

Anything I'm doing wrong?

gcn_kafka_crash.txt

Document the `timeout` kwarg of `consumer.consume()`

The example code I found (both in this repository and on the new GCN website) suggests calling consumer.consume() to receive messages. I found a minor annoyance with this: the call blocks the script until the next message is received, and during this interval, SIGINT signals are ignored. I guess this behavior is inherited from the confluent_kafka client. This means the client script cannot easily be interrupted with the standard Ctrl-C, which is an annoyance when a new user is playing around and debugging.

Fortunately, consume() has a convenient timeout kwarg (see the confluent_kafka documentation). Setting this to 1, for example, restores the ability to interrupt the client script with Ctrl-C, because the call returns after 1 s at most.

The timeout kwarg is mentioned in the README here, but I think the various GCN examples should include a timeout=1 by default, to make it immediately obvious to everyone that this is possible.
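For reference, here is a minimal sketch of the pattern the issue asks for; the explicit KeyboardInterrupt handler and close() call are additions for a clean shutdown (close() is a standard confluent-kafka method, not something shown in the README):

from gcn_kafka import Consumer

consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in')
consumer.subscribe(['gcn.classic.text.LVC_INITIAL'])
try:
    while True:
        # timeout=1 returns control every second, so Ctrl-C is honored
        for message in consumer.consume(timeout=1):
            print(message.value())
except KeyboardInterrupt:
    # commit offsets (if enabled) and leave the consumer group cleanly
    consumer.close()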

Trouble installing with 'pip3 install gcn-kafka' on macOS 11.6.6

Trying to pip install gcn-kafka on macOS 11.6.6 fails with:

confluent_kafka/src/confluent_kafka.h:23:10: fatal error: 'librdkafka/rdkafka.h' file not found
#include <librdkafka/rdkafka.h>

because macOS 11.6.6 comes with Python 3.10.2 and the latest release of confluent-kafka-python does not have wheels for that version. These are the steps that I used to get it working:

  1. install openssl with headers that we can compile against
    brew install openssl
  2. add headers/lib to the environment
    export LDFLAGS="-L/usr/local/opt/openssl@3/lib"
    export CPPFLAGS="-I/usr/local/opt/openssl@3/include"
  3. compile librdkafka against the OpenSSL installed with Homebrew. I used the --prefix option of ./configure to install outside of /usr, since I generally avoid messing with /usr.
  4. add to environment
    export CPPFLAGS="$CPPFLAGS -I/path/librdkafka/install/include -L/path/librdkafka/install/lib"
    export DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH:"/path/librdkafka/install/lib"
  5. pip3 install gcn-kafka

This method probably isn't recommended for the average user since it involves messing around with a lot of environment variables. I'll try a simpler method tomorrow using a Homebrew install of librdkafka.

Increase test coverage

#8 highlights the need for better test coverage, especially in the highest-level Consumer and Producer constructors.

Consumer.consume() auto-commits even when disabled by 'enable.auto.commit'

consume() currently always auto-commits, regardless of the setting for 'enable.auto.commit'. I believe this is an upstream issue in confluent-kafka, see confluentinc/confluent-kafka-python#1299

In the meantime, it might be nice to adjust the example in the README to use poll() instead (which works as expected), or add a note that the functionality is currently broken.

I'll be happy to make a PR for an updated example if that's desired. Might save someone else some time trying to track down this issue.
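For reference, here is a sketch of the poll()-based workaround described above, using the same configuration as the README's manual-commit example. poll() returns a single message (or None if the timeout expires) and, per this report, respects 'enable.auto.commit':

from gcn_kafka import Consumer

config = {'group.id': 'my group name',
          'auto.offset.reset': 'earliest',
          'enable.auto.commit': False}

consumer = Consumer(config=config,
                    client_id='fill me in',
                    client_secret='fill me in')
consumer.subscribe(['gcn.classic.voevent.FERMI_GBM_SUBTHRESH'])

while True:
    # poll() returns one message, or None if the timeout expired
    message = consumer.poll(timeout=1)
    if message is None:
        continue
    if message.error():
        print(message.error())
        continue
    print(message.value())
    consumer.commit(message)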

Subscribed topic not available: xxx Broker: Unknown topic or partition

I (think) I've sent some new icecube test alerts, via code like:

    import json
    from gcn_kafka import Producer

    # (wrapped in a function here so the original `return ret` is valid;
    # `logger` is defined elsewhere in my code)
    def send_alert(alert):
        if alert is None:
            logger.fatal('Found no alert to send')
        producer = Producer(client_id='xx',
                            client_secret='xx',
                            domain='test.gcn.nasa.gov')
        topic = 'icecube.test.gold_bronze_track_alerts'
        sendme = json.dumps(alert)
        ret = producer.produce(topic, sendme.encode())
        producer.flush()
        return ret

Where the client_* is a valid ID from my test.gcn.nasa.gov account, and alert is a dictionary of alert values.
Sending test alerts returns None (I think as expected) and does not throw any errors.

But when I set up a test listener configured with:

consumer.subscribe(['gcn.classic.text.ICECUBE_ASTROTRACK_BRONZE',
                    'icecube.test.gold_bronze_track_alerts',
                    'gcn.classic.text.FERMI_LAT_POS_TEST',
                    'gcn.classic.text.ICECUBE_ASTROTRACK_GOLD',
                    'gcn.classic.text.ICECUBE_CASCADE',
                    'gcn.classic.text.LVC_INITIAL',
                    'gcn.classic.text.LVC_PRELIMINARY',
                    'gcn.classic.text.LVC_RETRACTION'])
and the listener scope gcn.nasa.gov/kafka-icecube-consumer, I see:

b'Subscribed topic not available: gcn.classic.text.FERMI_LAT_POS_TEST: Broker: Topic authorization failed'
b'Subscribed topic not available: gcn.classic.text.ICECUBE_ASTROTRACK_BRONZE: Broker: Topic authorization failed'
b'Subscribed topic not available: gcn.classic.text.ICECUBE_ASTROTRACK_GOLD: Broker: Topic authorization failed'
b'Subscribed topic not available: gcn.classic.text.ICECUBE_CASCADE: Broker: Topic authorization failed'
b'Subscribed topic not available: gcn.classic.text.LVC_INITIAL: Broker: Topic authorization failed'
b'Subscribed topic not available: gcn.classic.text.LVC_PRELIMINARY: Broker: Topic authorization failed'
b'Subscribed topic not available: gcn.classic.text.LVC_RETRACTION: Broker: Topic authorization failed'
b'Subscribed topic not available: icecube.test.gold_bronze_track_alerts: Broker: Unknown topic or partition'

If I use a consumer with scope gcn.nasa.gov/kafka-public-consumer and the same topic list, I just get:

b'Subscribed topic not available: icecube.test.gold_bronze_track_alerts: Broker: Unknown topic or partition'

But somehow, I'm not able to see the alerts I've sent on the icecube.test.gold_bronze_track_alerts topic.

Any suggestions or guidance?
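One way to get more feedback on the producer side (a sketch using a standard confluent-kafka facility, not something suggested in this thread) is to register a delivery callback, which reports whether each message actually reached the broker:

# Standard confluent-kafka delivery report: called once per message
# after the broker accepts or rejects it. Reuses producer, topic, and
# sendme from the snippet above.
def delivery_report(err, msg):
    if err is not None:
        print('Delivery failed:', err)
    else:
        print('Delivered to', msg.topic(), 'partition', msg.partition())

producer.produce(topic, sendme.encode(), on_delivery=delivery_report)
producer.flush()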
