Source code for stcal.outlier_detection.utils

"""
Utility functions for outlier detection routines
"""
import warnings

import numpy as np
from astropy.stats import sigma_clip
from drizzle.cdrizzle import tblot
from scipy import ndimage
from skimage.util import view_as_windows
import gwcs

from stcal.alignment.util import wcs_bbox_from_shape

import logging
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)


__all__ = [
    "medfilt",
    "compute_weight_threshold",
    "flag_crs",
    "flag_resampled_crs",
    "gwcs_blot",
    "calc_gwcs_pixmap",
    "reproject",
]


[docs] def medfilt(arr, kern_size): """ scipy.signal.medfilt (and many other median filters) have undefined behavior for nan inputs. See: https://github.com/scipy/scipy/issues/4800 Parameters ---------- arr : numpy.ndarray The input array kern_size : list of int List of kernel dimensions, length must be equal to arr.ndim. Returns ------- filtered_arr : numpy.ndarray Input array median filtered with a kernel of size kern_size """ padded = np.pad(arr, [[k // 2] for k in kern_size]) windows = view_as_windows(padded, kern_size, np.ones(len(kern_size), dtype='int')) with warnings.catch_warnings(): warnings.filterwarnings("ignore", "All-NaN", RuntimeWarning) return np.nanmedian(windows, axis=np.arange(-len(kern_size), 0))
[docs] def compute_weight_threshold(weight, maskpt): ''' Compute the weight threshold for a single image or cube. Parameters ---------- weight : numpy.ndarray The weight array maskpt : float The percentage of the mean weight to use as a threshold for masking. Returns ------- float The weight threshold for this integration. ''' with warnings.catch_warnings(): warnings.filterwarnings("ignore", "Mean of empty slice", RuntimeWarning) warnings.filterwarnings("ignore", "invalid value", RuntimeWarning) return np.mean( sigma_clip( weight[np.isfinite(weight) & (weight != 0)], sigma=3, maxiters=5, masked=False, copy=False, ), dtype='f8') * maskpt
def _abs_deriv(array): """Take the absolute derivate of a numpy array.""" out = np.zeros_like(array) # use same dtype as input # make output values nan where input is nan (for floating point input) if np.issubdtype(array.dtype, np.floating): out[np.isnan(array)] = np.nan # compute row-wise absolute diffference row_diff = np.abs(np.diff(array, axis=0)) np.putmask(out[1:], np.isfinite(row_diff), row_diff) # no need to do max yet # since these are absolute differences |r0-r1| = |r1-r0| # make a view of the target portion of the array row_offset_view = out[:-1] # compute an in-place maximum np.putmask(row_offset_view, row_diff > row_offset_view, row_diff) del row_diff # compute col-wise absolute difference col_diff = np.abs(np.diff(array, axis=1)) col_offset_view = out[:, 1:] np.putmask(col_offset_view, col_diff > col_offset_view, col_diff) col_offset_view = out[:, :-1] np.putmask(col_offset_view, col_diff > col_offset_view, col_diff) return out
[docs] def flag_crs( sci_data, sci_err, blot_data, snr, ): """ Straightforward detection of outliers for non-dithered data since sci_err includes all noise sources (photon, read, and flat for baseline). Parameters ---------- sci_data : numpy.ndarray "Science" data possibly containing outliers. sci_err : numpy.ndarray Error estimates for sci_data. blot_data : numpy.ndarray Reference data used to detect outliers. snr : float Signal-to-noise ratio used during detection. Returns ------- cr_mask : numpy.ndarray Boolean array where outliers (CRs) are true. """ return np.greater(np.abs(sci_data - blot_data), snr * np.nan_to_num(sci_err))
[docs] def flag_resampled_crs( sci_data, sci_err, blot_data, snr1, snr2, scale1, scale2, backg, ): """ Detect outliers (CRs) using resampled reference data. Parameters ---------- sci_data : numpy.ndarray "Science" data possibly containing outliers sci_err : numpy.ndarray Error estimates for sci_data blot_data : numpy.ndarray Reference data used to detect outliers. snr1 : float Signal-to-noise ratio threshold used prior to smoothing. snr2 : float Signal-to-noise ratio threshold used after smoothing. scale1 : float Scale used prior to smoothing. scale2 : float Scale used after smoothing. backg : float Scalar background to subtract from the difference. Returns ------- cr_mask : numpy.ndarray boolean array where outliers (CRs) are true """ err_data = np.nan_to_num(sci_err) blot_deriv = _abs_deriv(blot_data) diff_noise = np.abs(sci_data - blot_data - backg) # Create a boolean mask based on a scaled version of # the derivative image (dealing with interpolating issues?) # and the standard n*sigma above the noise threshold1 = scale1 * blot_deriv + snr1 * err_data mask1 = np.greater(diff_noise, threshold1) # Smooth the boolean mask with a 3x3 boxcar kernel kernel = np.ones((3, 3), dtype=int) mask1_smoothed = ndimage.convolve(mask1, kernel, mode='nearest') # Create a 2nd boolean mask based on the 2nd set of # scale and threshold values threshold2 = scale2 * blot_deriv + snr2 * err_data mask2 = np.greater(diff_noise, threshold2) # Final boolean mask return mask1_smoothed & mask2
[docs] def gwcs_blot(median_data, median_wcs, blot_shape, blot_wcs, pix_ratio, fillval=0.0): """ Resample the median data to recreate an input image based on the blot wcs. Parameters ---------- median_data : numpy.ndarray The data to blot. median_wcs : gwcs.wcs.WCS The wcs for the median data. blot_shape : tuple of int The target blot data shape. blot_wcs : gwcs.wcs.WCS The target/blotted wcs. pix_ratio : float Pixel ratio. fillval : float, optional Fill value for missing data. Returns ------- blotted : numpy.ndarray The blotted median data. blot_img : datamodel Datamodel containing header and WCS to define the 'blotted' image """ # Compute the mapping between the input and output pixel coordinates pixmap = calc_gwcs_pixmap(blot_wcs, median_wcs, blot_shape) log.debug("Pixmap shape: {}".format(pixmap[:, :, 0].shape)) log.debug("Sci shape: {}".format(blot_shape)) log.info('Blotting {} <-- {}'.format(blot_shape, median_data.shape)) outsci = np.full(blot_shape, fillval, dtype=np.float32) # Currently tblot cannot handle nans in the pixmap, so we need to give some # other value. -1 is not optimal and may have side effects. But this is # what we've been doing up until now, so more investigation is needed # before a change is made. Preferably, fix tblot in drizzle. pixmap[np.isnan(pixmap)] = -1 tblot(median_data, pixmap, outsci, scale=pix_ratio, kscale=1.0, interp='linear', exptime=1.0, misval=fillval, sinscl=1.0) return outsci
[docs] def calc_gwcs_pixmap(in_wcs, out_wcs, in_shape): """ Return a pixel grid map from input frame to output frame. Parameters ---------- in_wcs : gwcs.wcs.WCS Input/source wcs. out_wcs : gwcs.wcs.WCS Output/projected wcs. in_shape : list of int Input shape used to compute the input bounding box. Returns ------- pixmap : numpy.ndarray Computed pixmap. """ bb = wcs_bbox_from_shape(in_shape) log.debug("Bounding box from data shape: {}".format(bb)) grid = gwcs.wcstools.grid_from_bounding_box(bb) return np.dstack(reproject(in_wcs, out_wcs)(grid[0], grid[1]))
[docs] def reproject(wcs1, wcs2): """ Given two WCSs return a function which takes pixel coordinates in wcs1 and computes them in wcs2. It performs the forward transformation of ``wcs1`` followed by the inverse of ``wcs2``. Parameters ---------- wcs1, wcs2 : gwcs.wcs.WCS WCS objects that have `pixel_to_world_values` and `world_to_pixel_values` methods. Returns ------- _reproject : Function to compute the transformations. It takes x, y positions in ``wcs1`` and returns x, y positions in ``wcs2``. """ try: forward_transform = wcs1.pixel_to_world_values backward_transform = wcs2.world_to_pixel_values except AttributeError as err: raise TypeError("Input should be a WCS") from err def _reproject(x, y): sky = forward_transform(x, y) flat_sky = [] for axis in sky: flat_sky.append(axis.flatten()) det = backward_transform(*tuple(flat_sky)) det_reshaped = [] for axis in det: det_reshaped.append(axis.reshape(x.shape)) return tuple(det_reshaped) return _reproject