# -*- coding: UTF-8 -*- """ @Project :microproduct @File :PhenologyMain.py @Author :SHJ @Contact:土壤盐碱度算法主函数 @Date :2021/9/6 @Version :1.0.0 """ import logging import os import datetime import pyproj._compat import cv2 import numpy as np from osgeo import gdal, osr, ogr from tool.algorithm.algtools.PreProcess import PreProcess as pp from tool.algorithm.image.ImageHandle import ImageHandler from tool.algorithm.xml.AlgXmlHandle import ManageAlgXML, CheckSource, InitPara from tool.algorithm.algtools.logHandler import LogHandler from tool.algorithm.xml.CreatMetafile import CreateMetafile from VegetationPhenologyXmlInfo import CreateDict, CreateStadardXmlFile from tool.file.fileHandle import fileHandle import sys from tool.algorithm.transforml1a.transHandle import TransImgL1A from tool.csv.csvHandle import csvHandle from tool.algorithm.polsarpro.createfeature import CreateFeature import multiprocessing from tool.algorithm.ml.machineLearning import MachineLeaning as ml from tool.config.ConfigeHandle import Config as cf csvh = csvHandle() if cf.get('debug') == 'True': DEBUG = True else: DEBUG = False EXE_NAME = cf.get('exe_name') LogHandler.init_log_handler('run_log\\' + EXE_NAME) logger = logging.getLogger("mylog") FILTER_SIZE = int(cf.get('filter_size')) file =fileHandle(DEBUG) env_str = os.path.split(os.path.realpath(__file__))[0] os.environ['PROJ_LIB'] = env_str class PhenologyMain: """ 土壤粗糙度处理主函数 """ def __init__(self, alg_xml_path): self.alg_xml_path = alg_xml_path self.imageHandler = ImageHandler() self.__alg_xml_handler = ManageAlgXML(alg_xml_path) self.__check_handler = CheckSource(self.__alg_xml_handler) self.__workspace_path, self.__out_para = None, None self.__input_paras, self.__processing_paras, self.__preprocessed_paras = {}, {}, {} self.processinfo = [1, 1, 1, 1] self._name_tr_dic = {} #l1a转换geo的对象 def check_source(self): """ 检查算法相关的配置文件,图像,辅助文件是否齐全 """ self._env_str = os.getcwd() logger.info("sysdir: %s", self._env_str) self.__check_handler.check_alg_xml() self.__check_handler.check_run_env() # 检查景影像是否为全极化 self.__input_paras = self.__alg_xml_handler.get_input_paras() if self.__check_handler.check_input_paras(self.__input_paras) is False: return False self.__workspace_path = self.__alg_xml_handler.get_workspace_path() self.__create_work_space() self.__processing_paras = InitPara.init_processing_paras(self.__input_paras) self.__processing_paras.update(InitPara(DEBUG).get_mult_tar_gz_infs(self.__processing_paras, self.__workspace_preprocessing_path)) self.__out_para = os.path.join(self.__workspace_path, EXE_NAME, 'Output', r"VegetationPhenologyProduct.tar.gz") self.__alg_xml_handler.write_out_para("VegetationPhenologyProduct", self.__out_para) #写入输出参数 logger.info('check_source success!') logger.info('progress bar: 10%') return True def __create_work_space(self): """ 删除原有工作区文件夹,创建新工作区文件夹 """ self.__workspace_preprocessing_path = self.__workspace_path + EXE_NAME + '\\Temporary\\preprocessing\\' self.__workspace_preprocessed_path = self.__workspace_path + EXE_NAME + '\\Temporary\\preprocessed\\' self.__workspace_processing_path = self.__workspace_path + EXE_NAME + '\\Temporary\\processing\\' self.__product_dic = self.__workspace_processing_path + 'product\\' path_list = [self.__workspace_preprocessing_path, self.__workspace_preprocessed_path, self.__workspace_processing_path, self.__product_dic] file.creat_dirs(path_list) logger.info('create new workspace success!') def del_temp_workspace(self): """ 临时工作区 """ if DEBUG is True: return path = self.__workspace_path + EXE_NAME + r'\Temporary' if os.path.exists(path): file.del_folder(path) def preprocess_single_tar(self,name,scopes_roi): key_list = [key for key in self.__processing_paras.keys() if((name in key) and ('inc_angle' not in key) and ('LocalIncidenceAngle' not in key)and ('pola' not in key))] ori_sim_key = [key for key in key_list if ('ori_sim' in key)][0] ori_sim_path = self.__processing_paras[ori_sim_key] tr = TransImgL1A(ori_sim_path, scopes_roi) for k in key_list: out_path = os.path.join(self.__workspace_preprocessed_path, k + "_preprocessed.tif") tr.cut_L1A(self.__processing_paras[k], out_path) self.__preprocessed_paras.update({k: out_path}) self._name_tr_dic.update({name: tr}) def preprocess_handle(self): """ 预处理 """ scopes_roi = pp().cal_scopes_roi(self.__processing_paras) for name in self.__processing_paras['name_list']: self.preprocess_single_tar(name, scopes_roi) logger.info('preprocess_handle success!') logger.info('progress bar: 10%') def gene_test_set(self, test_data, features_path_dic, name): # 生成测试集 X_test_dic = {} # 同时测试多张影像 for tif_name in features_path_dic: feature_array = self.imageHandler.get_data(features_path_dic[tif_name]) dim = feature_array.shape[0] row_list = [] col_list = [] x_test = np.empty(shape=(0, dim)) for data in test_data: if data[0] == tif_name: row, col = zip(*data[2]) row_list = row_list + list(row) col_list = col_list + list(col) x_test = np.vstack((x_test, feature_array[:, row, col].T)) if row_list != []: X_test_dic.update({tif_name: [x_test, row_list, col_list]}) return X_test_dic def get_name_rows_cols(self, name): key_list = [key for key in self.__preprocessed_paras.keys() if ((name in key) and ('inc_angle' not in key) and ('LocalIncidenceAngle' not in key))] for key in key_list: if "_HH" in key: cols = self.imageHandler.get_img_width(self.__preprocessed_paras[key]) rows = self.imageHandler.get_img_height(self.__preprocessed_paras[key]) return rows, cols def create_feature_single_tar(self, name): key_list = [key for key in self.__preprocessed_paras.keys() if((name in key) and ('inc_angle' not in key) and ('LocalIncidenceAngle' not in key))] ori_sim_key = [key for key in key_list if ('ori_sim' in key)][0] ori_sim_path = self.__preprocessed_paras[ori_sim_key] # 读取实测值,获取多边形区域内所有的点,分为训练集数据和测试集数据 train_data, test_data, type_map = csvh.trans_VegePhenology_measdata_dic(csvh.readcsv(self.__processing_paras['MeasuredData']), ori_sim_path) logger.info("read phenology Measure.csv success!") # 特征分解 hh_hv_vh_vv_list = ["","","",""] for key in key_list: if "_HH" in key : hh_hv_vh_vv_list[0] = self.__preprocessed_paras[key] if "_HV" in key: hh_hv_vh_vv_list[1] = self.__preprocessed_paras[key] if "_VH" in key: hh_hv_vh_vv_list[2] = self.__preprocessed_paras[key] if "_VV" in key: hh_hv_vh_vv_list[3] = self.__preprocessed_paras[key] cols = self.imageHandler.get_img_width(hh_hv_vh_vv_list[0]) rows = self.imageHandler.get_img_height(hh_hv_vh_vv_list[0]) feature_dir = CreateFeature.decompose_single_tar(hh_hv_vh_vv_list, self.__workspace_processing_path, self.__workspace_preprocessing_path, name,self._env_str, rows, cols, FILTER_SIZE=3, debug=DEBUG) # 获取训练集提取特征的信息 ids = [] class_ids = [] ch_names = [] positions = [] for data in train_data: if data[0] == name: class_ids.append(data[1]) positions.append(data[2]) ch_names.append(data[3]) class_id = [map for map in type_map if (data[1] == map[1])][0] ids.append(class_id[0]) train_data_dic = {} train_data_dic.update({"ids": ids}) train_data_dic.update({"class_ids": class_ids}) train_data_dic.update({"ch_names": ch_names}) train_data_dic.update({"positions": positions}) name_test_data =[] for dt in test_data: if dt[0] == name: name_test_data.append(dt) return feature_dir, train_data_dic, name_test_data, type_map def process_handle(self, start): """ 算法主处理函数 :return: True or False """ # 生成每个时相的特征, 并提取训练集和测试集 # 每个时相的影像生成特征图 X_train, Y_train = None, None flag = True total_name_list = [] test_data = [] X_test_dic = {} for name in self.__processing_paras['name_list']: feature_dir, train_data_dic, test_data_part, type_map = self.create_feature_single_tar(name) #生成训练集 X_train_part, Y_train_part = ml.gene_train_set(train_data_dic, feature_dir) name_list = ml.get_name_list(feature_dir) # 生成测试集合 rows, cols = self.get_name_rows_cols(name) name_featuresPath_dic_part = ml.vegetationPhenology_combine_feature(feature_dir, self.__workspace_processing_path, name, rows, cols, DEBUG) X_test_dic_part = self.gene_test_set(test_data_part, name_featuresPath_dic_part, name) X_test_dic.update(X_test_dic_part) if flag: X_train = X_train_part Y_train = Y_train_part total_name_list = name_list flag = False test_data = test_data_part else: X_train = np.vstack((X_train, X_train_part)) Y_train = np.hstack((Y_train, Y_train_part)) total_name_list = total_name_list + name_list test_data = test_data + test_data_part logger.info("create_features success!") logger.info('progress bar: 20%') # if DEBUG: # optimal_feature = [1, 2, 3] # optimal_X_train = X_train[:, optimal_feature] # optimal_Y_train = Y_train # optimal_X_train = optimal_X_train[0:100, :] # optimal_Y_train = optimal_Y_train[0:100] # optimal_Y_train[0:100] = 1 # optimal_Y_train[50:100] = 2 # else: optimal_X_train, optimal_Y_train, optimal_feature = ml.sel_optimal_feature(X_train, Y_train, total_name_list, correlation_threshold=0.7) logger.info("generate train and test set success!") logger.info('progress bar: 30%') #RF clf = ml.trainRF(optimal_X_train, optimal_Y_train) logger.info('svm train success!') logger.info('progress bar: 80%') # 测试数据 logger.info('mode testing') product_path = self.predict(clf, X_test_dic, optimal_feature, type_map, start) logger.info('mode test success!') self.create_meta_file(product_path) # 文件夹打包 file.make_targz(self.__out_para, self.__product_dic) logger.info('progress bar: 100%') def predict(self, mode, X_test_dic, validity_list, type_map, start): # 测试数据 clf = mode for img_name, value in X_test_dic.items(): X_test = value[0] row_list = value[1] col_list = value[2] # a = X_test[:, validity_list] # Y_test = clf.predict(a) Y_test = clf.predict(X_test[:, validity_list]) # for type in type_map: # train_type = type[0] # type_id = type[1] # Y_test[np.where(Y_test == train_type)] = type_id # rows, cols = self.get_name_rows_cols(img_name) # array = np.zeros((rows, cols), dtype='float16') # array[row_list, col_list] = Y_test # array[np.where(array == 0)] = np.nan # product_path = self.__workspace_processing_path + img_name + '_VegetationPhenologyProduct.tif' # product_geo_path = self.__product_dic + img_name + '_VegetationPhenologyProduct.tif' # int_16_array = np.array(array, dtype=np.int16) rows, cols = self.get_name_rows_cols(img_name) array = np.zeros((rows, cols), dtype='float16') array[row_list, col_list] = Y_test array[np.where(array == 0)] = np.nan temp_path = self.__workspace_processing_path + img_name + '_temp.tif' self.imageHandler.write_img(temp_path, '', [0, 0, 0, 0, 0, 0], array) proj, geo, cover_data = self.imageHandler.read_img(temp_path) cover_data = np.uint8(cover_data) kernel = np.ones((3, 3), np.uint8) cover_data = cv2.erode(cv2.dilate(cover_data, kernel), kernel) for type in type_map: train_type = type[0] type_id = type[1] cover_data[np.where(cover_data == train_type)] = type_id product_path = self.__workspace_processing_path + img_name + '_VegetationPhenologyProduct.tif' product_geo_path = self.__product_dic + img_name + '_VegetationPhenologyProduct.tif' # int_16_array = np.array(array, dtype=np.int16) ImageHandler.write_img(product_path, '', [0, 0, 0, 0, 0, 0], cover_data) tr = self._name_tr_dic[img_name] # l1a图像坐标转换地理坐标 tr.l1a_2_geo_int(self.__preprocessed_paras[img_name + '_ori_sim'], product_path, product_geo_path, 'nearest') # 生成快视图 self.imageHandler.write_quick_view(product_geo_path, color_img=True) return product_geo_path # def test_resamp(): # # 形态学(闭运算)去roi区域噪点 # # cover_data = np.uint8(cover_data) # # kernel = np.ones((5, 5), np.uint8) # # cover_data = cv2.erode(cv2.dilate(cover_data, kernel), kernel) # pass def create_meta_file(self, product_path): # 生成元文件案例 xml_path = "./model_meta.xml" tem_folder = self.__workspace_path + EXE_NAME + r"\Temporary""\\" image_path = product_path out_path1 = os.path.join(tem_folder, "trans_geo_projcs.tif") out_path2 = os.path.join(tem_folder, "trans_projcs_geo.tif") par_dict = CreateDict(image_path, self.processinfo, out_path1, out_path2).calu_nature(start) model_xml_path = os.path.join(tem_folder, "creat_standard.meta.xml") # 输出xml路径 CreateStadardXmlFile(xml_path, self.alg_xml_path, par_dict, model_xml_path).create_standard_xml() SrcImagePath = self.__input_paras["AHVS"]['ParaValue'] paths = SrcImagePath.split(';') SrcImageName = os.path.split(paths[0])[1].split('.tar.gz')[0] if len(paths) >= 2: for i in range(1, len(paths)): SrcImageName = SrcImageName + ";" + os.path.split(paths[i])[1].split('.tar.gz')[0] meta_xml_path = os.path.join(self.__product_dic, EXE_NAME + "Product.meta.xml") self.type_id_name = csvh.vegePhenology_class_list(self.__processing_paras['MeasuredData']) CreateMetafile(self.__processing_paras['META'], self.alg_xml_path, model_xml_path, meta_xml_path).process2( self.type_id_name, None, SrcImageName) if __name__ == '__main__': multiprocessing.freeze_support() start = datetime.datetime.now() try: if len(sys.argv) < 2: xml_path = 'VegetationPhenology.xml' else: xml_path = sys.argv[1] main_handler = PhenologyMain(xml_path) if main_handler.check_source() is False: raise Exception('check_source() failed!') if main_handler.preprocess_handle() is False: raise Exception('preprocess_handle() failed!') if main_handler.process_handle(start) is False: raise Exception('process_handle() failed!') logger.info('successful production of ' + EXE_NAME + ' products!') except Exception: logger.exception("run-time error!") finally: main_handler.del_temp_workspace() end = datetime.datetime.now() msg = 'running use time: %s ' % (end - start) logger.info(msg)