# -*- coding: UTF-8 -*- """ @Project :microproduct @File :SoilMoistureMain.py @Author :SHJ @Contact: @Date :2021/7/26 17:11 @Version :1.0.0 修改历史: [修改序列] [修改日期] [修改者] [修改内容] 1 2022-6-30 石海军 1.使用cover_roi_id来选取ROI区域; 2.内部处理使用地理坐标系(4326); """ from tool.algorithm.algtools.PreProcess import PreProcess as pp from tool.algorithm.xml.AlgXmlHandle import ManageAlgXML, CheckSource,InitPara # 导入xml文件读取与检查文件 from tool.algorithm.image.ImageHandle import ImageHandler from tool.algorithm.algtools.logHandler import LogHandler from SoilMoistureALg import MoistureAlg as alg from tool.algorithm.block.blockprocess import BlockProcess from tool.algorithm.algtools.MetaDataHandler import MetaDataHandler from tool.algorithm.xml.CreateMetaDict import CreateMetaDict, CreateProductXml from tool.config.ConfigeHandle import Config as cf from tool.algorithm.xml.CreatMetafile import CreateMetafile from tool.algorithm.algtools.ROIAlg import ROIAlg as roi from SoilMoistureXmlInfo import CreateDict, CreateStadardXmlFile from tool.algorithm.algtools.filter.lee_Filter import Filter from tool.file.fileHandle import fileHandle import logging import os import shutil import datetime import numpy as np import sys import multiprocessing cover_id_list = [] threshold_of_ndvi_min = 0 threshold_of_ndvi_max = 0 multiprocessing_num = int(cf.get('multiprocessing_num')) if cf.get('debug') == 'True': DEBUG = True else: DEBUG = False file =fileHandle(DEBUG) EXE_NAME = cf.get('exe_name') FILTER_SIZE = int(cf.get('filter_size')) soil_moisture_value_min = float(cf.get('product_value_min')) soil_moisture_value_max = float(cf.get('product_value_max')) tar = r'-' + cf.get('tar') productLevel = cf.get('productLevel') LogHandler.init_log_handler('run_log\\' + EXE_NAME) logger = logging.getLogger("mylog") # env_str = os.path.split(os.path.realpath(__file__))[0] env_str =os.path.dirname(os.path.abspath(sys.argv[0])) os.environ['PROJ_LIB'] = env_str block_size_config=int(cf.get('block_size_config')) class MoistureMain: """ 土壤水分处理主函数 """ def __init__(self, alg_xml_path): self.alg_xml_path = alg_xml_path self.imageHandler = ImageHandler() self.__alg_xml_handler = ManageAlgXML(alg_xml_path) self.__check_handler = CheckSource(self.__alg_xml_handler) self.__workspace_path, self.__out_para = None, None self.__input_paras, self.__output_paras, self.__processing_paras, self.__preprocessed_paras = {},{},{},{} # 参考影像路径 self.__ref_img_path = '' # 宽/列数,高/行数 self.__cols, self.__rows = 0, 0 # 坐标系 self.__proj = '' # 影像投影变换矩阵 self.__geo = [0, 0, 0, 0, 0, 0] def check_source(self): """ 检查算法相关的配置文件,图像,辅助文件是否齐全 """ env_str = os.getcwd() logger.info("sysdir: %s", env_str) if self.__check_handler.check_alg_xml() is False: return False if self.__check_handler.check_run_env() is False: return False input_para_names = ['box', 'CoveringIDs', 'DualPolarSAR', 'Covering', 'NDVI', 'NDWI', 'e1', 'e2', 'Chh', 'Dhh', 'Cvv', 'Dvv']#, 'T', 'Pb', 'vsand', 'vclay'] if self.__check_handler.check_input_paras(input_para_names) is False: return False self.__workspace_path = self.__alg_xml_handler.get_workspace_path() self.__create_work_space() self.__input_paras = self.__alg_xml_handler.get_input_paras() self.__processing_paras = InitPara.init_processing_paras(self.__input_paras) self.__processing_paras.update(self.get_tar_gz_inf(self.__processing_paras["sar_path0"])) SrcImageName = os.path.split(self.__input_paras["DualPolarSAR"]['ParaValue'])[1].split('.tar.gz')[0] result_name = SrcImageName + tar + ".tar.gz" self.__out_para = os.path.join(self.__workspace_path, EXE_NAME, 'Output', result_name) self.__alg_xml_handler.write_out_para("SoilMoistureProduct", self.__out_para) #写入输出参数 logger.info('check_source success!') logger.info('progress bar: 10%') return True def get_tar_gz_inf(self, tar_gz_path): para_dic = {} name = os.path.split(tar_gz_path)[1].rstrip('.tar.gz') file_dir = os.path.join(self.__workspace_preprocessing_path, name + '\\') file.de_targz(tar_gz_path, file_dir) # 元文件字典 para_dic.update(InitPara.get_meta_dic_new(InitPara.get_meta_paths(file_dir, name), name)) # tif路径字典 pol_dic = InitPara.get_polarization_mode(InitPara.get_tif_paths(file_dir, name)) flag_list = [0, 0, 0, 0, 0] for key, in_tif_path in pol_dic.items(): # 获取极化类型 if 'HH' == key: para_dic.update({'HH': in_tif_path}) flag_list[0] = 1 elif 'HV' == key: para_dic.update({'HV': in_tif_path}) # 如果没有VV,用HV代替 flag_list[1] = 1 elif 'VV' == key: para_dic.update({'VV': in_tif_path}) flag_list[2] = 1 elif 'VH' == key: para_dic.update({'VH': in_tif_path}) flag_list[3] = 1 # elif 'inc_angle' == key: elif 'inc_angle' == key: para_dic.update({'LocalIncidenceAngle': in_tif_path}) flag_list[4] = 1 if flag_list != [1, 1, 1, 1, 1]: raise Exception('There are not tar_gz path: HH、HV(VV)、LocalIncidenceAngle or meta.xml in path:', tar_gz_path) self.processinfo = flag_list return para_dic def __create_work_space(self): """ 删除原有工作区文件夹,创建新工作区文件夹 """ self.__workspace_preprocessing_path = self.__workspace_path + EXE_NAME + r'\Temporary\preprocessing''\\' self.__workspace_preprocessed_path = self.__workspace_path + EXE_NAME + r'\Temporary\preprocessed''\\' self.__workspace_processing_path = self.__workspace_path + EXE_NAME + r'\Temporary\processing''\\' self.__workspace_block_tif_path = self.__workspace_path + EXE_NAME + r'\Temporary\blockTif''\\' self.__workspace_block_tif_processed_path = self.__workspace_path + EXE_NAME + r'\Temporary\blockTifProcessed''\\' self.__product_dic = self.__workspace_processing_path + 'product\\' path_list = [self.__workspace_preprocessing_path, self.__workspace_preprocessed_path, self.__workspace_processing_path, self.__workspace_block_tif_path, self.__workspace_block_tif_processed_path, self.__product_dic] file.creat_dirs(path_list) logger.info('create new workspace success!') def preprocess_handle(self): """ 预处理 """ para_names = ['HH', 'VV', 'VH', 'HV', 'Covering', 'NDVI', 'NDWI', 'LocalIncidenceAngle'] # LocalIncidenceAngle ref_img_name = 'HH' p = pp() self.__preprocessed_paras = p.preprocessing(para_names, ref_img_name, self.__processing_paras, self.__workspace_preprocessing_path, self.__workspace_preprocessed_path) self.__ref_img_path, self.__cols, self.__rows, self.__proj, self.__geo = p.get_ref_inf(self.__preprocessed_paras[ref_img_name]) logger.info('progress bar: 40%') def create_roi(self): """ 计算ROI掩膜 :return: 掩膜路径 """ processing_path = self.__workspace_processing_path # 利用角度为nan生成Mask pp.check_LocalIncidenceAngle(self.__preprocessed_paras['LocalIncidenceAngle'], self.__preprocessed_paras['LocalIncidenceAngle']) angle_nan_mask_path = processing_path + 'angle_nan_mask.tif' roi.trans_tif2mask(angle_nan_mask_path, self.__preprocessed_paras['LocalIncidenceAngle'], np.nan) # 利用影像的有效范围生成MASK tif_mask_path = processing_path + 'tif_mask.tif' roi.trans_tif2mask(tif_mask_path, self.__preprocessed_paras['HH'], 0, 0) # 0,0修改为np.nan tif_zero_mask_path = processing_path + 'tif_mask_zero.tif' roi.trans_tif2mask(tif_zero_mask_path, self.__preprocessed_paras['HH'], np.nan) # 利用cover计算植被覆盖范围 cover_mask_path = processing_path + 'cover_mask.tif' #alg.trans_tif2mask(cover_mask_path, self.__preprocessed_paras['Covering'], threshold_of_cover_min, threshold_of_cover_max) if self.__processing_paras['CoveringIDs'] == 'empty': cover_data = ImageHandler.get_data(self.__preprocessed_paras["Covering"]) cover_data[np.where(np.isnan(cover_data))] = 0 cover_id_list = list(np.unique(cover_data)) else: cover_id_list = list(self.__processing_paras['CoveringIDs'].split(';')) cover_id_list = [int(num) for num in cover_id_list] roi.trans_cover2mask(cover_mask_path, self.__preprocessed_paras["Covering"], cover_id_list) # 利用NDVI计算裸土范围该指数的输出值在 -1.0 和 1.0 之间,大部分表示植被量, # 负值主要根据云、水和雪而生成 # 接近零的值则主要根据岩石和裸土而生成。 # 较低的(小于等于 0.1)NDVI 值表示岩石、沙石或雪覆盖的贫瘠区域。 # 中等值(0.2 至 0.3)表示灌木丛和草地 # 较高的值(0.6 至 0.8)表示温带雨林和热带雨林。 ndvi_mask_path = processing_path + 'ndvi_mask.tif' ndvi_scope = list(self.__processing_paras['NDVIScope'].split(';')) threshold_of_ndvi_min = float(ndvi_scope[0]) threshold_of_ndvi_max = float(ndvi_scope[1]) roi.trans_tif2mask(ndvi_mask_path, self.__preprocessed_paras['NDVI'], threshold_of_ndvi_min, threshold_of_ndvi_max) logger.info('create masks success!') # 利用覆盖范围和裸土范围 生成MASK bare_land_mask_path = processing_path + 'bare_land_mask.tif' roi.combine_mask(bare_land_mask_path, cover_mask_path, ndvi_mask_path) roi.combine_mask(bare_land_mask_path, bare_land_mask_path, tif_mask_path) roi.combine_mask(bare_land_mask_path, bare_land_mask_path, tif_zero_mask_path) roi.combine_mask(bare_land_mask_path, bare_land_mask_path, angle_nan_mask_path) logger.info('combine_mask success!') # 计算roi区域 roi.cal_roi(self.__preprocessed_paras['LocalIncidenceAngle'], self.__preprocessed_paras['LocalIncidenceAngle'], bare_land_mask_path, background_value=1) shutil.copyfile(bare_land_mask_path, self.__workspace_preprocessed_path + 'mask.tif') logger.info('create ROI image success!') return bare_land_mask_path def process_handle(self,start): """ 算法主处理函数 :return: True or False """ radar_center_frequency = MetaDataHandler.get_RadarCenterFrequency(self.__processing_paras['Origin_META']) # lamda = MetaDataHandler.get_lamda(self.__processing_paras['Origin_META']) # * 100 #单位转为cm -> 修改 m, 陈增辉 2023.08.29 lamda = float(299792458 / radar_center_frequency) # k=2*3.1415926/lamda # 1/m --- ? # 计算ROI区域 bare_land_mask_path = self.create_roi() logger.info('progress bar: 50%') para_names = ['HH', 'VV', 'VH', 'HV'] for i in para_names: if os.path.exists(self.__preprocessed_paras[i]): lee_path = os.path.join(self.__workspace_preprocessed_path, os.path.basename(self.__preprocessed_paras[i]).split(".")[0] + '_lee.tif') Filter().lee_process_sar(self.__preprocessed_paras[i], lee_path, 3, 0.25) logger.info('lee process finish: ' + self.__preprocessed_paras[i]) os.remove(self.__preprocessed_paras[i]) self.__preprocessed_paras.update({i: lee_path}) # 分块 bp = BlockProcess() # block_size = bp.get_block_size(self.__rows, self.__cols,block_size_config) block_size = bp.get_block_size(self.__rows, self.__cols) bp.cut(self.__workspace_preprocessed_path, self.__workspace_block_tif_path, ['tif', 'tiff'], 'tif', block_size) logger.info('blocking tifs success!') img_dir, img_name = bp.get_file_names(self.__workspace_block_tif_path, ['tif']) dir_dict = bp.get_same_img(img_dir, img_name) angle_list, hh_list, vv_list,vh_list, hv_list, ndwi_list, mask_list = None, None, None, None, None, None, None for key in dir_dict.keys(): tmp = key.split('_', 2)[0] if tmp == 'LocalIncidenceAngle': angle_list = dir_dict[key] elif tmp == 'HH': hh_list = dir_dict[key] elif tmp == 'VV': vv_list = dir_dict[key] elif tmp == 'VH': vh_list = dir_dict[key] elif tmp == 'HV': hv_list = dir_dict[key] elif tmp == 'NDWI': ndwi_list = dir_dict[key] elif tmp == 'mask': mask_list = dir_dict[key] if angle_list is None or hh_list is None or vv_list is None or vh_list is None or hv_list is None or ndwi_list is None: logger.info('angle_list is None or hh_list is None or vv_list is None or vh_list is None or hv_list is None or ndwi_list is None') return False if not len(angle_list) == len(hh_list) == len(vv_list) == len(vh_list) == len(hv_list) == len(ndwi_list): logger.info('[len(angle_list) == len(hh_list) == len(vv_list) == len(vh_list) == len(hv_list) == len(ndwi_list)] is False!') return False processes_num = min([len(angle_list), multiprocessing_num, multiprocessing.cpu_count() - 1]) # f = Filter() # f.lee_filter_multiprocess(hh_list, hh_list, FILTER_SIZE, processes_num) # f.lee_filter_multiprocess(vv_list, vv_list, FILTER_SIZE, processes_num) # f.lee_filter_multiprocess(vh_list, vh_list, FILTER_SIZE, processes_num) # f.lee_filter_multiprocess(hv_list, hv_list, FILTER_SIZE, processes_num) # 开启多进程处理 pool = multiprocessing.Pool(processes=processes_num) pl = [] for i in range(len(angle_list)): suffix = bp.get_suffix(os.path.basename(angle_list[i])) # 利用NDWI计算VWCs vwc_path = self.__workspace_block_tif_processed_path + 'vwc' + suffix alg.cal_vwc(vwc_path, ndwi_list[i], self.__processing_paras['e1'], self.__processing_paras['e2']) # # 利用VWC、入射角 计算裸土后向散射系数 hh_bare_soil_bsc_path = self.__workspace_block_tif_processed_path + 'hh_bare_soil_bsc' + suffix vv_bare_soil_bsc_path = self.__workspace_block_tif_processed_path + 'vv_bare_soil_bsc' + suffix vh_bare_soil_bsc_path = self.__workspace_block_tif_processed_path + 'vh_bare_soil_bsc' + suffix hv_bare_soil_bsc_path = self.__workspace_block_tif_processed_path + 'hv_bare_soil_bsc' + suffix flag = alg.cal_bare_soil_bsc(hh_bare_soil_bsc_path, hh_list[i], vwc_path, angle_list[i], self.__processing_paras['Chh'], self.__processing_paras['Dhh']) # 裸土后向散射系统(线性值) if flag is False: return False flag = alg.cal_bare_soil_bsc(vv_bare_soil_bsc_path, vv_list[i], vwc_path, angle_list[i], self.__processing_paras['Cvv'], self.__processing_paras['Dvv']) if flag is False: return False flag = alg.cal_bare_soil_bsc(vh_bare_soil_bsc_path, vh_list[i], vwc_path, angle_list[i], self.__processing_paras['Cvv'], self.__processing_paras['Dvv']) if flag is False: return False flag = alg.cal_bare_soil_bsc(hv_bare_soil_bsc_path, hv_list[i], vwc_path, angle_list[i], self.__processing_paras['Cvv'], self.__processing_paras['Dvv']) if flag is False: return False # 利用后向散射系数,计算土壤含水量 soil_moisture_path = self.__workspace_block_tif_processed_path + 'soil_moisture' + suffix # pl.append(pool.apply_async(alg.cal_soil_moisture, (soil_moisture_path, hh_bare_soil_bsc_path, vv_bare_soil_bsc_path, # angle_list[i], mask_list[i], lamda, radar_center_frequency, self.__processing_paras['T'], # self.__processing_paras['Pb'], self.__processing_paras['vsand'], self.__processing_paras['vclay'],i))) # dobsen pl.append(pool.apply_async(alg.cal_soil_moisture, (soil_moisture_path, hh_bare_soil_bsc_path, vv_bare_soil_bsc_path, angle_list[i], mask_list[i], lamda, radar_center_frequency, 0, 0,0, 0,i))) # topp # pl.append((pool.apply_async(alg.cal_soilM, (soil_moisture_path, hh_list[i], vv_list[i], hv_list[i], vh_list[i], angle_list[i], mask_list[i], lamda)))) # oh 2004 # alg.cal_soilM(soil_moisture_path, hh_list[i], vv_list[i], hv_list[i], vh_list[i], angle_list[i], mask_list[i], lamda) logger.info('total:%s, block:%s calculating soil_moisture!', len(angle_list), i) pool.close() pool.join() logger.info(pl) logger.info('progress bar: 80%') # 合并处理后的影像 data_dir = self.__workspace_block_tif_processed_path w = ImageHandler.get_img_width(self.__preprocessed_paras['HH']) h = ImageHandler.get_img_height(self.__preprocessed_paras['HH']) out_path = self.__workspace_processing_path[0:-1] tif_type = bp.get_tif_dtype(self.__preprocessed_paras['HH']) bp.combine(data_dir, w, h, out_path, file_type=['tif'], datetype=tif_type) # 添加地理信息 soil_moisture_path = self.__workspace_processing_path + 'soil_moisture.tif' bp.assign_spatial_reference_byfile(self.__preprocessed_paras['HH'], soil_moisture_path) # 生成roi区域 SrcImageName = os.path.split(self.__input_paras["DualPolarSAR"]['ParaValue'])[1].split('.tar.gz')[ 0] + tar + '.tif' product_path = os.path.join(self.__product_dic, SrcImageName) roi.cal_roi(product_path, soil_moisture_path, bare_land_mask_path, background_value=np.nan) logger.info('cal soil_moisture success!') proj, geos, data = self.imageHandler.read_img(product_path) data[data < soil_moisture_value_min] = soil_moisture_value_min data[data > soil_moisture_value_max] = soil_moisture_value_max self.imageHandler.write_img(product_path, proj, geos, data) # 生成快视图 self.imageHandler.write_quick_view(product_path) # 生成元文件案例 xml_path = "./model_meta.xml" tem_folder = self.__workspace_path + EXE_NAME + r"\Temporary""\\" image_path = product_path out_path1 = os.path.join(tem_folder, "trans_geo_projcs.tif") out_path2 = os.path.join(tem_folder, "trans_projcs_geo.tif") model_xml_path = os.path.join(tem_folder, "creat_standard.meta.xml") # 输出xml路径 # id_min = 0 # id_max = 1000 # threshold_of_ndvi_min = 0 # threshold_of_ndvi_max = 1 # set_threshold = [id_max, id_min, threshold_of_ndvi_min, threshold_of_ndvi_max] # par_dict = CreateDict(image_path, self.processinfo, out_path1, out_path2).calu_nature(start) # CreateStadardXmlFile(xml_path, self.alg_xml_path, par_dict, set_threshold, model_xml_path).create_standard_xml() SrcImagePath = self.__input_paras["DualPolarSAR"]['ParaValue'] paths = SrcImagePath.split(';') SrcImageName=os.path.split(paths[0])[1].split('.tar.gz')[0] # if len(paths) >= 2: # for i in range(1, len(paths)): # SrcImageName=SrcImageName+";"+os.path.split(paths[i])[1].split('.tar.gz')[0] # meta_xml_path = self.__product_dic+EXE_NAME+"Product.meta.xml" # CreateMetafile(self.__processing_paras['META'], self.alg_xml_path, model_xml_path, meta_xml_path).process(SrcImageName) model_path = "./product.xml" meta_xml_path = os.path.join(self.__product_dic, SrcImageName + tar + ".meta.xml") para_dict = CreateMetaDict(image_path, self.__processing_paras['Origin_META'], self.__product_dic, out_path1, out_path2).calu_nature() para_dict.update({"imageinfo_ProductName": "土壤水分产品"}) para_dict.update({"imageinfo_ProductIdentifier": "SoilMositure"}) para_dict.update({"imageinfo_ProductLevel": productLevel}) para_dict.update({"ProductProductionInfo_BandSelection": "1,2"}) CreateProductXml(para_dict, model_path, meta_xml_path).create_standard_xml() temp_folder = os.path.join(self.__workspace_path, EXE_NAME, 'Output') out_xml = os.path.join(temp_folder, os.path.basename(meta_xml_path)) if os.path.exists(temp_folder) is False: os.mkdir(temp_folder) # CreateProductXml(para_dict, model_path, out_xml).create_standard_xml() shutil.copy(meta_xml_path, out_xml) # 文件夹打包 file.make_targz(self.__out_para, self.__product_dic) logger.info('process_handle success!') logger.info('progress bar: 100%') def del_temp_workspace(self): """ 临时工作区 """ if DEBUG is False: path = self.__workspace_path + EXE_NAME + r'\Temporary' if os.path.exists(path): file.del_folder(path) if __name__ == '__main__': multiprocessing.freeze_support() start = datetime.datetime.now() try: if len(sys.argv) < 2: xml_path = 'SoilMoisture.xml' else: xml_path = sys.argv[1] main_handler = MoistureMain(xml_path) if main_handler.check_source() is False: raise Exception('check_source() failed!') if main_handler.preprocess_handle() is False: raise Exception('preprocess_handle() failed!') if main_handler.process_handle(start) is False: raise Exception('process_handle() failed!') logger.info('successful production of ' + EXE_NAME + ' products!') except Exception: logger.exception('run-time error!') finally: # main_handler.del_temp_workspace() pass end = datetime.datetime.now() msg = 'running use time: %s ' % (end - start) logger.info(msg)