Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 133 additions & 36 deletions python/lsst/pipe/tasks/matchDiffimSourceInjected.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"MatchInjectedToAssocDiaSourceConfig"]

import astropy.units as u
from astropy.table import Table, join, vstack
import numpy as np
from scipy.spatial import cKDTree

Expand All @@ -43,9 +44,9 @@ class MatchInjectedToDiaSourceConnections(
dimensions=("instrument",
"visit",
"detector")):
injectedCat = connTypes.Input(
injectionCat = connTypes.Input(
Comment on lines -46 to +47
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This connection name change breaks previous pipelines and tests. Is it necessary? If so, you'll need to update drp_pipe as well.

doc="Catalog of sources injected in the images.",
name="{fakesType}_pvi_catalog",
name="{fakesType}VisitDetectorFakeSourceCat",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should {fakesType} be dropped from all of the connections?

storageClass="ArrowAstropy",
dimensions=("instrument", "visit", "detector"),
)
Expand All @@ -66,7 +67,7 @@ class MatchInjectedToDiaSourceConnections(
"diaSrc. The schema is the union of the schemas for "
"``fakeCat`` and ``diaSrc``.",
name="{fakesType}{coaddName}Diff_matchDiaSrc",
storageClass="DataFrame",
storageClass="ArrowAstropy",
dimensions=("instrument", "visit", "detector"),
)

Expand Down Expand Up @@ -111,12 +112,13 @@ class MatchInjectedToDiaSourceTask(PipelineTask):
_DefaultName = "matchInjectedToDiaSource"
ConfigClass = MatchInjectedToDiaSourceConfig

def run(self, injectedCat, diffIm, diaSources):
# def run(self, injectedCat, injectedTemplateCat, diffIm, diaSources):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove commented-out code

def run(self, injectionCat, diffIm, diaSources):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above comment on injectedCat vs injectionCat

"""Match injected sources to detected diaSources within a difference image bound.

Parameters
----------
injectedCat : `astropy.table.table.Table`
injectionCat : `astropy.table.table.Table`
Table of catalog of synthetic sources to match to detected diaSources.
diffIm : `lsst.afw.image.Exposure`
Difference image where ``diaSources`` were detected.
Expand All @@ -128,24 +130,32 @@ def run(self, injectedCat, diffIm, diaSources):
Results struct with components.

- ``matchedDiaSources`` : Fakes matched to input diaSources. Has
length of ``injectedCalexpCat``. (`pandas.DataFrame`)
length of ``injectionCat``. (`astropy.table.Table`)
"""

if self.config.doMatchVisit:
fakeCat = self._trimFakeCat(injectedCat, diffIm)
fakeCat = self._trimFakeCat(injectionCat, diffIm)
else:
fakeCat = injectedCat
fakeCat = injectionCat
if self.config.doForcedMeasurement:
self._estimateFakesSNR(fakeCat, diffIm)

return self._processFakes(fakeCat, diaSources)
# Split the fake catalog into the initial injections and the variable sources themselves,
# which are generated as duplicates of the initial injections with a twin_id column.
# We then match only the initial injections to the diaSources,
# and then add back in the variable sources by matching them to their twins
initialFakeCat, variableDoublesFakeCat = self._splitVariables(fakeCat)
matchedFakes = self._processFakes(initialFakeCat, diaSources)
fullMatchedFakes = self._add_variables_to_matched(matchedFakes, variableDoublesFakeCat)

def _estimateFakesSNR(self, injectedCat, diffIm):
return Struct(matchDiaSources=fullMatchedFakes)

def _estimateFakesSNR(self, injectionCat, diffIm):
"""Estimate the signal-to-noise ratio of the fakes in the given catalog.

Parameters
----------
injectedCat : `astropy.table.Table`
injectionCat : `astropy.table.Table`
Catalog of synthetic sources to estimate the S/N of. **This table
will be modified in place**.
diffIm : `lsst.afw.image.Exposure`
Expand Down Expand Up @@ -176,8 +186,8 @@ def _estimateFakesSNR(self, injectedCat, diffIm):

# Create an afw table from the input catalog
outputCatalog = afwTable.SourceCatalog(schema)
outputCatalog.reserve(len(injectedCat))
for row in injectedCat:
outputCatalog.reserve(len(injectionCat))
for row in injectionCat:
outputRecord = outputCatalog.addNew()
outputRecord.setId(row['injection_id'])
outputRecord.setCoord(lsstGeom.SpherePoint(row["ra"], row["dec"], lsstGeom.degrees))
Expand Down Expand Up @@ -205,17 +215,18 @@ def _estimateFakesSNR(self, injectedCat, diffIm):
# Add the forced measurement columns to the input catalog
for column in forcedSources_table.columns:
if "Flux" in column or "flag" in column:
injectedCat["forced_"+column] = forcedSources_table[column]
injectionCat["forced_"+column] = forcedSources_table[column]

# Add the SNR columns to the input catalog
for column in injectedCat.colnames:
for column in injectionCat.colnames:
if column.endswith("instFlux"):
flux = injectedCat[column]
fluxErr = injectedCat[column+"Err"].copy()
# flux = injectionCat[column]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove commented-out code

flux = np.abs(injectionCat[column])
fluxErr = injectionCat[column+"Err"].copy()
fluxErr = np.where(
(fluxErr <= 0) | (np.isnan(fluxErr)), np.nanmax(fluxErr), fluxErr)

injectedCat[column+"_SNR"] = flux / fluxErr
injectionCat[column+"_SNR"] = flux / fluxErr

def _processFakes(self, injectedCat, diaSources):
"""Match fakes to detected diaSources within a difference image bound.
Expand All @@ -238,12 +249,12 @@ def _processFakes(self, injectedCat, diaSources):
length of ``fakeCat``. (`pandas.DataFrame`)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this actually an astropy Table, not a DataFrame?

"""
# First match the diaSrc to the injected fakes
injectedCat = injectedCat.to_pandas()
# injectedCat = injectedCat.to_pandas()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove commented-out code

nPossibleFakes = len(injectedCat)

fakeVects = self._getVectors(
np.radians(injectedCat.ra),
np.radians(injectedCat.dec))
np.radians(injectedCat['ra']),
np.radians(injectedCat['dec']))
diaSrcVects = self._getVectors(
diaSources['coord_ra'],
diaSources['coord_dec'])
Expand All @@ -252,16 +263,28 @@ def _processFakes(self, injectedCat, diaSources):
dist, idxs = diaSrcTree.query(
fakeVects,
distance_upper_bound=np.radians(self.config.matchDistanceArcseconds / 3600))
nFakesFound = np.isfinite(dist).sum()
# handshake matching, that is symmetrize the match by matching the
# diaSrcs back to the fakes and only keeping those matches where the
# same pair is returned
diaSrcTreeBack = cKDTree(fakeVects)
distBack, idxsBack = diaSrcTreeBack.query(
diaSrcVects,
distance_upper_bound=np.radians(self.config.matchDistanceArcseconds / 3600))

idxsAux = np.where(np.array(idxs) < len(diaSources), idxs, -1)
idxsBackMatched = np.where(idxsAux >= 0, idxsBack[idxsAux], -1)
Comment on lines +274 to +275
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
idxsAux = np.where(np.array(idxs) < len(diaSources), idxs, -1)
idxsBackMatched = np.where(idxsAux >= 0, idxsBack[idxsAux], -1)
valid = idxsAux >= 0
idxsBackMatched = np.full_like(idxsAux, -1)
idxsBackMatched[valid] = idxsBack[idxsAux[valid]]

To better avoid edge cases when idxsAux=-1.

idxsMatched = np.where(idxsBackMatched == np.arange(len(injectedCat)), idxs, -1)
distMatched = np.where(idxsBackMatched == np.arange(len(injectedCat)), dist, np.inf)
nFakesFound = np.isfinite(distMatched).sum()

self.log.info("Found %d out of %d possible in diaSources.", nFakesFound, nPossibleFakes)

# assign diaSourceId to the matched fakes
diaSrcIds = diaSources['id'][np.where(np.isfinite(dist), idxs, 0)]
matchedFakes = injectedCat.assign(diaSourceId=np.where(np.isfinite(dist), diaSrcIds, 0))
matchedFakes['dist_diaSrc'] = np.where(np.isfinite(dist), 3600*np.rad2deg(dist), -1)

return Struct(matchDiaSources=matchedFakes)
diaSrcIds = diaSources['id'][np.where(np.isfinite(distMatched), idxsMatched, 0)]
matchedFakes = injectedCat.copy()
matchedFakes['diaSourceId'] = np.where(np.isfinite(distMatched), diaSrcIds, 0)
matchedFakes['dist_diaSrc'] = np.where(np.isfinite(distMatched), 3600*np.rad2deg(distMatched), -1)
return matchedFakes

def _getVectors(self, ras, decs):
"""Convert ra dec to unit vectors on the sphere.
Expand Down Expand Up @@ -350,6 +373,70 @@ def _trimFakeCat(self, fakeCat, image):

return fakeCat[isContainedRaDec & isContainedXy]

def _splitVariables(self, fakeCat):
"""Split out the duplicated injections, that are used to generate
variable sources in the fake catalog.

Parameters
----------
fakeCat : `astropy.table.table.Table`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One too many .table I think. Below as well.

The catalog of fake sources that was input

Returns
-------
initialFakeCat : `astropy.table.table.Table`
Subset of the input catalog corresponding to initial sources.
variableDoublesFakeCat : `astropy.table.table.Table`
Subset of the input catalog corresponding to variable sources.
"""
if "twin_id" not in fakeCat.colnames:
self.log.warning("No twin_id column found in fake catalog.")
return fakeCat, None

isVariable = fakeCat["twin_id"] > 0

return fakeCat[~isVariable], fakeCat[isVariable]

def _add_variables_to_matched(self, matchedFakes, variableDoublesFakeCat):
"""Add variable sources back into the matched fakes catalog.

Parameters
----------
matchedFakes : `astropy.table.table.Table`
Catalog of matched fakes to diaSources, corresponding to the static
sources in the input fake catalog.
variableDoublesFakeCat : `astropy.table.table.Table`
Catalog of variable sources in the input fake catalog.

Returns
-------
fullMatchedFakes : `astropy.table.table.Table`
Catalog of matched fakes to diaSources, corresponding to both the
static and variable sources in the input fake catalog.
"""
if variableDoublesFakeCat is None:
return matchedFakes

# For the variable sources, we have a match to diaSources if their twins
# had a match, so we fill the diaSourceId with the diaSourceId of the matched
# twin if it exists and 0 otherwise, and we set the distance to -1 to
# indicate that these are variable sources that were not directly matched
# to diaSources.
variableDoublesFakeCat = variableDoublesFakeCat.copy()
variableDoublesFakeCat['diaSourceId'] = 0
variableDoublesFakeCat['dist_diaSrc'] = -1

# Match variable sources to their twin's matched diaSource
for i, row in enumerate(variableDoublesFakeCat):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worry that this loop over the fake sources will get very slow for large numbers of injected fakes. Could you use astropy.table.join?

twin_id = row['twin_id']
# Find matching twin in matchedFakes
twin_matches = matchedFakes[matchedFakes['injection_id'] == twin_id]
if len(twin_matches) > 0:
variableDoublesFakeCat['diaSourceId'][i] = twin_matches['diaSourceId'][0]
variableDoublesFakeCat['dist_diaSrc'][i] = twin_matches['dist_diaSrc'][0]

return vstack([matchedFakes, variableDoublesFakeCat], metadata_conflicts='silent')


class MatchInjectedToAssocDiaSourceConnections(
PipelineTaskConnections,
Expand All @@ -371,15 +458,15 @@ class MatchInjectedToAssocDiaSourceConnections(
"diaSrc. The schema is the union of the schemas for "
"``fakeCat`` and ``diaSrc``.",
name="{fakesType}{coaddName}Diff_matchDiaSrc",
storageClass="DataFrame",
storageClass="ArrowAstropy",
dimensions=("instrument", "visit", "detector"),
)
matchAssocDiaSources = connTypes.Output(
doc="A catalog of those fakeCat sources that have a match in "
"associatedDiaSources. The schema is the union of the schemas for "
"``fakeCat`` and ``associatedDiaSources``.",
name="{fakesType}{coaddName}Diff_matchAssocDiaSrc",
storageClass="DataFrame",
storageClass="ArrowAstropy",
dimensions=("instrument", "visit", "detector"),
)

Expand Down Expand Up @@ -415,16 +502,26 @@ def run(self, assocDiaSources, matchDiaSources):
"""
# Match the fakes to the associated sources. For this we don't use the coordinates
# but instead check for the diaSources. Since they were present in the table already
if not isinstance(assocDiaSources, Table):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary, now that the connection is being read as Astropy?

assocDiaSources = Table.from_pandas(assocDiaSources, index=False)

matchDiaSources["diaSourceId"] = np.asarray(matchDiaSources["diaSourceId"], dtype=np.int64)
assocDiaSources["diaSourceId"] = np.asarray(assocDiaSources["diaSourceId"], dtype=np.int64)

nPossibleFakes = len(matchDiaSources)
matchDiaSources['isAssocDiaSource'] = matchDiaSources.diaSourceId.isin(assocDiaSources.diaSourceId)
assocNFakesFound = matchDiaSources.isAssocDiaSource.sum()
matchDiaSources["isAssocDiaSource"] = np.isin(
matchDiaSources["diaSourceId"], assocDiaSources["diaSourceId"]
)
assocNFakesFound = matchDiaSources['isAssocDiaSource'].sum()
self.log.info("Found %d out of %d possible in assocDiaSources."%(assocNFakesFound, nPossibleFakes))

return Struct(
matchAssocDiaSources=matchDiaSources.merge(
assocDiaSources.reset_index(drop=True),
on="diaSourceId",
how="left",
suffixes=('_ssi', '_diaSrc')
matchAssocDiaSources=join(
matchDiaSources,
assocDiaSources,
keys="diaSourceId",
join_type="left",
table_names=("ssi", "diaSrc"),
uniq_col_name="{col_name}_{table_name}",
)
)
3 changes: 1 addition & 2 deletions tests/test_matchSourceInjected.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#

import numpy as np
import pandas as pd
import unittest

from astropy.table import Table
Expand Down Expand Up @@ -106,7 +105,7 @@ def setUp(self):
)

# only 4 injected sources are associated
self.assocDiaSources = pd.DataFrame(
self.assocDiaSources = Table(
{
"diaSourceId": [101, 102, 103, 201, 202, 205, 207],
"band": np.repeat("r", 7),
Expand Down
Loading