# microproduct/dem-C-SAR/AlgXmlHandle.py
#
# 505 lines
# 20 KiB
# Python
# Raw Permalink Normal View History
#
# 2024-01-02 08:10:36 +00:00
# -*- coding: UTF-8 -*-
"""
@Project microproduct
@File AlgXmlHandle.py
@Function 算法描述文件读写和检查
@Contact https://www.cnblogs.com/feifeifeisir/p/10893127.html
@Author SHJ
@Date 2021/9/6
@Version 1.0.0
"""
import logging
from xml.etree.ElementTree import ElementTree
import os
import re
import platform
import psutil
import multiprocessing
import ctypes
from tool.algorithm.image import ImageHandle
logger = logging.getLogger("mylog")
class ManageAlgXML:
    """
    Read and validate the algorithm description XML file.

    Typical use: construct with the XML path, call ``init_xml()`` once and
    then query the parsed content through the ``get_*`` methods.  Every
    validation failure is reported by raising ``Exception`` with a message
    naming the offending node or value.
    """

    def __init__(self, xml_path):
        """
        :param xml_path: path of the algorithm description XML file
        """
        self.in_path = xml_path
        self.__tree = ElementTree()
        self.__root = None
        self.__alg_compt = None
        self.__workspace_path = None
        self.__taskID = None
        self.__envs = {}
        self.__input_paras = {}
        self.__output_paras = {}
        self.__init_flag = False

    @staticmethod
    def __find_node(parent, tag):
        """
        Return the direct child ``tag`` of ``parent``.

        :raises Exception: when the child node does not exist
        """
        node = parent.find(tag)
        if node is None:
            raise Exception("XML node '" + tag + "' is missing")
        return node

    def __clean_text(self, parent, tag):
        """
        Return the text of child ``tag`` with every newline and space removed.

        :raises Exception: when the node is missing or carries no text
        """
        text = self.__find_node(parent, tag).text
        if text is None:
            raise Exception("XML node '" + tag + "' is empty")
        return text.replace("\n", "").replace(' ', '')

    @staticmethod
    def __parameter_num(paras_note):
        """
        Return the integer ``ParameterNum`` attribute of an Inputs/Outputs node.

        :raises Exception: when the attribute is absent
        """
        raw_num = paras_note.attrib.get("ParameterNum")
        if raw_num is None:
            raise Exception("'ParameterNum' attribute is missing")
        return int(raw_num)

    def init_xml(self):
        """
        Parse the XML file and validate all of its sections.

        :return: True when initialisation succeeded
        :raises Exception: when the file cannot be read or a check fails
        """
        try:
            self.__tree.parse(self.in_path)
        except FileNotFoundError as ex:
            raise Exception(ex) from ex
        except Exception as ex:
            # Only ordinary errors are translated; KeyboardInterrupt and
            # SystemExit are allowed to propagate untouched.
            raise Exception("cannot open algXMl") from ex
        self.__root = self.__tree.getroot()
        if self.__root is None:
            raise Exception("get root failed")
        self.__alg_compt = self.__root.find("AlgCompt")
        if self.__alg_compt is None:
            raise Exception("get AlgCompt failed")
        self.__workspace_path = self.__check_workspace_path()
        if self.__workspace_path is None:
            raise Exception("check workspace_path failed")
        self.__taskID = self.__check_task_id()
        if self.__taskID is None:
            raise Exception("check taskID failed")
        self.__envs = self.__check_environment()
        if self.__envs is None or self.__envs == {}:
            raise Exception("check environment failed")
        self.__input_paras = self.__check_input_para()
        if self.__input_paras is None or self.__input_paras == {}:
            raise Exception("check input para failed")
        self.__output_paras = self.__check_output_para()
        self.__init_flag = True
        return True

    def get_workspace_path(self):
        """
        :return: workspace path (always ends with a path separator)
        :raises Exception: when ``init_xml`` has not succeeded yet
        """
        if not self.__init_flag:
            raise Exception("XML is not initialized")
        return self.__workspace_path

    def get_task_id(self):
        """
        :return: task ID string
        :raises Exception: when ``init_xml`` has not succeeded yet
        """
        if not self.__init_flag:
            raise Exception("XML is not initialized")
        return self.__taskID

    def get_envs(self):
        """
        :return: dict of runtime environment requirements
        :raises Exception: when ``init_xml`` has not succeeded yet
        """
        if not self.__init_flag:
            raise Exception("XML is not initialized")
        return self.__envs

    def get_input_paras(self):
        """
        :return: dict mapping parameter name to its description dict
        :raises Exception: when ``init_xml`` has not succeeded yet
        """
        if not self.__init_flag:
            raise Exception("XML is not initialized")
        return self.__input_paras

    def get_output_paras(self):
        """
        :return: dict of output parameters (currently always empty, see
                 ``__check_output_para``)
        :raises Exception: when ``init_xml`` has not succeeded yet
        """
        if not self.__init_flag:
            raise Exception("XML is not initialized")
        return self.__output_paras

    def __check_workspace_path(self):
        """
        Validate the <WorkSpace> node.

        All spaces and newlines are removed from the value, so a workspace
        path that contains spaces is not supported.

        :return: existing workspace directory, ending with a path separator
        :raises Exception: when the node is missing/empty or the directory
                           does not exist
        """
        workspace_note = self.__root.find("WorkSpace")
        if workspace_note is None or workspace_note.text is None:
            raise Exception("'workspace_path' is None")
        workspace_path = workspace_note.text.replace("\n", "").replace(' ', '')
        if not workspace_path:
            raise Exception("'workspace_path' is None")
        if not os.path.isdir(workspace_path):
            raise Exception("'workspace_path' is not save:%s" % workspace_path)
        # Callers build file paths with plain string concatenation, so the
        # workspace must end with a separator.
        if not workspace_path.endswith(("\\", "/")):
            workspace_path += os.sep
        return workspace_path

    def __check_environment(self):
        """
        Validate the <Environment> section of <AlgCompt>.

        :return: dict of runtime environment requirements
        :raises Exception: when a node is missing or holds an illegal value
        """
        env_note = self.__find_node(self.__alg_compt, "Environment")

        is_cluster = int(self.__clean_text(env_note, "IsCluster"))
        if is_cluster not in [0, 1]:
            raise Exception("IsCluster is not 0 or 1")

        cluster_text = self.__find_node(env_note, "ClusterNum").text
        if cluster_text is None:
            raise Exception("XML node 'ClusterNum' is empty")
        cluster_num = int(cluster_text)
        if cluster_num not in [0, 1, 2, 3, 4, 5, 6, 7]:
            raise Exception("cluster_num is beyond [0,1,2,3,4,5,6,7]")

        # The operating system is recorded but not restricted to a fixed list.
        operating_system = self.__clean_text(env_note, "OperatingSystem")

        cpu = self.__clean_text(env_note, "CPU")
        if cpu not in ["单核", "双核", "3核", "4核", "6核", "8核"]:
            raise Exception("CPU is beyond [单核, 双核, 3核, 4核, 6核, 8核]")

        memory = self.__clean_text(env_note, "Memory")
        if memory not in ["1GB", "2GB", "4GB", "6GB", "8GB", "10GB", "12GB", "16GB"]:
            raise Exception("Memory is beyond [1GB, 2GB, 4GB, 6GB, 8GB, 10GB, 12GB, 16GB]")

        # The last two characters are the unit suffix; the rest must be a
        # positive integer.
        storage = self.__clean_text(env_note, "Storage")
        if not int(storage[:-2]) > 0:
            raise Exception("Storage <= 0GB")

        # These three are stored as-is (raw text, may be None) and not checked.
        network_card = self.__find_node(env_note, "NetworkCard").text
        band_width = self.__find_node(env_note, "Bandwidth").text
        gpu = self.__find_node(env_note, "GPU").text

        return {
            "is_Cluster": is_cluster,
            "cluster_num": cluster_num,
            "operating_system": operating_system,
            "CPU": cpu,
            "memory": memory,
            "Storage": storage,
            "network_card": network_card,
            "band_width": band_width,
            "GPU": gpu,
        }

    def __check_input_para(self):
        """
        Validate and collect the <Inputs> section of <AlgCompt>.

        :return: dict mapping ParaName to a dict of the parameter's fields
        :raises Exception: when the declared count differs from the number of
                           <Parameter> nodes or a required node is missing
        """
        input_paras_note = self.__find_node(self.__alg_compt, "Inputs")
        paras_num = self.__parameter_num(input_paras_note)
        para_list = input_paras_note.findall("Parameter")
        if paras_num != len(para_list):
            msg = "'ParameterNum':" + str(paras_num) + " != number of 'Parameter':" + str(len(para_list))
            raise Exception(msg)

        input_paras = {}
        for para in para_list:
            para_name = self.__clean_text(para, "ParaName")
            para_type = self.__clean_text(para, "ParaType")
            input_para = {
                "ParaName": para_name,
                "ParaChsName": self.__clean_text(para, "ParaChsName"),
                "ParaType": para_type,
                "DataType": self.__clean_text(para, "DataType"),
                "ParaValue": self.__clean_text(para, "ParaValue"),
            }
            if para_type == "Value":
                # Range information exists only for value-type parameters;
                # Max/Min are kept as raw text.
                input_para.update({
                    "MaxValue": self.__find_node(para, "MaxValue").text,
                    "MinValue": self.__find_node(para, "MinValue").text,
                    "OptionValue": self.__clean_text(para, "OptionValue"),
                })
            input_paras[para_name] = input_para
        return input_paras

    def __check_output_para(self):
        """
        Validate the <Outputs> section of <AlgCompt>.

        Only the declared parameter count is verified; the individual output
        parameters are not parsed, so the returned dict is always empty.

        :return: empty dict
        :raises Exception: when the declared count differs from the number of
                           <Parameter> nodes
        """
        output_paras_note = self.__find_node(self.__alg_compt, "Outputs")
        paras_num = self.__parameter_num(output_paras_note)
        para_list = output_paras_note.findall("Parameter")
        if paras_num != len(para_list):
            raise Exception("'ParameterNum' != number of 'Parameter'")
        return {}

    def write_out_para(self, para_name='BackScatteringProduct', para_value="D:\\workspace\\Output\\BackScatteringProduct.tar.gz"):
        """
        Set the ParaValue of an output parameter and save the XML file.

        The file is re-read from disk before the change is applied.

        :param para_name: ParaName of the output parameter to update
        :param para_value: text to store in its ParaValue node
        :raises Exception: when no output parameter has that name
        """
        self.__tree.parse(self.in_path)
        root = self.__tree.getroot()
        alg_compt = self.__find_node(root, "AlgCompt")
        output_paras_note = self.__find_node(alg_compt, "Outputs")

        found = False
        for para in output_paras_note.findall("Parameter"):
            name_note = para.find("ParaName")
            if name_note is None or name_note.text is None:
                continue
            raw_name = name_note.text
            # Accept the name both verbatim and in the whitespace-free form
            # that the reading methods of this class use.
            cleaned_name = raw_name.replace("\n", "").replace(' ', '')
            if para_name in (raw_name, cleaned_name):
                self.__find_node(para, "ParaValue").text = para_value
                found = True
        if not found:
            raise Exception('Cannot find Output Parameter:' + para_name + '!')
        self.__tree.write(self.in_path, encoding="utf-8", xml_declaration=True)

    def __check_task_id(self):
        """
        Validate the <TaskID> node.

        :return: task ID with spaces and newlines removed
        :raises Exception: when the node is missing or carries no text
        """
        task_id_note = self.__root.find("TaskID")
        if task_id_note is None or task_id_note.text is None:
            raise Exception("'TaskID' is None")
        return task_id_note.text.replace("\n", "").replace(' ', '')
class CheckSource:
    """
    Check the completeness and validity of the resources named in the
    algorithm configuration file.
    """

    def __init__(self, alg_xml_handle):
        """
        :param alg_xml_handle: object providing ``init_xml``, ``get_envs``,
                               ``get_workspace_path``, ``get_input_paras`` and
                               ``get_output_paras``
        """
        self.__alg_xml_handle = alg_xml_handle
        self.imageHandler = ImageHandle.ImageHandler()
        # Values of every input parameter checked so far, keyed by name.
        self.__ParameterDic = {}

    def check_alg_xml(self):
        """
        Initialise the algorithm configuration file.

        :return: True on success
        :raises Exception: when initialisation fails
        """
        if self.__alg_xml_handle.init_xml():
            logger.info('init algXML succeed')
            return True
        else:
            raise Exception('init algXML failed')

    def check_run_env(self):
        """
        Compare the local machine with the environment required by the XML.

        Insufficient total memory or CPU cores and a missing workspace raise;
        an operating-system mismatch, low free memory and low free disk space
        are only logged as warnings.

        :return: True when no hard requirement is violated
        :raises Exception: when a hard requirement is violated
        """
        envs = self.__alg_xml_handle.get_envs()

        # Operating system: first two dash-separated fields of the platform
        # string, joined without a separator.
        local_os = "".join(platform.platform().split("-")[:2])
        if envs['operating_system'] != local_os:
            logger.warning('operating_system:' + local_os + ' is not ' + envs['operating_system'])

        # Physical memory (RAM), in GB; the requirement ends with "GB".
        mem = psutil.virtual_memory()
        mem_total = int(round(mem.total / 1024 / 1024 / 1024, 0))
        mem_free = round(mem.free / 1024 / 1024 / 1024, 0)
        env_memory = int(envs['memory'][:-2])
        if env_memory > mem_total:
            msg = 'memory_total ' + str(mem_total) + ' less than' + str(env_memory) + 'GB'
            raise Exception(msg)
        if env_memory >= mem_free:
            msg = 'mem_free ' + str(mem_free) + 'GB less than' + str(env_memory) + 'GB'
            logger.warning(msg)

        # CPU cores: named values first, otherwise "<n>" followed by one
        # suffix character.
        env_cpu = envs['CPU']
        named_core_nums = {"单核": 1, "双核": 2, "三核": 3}
        if env_cpu in named_core_nums:
            env_cpu_core_num = named_core_nums[env_cpu]
        else:
            env_cpu_core_num = int(env_cpu[:-1])
        # Prefer the real physical core count; when psutil cannot determine
        # it, fall back to half of the logical CPUs but never below one.
        local_cpu_core_num = psutil.cpu_count(logical=False)
        if not local_cpu_core_num:
            local_cpu_core_num = max(1, multiprocessing.cpu_count() // 2)
        if env_cpu_core_num > local_cpu_core_num:
            msg = 'CPU_core_num ' + str(local_cpu_core_num) + 'core less than' + str(env_cpu_core_num) + ' core'
            raise Exception(msg)

        # Free disk space of the workspace, in GB.
        env_storage = int(envs['Storage'][:-2])
        workspace_path = self.__alg_xml_handle.get_workspace_path()
        if not os.path.isdir(workspace_path):
            raise Exception('workspace_path:%s do not exist!' % workspace_path)
        local_storage = self.__get_free_space_mb(workspace_path)
        if env_storage > local_storage:
            msg = 'workspace storage ' + str(local_storage) + 'GB less than' + str(env_storage) + "GB"
            logger.warning(msg)
        return True

    @staticmethod
    def __get_free_space_mb(folder):
        """
        Return the free space of the drive holding ``folder``.

        Despite the historical name the unit is GB on every platform, which
        is what ``check_run_env`` compares against.

        :param folder: path to check, eg: 'C:\\\\'
        :return: folder/drive free space (GB)
        """
        import shutil  # local import: only needed here

        return shutil.disk_usage(folder).free / 1024 / 1024 / 1024

    def check_input_paras(self, input_para_names):
        """
        Validate the named input parameters.

        :param input_para_names: list of parameter names to check
                                 [name1, name2, ...]
        :return: tuple ``(True, dict)`` where the dict maps every parameter
                 name checked so far on this instance to its ParaValue
        :raises Exception: when a parameter is unknown or its value is invalid
        """
        workspace_path = self.__alg_xml_handle.get_workspace_path()
        input_paras = self.__alg_xml_handle.get_input_paras()
        for name in input_para_names:
            para = input_paras.get(name)
            if para is None:
                msg = "check para:" + name + " is failed!" + "para is None!"
                raise Exception(msg)

            para_type = para['ParaType']
            data_type = para['DataType']
            if para_type == 'File':
                if data_type == 'tif':
                    # Several images may be given, separated by ';'.  These
                    # paths are used as given, not joined to the workspace.
                    for para_path in para['ParaValue'].split(";"):
                        if not self.__check_tif(para_path):
                            msg = "check para:" + name + " is failed!" + "Path:" + para_path
                            raise Exception(msg)
                elif data_type == 'xml':
                    para_path = workspace_path + para['ParaValue']
                    if not os.path.exists(para_path):
                        raise Exception('para_file:%s is inexistent!' % para_path)
                elif data_type == 'File':
                    para_path = workspace_path + para['ParaValue']
                    if not os.path.isdir(para_path):
                        msg = "check para:" + name + " is failed!" + "FilePath:" + para_path
                        raise Exception(msg)
            elif para_type == 'Value':
                if data_type in ('float', 'int', 'double'):
                    if para['ParaValue'] is None:
                        msg = "check para:" + name + " is failed!" + "'ParaValue' is None"
                        raise Exception(msg)
                    if not self.__is_number(para['ParaValue']):
                        raise Exception("para:" + name + " is not number!")
            self.__ParameterDic[name] = para['ParaValue']
        return True, self.__ParameterDic

    def check_output_paras(self, output_para_names):
        """
        Validate the named output parameters.

        For a 'tif' output the target directory must exist and the file name
        must end with '.tif'; for a 'File' output the directory is created
        when it does not exist.

        :param output_para_names: list of parameter names to check
                                  [name1, name2, ...]
        :return: True
        :raises Exception: when a parameter is unknown or its path is invalid
        """
        workspace_path = self.__alg_xml_handle.get_workspace_path()
        output_paras = self.__alg_xml_handle.get_output_paras()
        for name in output_para_names:
            para = output_paras.get(name)
            if para is None:
                msg = "check para:" + name + " is failed!" + "para is None!"
                raise Exception(msg)
            if para['ParaType'] != 'File':
                continue

            para_path = workspace_path + para['ParaValue']
            if para['DataType'] == 'tif':
                para_dir, file_name = os.path.split(para_path)
                flag_isdir = os.path.isdir(para_dir)
                flag_istif = os.path.splitext(file_name)[1].lower() == ".tif"
                # Both conditions are required: existing directory AND tif name.
                if not (flag_isdir and flag_istif):
                    msg = "check para:" + name + " is failed!" + para_path + "is invalid!"
                    raise Exception(msg)
            elif para['DataType'] == 'File':
                os.makedirs(para_path, exist_ok=True)
                if not os.path.isdir(para_path):
                    msg = "check para:" + name + " is failed!" + para_path + "is invalid!"
                    raise Exception(msg)
        return True

    @staticmethod
    def __is_number(str_num):
        """
        Tell whether a string is a plain decimal number.

        Accepted: an optional leading '-', digits and at most one '.'.
        Exponents and a leading '+' are rejected.

        :param str_num: string to check
        :return: True or False (False for None and for the empty string)
        """
        if not str_num:
            return False
        if str_num[0] == '-':
            str_num = str_num[1:]
        if str_num.count(".") > 1:
            return False
        return str_num.replace(".", "").isdigit()

    def __check_tif(self, filename):
        """
        Check that an image can be read and carries georeferencing.

        :param filename: path of the image file
        :return: True
        :raises Exception: when projection, geotransform, pixel data or scope
                           is missing
        """
        im_proj, im_geotrans, im_arr = self.imageHandler.read_img(filename)
        im_scope = self.imageHandler.get_scope(filename)
        if (im_proj is None or im_geotrans is None or im_arr is None
                or im_arr.size == 0 or im_scope is None):
            msg = "im_proj is None or im_geotrans is None or im_arr.size == 0 or im_scope is None,finame: " + filename
            raise Exception(msg)
        return True