# -*- coding: UTF-8 -*- """ @Project :microproduct @File :AlgXmlHandle.py @Function :算法描述文件读写和检查 @Contact :https://www.cnblogs.com/feifeifeisir/p/10893127.html @Author :SHJ @Date :2021/9/6 @Version :1.0.0 """ import logging from xml.etree.ElementTree import ElementTree from tool.algorithm.block.blockprocess import BlockProcess from tool.algorithm.image.ImageHandle import ImageHandler from tool.file.fileHandle import fileHandle import os import re import platform import psutil import multiprocessing import ctypes logger = logging.getLogger("mylog") import glob class ManageAlgXML: """ 检查和读取XML文件信息 """ def __init__(self, xml_path): self.in_path = xml_path self.__tree = ElementTree() self.__root = None self.__alg_compt = None self.__workspace_path = None self.__taskID = None self.__algorithm_name = None self.__envs = {} self.__input_paras = {} self.__output_paras = {} self.__init_flag = False def init_xml(self): """ 初始化XML文件 :return: True:初始化成功 False: 初始化失败 """ try: self.__tree.parse(self.in_path) except FileNotFoundError as ex: msg = ex + "xml_path = " + self.in_path raise Exception(msg) except BaseException: raise Exception("cannot open algXMl") self.__root = self.__tree.getroot() if self.__root is None: raise Exception("get root failed") self.__alg_compt = self.__root.find("AlgCompt") if self.__alg_compt is None: raise Exception("get AlgCompt failed") self.__workspace_path = self.__check_workspace_path() if self.__workspace_path is None: raise Exception("check workspace_path failed") self.__taskID = self.__check_task_id() if self.__taskID is None: raise Exception("check taskID failed") self.__algorithm_name = self.__check_algorithm_name() if self.__algorithm_name is None: raise Exception("check AlgorithmName failed") self.__envs = self.__check_environment() if self.__envs is None or self.__envs == {}: raise Exception("check environment failed") self.__input_paras = self.__check_input_para() if self.__input_paras is None or self.__input_paras == {}: raise Exception("check input para failed") self.__output_paras = self.__check_output_para() self.__init_flag = True return True def get_workspace_path(self): """ 获取工作空间路径 :return: 工作空间路径, None-异常 """ if not self.__init_flag: raise Exception("XML is not initialized") return self.__workspace_path def get_task_id(self): """ 获取任务ID :return: taskID, None-异常 """ if not self.__init_flag: raise Exception("XML is not initialized") return self.__taskID def get_algorithm_name(self): """ 获取算法名 :return: """ if not self.__init_flag: raise Exception("AlgorithmName is not initialized") return self.__algorithm_name def get_envs(self): """ 获取运行环境要求 :return:运行环境要求, None-异常 """ if not self.__init_flag: raise Exception("XML is not initialized") return self.__envs def get_input_paras(self): """ 获取输入参数 :return:输入参数, None-异常 """ if not self.__init_flag: raise Exception("XML is not initialized") return self.__input_paras def get_output_paras(self): """ 获取输出参数 :return:输出参数, None-异常 """ if not self.__init_flag: raise Exception("XML is not initialized") return self.__output_paras def __check_workspace_path(self): """ 检查工作空间路径 :return: 工作空间路径, None-异常 """ workspace_note = self.__root.find("WorkSpace") workspace_path = str(workspace_note.text).replace("\n", "").replace(' ', '') #去除空格和回车 if workspace_path is None: raise Exception("'workspace_path' is None") if not os.path.isdir(workspace_path): raise Exception("'workspace_path' is not save:%s",workspace_path) if workspace_path[-1] != '\\': workspace_path += "'\'" return workspace_path def __check_environment(self): """ 检查XML文件中运行环境要求 :return: dic-运行环境要求, None-异常 """ env_note = self.__alg_compt.find("Environment") is_cluster = int(env_note.find("IsCluster").text.replace("\n", "").replace(' ', '')) is_legal = is_cluster in [0, 1] if not is_legal: raise Exception("IsCluster is not 0 or 1") cluster_num = int(env_note.find("ClusterNum").text) is_legal = cluster_num in [0, 1, 2, 3, 4, 5, 6, 7] if not is_legal: raise Exception("cluster_num is beyond [0,1,2,3,4,5,6,7]") operating_system = env_note.find("OperatingSystem").text.replace("\n", "").replace(' ', '') #去除空格和回车 # is_legal = operating_system in ["Windows10", "Windows7", "WindowsXP"] # if not is_legal: # raise Exception("OperatingSystem is beyond [Windows10, Windows7, WindowsXP]") cpu = env_note.find("CPU").text.replace("\n", "").replace(' ', '') #去除空格和回车 is_legal = cpu in ["单核", "双核", "3核", "4核", "6核", "8核"] if not is_legal: raise Exception("OperatingSystem is beyond [单核, 双核, 3核, 4核, 6核, 8核]") memory = env_note.find("Memory").text.replace("\n", "").replace(' ', '') #去除空格和回车 is_legal = memory in ["1GB", "2GB", "4GB", "6GB", "8GB", "10GB", "12GB", "16GB"] # if not is_legal: # raise Exception("OperatingSystem is beyond [1GB, 2GB, 4GB, 6GB, 8GB, 10GB, 12GB, 16GB]") storage = env_note.find("Storage").text.replace("\n", "").replace(' ', '') #去除空格和回车 is_legal = int(storage[:-2]) > 0 if not is_legal: raise Exception("Storage < 0GB") network_card = env_note.find("NetworkCard").text # is_legal = network_card in ["无需求"] # if not is_legal: # # 输出异常 # return band_width = env_note.find("Bandwidth").text # is_legal = band_width in ["无需求"] # if not is_legal: # # 输出异常 # return gpu = env_note.find("GPU").text # is_legal = GPU in ["无需求"] # if not is_legal: # # 输出异常 # return envs = {"is_Cluster": is_cluster, "cluster_num": cluster_num, "operating_system": operating_system, "CPU": cpu, "memory": memory} envs.update({"Storage": storage, "network_card": network_card, "band_width": band_width, "GPU": gpu}) return envs def __check_input_para(self): """ 检查XML文件中输入参数 :return: dic-输入参数, None-异常 """ input_paras_note = self.__alg_compt.find("Inputs") paras_num = int(input_paras_note.attrib.get("ParameterNum")) para_list = input_paras_note.findall("Parameter") if paras_num != len(para_list): msg ="'ParameterNum':"+ str(paras_num) + " != number of 'Parameter':" + str(len(para_list)) logger.warning(msg) input_paras = {} for para in para_list: para_name = para.find("ParaName").text.replace("\n", "").replace(' ', '') #去除空格和回车 para_chs_name = para.find("ParaChsName").text.replace("\n", "").replace(' ', '') #去除空格和回车 para_type = para.find("ParaType").text.replace("\n", "").replace(' ', '') #去除空格和回车 data_type = para.find("DataType").text.replace("\n", "").replace(' ', '') #去除空格和回车 para_value = para.find("ParaValue").text.replace("\n", "").replace(' ', '') #去除空格和回车 input_para = {"ParaName": para_name, "ParaChsName": para_chs_name, "ParaType": para_type, "DataType": data_type, "ParaValue": para_value} #print(para_name) if para_type == "Value": # max_value = para.find("MaxValue").text # min_value = para.find("MinValue").text # option_value = para.find("OptionValue").text.replace("\n", "").replace(' ', '') #去除空格和回车 # input_para.update({"MaxValue": max_value, "MinValue": min_value, "OptionValue": option_value}) # input_para.update({"OptionValue": option_value}) todo pass if para_name is None or para_type is None or para_value is None: msg = 'there is None among para_name:' + para_name + ',para_type:' + para_type + 'or para_value:' + para_value + '!' raise Exception(msg) input_paras.update({para_name: input_para}) return input_paras def __check_output_para(self): """ 检查XML文件中输出参数 :return: dic-输出参数, None-异常 """ output_paras_note = self.__alg_compt.find("Outputs") paras_num = int(output_paras_note.attrib.get("ParameterNum")) para_list = output_paras_note.findall("Parameter") if paras_num != len(para_list): raise Exception("'ParameterNum' != number of 'Parameter'") output_paras = {} return output_paras def write_out_para(self, para_name, para_value): """ 写入输出参数 """ output_paras_note = self.__alg_compt.find("Outputs") para_list = output_paras_note.findall("Parameter") flag = False for para in para_list: if para.find("ParaName").text == para_name: para.find("ParaValue").text = para_value flag = True if flag == False: raise Exception('Cannot find Output Parameter:'+para_name+'!') self.__tree.write(self.in_path, encoding="utf-8", xml_declaration=True) def __check_task_id(self): """ 检查任务ID :return: taskID, None-异常 """ task_id_note = self.__root.find("TaskID") task_id = str(task_id_note.text).replace("\n", "").replace(' ', '') #去除空格和回车 if task_id is None: raise Exception("'TaskID' is None") return task_id def __check_algorithm_name(self): algorithm_name_note = self.__alg_compt.find("AlgorithmName") algorithm_name = str(algorithm_name_note.text).replace("\n", "").replace(' ', '') #去除空格和回车 if algorithm_name is None: raise Exception("'AlgorithmName' is None") return algorithm_name class CheckSource: """ 检查配置文件中资源的完整性和有效性 """ def __init__(self, alg_xml_handle): self.__alg_xml_handle = alg_xml_handle self.imageHandler = ImageHandler() self.__ParameterDic={} def check_alg_xml(self): """ 检查算法配置文件 """ if self.__alg_xml_handle.init_xml(): logger.info('init algXML succeed') return True else: raise Exception('init algXML failed') def check_run_env(self): """ :return: True-正常,False-异常 """ envs = self.__alg_xml_handle.get_envs() # 检查操作系统 local_plat = platform.platform() local_plat_list = local_plat.split("-") flag = envs['operating_system'] == local_plat_list[0]+local_plat_list[1] if flag is False: msg = 'operating_system:' + local_plat_list[0] + local_plat_list[1] + ' is not ' + envs['operating_system'] #raise Exception(msg) # 检查电脑显存 mem = psutil.virtual_memory() mem_total = int(round(mem.total / 1024 / 1024 / 1024, 0)) mem_free = round(mem.free / 1024 / 1024 / 1024, 0) env_memory = envs['memory'] env_memory = int(env_memory[:-2]) if env_memory > mem_total: msg = 'memory_total ' + str(mem_total) + ' less than'+str(env_memory) + 'GB' # raise Exception(msg) if env_memory >= mem_free: msg = 'mem_free ' + str(mem_free) + 'GB less than' + str(env_memory) + 'GB' logger.warning(msg) # 检查CPU核数 env_cpu = envs['CPU'] if env_cpu == "单核": env_cpu_core_num = 1 elif env_cpu == "双核": env_cpu_core_num = 2 elif env_cpu == "三核": env_cpu_core_num = 3 else: env_cpu_core_num = int(env_cpu[:-1]) local_cpu_core_num = int(multiprocessing.cpu_count()/2) if env_cpu_core_num > local_cpu_core_num: msg = 'CPU_core_num ' + str(local_cpu_core_num) + 'core less than' + str(env_cpu_core_num) + ' core' # raise Exception(msg) # 检查磁盘的内存 env_storage = envs['Storage'] env_storage = int(env_storage[:-2]) workspace_path = self.__alg_xml_handle.get_workspace_path() if not os.path.isdir(workspace_path): raise Exception('workspace_path:%s do not exist!', workspace_path) local_storage = self.__get_free_space_mb(workspace_path) if env_storage > local_storage: msg = 'workspace storage ' + str(local_storage) + 'GB less than' + envs['Storage'] +"GB" # raise Exception(msg) return True @staticmethod def __get_free_space_mb(folder): """ :param folder:检查的路径 eg:'C:\\' :return: folder/drive free space (GB) """ if platform.system() == 'Windows': free_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(folder), None, None, ctypes.pointer(free_bytes)) return free_bytes.value / 1024 / 1024 / 1024 else: st = os.statvfs(folder) return st.f_bavail * st.f_frsize / 1024 / 1024 def check_input_paras(self, input_para_names): """ :param input_para_names :需要检查参数的名称列表[name1,name2,...] :return: 检测是否正常 """ workspace_path = self.__alg_xml_handle.get_workspace_path() input_paras = self.__alg_xml_handle.get_input_paras() for name in input_para_names: para = input_paras[name] if para is None: msg = "check para:"+name + " is failed!"+"para is None!" raise Exception(msg) if para['ParaType'] == 'File': if para['DataType'] == 'tif': if para['ParaValue'] != 'empty' and para['ParaValue'] != 'Empty'and para['ParaValue'] != '': para_value_list = para['ParaValue'].split(";") for para_value in para_value_list: para_path = para_value if self.__check_tif(para_path) is False: msg = "check para:"+name + " is failed!" + "Path:" + para_path raise Exception(msg) if para['DataType'] == 'xml': para_path = para['ParaValue'] if not os.path.exists(para_path): raise Exception('para_file:%s is inexistent!', para_path) if para['DataType'] == 'File': para_path = para['ParaValue'] if os.path.isdir(para_path) is False: msg = "check para:" + name + " is failed!" + "FilePath:" + para_path raise Exception(msg) if para["DataType"]=="ymal": para_path = para['ParaValue'] if os.path.isfile(para_path) is False: msg = "check para: " + name + " is failed! " + " FilePath: " + para_path raise Exception(msg) elif para['ParaType'] == 'Value': if para['DataType'] == 'float' or para['DataType'] == 'int' or para['DataType'] == 'double': if para['ParaValue'] is None: msg = "check para:"+name + " is failed!"+"'ParaValue' is None" raise Exception(msg) if self.__is_number(para['ParaValue']) is False: raise Exception("para:"+name+" is not number!") # if (para['MaxValue'] is not None) and (self.__is_number(para['MaxValue']) is True): # value = para['ParaValue'] # max = para['MaxValue'] # if float(value) > float(max): # msg = "para:" + name + " > max, para:" + value + "max:" + max # raise Exception(msg) # if (para['MinValue'] is not None) and (self.__is_number(para['MinValue']) is True): # value = para['ParaValue'] # min = para['MinValue'] # if float(value) < float(min): # msg = "para:" + name + " < min, para:" + value + "min:" + min # raise Exception(msg) self.__ParameterDic[name] = para['ParaValue'] __workspace_path = workspace_path __input_paras = input_paras return True, self.__ParameterDic def check_output_paras(self, output_para_names): """ :param output_para_names :需要检查参数的名称列表[name1,name2,...] :return: Ture or False """ workspace_path = self.__alg_xml_handle.get_workspace_path() output_paras = self.__alg_xml_handle.get_output_paras() for name in output_para_names: para = output_paras[name] #print(para) if para is None: msg = "check para:" + name + " is failed!" + "para is None!" raise Exception(msg) if para['ParaType'] == 'File': if para['DataType'] == 'tif': para_path = workspace_path + para['ParaValue'] para_dir = os.path.split(para_path) flag_isdir = os.path.isdir(para_dir[0]) flag_istif = (para_dir[1].split(".", 1)[1] == "tif") if flag_isdir and flag_istif is False: msg = "check para:" + name + " is failed!" + para_path + "is invalid!" raise Exception(msg) if para['DataType'] == 'File': para_path = workspace_path + para['ParaValue'] if os.path.isdir(para_path) is False: os.makedirs(para_path) if os.path.isdir(para_path) is False: msg = "check para:" + name + " is failed!" + para_path + "is invalid!" raise Exception(msg) return True @staticmethod def __is_number(str_num): """ :param str_num :检查str是否为float或者double :return: True or False """ if str_num[0] == '-': str_num = str_num[1:] pattern = re.compile(r'(.*)\.(.*)\.(.*)') if pattern.match(str_num): return False return str_num.replace(".", "").isdigit() def __check_tif(self, filename): """ :filename: 文件的路径 :return: True or False """ if self.imageHandler.get_dataset(filename) is None: msg = "read tif error!,finame: " + filename raise Exception(msg) return True class InitPara: def __init__(self,debug = False): self._debug = debug @staticmethod def init_processing_paras(input_paras, out_path): """ :param names:字典列表,每个字典为一个输入产品的配置信息 """ processing_paras = {} for name in input_paras: para = input_paras[name] if para is None: logger.error(name + "is None!") return False if para['ParaType'] == 'File': if para['DataType'] == 'tif' or para['DataType'] == 'csv': para_value_list = para['ParaValue'].split(";") if len(para_value_list) == 1: para_path = para['ParaValue'] if para_path != 'empty' and para_path != '': processing_paras.update({name: para_path}) else: for n, para_value in zip(range(len(para_value_list)), para_value_list): processing_paras.update({name+str(n): para_value}) elif para['DataType'] == 'tar.gz': paths = para['ParaValue'].split(';') for n, path in zip(range(len(paths)), paths): processing_paras.update({'sar_path' + str(n): path}) elif para['DataType'] == 'zip': # temp_para = para['ParaValue'].split(".")[0] para_value_list = para['ParaValue'].split(";") if len(para_value_list) == 1: para_path = para['ParaValue'] if para_path != 'empty' and para_path != '': file_path = BlockProcess.unzip_file(para_path, out_path) processing_paras.update({name: file_path}) else: for n, para_value_zip in zip(range(len(para_value_list)), para_value_list): file_path = BlockProcess.unzip_file(para_value_zip, out_path) processing_paras.update({name+str(n): file_path}) else: para_path = para['ParaValue'] processing_paras.update({name: para_path}) elif para['ParaType'] == 'Value': if para['DataType'] == 'float': value = float(para['ParaValue']) elif para['DataType'] == 'int': value = int(para['ParaValue']) else: # 默认string value = para['ParaValue'] processing_paras.update({name: value}) elif para['ParaType'] == 'String': value = para['ParaValue'] if value == 'empty': continue else: processing_paras.update({name: value}) return processing_paras # 获取文件夹内的文件 @staticmethod def get_tif_paths(file_dir,name): in_tif_paths = [] if os.path.exists(file_dir + name + '\\'): in_tif_paths = list(glob.glob(os.path.join(file_dir + name + '\\', '*.tif'))) in_tif_paths1 = list(glob.glob(os.path.join(file_dir + name + '\\', '*.tiff'))) if in_tif_paths1 != []: in_tif_paths = in_tif_paths + in_tif_paths1 else: in_tif_paths = list(glob.glob(os.path.join(file_dir, '*.tif'))) in_tif_paths1 = list(glob.glob(os.path.join(file_dir, '*.tiff'))) if in_tif_paths != []: in_tif_paths = in_tif_paths + in_tif_paths1 return in_tif_paths @staticmethod def get_tif_paths_new(file_dir, name): in_tif_paths = [] if os.path.exists(file_dir + name + '\\'): in_tif_paths = list(glob.glob(os.path.join(file_dir + name + '\\', '*.tif'))) in_tif_paths1 = list(glob.glob(os.path.join(file_dir + name + '\\', '*.tiff'))) if in_tif_paths1 != []: in_tif_paths = in_tif_paths + in_tif_paths1 else: in_tif_paths = list(glob.glob(os.path.join(file_dir, '*.tif'))) in_tif_paths1 = list(glob.glob(os.path.join(file_dir, '*.tiff'))) if len(in_tif_paths) == 0: in_tif_paths = in_tif_paths + in_tif_paths1 return in_tif_paths @staticmethod def get_polarization_mode(in_tif_paths): pol_dic = {} pola_list = [0,0,0,0] for in_tif_path in in_tif_paths: # 获取极化类型 if '_HH_' in os.path.basename(in_tif_path): pol_dic.update({'HH': in_tif_path}) pola_list[0] = 1 elif '_HV_' in os.path.basename(in_tif_path): pol_dic.update({'HV': in_tif_path}) pola_list[1] = 1 elif '_VH_' in os.path.basename(in_tif_path): pol_dic.update({'VH': in_tif_path}) pola_list[2] = 1 elif '_VV_' in os.path.basename(in_tif_path): pol_dic.update({'VV': in_tif_path}) pola_list[3] = 1 elif 'LocalIncidenceAngle' in os.path.basename(in_tif_path) or 'ncidenceAngle' in os.path.basename(in_tif_path): pol_dic.update({'LocalIncidenceAngle': in_tif_path}) elif 'inc_angle' in os.path.basename(in_tif_path): pol_dic.update({'inc_angle': in_tif_path}) elif 'inci_Angle-ortho' in os.path.basename(in_tif_path): pol_dic.update({'inci_Angle-ortho': in_tif_path}) elif 'LocalincidentAngle-ortho' in os.path.basename(in_tif_path): pol_dic.update({'LocalIncidentAngle-ortho': in_tif_path}) elif 'ori_sim' in os.path.basename(in_tif_path): pol_dic.update({'ori_sim': in_tif_path}) elif 'sim_ori' in os.path.basename(in_tif_path): pol_dic.update({'sim_ori': in_tif_path}) pol_dic.update({'pola':pola_list}) return pol_dic @staticmethod def get_meta_paths(file_dir, name): meta_xml_paths = [] if os.path.exists(file_dir + name + '\\'): meta_xml_paths = list(glob.glob(os.path.join(file_dir + name, '*.meta.xml'))) else: meta_xml_paths = list(glob.glob(os.path.join(file_dir, '*.meta.xml'))) if meta_xml_paths is None or meta_xml_paths == []: raise Exception('there is not .meta.xml in path: ', file_dir + '\\') return meta_xml_paths @staticmethod def get_incidence_xml_paths(file_dir, name): meta_xml_paths = [] if os.path.exists(file_dir + name + '\\'): meta_xml_paths = list(glob.glob(os.path.join(file_dir + name, '*.incidence.xml'))) else: meta_xml_paths = list(glob.glob(os.path.join(file_dir, '*.incidence.xml'))) if meta_xml_paths is None or meta_xml_paths == []: raise Exception('there is not .incidence.xml in path: ', file_dir + '\\') return meta_xml_paths @staticmethod def get_meta_dic(meta_xml_paths, name): para_dic = {} for mete_path in meta_xml_paths: if name in mete_path: para_dic.update({'META': mete_path}) if para_dic is {}: raise Exception('the name of .meta.xml is error!') return para_dic @staticmethod def get_incidence_dic(meta_xml_paths, name): para_dic = {} for mete_path in meta_xml_paths: if name in mete_path: para_dic.update({'Incidence': mete_path}) if para_dic is {}: raise Exception('the name of .incidence.xml is error!') return para_dic @staticmethod def get_meta_dic_new(meta_xml_paths, name): para_dic = {} for mete_path in meta_xml_paths: if name in os.path.basename(mete_path): para_dic.update({'META': mete_path}) else: para_dic.update({'Origin_META': mete_path}) if para_dic is {}: raise Exception('the name of .meta.xml is error!') return para_dic @staticmethod def get_meta_dic_VP(meta_xml_paths, name): para_dic = {} for mete_path in meta_xml_paths: if name in os.path.basename(mete_path): para_dic.update({name + '_META': mete_path}) else: para_dic.update({name + '_Origin_META': mete_path}) if para_dic is {}: raise Exception('the name of .meta.xml is error!') return para_dic def get_mult_tar_gz_inf(self,tar_gz_path, workspace_preprocessing_path): para_dic = {} name = os.path.split(tar_gz_path)[1].rstrip('.tar.gz') para_dic.update({'name': name}) file_dir = os.path.join(workspace_preprocessing_path, name + '\\') if self._debug == False: fileHandle().de_targz(tar_gz_path, file_dir) # 元文件字典 para_dic.update(InitPara.get_meta_dic_VP(InitPara.get_meta_paths(file_dir, name), name)) # tif路径字典 pol_dic = InitPara.get_polarization_mode(InitPara.get_tif_paths(file_dir, name)) parameter_path = os.path.join(file_dir, "orth_para.txt") para_dic.update({name + "paraMeter": parameter_path}) for key, in_tif_path in pol_dic.items(): para_dic.update({name + '_' + key: in_tif_path}) return para_dic def get_mult_tar_gz_infs(self,processing_paras, workspace_preprocessing_path): tif_names_list = [] tar_inf_dic = {} for key, value in processing_paras.items(): if 'sar_path' in key: para_dic = self.get_mult_tar_gz_inf(value, workspace_preprocessing_path) tif_names_list.append(para_dic['name']) para_dic.pop('name') tar_inf_dic.update(para_dic) tar_inf_dic.update({'name_list': tif_names_list}) return tar_inf_dic