
ampel-ztf's Introduction


ZTF support for AMPEL



ZTF-specific implementations for Ampel such as:

  • An AlertSupplier compatible with IPAC generated alerts
  • Shaper classes for ingestion
  • Encoding utilities for ZTF names (AMPEL requires integer ids; see the sketch below)
  • Classes for archiving alerts
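
The integer encoding mentioned above can be illustrated with a short sketch. It assumes the usual ZTF naming scheme ("ZTF" + two-digit year + seven lowercase letters); the helper names are hypothetical and this is not necessarily the packing scheme the package actually uses.

# Hypothetical sketch of a reversible ZTF-name <-> integer mapping.
import re
import string

def ztf_name_to_int(name: str) -> int:
    """Encode e.g. 'ZTF18abqkfxn' into a reversible integer."""
    match = re.fullmatch(r"ZTF(\d{2})([a-z]{7})", name)
    if match is None:
        raise ValueError(f"not a valid ZTF name: {name}")
    year, letters = match.groups()
    # interpret the 7-letter suffix as a base-26 number (a=0 ... z=25)
    suffix = 0
    for char in letters:
        suffix = suffix * 26 + string.ascii_lowercase.index(char)
    return int(year) * 26**7 + suffix

def int_to_ztf_name(value: int) -> str:
    """Invert ztf_name_to_int."""
    year, suffix = divmod(value, 26**7)
    letters = ""
    for _ in range(7):
        suffix, digit = divmod(suffix, 26)
        letters = string.ascii_lowercase[digit] + letters
    return f"ZTF{year:02d}{letters}"

assert int_to_ztf_name(ztf_name_to_int("ZTF18abqkfxn")) == "ZTF18abqkfxn"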

ampel-ztf's People

Contributors

jvansanten, vbrinnel, renovate[bot], sjoertvv, simeonreusch, dependabot[bot], wombaugh, renovate-bot, tiarahung, mickaelrigault, egraikou, github-actions[bot], mafn, andi-matter

Watchers

James Cloos, sheng yang, Ludwig Rauch

ampel-ztf's Issues

Not able to ingest by avroID

The Slack summary for neutrinos had ZTF18abqkfxn, with 3 detections. However, Marshal avro ingestion for ID 595183421115015070 gives an error. This ID is not present in either our database or the Marshal one. Potentially, the Slack publishing is not giving the most recent photopoint, and an alert was not issued for the first detection. However, the date of the most recent detection suggests this is being done correctly.

CatalogMatch handling multiple matches

Make sure that CatalogMatch gracefully handles cases where multiple matches are found within the search radius. Possible alternatives include returning all matches (in some form that downstream units understand), returning the closest match in angular separation (arcsec), or returning the closest in physical distance. A prototype exists in CatalogMatchLocal (v7).

Filtering based on additional catalog properties could also be handled.
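
To illustrate the closest-match alternatives, here is a minimal sketch assuming each match is a dict with a 'dist_arcsec' angular separation and a redshift 'z'; these keys are placeholders, not the actual CatalogMatch output format.

# Illustration only: the match dict keys are hypothetical.
import astropy.units as u
from astropy.cosmology import Planck18

def closest_match(matches: list[dict], physical: bool = False) -> dict:
    """Return the closest of several catalog matches, either by angular
    separation (arcsec) or by projected physical distance (kpc)."""
    def projected_kpc(match: dict) -> float:
        scale = Planck18.kpc_proper_per_arcmin(match["z"]).to(u.kpc / u.arcsec)
        return (match["dist_arcsec"] * u.arcsec * scale).value

    key = projected_kpc if physical else (lambda match: match["dist_arcsec"])
    return min(matches, key=key)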

Monitor topic offsets in UW Kafka stream

More than once, the UW Kafka stream has just stopped producing alerts. While the lack of alerts ingested on our side does result in a Grafana alarm, there is no way to tell whether the problem is in Ampel or at UW without manually querying the offsets of the appropriate topics from the Kafka server, e.g. with

[transit] /home/jvsanten > singularity exec --contain /scratch/jvs/singularity/kafka-2.11-2.0.1.simg /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list epyc.astro.washington.edu:9092 --topic ztf_20181117_programid2
ztf_20181117_programid2:0:0
ztf_20181117_programid2:1:0
ztf_20181117_programid2:2:0
ztf_20181117_programid2:3:0
ztf_20181117_programid2:4:0
ztf_20181117_programid2:5:0
ztf_20181117_programid2:6:0
ztf_20181117_programid2:7:0
ztf_20181117_programid2:8:0
ztf_20181117_programid2:9:0
ztf_20181117_programid2:10:0
ztf_20181117_programid2:11:0
ztf_20181117_programid2:12:0
ztf_20181117_programid2:13:0
ztf_20181117_programid2:14:0
ztf_20181117_programid2:15:0

It should be possible to query the confluent_kafka.Consumer inside of UWAlertLoader for the watermark offsets of each of the topics it is subscribed to and publish these to our own Graphite instance as e.g. ampel.ztf.uw.kafka.topics.ztf_YYYYMMDD_programidX. Then, one could construct a Graphite query that resolves to the name of the topic for the most recent night and trips an alarm if the sum of high-watermark offsets is not growing.
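
A minimal sketch of such a monitor, using confluent_kafka's get_watermark_offsets and Graphite's plaintext protocol (the consumer group, Graphite host and metric prefix below are placeholders, not existing configuration):

# Sketch: publish summed high-watermark offsets for ZTF topics to Graphite.
import re
import socket
import time

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "epyc.astro.washington.edu:9092",
    "group.id": "offset-monitor",  # placeholder group name
})
cluster = consumer.list_topics(timeout=10)

now = int(time.time())
lines = []
for topic, metadata in cluster.topics.items():
    if not re.fullmatch(r"ztf_\d{8}_programid\d+", topic):
        continue
    total = 0
    for partition in metadata.partitions:
        _low, high = consumer.get_watermark_offsets(
            TopicPartition(topic, partition), timeout=10
        )
        total += high
    lines.append(f"ampel.ztf.uw.kafka.topics.{topic} {total} {now}")
consumer.close()

# Graphite plaintext protocol: "metric.path value timestamp\n"
with socket.create_connection(("graphite.example.com", 2003)) as sock:
    sock.sendall(("\n".join(lines) + "\n").encode())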

Action Required: Fix Renovate Configuration

There is an error with this repository's Renovate configuration that needs to be fixed. As a precaution, Renovate will stop PRs until it is resolved.

Error type: Cannot find preset's package (github>AmpelProject/renovate-config)

Missing data when a public alert is first encountered in the previous history of a private alert

As photopoints are not reingested, a photopoint first encountered as a previous candidate is not saved again if it later appears as the triggering candidate. This matters since not all data are included in the previous-candidate information. The DB content will thus not be stable with respect to the order in which alerts are processed. (In principle this is a problem with the stream, rather than with Ampel.)

What can be done:

  • A T3 could make sure information is updated consistently, but this would make the base DB content dependent on whether and when a T3 was run.
  • We could force all candidates to be ingested, even if they are already in the DB. This could imply a large performance penalty, but could still be an option for channels that critically rely on this information.
  • Change the ingester such that missing information is inherited by the previous candidates.

Right now it seems like the third, least invasive, option is sufficient. These are the affected keys:

Key that can be inherited: 'nmtchps', 'srmag2', 'objectidps1', 'distpsnr1', 'maggaiabright', 'sgmag2', 'simag2', 'maggaia', 'nmatches', 'objectidps3', 'distpsnr2', 'sgscore3', 'distpsnr3', 'sgmag3', 'objectidps2', 'simag3', 'jdstartref', 'sgmag1', 'neargaiabright', 'szmag3', 'szmag1', 'srmag1', 'srmag3', 'neargaia', 'jdendref', 'simag1', 'sgscore1', 'sgscore2', 'rfid', 'nframesref', 'szmag2'
(note that we are assuming the reference build did not change between the history and the new observation)

There are a number of keys that on principle should not be inherited: 'jdstarthist', 'ndethist', 'ncovhist', 'jdendhist'. These differ in that they do not depend only on the candidate, but rather on the state at processing time (they are actually more similar to a science record).

A final set of properties also cannot be inherited, as they are exposure dependent, but they should really be part of the alert; we should contact Caltech about these: 'zpclrcov', 'ssnrms', 'dsdiff', 'dsnrms', 'clrrms', 'zpmed', 'clrmed', 'exptime', 'tooflag'
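
A minimal sketch of option three: copy the inheritable keys listed above from the triggering candidate onto previous-candidate records that lack them. The helper and dict layout are illustrative, not the actual ingester code.

# Sketch of the "inherit missing keys" option; assumes the reference build is unchanged.
INHERITABLE_KEYS = {
    "nmtchps", "srmag2", "objectidps1", "distpsnr1", "maggaiabright",
    "sgmag2", "simag2", "maggaia", "nmatches", "objectidps3", "distpsnr2",
    "sgscore3", "distpsnr3", "sgmag3", "objectidps2", "simag3", "jdstartref",
    "sgmag1", "neargaiabright", "szmag3", "szmag1", "srmag1", "srmag3",
    "neargaia", "jdendref", "simag1", "sgscore1", "sgscore2", "rfid",
    "nframesref", "szmag2",
}

def inherit_keys(candidate: dict, prv_candidates: list[dict]) -> None:
    """Fill position-independent keys missing from previous candidates
    with the values carried by the triggering candidate."""
    for prv in prv_candidates:
        for key in INHERITABLE_KEYS:
            if prv.get(key) is None and key in candidate:
                prv[key] = candidate[key]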

Dependency Dashboard

This issue lists Renovate updates and detected dependencies. Read the Dependency Dashboard docs to learn more.

Open

These updates have all been created already. Click a checkbox below to force a retry/rebase of any.

Detected dependencies

Branch master
github-actions
.github/workflows/main.yml
  • AmpelProject/Ampel-interface ci-py12-v3@e880b6e4cc9827cb4a85a06d00cb36738d9dda93
poetry
pyproject.toml
  • python ^3.10
  • astropy >=5.0
  • matplotlib ^3.3.4
  • aiohttp ^3.7.3
  • nest-asyncio ^1.4.3
  • backoff ^2.0.0
  • fastavro ^1.9.2
  • requests ^2.25.1
  • requests-toolbelt ^1.0.0
  • confluent-kafka ^2.0.0
  • healpy ^1.15
  • light-curve >=0.9,<0.10
  • ampel-ztf-archive ^0.8.0-alpha.0
  • ampel-core >=0.10.1,<0.11
  • ampel-alerts >=0.10.1,<0.11
  • ampel-photometry >=0.10.0a0,<0.11
  • ampel-plot ^0.9.0a2
  • pandas >=1.5
  • scipy ^1.9.3
  • planobs ^0.7.0
  • pytest ^8.3.2
  • pytest-cov ^5.0.0
  • pytest-mock ^3.14.0
  • mongomock ^4.1.2
  • setuptools *
  • mypy ^1.11.1
  • pytest-timeout ^2.3.1
  • pytest-asyncio ^0.23.8
  • types-requests ^2.25.9
  • before_after ^1.0.1
regex
.github/workflows/main.yml
  • conda-forge/python 3.12.4
Branch stable/v0.8
github-actions
.github/workflows/main.yml
  • AmpelProject/Ampel-interface ci-py12-v3@e880b6e4cc9827cb4a85a06d00cb36738d9dda93
poetry
pyproject.toml
  • python >=3.10,<3.12
  • astropy ^5.0
  • matplotlib ^3.3.4
  • aiohttp ^3.7.3
  • nest-asyncio ^1.4.3
  • backoff ^2.0.0
  • fastavro ^1.9.2
  • requests ^2.25.1
  • requests-toolbelt >=1
  • confluent-kafka ^2.0.0
  • healpy ^1.15
  • light-curve >=0.7,<0.8
  • ampel-ztf-archive ^0.8.0-alpha.0
  • ampel-interface ^0.8.7,<0.9
  • ampel-core >=0.8.9,<0.9
  • ampel-photometry ^0.8.3,<0.9
  • ampel-alerts ^0.8.4,<0.9
  • ampel-plot ^0.8.3-3,<0.9
  • pandas ^1.5.2
  • scipy ^1.9.3
  • planobs ^0.6.1
  • pytest ^7.4.2
  • pytest-cov ^4.1.0
  • pytest-mock ^3.11.1
  • mongomock ^4.1.2
  • mypy ^1.6.0
  • pytest-timeout ^2.2.0
  • pytest-asyncio ^0.21.1
  • before_after ^1.0.1

  • Check this box to trigger a request for Renovate to run again on this repository

Maximize state size or guarantee replayability (or both)?

Consider a transient observed at increasing times A, B, C, D, E.
These photopoints are delivered in two alerts: 1 and 2.
1 contains ABC and 2 contains CDE.
The fact that 2 does not contain all photopoints could be due, e.g., to the 30-day ZTF cap on alert history.

How should the ingester create states based on these alerts, and how is this affected by the order in which the alerts are received?

Current implementation (post afb8e32) only ingests datapoints contained in the alerts.
Two states are created, one with the ABC and one with the CDE datapoints.
This guarantees full replayability (alert-order does not matter).

The previous implementation complemented a state with any older datapoints in the DB.
https://github.com/AmpelProject/Ampel-ZTF/blob/7c15460de57c2026795cb9136af919f20c9655fd/ampel/ztf/ingest/ZiAlertContentIngester.py#L268-L272
This means that if the alerts are received in correct time-order (1 -> 2), we get the states ABC and ABCDE.
However, if the alerts are processed in a time-reversed fashion (2 -> 1) the states will be CDE and ABC.
The outcome thus varies with alert order.

From an information perspective, one could argue that one should always maximize the data available in the state (and thus complement all alerts with all DB data). This would in the correct time-order yield the states ABC and ABCDE and with reversed alert order CDE and ABCDE.
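
A toy illustration (not the ingester code) of the two strategies, making the order dependence explicit:

# Alert 1 carries ABC, alert 2 carries CDE.
alerts = [{"A", "B", "C"}, {"C", "D", "E"}]

def states_alert_only(sequence):
    """Replayable: each state is exactly the alert content."""
    return [frozenset(alert) for alert in sequence]

def states_maximized(sequence):
    """Maximized: each state is the union of the alert and the DB content."""
    db, states = set(), []
    for alert in sequence:
        db |= alert
        states.append(frozenset(db))
    return states

# Time order (1 -> 2): states ABC, ABCDE. Reversed (2 -> 1): CDE, ABCDE.
print(states_maximized(alerts), states_maximized(alerts[::-1]))
# Alert-only ingestion yields the same set of states (ABC, CDE) in either order.
print(states_alert_only(alerts), states_alert_only(alerts[::-1]))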

This leaves the following questions:

  • Is this behaviour solely controlled by this clause in the implemented ingester, or does the same question appear in other places?
  • Is there a "correct" or "desired" solution?
  • Do we actually want this to be parameter driven? In principle a channel could provide config parameters to the ingester that regulate whether to maximize state content or guarantee replayability.

Read from archive with arbitrary number of workers

Currently ZTF alerts are inserted into the archive database along with the index of the Kafka partition they were read from. When reading back from the archive, each client gets an index, and reads only alerts with that partition id. Since the UW Kafka mirror has 16 partitions per topic, this means that you must use exactly 16 AlertProcessors to consume alerts from the archive. Using more will result in idle workers, and using fewer will result in seeing only e.g. 7/16 of the alerts.

Instead, it should be possible to split alerts from the archive across an arbitrary number of AlertProcessors. The most straightforward way to do this requires implementing a shared queue in PostgreSQL. Here's a sketch of the design:

  1. Similar to Kafka, consumers will identify themselves by a shared group name. Unlike Kafka, though, the group name is associated with a query, so different groups can consume different sets of alerts.
  2. Upon start, each consumer will attempt to insert a new row in the read_queue_groups table, which has a unique constraint on the group name. At most one consumer will succeed; this one is the alert query executor and will proceed to step 3. All others will proceed to step 4. If the queue for this group already existed, all consumers proceed to step 4.
  3. The alert query executor runs the query (e.g. an observation date range and/or cone search), and inserts rows of (group_id, array(alert_id)) into the read_queue table, where the size of the array is a predetermined block size, e.g. 5000. The block size should be large enough that the row is much larger than Postgres' fixed tuple overhead, but not so large that hours of work would be lost if the consumer were shut down before it processed the block.
  4. Each consumer queries read_queue for a block of alert_ids using FOR UPDATE SKIP LOCKED to atomically acquire a lock on an unclaimed item; the lock is released if the consumer dies before it can process the block. A code sketch of this step follows the list.
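
A minimal sketch of step 4 using psycopg2; the read_queue columns (item_id, group_id, alert_ids) are assumptions following the design above.

# Sketch of a consumer claiming one block of alert ids (step 4).
import psycopg2

def claim_block(dsn: str, group_id: int) -> list[int] | None:
    """Atomically claim, process and delete one unprocessed block.
    If the consumer dies mid-block, the row lock is released and the
    block becomes visible to other consumers again."""
    conn = psycopg2.connect(dsn)
    try:
        with conn:  # one transaction: the row lock is held until commit/rollback
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT item_id, alert_ids FROM read_queue
                    WHERE group_id = %s
                    ORDER BY item_id
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                    """,
                    (group_id,),
                )
                row = cur.fetchone()
                if row is None:
                    return None  # queue drained (or all remaining blocks locked)
                item_id, alert_ids = row
                # ... process the alerts here, while the row stays locked ...
                cur.execute("DELETE FROM read_queue WHERE item_id = %s", (item_id,))
                return alert_ids
    finally:
        conn.close()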

confluent-kafka 1.9 compatibility

KafkaError seems to have changed:

Traceback (most recent call last):
  File '/opt/conda/envs/env/lib/python3.10/site-packages/ampel/alert/AlertConsumer.py', line 281, in proceed
    for alert in self.alert_supplier:
  File '/opt/conda/envs/env/lib/python3.10/site-packages/ampel/ztf/alert/ZiAlertSupplier.py', line 33, in __next__
    next(self.alert_loader) # type: ignore
  File '/opt/conda/envs/env/lib/python3.10/site-packages/ampel/ztf/t0/load/UWAlertLoader.py', line 84, in __next__
    return next(self._it)
  File '/opt/conda/envs/env/lib/python3.10/site-packages/ampel/ztf/t0/load/UWAlertLoader.py', line 64, in alerts
    for message in itertools.islice(self._consumer, limit):
  File '/opt/conda/envs/env/lib/python3.10/site-packages/ampel/ztf/t0/load/AllConsumingConsumer.py', line 149, in __next__
    message = self.consume()
  File '/opt/conda/envs/env/lib/python3.10/site-packages/ampel/ztf/t0/load/AllConsumingConsumer.py', line 209, in consume
    raise KafkaError(message.error())
  File '/opt/conda/envs/env/lib/python3.10/site-packages/ampel/ztf/t0/load/AllConsumingConsumer.py', line 97, in __init__
    super().__init__(kafka_err.args[0])
AttributeError: 'cimpl.KafkaError' object has no attribute 'args'
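
A possible fix is to build the wrapper message from KafkaError.code() and KafkaError.str(), which are available both before and after 1.9, instead of relying on .args. A hedged sketch (the class is illustrative, not the actual AllConsumingConsumer code):

# Sketch of a wrapper exception that does not touch KafkaError.args.
import confluent_kafka

class KafkaErrorWrapper(RuntimeError):
    def __init__(self, kafka_err: confluent_kafka.KafkaError) -> None:
        # code() and str() are part of the stable confluent_kafka API
        super().__init__(f"{kafka_err.code()}: {kafka_err.str()}")
        self.kafka_err = kafka_err

# usage inside consume(): raise KafkaErrorWrapper(message.error())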

ZiAlertContentIngester has a data race for reprocessed photopoints

ZiAlertContentIngester detects reprocessed detections by querying the t0 collection for documents with the same stock, less than 30 days older than the alert jd, whose ids are not in the alert. Those documents are then tagged SUPERSEDED. This can break in two related ways:

  1. alerts are processed out of order, or
  2. alerts are received simultaneously by two AlertProcessors, such that the bulk writes from each are still pending when the t0 collection is queried for history.

In case (1), the document corresponding to a newer alert is tagged SUPERSEDED, while in case (2), neither document is tagged, leading to duplicated photopoints. The latter does happen in the wild, for example with these two alerts that were received on two different Kafka partitions within 100 ms of each other (ingestion_time is UNIX epoch in microseconds):

ztfarchive=# select * from alert where candid in (1391345455815015017, 1391345455815015018);
 alert_id  |       candid        | programid |   objectId   | partition_id |  ingestion_time  |       jd        | schemavsn
-----------+---------------------+-----------+--------------+--------------+------------------+-----------------+-----------
 402732186 | 1391345455815015017 |         1 | ZTF18acruwxq |            5 | 1603456166118019 | 2459145.8454514 | 3.3
 402732183 | 1391345455815015018 |         1 | ZTF18acruwxq |           13 | 1603456166011908 | 2459145.8454514 | 3.3

Since the reprocessing check is fundamentally a multi-document operation, it doesn't seem like there's a way around this that doesn't use transactions.
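
For reference, a sketch of what a transactional version of the check could look like with pymongo (requires MongoDB running as a replica set; the collection and field names follow the description above but are assumptions about the actual schema):

# Sketch: find-and-tag SUPERSEDED inside a multi-document transaction so
# that racing AlertProcessors cannot both miss each other's pending writes.
from pymongo import MongoClient

def tag_superseded(client: MongoClient, stock: int, alert_jd: float,
                   alert_ids: list[int]) -> None:
    t0 = client["Ampel_data"]["t0"]  # database/collection names are placeholders
    with client.start_session() as session:
        with session.start_transaction():
            t0.update_many(
                {
                    "stock": stock,
                    "body.jd": {"$gt": alert_jd - 30, "$lt": alert_jd},
                    "id": {"$nin": alert_ids},
                },
                {"$addToSet": {"tag": "SUPERSEDED"}},
                session=session,
            )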

Proposed flagging system for non-stellar sources

@jvansanten and I discussed a proposed addition to the ingestion process. Currently, the archive database is queried directly for ToO follow-up, which is slow. Rate estimates suggest that, by using the DecentFilter to search only real, non-stellar objects, the alert rate can be cut to ~0.5% of the base rate. The idea is to define a channel that uses the DecentFilter at T0, and set up a T3 that outputs the candidate id of all sources (presumably non-stellar) that pass this filter. These alerts could be tagged in the ArchiveDB, and ToO queries could then search only alerts carrying the non-stellar flag. A possible additional use case would be speeding up re-runs of ZTF data. In both cases, a naive estimate gives a factor ~200 speed-up. There would be an unavoidable lag, set by the frequency of T3 scheduling, in propagating the tags to the database after ingestion; this lag is a caveat of the method. Synchronisation issues could also occur, for example if the base filter were changed at some point. At present, the ToO requirement only applies to alerts from now on, so the tagging would not need to be run backwards over the past two years of ZTF data.

Ingester can break replayability

Note: does not affect production but can affect re-runs.

The current version of the ingester creates compounds based on all available photopoints (the union of alert and DB photopoints). As a consequence, processing the stream backwards would result in the creation of only one 'state': the latest one. This can easily be fixed by excluding from the compound any photopoints with jd greater than that of the photopoint that triggered the alert (defined in candidate). That way, processing the stream backwards would result in the creation of all intermediate states, as it should, and thanks to the union we would still be able to create compounds containing photopoints older than 30 days.
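
A minimal sketch of the proposed fix (the datapoint layout is illustrative, not the actual ingester code):

# Build the compound from the union of alert and DB datapoints, but drop
# anything newer than the detection that triggered the alert.
def build_compound(alert_dps: list[dict], db_dps: list[dict],
                   trigger_jd: float) -> list[int]:
    merged = {dp["id"]: dp for dp in db_dps}
    merged.update({dp["id"]: dp for dp in alert_dps})
    return sorted(
        dp["id"] for dp in merged.values()
        if dp["body"]["jd"] <= trigger_jd
    )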
