Source code for shigawire.pathway

import asyncio
import json
import logging
from base64 import b64encode
from io import BytesIO

import gracedb_sdk
import ligo.skymap.bayestar
import ligo.skymap.io.events.ligolw
import pathway as pw
from astropy.time import Time
from gcn_kafka.core import get_config
from igwn_ligolw.ligolw import Document
from igwn_ligolw.lsctables import CoincInspiralTable, ProcessTable, SnglInspiralTable
from igwn_ligolw.utils import load_fileobj
from ligo.skymap.io.events.ligolw import ContentHandler
from ligo.skymap.io.fits import write_sky_map

from . import Settings
from .models import EventData as _EventData

__all__ = ("main",)


def load_xml_bytes(raw: bytes) -> Document:
    """Parse an in-memory LIGO-LW XML payload into a ``Document``.

    The bytes are wrapped in a file-like buffer and handed to
    ``igwn_ligolw.utils.load_fileobj`` with ligo.skymap's ``ContentHandler``.
    """
    buffer = BytesIO(raw)
    return load_fileobj(buffer, contenthandler=ContentHandler)


class EventData(_EventData, pw.Schema):
    """Pathway schema built from the ``EventData`` model in ``.models``.

    The class adds nothing of its own; presumably all columns come from the
    imported model (confirm against ``.models``).
    """


@pw.udf(deterministic=True)
def parse_coinc_xml(raw: bytes):
    """Parse event metadata from a LIGO-LW coinc.xml file.

    Returns a tuple, in this order:

    - ``coinc_inspiral.combined_far``
    - ``coinc_inspiral.snr``
    - ``mass1``, ``mass2``, ``mtotal``, ``mchirp`` of the first
      ``sngl_inspiral`` row
    - the coincidence end time as Unix milliseconds (UTC)
    - ``process.program``
    - the instrument names, sorted and joined with commas

    Raises:
        ValueError: if the document does not contain exactly one
            coinc_inspiral row and exactly one process row, or contains no
            sngl_inspiral rows (tuple unpacking fails).
    """
    doc = load_xml_bytes(raw)
    (coinc_inspiral,) = CoincInspiralTable.get_table(doc)
    (process,) = ProcessTable.get_table(doc)
    # Mass parameters are taken from the first single-detector row only.
    (sngl_inspiral, *_) = SnglInspiralTable.get_table(doc)

    # GPS seconds + nanoseconds -> UTC Unix time, scaled to milliseconds.
    event_time_ms = (
        Time(
            coinc_inspiral.end_time, 1e-9 * coinc_inspiral.end_time_ns, format="gps"
        ).utc.unix
        * 1000
    )

    # `instruments` is an unordered collection (a set in igwn_ligolw), so its
    # iteration order can differ between processes. Sorting makes the joined
    # string reproducible, which this UDF promises via deterministic=True.
    instruments = ",".join(sorted(coinc_inspiral.instruments))

    return (
        coinc_inspiral.combined_far,
        coinc_inspiral.snr,
        sngl_inspiral.mass1,
        sngl_inspiral.mass2,
        sngl_inspiral.mtotal,
        sngl_inspiral.mchirp,
        event_time_ms,
        process.program,
        instruments,
    )


@pw.udf(deterministic=True)
def localize(raw: bytes) -> bytes:
    """Run BAYESTAR on a single-event coinc.xml payload.

    Returns a multi-order (MOC) FITS sky map serialized to bytes.
    """
    document = load_xml_bytes(raw)
    # The same document is passed as the first two positional arguments, with
    # None as the third. NOTE(review): presumably these are the event source,
    # the PSD source, and the coinc_def filter. Confirm against ligo.skymap.
    source = ligo.skymap.io.events.ligolw.open(document, document, None)
    # Exactly one event is expected; unpacking raises otherwise.
    (single_event,) = source.values()
    skymap = ligo.skymap.bayestar.localize(single_event, f_low=15.0)
    with BytesIO() as buffer:
        write_sky_map(buffer, skymap, moc=True)
        return buffer.getvalue()


@pw.udf
def make_alert(
    far: float,
    instruments: str,
    skymap: bytes,
    program: str,
    timestamp: float,
    superevent_id: pw.Pointer,
) -> str:
    """Construct an igwn.gwalert record, serialized as a JSON string.

    ``timestamp`` is Unix milliseconds; it is converted to an ISOT string.
    ``skymap`` is embedded base64-encoded. ``time_created`` is stamped with
    the current time, so this UDF is intentionally not marked deterministic.

    NOTE(review): ``instruments`` is emitted exactly as given (a
    comma-joined string from parse_coinc_xml). Confirm this matches what
    alert consumers expect.
    """
    event_record = {
        "far": far,
        "group": "CBC",
        "instruments": instruments,
        "skymap": b64encode(skymap).decode(),
        "pipeline": program,
        "time": Time(0.001 * timestamp, format="unix").isot,
    }
    alert = {
        "event": event_record,
        "superevent_id": str(superevent_id),
        "time_created": Time.now().isot,
    }
    return json.dumps(alert)


def _read_events(settings: Settings) -> pw.Table:
    """Consume raw coinc.xml messages from Kafka and parse them into events.

    The result has the EventData columns, plus ``raw`` (the original message
    bytes) and ``event_uuid`` (the row id).
    """
    coinc = pw.io.kafka.read(
        get_config("consumer", settings.kafka_config),
        settings.kafka_topic_coinc,
        format="raw",
    ).select(parsed=parse_coinc_xml(pw.this.data), raw=pw.this.data)
    return pw.utils.col.unpack_col(coinc.parsed, schema=EventData).select(
        *pw.this, raw=coinc.raw, event_uuid=pw.this.id
    )


def _group_superevents(event: pw.Table) -> pw.Table:
    """Cluster events into superevents and localize each one.

    Events whose ``event_time`` values (milliseconds) are within 1000 of
    each other share a session window. The highest-SNR event of each window
    is the preferred event. Its FAR, instruments, time, program and sky map
    describe the superevent.
    """
    return (
        event.windowby(event.event_time, window=pw.temporal.session(max_gap=1000))
        .reduce(
            list_of_event_uuid=pw.reducers.tuple(pw.this.id),
            list_of_event_raw=pw.reducers.tuple(pw.this.raw),
            superevent_time=pw.reducers.avg(pw.this.event_time),
            superevent_time_min=pw.reducers.min(pw.this.event_time),
            superevent_time_max=pw.reducers.max(pw.this.event_time),
            preferred_event_uuid=pw.reducers.argmax(pw.this.snr),
        )
        .join(event, pw.left.preferred_event_uuid == event.event_uuid)
        .select(
            pw.this.list_of_event_uuid,
            pw.this.list_of_event_raw,
            pw.this.superevent_time,
            pw.this.superevent_time_min,
            pw.this.superevent_time_max,
            pw.this.preferred_event_uuid,
            event.far,
            event.instruments,
            event.event_time,
            event.program,
            superevent_uuid=pw.this.id,
            skymap=localize(event.raw),
        )
    )


def _make_gracedb_writer(gracedb_url: str):
    """Return a ``pw.io.subscribe`` callback that uploads superevents to GraceDB."""
    gracedb = gracedb_sdk.AsyncClient(
        url=gracedb_url, auth_reload=True, fail_if_noauth=True
    )

    async def write_to_gracedb(key, row, time=None, is_addition=True, *args, **kwargs):
        # Pathway reports every update as a retraction of the old row
        # followed by an addition of the new one. A growing session window
        # therefore retracts its previous grouping. Uploading on retractions
        # would push that stale grouping to GraceDB as a brand-new superevent,
        # so only additions are handled.
        # NOTE(review): each addition still re-uploads all of the window's
        # events, including ones sent for an earlier version of the window.
        if not is_addition:
            return

        responses = await asyncio.gather(
            *(
                gracedb.events.create(
                    filename="coinc.xml",
                    group="Test",
                    pipeline="gstlal",
                    filecontents=raw,
                )
                for raw in row["list_of_event_raw"]
            )
        )
        events = [response["graceid"] for response in responses]
        # Both tuples are produced by pw.reducers.tuple over the same window,
        # so index i refers to the same event in each.
        preferred_event = events[
            row["list_of_event_uuid"].index(row["preferred_event_uuid"])
        ]
        logging.info("GraceDB: wrote events %s", " ".join(events))

        response = await gracedb.superevents.create(
            preferred_event=preferred_event,
            events=events,
            t_0=Time(row["superevent_time"] * 1e-3, format="unix").gps,
            t_start=Time(row["superevent_time_min"] * 1e-3, format="unix").gps,
            t_end=Time(row["superevent_time_max"] * 1e-3, format="unix").gps,
            category="test",
        )
        logging.info("GraceDB: wrote superevent %s", response["superevent_id"])

    return write_to_gracedb


def _write_snapshot(table: pw.Table, postgres_config, name: str, primary_key) -> None:
    """Mirror ``table`` into the Postgres snapshot table ``name``, creating it if needed."""
    pw.io.postgres.write(
        table,
        postgres_config,
        name,
        init_mode="create_if_not_exists",
        output_table_type="snapshot",
        primary_key=primary_key,
    )


def _write_postgres(event: pw.Table, superevent: pw.Table, postgres_config) -> None:
    """Persist events, their XML, the event->superevent map, superevents and sky maps."""
    _write_snapshot(
        event.without(pw.this.raw),
        postgres_config,
        "event",
        [pw.this.event_uuid],
    )
    _write_snapshot(
        event.select(pw.this.event_uuid, pw.this.raw),
        postgres_config,
        "eventxml",
        [pw.this.event_uuid],
    )
    _write_snapshot(
        superevent.select(
            pw.this.superevent_uuid,
            event_uuid=pw.this.list_of_event_uuid,
        ).flatten(pw.this.event_uuid),
        postgres_config,
        "eventmap",
        [pw.this.event_uuid, pw.this.superevent_uuid],
    )
    _write_snapshot(
        superevent.select(
            pw.this.superevent_uuid,
            pw.this.superevent_time,
            pw.this.preferred_event_uuid,
        ),
        postgres_config,
        "superevent",
        [pw.this.superevent_uuid],
    )
    _write_snapshot(
        superevent.select(pw.this.superevent_uuid, raw=pw.this.skymap),
        postgres_config,
        "supereventfits",
        [pw.this.superevent_uuid],
    )


def main(settings: Settings | None = None):
    """Main entrypoint for Pathway pipeline.

    Reads coinc.xml events from Kafka and groups them into localized
    superevents. It then:

    - uploads them to GraceDB, if ``settings.gracedb_url`` is set;
    - publishes alerts to Kafka;
    - mirrors events and superevents into Postgres.

    Finally it calls ``pw.run`` to execute the pipeline.
    """
    if settings is None:
        settings = Settings()  # type: ignore[call-arg]

    event = _read_events(settings)
    superevent = _group_superevents(event)

    if settings.gracedb_url is not None:
        pw.io.subscribe(superevent, _make_gracedb_writer(settings.gracedb_url))

    pw.io.kafka.write(
        superevent.select(
            data=make_alert(
                far=pw.this.far,
                instruments=pw.this.instruments,
                skymap=pw.this.skymap,
                program=pw.this.program,
                timestamp=pw.this.event_time,
                superevent_id=pw.this.superevent_uuid,
            )
        ),
        # Raised producer message limit (4 MiB). Presumably this leaves room
        # for the base64-encoded sky map in each alert; confirm with ops.
        get_config(
            "producer", settings.kafka_config, **{"message.max.bytes": "4194304"}
        ),
        settings.kafka_topic_alert,
        format="plaintext",
    )

    _write_postgres(event, superevent, settings.postgres_config)

    pw.run(monitoring_level=pw.MonitoringLevel.NONE)
# Run the pipeline with default settings when executed as a script.
if __name__ == "__main__":
    main()