#!/usr/bin/env python3 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Copyright 2010 California Institute of Technology. ALL RIGHTS RESERVED. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # United States Government Sponsorship acknowledged. This software is subject to # U.S. export control laws and regulations and has been classified as 'EAR99 NLR' # (No [Export] License Required except when exporting to an embargoed country, # end user, or in support of a prohibited end use). By downloading this software, # the user agrees to comply with all applicable U.S. export laws and regulations. # The user has the responsibility to obtain export licenses, or other export # authority as may be required before exporting this software to any 'EAR99' # embargoed foreign country or citizen of those countries. # # Author: Walter Szeliga #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ from xml.etree.ElementTree import ElementTree import datetime import isceobj from isceobj.Scene.Frame import Frame from isceobj.Planet.Planet import Planet from isceobj.Orbit.Orbit import StateVector, Orbit from isceobj.Orbit.OrbitExtender import OrbitExtender from isceobj.Planet.AstronomicalHandbook import Const from iscesys.Component.Component import Component from iscesys.DateTimeUtil.DateTimeUtil import DateTimeUtil as DTUtil import os import copy import scipy.sparse as ss from concurrent.futures._base import as_completed, wait from concurrent.futures.thread import ThreadPoolExecutor from multiprocessing import Pool import math import datetime from math import sin,cos from scipy.optimize import leastsq import numpy as np sep = "\n" tab = " " lookMap = { 'R' : -1, 'L' : 1} TIFF = Component.Parameter( 'tiff', public_name='TIFF', default='', type=str, mandatory=True, doc='HJ2 tiff imagery file' ) XML = Component.Parameter( 'xml', public_name='XML', default='', type=str, mandatory=True, doc='HJ2 xml metadata file' ) ORBIT_DIRECTORY = Component.Parameter( 'orbitDirectory', public_name = 'orbit directory', default=None, type=str, mandatory=False, doc='Directory with HJ2 precise orbits') ORBIT_FILE = Component.Parameter( 'orbitFile', public_name = 'orbit file', default = None, type = str, mandatory = False, doc = 'Precise orbit file to use') from .Sensor import Sensor ####################################################################################################### # 增加轨道求解模型,为了方便轨道模型的求解,这里对轨道模型进行修改 # 这里采用《Insar原理和应用 》(刘国祥著)中基于空间定位的轨道参数计算方法一章中的内容(Gabriel and Goldstein) # 注意为了方便计算,采用基准时间的方法 ###################################################### def FindInfomationFromJson(HeaderFile_dom_json, node_path_list): """ 在Json文件中,按照指定路径解析出制定节点 """ result_node = HeaderFile_dom_json for nodename in node_path_list: result_node = result_node[nodename] return result_node def GetVectorNorm(Vecter): """ 得到向量的模 """ Vecter = Vecter.reshape(-1,1) Vecter_Norm_pow = np.matmul(Vecter.T,Vecter) return np.sqrt(Vecter_Norm_pow) def XYZOuterM2(A, B): """ 外积(叉乘),日后版本换成可以任意维度的外积运算方程 args: A:nx3 B:nx3 """ cnt = A.shape[0] C = np.zeros((cnt, 3)) C[:, 0] = A[:, 1] * B[:, 2] - A[:, 2] * B[:, 1] C[:, 1] = A[:, 2] * B[:, 0] - A[:, 0] * B[:, 2] C[:, 2] = A[:, 0] * B[:, 1] - A[:, 1] * B[:, 0] return C class SatelliteOrbit(object): def __init__(self) -> None: super().__init__() self.starttime = 1262275200.0 self.modelName="" def get_starttime(self): ''' 返回卫星轨道时间起算点 ''' return self.starttime def ReconstructionSatelliteOrbit(self, GPSPoints_list): ''' 重建卫星轨道,使用多项式拟合法 args: GPSPoints_list:GPS 卫星轨道点 return: SatelliteOrbitModel 卫星轨道模型 ''' self.SatelliteOrbitModel = None def SatelliteSpaceState(self, time_float): ''' 根据时间戳,返回对应时间的卫星的轨迹状态 args: time_float:时间戳 return: State_list:[time,Xp,Yp,Zp,Vx,Vy,Vz] ''' return None class SatelliteOrbitFitPoly(SatelliteOrbit): ''' 继承于SatelliteOribit类,为拟合多项式实现方法 ''' def __init__(self) -> None: super().__init__() self.modelName="多项式" self.polynum=4 def ReconstructionSatelliteOrbit(self, GPSPoints_list, starttime): if len(GPSPoints_list)==2: self.polynum=1 self.starttime = starttime record_count = len(GPSPoints_list) time_arr = np.zeros((record_count, 1), dtype=np.float64) # 使用np.float64只是为了精度高些;如果32位也能满足需求,请用32位 state_arr = np.zeros((record_count, 6), dtype=np.float64) A_arr = np.zeros((self.polynum+1, 6), dtype=np.float64) # 四次项 X=np.ones((record_count,self.polynum+1),dtype=np.float64) # 记录时间坐标 # 将点记录转换为自变量矩阵、因变量矩阵 for i in range(record_count): GPSPoint = GPSPoints_list[i] time_ = GPSPoint[0] - self.starttime # 为了保证精度,对时间进行缩放 X[i,:]=np.array([1,time_]) state_arr[i, :] = np.array(GPSPoint[1:],dtype=np.float64).reshape(1,6) # 空间坐标 self.model_f=[] for i in range(6): Y = state_arr[:, i].reshape(-1,1) A_arr[:,i]=np.matmul(np.matmul(np.linalg.inv(np.matmul(X.T,X)),X.T),Y)[:,0] self.A_arr=copy.deepcopy(A_arr.copy()) return self.A_arr elif len(GPSPoints_list) > 6: self.polynum=4 # 多项式的节点数,理论上是超过5个可以起算,这里为了精度选择10个点起算。 # 多项式 XA=Y ==> A=(X'X)^X'Y,其中 A 为待求系数,X为变量,Y为因变量 # 这里使用三次项多项式,共有6组参数。 # 声明自变量,因变量,系数矩阵 self.starttime = starttime record_count = len(GPSPoints_list) time_arr = np.zeros((record_count, 1), dtype=np.float64) # 使用np.float64只是为了精度高些;如果32位也能满足需求,请用32位 state_arr = np.zeros((record_count, 6), dtype=np.float64) A_arr = np.zeros((self.polynum+1, 6), dtype=np.float64) # 四次项 X=np.ones((record_count,self.polynum+1),dtype=np.float64) # 记录时间坐标 # 将点记录转换为自变量矩阵、因变量矩阵 for i in range(record_count): GPSPoint = GPSPoints_list[i] time_ = GPSPoint[0] - self.starttime # 为了保证精度,对时间进行缩放 X[i,:]=np.array([1,time_,time_**2,time_**3,time_**4]) state_arr[i, :] = np.array(GPSPoint[1:],dtype=np.float64).reshape(1,6) # 空间坐标 self.model_f=[] for i in range(6): Y = state_arr[:, i].reshape(-1,1) A_arr[:,i]=np.matmul(np.matmul(np.linalg.inv(np.matmul(X.T,X)),X.T),Y)[:,0] self.A_arr=copy.deepcopy(A_arr.copy()) ''' 测试误差 from matplotlib import pyplot label_list=['x','y','z','vx','vy','vz'] color_list=['r','g','b','gold','gray','pink'] pyplot.figure() for i in range(6): Y = state_arr[:, i] Y_predict=self.model_f[i](X) pyplot.subplot(int("23{}".format(i+1))) d=Y-Y_predict pyplot.plot(X,d,label=label_list[i],color=color_list[i]) pyplot.title("max:{}".format(np.max(d))) #self.model_f.append(interpolate.interp1d(X,Y,kind='cubic',fill_value='extrapolate')) pyplot.legend() pyplot.show() ''' return self.A_arr else: self.A_arr = None return None def SatelliteSpaceState(self, time_float): ''' 逐像素求解 根据时间戳,返回对应时间的卫星的轨迹状态,会自动计算与起算时间之差 args: time_float:时间戳 return: State_list:[time,Xp,Yp,Zp,Vx,Vy,Vz] ''' if self.model_f is None: return None result_arr=np.zeros((1,7)) time_float = time_float - self.starttime result_arr[0,0]=time_float #time_arr[0, 4] = time_arr[0, 3] * time_float ** 4 time_float=np.array([1,time_float,time_float**2,time_float**3,time_float**4]).reshape(1,5) result_arr=np.matmul(time_float,self.A_arr) return [time_float,result_arr] def getSatelliteSpaceState(self, time_array): ''' 矩阵求解 根据时间戳矩阵,返回对应时刻的卫星空间状态(位置,速度),且会自动计算与起算时间之差 args: time_array:nparray nx1 时间戳 return: SatellitSpaceStateArray:nparray nx6 状态信息 ''' if self.model_f is None: return None # 返回None,表示没有结果 if self.polynum==4: n=time_array.shape[0] result_arr_=np.zeros((n,6),dtype=np.float64) time_float = time_array - self.starttime time_float=time_float.reshape(-1) # nx1 time_arr=np.ones((time_float.shape[0],5)) # nx5 time_arr[:,1]=time_float time_arr[:,2]=time_float**2 time_arr[:,3]=time_float**3 time_arr[:,4]=time_float**4 result_arr_=np.matmul(time_arr,self.A_arr) # nx5 5x6 #time_arr[0, 4] = time_arr[0, 3] * time_float ** 4 #result_arr=result_arr_ return result_arr_ # 位置矩阵 else: n=time_array.shape[0] result_arr_=np.zeros((n,6),dtype=np.float64) time_float = time_array - self.starttime time_float=time_float.reshape(-1) # nx1 time_arr=np.ones((time_float.shape[0],self.polynum+1)) # nx5 time_arr[:,1]=time_float result_arr_=np.matmul(time_arr,self.A_arr) # nx5 5x6 #time_arr[0, 4] = time_arr[0, 3] * time_float ** 4 #result_arr=result_arr_ return result_arr_ # 位置矩阵 def ReconstructionSatelliteOrbit(GPSPoints_list, starttime): ''' 构建卫星轨道 args: GPSPoints_list:卫星轨道点 starttime:起算时间 ''' print("GPS Point",len(GPSPoints_list))#,"\n",GPSPoints_list) print("GPS Start Time ",starttime) try: SatelliteOrbitModel = SatelliteOrbitFitPoly() if SatelliteOrbitModel.ReconstructionSatelliteOrbit(GPSPoints_list, starttime=starttime) is None: return None return SatelliteOrbitModel except Exception as ex: print("ReconstructionSatelliteOrbit",ex) return None ######## # 函数列表 ######## def poly1dfunc(p,x): # 一次函数 k,b=p return k*x+b def poly2dfunc(p,x): # 二次函数 k1,k2,b=p return b+k1*x+k2*x*x def poly3dfunc(p,x): k1,k2,k3,b=p return b+k1*x+k2*x*x+k3*x*x*x def poly4dfunc(p,x): k1,k2,k3,k4,b=p return b+k1*x+k2*x**2+k3*x**3+k4*x**4 def poly5dfunc(p,x): k1,k2,k3,k4,k5,b=p return b+k1*x+k2*x**2+k3*x**3+k4*x**4+k5*x**5 def poly1derror(p,x,y): return poly1dfunc(p,x)-y def poly2derror(p,x,y): return poly2dfunc(p,x)-y def poly3derror(p,x,y): return poly3dfunc(p,x)-y def poly4derror(p,x,y): return poly4dfunc(p,x)-y def poly5derror(p,x,y): return poly5dfunc(p,x)-y class orbitVector: def __init__(self,UTCTimes,vx,vy,vz,px,py,pz,dateformat="%Y-%m-%dT%H:%M:%S.%f"): self.UTCTime=datetime.datetime.strptime(UTCTimes,dateformat) # 字符串转UTC时间 self.time_stamp=self.UTCTime.timestamp() # 时间戳 self.vx=vx self.vy=vy self.vz=vz self.px=px self.py=py self.pz=pz self.orbitVector2GG() # 将坐标进行变换 self.Check() pass def orbitVector2GG(self): Cx=self.py*self.vz-self.pz*self.vy # 叉乘 Cy=self.pz*self.vx-self.px*self.vz Cz=self.px*self.vy-self.py*self.vx C=(Cx**2+Cy**2+Cz**2)**0.5 self.rho=(self.px**2+self.py**2+self.pz**2)**0.5 self.I=math.acos(Cz/C) self.omega=math.asin(self.pz/(self.rho*math.sin(self.I))) self.Omega=math.atan(Cx*(-1)/Cy) return [self.rho,self.I,self.omega,self.Omega] def Check(self): rho=self.rho I=self.I omega=self.omega Omega=self.Omega Xp=rho*(cos(omega)*cos(Omega)-sin(omega)*sin(Omega)*cos(I)) Yp=rho*(cos(omega)*sin(Omega)+sin(omega)*cos(Omega)*cos(I)) Zp=rho*sin(Omega)*sin(I) print("计算插值:",self.UTCTime,self.px,self.py,self.pz,self.vx,self.vy,self.vz,"|",abs(Xp)-abs(self.px),abs(Yp)-abs(self.py),abs(Zp)-abs(self.pz)) class SARorbit(object): # 作为自定义轨道计算基类 # 定义几个基本方法:addvector(),createOrbit(),getTime(),getTimes() # 注意均为UTC def __init__(self): self.vectors=[] self.baseTime=None def addvector(self,UTCTime,vx,vy,vz,px,py,pz): neworbit_point=orbitVector(UTCTime,vx,vy,vz,px,py,pz) self.vectors.append(neworbit_point) if len(self.vectors)==1: self.baseTime=neworbit_point.time_stamp-1 # 基线 else: if self.baseTime>neworbit_point.time_stamp: self.baseTime=neworbit_point.time_stamp-1 def createOrbit(self): pass def getTimeOrbit(self,UTCTime): return None def getTimeOrbitStamp(self,StampTime): utcStr=datetime.datetime.fromtimestamp(StampTime).strftime("%Y-%m-%dT%H:%M:%S.%f") return self.getTimeOrbit(utcStr) def getTimeOrbits(self,UTCStartTime,UTCEndTime,orbitnum=100): # startTime_stamp=datetime.datetime.strptime(UTCStartTime,"%Y-%m-%dT%H:%M:%S.%f").timestamp()-0.2 endTime_stamp=datetime.datetime.strptime(UTCEndTime,"%Y-%m-%dT%H:%M:%S.%f").timestamp()+0.2 if startTime_stamp>endTime_stamp: raise delta_t=(endTime_stamp-startTime_stamp)*1000/orbitnum delta_t=int(delta_t)/1000 # 获取 extractOrbits=[] # temptime=startTime_stamp while temptimestartTime_s-30 and stateVectors[i].timeStamp.timestamp() < stopTime_s+30: # tempOrbit.addvector(stateVectors[i].timeStamp.strftime("%Y-%m-%dT%H:%M:%S.%f"), # stateVectors[i].xVelocity, stateVectors[i].yVelocity, stateVectors[i].zVelocity, # stateVectors[i].xPosition, stateVectors[i].yPosition, stateVectors[i].zPosition) # time_list.append(stateVectors[i].timeStamp) time_list.append([stateVectors[i].timeStamp.timestamp(), stateVectors[i].xPosition, stateVectors[i].yPosition, stateVectors[i].zPosition, stateVectors[i].xVelocity, stateVectors[i].yVelocity, stateVectors[i].zVelocity]) num += 1 # sv= StateVector() # sv.setTime(stateVectors[i].timeStamp) # sv.setPosition([stateVectors[i].xPosition, stateVectors[i].yPosition, stateVectors[i].zPosition]) # sv.setVelocity([stateVectors[i].xVelocity, stateVectors[i].yVelocity, stateVectors[i].zVelocity]) # self.frame.getOrbit().addStateVector(sv) # 插值结果写进了轨道模型中 # num+=1 model = ReconstructionSatelliteOrbit(time_list, starttime=centerTime_s) time_dif = ((stopTime_s + 30) - (startTime_s - 30)) / 1000 time = np.zeros((1000, 1)) for i in range(1000): time[i,:]=((startTime_s - 30) + time_dif * i) t = time.reshape(-1) statepoints = model.getSatelliteSpaceState(t) # print("初始插值-----------------------------------------------------") # self.frame.setSensingStart(datetime.datetime.fromtimestamp(t[2])) # self.frame.setSensingStop(datetime.datetime.fromtimestamp(t[len(t)-3])) self.frame.setSensingStart(dataStartTime) self.frame.setSensingStop(dataStopTime) diffTime = DTUtil.timeDeltaToSeconds(self.frame.sensingStop-self.frame.sensingStart)/2.0 sensingMid = self.frame.sensingStart + datetime.timedelta(microseconds=int(diffTime*1e6)) self.frame.setSensingMid(sensingMid) # planet = self.frame.instrument.platform.planet # orbExt = OrbitExtender(planet=planet) # orbExt.configure() # newOrb = orbExt.extendOrbit(tempOrbit) # tempOrbit.createOrbit() # 构建轨道模型 # newOrb=tempOrbit.getTimeOrbits(self.frame.sensingStart.strftime("%Y-%m-%dT%H:%M:%S.%f"), # self.frame.sensingStop.strftime("%Y-%m-%dT%H:%M:%S.%f"), # orbitnum=500) # for svect in newOrb: # sv= StateVector() # sv.setTime(svect.UTCTime) # sv.setPosition([svect.px,svect.py,svect.pz]) # sv.setVelocity([svect.vx,svect.vy,svect.vz]) # self.frame.getOrbit().addStateVector(sv) # 插值结果写进了轨道模型中 for i, value in zip(range(len(statepoints)), statepoints): sv= StateVector() sv.setTime(datetime.datetime.fromtimestamp(t[i])) sv.setPosition([value[0],value[1],value[2]]) sv.setVelocity([value[3],value[4],value[5]]) self.frame.getOrbit().addStateVector(sv) # 插值结果写进了轨道模型中 # print("插值后的gps点", datetime.datetime.fromtimestamp(t[i]),value[0],value[1],value[2],value[3],value[4],value[5]) print('Orbits list len is %d' %num) print('Successfully read state vectors from product XML') def extractPreciseOrbit(self, orbitfile, tstart, tend): ''' Extract precise orbits for given time-period from orbit file. ''' self.frame.getOrbit().setOrbitSource('File: ' + orbitfile) tmin = tstart - datetime.timedelta(seconds=30.) tmax = tstart + datetime.timedelta(seconds=30.) fid = open(orbitfile, 'r') for line in fid: if not line.startswith('; Position'): continue else: break for line in fid: if not line.startswith(';###END'): tstamp = convertRSTimeToDateTime(line) if (tstamp >= tmin) and (tstamp <= tmax): sv = StateVector() sv.configure() sv.setTime( tstamp) sv.setPosition( [float(x) for x in fid.readline().split()]) sv.setVelocity( [float(x) for x in fid.readline().split()]) self.frame.getOrbit().addStateVector(sv) else: fid.readline() fid.readline() dummy = fid.readline() if not dummy.startswith(';'): raise Exception('Expected line to start with ";". Got {0}'.format(dummy)) fid.close() print('Successfully read {0} state vectors from {1}'.format( len(self.frame.getOrbit()._stateVectors), orbitfile)) def extractImage(self, verbose=True): ''' Use gdal to extract the slc. ''' try: from osgeo import gdal except ImportError: raise Exception('GDAL python bindings not found. Need this for RSAT2 / TandemX / Sentinel1A.') self.parse() width = self.frame.getNumberOfSamples() lgth = self.frame.getNumberOfLines() # lineFlip = (self.product.imageAttributes.rasterAttributes.lineTimeOrdering.upper() == 'DECREASING') # pixFlip = (self.product.imageAttributes.rasterAttributes.pixelTimeOrdering.upper() == 'DECREASING') lineFlip = True pixFlip = False src = gdal.Open(self.tiff.strip(), gdal.GA_ReadOnly) cJ = np.complex64(1.0j) ####Images are small enough that we can do it all in one go - Piyush real = src.GetRasterBand(1).ReadAsArray(0,0,width,lgth) imag = src.GetRasterBand(2).ReadAsArray(0,0,width,lgth) if (real is None) or (imag is None): raise Exception('Input HJ2 SLC seems to not be a 2 band Int16 image.') data = real+cJ*imag real = None imag = None src = None if lineFlip: if verbose: print('Vertically Flipping data') data = np.flipud(data) if pixFlip: if verbose: print('Horizontally Flipping data') data = np.fliplr(data) data.tofile(self.output) #### slcImage = isceobj.createSlcImage() slcImage.setByteOrder('l') slcImage.setFilename(self.output) slcImage.setAccessMode('read') slcImage.setWidth(width) slcImage.setLength(lgth) slcImage.setXmin(0) slcImage.setXmax(width) #slcImage.renderHdr() self.frame.setImage(slcImage) def extractDoppler(self): ''' self.parse() Extract doppler information as needed by mocomp ''' ins = self.frame.getInstrument() dc = self.product.processInfo.DopplerCentroidCoefficients quadratic = {} r0 = self.frame.startingRange fs = ins.getRangeSamplingRate() tNear = 2*r0/Const.c tMid = tNear + 0.5*self.frame.getNumberOfSamples()/fs t0 = self.product.processInfo.DopplerParametersReferenceTime poly = self.product.processInfo.DopplerCentroidCoefficients fd_mid = 0.0 for kk in range(len(poly)): fd_mid += poly[kk] * (tMid - t0)**kk ####For insarApp quadratic['a'] = fd_mid / ins.getPulseRepetitionFrequency() quadratic['b'] = 0. quadratic['c'] = 0. ####For roiApp ####More accurate from isceobj.Util import Poly1D coeffs = poly dr = self.frame.getInstrument().getRangePixelSize() rref = 0.5 * Const.c * t0 # rref = Const.c * t0 r0 = self.frame.getStartingRange() norm = 0.5*Const.c/dr dcoeffs = [] for ind, val in enumerate(coeffs): dcoeffs.append( val / (norm**ind)) poly = Poly1D.Poly1D() poly.initPoly(order=len(coeffs)-1) poly.setMean( (rref - r0)/dr - 1.0) poly.setCoeffs(dcoeffs) # print("****" * 10) # print(Const.c) # print(rref) # print(dr) # print(dcoeffs) # print(r0) # print("****" * 10) poly = Poly1D.Poly1D() poly.initPoly(order=len(coeffs)-1) poly.setMean( (rref - r0)/dr - 1.0) poly.setCoeffs(dcoeffs) pix = np.linspace(0, self.frame.getNumberOfSamples(), num=len(coeffs)+1) evals = poly(pix) fit = np.polyfit(pix,evals, len(coeffs)-1) self.frame._dopplerVsPixel = list(fit[::-1]) # print("---- radar ------------------------------------------print") # print("t0",t0) # print("****" * 10) # print('lightspeed',Const.c) # print('rref',rref) # print('dr',dr) # print('dcoeff',dcoeffs) # print('r0',r0) # print("****" * 10) # print('pix',pix) # print('evals',evals) # print('fit',fit) # print('Doppler Fit: ', fit[::-1]) # print('---------------------------------------------------') return quadratic class HJ2_SLCNamespace(object): def __init__(self): self.uri = "" def elementName(self,element): return "{%s}%s" % (self.uri,element) def convertToDateTime(self,string): try: dt = datetime.datetime.strptime(string,"%Y-%m-%d %H:%M:%S.%f") return dt except: dt = datetime.datetime.strptime(string,"%Y-%m-%d %H:%M:%S") return dt def convertToDateTime_(self,string): try: dt = datetime.datetime.strptime(string,"%Y-%m-%d %H:%M:%S.%f") return dt except: dt = datetime.datetime.strptime(string,"%Y-%m-%d %H:%M:%S") return dt def convertToDateTime_S(self,string): try: dt = datetime.datetime.strptime(string,"%Y-%m-%d %H:%M:%S.%f") return dt except: dt = datetime.datetime.strptime(string,"%Y-%m-%d %H:%M:%S") return dt class _Product(HJ2_SLCNamespace): def __init__(self): HJ2_SLCNamespace.__init__(self) self.satellite = None self.orbitType = None self.direction = None self.productId = None self.documentIdentifier = None self.isZeroDopplerSteering = None self.station = None self.sensor = _Sensor() self.platform = _Platform() self.GPS = _OrbitInformation() self.ATTI = _ATTI() self.productInfo = _prodInfo() self.imageInfo = _imageInfo() self.processInfo = _processInfo() def set_from_etnode(self,node): for z in node: if z.tag == 'satellite': self.satellite = z.text elif z.tag == 'orbitType': self.orbitType = z.text elif z.tag == 'Direction': if z.text.upper() == 'DEC': self.direction = 'Descending' elif z.text.upper() == 'ASC': self.direction = 'Ascending' self.direction = z.text elif z.tag == 'productID': self.productId = z.text elif z.tag == 'DocumentIdentifier': self.documentIdentifier = z.text elif z.tag == 'IsZeroDopplerSteering': self.isZeroDopplerSteering = z.text elif z.tag == 'Station': self.station = z.text elif z.tag == 'sensor': self.sensor.set_from_etnode(z) elif z.tag == 'platform': self.platform.set_from_etnode(z) elif z.tag == 'GPS': self.GPS.set_from_etnode(z) elif z.tag == 'ATTI': self.ATTI.set_from_etnode(z) elif z.tag == 'productinfo': self.productInfo.set_from_etnode(z) elif z.tag == 'imageinfo': self.imageInfo.set_from_etnode(z) elif z.tag == 'processinfo': self.processInfo.set_from_etnode(z) def __str__(self): retstr = "Product:"+sep+tab retlst = () retstr += "productID=%s"+sep+tab retlst += (self.productId,) retstr += "documentIdentifier=%s"+sep retlst += (self.documentId,) retstr += "%s"+sep retlst += (str(self.sourceAttributes),) retstr += "%s"+sep retlst += (str(self.imageGenerationParameters),) retstr += ":Product" return retstr % retlst class _Sensor(HJ2_SLCNamespace): def __init__(self): HJ2_SLCNamespace.__init__(self) self.sensorID = None self.imagingMode = None self.lamda = None self.RadarCenterFrequency = None self.satelStartTime = None self.satelEndTime = None self.lookDirection = None self.antennaMode = None self.waveParams = _WaveParams() def set_from_etnode(self,node): for z in node: if z.tag == 'sensorID': self.sensorID = z.text elif z.tag == 'imagingMode': self.imagingMode = z.text # elif z.tag == 'lamda': self.lamda = float(z.text) elif z.tag == 'RadarCenterFrequency': self.RadarCenterFrequency = float(z.text) * 1e9 # GHZ转HZ elif z.tag == 'satelliteTime': satelliteTime = z for n in satelliteTime: if n.tag == 'start': self.satelStartTime = self.convertToDateTime_(n.text) elif n.tag == 'end': self.satelEndTime = self.convertToDateTime_(n.text) elif z.tag == 'waveParams': self.waveParams.set_from_etnode(z) elif z.tag == 'lookDirection': self.lookDirection = z.text elif z.tag == 'antennaMode': self.antennaMode = z.text def __str__(self): retstr = "_Sensor:%s"+sep+tab retlst = () retstr = "sensorID:%s"+sep+tab retlst = (self.sensorID,) retstr += "imagingMode=%s"+sep+tab retlst += (self.imagingMode,) retstr += "lamda=%s"+sep+tab retlst += (self.lamda,) retstr += "RadarCenterFrequency=%s"+sep+tab retlst += (self.RadarCenterFrequency,) retstr += "satelStartTime:%s"+sep+tab retlst += (self.satelStartTime,) retstr += "satelEndTime:%s"+sep+tab retlst += (self.satelEndTime,) retstr += "%s" retlst += (str(self.waveParams),) retstr += ":Sensor" return retstr % retlst class _WaveParams(HJ2_SLCNamespace): def __init__(self): HJ2_SLCNamespace.__init__(self) self.waveCode = None self.startLookAngle = None self.centerLookAngle = None self.endLookAngle = None self.prf = None self.proBandwidth = None self.sampleRate = None self.sampleDelay = None self.bandWidth = None self.pulseWidth = None self.frameLength = None self.valueMGC = [] self.groundVelocity = None self.averageAltitude = None def set_from_etnode(self,node): i = 0 for w in node: if w.tag == 'wave': wave = w for z in wave: if z.tag == 'waveCode': self.waveCode = z.text elif z.tag == 'startLookAngle': self.startLookAngle = float(z.text) elif z.tag == 'centerLookAngle': self.centerLookAngle = float(z.text) elif z.tag == 'endLookAngle': self.endLookAngle = float(z.text) elif z.tag == 'prf': self.prf = float(z.text) elif z.tag == 'proBandwidth': self.proBandwidth = float(z.text) elif z.tag == 'sampleRate': self.sampleRate = float(z.text) elif z.tag == 'sampleDelay': self.sampleDelay = float(z.text) elif z.tag == 'bandWidth': self.bandWidth = float(z.text) * 1e6 #MHZ转HZ elif z.tag == 'pulseWidth': self.pulseWidth = float(z.text) * 1e-6 #us转为s elif z.tag == 'frameLength': self.frameLength = int(z.text) # elif z.tag == 'valueMGC': # for value in z: # self.valueMGC.append(float(value.text)) elif z.tag == 'groundVelocity': self.groundVelocity = float(z.text) elif z.tag == 'averageAltitude': self.averageAltitude = float(z.text) def __str__(self): retstr = "_WaveParams:"+sep+tab retlst = () retstr += "centerLookAngle=%s"+sep+tab retlst += (self.centerLookAngle,) retstr += "prf=%s"+sep+tab retlst += (self.prf,) retstr += "proBandwidth=%s"+sep+tab retlst += (self.proBandwidth,) retstr += "pulseWidth=%s"+sep+tab retlst += (self.pulseWidth,) retstr += "groundVelocity=%s"+sep retlst += (self.groundVelocity,) retstr += ":RadarParameters"+sep return retstr % retlst class _Platform(HJ2_SLCNamespace): def __init__(self): HJ2_SLCNamespace.__init__(self) self.CenterTime = None self.Rs = None self.satVelocity = None self.RollAngle = None self.PitchAngle = None self.YawAngle = None self.Xs = None self.Ys = None self.Zs = None self.Vxs = None self.Vys = None self.Vzs = None def set_from_etnode(self,node): for z in node: if z.tag == 'CenterTime': self.CenterTime = self.convertToDateTime_(z.text) elif z.tag == 'Rs': self.Rs = float(z.text) elif z.tag == 'satVelocity': self.satVelocity = float(z.text) elif z.tag == 'RollAngle': self.RollAngle = float(z.text) elif z.tag == 'PitchAngle': self.PitchAngle = float(z.text) elif z.tag == 'YawAngle': self.YawAngle = float(z.text) elif z.tag == 'Xs': self.Xs = float(z.text) elif z.tag == 'Ys': self.Ys = float(z.text) elif z.tag == 'Zs': self.Zs = float(z.text) elif z.tag == 'Vxs': self.Vxs = float(z.text) elif z.tag == 'Vys': self.Vys = float(z.text) elif z.tag == 'Vzs': self.Vzs = float(z.text) def __str__(self): retstr = "_Platform:"+sep+tab retlst = () retstr += "CenterTime=%s"+sep+tab retlst += (self.CenterTime,) retstr += "Rs=%s"+sep+tab retlst += (self.Rs,) retstr += "RollAngle=%s"+sep+tab retlst += (self.RollAngle,) retstr += "PitchAngle=%s"+sep+tab retlst += (self.PitchAngle,) retstr += "YawAngle=%s"+sep+tab retlst += (self.YawAngle,) retstr += sep+":_Platform" return retstr % retlst class _ATTI(HJ2_SLCNamespace): def __init__(self): HJ2_SLCNamespace.__init__(self) self.ATTIParam = _ATTIParam() def set_from_etnode(self,node): for z in node: if z.tag == 'ATTIParam': self.ATTIParam.set_from_etnode(z) def __str__(self): return "" class _prodInfo(HJ2_SLCNamespace): def __init__(self): HJ2_SLCNamespace.__init__(self) self.WidthInMeters = None self.productLevel = None self.productType = None self.productFormat = None self.productGentime = None self.productPolar = None self.NominalResolution = None def set_from_etnode(self,node): for z in node: if z.tag == 'NominalResolution': self.NominalResolution = float(z.text) elif z.tag == 'WidthInMeters': self.WidthInMeters = float(z.text) if z.tag == 'productLevel': self.productLevel = 1 # 设置默认值 elif z.tag == 'productType': self.productType = z.text elif z.tag == 'productFormat': self.productFormat = z.text elif z.tag == 'productGentime': self.productGentime = self.convertToDateTime_S(z.text) elif z.tag == 'productPolar': self.productPolar = z.text def __str__(self): retstr = "OrbitAndAttitude:"+sep retlst = () retstr += "NominalResolution=%s"+sep+tab retlst += (self.NominalResolution,) retstr += "WidthInMeters=%s"+sep+tab retlst += (self.WidthInMeters,) retstr += "productType=%s"+sep+tab retlst += (self.productType,) retstr += "productGentime=%s"+sep+tab retlst += (self.productGentime,) retstr += "productPolar=%s"+sep+tab retlst += (self.productPolar,) retstr += ":OrbitAndAttitude"+sep return retstr % retlst class _OrbitInformation(HJ2_SLCNamespace): def __init__(self): HJ2_SLCNamespace.__init__(self) self.passDirection = None self.orbitDataSource = None self.orbitDataFile = None self.stateVectors = [] self.gpsParam = [] def set_from_etnode(self,node): for z in node: if z.tag == self.elementName('passDirection'): self.passDirection = z.text elif z.tag == self.elementName('orbitDataSource'): self.orbitDataSource = z.text elif z.tag == self.elementName('orbitDataFile'): self.orbitDataFile = z.text elif z.tag == self.elementName('stateVector'): sv = _StateVector() sv.set_from_etnode(z) self.stateVectors.append(sv) elif z.tag == 'GPSParam': gps = _StateVector() gps.set_from_etnode(z) self.gpsParam.append(gps) def __str__(self): retstr = "OrbitInformation:"+sep+tab retlst = () retstr += "passDirection=%s"+sep+tab retlst += (self.passDirection,) retstr += "orbitDataSource=%s"+sep+tab retlst += (self.orbitDataSource,) retstr += "orbitDataFile=%s"+sep retlst += (self.orbitDataFile,) retstr += ":OrbitInformation"+sep return retstr % retlst class _StateVector(HJ2_SLCNamespace): def __init__(self): HJ2_SLCNamespace.__init__(self) self.timeStamp = None self.xPosition = None self.yPosition = None self.zPosition = None self.xVelocity = None self.yVelocity = None self.zVelocity = None def set_from_etnode(self,node): for z in node: if z.tag == self.elementName('timeStamp'): self.timeStamp = self.convertToDateTime(z.text) if z.tag == 'TimeStamp': self.timeStamp = self.convertToDateTime_(z.text) elif z.tag == 'xPosition': self.xPosition = float(z.text) elif z.tag == 'yPosition': self.yPosition = float(z.text) elif z.tag == 'zPosition': self.zPosition = float(z.text) elif z.tag == 'xVelocity': self.xVelocity = float(z.text) elif z.tag == 'yVelocity': self.yVelocity = float(z.text) elif z.tag == 'zVelocity': self.zVelocity = float(z.text) def __str__(self): retstr = "StateVector:"+sep+tab retlst = () retstr += "timeStamp=%s"+sep+tab retlst += (self.timeStamp,) retstr += "xPosition=%s"+sep+tab retlst += (self.xPosition,) retstr += "yPosition=%s"+sep+tab retlst += (self.yPosition,) retstr += "zPosition=%s"+sep+tab retlst += (self.zPosition,) retstr += "xVelocity=%s"+sep+tab retlst += (self.xVelocity,) retstr += "yVelocity=%s"+sep+tab retlst += (self.yVelocity,) retstr += "zVelocity=%s"+sep+tab retlst += (self.zVelocity,) retstr += sep+":StateVector" return retstr % retlst class _imageInfo(HJ2_SLCNamespace): def __init__(self): HJ2_SLCNamespace.__init__(self) self.imagingStartTime = None self.imagingEndTime = None self.nearRange = None self.refRange = None self.eqvFs = None self.eqvPRF = None self.center = [0, 0] self.corner = _corner() self.width = None self.height = None self.widthspace = None self.heightspace = None self.QualifyValue = [0, 0, 0, 0] def set_from_etnode(self,node): for z in node: if z.tag == 'imagingTime': imagingTime = z for t in imagingTime: if t.tag == 'start': self.imagingStartTime = self.convertToDateTime_(t.text) if t.tag == 'end': self.imagingEndTime = self.convertToDateTime_(t.text) elif z.tag == 'nearRange': self.nearRange = float(z.text) elif z.tag == 'refRange': self.refRange = float(z.text) elif z.tag == 'eqvFs': self.eqvFs = float(z.text) # HJ2 的单位是 Hz elif z.tag == 'eqvPRF': self.eqvPRF = float(z.text) elif z.tag == 'center': center = z for c in center: if c.tag == 'latitude': self.center[1] = float(c.text) elif c.tag == 'longitude': self.center[0] = float(c.text) elif z.tag == 'corner': self.corner.set_from_etnode(z) elif z.tag == 'width': self.width = int(z.text) elif z.tag == 'height': self.height = int(z.text) elif z.tag == 'widthspace': self.widthspace = float(z.text) elif z.tag == 'heightspace': self.heightspace = float(z.text) elif z.tag == 'QualifyValue': QualifyValue = z for value in QualifyValue: if value.tag == 'HH': self.QualifyValue[0] = float(value.text) elif value.tag == 'HV': self.QualifyValue[1] = float(value.text) elif value.tag == 'VH': self.QualifyValue[2] = float(value.text) elif value.tag == 'VV': self.QualifyValue[3] = float(value.text) def __str__(self): retstr = "_ImageInfo:"+sep+tab retlst = () retstr += "nearRange=%s"+sep+tab retlst += (self.nearRange,) retstr += "refRange=%s"+sep+tab retlst += (self.refRange,) retstr += "eqvFs=%s"+sep+tab retlst += (self.eqvFs,) retstr += "eqvPRF=%s"+sep+tab retlst += (self.eqvPRF,) retstr += "width=%s"+sep+tab retlst += (self.width,) retstr += "height=%s"+sep+tab retlst += (self.height,) retstr += "widthspace=%s"+sep+tab retlst += (self.widthspace,) retstr += "heightspace=%s"+sep+tab retlst += (self.heightspace,) retstr += ":_ImageInfo"+sep return retstr % retlst class _ATTIParam(HJ2_SLCNamespace): def __init__(self): HJ2_SLCNamespace.__init__(self) self.timeStamp = None self.yawAngle = None self.rollAngle = None self.pitchAngle = None def set_from_etnode(self,node): for z in node: if z.tag == 'TimeStamp': self.timeStamp = self.convertToDateTime_(z.text) elif z.tag == 'yawAngle': self.yaw = float(z.text) elif z.tag == 'rollAngle': self.roll = float(z.text) elif z.tag == 'pitchAngle': self.pitch = float(z.text) def __str__(self): retstr = "_ATTIParam:"+sep+tab retlst = () retstr += "TimeStamp=%s"+sep+tab retlst += (self.timeStamp,) retstr += "yawAngle=%s"+sep+tab retlst += (self.yaw,) retstr += "rollAngle=%s"+sep+tab retlst += (self.roll,) retstr += "pitchAngle=%s"+sep+tab retlst += (self.pitch,) retstr += sep+":__ATTIParam" return retstr % retlst class _corner(HJ2_SLCNamespace): def __init__(self): HJ2_SLCNamespace.__init__(self) self.topLeft = [0, 0] self.topRight = [0, 0] self.bottomLeft = [0, 0] self.bottomRight = [0, 0] def set_from_etnode(self,node): for z in node: if z.tag == 'topLeft': topLeft = z for p in topLeft: if p.tag == 'latitude': self.topLeft[1] = float(p.text) elif p.tag == 'longitude': self.topLeft[0] = float(p.text) elif z.tag == 'topRight': topRight = z for p in topRight: if p.tag == 'latitude': self.topRight[1] = float(p.text) elif p.tag == 'longitude': self.topRight[0] = float(p.text) elif z.tag == 'bottomLeft': bottomLeft = z for p in bottomLeft: if p.tag == 'latitude': self.bottomLeft[1] = float(p.text) elif p.tag == 'longitude': self.bottomLeft[0] = float(p.text) elif z.tag == 'bottomRight': bottomRight = z for p in bottomRight: if p.tag == 'latitude': self.bottomRight[1] = float(p.text) elif p.tag == 'longitude': self.bottomRight[0] = float(p.text) def __str__(self): retstr = "_Corner:"+sep retlst = () retstr += ":_Corner" return retstr % retlst class _processInfo(HJ2_SLCNamespace): def __init__(self): HJ2_SLCNamespace.__init__(self) self.RangeWeightType = None self.RangeWeightPara = [] self.AzimuthWeightType = None self.AzimuthWeightPara = [] self.incidenceAngleNearRange = None self.incidenceAngleFarRange = None self.RangeLookBandWidth = None self.AzimuthLookBandWidth = None self.TotalProcessedAzimuthBandWidth = None self.DopplerParametersReferenceTime = None self.DopplerCentroidCoefficients = [0, 1, 2, 3, 4] self.DopplerRateValuesCoefficients = [0, 1, 2, 3, 4] def set_from_etnode(self,node): for z in node: if z.tag == 'RangeWeightType': self.RangeWeightType = int(z.text) elif z.tag == 'RangeWeightPara': self.RangeWeightPara = z.text.split(',') elif z.tag == 'AzimuthWeightType': self.AzimuthWeightType = float(z.text) elif z.tag == 'AzimuthWeightPara': self.AzimuthWeightPara = z.text.split(',') elif z.tag == 'incidenceAngleNearRange': self.incidenceAngleNearRange = float(z.text) elif z.tag == 'incidenceAngleFarRange': self.incidenceAngleFarRange = float(z.text) elif z.tag == 'RangeLookBandWidth': self.RangeLookBandWidth = float(z.text) elif z.tag == 'AzimuthLookBandWidth': self.AzimuthLookBandWidth = float(z.text) elif z.tag == 'TotalProcessedAzimuthBandWidth': self.TotalProcessedAzimuthBandWidth = float(z.text) elif z.tag == 'DopplerParametersReferenceTime': self.DopplerParametersReferenceTime = float(z.text) * 1e-6 # elif z.tag == 'DopplerCentroidCoefficients': DopplerCentroidCoefficient = z for value in DopplerCentroidCoefficient: if value.tag == 'd0': self.DopplerCentroidCoefficients[0] = float(value.text) elif value.tag == 'd1': self.DopplerCentroidCoefficients[1] = float(value.text) elif value.tag == 'd2': self.DopplerCentroidCoefficients[2] = float(value.text) elif value.tag == 'd3': self.DopplerCentroidCoefficients[3] = float(value.text) elif value.tag == 'd4': self.DopplerCentroidCoefficients[4] = float(value.text) elif z.tag == 'DopplerRateValuesCoefficients': DopplerRateValuesCoefficient = z for value in DopplerRateValuesCoefficient: if value.tag == 'r0': self.DopplerRateValuesCoefficients[0] = float(value.text) elif value.tag == 'r1': self.DopplerRateValuesCoefficients[1] = float(value.text) elif value.tag == 'r2': self.DopplerRateValuesCoefficients[2] = float(value.text) elif value.tag == 'r3': self.DopplerRateValuesCoefficients[3] = float(value.text) elif value.tag == 'r4': self.DopplerRateValuesCoefficients[4] = float(value.text) def __str__(self): retstr = "_ProcessInfo:"+sep+tab retlst = () retstr += "incidenceAngleNearRange=%s"+sep+tab retlst += (self.incidenceAngleNearRange,) retstr += "incidenceAngleFarRange=%s"+sep+tab retlst += (self.incidenceAngleFarRange,) retstr += "DopplerParametersReferenceTime=%s"+sep+tab retlst += (self.DopplerParametersReferenceTime,) retstr += "DopplerCentroidCoefficients=%s"+sep retlst += (self.DopplerCentroidCoefficients,) retstr += "DopplerRateValuesCoefficients=%s"+sep retlst += (self.DopplerRateValuesCoefficients,) retstr += ":_ProcessInfo"+sep return retstr % retlst def findPreciseOrbit(dirname, fname, year): ''' Find precise orbit file in given folder. ''' import glob ###First try root folder itself res = glob.glob( os.path.join(dirname, fname.lower())) if len(res) == 0: res = glob.glob( os.path.join(dirname, "{0}".format(year), fname.lower())) if len(res) == 0: raise Exception('Orbit Dirname provided but no suitable orbit file found in {0}'.format(dirname)) if len(res) > 1: print('More than one matching result found. Using first result.') return res[0] def convertRSTimeToDateTime(instr): ''' Convert RS2 orbit time string to datetime. ''' parts = instr.strip().split('-') tparts = parts[-1].split(':') secs = float(tparts[2]) intsecs = int(secs) musecs = int((secs - intsecs)*1e6) timestamp = datetime.datetime(int(parts[0]),1,1, int(tparts[0]), int(tparts[1]), intsecs, musecs) + datetime.timedelta(days = int(parts[1])-1) return timestamp