microproduct/dem-sentiral/AlgXmlHandle.py

505 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: UTF-8 -*-
"""
@Project microproduct
@File AlgXmlHandle.py
@Function : read, write and validate the algorithm description (XML) file
@Contact https://www.cnblogs.com/feifeifeisir/p/10893127.html
@Author SHJ
@Date 2021/9/6
@Version 1.0.0
"""
import logging
from xml.etree.ElementTree import ElementTree
import os
import re
import platform
import psutil
import multiprocessing
import ctypes
from tool.algorithm.image import ImageHandle
# Module-level logger shared by the classes below; it is looked up under the fixed name "mylog".
logger = logging.getLogger("mylog")
class ManageAlgXML:
    """
    Read, validate and update an algorithm description XML file.

    init_xml() must be called once before any get_* accessor is used.
    Every validation failure is reported by raising Exception.
    """

    def __init__(self, xml_path):
        """
        :param xml_path: path of the algorithm XML file
        """
        self.in_path = xml_path
        self.__tree = ElementTree()
        self.__root = None
        self.__alg_compt = None
        self.__workspace_path = None
        self.__taskID = None
        self.__envs = {}
        self.__input_paras = {}
        self.__output_paras = {}
        self.__init_flag = False

    @staticmethod
    def __node_text(parent, tag, strip=True):
        """
        Return the text of child node `tag` of `parent`.

        :param parent: element that should contain the child (may be None)
        :param tag: name of the child node
        :param strip: when True, remove every newline and space from the text
        :return: the text, or None when the node exists but has no text
        :raises Exception: when the node does not exist
        """
        node = None if parent is None else parent.find(tag)
        if node is None:
            raise Exception("cannot find node '" + tag + "'")
        text = node.text
        if text is None or not strip:
            return text
        return text.replace("\n", "").replace(' ', '')

    @staticmethod
    def __required_text(parent, tag):
        """
        Like __node_text (with stripping) but a node without text is an error.

        :raises Exception: when the node is missing or has no text
        """
        text = ManageAlgXML.__node_text(parent, tag)
        if text is None:
            raise Exception("'" + tag + "' is None")
        return text

    def __ensure_initialized(self):
        """Raise Exception unless init_xml() has completed successfully."""
        if not self.__init_flag:
            raise Exception("XML is not initialized")

    def init_xml(self):
        """
        Parse the XML file and validate all of its sections.

        :return: True when initialisation succeeded
        :raises Exception: when the file cannot be parsed or a check fails
        """
        try:
            self.__tree.parse(self.in_path)
        except FileNotFoundError as ex:
            raise Exception(ex) from ex
        except Exception as ex:
            # Only ordinary errors are translated; KeyboardInterrupt and
            # SystemExit are allowed to propagate untouched.
            raise Exception("cannot open algXMl") from ex

        self.__root = self.__tree.getroot()
        if self.__root is None:
            raise Exception("get root failed")
        self.__alg_compt = self.__root.find("AlgCompt")
        if self.__alg_compt is None:
            raise Exception("get AlgCompt failed")

        self.__workspace_path = self.__check_workspace_path()
        if self.__workspace_path is None:
            raise Exception("check workspace_path failed")
        self.__taskID = self.__check_task_id()
        if self.__taskID is None:
            raise Exception("check taskID failed")
        self.__envs = self.__check_environment()
        if not self.__envs:
            raise Exception("check environment failed")
        self.__input_paras = self.__check_input_para()
        if not self.__input_paras:
            raise Exception("check input para failed")
        self.__output_paras = self.__check_output_para()
        self.__init_flag = True
        return True

    def get_workspace_path(self):
        """
        :return: workspace path (always ends with a path separator)
        :raises Exception: when init_xml() has not succeeded yet
        """
        self.__ensure_initialized()
        return self.__workspace_path

    def get_task_id(self):
        """
        :return: task ID
        :raises Exception: when init_xml() has not succeeded yet
        """
        self.__ensure_initialized()
        return self.__taskID

    def get_envs(self):
        """
        :return: dict of runtime environment requirements
        :raises Exception: when init_xml() has not succeeded yet
        """
        self.__ensure_initialized()
        return self.__envs

    def get_input_paras(self):
        """
        :return: dict of input parameters keyed by ParaName
        :raises Exception: when init_xml() has not succeeded yet
        """
        self.__ensure_initialized()
        return self.__input_paras

    def get_output_paras(self):
        """
        :return: dict of output parameters
        :raises Exception: when init_xml() has not succeeded yet
        """
        self.__ensure_initialized()
        return self.__output_paras

    def __check_workspace_path(self):
        """
        Read and validate the <WorkSpace> node.

        :return: workspace path terminated by a path separator
        :raises Exception: when the node is missing/empty or is not a directory
        """
        workspace_path = self.__required_text(self.__root, "WorkSpace")
        if not os.path.isdir(workspace_path):
            raise Exception("'workspace_path' is not an existing directory:%s" % workspace_path)
        # Callers build file paths by plain string concatenation, so the
        # returned path has to end with a separator.
        if not workspace_path.endswith(('\\', '/')):
            workspace_path += os.sep
        return workspace_path

    def __check_environment(self):
        """
        Read and validate the <Environment> section.

        :return: dict of runtime environment requirements
        :raises Exception: when a node is missing or a value is not allowed
        """
        env_note = None if self.__alg_compt is None else self.__alg_compt.find("Environment")
        if env_note is None:
            raise Exception("cannot find node 'Environment'")

        is_cluster = int(self.__required_text(env_note, "IsCluster"))
        if is_cluster not in (0, 1):
            raise Exception("IsCluster is not 0 or 1")

        cluster_num = int(self.__required_text(env_note, "ClusterNum"))
        if cluster_num not in range(8):
            raise Exception("cluster_num is beyond [0,1,2,3,4,5,6,7]")

        # The operating system value is read but not validated against a list here.
        operating_system = self.__required_text(env_note, "OperatingSystem")

        cpu = self.__required_text(env_note, "CPU")
        if cpu not in ("单核", "双核", "3核", "4核", "6核", "8核"):
            raise Exception("CPU is beyond [单核, 双核, 3核, 4核, 6核, 8核]")

        memory = self.__required_text(env_note, "Memory")
        if memory not in ("1GB", "2GB", "4GB", "6GB", "8GB", "10GB", "12GB", "16GB"):
            raise Exception("Memory is beyond [1GB, 2GB, 4GB, 6GB, 8GB, 10GB, 12GB, 16GB]")

        storage = self.__required_text(env_note, "Storage")
        # The last two characters are the unit suffix (e.g. "GB").
        if not int(storage[:-2]) > 0:
            raise Exception("Storage < 0GB")

        # These three values are passed through unvalidated and unstripped.
        network_card = self.__node_text(env_note, "NetworkCard", strip=False)
        band_width = self.__node_text(env_note, "Bandwidth", strip=False)
        gpu = self.__node_text(env_note, "GPU", strip=False)

        return {
            "is_Cluster": is_cluster,
            "cluster_num": cluster_num,
            "operating_system": operating_system,
            "CPU": cpu,
            "memory": memory,
            "Storage": storage,
            "network_card": network_card,
            "band_width": band_width,
            "GPU": gpu,
        }

    def __check_input_para(self):
        """
        Read and validate the <Inputs> section.

        :return: dict mapping ParaName to a dict describing the parameter
        :raises Exception: when the declared count does not match, a node is
                           missing, or a mandatory value is empty
        """
        input_paras_note = None if self.__alg_compt is None else self.__alg_compt.find("Inputs")
        if input_paras_note is None:
            raise Exception("cannot find node 'Inputs'")
        declared_num = input_paras_note.attrib.get("ParameterNum")
        if declared_num is None:
            raise Exception("'Inputs' has no 'ParameterNum' attribute")
        paras_num = int(declared_num)
        para_list = input_paras_note.findall("Parameter")
        if paras_num != len(para_list):
            msg = "'ParameterNum':" + str(paras_num) + " != number of 'Parameter':" + str(len(para_list))
            raise Exception(msg)

        input_paras = {}
        for para in para_list:
            para_name = self.__node_text(para, "ParaName")
            para_chs_name = self.__node_text(para, "ParaChsName")
            para_type = self.__node_text(para, "ParaType")
            data_type = self.__node_text(para, "DataType")
            para_value = self.__node_text(para, "ParaValue")
            if para_name is None or para_type is None or para_value is None:
                # str() keeps the message buildable even though a value is None.
                msg = 'there is None among para_name:' + str(para_name) + ',para_type:' + str(para_type) \
                      + 'or para_value:' + str(para_value) + '!'
                raise Exception(msg)

            input_para = {"ParaName": para_name, "ParaChsName": para_chs_name, "ParaType": para_type,
                          "DataType": data_type, "ParaValue": para_value}
            if para_type == "Value":
                # An empty <OptionValue/> is stored as None instead of crashing.
                input_para.update({
                    "MaxValue": self.__node_text(para, "MaxValue", strip=False),
                    "MinValue": self.__node_text(para, "MinValue", strip=False),
                    "OptionValue": self.__node_text(para, "OptionValue"),
                })
            input_paras[para_name] = input_para
        return input_paras

    def __check_output_para(self):
        """
        Validate the <Outputs> section.

        Only the declared parameter count is checked.
        NOTE: parsing of the individual output parameters was already disabled
        (commented out) in this method, so the returned dict is always empty.

        :return: dict of output parameters (currently always empty)
        :raises Exception: when the node is missing or the count does not match
        """
        output_paras_note = None if self.__alg_compt is None else self.__alg_compt.find("Outputs")
        if output_paras_note is None:
            raise Exception("cannot find node 'Outputs'")
        declared_num = output_paras_note.attrib.get("ParameterNum")
        if declared_num is None:
            raise Exception("'Outputs' has no 'ParameterNum' attribute")
        if int(declared_num) != len(output_paras_note.findall("Parameter")):
            raise Exception("'ParameterNum' != number of 'Parameter'")
        return {}

    def write_out_para(self, para_name='BackScatteringProduct',
                       para_value="D:\\workspace\\Output\\BackScatteringProduct.tar.gz"):
        """
        Set the ParaValue of an output parameter and rewrite the XML file.

        The file is re-parsed first, then written back in place as UTF-8.

        :param para_name: ParaName of the output parameter to update
        :param para_value: new text for its ParaValue node
        :raises Exception: when no output parameter has that name
        """
        self.__tree.parse(self.in_path)
        root = self.__tree.getroot()
        alg_compt = root.find("AlgCompt")
        if alg_compt is None:
            raise Exception("get AlgCompt failed")
        output_paras_note = alg_compt.find("Outputs")
        if output_paras_note is None:
            raise Exception("cannot find node 'Outputs'")

        found = False
        for para in output_paras_note.findall("Parameter"):
            name_node = para.find("ParaName")
            value_node = para.find("ParaValue")
            if name_node is None or value_node is None:
                continue
            if name_node.text == para_name:
                value_node.text = para_value
                found = True
        if not found:
            raise Exception('Cannot find Output Parameter:' + para_name + '!')
        self.__tree.write(self.in_path, encoding="utf-8", xml_declaration=True)

    def __check_task_id(self):
        """
        Read the <TaskID> node.

        :return: task ID with newlines and spaces removed
        :raises Exception: when the node is missing or empty
        """
        return self.__required_text(self.__root, "TaskID")
class CheckSource:
    """
    Check that the resources described by the algorithm XML are complete and valid.

    All checks report failures by raising Exception.
    """

    def __init__(self, alg_xml_handle):
        """
        :param alg_xml_handle: object providing init_xml(), get_envs(),
                               get_workspace_path(), get_input_paras() and
                               get_output_paras() (e.g. a ManageAlgXML)
        """
        self.__alg_xml_handle = alg_xml_handle
        self.imageHandler = ImageHandle.ImageHandler()
        self.__ParameterDic = {}

    def check_alg_xml(self):
        """
        Initialise the algorithm XML handle.

        :return: True when initialisation succeeded
        :raises Exception: when initialisation failed
        """
        if self.__alg_xml_handle.init_xml():
            logger.info('init algXML succeed')
            return True
        raise Exception('init algXML failed')

    def check_run_env(self):
        """
        Compare the local machine with the environment required by the XML.

        Insufficient total memory or CPU cores is fatal. An operating system
        mismatch, low free memory or low free disk space only logs a warning.

        :return: True when no fatal mismatch was found
        :raises Exception: on insufficient memory/CPU or a missing workspace
        """
        envs = self.__alg_xml_handle.get_envs()

        # Operating system: the raise for this mismatch was commented out in
        # the original code, so it stays non-fatal and is only logged.
        local_os = "".join(platform.platform().split("-")[:2])
        if envs['operating_system'] != local_os:
            logger.warning('operating_system:' + local_os + ' is not ' + envs['operating_system'])

        # Memory (values rounded to whole GB; the required value ends in "GB").
        mem = psutil.virtual_memory()
        mem_total = int(round(mem.total / 1024 / 1024 / 1024, 0))
        mem_free = round(mem.free / 1024 / 1024 / 1024, 0)
        env_memory = int(envs['memory'][:-2])
        if env_memory > mem_total:
            raise Exception('memory_total ' + str(mem_total) + ' less than' + str(env_memory) + 'GB')
        if env_memory >= mem_free:
            logger.warning('mem_free ' + str(mem_free) + 'GB less than' + str(env_memory) + 'GB')

        # CPU cores: named values first, otherwise "<n>核" with the last
        # character being the unit.
        env_cpu = envs['CPU']
        named_cores = {"单核": 1, "双核": 2, "三核": 3}
        if env_cpu in named_cores:
            env_cpu_core_num = named_cores[env_cpu]
        else:
            env_cpu_core_num = int(env_cpu[:-1])
        # Half of the logical CPU count is used as the core count; the floor
        # of 1 keeps a single-CPU host from being reported as having 0 cores.
        local_cpu_core_num = max(1, multiprocessing.cpu_count() // 2)
        if env_cpu_core_num > local_cpu_core_num:
            msg = 'CPU_core_num ' + str(local_cpu_core_num) + 'core less than' + str(env_cpu_core_num) + ' core'
            raise Exception(msg)

        # Free disk space of the workspace (the required value ends in "GB").
        env_storage = int(envs['Storage'][:-2])
        workspace_path = self.__alg_xml_handle.get_workspace_path()
        if not os.path.isdir(workspace_path):
            raise Exception('workspace_path:%s do not exist!' % workspace_path)
        local_storage = self.__get_free_space_mb(workspace_path)
        if env_storage > local_storage:
            # The raise for this case was commented out in the original code,
            # so it stays non-fatal and is only logged.
            logger.warning('workspace storage ' + str(local_storage) + 'GB less than' + str(env_storage) + 'GB')
        return True

    @staticmethod
    def __get_free_space_mb(folder):
        """
        Return the free space of the drive/filesystem holding `folder`.

        Despite the historical name, the value is in GB on every platform.

        :param folder: path to check, eg: 'C:\\'
        :return: free space in GB
        """
        if platform.system() == 'Windows':
            free_bytes = ctypes.c_ulonglong(0)
            ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(folder), None, None,
                                                       ctypes.pointer(free_bytes))
            return free_bytes.value / 1024 / 1024 / 1024
        st = os.statvfs(folder)
        return st.f_bavail * st.f_frsize / 1024 / 1024 / 1024

    def check_input_paras(self, input_para_names):
        """
        Validate the named input parameters.

        File parameters are checked for existence/readability, numeric Value
        parameters are checked to be numbers. The ParaValue of every checked
        parameter is recorded.

        :param input_para_names: list of parameter names to check [name1, name2, ...]
        :return: (True, dict mapping each checked name to its ParaValue)
        :raises Exception: when a parameter is unknown or invalid
        """
        workspace_path = self.__alg_xml_handle.get_workspace_path()
        input_paras = self.__alg_xml_handle.get_input_paras()
        for name in input_para_names:
            para = input_paras.get(name)
            if para is None:
                raise Exception("check para:" + name + " is failed!" + "para is None!")

            para_type = para['ParaType']
            data_type = para['DataType']
            if para_type == 'File':
                if data_type == 'tif':
                    # tif values are used as given (not prefixed with the
                    # workspace); several paths may be separated by ';'.
                    for para_path in para['ParaValue'].split(";"):
                        if self.__check_tif(para_path) is False:
                            raise Exception("check para:" + name + " is failed!" + "Path:" + para_path)
                elif data_type == 'xml':
                    para_path = workspace_path + para['ParaValue']
                    if not os.path.exists(para_path):
                        raise Exception('para_file:%s is inexistent!' % para_path)
                elif data_type == 'File':
                    para_path = workspace_path + para['ParaValue']
                    if not os.path.isdir(para_path):
                        raise Exception("check para:" + name + " is failed!" + "FilePath:" + para_path)
            elif para_type == 'Value':
                if data_type in ('float', 'int', 'double'):
                    if para['ParaValue'] is None:
                        raise Exception("check para:" + name + " is failed!" + "'ParaValue' is None")
                    if self.__is_number(para['ParaValue']) is False:
                        raise Exception("para:" + name + " is not number!")
            self.__ParameterDic[name] = para['ParaValue']
        return True, self.__ParameterDic

    def check_output_paras(self, output_para_names):
        """
        Validate the named output parameters.

        A 'tif' output needs an existing parent directory and a .tif file
        name; a 'File' output directory is created when it does not exist.

        :param output_para_names: list of parameter names to check [name1, name2, ...]
        :return: True
        :raises Exception: when a parameter is unknown or invalid
        """
        workspace_path = self.__alg_xml_handle.get_workspace_path()
        output_paras = self.__alg_xml_handle.get_output_paras()
        for name in output_para_names:
            para = output_paras.get(name)
            if para is None:
                raise Exception("check para:" + name + " is failed!" + "para is None!")
            if para['ParaType'] != 'File':
                continue

            para_path = workspace_path + para['ParaValue']
            if para['DataType'] == 'tif':
                para_dir, file_name = os.path.split(para_path)
                dir_exists = os.path.isdir(para_dir)
                is_tif = os.path.splitext(file_name)[1] == ".tif"
                # Both conditions must hold for the path to be usable.
                if not (dir_exists and is_tif):
                    raise Exception("check para:" + name + " is failed!" + para_path + "is invalid!")
            elif para['DataType'] == 'File':
                if not os.path.isdir(para_path):
                    os.makedirs(para_path)
                if not os.path.isdir(para_path):
                    raise Exception("check para:" + name + " is failed!" + para_path + "is invalid!")
        return True

    @staticmethod
    def __is_number(str_num):
        """
        Tell whether a string is a plain decimal number.

        Accepts an optional leading '-', digits and at most one '.'.

        :param str_num: string to check
        :return: True or False (False for an empty string)
        """
        if not str_num:
            return False
        if str_num[0] == '-':
            str_num = str_num[1:]
        if str_num.count(".") > 1:
            return False
        return str_num.replace(".", "").isdigit()

    def __check_tif(self, filename):
        """
        Check that an image can be read and yields projection, geotransform,
        non-empty data and a scope.

        :param filename: path of the image file
        :return: True
        :raises Exception: when any of the four pieces of information is missing
        """
        im_proj, im_geotrans, im_arr = self.imageHandler.read_img(filename)
        im_scope = self.imageHandler.get_scope(filename)
        if im_proj is None or im_geotrans is None or im_arr.size == 0 or im_scope is None:
            msg = "im_proj is None or im_geotrans is None or im_arr.size == 0 or im_scope is None,finame: " + filename
            raise Exception(msg)
        return True