implementation of thermal noise removal
parent
9bfa2dd6db
commit
b1bbf0f10f
|
@ -31,6 +31,7 @@
|
|||
|
||||
import isce
|
||||
import xml.etree.ElementTree as ElementTree
|
||||
from collections import defaultdict
|
||||
import datetime
|
||||
import isceobj
|
||||
from isceobj.Util import Poly1D, Poly2D
|
||||
|
@ -121,8 +122,9 @@ class Sentinel1(Component):
|
|||
self.noiseCorrectionApplied = False
|
||||
|
||||
self.betaLUT = None
|
||||
self.noiseLUT = None
|
||||
self.gr2srLUT = None
|
||||
self.noiseRangeLUT = None
|
||||
self.noiseAzimuthLUT = None
|
||||
|
||||
|
||||
self._xml_root=None
|
||||
|
@ -637,24 +639,19 @@ class Sentinel1(Component):
|
|||
Extract Noise look up table from calibration file.
|
||||
'''
|
||||
|
||||
if not self.noiseCorrectionApplied:
|
||||
self.noiseLUT = 0.0
|
||||
return
|
||||
|
||||
|
||||
from scipy.interpolate import RectBivariateSpline
|
||||
from scipy.interpolate import interp1d, InterpolatedUnivariateSpline
|
||||
|
||||
if self.noiseXml is None:
|
||||
raise Exception('No calibration file provided')
|
||||
|
||||
if self.noiseXml.startswith('/vsizip'):
|
||||
if '.zip' in self.noiseXml:
|
||||
import zipfile
|
||||
parts = self.noiseXml.split(os.path.sep)
|
||||
zipname = os.path.join(*(parts[2:-4]))
|
||||
fname = os.path.join(*(parts[-4:]))
|
||||
parts = self.noiseXml.split('.zip/')
|
||||
zipname = parts[0] + '.zip'
|
||||
fname = parts[1]
|
||||
|
||||
try:
|
||||
with zipfile.ZipFile(zipname, 'r'):
|
||||
with zipfile.ZipFile(zipname, 'r') as zf:
|
||||
xmlstr = zf.read(fname)
|
||||
except:
|
||||
raise Exception('Could not read noise file: {0}'.format(self.calibrationXml))
|
||||
|
@ -665,33 +662,78 @@ class Sentinel1(Component):
|
|||
except:
|
||||
raise Exception('Could not read noise file: {0}'.format(self.calibrationXml))
|
||||
|
||||
if float(self.IPFversion) < 2.90:
|
||||
noise_range_vector_name = "noiseVectorList"
|
||||
noise_range_lut_name = "noiseLut"
|
||||
has_azimuth_noise_vectors = False
|
||||
self.noiseAzimuthLUT = None
|
||||
else:
|
||||
noise_range_vector_name = "noiseRangeVectorList"
|
||||
noise_range_lut_name = "noiseRangeLut"
|
||||
has_azimuth_noise_vectors = True
|
||||
|
||||
print("Extracting noise LUT's...")
|
||||
|
||||
_xml_root = ElementTree.fromstring(xmlstr)
|
||||
|
||||
node = _xml_root.find('noiseVectorList')
|
||||
num = int(node.attrib['count'])
|
||||
node = _xml_root.find(noise_range_vector_name)
|
||||
num_vectors = int(node.attrib['count'])
|
||||
|
||||
lines = []
|
||||
pixels = []
|
||||
data = None
|
||||
print("File contains {} range noise vectors.".format(num_vectors))
|
||||
|
||||
full_samples_range = np.arange(self.product.numberOfSamples)
|
||||
noise_range_lut_indices = np.zeros((num_vectors,))
|
||||
noise_range_lut_values = np.zeros((num_vectors, self.product.numberOfSamples))
|
||||
|
||||
for ii, child in enumerate(node.getchildren()):
|
||||
print("Processing range noise vector {}/{}".format(ii + 1, num_vectors))
|
||||
pixnode = child.find('pixel')
|
||||
nump = int(pixnode.attrib['count'])
|
||||
|
||||
if ii==0:
|
||||
data = np.zeros((num,nump))
|
||||
pixels = [float(x) for x in pixnode.text.split()]
|
||||
sample_pixels = [float(x) for x in pixnode.text.split()]
|
||||
|
||||
signode = child.find(noise_range_lut_name)
|
||||
vector = np.asarray([float(x) for x in signode.text.split()])
|
||||
vector_interpolator = InterpolatedUnivariateSpline(sample_pixels, vector, k=1)
|
||||
vector_interpolated = vector_interpolator(full_samples_range)
|
||||
|
||||
lines.append( int(child.find('line').text))
|
||||
signode = child.find('noiseLut')
|
||||
data[ii,:] = [float(x) for x in signode.text.split()]
|
||||
noise_range_lut_indices[ii] = int(child.find('line').text)
|
||||
noise_range_lut_values[ii] = vector_interpolated
|
||||
|
||||
fp.close()
|
||||
lines = np.array(lines)
|
||||
pixels = np.array(pixels)
|
||||
self.noiseRangeLUT = interp1d(noise_range_lut_indices, noise_range_lut_values, kind='linear', axis=0, fill_value="extrapolate")
|
||||
|
||||
if has_azimuth_noise_vectors:
|
||||
|
||||
node = _xml_root.find("noiseAzimuthVectorList")
|
||||
num_vectors = int(node.attrib['count'])
|
||||
|
||||
print("File contains {} azimuth noise blocks.".format(num_vectors))
|
||||
|
||||
noise_azimuth_lut_indices = defaultdict(list)
|
||||
noise_azimuth_lut_values = defaultdict(list)
|
||||
|
||||
for block_i, child in enumerate(node.getchildren()):
|
||||
print("Processing azimuth noise vector {}/{}".format(block_i + 1, num_vectors))
|
||||
linenode = child.find('line')
|
||||
signode = child.find("noiseAzimuthLut")
|
||||
block_range_start = int(child.find('firstRangeSample').text)
|
||||
block_range_end = int(child.find('lastRangeSample').text)
|
||||
block_azimuth_start = int(child.find('firstAzimuthLine').text)
|
||||
block_azimuth_end = int(child.find('lastAzimuthLine').text)
|
||||
block_line_index = [float(x) for x in linenode.text.split()]
|
||||
block_vector = [float(x) for x in signode.text.split()]
|
||||
|
||||
block_line_range = np.arange(block_azimuth_start, block_azimuth_end + 1)
|
||||
block_vector_interpolator = InterpolatedUnivariateSpline(block_line_index, block_vector, k=1)
|
||||
|
||||
for line in block_line_range:
|
||||
noise_azimuth_lut_indices[line].extend([block_range_start, block_range_end])
|
||||
noise_azimuth_lut_values[line].extend([block_vector_interpolator(line)] * 2)
|
||||
|
||||
self.noiseAzimuthLUT = (noise_azimuth_lut_indices, noise_azimuth_lut_values)
|
||||
|
||||
else:
|
||||
print("File contains no azimuth noise blocks.")
|
||||
|
||||
self.noiseLUT = RectBivariateSpline(lines,pixels, data, kx=1,ky=1)
|
||||
|
||||
if False:
|
||||
import matplotlib.pyplot as plt
|
||||
|
@ -711,7 +753,7 @@ class Sentinel1(Component):
|
|||
except ImportError:
|
||||
raise Exception('GDAL python bindings not found. Need this for RSAT2/ TandemX / Sentinel1.')
|
||||
|
||||
from scipy.interpolate import interp2d
|
||||
from scipy.interpolate import interp1d
|
||||
|
||||
if parse:
|
||||
self.parse()
|
||||
|
@ -744,6 +786,9 @@ class Sentinel1(Component):
|
|||
fid = open(self.output, 'wb')
|
||||
pix = np.arange(self.product.numberOfSamples)
|
||||
|
||||
if self.noiseAzimuthLUT is not None:
|
||||
noise_azimuth_lut_indices, noise_azimuth_lut_values = self.noiseAzimuthLUT
|
||||
|
||||
for ii in range(self.product.numberOfLines//100 + 1):
|
||||
ymin = int(ii*100)
|
||||
ymax = int(np.clip(ymin+100,0, self.product.numberOfLines))
|
||||
|
@ -752,19 +797,25 @@ class Sentinel1(Component):
|
|||
break
|
||||
|
||||
lin = np.arange(ymin,ymax)
|
||||
####Read in one line of data
|
||||
####Read in one block of data
|
||||
data = 1.0 * band.ReadAsArray(0, ymin, self.product.numberOfSamples, ymax-ymin)
|
||||
|
||||
lut = self.betaLUT(lin,pix,grid=True)
|
||||
|
||||
if noiseFactor != 0.0:
|
||||
noise = self.noiseLUT(lin,pix,grid=True)
|
||||
noise = self.noiseRangeLUT(lin)
|
||||
if self.noiseAzimuthLUT is not None:
|
||||
block_azimuth_noise = np.zeros_like(noise)
|
||||
for l_i, line in enumerate(lin):
|
||||
interpolator = interp1d(noise_azimuth_lut_indices[line], noise_azimuth_lut_values[line], kind='previous', fill_value="extrapolate")
|
||||
block_azimuth_noise[l_i] = interpolator(np.arange(self.product.numberOfSamples))
|
||||
noise *= block_azimuth_noise
|
||||
else:
|
||||
noise = 0.0
|
||||
|
||||
|
||||
#outdata = data
|
||||
outdata = data*data/(lut*lut) + noiseFactor * noise/(lut*lut)
|
||||
outdata = np.clip(data*data/(lut*lut) + noiseFactor * noise/(lut*lut), 0, None)
|
||||
#outdata = 10 * np.log10(outdata)
|
||||
|
||||
outdata.astype(np.float32).tofile(fid)
|
||||
|
|
Loading…
Reference in New Issue