"""Utility functions for outlier detection routines."""
import logging
import warnings
import numpy as np
from astropy.stats import sigma_clip
from astropy.utils.decorators import deprecated_renamed_argument
from drizzle.resample import blot_image
from scipy import ndimage
from skimage.util import view_as_windows
from stcal.resample.utils import calc_pixmap
log = logging.getLogger(__name__)
__all__ = [
"medfilt",
"compute_weight_threshold",
"flag_crs",
"flag_resampled_crs",
"gwcs_blot",
]
[docs]
def medfilt(arr, kern_size):
"""
Median filter.
scipy.signal.medfilt (and many other median filters) have undefined behavior
for nan inputs. See: https://github.com/scipy/scipy/issues/4800.
Parameters
----------
arr : numpy.ndarray
The input array
kern_size : list of int
List of kernel dimensions, length must be equal to arr.ndim.
Returns
-------
filtered_arr : numpy.ndarray
Input array median filtered with a kernel of size kern_size
"""
padded = np.pad(arr, [[k // 2] for k in kern_size])
windows = view_as_windows(padded, kern_size, np.ones(len(kern_size), dtype="int"))
with warnings.catch_warnings():
warnings.filterwarnings("ignore", "All-NaN", RuntimeWarning)
return np.nanmedian(windows, axis=np.arange(-len(kern_size), 0))
[docs]
def compute_weight_threshold(weight, maskpt):
"""
Compute the weight threshold for a single image or cube.
Parameters
----------
weight : numpy.ndarray
The weight array
maskpt : float
The percentage of the mean weight to use as a threshold for masking.
Returns
-------
float
The weight threshold for this integration.
"""
with warnings.catch_warnings():
warnings.filterwarnings("ignore", "Mean of empty slice", RuntimeWarning)
warnings.filterwarnings("ignore", "invalid value", RuntimeWarning)
return (
np.mean(
sigma_clip(
weight[np.isfinite(weight) & (weight != 0)],
sigma=3,
maxiters=5,
masked=False,
copy=False,
),
dtype="f8",
)
* maskpt
)
def _abs_deriv(array):
"""Take the absolute derivate of a numpy array."""
out = np.zeros_like(array) # use same dtype as input
# make output values nan where input is nan (for floating point input)
if np.issubdtype(array.dtype, np.floating):
out[np.isnan(array)] = np.nan
# compute row-wise absolute difference
row_diff = np.abs(np.diff(array, axis=0))
np.putmask(out[1:], np.isfinite(row_diff), row_diff) # no need to do max yet
# since these are absolute differences |r0-r1| = |r1-r0|
# make a view of the target portion of the array
row_offset_view = out[:-1]
# compute an in-place maximum
np.putmask(row_offset_view, row_diff > row_offset_view, row_diff)
del row_diff
# compute col-wise absolute difference
col_diff = np.abs(np.diff(array, axis=1))
col_offset_view = out[:, 1:]
np.putmask(col_offset_view, col_diff > col_offset_view, col_diff)
col_offset_view = out[:, :-1]
np.putmask(col_offset_view, col_diff > col_offset_view, col_diff)
return out
[docs]
def flag_crs(
sci_data,
sci_err,
blot_data,
snr,
):
"""
Flag outliers.
Straightforward detection of outliers for non-dithered data since
sci_err includes all noise sources (photon, read, and flat for baseline).
Parameters
----------
sci_data : numpy.ndarray
"Science" data possibly containing outliers.
sci_err : numpy.ndarray
Error estimates for sci_data.
blot_data : numpy.ndarray
Reference data used to detect outliers.
snr : float
Signal-to-noise ratio used during detection.
Returns
-------
cr_mask : numpy.ndarray
Boolean array where outliers (CRs) are true.
"""
return np.greater(np.abs(sci_data - blot_data), snr * np.nan_to_num(sci_err))
[docs]
def flag_resampled_crs(
sci_data,
sci_err,
blot_data,
snr1,
snr2,
scale1,
scale2,
backg,
):
"""
Detect outliers (CRs) using resampled reference data.
Parameters
----------
sci_data : numpy.ndarray
"Science" data possibly containing outliers
sci_err : numpy.ndarray
Error estimates for sci_data
blot_data : numpy.ndarray
Reference data used to detect outliers.
snr1 : float
Signal-to-noise ratio threshold used prior to smoothing.
snr2 : float
Signal-to-noise ratio threshold used after smoothing.
scale1 : float
Scale used prior to smoothing.
scale2 : float
Scale used after smoothing.
backg : float
Scalar background to subtract from the difference.
Returns
-------
cr_mask : numpy.ndarray
boolean array where outliers (CRs) are true
"""
err_data = np.nan_to_num(sci_err)
blot_deriv = _abs_deriv(blot_data)
diff_noise = np.abs(sci_data - blot_data - backg)
# Create a boolean mask based on a scaled version of
# the derivative image (dealing with interpolating issues?)
# and the standard n*sigma above the noise
threshold1 = scale1 * blot_deriv + snr1 * err_data
mask1 = np.greater(diff_noise, threshold1)
# Smooth the boolean mask with a 3x3 boxcar kernel
kernel = np.ones((3, 3), dtype=int)
mask1_smoothed = ndimage.convolve(mask1, kernel, mode="nearest")
# Create a 2nd boolean mask based on the 2nd set of
# scale and threshold values
threshold2 = scale2 * blot_deriv + snr2 * err_data
mask2 = np.greater(diff_noise, threshold2)
# Final boolean mask
return mask1_smoothed & mask2
[docs]
@deprecated_renamed_argument("pix_ratio", None, since="1.16.1", warning_type=DeprecationWarning)
def gwcs_blot(
median_data,
median_wcs,
blot_shape,
blot_wcs,
pix_ratio=None, # noqa: ARG001
fillval=0.0,
pixmap_stepsize=1,
pixmap_order=1,
):
"""
Resample the median data to recreate an input image based on the blot wcs.
Parameters
----------
median_data : numpy.ndarray
The data to blot.
median_wcs : gwcs.wcs.WCS
The wcs for the median data.
blot_shape : tuple of int
The target blot data shape.
blot_wcs : gwcs.wcs.WCS
The target/blotted wcs.
fillval : float, optional
Fill value for missing data.
pixmap_stepsize : int, optional
If ``pixmap_stepsize>1``, perform the full WCS calculation on a sparser
grid and use interpolation to fill in the rest of the pixels. This
option speeds up pixel map computation by reducing the number of WCS
calls, though at the cost of reduced pixel map accuracy. The loss
of accuracy is typically negligible if the underlying distortion
correction is smooth, but if the distortion is non-smooth,
``pixmap_stepsize>1`` is not recommended. Large ``pixmap_stepsize``
values are automatically reduced to no more than 1/10 of image size.
Passed to `stcal.resample.utils.calc_pixmap`. Default 1.
pixmap_order : int, optional
Order of the 2D spline to interpolate the sparse pixel mapping
if ``pixmap_stepsize>1``. Supported values are: 1 (bilinear) or 3 (bicubic).
This Parameter is ignored when ``pixmap_stepsize <= 1``. Default 1.
Returns
-------
blotted : numpy.ndarray
The blotted median data.
blot_img : datamodel
Datamodel containing header and WCS to define the 'blotted' image
"""
# Compute the mapping between the input and output pixel coordinates
pixmap = calc_pixmap(blot_wcs, median_wcs, blot_shape, stepsize=pixmap_stepsize, order=pixmap_order)
log.debug(f"Pixmap shape: {pixmap[:, :, 0].shape}")
log.debug(f"Sci shape: {blot_shape}")
log.info(f"Blotting {blot_shape} <-- {median_data.shape}")
outsci = np.full(blot_shape, fillval, dtype=np.float32)
# Currently tblot cannot handle nans in the pixmap, so we need to give some
# other value. -1 is not optimal and may have side effects. But this is
# what we've been doing up until now, so more investigation is needed
# before a change is made. Preferably, fix tblot in drizzle.
pixmap[np.isnan(pixmap)] = -1
blot_image(
data=median_data,
pixmap=pixmap,
out_img=outsci,
fillval=fillval,
iscale=1.0,
interp="linear",
sinscl=1.0,
)
return outsci