microproduct-l-sar/vegetationHeight-L-SAR/VegetationHeightMain.py

1110 lines
53 KiB
Python
Raw Permalink Normal View History

2024-01-03 01:42:21 +00:00
"""
@Project microproduct
@File AtmosphericDelayMain.py
@Function 主程序执行入口
@Author LMM
@Date 2021/09/3 14:39
@Version 1.0.0
"""
from xml.etree.ElementTree import ElementTree
import pyproj._compat # 解决打包错误
from osgeo import gdal
import glob
from pykrige import OrdinaryKriging # 强制引入包
from tool.algorithm.transforml1a.transHandle import TransImgL1A, TransImgL1A_ori
from tool.algorithm.algtools.MetaDataHandler import Calibration, MetaDataHandler
from tool.algorithm.polsarpro.AHVToPolsarpro import AHVToPolsarpro
from tool.algorithm.xml.AlgXmlHandle import ManageAlgXML, CheckSource # 导入xml文件读取与检查文件
from tool.algorithm.algtools.ROIAlg import ROIAlg
from VegetationHeightDT import DataTransform
from AHVToPolSarPro import AHVToPolSarProS2
from SatelliteGPS import SARgps
from tool.config.ConfigeHandle import Config as cf
import os
import shutil
import datetime
import sys
import tarfile
from tool.algorithm.algtools.logHandler import LogHandler
import logging
import numpy as np
import geocoding as dem
import netCDF4 as Nc # 解决打包错误
import cftime # 解决打包错误
from tool.algorithm.image.ImageHandle import ImageHandler
from autorun import auto_run_main
import gc
from Orthorectification import header_file_read_angle
import math
from tool.algorithm.algtools.PreProcess import PreProcess as pp
from tool.algorithm.xml.CreateMetaDict import CreateMetaDict, CreateProductXml
EXE_NAME = cf.get('exe_name')
if cf.get('debug') == 'True':
DEBUG = True
else:
DEBUG = False
tar = '-' + cf.get('tar')
LIGHTSPEED = 299792458
productLevel = cf.get('productLevel')
LogHandler.init_log_handler(r'run_log\\' + EXE_NAME)
logger = logging.getLogger("mylog")
# env_str = os.path.split(os.path.realpath(__file__))[0]
env_str = os.path.dirname(os.path.abspath(sys.argv[0]))
os.environ['PROJ_LIB'] = env_str
class AtmosphericMain:
"""
大气延迟校正主函数
"""
def __init__(self, alg_xml_path):
self.alg_xml_path = alg_xml_path
self.imageHandler = ImageHandler()
self.ROIAlg = ROIAlg()
self.__alg_xml_handler = ManageAlgXML(alg_xml_path) # 导入大气延迟的xml文件
self.__check_handler = CheckSource(self.__alg_xml_handler)
self.AHVToPolSarProS2 = AHVToPolSarProS2()
self.__workspace_path = None
self.__workspace_tem_dir_path = None
self.__input_paras = {}
self.__processing_paras = {}
self.__preprocessed_paras = {}
self.__preprocessed_paras2 = {}
self.__out_para = None
self.pro_img_path = None
self.tif_angle_mask_path = None
self.intersect_shp_path = None
self.ori_xml_m = None
# 坐标系
self.__proj = ''
# 影像投影变换矩阵
self.__geo = [0, 0, 0, 0, 0, 0]
def check_source(self):
"""
检查算法相关的配置文件图像辅助文件是否齐全
"""
self.env_str = os.getcwd()
logger.info("sysdir: %s", self.env_str)
if self.__check_handler.check_alg_xml() is False:
return False
if self.__check_handler.check_run_env() is False:
return False
input_para_names = ["MasterSarData", "AuxiliarySarData", "DEM", "box"]
if self.__check_handler.check_input_paras(input_para_names) is False:
return False
self.__workspace_path = self.__alg_xml_handler.get_workspace_path()
self.__workspace_tem_dir_path = self.__workspace_path + EXE_NAME + "\\Temporary""\\"
self.__create_work_space()
self.__input_paras = self.__alg_xml_handler.get_input_paras() # 获取输入文件夹中的数据名、类型、地址
self.__processing_paras = self.__init_processing_paras(self.__input_paras) # 输出{文件名:地址}
SrcImageName = os.path.split(self.__input_paras["MasterSarData"]['ParaValue'])[1].split('.tar.gz')[0]
result_name = SrcImageName + tar + ".tar.gz"
self.__out_para = os.path.join(self.__workspace_path, EXE_NAME, 'Output', result_name)
self.__alg_xml_handler.write_out_para("VegetationHeight", self.__out_para) # 写入输出参数
logger.info('check_source success!')
logger.info('progress bar :10')
return True
def __init_processing_paras(self, names):
""""
param: names:字典列表每个字典为一个输入产品的配置信息
"""
processing_paras = {}
for name in names:
para = self.__input_paras[name]
if para is None:
logger.error(name + "is None!")
return False
para_path = para['ParaValue']
if para['ParaType'] == 'File':
if para['DataType'] == "nc":
processing_paras.update({name: para_path})
elif para['DataType'] == 'file':
if name in ["MasterNC", "AuxiliaryNC"]:
processing_paras.update({name: para_path})
if name in ['MasterSarData', 'AuxiliarySarData']:
paths = para['ParaValue'].split(';')
slc_path = os.path.join(self.__workspace_origin_path, 'SARS')
if not os.path.exists(slc_path):
os.mkdir(slc_path)
# 获取文件名中的关键字:如"20190206"/"20190113"
if name == 'MasterSarData':
filename = os.path.basename(paths[0])
part = filename.split("_")
name = part[7]
self.mas_key_word = name
shutil.copy(paths[0], os.path.join(slc_path, filename))
elif name == 'AuxiliarySarData':
filename = os.path.basename(paths[0])
part = filename.split("_")
name = part[7]
self.aux_key_word = name
shutil.copy(paths[0], os.path.join(slc_path, filename))
# for path in paths:
# shutil.copy(path, self.__workspace_slc_path)
processing_paras.update({'slc': slc_path})
# processing_paras.update({'slc': self.__workspace_slc_path})
if name == 'DEM':
if para['DataType'] == 'File':
processing_paras.update({'DEM': para['ParaValue']})
else:
para_path_list = para['ParaValue'].split(";")
if len(para_path_list) != 0:
dem_path = os.path.join(self.__workspace_origin_path, para['ParaName'])
if os.path.exists(dem_path) is False:
os.mkdir(dem_path)
for file_path in para_path_list:
tif_name = os.path.basename(file_path)
shutil.copy(file_path, os.path.join(dem_path, tif_name))
para_path = os.path.join(self.__workspace_origin_path, para['ParaName'])
processing_paras.update({'DEM': para_path})
# # 解压DEM到指定文件夹
# path = para['ParaValue']
# import zipfile
# zip_file = zipfile.ZipFile(path)
# zip_list = zip_file.namelist() # 得到压缩包里所有文件
# for f in zip_list:
# zip_file.extract(f, self.__workspace_dem_path) # 循环解压文件到指定目录
# if os.path.splitext(f)[1] == '.wgs84':
# dem_name = f
# processing_paras.update({'dem': os.path.join(self.__workspace_dem_path, f)})
# zip_file.close()
# self.verifyAndModifyWgsXml(self.__workspace_dem_path + '\\' + dem_name + '.xml',
# self.__workspace_dem_path + '\\' + dem_name)
if name == 'Orbits':
if para['DataType'] == 'File':
processing_paras.update({'orbits': para['ParaValue']})
else:
para_path_list = para['ParaValue'].split(";")
if len(para_path_list) != 0:
dem_path = os.path.join(self.__workspace_origin_path, para['ParaName'])
if os.path.exists(dem_path) is False:
os.mkdir(dem_path)
for file_path in para_path_list:
tif_name = os.path.basename(file_path)
shutil.copy(file_path, os.path.join(dem_path, tif_name))
para_path = os.path.join(self.__workspace_origin_path, para['ParaName'])
processing_paras.update({'orbits': para_path})
if name == 'box':
datas = para['ParaValue'].split(';')
if len(datas) != 4:
msg = 'para: box is error!box:' + para['ParaValue']
raise Exception(msg)
box = datas[0] + ' ' + datas[1] + ' ' + datas[2] + ' ' + datas[3]
processing_paras.update({'box': box})
return processing_paras
def verifyAndModifyWgsXml(self, xmlPath, demPath):
import xml.dom.minidom as xmldom
domobj = xmldom.parse(xmlPath)
rootNode = domobj.documentElement
pathInxml = ''
# 获得子标签
propertyElementObj = rootNode.getElementsByTagName("property")
for property in propertyElementObj:
if property.hasAttribute("name"):
if property.getAttribute("name") == "file_name":
pathNode = property.getElementsByTagName("value")[0]
pathInxml = pathNode.childNodes[0].data
print('pathInxml1:', pathInxml)
pathNode.childNodes[0].data = r"/".join(demPath.split("\\"))
pathInxml = pathNode.childNodes[0].data
print('pathInxml2:', pathInxml)
with open(xmlPath, 'w') as f:
# 缩进换行编码
domobj.writexml(f, addindent=' ', encoding='utf-8')
def __create_work_space(self):
"""
删除原有工作区文件夹,创建新工作区文件夹
"""
self.__workspace_preprocessing_path = self.__workspace_tem_dir_path + "preprocessing""\\"
self.__workspace_preprocessed_path = self.__workspace_tem_dir_path + "preprocessed""\\"
self.__workspace_cut_path = self.__workspace_tem_dir_path + "cut""\\"
# self.__workspace_resame2_path = self.__workspace_tem_dir_path + "resame2""\\"
self.__workspace_roi_path = self.__workspace_tem_dir_path + "roi""\\"
self.__workspace_atmos_dely_path = self.__workspace_tem_dir_path + "VegetationHeight""\\"
# self.__workspace_resame_itself_path = self.__workspace_tem_dir_path + "resame_itself""\\"
self.__workspace_Output_path = self.__workspace_path + EXE_NAME + r"\Output""\\"
self.__workspace_iscePreprocessed_path = self.__workspace_path + EXE_NAME + r"\Temporary\isce\preprocessed""\\"
self.__workspace_isceProcessing_path = self.__workspace_path + EXE_NAME + r"\Temporary\isce\processing""\\"
self.__workspace_isce_path = os.path.join(self.__workspace_isceProcessing_path, 'isce_workspace')
self.__workspace_mintpy_path = os.path.join(self.__workspace_isceProcessing_path, 'mintpy_workspace')
self.__workspace_dem_path = os.path.join(self.__workspace_iscePreprocessed_path, 'dem')
self.__workspace_slc_path = self.__workspace_path + EXE_NAME + r"\Temporary\slc"
self.__workspace_origin_path = os.path.join(self.__workspace_path, EXE_NAME, "Temporary", "origin")
# self.__workspace_resame2_path,self.__workspace_resame_itself_path,
path_list = [self.__workspace_preprocessing_path, self.__workspace_preprocessed_path,
self.__workspace_cut_path, self.__workspace_origin_path,
self.__workspace_roi_path, self.__workspace_atmos_dely_path,
self.__workspace_Output_path,
self.__workspace_iscePreprocessed_path, self.__workspace_isceProcessing_path,
self.__workspace_dem_path, self.__workspace_isce_path,
self.__workspace_slc_path, self.__workspace_mintpy_path]
for path in path_list:
if os.path.exists(path):
# if DEBUG is True:
# continue
self.del_folder(path)
os.makedirs(path)
else:
os.makedirs(path)
logger.info('create new workspace success!')
@staticmethod
def force_del_file(file_path):
"""
强制删除文件
"""
if os.path.isdir(file_path):
for main_dir, subdir, file_name_list in os.walk(file_path):
for filename in file_name_list:
apath = main_dir + filename
# noinspection PyBroadException
try:
os.remove(apath)
except Exception as error: # 使用windows命令行强制删除
os.system("del /f /q %s" % apath)
elif os.path.isfile(file_path) is True:
# noinspection PyBroadException
try:
os.remove(file_path)
except Exception as error: # 使用windows命令行强制删除
os.system("del /f /q %s" % file_path)
def convertToDateTime(self, string):
dt = datetime.datetime.strptime(string, "%Y-%m-%dT%H:%M:%S.%f")
return dt
def del_file(self, path_data):
"""
只删除文件不删除文件夹
"""
for i in os.listdir(path_data):
file_data = path_data + "\\" + i
if os.path.isfile(file_data) is True: # os.path.isfile判断是否为文件,如果是文件,就删除.如果是文件夹.递归给del_file.
os.remove(file_data)
return True
else:
self.del_file(file_data)
@staticmethod
def del_folder(path_data):
"""
删除整个文件夹
"""
if os.path.isdir(path_data):
shutil.rmtree(path_data)
@staticmethod
def del_folder_list(path_data_list):
"""
删除文件夹列表
"""
for path_data in path_data_list:
if os.path.isdir(path_data):
shutil.rmtree(path_data)
def del_temp_workspace(self):
"""
临时工作区
"""
if DEBUG is True:
return
path = self.__workspace_path + EXE_NAME + r'\Temporary'
if os.path.exists(path):
self.del_folder(path)
def calFlatEffect(self, bPar, lamda):
print("1. calFlatEffect 去地平 ")
# bParPath = os.path.join(self.__workspace_isce_path, 'merged', 'baselines', self.aux_key_word,
# self.aux_key_word + '_Bpar.npy')
# bParPath = r'D:\micro\LWork\20230404\20230404_Bpar.npy'
# bPar = np.load(bParPath)
h, w = bPar.shape
if int(self.mas_key_word) < int(self.aux_key_word):
flatEffect = ((-4) * np.pi / lamda) * bPar
else:
flatEffect = (4 * np.pi / lamda) * bPar
flatEffect_path = self.__workspace_preprocessed_path + "FlatEffect""\\"
self.out_flat_tif = flatEffect_path + "flatEffect_path.tif"
ImageHandler().write_img(self.out_flat_tif, '', [0, 0, 0, 0, 0, 0], flatEffect)
flatEffect_angle = np.exp(1j * flatEffect)
flatEffect_angle_path = flatEffect_path + "flatEffect_angle_path.bin"
flatEffect_angleHdr_path = flatEffect_path + "flatEffect_angle_path.hdr"
self.AHVToPolSarProS2.write_slc_img_bin(flatEffect_angle, flatEffect_angle_path)
self.AHVToPolSarProS2.write_bin_hdr(flatEffect_angleHdr_path, h, w)
return flatEffect_angle
def getRange_by_npy(self, flatEffect):
rangePath = os.path.join(self.__workspace_isce_path, 'merged', 'baselines', self.aux_key_word,
self.aux_key_word + '_Range.npy')
rangePath = r"D:\micro\LWork\20230404\20230404_Range.npy"
range_arr = np.load(rangePath)
h, w = flatEffect.shape
Range = np.zeros((h, w), dtype=float)
Range[:, :] = range_arr
return Range
def getRange_azTime(self, xml_path, mas_lines_set):
root = ElementTree(file=xml_path).getroot()
rangePixelSize = float(
root.find("productInfo").find("imageDataInfo").find("imageRaster").find("columnSpacing").text)
slantRangeTimeToFirstRangeSample = float(
root.find("productInfo").find("sceneInfo").find("rangeTime").find("firstPixel").text)
startingRange = slantRangeTimeToFirstRangeSample * (LIGHTSPEED / 2)
nearRange = startingRange + rangePixelSize * mas_lines_set[2]
cols = mas_lines_set[3] - mas_lines_set[2]
slantRange = [nearRange + rangePixelSize * c for c in range(cols)]
dataStartTime = self.convertToDateTime(
root.find("productInfo").find("sceneInfo").find("start").find("timeUTC").text)
dataStopTime = self.convertToDateTime(
root.find("productInfo").find("sceneInfo").find("stop").find("timeUTC").text)
lines = int(root.find("productInfo").find("imageDataInfo").find("imageRaster").find("numberOfRows").text)
orig_prf = (lines - 1) / ((dataStopTime - dataStartTime).total_seconds())
nearTime = dataStartTime + datetime.timedelta(seconds=mas_lines_set[0] / orig_prf)
rows = mas_lines_set[1] - mas_lines_set[0]
azimuthTime = [nearTime + datetime.timedelta(seconds=r / orig_prf) for r in range(rows)]
return slantRange, azimuthTime
def preprocess_handle(self):
slc_paths = os.path.join(self.__workspace_origin_path, 'SARS', self.mas_key_word)
for file in os.listdir(slc_paths):
if file.endswith('.meta.xml'):
self.ori_xml_m = os.path.join(slc_paths, file)
slc_paths = os.path.join(self.__workspace_origin_path, 'SARS', self.aux_key_word)
for file in os.listdir(slc_paths):
if file.endswith('.meta.xml'):
self.ori_xml_a = os.path.join(slc_paths, file)
print("========= 生成kz.bin start ===========")
m_incidence_file = list(glob.glob(os.path.join(self.__workspace_origin_path, "SARS", self.mas_key_word,
'*.incidence.xml'))) # _incidence.xml文件路径
a_incidence_file = list(
glob.glob(os.path.join(self.__workspace_origin_path, "SARS", self.aux_key_word, '*.incidence.xml')))
mas_in_tif_paths = list(
glob.glob(os.path.join(self.__workspace_origin_path, "SARS", self.mas_key_word, '*.tiff')))
MasterSar_width = ImageHandler.get_img_width(mas_in_tif_paths[0])
MasterSar_height = ImageHandler.get_img_height(mas_in_tif_paths[0])
aux_in_tif_paths = list(
glob.glob(os.path.join(self.__workspace_origin_path, "SARS", self.aux_key_word, '*.tiff')))
AuxiliarySar_width = ImageHandler.get_img_width(aux_in_tif_paths[0])
AuxiliarySar_height = ImageHandler.get_img_height(aux_in_tif_paths[0])
m_all_angle_array = header_file_read_angle().read_all_angle(m_incidence_file[0], MasterSar_height,
MasterSar_width)
m_incidence_path = self.__workspace_preprocessed_path + "m_incidence\\"
if not os.path.exists(m_incidence_path):
os.makedirs(m_incidence_path)
m_cut_img = m_all_angle_array[self.mas_lines_set[0]:self.mas_lines_set[1],
self.mas_lines_set[2]:self.mas_lines_set[3]]
ImageHandler.write_img(m_incidence_path + "m_incidence.tif", '', [0, 0, 0, 0, 0, 0], m_cut_img)
self.__preprocessed_paras.update({"m_incidence": m_incidence_path + "m_incidence.tif"})
a_all_angle_array = header_file_read_angle().read_all_angle(a_incidence_file[0], AuxiliarySar_height,
AuxiliarySar_width)
a_incidence_path = self.__workspace_preprocessed_path + "a_incidence\\"
if not os.path.exists(a_incidence_path):
os.makedirs(a_incidence_path)
a_cut_img = a_all_angle_array[self.aux_lines_set[0]:self.aux_lines_set[1],
self.aux_lines_set[2]:self.aux_lines_set[3]]
ImageHandler.write_img(a_incidence_path + "a_incidence.tif", '', [0, 0, 0, 0, 0, 0], a_cut_img)
self.__preprocessed_paras.update({"a_incidence": a_incidence_path + "a_incidence.tif"})
radar_center_frequency = MetaDataHandler.get_RadarCenterFrequency(self.ori_xml_m)
lamda = float(299792458 / radar_center_frequency)
SatelliteOrbitModel = SARgps()
SatelliteOrbit_m = SatelliteOrbitModel.getGPSModel(self.ori_xml_m)
SatelliteOrbit_a = SatelliteOrbitModel.getGPSModel(self.ori_xml_a)
slantRange_m, azimuthTime_m = self.getRange_azTime(self.ori_xml_m, self.mas_lines_set)
slantRange_a, azimuthTime_a = self.getRange_azTime(self.ori_xml_a, self.aux_lines_set)
Bpar, Bperp = SatelliteOrbitModel.calBaseline(slantRange_m, azimuthTime_m, azimuthTime_a, SatelliteOrbit_m,
SatelliteOrbit_a, lamda, self.ori_xml_m, self.ori_xml_a)
flatEffect = self.calFlatEffect(Bpar, lamda)
# Range = self.getRange_by_npy(flatEffect)
self.calc_kz(m_cut_img, Bperp, slantRange_m, lamda)
# self.calc_kz(m_cut_img, a_cut_img, lamda)
print("========= 生成kz.bin end ===========")
# 计算地平相位
# 1、slc->s2
print("========= slc->s2 start ===========")
atp = AHVToPolsarpro()
# 主影像转S2
master_s2 = self.__workspace_preprocessed_path + "master_s2""\\"
os.makedirs(master_s2)
polarization = ['HH', 'HV', 'VH', 'VV']
slc_paths = os.path.join(self.__workspace_origin_path, 'SARS', self.mas_key_word)
for file in os.listdir(slc_paths):
if file.endswith('.meta.xml'):
self.ori_xml_m = os.path.join(slc_paths, file)
calibration = Calibration.get_Calibration_coefficient(self.ori_xml_m, polarization)
master_tif_path = atp.calibration(calibration, in_ahv_dir=self.__workspace_preprocessed_path + "MasterSar""\\")
self.AHVToPolSarProS2.api_ahv_to_polsarpro_s2(master_s2, master_tif_path) # 全极化影像转S2矩阵
print("--------- 主影像转S2 完成 -----------")
# 辅影像转S2
slave_s2 = self.__workspace_preprocessed_path + "slave_s2""\\"
os.makedirs(slave_s2)
polarization = ['HH', 'HV', 'VH', 'VV']
calibration = Calibration.get_Calibration_coefficient(self.ori_xml_a, polarization)
master_tif_path = atp.calibration(calibration,
in_ahv_dir=self.__workspace_preprocessed_path + "AuxiliarySar""\\")
self.AHVToPolSarProS2.api_ahv_to_polsarpro_s2flatEffect(slave_s2,
master_tif_path, flatEffect) # 全极化影像转S2矩阵 ,同时完成去地平
# # 陈增辉修改
slavepath = slave_s2 + "no_flat\\"
if not os.path.exists(slavepath):
os.mkdir(slavepath)
self.AHVToPolSarProS2.api_ahv_to_polsarpro_s2(slavepath,
master_tif_path) # 全极化影像转S2矩阵
print("--------- 辅影像转S2 完成 -----------")
print("========= slc->s2 end ===========")
current_path = os.path.dirname(os.path.abspath(sys.argv[0]))
print("========= 极化矩阵滤波并转T6 start ===========")
cols = ImageHandler.get_img_width(self.__preprocessed_paras["MasterSar_HH"])
rows = ImageHandler.get_img_height(self.__preprocessed_paras["MasterSar_HH"])
"""
lee_refined_filter_dual.exe -idm "F:/Toos/PolSARpro/data/LT_data/VegetationHeight/Temporary/preprocessed/master_s2"
-ids "F:/Toos/PolSARpro/data/LT_data/VegetationHeight/Temporary/preprocessed/slave_s2"
-od "F:/Toos/PolSARpro/data/LT_data/VegetationHeight/Temporary/preprocessed/master_s2_slave_s2_LEE/T6"
-iodf S2T6 -nw 7 -nlk 1 -ofr 0 -ofc 0 -fnr 7903 -fnc 11898
"""
lee_t6_path = self.__workspace_preprocessed_path + "master_s2_slave_s2_LEE""\\""T6""\\"
if not os.path.exists(lee_t6_path):
os.makedirs(lee_t6_path)
lee_refined_filter_dual_path = os.path.join(current_path, "lee_refined_filter_dual.exe")
cmd = "{} -idm {} -ids {} -od {} -iodf S2T6 -nw 7 -nlk 1 -ofr 0 -ofc 0 -fnr {} -fnc {}".format(
lee_refined_filter_dual_path,
master_s2,
slave_s2,
lee_t6_path,
rows,
cols)
logger.info("{} {}".format(os.system(cmd), cmd))
print("========= 极化矩阵滤波并转T6 end ===========")
S2_configtxt = list(glob.glob(os.path.join(master_s2, "*.txt")))[0]
shutil.copy(S2_configtxt, os.path.join(lee_t6_path, "config.txt"))
print("========= 复相干系数估计 start ===========")
coherence_estimation_path = os.path.join(current_path, "complex_coherence_estimation.exe")
# coherence_opt_estimation_path = os.path.join(current_path, "complex_coherence_opt_estimation_T6.exe")
master_slave_t6_box_coh = lee_t6_path
cor_list = ["HH", "HV", "VV", "LL", "LR", "RR", "HHpVV", "HHmVV", "HVpVH", "HHVV"]
"""
complex_coherence_estimation.exe -id "F:/Toos/PolSARpro/data/LT_data/VegetationHeight/Temporary/preprocessed/master_s2_slave_s2_LEE/T6"
-od "F:/Toos/PolSARpro/data/LT_data/VegetationHeight/Temporary/preprocessed/master_s2_slave_s2_LEE/T6"
-iodf T6 -type HHmVV -nwr 7 -nwc 7 -ofr 0 -ofc 0 -fnr 7903 -fnc 11898
"""
for cor_type in cor_list:
cmd = "{} -id {} -od {} -iodf T6 -type {} -nwr 7 -nwc 7 -ofr 0 -ofc 0 -fnr {} -fnc {}".format(
coherence_estimation_path, lee_t6_path, master_slave_t6_box_coh, cor_type, rows, cols)
logger.info("{} {}".format(os.system(cmd), cmd))
coherence_estimation_opt_path = os.path.join(current_path, "complex_coherence_opt_estimation.exe")
cmd = "{} -id {} -od {} -iodf T6 -nwr 7 -nwc 7 -ofr 0 -ofc 0 -fnr {} -fnc {}".format(
coherence_estimation_opt_path, lee_t6_path, master_slave_t6_box_coh, rows, cols)
logger.info("{} {}".format(os.system(cmd), cmd))
print("========= 复相干系数估计 end ===========")
print("========= 基于RVOG模型三阶段植被高度反演方法 start ===========")
"""
height_estimation_inversion_procedure_RVOG.exe -id "F:/Toos/PolSARpro/micro/datas/vegetation/track_master_track_slave"
-od "F:/Toos/PolSARpro/micro/datas/vegetation/track_master_track_slave"
-ifgh "F:/Toos/PolSARpro/micro/datas/vegetation/track_master_track_slave/cmplx_coh_HV.bin"
-ifgl "F:/Toos/PolSARpro/micro/datas/vegetation/track_master_track_slave/cmplx_coh_HHpVV.bin"
-kz "F:/Toos/PolSARpro/micro/datas/vegetation/track_slave/kz.bin"
-nwr 11 -nwc 11 -coef 0.5 -nc 11898 -ofr 0 -ofc 0 -fnr 7903 -fnc 11898
"""
height_estimation_inversion_procedure_RVOG_path = os.path.join(current_path,
"height_estimation_inversion_procedure_RVOG.exe")
cmplx_coh_HV_path = lee_t6_path + "cmplx_coh_HV.bin"
cmplx_coh_HHpVV_path = lee_t6_path + "cmplx_coh_HHpVV.bin"
cmd = "{} -id {} -od {} -ifgh {} -ifgl {} -kz {} -nwr 11 -nwc 11 -coef 0.5 -nc {} -ofr 0 -ofc 0 -fnr {} -fnc {}".format(
height_estimation_inversion_procedure_RVOG_path,
lee_t6_path,
lee_t6_path,
cmplx_coh_HV_path,
cmplx_coh_HHpVV_path,
self.out_kz,
cols, rows, cols)
logger.info("{} {}".format(os.system(cmd), cmd))
print("========= 基于RVOG模型三阶段植被高度反演方法 end ===========")
print("========= 正射 start ===========")
RVOG_heights_path = lee_t6_path + "RVOG_heights.bin"
array = np.fromfile(RVOG_heights_path, dtype=np.float32).reshape(rows, cols)
srcName_m = os.path.split(self.__input_paras["MasterSarData"]['ParaValue'])[1].split('.tar.gz')[0]
result_name = srcName_m + tar + ".tiff"
vegetation_height_tif = os.path.join(self.__workspace_atmos_dely_path, result_name)
vegetation_height_tif = dem.get_Dem(self.__workspace_isce_path, self.__workspace_preprocessed_path,
self.__workspace_Output_path, vegetation_height_tif, array)
self.imageHandler.write_quick_view(vegetation_height_tif)
meta_xml_path = self.creat_xml(vegetation_height_tif)
temp_folder = os.path.join(self.__workspace_path, EXE_NAME, 'Output')
out_xml = os.path.join(temp_folder, os.path.basename(meta_xml_path))
if os.path.exists(temp_folder) is False:
os.mkdir(temp_folder)
shutil.copy(meta_xml_path, out_xml)
self.make_targz(self.__out_para, self.__workspace_atmos_dely_path)
print("========= 正射 end ===========")
pass
def creat_xml(self, dem_proPath):
"""
生成元文件案例
product_path: 大气延迟校正产品输出的影像文件路径
"""
# os.chdir(env_str)
model_path = os.path.join(env_str, "product.xml")
tem_folder = self.__workspace_path + EXE_NAME + r"\Temporary""\\"
image_path = dem_proPath
out_path1 = os.path.join(tem_folder, "trans_geo_projcs.tif")
out_path2 = os.path.join(tem_folder, "trans_projcs_geo.tif")
srcName_m = os.path.split(self.__input_paras["MasterSarData"]['ParaValue'])[1].split('.tar.gz')[0]
meta_xml_path = os.path.join(self.__workspace_atmos_dely_path, srcName_m + "-VTH.meta.xml")
para_dict = CreateMetaDict(image_path, self.ori_xml_a, self.__workspace_atmos_dely_path,
out_path1, out_path2).calu_nature()
para_dict.update({"imageinfo_ProductName": "植被高度产品"})
para_dict.update({"imageinfo_ProductIdentifier": "DEM"})
para_dict.update({"imageinfo_ProductLevel": productLevel})
para_dict.update({"ProductProductionInfo_BandSelection": "1,2"})
para_dict.update({"ProductProductionInfo_AuxiliaryDataDescription": "DEM"})
CreateProductXml(para_dict, model_path, meta_xml_path).create_standard_xml()
return meta_xml_path
def reasmple_incLocal(self, para_name1, para_name2):
m_incLocal = self.__preprocessed_paras[para_name1[0]]
a_incLocal = self.__preprocessed_paras[para_name1[1]]
m_hgt = self.__preprocessed_paras[para_name2[0]]
m_incLocal_resample = os.path.join(self.__workspace_slc_path, "m_incLocal_resample.tif")
a_incLocal_resample = os.path.join(self.__workspace_slc_path, "a_incLocal_resample.tif")
pp.resampling_by_scale(m_incLocal, m_incLocal_resample, m_hgt)
pp.resampling_by_scale(a_incLocal, a_incLocal_resample, m_hgt)
return m_incLocal_resample, a_incLocal_resample
def calc_kz(self, a_angle_array, bParp, Range, lamda):
# def calc_kz(self, m_angle_array, a_angle_array, lamda):
"""计算kz参数"""
# bParpPath = os.path.join(self.__workspace_isce_path, 'merged', 'baselines', self.aux_key_word,
# self.aux_key_word + '_Bpar.npy')
# bParpPath = r"D:\micro\LWork\20230404\20230404_Bperp.npy"
# bParp = np.load(bParpPath)
kz_array = (2 * np.pi / lamda) * (bParp / np.multiply(Range, np.sin(a_angle_array)))
# kz_array = np.exp(1j * kz_array)
kz_path = self.__workspace_preprocessed_path + "kz""\\"
self.out_kz_tif = kz_path + "kz.tif"
ImageHandler().write_img(self.out_kz_tif, '', [0, 0, 0, 0, 0, 0], kz_array)
self.out_kz = kz_path + "kz.bin"
# self.AHVToPolSarProS2.write_slc_img_bin(kz_array, self.out_kz)
self.AHVToPolSarProS2.array_bin(self.out_kz, kz_array)
logger.info("calc kz finish ")
return bParp
# 方法2 通过正射计算的雷达入射角进行计算
# try:
# v1 = m_angle_array / 180 * math.pi # 转成弧度
# v2 = a_angle_array / 180 * math.pi
# mean = (v1 + v2) / 2
# kz_array = 4 * math.pi * (abs(v2 - v1)) / (np.sin(mean) * lamda)
#
# self.mask_m_angle_array = m_angle_array
# self.mask_a_angle_array = a_angle_array
# ####################################
#
# kz_path = self.__workspace_preprocessed_path + "kz""\\"
#
# self.out_kz_tif = kz_path + "kz.tif"
# ImageHandler().write_img(self.out_kz_tif, '', [0, 0, 0, 0, 0, 0], kz_array)
#
# self.out_kz = kz_path + "kz.bin"
# self.AHVToPolSarProS2.array_bin(self.out_kz, kz_array)
#
# logger.info("calc kz finish ")
# except:
# return False
def check_img_projection(self, para_names1, para_names2):
"""
读取每一张图像,检查图像坐标系如果是平面坐标则转换为WGS84经纬度坐标
:param para_names1:需要检查的参数名称
:param para_names2:需要检查的参数名称
"""
if len(para_names1) == 0 or len(para_names2) == 0:
return False
pro_img_path = {}
for name1 in para_names1:
# dem、入射角
in_para = self.__processing_paras[name1]
proj = self.imageHandler.get_projection(in_para)
keyword = proj.split("[", 2)[0]
out_para = self.__workspace_preprocessed_path + "pro_" + os.path.basename(self.__processing_paras[name1])
if keyword == "GEOGCS":
shutil.copy(in_para, out_para)
elif keyword == "PROJCS":
pp.trans_epsg4326(out_para, in_para)
elif len(keyword) == 0 or keyword.strip() == "" or keyword.isspace() is True:
raise Exception('the coordinate of dem or incident is missing!')
pro_img_path.update({name1: out_para})
logger.info('image coordinate transformation finished!')
for name2 in para_names2:
in_para = self.__workspace_preprocessing_path + name2 + ".tif"
proj = self.imageHandler.get_projection(in_para)
keyword = proj.split("[", 2)[0]
out_para = self.__workspace_preprocessed_path + name2 + ".tif"
if keyword == "GEOGCS":
shutil.copy(in_para, out_para)
elif keyword == "PROJCS":
pp.trans_epsg4326(out_para, in_para)
elif len(keyword) == 0 or keyword.strip() == "" or keyword.isspace() is True:
raise Exception('data miss coordinate!')
pro_img_path.update({name2: out_para})
logger.info('ERA data coordinate transformation finished!')
return pro_img_path
def cal_intersect_shp(self, shp_path, para_dict, para_names):
"""
:param shp_path:相交区域矢量文件保存区域
:param para_dict: 字典 {影像名称:路径}
:param para_names:判断相交影像的名称列表
:return: True or False
"""
if len(para_names) == 0:
return False
scopes = ()
for name in para_names:
scope_tuple = (self.imageHandler.get_scope(para_dict[name]),)
scopes += scope_tuple
intersect_polygon = pp.intersect_polygon(scopes)
if intersect_polygon is None:
logger.error('image range does not overlap!')
return False
if pp.write_polygon_shp(shp_path, intersect_polygon, 4326) is False:
return False
logger.info('calculate intersect finished!')
return True
def intersect_judge(self, para_dict, para_names, para_names2):
"""
:param para_dict: 字典 {影像名称:路径}
:param para_names:判断相交影像的名称列表
:param para_names2:判断相交影像的名称列表
:return: True or False
"""
if len(para_names) == 0 or len(para_names2) == 0:
return False
scopes_01 = ()
for name in para_names:
scope_tuple_01 = (self.imageHandler.get_scope(para_dict[name]),)
scopes_01 += scope_tuple_01
intersect_polygon_01 = pp.intersect_polygon(scopes_01)
scopes_02 = ()
for name in para_names2:
scope_tuple_02 = (self.imageHandler.get_scope(para_dict[name]),)
scopes_02 += scope_tuple_02
intersect_polygon_02 = pp.intersect_polygon(scopes_02)
intersect_polygon_01 = np.array(intersect_polygon_01)
intersect_polygon_02 = np.array(intersect_polygon_02)
flag_left, flag_right, flag_down, flag_up = 0, 0, 0, 0
if intersect_polygon_02[0][0] >= intersect_polygon_01[0][0]:
flag_left = 1
if intersect_polygon_02[2][0] <= intersect_polygon_01[2][0]:
flag_right = 1
if intersect_polygon_02[0][1] >= intersect_polygon_01[0][1]:
flag_down = 1
if intersect_polygon_02[2][1] <= intersect_polygon_01[2][1]:
flag_up = 1
if flag_left == 0 or flag_right == 0 or flag_down == 0 or flag_up == 0:
return False
return True
def cut_img(self, para_names1, shp_path):
"""
使用矢量数据裁剪影像
:param para_names1:需要检查的参数名称
:param shp_path裁剪的shp文件
"""
if len(para_names1) == 0:
return None
cutted_img_path = {}
# noinspection PyBroadException
try:
for name1 in para_names1:
input_path = self.pro_img_path[name1]
output_path = self.__workspace_cut_path + name1 + ".tif"
pp.cut_img(output_path, input_path, shp_path)
cutted_img_path.update({name1: output_path})
logger.info("cut %s success!", name1)
except BaseException:
logger.error('cut_img failed!')
return None
logger.info('cut image finished!')
return cutted_img_path
@staticmethod
def resampling_img(para_names1, img_paths, out_path, refer_img_path):
"""
以主影像为参考对影像重采样
:param para_names1:需要检查的参数名称
:param img_paths待重采样影像路径
:param out_path保存数据的文件夹路径
:param refer_img_path参考影像路径
"""
if len(para_names1) == 0 or len(img_paths) == 0:
return
prepro_imgs_path = {}
for name1 in para_names1:
img_path = img_paths[name1]
output_para = out_path + name1 + ".tif"
pp.resampling_by_scale(img_path, output_para, refer_img_path)
prepro_imgs_path.update({name1: output_para})
logger.info("resampling %s success!", name1)
logger.info('resampling finished !')
return prepro_imgs_path
def check_LocalIncidenceAngle(self):
"""
将角度的无效值设置为nan把角度值转为弧度值
"""
# 主影像雷达入射角
M_Angle_path = self.__preprocessed_paras['m_incLocal']
out_M_Angle_path = os.path.splitext(M_Angle_path)[0] + r"_check.tif"
pp.check_LocalIncidenceAngle(out_M_Angle_path, M_Angle_path)
if os.path.isfile(M_Angle_path) is True:
os.remove(M_Angle_path)
self.__preprocessed_paras['m_incLocal'] = out_M_Angle_path
# 辅影像雷达入射角
A_Angle_path = self.__preprocessed_paras['a_incLocal']
out_A_Angle_path = os.path.splitext(A_Angle_path)[0] + r"_check.tif"
pp.check_LocalIncidenceAngle(out_A_Angle_path, A_Angle_path)
if os.path.isfile(A_Angle_path) is True:
os.remove(A_Angle_path)
self.__preprocessed_paras['a_incLocal'] = out_A_Angle_path
@staticmethod
def make_targz(output_filename, source_dir):
"""
一次性打包整个根目录空子目录会被打包
如果只打包不压缩"w:gz"参数改为"w:""w"即可
:param output_filename:输出压缩包的完整路径eg:'E:\test.tar.gz'
:param source_dir:需要打包的跟目录eg: 'E:\testFfile\'打包文件夹里面的所有文件,'E:\testFfile'打包文件夹
"""
with tarfile.open(output_filename, "w:gz") as tar:
tar.add(source_dir, arcname=os.path.basename(source_dir))
def transformZtd(self, scopes, ori_sim, out_mas_ztd_path, geo_mas_ztd_path):
self._tr = TransImgL1A_ori(ori_sim, scopes)
self._tr.l1a_2_geo(ori_sim, out_mas_ztd_path, geo_mas_ztd_path)
def get_scopes(self, ori_sim):
ori_sim_data = self.imageHandler.get_data(ori_sim)
lon = ori_sim_data[0, :, :]
lat = ori_sim_data[1, :, :]
min_lon = np.nanmin(lon)
max_lon = np.nanmax(lon)
min_lat = np.nanmin(lat)
max_lat = np.nanmax(lat)
scopes = [[min_lon, max_lat], [max_lon, max_lat], [min_lon, min_lat], [max_lon, min_lat]]
return scopes
def prod_handle(self):
"""
干涉测量相位大气延迟校正处理
"""
pass
logger.info('progress bar :77')
def isce_autorun(self, isce_exe_dir, isce_work_space):
"""执行 autorun.py"""
# cmd = 'python {}/autorun.py -e {} -o {}'.format(isce_exe_dir, isce_exe_dir, isce_work_space)
# 注释by-shj-解决打包错误
# cmd = 'python autorun.py -e {} -o {} -cygwinFlag False'.format(isce_exe_dir, isce_work_space)
# logger.info('autorun_cmd:{}'.format(cmd))
# result = os.system(cmd)
isce_exe_dir = r"/".join(os.path.join(self.env_str, "ISCEApp").split("\\"))
os.chdir(isce_exe_dir)
cmd = ['-e', isce_exe_dir, '-o', isce_work_space]
logger.info('autorun_cmd:{}'.format(cmd))
auto_run_main(cmd)
os.chdir(self.env_str)
# logger.info('cmd_result:{}'.format(result))
# if result != 0:
# raise Exception('autorun.py run failed!')
logger.info('autorun_cmd success!')
def isce_write(self, in_dict, out_dict):
"""isce(.rdr、.vrt)-->.tif"""
for i in range(0, len(in_dict)):
output_file = os.path.join(self.__workspace_slc_path, out_dict[i])
limit_lat = os.path.join(self.__workspace_isce_path, "geom_reference", "lat.rdr")
limit_lon = os.path.join(self.__workspace_isce_path, "geom_reference", "lon.rdr")
if in_dict[i] != "filt_fine.int.vrt" and in_dict[i] != "filt_fine.unw.vrt":
input_file = os.path.join(self.__workspace_isce_path, "geom_reference", in_dict[i])
if os.path.isfile(input_file):
DataTransform().write_tif(output_file, limit_lat, limit_lon, input_file)
else:
raise Exception('can not find file: ', input_file)
name = out_dict[i].split(".")[0]
self.__processing_paras.update({name: output_file})
def isce_run_steps(self, run_steps, target):
for i in range(0, len(run_steps)):
uwm_file = os.path.join(self.__workspace_isce_path, "run_files", run_steps[i])
shutil.move(uwm_file, target)
def isce_del_dir(self, path_list):
for path in path_list:
if os.path.exists(path):
if os.path.isdir(path):
shutil.rmtree(path)
os.makedirs(path)
logger.info('create new floder success!')
def unPackLT1AB(self, run_unPackLT1AB):
global result
print(run_unPackLT1AB)
if not os.path.exists(run_unPackLT1AB):
raise Exception("run_unPackGF3.txt not found!")
with open(run_unPackLT1AB, 'r', encoding='utf-8') as fp:
cmd_lines = fp.readlines()
for cmd_line in cmd_lines:
cmdStr = cmd_line.replace("\n", "")
pyFileName = cmdStr.split(' ')[0]
exeName = "{0}.exe".format(pyFileName.split('.')[0])
newCmdLine = cmdStr.replace(pyFileName, exeName)
print("cmd_txt:{0}".format(newCmdLine))
if len(newCmdLine) == 0:
print("cmd_line{0} cmd_txt is null".format(cmd_line))
continue
result = os.system(newCmdLine)
return result
def get_interv_img(self):
"""根据干涉形变算法生成干涉图"""
# 执行isce2.5生成干涉图
# 生成工作流配置文件
dem_dir = r"/".join(self.__processing_paras['DEM'].split("\\"))
isce_work_space = r"/".join(self.__workspace_isce_path.split("\\"))
isce_work_space = '/cygdrive/' + isce_work_space.replace(":/", "/")
box = "'" + self.__processing_paras['box'] + "'"
isce_exe_dir = r"/".join(os.path.join(self.env_str, "ISCEApp").split("\\"))
target = isce_work_space
os.chdir(isce_exe_dir)
# 前处理 转换tif影像为 wgs84格式
dem_dir = '/cygdrive/' + dem_dir.replace(":/", "/")
out_dem_dir = self.__workspace_dem_path
out_dem_dir = '/cygdrive/' + out_dem_dir.replace(":\\", "/")
# cmd = "demhgt2wgs.exe --tif_path {} --hgt_path {} --ASTGTM2".format(dem_dir, out_dem_dir)
cmd = "DEM2ISCE.exe -s {} -o {}".format(dem_dir, out_dem_dir)
logger.info('demhgt2wgs_cmd:{}'.format(cmd))
result = os.system(cmd)
logger.info('cmd_result:{}'.format(result))
import glob
in_tif_paths = list(glob.glob(os.path.join(self.__workspace_dem_path, '*.wgs84')))
if in_tif_paths == []:
raise Exception('demhgt2wgs.exe run failed!')
dem_path = r"/".join(in_tif_paths[0].split("\\"))
dem_path = '/cygdrive/' + dem_path.replace(":/", "/")
logger.info('demhgt2wgs finish!')
logger.info('progress bar: 5%')
# 一、 辅影像
# 1.1 执行stackSentinel.exe 生成运行配置文件
# key_word = "20190206"
slc_dir = r"/".join(self.__processing_paras['slc'].split("\\")) + "/"
slc_dir = '/cygdrive/' + slc_dir.replace(":/", "/")
out_slc_dir = r"/".join(os.path.join(self.__workspace_preprocessed_path, 'slc').split("\\")) + "/"
if not os.path.exists(out_slc_dir):
os.mkdir(out_slc_dir)
out_slc_dir = '/cygdrive/' + out_slc_dir.replace(":/", "/")
cmd = "prepSlcLT1AB.exe -i {} -o {}".format(slc_dir, out_slc_dir)
logger.info('prepSlcLT1AB_cmd:{}'.format(cmd))
result = os.system(cmd)
logger.info('cmd_result:{}'.format(result))
run_unPackLT1AB = os.path.join(self.__workspace_origin_path, 'SARS', 'run_unPackLT1AB.txt')
result = self.unPackLT1AB(run_unPackLT1AB)
logger.info('unpackFrame_LT1AB_cmd:{}'.format(cmd))
logger.info('cmd_result:{}'.format(result))
logger.info('slc to isce_data finish!')
logger.info('progress bar: 10%')
cmd = "stackStripMap.exe -s {} -w {} -d {} -m {} -a {} -r {} -x {} -u 'snaphu' --nofocus".format(out_slc_dir,
isce_work_space,
dem_path,
self.mas_key_word,
3, 3,
box)
logger.info('stackStripMap_cmd:{}'.format(cmd))
result = os.system(cmd)
logger.info('cmd_result:{}'.format(result))
run_files = os.path.join(self.__workspace_isce_path, 'run_files')
for file in list(glob.glob(os.path.join(run_files, '*.job'))):
os.remove(file)
logger.info('mas slc stackStripMap finish!')
# 1.2 执行autorun.py 生成干涉图
# "run_02_reference"
# "run_03_focus_split",
# "run_04_geo2rdr_coarseResamp",
# "run_05_refineSecondaryTiming",
# "run_06_invertMisreg",
# "run_07_fineResamp",
# "run_08_grid_baseline",
run_steps = ["run_03_focus_split",
"run_04_geo2rdr_coarseResamp",
"run_05_refineSecondaryTiming",
"run_06_invertMisreg",
"run_07_fineResamp",
"run_08_grid_baseline",
"run_09_igram"]
self.isce_run_steps(run_steps, self.__workspace_isce_path)
cmd = ['-e', isce_exe_dir, '-o', self.__workspace_isce_path]
logger.info('autorun_cmd:{}'.format(cmd))
auto_run_main(cmd)
logger.info('cmd_result:{}'.format(result))
if result != 0:
raise Exception('autorun.py run failed!')
# self.isce_autorun(isce_exe_dir, isce_work_space)
logger.info('progress bar :30')
# 裁剪
mas_file_path = os.path.join(self.__workspace_preprocessed_path, 'slc', self.mas_key_word,
self.mas_key_word + ".txt")
with open(mas_file_path, 'r') as file:
lines = file.readlines()
self.mas_lines_set = list(int(line.replace("\n", "")) for line in lines)
aux_file_path = os.path.join(self.__workspace_preprocessed_path, 'slc', self.aux_key_word,
self.aux_key_word + ".txt")
with open(aux_file_path, 'r') as file:
lines = file.readlines()
self.aux_lines_set = list(int(line.replace("\n", "")) for line in lines)
mas_r_number = self.mas_lines_set[1] - self.mas_lines_set[0]
mas_c_number = self.mas_lines_set[3] - self.mas_lines_set[2]
aux_r_number = self.aux_lines_set[1] - self.aux_lines_set[0]
aux_c_number = self.aux_lines_set[3] - self.aux_lines_set[2]
self.aux_lines_set[1] += mas_r_number - aux_r_number
self.aux_lines_set[3] += mas_c_number - aux_c_number
mas_out_cut_path = self.__workspace_preprocessed_path + "MasterSar"
if not os.path.exists(mas_out_cut_path):
os.makedirs(mas_out_cut_path)
aux_out_cut_path = self.__workspace_preprocessed_path + "AuxiliarySar"
if not os.path.exists(aux_out_cut_path):
os.makedirs(aux_out_cut_path)
mas_in_tif_paths = list(
glob.glob(os.path.join(self.__workspace_origin_path, "SARS", self.mas_key_word, '*.tiff')))
for tif in mas_in_tif_paths:
file_name = os.path.basename(tif).split("_")[9]
out_path = os.path.join(mas_out_cut_path, "MasterSar_" + file_name + ".tif")
AtmosphericMain.cut_L1A(tif, out_path, self.mas_lines_set)
self.__preprocessed_paras.update({"MasterSar_" + file_name: out_path})
print("-----裁剪后主影像结果-----", out_path)
aux_in_tif_paths = list(
glob.glob(os.path.join(self.__workspace_origin_path, "SARS", self.aux_key_word, '*.tiff')))
for tif in aux_in_tif_paths:
file_name = os.path.basename(tif).split("_")[9]
out_path = os.path.join(aux_out_cut_path, "AuxiliarySar_" + file_name + ".tif")
AtmosphericMain.cut_L1A(tif, out_path, self.aux_lines_set)
self.__preprocessed_paras.update({"AuxiliarySar_" + file_name: out_path})
print("-----裁剪后辅影像结果-----", out_path)
logger.info('progress bar :40')
return True
@staticmethod
def cut_L1A(in_path, out_path, lines_set):
img = ImageHandler.get_data(in_path)
if len(img.shape) == 3:
cut_img = img[:, lines_set[0]:lines_set[1], lines_set[2]:lines_set[3]]
ImageHandler.write_img(out_path, '', [0, 0, 0, 0, 0, 0], cut_img)
else:
cut_img = img[lines_set[0]:lines_set[1] + 1, lines_set[2]:lines_set[3] + 1]
ImageHandler.write_img(out_path, '', [0, 0, 0, 0, 0, 0], cut_img)
if __name__ == "__main__":
start = datetime.datetime.now()
# noinspection PyBroadException
try:
if len(sys.argv) < 2:
xml_path = 'VegetationHeight.xml'
else:
xml_path = sys.argv[1]
AtmosphericMain = AtmosphericMain(xml_path)
if AtmosphericMain.check_source() is False:
raise Exception('check_source() failed!')
if AtmosphericMain.get_interv_img() is False: # 影像粗配准、主辅影像裁剪
raise Exception('get_interv_img() failed!')
if AtmosphericMain.preprocess_handle() is False:
raise Exception('preprocess_handle() failed!')
logger.info('successful production of ' + EXE_NAME + ' Product' + '.mate.xml')
except Exception:
logger.exception("run-time error!")
finally:
AtmosphericMain.del_temp_workspace()
# pass
end = datetime.datetime.now()
msg = 'running use time: %s ' % (end - start)
logger.info(msg)