Source code for stcal.saturation.saturation

import logging

import numpy as np
from scipy import ndimage

log = logging.getLogger(__name__)


__all__ = ["flag_saturated_pixels"]


[docs] def flag_saturated_pixels( data, gdq, pdq, sat_thresh, sat_dq, atod_limit, dqflags, n_pix_grow_sat=1, zframe=None, read_pattern=None, bias=None, ): """ Flag saturated pixels. Apply flagging for saturation based on threshold values stored in the saturation reference file data `sat_thresh` and A/D floor based on testing for 0 DN values. For A/D floor flagged groups, the DO_NOT_USE flag is also set. Parameters ---------- data : np.ndarray 4-D science array gdq : np.ndarray 4-D group dq array pdq : np.ndarray 2-D pixel dq array sat_thresh : np.ndarray Pixel-wise threshold for saturation, same shape `data` sat_dq : np.ndarray Data quality flags associated with `sat_thresh` atod_limit : int Hard DN limit of 16-bit A-to-D converter dqflags : dict A dictionary with at least the following keywords: DO_NOT_USE, SATURATED, AD_FLOOR, NO_SAT_CHECK providing a mapping between flag names and their integer values. n_pix_grow_sat : int Number of pixels that each flagged saturated pixel should be 'grown', to account for charge spilling. Default is 1. zframe : np.ndarray The ZEROFRAME 3-D array. read_pattern : List[List[float or int]] or None The times or indices of the frames composing each group. bias : np.ndarray 2-D superbias array. For use in group 2 saturation flagging for frame-averaged groups. Returns ------- gdq : np.ndarray Updated 4-D group dq array pdq : np.ndarray Updated 2-D pixel dq array """ nints, ngroups, nrows, ncols = data.shape dnu = int(dqflags["DO_NOT_USE"]) saturated = int(dqflags["SATURATED"]) ad_floor = int(dqflags["AD_FLOOR"]) no_sat_check = int(dqflags["NO_SAT_CHECK"]) # Identify pixels flagged in reference file as NO_SAT_CHECK, no_sat_check_mask = np.bitwise_and(sat_dq, no_sat_check) == no_sat_check # Flag pixels in the saturation threshold array with NO_SAT_CHECK, # and add them to to no_sat_check_mask. sat_dq[np.isnan(sat_thresh)] |= no_sat_check no_sat_check_mask |= np.isnan(sat_thresh) # Set the saturation check threshold above the A-to-D # converter limit so that they don't get flagged as saturated, for # pixels in the no_sat_check_mask. sat_thresh[no_sat_check_mask] = atod_limit + 1 # If no bias is provided set it to zero if bias is None: bias = 0.0 for ints in range(nints): # Work forward through the groups for initial pass at saturation # We want to flag saturation in all subsequent groups after # the one in which it was found. Use this boolean array to # keep a running tally of pixels that have saturated. previously_saturated = np.zeros(shape=(nrows, ncols), dtype="bool") for group in range(ngroups): plane = data[ints, group, :, :] # for saturation, the flag is set in the current plane # and all following planes. # Update the running tally of all pixels that have ever # experienced saturation to account for this. previously_saturated |= plane >= sat_thresh flagarray = (previously_saturated * saturated).astype(np.uint32) gdq[ints, group, :, :] |= flagarray # for A/D floor, the flag is only set of the current plane flaglowarray = ((plane <= 0) * (ad_floor | dnu)).astype(np.uint32) gdq[ints, group, :, :] |= flaglowarray del flagarray del flaglowarray # now, flag any pixels that border saturated pixels (not A/D floor pix) if n_pix_grow_sat > 0: gdq_slice = gdq[ints, group, :, :] _adjacent_pixels(gdq_slice, saturated, n_pix_grow_sat, inplace=True) # Work backward through the groups for a second pass at saturation # This is to flag things that actually saturated in prior groups but # were not obvious because of group averaging for group in range(ngroups - 2, -1, -1): plane = data[ints, group, :, :] thisdq = gdq[ints, group, :, :] nextdq = gdq[ints, group + 1, :, :] # Determine the dilution factor due to group averaging # No point in this step if the dilution factor is 1. In # that case, there is no way that we would have missed # saturation before but flag it now, since the threshold # would be the same. if read_pattern is not None: # Single value dilution factor for this group dilution_factor = np.mean(read_pattern[group]) / read_pattern[group][-1] if dilution_factor == 1: continue # Broadcast to array size dilution_factor = np.where(no_sat_check_mask, 1, dilution_factor) else: dilution_factor = 1 continue # Find where this plane looks like it might saturate given # the dilution factor, *and* this group did not already get # flagged as saturated or do not use, *and* the next group # was flagged as saturated. Result of the line below is a # boolean array. partial_sat = ( (plane >= sat_thresh * dilution_factor) & (thisdq & (saturated | dnu) == 0) & (nextdq & saturated != 0) ) flagarray = (partial_sat * dnu).astype(np.uint32) # Grow the newly-flagged saturating pixels if n_pix_grow_sat > 0: _adjacent_pixels(flagarray, dnu, n_pix_grow_sat, inplace=True) # Add them to the gdq array gdq[ints, group, :, :] |= flagarray # Add an additional pass to look for things saturating in the second group # that can be particularly tricky to identify if (read_pattern is not None) & (ngroups > 2): dq2 = gdq[ints, 1, :, :] dq3 = gdq[ints, 2, :, :] # Identify groups which we wouldn't expect to saturate by the third group, # on the basis of the first group scigp1 = data[ints, 0, :, :] - bias mask = ((scigp1 / np.mean(read_pattern[0])) * read_pattern[2][-1]) + bias < sat_thresh # Identify groups with suspiciously large values in the second group # by comparing the change between group 1 and 2 to the dynamic range between # the group 1 and saturation threshold. Flag any differences sufficiently large # that they could come from a saturating event in the last frame of the group. scigp2 = data[ints, 1, :, :] - data[ints, 0, :, :] mask &= scigp2 > (sat_thresh - data[ints, 0, :, :]) / len(read_pattern[1]) # Identify groups that are saturated in the third group but not yet flagged in the second gp3mask = (np.bitwise_and(dq3, saturated) != 0) & (np.bitwise_and(dq2, saturated) == 0) mask &= gp3mask # Flag the 2nd group for the pixels passing that gauntlet flagarray = (mask * dnu).astype(np.uint32) # Add them to the gdq array np.bitwise_or(gdq[ints, 1, :, :], flagarray, gdq[ints, 1, :, :]) # Check ZEROFRAME. if zframe is not None: plane = zframe[ints, :, :] flagarray, flaglowarray = _plane_saturation(plane, sat_thresh, dqflags) zdq = flagarray | flaglowarray if n_pix_grow_sat > 0: _adjacent_pixels(zdq, saturated, n_pix_grow_sat, inplace=True) plane[zdq != 0] = 0.0 zframe[ints] = plane n_sat = np.any(np.any(np.bitwise_and(gdq, saturated), axis=0), axis=0).sum() log.info("Detected %i saturated pixels", n_sat) n_floor = np.any(np.any(np.bitwise_and(gdq, ad_floor), axis=0), axis=0).sum() log.info("Detected %i A/D floor pixels", n_floor) pdq = np.bitwise_or(pdq, sat_dq) return gdq, pdq, zframe
def _adjacent_pixels(plane_gdq, saturated, n_pix_grow_sat=1, inplace=False): """ Flag pixels adjacent to saturated pixels. Parameters ---------- plane_gdq : np.ndarray The data quality flags of the current. saturated : uint8 The saturation flag. n_pix_grow_sat : int Number of pixels that each flagged saturated pixel should be 'grown', to account for charge spilling. Default is 1. inplace : bool Update plane_gdq in place, returning None? Default False. Returns ------- sat_pix : np.ndarray The saturated pixels in the current plane. """ # noqa: D205 if not inplace: cgdq = plane_gdq.copy() else: cgdq = plane_gdq only_sat = plane_gdq & saturated > 0 dilated = only_sat.copy() box_dim = (n_pix_grow_sat * 2) + 1 # The for loops below are equivalent to # # struct = np.ones((box_dim, box_dim)).astype(bool) # dilated = ndimage.binary_dilation(only_sat, structure=struct).astype(only_sat.dtype) # # The explicit loop over the box, followed by taking care of the # array edges, turns out to be faster by around an order of magnitude. # There must be poor coding in the underlying routine for # ndimage.binary_dilation as of scipy 1.14.1. for i in range(box_dim): for j in range(box_dim): # Explicit binary dilation over the inner ('valid') # region of the convolution/filter i2 = only_sat.shape[0] - box_dim + i + 1 j2 = only_sat.shape[1] - box_dim + j + 1 k1, k2, l1, l2 = [n_pix_grow_sat, -n_pix_grow_sat, n_pix_grow_sat, -n_pix_grow_sat] dilated[k1:k2, l1:l2] |= only_sat[i:i2, j:j2] for i in range(n_pix_grow_sat - 1, -1, -1): for j in range(i + n_pix_grow_sat, -1, -1): # March from the limit of the 'valid' region toward # each edge. Maximum filter ensures correct dilation. dilated[i] |= ndimage.maximum_filter(only_sat[j], box_dim) dilated[:, i] |= ndimage.maximum_filter(only_sat[:, j], box_dim) dilated[-i - 1] |= ndimage.maximum_filter(only_sat[-j - 1], box_dim) dilated[:, -i - 1] |= ndimage.maximum_filter(only_sat[:, -j - 1], box_dim) cgdq[dilated] |= saturated if inplace: return None else: return cgdq def _plane_saturation(plane, sat_thresh, dqflags): """ Check a 2D plane for saturation and A/D floor. Parameters ---------- plane : np.ndarray, 2D float The plane to check for saturation and A/D floor. sat_thresh : np.ndarray Pixel-wise threshold for saturation, same shape `data`. dqflags : dict A dictionary with at least the following keywords: DO_NOT_USE, SATURATED, AD_FLOOR, NO_SAT_CHECK Returns ------- flagarray : np.ndarray, 2D uint32 The saturation flag array. flaglowarray : np.ndarray, 2D uint32 The A/D floor flag array. """ # noqa: D205 donotuse = dqflags["DO_NOT_USE"] saturated = dqflags["SATURATED"] ad_floor = dqflags["AD_FLOOR"] flagarray = np.zeros(plane.shape, dtype=np.uint32) flaglowarray = np.zeros(plane.shape, dtype=np.uint32) # Update the 4D gdq array with the saturation flag. # check for saturation flagarray[:, :] = np.where(plane[:, :] >= sat_thresh, saturated, 0) # check for A/D floor flaglowarray[:, :] = np.where(plane[:, :] <= 0, ad_floor | donotuse, 0) return flagarray, flaglowarray