Source code for stcal.outlier_detection.utils

"""Utility functions for outlier detection routines."""

import logging
import warnings

import numpy as np
from astropy.stats import sigma_clip
from astropy.utils.decorators import deprecated_renamed_argument
from drizzle.resample import blot_image
from scipy import ndimage
from skimage.util import view_as_windows

from stcal.resample.utils import calc_pixmap

log = logging.getLogger(__name__)


__all__ = [
    "medfilt",
    "compute_weight_threshold",
    "flag_crs",
    "flag_resampled_crs",
    "gwcs_blot",
]


[docs] def medfilt(arr, kern_size): """ Median filter. scipy.signal.medfilt (and many other median filters) have undefined behavior for nan inputs. See: https://github.com/scipy/scipy/issues/4800. Parameters ---------- arr : numpy.ndarray The input array kern_size : list of int List of kernel dimensions, length must be equal to arr.ndim. Returns ------- filtered_arr : numpy.ndarray Input array median filtered with a kernel of size kern_size """ padded = np.pad(arr, [[k // 2] for k in kern_size]) windows = view_as_windows(padded, kern_size, np.ones(len(kern_size), dtype="int")) with warnings.catch_warnings(): warnings.filterwarnings("ignore", "All-NaN", RuntimeWarning) return np.nanmedian(windows, axis=np.arange(-len(kern_size), 0))
[docs] def compute_weight_threshold(weight, maskpt): """ Compute the weight threshold for a single image or cube. Parameters ---------- weight : numpy.ndarray The weight array maskpt : float The percentage of the mean weight to use as a threshold for masking. Returns ------- float The weight threshold for this integration. """ with warnings.catch_warnings(): warnings.filterwarnings("ignore", "Mean of empty slice", RuntimeWarning) warnings.filterwarnings("ignore", "invalid value", RuntimeWarning) return ( np.mean( sigma_clip( weight[np.isfinite(weight) & (weight != 0)], sigma=3, maxiters=5, masked=False, copy=False, ), dtype="f8", ) * maskpt )
def _abs_deriv(array): """Take the absolute derivate of a numpy array.""" out = np.zeros_like(array) # use same dtype as input # make output values nan where input is nan (for floating point input) if np.issubdtype(array.dtype, np.floating): out[np.isnan(array)] = np.nan # compute row-wise absolute difference row_diff = np.abs(np.diff(array, axis=0)) np.putmask(out[1:], np.isfinite(row_diff), row_diff) # no need to do max yet # since these are absolute differences |r0-r1| = |r1-r0| # make a view of the target portion of the array row_offset_view = out[:-1] # compute an in-place maximum np.putmask(row_offset_view, row_diff > row_offset_view, row_diff) del row_diff # compute col-wise absolute difference col_diff = np.abs(np.diff(array, axis=1)) col_offset_view = out[:, 1:] np.putmask(col_offset_view, col_diff > col_offset_view, col_diff) col_offset_view = out[:, :-1] np.putmask(col_offset_view, col_diff > col_offset_view, col_diff) return out
[docs] def flag_crs( sci_data, sci_err, blot_data, snr, ): """ Flag outliers. Straightforward detection of outliers for non-dithered data since sci_err includes all noise sources (photon, read, and flat for baseline). Parameters ---------- sci_data : numpy.ndarray "Science" data possibly containing outliers. sci_err : numpy.ndarray Error estimates for sci_data. blot_data : numpy.ndarray Reference data used to detect outliers. snr : float Signal-to-noise ratio used during detection. Returns ------- cr_mask : numpy.ndarray Boolean array where outliers (CRs) are true. """ return np.greater(np.abs(sci_data - blot_data), snr * np.nan_to_num(sci_err))
[docs] def flag_resampled_crs( sci_data, sci_err, blot_data, snr1, snr2, scale1, scale2, backg, ): """ Detect outliers (CRs) using resampled reference data. Parameters ---------- sci_data : numpy.ndarray "Science" data possibly containing outliers sci_err : numpy.ndarray Error estimates for sci_data blot_data : numpy.ndarray Reference data used to detect outliers. snr1 : float Signal-to-noise ratio threshold used prior to smoothing. snr2 : float Signal-to-noise ratio threshold used after smoothing. scale1 : float Scale used prior to smoothing. scale2 : float Scale used after smoothing. backg : float Scalar background to subtract from the difference. Returns ------- cr_mask : numpy.ndarray boolean array where outliers (CRs) are true """ err_data = np.nan_to_num(sci_err) blot_deriv = _abs_deriv(blot_data) diff_noise = np.abs(sci_data - blot_data - backg) # Create a boolean mask based on a scaled version of # the derivative image (dealing with interpolating issues?) # and the standard n*sigma above the noise threshold1 = scale1 * blot_deriv + snr1 * err_data mask1 = np.greater(diff_noise, threshold1) # Smooth the boolean mask with a 3x3 boxcar kernel kernel = np.ones((3, 3), dtype=int) mask1_smoothed = ndimage.convolve(mask1, kernel, mode="nearest") # Create a 2nd boolean mask based on the 2nd set of # scale and threshold values threshold2 = scale2 * blot_deriv + snr2 * err_data mask2 = np.greater(diff_noise, threshold2) # Final boolean mask return mask1_smoothed & mask2
[docs] @deprecated_renamed_argument("pix_ratio", None, since="1.16.1", warning_type=DeprecationWarning) def gwcs_blot( median_data, median_wcs, blot_shape, blot_wcs, pix_ratio=None, # noqa: ARG001 fillval=0.0, pixmap_stepsize=1, pixmap_order=1, ): """ Resample the median data to recreate an input image based on the blot wcs. Parameters ---------- median_data : numpy.ndarray The data to blot. median_wcs : gwcs.wcs.WCS The wcs for the median data. blot_shape : tuple of int The target blot data shape. blot_wcs : gwcs.wcs.WCS The target/blotted wcs. fillval : float, optional Fill value for missing data. pixmap_stepsize : int, optional If ``pixmap_stepsize>1``, perform the full WCS calculation on a sparser grid and use interpolation to fill in the rest of the pixels. This option speeds up pixel map computation by reducing the number of WCS calls, though at the cost of reduced pixel map accuracy. The loss of accuracy is typically negligible if the underlying distortion correction is smooth, but if the distortion is non-smooth, ``pixmap_stepsize>1`` is not recommended. Large ``pixmap_stepsize`` values are automatically reduced to no more than 1/10 of image size. Passed to `stcal.resample.utils.calc_pixmap`. Default 1. pixmap_order : int, optional Order of the 2D spline to interpolate the sparse pixel mapping if ``pixmap_stepsize>1``. Supported values are: 1 (bilinear) or 3 (bicubic). This Parameter is ignored when ``pixmap_stepsize <= 1``. Default 1. Returns ------- blotted : numpy.ndarray The blotted median data. blot_img : datamodel Datamodel containing header and WCS to define the 'blotted' image """ # Compute the mapping between the input and output pixel coordinates pixmap = calc_pixmap(blot_wcs, median_wcs, blot_shape, stepsize=pixmap_stepsize, order=pixmap_order) log.debug(f"Pixmap shape: {pixmap[:, :, 0].shape}") log.debug(f"Sci shape: {blot_shape}") log.info(f"Blotting {blot_shape} <-- {median_data.shape}") outsci = np.full(blot_shape, fillval, dtype=np.float32) # Currently tblot cannot handle nans in the pixmap, so we need to give some # other value. -1 is not optimal and may have side effects. But this is # what we've been doing up until now, so more investigation is needed # before a change is made. Preferably, fix tblot in drizzle. pixmap[np.isnan(pixmap)] = -1 blot_image( data=median_data, pixmap=pixmap, out_img=outsci, fillval=fillval, iscale=1.0, interp="linear", sinscl=1.0, ) return outsci