Module src.utils.normalization_utils
Functions
def apply_lookup(image_channel, lookup, n_bins, ranges)
-
Expand source code
def apply_lookup(image_channel, lookup, n_bins, ranges):
    """
    Apply the lookup table to the image channel.

    Intensities are quantised to bin indices over ``ranges``, remapped
    through ``lookup`` and converted back to intensities on the same scale.

    Parameters:
    - image_channel: The image channel to transform.
    - lookup: The lookup table, indexed with bin indices in [0, n_bins - 1].
    - n_bins: Number of bins used for histogram (must be at least 2).
    - ranges: Range of intensity values; only ranges[0] (low) and
      ranges[1] (high) are read, and they must differ.

    Returns:
    - The remapped channel, cast back to the dtype of image_channel.

    Raises:
    - ValueError: If n_bins is smaller than 2 or the range has zero width.
    """
    if n_bins < 2:
        raise ValueError("'n_bins' must be at least 2 to apply a lookup table.")
    low = float(ranges[0])
    span = float(ranges[1]) - low
    if span == 0:
        raise ValueError("'ranges' must span a non-zero interval.")

    image_channel = np.asarray(image_channel)
    lookup = np.asarray(lookup)
    out_dtype = image_channel.dtype
    top_bin = n_bins - 1

    # Map image intensities to bin indices. The arithmetic is done in float64
    # so that subtracting the lower bound from an unsigned integer channel
    # cannot wrap around and push below-range pixels into the top bin.
    scaled = (image_channel.astype(np.float64) - low) / span * top_bin
    # Clip before the integer cast: truncation toward zero then yields the
    # same indices as casting first, without overflowing on extreme values.
    bin_indices = np.clip(scaled, 0, top_bin).astype(np.intp)

    # Apply the lookup table
    mapped_bin_indices = lookup[bin_indices]

    # Map bin indices back to intensity values
    mapped_image = (mapped_bin_indices / top_bin) * span + low

    if np.issubdtype(out_dtype, np.integer):
        # Round instead of truncating (floating-point error such as
        # 770.9999 must not become 770) and stay inside the dtype's limits
        # so the cast cannot wrap.
        limits = np.iinfo(out_dtype)
        mapped_image = np.clip(np.rint(mapped_image), limits.min, limits.max)
    return mapped_image.astype(out_dtype)
Apply the lookup table to the image channel.
Parameters: - image_channel: The image channel to transform. - lookup: The lookup table. - n_bins: Number of bins used for histogram. - ranges: Range of intensity values.
def apply_normalization(image_channel,
frame_cdf,
reference_value,
method,
n_bins=None,
ranges=None,
current_median=None,
mask=None)-
Expand source code
def apply_normalization(image_channel, frame_cdf, reference_value, method,
                        n_bins=None, ranges=None, current_median=None,
                        mask=None):
    """
    Apply the specified normalization method to the image channel.

    Parameters:
    - image_channel: The image channel to normalize.
    - frame_cdf: The CDF of the image channel.
    - reference_value: The reference data (CDF, median value, etc.).
    - method: The normalization method ('pdf', 'cdf', 'median').
    - n_bins: Number of bins used for histogram (required for 'pdf' and 'cdf' methods).
    - ranges: Range of intensity values (required for 'pdf' and 'cdf' methods).
    - current_median: The current median value of the image channel (if available).
    - mask: The mask array (if needed).

    Returns:
    - Normalized image channel. The input is returned unchanged when the
      method is None/unrecognized, when the frame CDF is missing, or when no
      pixels are available to compute a median.

    Raises:
    - ValueError: If 'n_bins'/'ranges' are missing for 'pdf'/'cdf', or if
      either CDF does not have exactly 'n_bins' entries.
    """
    if method in ('pdf', 'cdf'):
        if n_bins is None or ranges is None:
            raise ValueError(f"'n_bins' and 'ranges' must be provided for method '{method}'.")

        ref_cdf = reference_value
        if frame_cdf is None:
            print(f"Warning: Frame CDF is None for method '{method}'. Skipping normalization.")
            return image_channel

        # Validate CDF lengths: the lookup table is indexed with bin indices
        # up to n_bins - 1, so both CDFs must describe exactly n_bins bins.
        if n_bins != len(ref_cdf):
            raise ValueError("Number of bins in 'n_bins' is not equal to the number of bins in the reference CDF.")
        if n_bins != len(frame_cdf):
            raise ValueError("Number of bins in 'n_bins' is not equal to the number of bins in the frame CDF.")

        # Create the lookup table and apply it
        lookup = create_lookup(frame_cdf, ref_cdf)
        return apply_lookup(image_channel, lookup, n_bins, ranges)

    if method == 'median':
        median_value = reference_value

        if current_median is None:
            # Use the masked pixels when a mask is given; otherwise fall back
            # to the non-zero pixels of the channel.
            if mask is not None:
                channel_values = image_channel[mask > 0]
            else:
                channel_values = image_channel[image_channel > 0]
            if channel_values.size == 0:
                # The median of an empty selection is NaN and would turn the
                # whole channel into NaN, so leave the channel untouched.
                print("Warning: No valid pixels for median normalization in channel. Skipping.")
                return image_channel
            current_median = np.median(channel_values)

        # Avoid division by zero (and by a NaN/inf median)
        if current_median == 0 or not np.isfinite(current_median):
            scaling_factor = 1
        else:
            scaling_factor = median_value / current_median

        # Clip to what the channel's dtype can hold so the cast below cannot
        # wrap around (e.g. uint8). Non-integer dtypes keep the historical
        # 16-bit ceiling.
        out_dtype = image_channel.dtype
        if np.issubdtype(out_dtype, np.integer):
            upper = np.iinfo(out_dtype).max
        else:
            upper = 65535
        transformed_channel = np.clip(image_channel * scaling_factor, 0, upper)
        return transformed_channel.astype(out_dtype)

    # If method is None or unrecognized, return the original channel
    return image_channel
Apply the specified normalization method to the image channel.
Parameters: - image_channel: The image channel to normalize. - frame_cdf: The CDF of the image channel. - reference_value: The reference data (CDF, median value, etc.). - method: The normalization method ('pdf', 'cdf', 'median'). - n_bins: Number of bins used for histogram (required for 'pdf' and 'cdf' methods). - ranges: Range of intensity values (required for 'pdf' and 'cdf' methods). - current_median: The current median value of the image channel (if available). - mask: The mask array (if needed).
Returns: - Normalized image channel.
def create_frame_cdf(frame, channel_configs, channel_indices)
-
Expand source code
def create_frame_cdf(frame, channel_configs, channel_indices):
    """
    Compute the CDFs for each channel in the frame based on the mask.

    Only channels whose configured method is 'pdf' or 'cdf' are processed.
    Channels that are missing from the image, or whose masked pixels all fall
    outside the configured range, are skipped with a warning and do not
    appear in the result.

    Parameters:
    - frame: The frame object containing the image and mask.
    - channel_configs: Dictionary containing normalization configurations per
      channel ('method', 'n_bins' and 'ranges' are read).
    - channel_indices: Dictionary mapping channel names to their indices in the image array.

    Returns:
    - A dictionary mapping channel names to their CDFs.

    Raises:
    - ValueError: If the mask has no foreground pixels.
    """
    # Process the mask without modifying the original frame.mask
    processed_mask = (frame.mask > 0).astype(np.uint8) * 255

    # Ensure mask dimensions match image
    if processed_mask.ndim == 2:
        processed_mask = processed_mask[..., np.newaxis]

    total_pixels = np.sum(processed_mask > 0)
    if total_pixels == 0:
        raise ValueError("Mask has no foreground pixels.")

    cdfs = {}
    for channel_lower, config in channel_configs.items():
        if config['method'] not in ('pdf', 'cdf'):
            continue  # No need to compute CDF for channels not using pdf/cdf normalization

        idx = channel_indices.get(channel_lower)
        if idx is None or idx >= frame.image.shape[2]:
            print(f"Warning: Channel '{channel_lower}' not found in image. Skipping.")
            continue

        n_bins = config['n_bins']
        # cv2.calcHist wants a flat list of floats; going through np.asarray
        # accepts arrays as well as plain lists/tuples.
        ranges = np.asarray(config['ranges'], dtype=float).ravel().tolist()

        hist = cv2.calcHist(
            [frame.image],
            [idx],
            processed_mask,
            [n_bins],
            ranges
        ).flatten()

        # With every masked pixel outside the range the histogram is empty and
        # normalising it would divide by zero; leave the channel out so the
        # caller sees a missing CDF instead of an array of NaNs.
        if hist.sum() == 0:
            print(f"Warning: No masked pixels of channel '{channel_lower}' fall inside the configured range. Skipping.")
            continue

        pdf = hist / total_pixels
        cdfs[channel_lower] = pdf_to_cdf(pdf)

    return cdfs
Compute the CDFs for each channel in the frame based on the mask.
Parameters: - frame: The frame object containing the image and mask. - channel_configs: Dictionary containing normalization configurations per channel. - channel_indices: Dictionary mapping channel names to their indices in the image array.
Returns: - A dictionary mapping channel names to their CDFs.
def create_lookup(cdf_new, cdf_ref)
-
Expand source code
def create_lookup(cdf_new, cdf_ref):
    """
    Create a lookup table to map the new image's intensities to match the reference CDF.

    Parameters:
    - cdf_new: CDF of the image to transform (one value per bin).
    - cdf_ref: Reference CDF (one value per bin, non-decreasing).

    Returns:
    - A uint16 array with one entry per bin of cdf_new: the reference bin
      index whose cumulative probability matches that bin.
    """
    cdf_new = np.asarray(cdf_new, dtype=np.float64)
    cdf_ref = np.asarray(cdf_ref, dtype=np.float64)

    # np.interp needs strictly increasing sample points to be unambiguous.
    # Plateaus in the reference CDF (empty bins) are collapsed to their first
    # bin: np.unique returns the sorted distinct levels together with the
    # index of each level's first occurrence. A saturated input therefore
    # maps to the last populated reference bin rather than to the last bin.
    ref_levels, first_bins = np.unique(cdf_ref, return_index=True)

    lookup_table = np.interp(cdf_new, ref_levels, first_bins)
    return np.round(lookup_table).astype(np.uint16)
Create a lookup table to map the new image's intensities to match the reference CDF.
def load_normalization_config(hdf5_file_path, params)
-
Expand source code
def load_normalization_config(hdf5_file_path, params):
    """
    Read the per-channel normalization settings stored in an HDF5 file.

    Each name in params['channel_names'] is looked up as a group, first under
    its exact spelling and then lower-cased. Channels without a group, or
    without the reference 'value' their method needs, are skipped with a
    warning.

    Returns:
    - A dictionary keyed by lower-cased channel name; each entry holds the
      channel's 'method', 'value', 'n_bins' and 'ranges'.

    Raises:
    - FileNotFoundError: If the HDF5 file does not exist.
    - ValueError: If 'n_bins' or 'ranges' is missing/malformed for a
      'pdf'/'cdf' channel, or the method name is not recognised.
    """
    if not os.path.exists(hdf5_file_path):
        raise FileNotFoundError(f"HDF5 file not found: {hdf5_file_path}")

    configs = {}

    with h5py.File(hdf5_file_path, 'r') as store:
        for channel in params['channel_names']:
            key = channel.lower()

            # Locate the channel's group: exact name first, then lower-case.
            if channel in store:
                node = store[channel]
            elif key in store:
                node = store[key]
            else:
                print(f"Warning: Channel '{channel}' not found in HDF5 file. Skipping.")
                continue

            # The method attribute may come back as bytes, and the literal
            # string 'None' stands for "no normalization".
            method = node.attrs.get('method', None)
            if isinstance(method, bytes):
                method = method.decode('utf-8')
            if method == 'None':
                method = None

            entry = {'method': method, 'value': None, 'n_bins': None, 'ranges': None}

            if method in ('pdf', 'cdf'):
                # Histogram methods: bin count, intensity range, then the
                # reference distribution.
                raw_bins = node.attrs.get('n_bins', None)
                if raw_bins is None:
                    raise ValueError(f"Missing 'n_bins' for channel '{channel}' in HDF5 file.")
                entry['n_bins'] = int(raw_bins)

                if 'ranges' not in node:
                    raise ValueError(f"Missing 'ranges' dataset for channel '{channel}' in HDF5 file.")
                bounds = node['ranges'][:]
                if len(bounds) != 2:
                    raise ValueError(f"'ranges' dataset for channel '{channel}' must have 2 elements.")
                entry['ranges'] = bounds

                if 'value' not in node:
                    print(f"Warning: 'value' dataset not found for channel '{channel}'. Skipping.")
                    continue
                distribution = node['value'][:].flatten()
                # A stored PDF is turned into a CDF; a stored CDF is used as is.
                entry['value'] = pdf_to_cdf(distribution) if method == 'pdf' else distribution

            elif method == 'median':
                # The reference median may be an attribute or a dataset.
                if 'value' in node.attrs:
                    reference = node.attrs['value']
                elif 'value' in node:
                    reference = node['value'][()]
                else:
                    print(f"Warning: 'value' not found for channel '{channel}'. Skipping.")
                    continue
                # Ensure the median value is a scalar
                if isinstance(reference, np.ndarray):
                    reference = reference.item()
                entry['value'] = reference

            elif method is not None:
                raise ValueError(f"Unknown normalization method '{method}' for channel '{channel}'.")

            configs[key] = entry

    return configs
Load normalization configuration from an HDF5 file.
Returns: - A dictionary mapping channel names to their normalization method, value, n_bins, and ranges.
def match_image_to_reference(frame, params, hdf5_file_path)
-
Expand source code
def match_image_to_reference(frame, params, hdf5_file_path):
    """
    Match the image channels to the reference distributions based on the normalization configuration.

    Parameters:
    - frame: An object containing `image` (numpy array) and `mask` (numpy array).
    - params: Dictionary containing 'channel_names'.
    - hdf5_file_path: Path to the HDF5 file containing normalization configurations.

    Returns:
    - Transformed image as a numpy array (a copy; frame.image is not modified).
    """
    # Load normalization configuration from HDF5 file
    normalization_config = load_normalization_config(hdf5_file_path, params)

    # Map lower-cased channel names to their index in the image array
    channel_indices = {
        name.lower(): idx for idx, name in enumerate(params['channel_names'])
    }

    # Compute CDFs for the frame (only for channels using 'pdf'/'cdf')
    frame_cdfs = create_frame_cdf(frame, normalization_config, channel_indices)

    # Channels are indexed as 2-D slices, so a mask carrying a trailing
    # singleton axis (H, W, 1) is reduced to (H, W) before it is used for
    # boolean indexing.
    mask = np.asarray(frame.mask)
    if mask.ndim == 3 and mask.shape[2] == 1:
        mask = mask[:, :, 0]
    foreground = mask > 0

    # Make a copy of the image to avoid modifying the original
    transformed_image = frame.image.copy()
    n_channels = transformed_image.shape[2]

    # Apply normalization per channel. Each channel is visited once, so each
    # warning is printed once per channel.
    for channel_lower, config in normalization_config.items():
        method = config['method']
        if method is None:
            print(f"Channel '{channel_lower}': No normalization applied.")
            continue  # Skip normalization for this channel

        idx = channel_indices.get(channel_lower)
        if idx is None or idx >= n_channels:
            print(f"Warning: Channel '{channel_lower}' not found in image. Skipping.")
            continue

        image_channel = transformed_image[:, :, idx]

        current_median = None
        if method == 'median':
            # The median is taken over the masked pixels of this channel,
            # which has not been modified yet at this point.
            channel_values = image_channel[foreground]
            if channel_values.size == 0:
                print(f"Warning: No valid pixels for median calculation in channel '{channel_lower}'. Skipping.")
                continue  # Nothing to scale against; leave the channel as is
            current_median = np.median(channel_values)

        # Apply the specified normalization method
        transformed_image[:, :, idx] = apply_normalization(
            image_channel,
            frame_cdfs.get(channel_lower),
            config['value'],
            method,
            n_bins=config.get('n_bins'),
            ranges=config.get('ranges'),
            current_median=current_median,
            mask=mask
        )

    return transformed_image
Match the image channels to the reference distributions based on the normalization configuration.
Parameters: - frame: An object containing `image` (numpy array) and `mask` (numpy array). - params: Dictionary containing 'channel_names'. - hdf5_file_path: Path to the HDF5 file containing normalization configurations.
Returns: - Transformed image as a numpy array.
def pdf_to_cdf(pdf)
-
Expand source code
def pdf_to_cdf(pdf):
    """
    Convert a probability density function to a cumulative distribution function.

    Parameters:
    - pdf: Per-bin probabilities or counts; it does not need to be normalized.

    Returns:
    - The cumulative sum of pdf as a float64 array, scaled so that its last
      entry is 1.

    Raises:
    - ValueError: If pdf is empty or its sum is not a positive finite number.
    """
    cdf = np.cumsum(np.asarray(pdf, dtype=np.float64))
    if cdf.size == 0:
        raise ValueError("Cannot build a CDF from an empty PDF.")

    # The last cumulative value is the total mass. Dividing by zero (or by a
    # non-finite total) would silently produce NaNs in the CDF.
    total = cdf[-1]
    if not np.isfinite(total) or total <= 0:
        raise ValueError("PDF must have a positive, finite sum to be converted to a CDF.")

    return cdf / total
Convert a probability density function to a cumulative distribution function.