import asyncio
import json
import logging
from base64 import b64encode
from io import BytesIO
import gracedb_sdk
import ligo.skymap.bayestar
import ligo.skymap.io.events.ligolw
import pathway as pw
from astropy.time import Time
from gcn_kafka.core import get_config
from igwn_ligolw.ligolw import Document
from igwn_ligolw.lsctables import CoincInspiralTable, ProcessTable, SnglInspiralTable
from igwn_ligolw.utils import load_fileobj
from ligo.skymap.io.events.ligolw import ContentHandler
from ligo.skymap.io.fits import write_sky_map
from . import Settings
from .models import EventData as _EventData
# Public API of this module: only the pipeline entry point is exported.
__all__ = ("main",)
def load_xml_bytes(raw: bytes) -> Document:
    """Parse an in-memory LIGO-LW XML payload into a document.

    The bytes are wrapped in a file-like buffer and handed to
    ``load_fileobj`` together with the ``ContentHandler`` imported from
    ``ligo.skymap.io.events.ligolw``.
    """
    buffer = BytesIO(raw)
    document = load_fileobj(buffer, contenthandler=ContentHandler)
    return document
class EventData(_EventData, pw.Schema):
    """Pathway schema carrying the fields of the project's ``EventData`` model.

    No columns are declared here; everything is inherited from the two bases.
    """
@pw.udf(deterministic=True)
def parse_coinc_xml(raw: bytes):
    """Parse event metadata from a LIGO-LW coinc.xml file.

    The document must contain exactly one ``coinc_inspiral`` row and exactly
    one ``process`` row (tuple unpacking raises ``ValueError`` otherwise) and
    at least one ``sngl_inspiral`` row; masses are taken from the first one.

    Returns a tuple, in this order: combined FAR, SNR, mass1, mass2, total
    mass, chirp mass, event time in Unix milliseconds, the producing program
    name, and the participating instruments as a comma-separated string.
    """
    doc = load_xml_bytes(raw)
    (coinc_inspiral,) = CoincInspiralTable.get_table(doc)
    (process,) = ProcessTable.get_table(doc)
    (sngl_inspiral, *_) = SnglInspiralTable.get_table(doc)

    # GPS seconds + nanoseconds -> UTC Unix time, scaled to milliseconds.
    event_time_ms = (
        Time(
            coinc_inspiral.end_time, 1e-9 * coinc_inspiral.end_time_ns, format="gps"
        ).utc.unix
        * 1000
    )

    # ``instruments`` is an unordered collection; sort it so that this UDF,
    # which is declared deterministic, always yields the same string for the
    # same input regardless of hash ordering.
    instruments = ",".join(sorted(coinc_inspiral.instruments))

    return (
        coinc_inspiral.combined_far,
        coinc_inspiral.snr,
        sngl_inspiral.mass1,
        sngl_inspiral.mass2,
        sngl_inspiral.mtotal,
        sngl_inspiral.mchirp,
        event_time_ms,
        process.program,
        instruments,
    )
@pw.udf(deterministic=True)
def localize(raw: bytes) -> bytes:
    """Turn the bytes of a coinc.xml file into a serialized sky map.

    The document must describe exactly one event (tuple unpacking raises
    ``ValueError`` otherwise). The event is localized with BAYESTAR using
    ``f_low=15.0`` and the resulting sky map is written with ``moc=True``
    into an in-memory buffer whose contents are returned.
    """
    document = load_xml_bytes(raw)
    # The same document is passed twice, as in the original call.
    events = ligo.skymap.io.events.ligolw.open(document, document, None)
    (single_event,) = events.values()
    localization = ligo.skymap.bayestar.localize(single_event, f_low=15.0)
    with BytesIO() as buffer:
        write_sky_map(buffer, localization, moc=True)
        serialized = buffer.getvalue()
    return serialized
@pw.udf
def make_alert(
    far: float,
    instruments: str,
    skymap: bytes,
    program: str,
    timestamp: float,
    superevent_id: pw.Pointer,
) -> str:
    """Serialize an igwn.gwalert record as a JSON string.

    ``timestamp`` is interpreted as Unix milliseconds and rendered in ISOT
    format; ``skymap`` is embedded base64-encoded; ``time_created`` is the
    current time at the moment of the call.
    """
    encoded_skymap = b64encode(skymap).decode()
    event_record = {
        "far": far,
        "group": "CBC",
        "instruments": instruments,
        "skymap": encoded_skymap,
        "pipeline": program,
        "time": Time(0.001 * timestamp, format="unix").isot,
    }
    alert = {
        "event": event_record,
        "superevent_id": str(superevent_id),
        "time_created": Time.now().isot,
    }
    return json.dumps(alert)
# [docs]
def main(settings: Settings | None = None):
    """Main entrypoint for Pathway pipeline.

    Builds and runs the following dataflow:

    1. Read raw coinc.xml payloads from the Kafka topic
       ``settings.kafka_topic_coinc`` and parse them into events.
    2. Group events into superevents with a session window over
       ``event_time`` (``max_gap=1000``), pick the event with the highest
       SNR as the preferred event, and compute a sky map for it.
    3. Optionally (when ``settings.gracedb_url`` is set) upload the events
       and the superevent to GraceDB.
    4. Publish an alert per superevent to ``settings.kafka_topic_alert`` and
       write snapshot tables ``event``, ``eventxml``, ``eventmap``,
       ``superevent`` and ``supereventfits`` to Postgres.

    Parameters
    ----------
    settings
        Pipeline configuration. When ``None``, ``Settings()`` is constructed
        with no arguments.
    """
    if settings is None:
        settings = Settings()  # type: ignore[call-arg]

    # --- Ingest: raw Kafka messages -> parsed event rows -------------------
    event = pw.io.kafka.read(
        get_config("consumer", settings.kafka_config),
        settings.kafka_topic_coinc,
        format="raw",
    ).select(parsed=parse_coinc_xml(pw.this.data), raw=pw.this.data)
    # Expand the parsed tuple into named columns and keep the original XML
    # plus the row id as a stable event identifier.
    event = pw.utils.col.unpack_col(event.parsed, schema=EventData).select(
        *pw.this, raw=event.raw, event_uuid=pw.this.id
    )

    # --- Superevents: session-window clustering + preferred event ----------
    superevent = (
        event.windowby(event.event_time, window=pw.temporal.session(max_gap=1000))
        .reduce(
            list_of_event_uuid=pw.reducers.tuple(pw.this.id),
            list_of_event_raw=pw.reducers.tuple(pw.this.raw),
            superevent_time=pw.reducers.avg(pw.this.event_time),
            superevent_time_min=pw.reducers.min(pw.this.event_time),
            superevent_time_max=pw.reducers.max(pw.this.event_time),
            preferred_event_uuid=pw.reducers.argmax(pw.this.snr),
        )
        # Pull the preferred event's own columns back in.
        .join(event, pw.left.preferred_event_uuid == event.event_uuid)
        .select(
            pw.this.list_of_event_uuid,
            pw.this.list_of_event_raw,
            pw.this.superevent_time,
            pw.this.superevent_time_min,
            pw.this.superevent_time_max,
            pw.this.preferred_event_uuid,
            event.far,
            event.instruments,
            event.event_time,
            event.program,
            superevent_uuid=pw.this.id,
            skymap=localize(event.raw),
        )
    )

    # --- Optional sink: GraceDB --------------------------------------------
    if settings.gracedb_url is not None:
        gracedb = gracedb_sdk.AsyncClient(
            url=settings.gracedb_url, auth_reload=True, fail_if_noauth=True
        )

        async def write_to_gracedb(key, row, time=None, is_addition=True, **kwargs):
            """Upload one superevent row and its member events to GraceDB."""
            # An update to a superevent arrives as a retraction of the old
            # row followed by an insertion of the new one. Uploading on the
            # retraction would push the stale row to GraceDB a second time.
            if not is_addition:
                return

            # Upload all member events concurrently; gather() preserves the
            # input order, so responses line up with list_of_event_uuid.
            responses = await asyncio.gather(
                *(
                    gracedb.events.create(
                        filename="coinc.xml",
                        group="Test",
                        pipeline="gstlal",
                        filecontents=raw,
                    )
                    for raw in row["list_of_event_raw"]
                )
            )
            events = [response["graceid"] for response in responses]
            preferred_event = events[
                row["list_of_event_uuid"].index(row["preferred_event_uuid"])
            ]
            logging.info("GraceDB: wrote events %s", " ".join(events))

            # Times are divided by 1000 and read as Unix seconds, then
            # converted to GPS for GraceDB.
            response = await gracedb.superevents.create(
                preferred_event=preferred_event,
                events=events,
                t_0=Time(row["superevent_time"] * 1e-3, format="unix").gps,
                t_start=Time(row["superevent_time_min"] * 1e-3, format="unix").gps,
                t_end=Time(row["superevent_time_max"] * 1e-3, format="unix").gps,
                category="test",
            )
            logging.info("GraceDB: wrote superevent %s", response["superevent_id"])

        pw.io.subscribe(superevent, write_to_gracedb)

    # --- Sink: Kafka alerts ------------------------------------------------
    pw.io.kafka.write(
        superevent.select(
            data=make_alert(
                far=pw.this.far,
                instruments=pw.this.instruments,
                skymap=pw.this.skymap,
                program=pw.this.program,
                timestamp=pw.this.event_time,
                superevent_id=pw.this.superevent_uuid,
            )
        ),
        # Raise the producer message size limit to 4 MiB; each alert embeds
        # a base64-encoded sky map.
        get_config(
            "producer", settings.kafka_config, **{"message.max.bytes": "4194304"}
        ),
        settings.kafka_topic_alert,
        format="plaintext",
    )

    # --- Sinks: Postgres snapshot tables -----------------------------------
    # Event metadata (without the raw XML).
    pw.io.postgres.write(
        event.without(pw.this.raw),
        settings.postgres_config,
        "event",
        init_mode="create_if_not_exists",
        output_table_type="snapshot",
        primary_key=[pw.this.event_uuid],
    )
    # Raw coinc.xml payload per event.
    pw.io.postgres.write(
        event.select(pw.this.event_uuid, pw.this.raw),
        settings.postgres_config,
        "eventxml",
        init_mode="create_if_not_exists",
        output_table_type="snapshot",
        primary_key=[pw.this.event_uuid],
    )
    # Membership table: one row per (event, superevent) pair.
    pw.io.postgres.write(
        superevent.select(
            pw.this.superevent_uuid,
            event_uuid=pw.this.list_of_event_uuid,
        ).flatten(pw.this.event_uuid),
        settings.postgres_config,
        "eventmap",
        init_mode="create_if_not_exists",
        output_table_type="snapshot",
        primary_key=[pw.this.event_uuid, pw.this.superevent_uuid],
    )
    # Superevent summary.
    pw.io.postgres.write(
        superevent.select(
            pw.this.superevent_uuid,
            pw.this.superevent_time,
            pw.this.preferred_event_uuid,
        ),
        settings.postgres_config,
        "superevent",
        init_mode="create_if_not_exists",
        output_table_type="snapshot",
        primary_key=[pw.this.superevent_uuid],
    )
    # Sky map bytes per superevent.
    pw.io.postgres.write(
        superevent.select(pw.this.superevent_uuid, raw=pw.this.skymap),
        settings.postgres_config,
        "supereventfits",
        init_mode="create_if_not_exists",
        output_table_type="snapshot",
        primary_key=[pw.this.superevent_uuid],
    )

    pw.run(monitoring_level=pw.MonitoringLevel.NONE)
# Run the pipeline with default settings when executed as a script.
if __name__ == "__main__":
    main()