import numpy as np
import journal
from scipy.fft import fft, ifft, fftfreq
from scipy.signal import resample
from dataclasses import dataclass

import isce3
from nisar.workflows.focus import cosine_window


@dataclass(frozen=True)
class bandpass_meta_data:
    """Immutable bundle of the range-spectrum metadata of one SLC band."""
    # slant range pixel spacing (rdr_grid.range_pixel_spacing)
    rg_pxl_spacing: float
    # radar wavelength (rdr_grid.wavelength)
    wavelength: float
    # range sampling frequency, derived as c / 2 / rg_pxl_spacing
    rg_sample_freq: float
    # processed range bandwidth
    rg_bandwidth: float
    # center frequency, derived as c / wavelength
    center_freq: float
    # bound `slant_range` method of the radar grid; SplitSpectrum calls it
    # with a range sample index
    slant_range: 'method'

    @classmethod
    def load_from_slc(cls, slc_product, freq):
        """Get meta data from SLC object.

        Parameters
        ----------
        slc_product : nisar.products.readers.SLC
            slc object
        freq : {'A', 'B'}
            frequency band

        Returns
        -------
        meta_data : bandpass_meta_data
            bandpass meta data object
        """
        rdr_grid = slc_product.getRadarGrid(freq)
        swath = slc_product.getSwathMetadata(freq)

        # two-way propagation: one range pixel corresponds to c / (2 * fs)
        rg_sample_freq = \
            isce3.core.speed_of_light * 0.5 / rdr_grid.range_pixel_spacing
        rg_bandwidth = swath.processed_range_bandwidth
        center_frequency = isce3.core.speed_of_light / rdr_grid.wavelength

        return cls(rdr_grid.range_pixel_spacing,
                   rdr_grid.wavelength,
                   rg_sample_freq,
                   rg_bandwidth,
                   center_frequency,
                   rdr_grid.slant_range)


def check_range_bandwidth_overlap(ref_slc, sec_slc, pols):
    """Check if bandpass is needed.

    If the two SLCs differ in center frequency or bandwidth, then
    one SLC shall be bandpassed to a common frequency band. If necessary,
    determine which SLC will be bandpassed.

    Parameters
    ----------
    ref_slc : nisar.products.readers.SLC
        Reference SLC object
    sec_slc : nisar.products.readers.SLC
        Secondary SLC object
    pols : dict
        Dict keying frequency ('A' or 'B') to list of polarization values.
        Only the keys are used here.

    Returns
    -------
    mode : dict
        Dict mapping frequency band (e.g. "A" or "B") to
        SLC to be bandpassed ("ref" or "sec"). Frequency bands whose
        wavelength and bandwidth both match are absent from the dict.
    """
    mode = {}

    for freq in pols:
        ref_meta_data = bandpass_meta_data.load_from_slc(ref_slc, freq)
        sec_meta_data = bandpass_meta_data.load_from_slc(sec_slc, freq)

        ref_bw = ref_meta_data.rg_bandwidth
        sec_bw = sec_meta_data.rg_bandwidth

        same_wavelength = ref_meta_data.wavelength == sec_meta_data.wavelength
        same_bandwidth = ref_bw == sec_bw
        if same_wavelength and same_bandwidth:
            continue

        # The SLC with the wider bandwidth is the one to be bandpassed;
        # on equal bandwidths (wavelength mismatch only) the secondary is.
        mode[freq] = 'ref' if ref_bw > sec_bw else 'sec'

    return mode


class SplitSpectrum:
    """Split the slant range spectrum."""

    def __init__(self,
                 rg_sample_freq,
                 rg_bandwidth,
                 center_frequency,
                 slant_range,
                 freq):
        """Initialize bandpass class with SLC meta data.

        Parameters
        ----------
        rg_sample_freq : float
            range sampling frequency [Hz]
        rg_bandwidth : float
            range bandwidth [Hz]
        center_frequency : float
            center frequency of SLC [Hz]
        slant_range : callable
            function mapping a range sample index to its slant range
            (e.g. the `slant_range` method of the radar grid)
        freq : {'A', 'B'}
            frequency band
        """
        self.freq = freq
        self.rg_sample_freq = rg_sample_freq
        self.rg_pxl_spacing = \
            isce3.core.speed_of_light / 2.0 / self.rg_sample_freq
        self.rg_bandwidth = rg_bandwidth
        self.center_frequency = center_frequency
        self.slant_range = slant_range

    def bandpass_shift_spectrum(self,
                                slc_raster,
                                low_frequency,
                                high_frequency,
                                new_center_frequency,
                                window_function,
                                window_shape=0.25,
                                fft_size=None,
                                resampling=True
                                ):
        """Bandpass SLC for a given bandwidth and shift the bandpassed
        spectrum to a new center frequency.

        Parameters
        ----------
        slc_raster : numpy.ndarray
            2D array of slc raster (rows x range samples)
        low_frequency : float
            low frequency of band to be passed [Hz]
        high_frequency : float
            high frequency band to be passed [Hz]
        new_center_frequency : float
            new center frequency for new bandpassed slc [Hz]
        window_function : str
            window type {tukey, kaiser, cosine}
        window_shape : float
            parameter for the window (e.g. 0 ~ 1 for tukey and cosine)
        fft_size : int
            fft size. Defaults to the number of range samples.
        resampling : bool
            if True, then resample SLC and meta data with new range spacing
            If False, return SLC and meta with original range spacing

        Returns
        -------
        resampled_slc or slc_demodulate : numpy.ndarray
            numpy array of bandpassed slc
            if resampling is True,
            return resampled slc with bandpass and demodulation
            if resampling is False,
            return slc with bandpass and demodulation without resampling
        meta : dict
            dict containing meta data of bandpassed slc with keys
            'center_frequency', 'rg_bandwidth', 'range_spacing',
            'slant_range'

        Raises
        ------
        ValueError
            If the band limits or fft size are invalid (see
            `bandpass_spectrum`), or if resampling is requested and no
            sample count up to the raster width is an exact multiple of
            the resampling factor.
        """
        error_channel = journal.error('splitspectrum.bandpass_shift_spectrum')

        rg_sample_freq = self.rg_sample_freq
        rg_bandwidth = self.rg_bandwidth
        diff_frequency = self.center_frequency - new_center_frequency
        _, width = slc_raster.shape
        slc_raster = np.asanyarray(slc_raster, dtype='complex')

        slc_bp = self.bandpass_spectrum(
            slc_raster=slc_raster,
            low_frequency=low_frequency,
            high_frequency=high_frequency,
            window_function=window_function,
            window_shape=window_shape,
            fft_size=fft_size,
        )

        # demodulate the SLC to be baseband to new center frequency
        # if fft_size > width, then crop the spectrum from 0 to width
        slc_demodulate = self.demodulate_slc(slc_bp[:, :width],
                                             diff_frequency,
                                             rg_sample_freq)

        # update metadata with new parameters
        new_bandwidth = high_frequency - low_frequency
        meta = {
            'center_frequency': new_center_frequency,
            'rg_bandwidth': new_bandwidth,
        }

        if not resampling:
            meta['range_spacing'] = self.rg_pxl_spacing
            meta['slant_range'] = np.linspace(self.slant_range(0),
                                              self.slant_range(width),
                                              width,
                                              endpoint=False)
            return slc_demodulate, meta

        # Resampling changes the spacing and slant range
        resampling_scale_factor = rg_bandwidth / new_bandwidth
        sub_width = int(width / resampling_scale_factor)

        # find the largest sample count that is an exact multiple of
        # resampling_scale_factor
        x_cand = np.arange(1, width + 1)
        multiples = x_cand[x_cand % resampling_scale_factor == 0]
        if multiples.size == 0:
            # np.max on an empty array would raise an opaque ValueError
            err_str = (f"No range sample count in [1, {width}] is an exact "
                       f"multiple of the resampling factor "
                       f"{resampling_scale_factor}; cannot resample.")
            error_channel.log(err_str)
            raise ValueError(err_str)
        resample_width_end = np.max(multiples)

        # resample SLC
        resampled_slc = resample(slc_demodulate[:, :resample_width_end],
                                 sub_width,
                                 axis=1)

        meta['range_spacing'] = self.rg_pxl_spacing * resampling_scale_factor
        meta['slant_range'] = np.linspace(self.slant_range(0),
                                          self.slant_range(width),
                                          sub_width,
                                          endpoint=False)
        return resampled_slc, meta

    def bandpass_spectrum(self,
                          slc_raster,
                          low_frequency,
                          high_frequency,
                          window_function,
                          window_shape=0.25,
                          fft_size=None,
                          ):
        """Bandpass SLC for given center frequency and bandwidth.

        The window assumed to have been applied over the full range
        bandwidth is divided out of the spectrum, then a new window
        covering [low_frequency, high_frequency] is applied.

        Parameters
        ----------
        slc_raster : numpy.ndarray
            2D array of slc raster (rows x range samples)
        low_frequency : float
            low frequency of band to be passed [Hz]
        high_frequency : float
            high frequency band to be passed [Hz]
        window_function : str
            window type {'tukey', 'kaiser', 'cosine'}
        window_shape : float
            parameter for the window shape
            kaiser: window_shape > 0
            tukey and cosine: 0 <= window_shape <= 1
        fft_size : int
            fft size. Defaults to the number of range samples.

        Returns
        -------
        slc_bandpassed : numpy.ndarray
            numpy array of bandpassed slc, with fft_size range samples

        Raises
        ------
        ValueError
            If the requested bandwidth is not positive or fft_size is
            smaller than the number of range samples.
        """
        error_channel = journal.error('splitspectrum.bandpass_spectrum')

        rg_sample_freq = self.rg_sample_freq
        rg_bandwidth = self.rg_bandwidth
        center_frequency = self.center_frequency
        _, width = slc_raster.shape
        slc_raster = np.asanyarray(slc_raster, dtype='complex')
        new_bandwidth = high_frequency - low_frequency

        # validate before dividing by the new bandwidth
        if new_bandwidth < 0:
            err_str = "Low frequency is higher than high frequency"
            error_channel.log(err_str)
            raise ValueError(err_str)
        if new_bandwidth == 0:
            err_str = "Low and high frequencies are equal; bandwidth is zero"
            error_channel.log(err_str)
            raise ValueError(err_str)

        resampling_scale_factor = rg_bandwidth / new_bandwidth

        if fft_size is None:
            fft_size = width

        if fft_size < width:
            err_str = "FFT size is smaller than number of range bins"
            error_channel.log(err_str)
            raise ValueError(err_str)

        # construct window to be deconvolved from the original SLC
        # in freq domain
        window_target = self.get_range_bandpass_window(
            center_frequency=0,
            freq_low=-rg_bandwidth/2,
            freq_high=rg_bandwidth/2,
            sampling_frequency=rg_sample_freq,
            fft_size=fft_size,
            window_function=window_function,
            window_shape=window_shape
        )

        # construct window to bandpass spectrum
        # for given low and high frequencies
        window_bandpass = self.get_range_bandpass_window(
            center_frequency=0,
            freq_low=low_frequency - center_frequency,
            freq_high=high_frequency - center_frequency,
            sampling_frequency=rg_sample_freq,
            fft_size=fft_size,
            window_function=window_function,
            window_shape=window_shape
        )

        # remove the windowing effect from the spectrum; bins where the
        # original window is exactly zero are left at zero rather than
        # divided, otherwise inf/NaN would contaminate the inverse FFT
        spectrum = fft(slc_raster, n=fft_size)
        spectrum_target = np.divide(spectrum,
                                    window_target,
                                    out=np.zeros_like(spectrum),
                                    where=window_target != 0)

        # apply new bandpass window to spectrum; the sqrt factor rescales
        # the amplitude for the reduced bandwidth
        slc_bandpassed = ifft(spectrum_target
                              * window_bandpass
                              * np.sqrt(resampling_scale_factor),
                              n=fft_size)

        return slc_bandpassed

    def demodulate_slc(self, slc_array, diff_frequency, rg_sample_freq):
        """Demodulate SLC.

        If diff_frequency is not zero, then the spectrum of SLC is shifted
        so that the sub-band slc is demodulated to center the sub band
        spectrum.

        Parameters
        ----------
        slc_array : numpy.ndarray
            2D SLC raster or block of SLC raster
        diff_frequency : float
            difference between original and new center frequency [Hz]
        rg_sample_freq : float
            range sampling frequency [Hz]

        Returns
        -------
        slc_baseband : numpy.ndarray
            demodulated SLC
        """
        _, width = slc_array.shape
        # time of each range sample relative to the first sample
        range_time = np.arange(width) / rg_sample_freq
        phase_ramp = np.exp(1j * 2.0 * np.pi * diff_frequency * range_time)
        return slc_array * phase_ramp

    def freq_spectrum(self, cfrequency, dt, fft_size):
        """Return Discrete Fourier Transform sample frequencies
        with center frequency bias.

        Parameters
        ----------
        cfrequency : float
            Center frequency (Hz)
        dt : float
            Sample spacing.
        fft_size : int
            Window length.

        Returns
        -------
        freq : ndarray
            Array of length fft_size containing sample frequencies,
            in the bin order returned by scipy.fft.fftfreq.
        """
        return cfrequency + fftfreq(fft_size, dt)

    def get_range_bandpass_window(self,
                                  center_frequency,
                                  sampling_frequency,
                                  fft_size,
                                  freq_low,
                                  freq_high,
                                  window_function='tukey',
                                  window_shape=0.25):
        """Get range bandpass window such as Tukey, Kaiser, cosine.

        Window is constructed in frequency domain from low to high
        frequencies with given window_function and shape.

        Parameters
        ----------
        center_frequency : float
            Center frequency of frequency bin [Hz]
            If slc is basebanded, center_frequency can be 0.
        sampling_frequency : float
            sampling frequency [Hz]
        fft_size : int
            fft size
        freq_low : float
            low frequency to be passed [Hz]
        freq_high : float
            high frequency to be passed [Hz]
        window_function : str
            window type {tukey, kaiser, cosine}, case-insensitive
        window_shape : float
            parameter for the window shape
            kaiser: window_shape > 0
            tukey and cosine: 0 <= window_shape <= 1

        Returns
        -------
        filter_1d : np.ndarray
            one dimensional bandpass filter in frequency domain

        Raises
        ------
        ValueError
            If window_shape is outside the range accepted for the window
            type, or the window type is not recognized.
        """
        error_channel = journal.error(
            'splitspectrum.get_range_bandpass_window')

        # construct the frequency bin [Hz]
        frequency = self.freq_spectrum(
            cfrequency=center_frequency,
            dt=1.0/sampling_frequency,
            fft_size=fft_size
        )

        window_kind = window_function.lower()

        # Windowing effect will appear from freq_low to freq_high
        # for given frequency bin
        if window_kind == 'tukey':
            if not (0 <= window_shape <= 1):
                err_str = ("Expected window_shape between 0 and 1, "
                           f"got {window_shape}.")
                error_channel.log(err_str)
                raise ValueError(err_str)

            filter_1d = self.construct_range_bandpass_tukey(
                frequency_range=frequency,
                freq_low=freq_low,
                freq_high=freq_high,
                window_shape=window_shape
            )

        elif window_kind == 'kaiser':
            if not (window_shape > 0):
                err_str = f"Expected pedestal bigger than 0, got {window_shape}."
                error_channel.log(err_str)
                raise ValueError(err_str)

            filter_1d = self.construct_range_bandpass_kaiser(
                frequency_range=frequency,
                freq_low=freq_low,
                freq_high=freq_high,
                window_shape=window_shape
            )

        elif window_kind == 'cosine':
            if not (0 <= window_shape <= 1):
                err_str = ("Expected window_shape between 0 and 1, "
                           f"got {window_shape}.")
                error_channel.log(err_str)
                raise ValueError(err_str)

            filter_1d = self.construct_range_bandpass_cosine(
                frequency_range=frequency,
                freq_low=freq_low,
                freq_high=freq_high,
                window_shape=window_shape
            )

        else:
            err_str = f"window {window_kind} not in (Kaiser, Cosine, Tukey)."
            error_channel.log(err_str)
            raise ValueError(err_str)

        return filter_1d

    def construct_range_bandpass_cosine(self,
                                        frequency_range,
                                        freq_low,
                                        freq_high,
                                        window_shape):
        """Generate a Cosine bandpass window.

        Parameters
        ----------
        frequency_range : np.ndarray
            Discrete Fourier Transform sample frequency range bins [Hz]
        freq_low : float
            low frequency to be passed [Hz]
        freq_high : float
            high frequency to be passed [Hz]
        window_shape : float
            parameter for the cosine window

        Returns
        -------
        filter_1d : np.ndarray
            one dimensional Cosine bandpass filter in frequency domain
        """
        return self._construct_range_bandpass_kaiser_cosine(frequency_range,
                                                            freq_low,
                                                            freq_high,
                                                            cosine_window,
                                                            window_shape)

    def construct_range_bandpass_kaiser(self,
                                        frequency_range,
                                        freq_low,
                                        freq_high,
                                        window_shape):
        """Generate a Kaiser bandpass window.

        Parameters
        ----------
        frequency_range : np.ndarray
            Discrete Fourier Transform sample frequency range bins [Hz]
        freq_low : float
            low frequency to be passed [Hz]
        freq_high : float
            high frequency to be passed [Hz]
        window_shape : float
            parameter for the kaiser window

        Returns
        -------
        filter_1d : np.ndarray
            one dimensional kaiser bandpass filter in frequency domain
        """
        return self._construct_range_bandpass_kaiser_cosine(frequency_range,
                                                            freq_low,
                                                            freq_high,
                                                            np.kaiser,
                                                            window_shape)

    def _construct_range_bandpass_kaiser_cosine(self,
                                                frequency_range,
                                                freq_low,
                                                freq_high,
                                                window_function,
                                                window_shape):
        """Generate a Kaiser or cosine bandpass window.

        The window spans 1.2 times the requested sub-band, centered on it,
        and is written into the bins of `frequency_range` nearest to the
        widened band edges. If the low-edge bin index is not below the
        high-edge bin index (FFT bin order), the window wraps around the
        end of the array.

        Parameters
        ----------
        frequency_range : np.ndarray
            Discrete Fourier Transform sample frequency range bins [Hz]
        freq_low : float
            low frequency to be passed [Hz]
        freq_high : float
            high frequency to be passed [Hz]
        window_function : callable
            window generator called as window_function(length, shape),
            e.g. np.kaiser or cosine_window
        window_shape : float
            shape parameter passed to window_function

        Returns
        -------
        filter_1d : np.ndarray
            one dimensional bandpass filter in frequency domain

        Raises
        ------
        ValueError
            If freq_low or freq_high lies outside frequency_range.
        """
        # the channel must exist before the range checks can log to it
        error_channel = journal.error(
            'splitspectrum._construct_range_bandpass_kaiser_cosine')

        subbandwidth = np.abs(freq_high - freq_low)
        fft_size = len(frequency_range)

        if freq_high > np.max(frequency_range):
            err_str = "High frequency is out of frequency bins."
            error_channel.log(err_str)
            raise ValueError(err_str)

        if freq_low < np.min(frequency_range):
            err_str = "Low frequency is out of frequency bins."
            error_channel.log(err_str)
            raise ValueError(err_str)

        # sampling frequency is 1.2 times wider than bandwidth
        sampling_bandwidth_ratio = 1.2
        margin = (sampling_bandwidth_ratio - 1) * subbandwidth * 0.5
        sampling_low_frequency = freq_low - margin
        sampling_high_frequency = freq_high + margin

        # index for low and high sampling frequency in frequency_range
        idx_freq_low = np.abs(
            frequency_range - sampling_low_frequency).argmin()
        idx_freq_high = np.abs(
            frequency_range - sampling_high_frequency).argmin()

        wraps_around = idx_freq_low >= idx_freq_high
        if wraps_around:
            subband_length = idx_freq_high + fft_size - idx_freq_low + 1
        else:
            subband_length = idx_freq_high - idx_freq_low + 1

        filter_1d = np.zeros([fft_size], dtype='complex')

        # window_function is a callable {np.kaiser or cosine_window}
        subwindow = window_function(subband_length, window_shape)

        if wraps_around:
            # first part fills the tail of the array, the rest the head
            n_tail = fft_size - idx_freq_low
            filter_1d[idx_freq_low:] = subwindow[:n_tail]
            filter_1d[:idx_freq_high + 1] = subwindow[n_tail:]
        else:
            filter_1d[idx_freq_low:idx_freq_high + 1] = subwindow

        return filter_1d

    def construct_range_bandpass_tukey(self,
                                       frequency_range,
                                       freq_low,
                                       freq_high,
                                       window_shape):
        """Generate a Tukey (raised-cosine) window.

        Bins within [freq_low + df, freq_high - df] are set to 1, bins
        within df of either band edge follow a raised-cosine taper, and
        all other bins are 0, where df = 0.5 * |freq_high - freq_low| *
        window_shape.

        Parameters
        ----------
        frequency_range : np.ndarray
            Discrete Fourier Transform sample frequency range [Hz]
        freq_low : float
            low frequency to be passed [Hz]
        freq_high : float
            high frequency to be passed [Hz]
        window_shape : float
            parameter for the Tukey (raised cosine) filter

        Returns
        -------
        filter_1d : np.ndarray
            one dimensional Tukey bandpass filter in frequency domain
        """
        frequency_range = np.asarray(frequency_range)
        fft_size = len(frequency_range)
        freq_mid = 0.5 * (freq_low + freq_high)
        subbandwidth = np.abs(freq_high - freq_low)
        df = 0.5 * subbandwidth * window_shape

        filter_1d = np.zeros(fft_size, dtype='complex')

        # Passband. i.e. range of frequencies that can pass through a filter
        passband = ((frequency_range <= (freq_high - df))
                    & (frequency_range >= (freq_low + df)))

        # Transition region on either side of the passband
        low_edge = ((frequency_range < freq_low + df)
                    & (frequency_range >= freq_low - df))
        high_edge = ((frequency_range <= freq_high + df)
                     & (frequency_range > freq_high - df))
        transition = ~passband & (low_edge | high_edge)

        filter_1d[passband] = 1

        # Only evaluate the taper where needed; this also avoids dividing
        # by zero when window_shape is 0 (no transition bins exist then).
        if np.any(transition):
            freqabs = np.abs(frequency_range[transition] - freq_mid)
            filter_1d[transition] = 0.5 * (
                1.0 + np.cos(np.pi / (subbandwidth * window_shape)
                             * (freqabs - 0.5 * (1.0 - window_shape)
                                * subbandwidth)))

        return filter_1d