microproduct/dem-sentiral/ISCEApp/site-packages/isce3/splitspectrum/splitspectrum.py

614 lines
23 KiB
Python
Raw Normal View History

2023-08-28 10:17:29 +00:00
import numpy as np
import journal
from scipy.fft import fft, ifft, fftfreq
from scipy.signal import resample
from dataclasses import dataclass
import isce3
from nisar.workflows.focus import cosine_window
@dataclass(frozen=True)
class bandpass_meta_data:
# slant range spacing
rg_pxl_spacing: float
# wavelength
wavelength: float
# sampling frequency
rg_sample_freq: float
#bandiwdth
rg_bandwidth: float
#center frequency
center_freq: float
#slant range
slant_range: 'method'
@classmethod
def load_from_slc(cls, slc_product, freq):
"""Get meta data from SLC object.
Parameters
----------
slc_product : nisar.products.readers.SLC
slc object
freq : {'A', 'B'}
frequency band
Returns
-------
meta_data : bandpass_meta_data
bandpass meta data object
"""
rdr_grid = slc_product.getRadarGrid(freq)
rg_sample_freq = isce3.core.speed_of_light * 0.5 / rdr_grid.range_pixel_spacing
rg_bandwidth = slc_product.getSwathMetadata(freq).processed_range_bandwidth
center_frequency = isce3.core.speed_of_light / rdr_grid.wavelength
return cls(rdr_grid.range_pixel_spacing, rdr_grid.wavelength, rg_sample_freq,
rg_bandwidth, center_frequency, rdr_grid.slant_range)
def check_range_bandwidth_overlap(ref_slc, sec_slc, pols):
"""Check if bandpass is needed.
If the two SLCs differ in center frequency or bandwidth, then
one SLC shall be bandpassed to a common frequency band. If
necessary, determine which SLC will be bandpassed
Parameters
----------
ref_slc : nisar.products.readers.SLC
Reference SLC object
sec_slc : nisar.products.readers.SLC
Secondary SLC object
pols : dict
Dict keying frequency ('A' or 'B') to list of polarization values.
Returns
-------
mode : dict
Dict mapping frequency band (e.g. "A" or "B") to
SLC to be bandpassed ("ref" or "sec").
"""
mode = dict()
for freq, pol_list in pols.items():
ref_meta_data = bandpass_meta_data.load_from_slc(ref_slc, freq)
sec_meta_data = bandpass_meta_data.load_from_slc(sec_slc, freq)
ref_wvl = ref_meta_data.wavelength
sec_wvl = sec_meta_data.wavelength
ref_bw = ref_meta_data.rg_bandwidth
sec_bw = sec_meta_data.rg_bandwidth
# check if two SLCs have same bandwidth and center frequency
if (ref_wvl != sec_wvl) or (ref_bw != sec_bw):
if ref_bw > sec_bw:
mode[freq] = 'ref'
else:
mode[freq] = 'sec'
return mode
class SplitSpectrum:
'''
Split the slant range spectrum
'''
def __init__(self,
rg_sample_freq,
rg_bandwidth,
center_frequency,
slant_range,
freq):
"""Initialized Bandpass Class with SLC meta data
Parameters
----------
rg_sample_freq : float
range sampling frequency
rg_bandwidth : float
range bandwidth [Hz]
center_frequency : float
center frequency of SLC [Hz]
slant_range : new center frequency for bandpass [Hz]
freq : {'A', 'B'}
frequency band
"""
self.freq = freq
self.rg_sample_freq = rg_sample_freq
self.rg_pxl_spacing = isce3.core.speed_of_light / 2.0 / self.rg_sample_freq
self.rg_bandwidth = rg_bandwidth
self.center_frequency = center_frequency
self.slant_range = slant_range
def bandpass_shift_spectrum(self,
slc_raster,
low_frequency,
high_frequency,
new_center_frequency,
window_function,
window_shape=0.25,
fft_size=None,
resampling=True
):
"""Bandpass SLC for a given bandwidth and shift the bandpassed
spectrum to a new center frequency
Parameters
----------
slc_raster : numpy.ndarray
numpy array of slc raster,
low_frequency : float
low frequency of band to be passed [Hz]
high_frequency : float
high frequency band to be passed [Hz]
new_center_frequency : float
new center frequency for new bandpassed slc [Hz]
window_function : str
window type {tukey, kaiser, cosine}
window_shape : float
parameter for the raised cosine filter (e.g. 0 ~ 1)
fft_size : int
fft size.
resampling : bool
if True, then resample SLC and meta data with new range spacing
If False, return SLC and meta with original range spacing
Returns
-------
resampled_slc or slc_demodulate: numpy.ndarray
numpy array of bandpassed slc
if resampling is True, return resampled slc with bandpass and demodulation
if resampling is False, return slc with bandpass and demodulation without resampling
meta : dict
dict containing meta data of bandpassed slc
center_frequency, rg_bandwidth, range_spacing, slant_range
"""
error_channel = journal.error('splitspectrum.bandpass_shift_spectrum')
rg_sample_freq = self.rg_sample_freq
rg_bandwidth = self.rg_bandwidth
diff_frequency = self.center_frequency - new_center_frequency
height, width = slc_raster.shape
slc_raster = np.asanyarray(slc_raster, dtype='complex')
slc_bp = self.bandpass_spectrum(
slc_raster=slc_raster,
low_frequency=low_frequency,
high_frequency=high_frequency,
window_function=window_function,
window_shape=window_shape,
fft_size=fft_size,
)
# demodulate the SLC to be baseband to new center frequency
# if fft_size > width, then crop the spectrum from 0 to width
slc_demodulate = self.demodulate_slc(slc_bp[:, :width],
diff_frequency,
rg_sample_freq)
# update metadata with new parameters
meta = dict()
new_bandwidth = high_frequency - low_frequency
meta['center_frequency'] = new_center_frequency
meta['rg_bandwidth'] = new_bandwidth
# Resampling changes the spacing and slant range
if resampling:
resampling_scale_factor = rg_bandwidth / new_bandwidth
sub_width = int(width / resampling_scale_factor)
x_cand = np.arange(1, width + 1)
# find the maximum of the multiple of resampling_scale_factor
resample_width_end = np.max(x_cand[x_cand % resampling_scale_factor == 0])
# resample SLC
resampled_slc = resample(slc_demodulate[:, :resample_width_end], sub_width, axis=1)
meta['range_spacing'] = self.rg_pxl_spacing * resampling_scale_factor
meta['slant_range'] = np.linspace(self.slant_range(0), self.slant_range(width),
sub_width, endpoint=False)
return resampled_slc, meta
else:
meta['range_spacing'] = self.rg_pxl_spacing
meta['slant_range'] = np.linspace(self.slant_range(0), self.slant_range(width),
width, endpoint=False)
return slc_demodulate, meta
def bandpass_spectrum(self,
slc_raster,
low_frequency,
high_frequency,
window_function,
window_shape=0.25,
fft_size=None,
):
"""Bandpass SLC for given center frequency and bandwidth
Parameters
----------
slc_raster : numpy.ndarray
numpy array of slc raster,
low_frequency : float
low frequency of band to be passed [Hz]
high_frequency : float
high frequency band to be passed [Hz]
window_function: str
window type {'tukey', 'kaiser', 'cosine'}
window_shape : float
parameter for the window shape
kaiser 0<= window_shape < inf
tukey and cosine 0 <= window_shape <= 1
fft_size : int
fft size.
Returns
-------
slc_bandpassed : numpy.ndarray
numpy array of bandpassed slc
"""
error_channel = journal.error('splitspectrum.bandpass_spectrum')
rg_sample_freq = self.rg_sample_freq
rg_bandwidth = self.rg_bandwidth
center_frequency = self.center_frequency
height, width = slc_raster.shape
slc_raster = np.asanyarray(slc_raster, dtype='complex')
new_bandwidth = high_frequency - low_frequency
resampling_scale_factor = rg_bandwidth / new_bandwidth
if new_bandwidth < 0:
err_str = f"Low frequency is higher than high frequency"
error_channel.log(err_str)
raise ValueError(err_str)
if fft_size is None:
fft_size = width
if fft_size < width:
err_str = f"FFT size is smaller than number of range bins"
error_channel.log(err_str)
raise ValueError(err_str)
# construct window to be deconvolved from the original SLC in freq domain
window_target = self.get_range_bandpass_window(
center_frequency=0,
freq_low=-rg_bandwidth/2,
freq_high=rg_bandwidth/2,
sampling_frequency=rg_sample_freq,
fft_size=fft_size,
window_function=window_function,
window_shape=window_shape
)
# construct window to bandpass spectrum
# for given low and high frequencies
window_bandpass = self.get_range_bandpass_window(
center_frequency=0,
freq_low=low_frequency - center_frequency,
freq_high=high_frequency - center_frequency,
sampling_frequency=rg_sample_freq,
fft_size=fft_size,
window_function=window_function,
window_shape=window_shape
)
# remove the windowing effect from the spectrum
spectrum_target = fft(slc_raster, n=fft_size) / \
window_target
# apply new bandpass window to spectrum
slc_bandpassed = ifft(spectrum_target
* window_bandpass
* np.sqrt(resampling_scale_factor), n=fft_size)
return slc_bandpassed
def demodulate_slc(self, slc_array, diff_frequency, rg_sample_freq):
""" Demodulate SLC
If diff_frequency is not zero, then the spectrum of SLC is shifted
so that the sub-band slc is demodulated to center the sub band spectrum
Parameters
----------
slc_array : numpy.ndarray
SLC raster or block of SLC raster
diff_frequency : float
difference between original and new center frequency [Hz]
rg_sample_freq : float
range sampling frequency [Hz]
Returns
-------
slc_baseband : numpy.ndarray
demodulated SLC
"""
height, width = slc_array.shape
range_time = np.arange(width) / rg_sample_freq
slc_shifted = slc_array * np.exp(1j * 2.0 * np.pi
* diff_frequency * range_time)
return slc_shifted
def freq_spectrum(self, cfrequency, dt, fft_size):
''' Return Discrete Fourier Transform sample frequencies with center frequency bias.
Parameters:
----------
cfrequency : float
Center frequency (Hz)
dt : float
Sample spacing.
fft_size : int
Window length.
Returns:
-------
freq : ndarray
Array of length fft_size containing sample frequencies.
'''
freq = cfrequency + fftfreq(fft_size, dt)
return freq
def get_range_bandpass_window(self,
center_frequency,
sampling_frequency,
fft_size,
freq_low,
freq_high,
window_function='tukey',
window_shape=0.25):
'''Get range bandpass window such as Tukey, Kaiser, cosine.
Window is constructed in frequency domain from low to high frequencies
with given window_function and shape.
Parameters
----------
center_frequency : float
Center frequency of frequency bin [Hz]
If slc is basebanded, center_frequency can be 0.
sampling_frequency : float
sampling frequency [Hz]
fft_size : int
fft size
freq_low : float
low frequency to be passed [Hz]
freq_high: float
high frequency to be passed [Hz]
window_function : str
window type {tukey, kaiser, cosine}
window_shape : float
parameter for the window shape
kaiser 0<= window_shape < inf
tukey and cosine 0 <= window_shape <= 1
Returns
-------
filter_1d : np.ndarray
one dimensional bandpass filter in frequency domain
'''
error_channel = journal.error('splitspectrum.get_range_bandpass_window')
# construct the frequency bin [Hz]
frequency = self.freq_spectrum(
cfrequency=center_frequency,
dt=1.0/sampling_frequency,
fft_size=fft_size
)
window_kind = window_function.lower()
# Windowing effect will appear from freq_low to freq_high for given frequency bin
if window_kind == 'tukey':
if not (0 <= window_shape <= 1):
err_str = f"Expected window_shape between 0 and 1, got {window_shape}."
error_channel.log(err_str)
raise ValueError(err_str)
filter_1d = self.construct_range_bandpass_tukey(
frequency_range=frequency,
freq_low=freq_low,
freq_high=freq_high,
window_shape=window_shape
)
elif window_kind == 'kaiser':
if not (window_shape > 0):
err_str = f"Expected pedestal bigger than 0, got {window_shape}."
error_channel.log(err_str)
raise ValueError(err_str)
filter_1d = self.construct_range_bandpass_kaiser(
frequency_range=frequency,
freq_low=freq_low,
freq_high=freq_high,
window_shape=window_shape
)
elif window_kind == 'cosine':
if not (0 <= window_shape <= 1):
err_str = f"Expected window_shape between 0 and 1, got {window_shape}."
error_channel.log(err_str)
raise ValueError(err_str)
filter_1d = self.construct_range_bandpass_cosine(
frequency_range=frequency,
freq_low=freq_low,
freq_high=freq_high,
window_shape=window_shape
)
else:
err_str = f"window {window_kind} not in (Kaiser, Cosine, Tukey)."
error_channel.log(err_str)
raise ValueError(err_str)
return filter_1d
def construct_range_bandpass_cosine(self,
frequency_range,
freq_low,
freq_high,
window_shape):
'''Generate a Cosine bandpass window
Parameters
----------
frequency_range : np.ndarray
Discrete Fourier Transform sample frequency range bins[Hz]
freq_low : float
low frequency to be passed [Hz]
freq_high: float
high frequency to be passed [Hz]
window_shape : float
parameter for the cosine window
Returns
-------
filter_1d : np.ndarray
one dimensional Cosine bandpass filter in frequency domain
'''
filter_1d = self._construct_range_bandpass_kaiser_cosine(frequency_range,
freq_low,
freq_high,
cosine_window,
window_shape)
return filter_1d
def construct_range_bandpass_kaiser(self,
frequency_range,
freq_low,
freq_high,
window_shape):
'''Generate a Kaiser bandpass window
Parameters
----------
frequency_range : np.ndarray
Discrete Fourier Transform sample frequency range bins[Hz]
freq_low : float
low frequency to be passed [Hz]
freq_high: float
high frequency to be passed [Hz]
window_shape : float
parameter for the kaiser window
Returns
-------
filter_1d : np.ndarray
one dimensional kaiser bandpass filter in frequency domain
'''
filter_1d = self._construct_range_bandpass_kaiser_cosine(frequency_range,
freq_low,
freq_high,
np.kaiser,
window_shape)
return filter_1d
def _construct_range_bandpass_kaiser_cosine(self,
frequency_range,
freq_low,
freq_high,
window_function,
window_shape):
'''Generate a Kaiser or cosine bandpass window
Parameters
----------
frequency_range : np.ndarray
Discrete Fourier Transform sample frequency range bins[Hz]
freq_low : float
low frequency to be passed [Hz]
freq_high: float
high frequency to be passed [Hz]
window_function : class function
window type {np.kaiser, cosine_window}
window_shape : float
parameter for the kaiser window
Returns
-------
filter_1d : np.ndarray
one dimensional kaiser bandpass filter in frequency domain
'''
subbandwidth = np.abs(freq_high - freq_low)
fft_size = len(frequency_range)
if freq_high > np.max(frequency_range):
err_str = f"High frequency is out of frequency bins."
error_channel.log(err_str)
raise ValueError(err_str)
if freq_low < np.min(frequency_range):
err_str = f"Low frequency is out of frequency bins."
error_channel.log(err_str)
raise ValueError(err_str)
# sampling frequency is 1.2 times wider than bandwith
sampling_bandwidth_ratio = 1.2
sampling_low_frequency = freq_low - (sampling_bandwidth_ratio - 1) * subbandwidth * 0.5
sampling_high_frequency = freq_high + (sampling_bandwidth_ratio - 1) * subbandwidth * 0.5
# index for low and high sampling frequency in frequency_range
idx_freq_low = np.abs(frequency_range - sampling_low_frequency).argmin()
idx_freq_high = np.abs(frequency_range - sampling_high_frequency).argmin()
if idx_freq_low >= idx_freq_high:
subband_length = idx_freq_high + fft_size - idx_freq_low + 1
else:
subband_length = idx_freq_high - idx_freq_low + 1
filter_1d = np.zeros([fft_size], dtype='complex')
# window_function is function class {np.kaiser or consine}
subwindow = window_function(subband_length, window_shape)
if idx_freq_low >= idx_freq_high:
filter_1d[idx_freq_low :] = subwindow[0 : fft_size - idx_freq_low]
filter_1d[: idx_freq_high + 1] = subwindow[fft_size - idx_freq_low:]
else:
filter_1d[idx_freq_low : idx_freq_high+1] = subwindow
return filter_1d
def construct_range_bandpass_tukey(self,
frequency_range,
freq_low,
freq_high,
window_shape):
'''Generate a Tukey (raised-cosine) window
Parameters
----------
frequency_range : np.ndarray
Discrete Fourier Transform sample frequency range [Hz]
freq_low : float
low frequency to be passed [Hz]
freq_high, : list of float
high frequency to be passed [Hz]
window_shape : float
parameter for the Tukey (raised cosine) filter
Returns
-------
filter_1d : np.ndarray
one dimensional Tukey bandpass filter in frequency domain
'''
fft_size = len(frequency_range)
freq_mid = 0.5 * (freq_low + freq_high)
subbandwidth = np.abs(freq_high - freq_low)
df = 0.5 * subbandwidth * window_shape
filter_1d = np.zeros(fft_size, dtype='complex')
for i in range(0, fft_size):
# Get the absolute value of shifted frequency
freq = frequency_range[i]
freqabs = np.abs(freq - freq_mid)
# Passband. i.e. range of frequencies that can pass through a filter
if (freq <= (freq_high - df)) and (freq >= (freq_low + df)):
filter_1d[i] = 1
# Transition region
elif ((freq < freq_low + df) and (freq >= freq_low - df)) \
or ((freq <= freq_high + df) and (freq > freq_high - df)):
filter_1d[i] = 0.5 * (1.0 + np.cos(np.pi / (subbandwidth * window_shape)
* (freqabs - 0.5 * (1.0 - window_shape) * subbandwidth)))
return filter_1d