microproduct/landcover_s_geo_sar/LandCoverMain.py

395 lines
18 KiB
Python
Raw Permalink Normal View History

2023-08-28 10:17:29 +00:00
# -*- coding: UTF-8 -*-
"""
@Project microproduct
@File LandCoverMain.py
@Function 实现对数据地表覆盖的训练和分类识别
@Author SHJ
@Date 2021/11/15
@Version 1.0.0
"""
import datetime
import logging
import os
import shutil
import sys
import numpy as np
import cv2
import pyproj._compat
# 导入PreProcess模块要在其他包含gdal库的模块前面不然剪切影像会报错详见https://blog.csdn.net/u014656611/article/details/106450006
from tool.algorithm.algtools.PreProcess import PreProcess as pp
from tool.algorithm.image.ImageHandle import ImageHandler
from LandCoverAuxData import LandCoverMeasCsv
from tool.algorithm.xml.AlgXmlHandle import ManageAlgXML, CheckSource, InitPara
from tool.algorithm.algtools.logHandler import LogHandler
from tool.algorithm.xml.CreatMetafile import CreateMetafile
from LandCoverXmlInfo import CreateDict, CreateStadardXmlFile
from tool.config.ConfigeHandle import Config as cf
from tool.csv.csvHandle import csvHandle
from tool.file.fileHandle import fileHandle
from tool.algorithm.ml.machineLearning import MachineLeaning as ml
from tool.algorithm.algtools.ROIAlg import ROIAlg as roi
import multiprocessing
csvh = csvHandle()
if cf.get('debug') == 'True':
FILE_DEBUG = True
else:
FILE_DEBUG = False
EXE_NAME = cf.get('exe_name')
LogHandler.init_log_handler('run_log\\' + EXE_NAME)
logger = logging.getLogger("mylog")
FILTER_SIZE = int(cf.get('filter_size'))
file =fileHandle(FILE_DEBUG)
MAX_TRAN_NUM = int(cf.get('max_tran__num_per_class'))
env_str = os.path.split(os.path.realpath(__file__))[0]
os.environ['PROJ_LIB'] = env_str
DEBUG = False
class LandCoverMain:
"""
算法主函数
"""
def __init__(self, alg_xml_path):
self.alg_xml_path = alg_xml_path
self.imageHandler = ImageHandler()
self.__alg_xml_handler = ManageAlgXML(alg_xml_path)
self.__check_handler = CheckSource(self.__alg_xml_handler)
self.__workspace_path = None
self.__out_para = None
self.__input_paras, self.__processing_paras, self.__preprocessed_paras = {}, {}, {}
# 多时相影像名称
self.__HHHV_names_list, self.__VVVH_names_list, self.__HH_names_list, self.__VV_names_list = [], [],[],[]
# 保存特征数据的名称
self.__feature_name_list = []
# xml参数字典
self.__parameters_dic = {}
# 参考影像路径
self.__ref_img_path = ''
# 宽/列数,高/行数
self.__cols, self.__rows = 0, 0
# 坐标系
self.__proj = ''
# 影像投影变换矩阵
self.__geo = [0, 0, 0, 0, 0, 0]
self.__polar = 1
def check_source(self):
"""
检查算法相关的配置文件图像辅助文件是否齐全
"""
env_str = os.getcwd()
logger.info("sysdir: %s", env_str)
self.__check_handler.check_alg_xml()
self.__check_handler.check_run_env()
# 检查景影像是否为全极化
self.__input_paras = self.__alg_xml_handler.get_input_paras()
checkFlag, self.__parameters_dic = self.__check_handler.check_input_paras(self.__input_paras)
self.__workspace_path = self.__alg_xml_handler.get_workspace_path()
self.__create_work_space()
self.__processing_paras = InitPara.init_processing_paras(self.__input_paras)
self.__processing_paras.update(InitPara(DEBUG).get_mult_tar_gz_infs(self.__processing_paras, self.__workspace_preprocessing_path))
self.__polar = self.__processing_paras['Polarization']
self.image_meta_xml = self.__processing_paras['META']
for name in self.__processing_paras['name_list']:
flag_list = self.__processing_paras[name + '_pola']
if self.__polar == 1:
if flag_list[0] == 1:
self.__HH_names_list.append(name)
elif flag_list[3] == 1:
self.__VV_names_list.append(name)
elif self.__polar == 2:
if flag_list[0] == 1 and flag_list[1] == 1:
self.__HHHV_names_list.append(name)
elif flag_list[2] == 1 and flag_list[3] == 1:
self.__VVVH_names_list.append(name)
self.__feature_name_list = [key for key in self.__processing_paras.keys() if ('FeatureMap' in key)]
self.__out_para = os.path.join(self.__workspace_path, EXE_NAME, 'Output', r"LandCoverProduct.tar.gz")
self.__alg_xml_handler.write_out_para("LandCoverProductDir", self.__out_para) # 写入输出参数
logger.info('check_source success!')
return True
def __create_work_space(self):
"""
删除原有工作区文件夹,创建新工作区文件夹
"""
self.__workspace_preprocessing_path = self.__workspace_path + EXE_NAME + r"\Temporary\preprocessing""\\"
self.__workspace_preprocessed_path = self.__workspace_path + EXE_NAME + r"\Temporary\preprocessed""\\"
self.__workspace_processing_path = self.__workspace_path + EXE_NAME + r"\Temporary\processing""\\"
self.__workspace_block_tif_path = self.__workspace_path + EXE_NAME + r"\Temporary\blockTif""\\"
self.__workspace_block_tif_processed_path = self.__workspace_path + EXE_NAME + r"\Temporary\blockTifProcessed""\\"
self.__feature_tif_dir = self.__workspace_processing_path + 'feature_tif'
self.__product_dir = self.__workspace_processing_path + 'product\\'
path_list = [self.__workspace_preprocessing_path, self.__workspace_preprocessed_path,
self.__workspace_processing_path, self.__workspace_block_tif_path,
self.__workspace_block_tif_processed_path, self.__feature_tif_dir,self.__product_dir]
file.creat_dirs(path_list)
logger.info('create new workspace success!')
def del_temp_workspace(self):
"""
临时工作区
"""
path = self.__workspace_path + EXE_NAME + r"\Temporary"
if os.path.exists(path):
file.del_folder(path)
def get_name_tif_list(self):
para_names = []
for tif_name in self.__HHHV_names_list:
para_names.append(tif_name + '_HH')
para_names.append(tif_name + '_HV')
for tif_name in self.__VVVH_names_list:
para_names.append(tif_name + '_VH')
para_names.append(tif_name + '_VV')
for tif_name in self.__HH_names_list:
para_names.append(tif_name + '_HH')
for tif_name in self.__VV_names_list:
para_names.append(tif_name + '_VV')
return para_names
def preprocess_handle(self):
"""
预处理
"""
# 读取每一张图像,检查图像坐标系
para_names = self.get_name_tif_list()
ref_img_name = para_names[0]
para_names = para_names + self.__feature_name_list
p = pp()
self.__preprocessed_paras = p.preprocessing(para_names, ref_img_name,
self.__processing_paras,
self.__workspace_preprocessing_path, self.__workspace_preprocessed_path)
self.__ref_img_path, self.__cols, self.__rows, self.__proj, self.__geo = p.get_ref_inf(self.__preprocessed_paras[ref_img_name])
logger.info('progress bar: 40%')
logger.info('preprocess_handle success!')
def create_roi(self):
"""
计算ROI掩膜
:return:掩膜路径
"""
para_names = self.get_name_tif_list()
tif_mask_path = roi().roi_process(para_names, self.__workspace_processing_path + "/roi/", self.__processing_paras, self.__preprocessed_paras)
logger.info('create ROI image success!')
return tif_mask_path
def create_features(self):
"""
使用每一景全极化影像生成极化特征影像
:para name_list: 每一景影像的路径
"""
for fea_name in self.__feature_name_list:
path = self.__preprocessed_paras[fea_name]
feature_path = os.path.join(self.__feature_tif_dir, os.path.basename(path))
shutil.copyfile(path, feature_path)
feature_array = ImageHandler.get_band_array(feature_path)
feature_array = ml.standardization(feature_array)
self.imageHandler.write_img(feature_path, self.__proj, self.__geo, feature_array)
if self.__polar == 2:
self.create_double_polar_features()
elif self.__polar == 1:
self.create_single_polar_features()
def create_single_polar_features(self):
name_list = [] + self.__HH_names_list + self.__VV_names_list
for name in name_list:
if name in self.__HH_names_list:
tif_name = name + '_HH'
else:
tif_name = name + '_VV'
path = self.__preprocessed_paras[tif_name]
array_single_pol = ImageHandler.get_band_array(path)
# 后向散射无效值设置为nan
array_single_pol[np.where(array_single_pol < -100.0)] = np.nan
array_single_pol = ml.standardization(array_single_pol)
feature_path = os.path.join(self.__feature_tif_dir, os.path.basename(path))
self.imageHandler.write_img(feature_path, self.__proj, self.__geo, array_single_pol)
def create_double_polar_features(self):
name_list = [] + self.__HHHV_names_list + self.__VVVH_names_list
# # 生成极化组合特征
for name in name_list:
if name in self.__HHHV_names_list:
array_single_pol = ImageHandler.get_band_array(self.__preprocessed_paras[name+'_HH'])
array_cross_pol = ImageHandler.get_band_array(self.__preprocessed_paras[name+'_HV'])
else:
array_single_pol = ImageHandler.get_band_array(self.__preprocessed_paras[name + '_VV'])
array_cross_pol = ImageHandler.get_band_array(self.__preprocessed_paras[name + '_VH'])
# 后向散射无效值设置为nan
array_single_pol[np.where(array_single_pol < -100.0)] = np.nan
array_cross_pol[np.where(array_cross_pol < -100.0)] = np.nan
decomposeList = self.__parameters_dic["FeatureCombination"].split(',')
if len(decomposeList) == 1:
decomposeList = self.__parameters_dic["FeatureCombination"].split(';')
for n in range(8):
if str(n) in decomposeList:
feature_path = os.path.join(self.__feature_tif_dir, name + '_feature'+str(n)+'.tif')
if n == 0:
array = array_single_pol
elif n == 1:
array = array_cross_pol
elif n == 2:
array = array_single_pol + array_cross_pol
elif n == 3:
array = array_single_pol - array_cross_pol
elif n == 4:
array = (array_single_pol - array_cross_pol)/(array_single_pol + array_cross_pol)
elif n == 5:
array = array_single_pol/array_cross_pol
elif n == 6:
array = array_cross_pol + array_single_pol/array_cross_pol
elif n == 7:
array = (array_single_pol**2 + array_cross_pol**2)/(array_single_pol**2 - array_cross_pol**2)
else:
continue
array = ml.standardization(array)
self.imageHandler.write_img(feature_path, self.__proj, self.__geo, array)
logging.info("feature0 = array_single_pol, feature1= array_cross_pol")
logging.info("feature1 = array_cross_pol")
logging.info("feature2 = array_single_pol + array_cross_pol")
logging.info("feature3 = array_single_pol - array_cross_pol")
logging.info("feature4 = (array_single_pol - array_cross_pol)/(array_single_pol + array_cross_pol)")
logging.info("feature5 = array_single_pol/array_cross_pol")
logging.info("feature6 = array_cross_pol + array_single_pol/array_cross_pol")
logging.info("feature7 = (array_single_pol**2 + array_cross_pol**2)/(array_single_pol**2 - array_cross_pol**2)")
def process_handle(self, start):
"""
算法主处理函数
"""
# 读取实测值,获取多边形区域内所有的点,分为训练集数据和测试集数据
pm = LandCoverMeasCsv(self.__processing_paras['LabelData'], self.__ref_img_path,MAX_TRAN_NUM)
train_data_list = pm.api_read_measure()
train_data_dic = csvh.trans_landCover_list2dic(train_data_list)
csvh_roi = csvHandle(self.__rows, self.__cols)
label_img = csvh_roi.get_roi_img()
if (len(label_img) != 0):
self.imageHandler.write_img(self.__workspace_processing_path + "label_img.tif", "", [0, 0, 0, 0, 0, 0],
label_img)
# csvh_roi = csvHandle(self.__rows, self.__cols)
# train_data_dic = csvh_roi.trans_landCover_measuredata_dic(
# csvh_roi.readcsv(self.__processing_paras['LabelData']), self.__preprocessed_paras['ori_sim'], MAX_TRAN_NUM)
# label_img = csvh_roi.get_roi_img()
# if (len(label_img) != 0):
# self.imageHandler.write_img(self.__workspace_processing_path + "label_img.tif", "", [0, 0, 0, 0, 0, 0],
# label_img)
type_id_name, type_id_parent = pm.class_list()
self.create_meta_file(self.__ref_img_path, type_id_name, type_id_parent)
logger.info("read csv data success!")
logger.info('progress bar: 45%')
roi_img = self.imageHandler.get_band_array(self.create_roi())
# out_tif_path = self.ref_lee_filter(name_list)
self.create_features()
logger.info("create_features success!")
logger.info('progress bar: 50%')
# 生成最优特征子集训练集
X_train, Y_train, optimal_feature = ml.gene_optimal_train_set(train_data_dic, self.__feature_tif_dir)
# RF
clf = ml.trainRF(X_train, Y_train)
logger.info('progress bar: 60%')
# 生成测试集
X_test_path_list = ml.gene_test_set(self.__feature_tif_dir, optimal_feature)
# 预测
logger.info('testing')
cover_path = ml.predict(clf, X_test_path_list, EXE_NAME, self.__workspace_processing_path, self.__rows, self.__cols)
logger.info('test success!')
logger.info('progress bar: 95%')
# 转换数据
proj, geo, cover_data = self.imageHandler.read_img(cover_path)
# 形态学闭运算去roi区域噪点
cover_data = np.uint8(cover_data)
kernel = np.ones((5, 5), np.uint8)
cover_data = cv2.erode(cv2.dilate(cover_data, kernel), kernel)
for id, class_id in zip(train_data_dic['ids'], train_data_dic['class_ids']):
cover_data[np.where(cover_data == id)] = class_id
cover_data = np.int16(cover_data)
# 获取影像roi区域
cover_data = cover_data * roi_img
product_dir = self.__workspace_processing_path + 'product\\'
product_path = os.path.join(product_dir, 'LandCoverProduct.tif')
self.imageHandler.write_img(product_path, self.__proj, self.__geo, cover_data)
# 生成快视图
self.imageHandler.write_quick_view(product_path, color_img=True)
# 文件夹打包
file.make_targz(self.__out_para, product_dir)
logger.info('process_handle success!')
def create_meta_file(self, product_path, type_id_name, type_id_parent):
product_dir = self.__workspace_processing_path + 'product\\'
xml_path = "./model_meta.xml"
tem_folder = self.__workspace_path + EXE_NAME + r"\Temporary""\\"
image_path = product_path
out_path1 = os.path.join(tem_folder, "trans_geo_projcs.tif")
out_path2 = os.path.join(tem_folder, "trans_projcs_geo.tif")
par_dict = CreateDict(image_path, out_path1, out_path2).calu_nature(start)
model_xml_path = os.path.join(tem_folder, "creat_standard.meta.xml") # 输出xml路径
CreateStadardXmlFile(xml_path, self.alg_xml_path, par_dict, model_xml_path).create_standard_xml()
SrcImagePath = self.__input_paras["MultiTempSAR"]['ParaValue']
paths = SrcImagePath.split(';')
SrcImageName=os.path.split(paths[0])[1].split('.tar.gz')[0]
if len(paths) >= 2:
for i in range(1, len(paths)):
SrcImageName=SrcImageName+";"+os.path.split(paths[i])[1].split('.tar.gz')[0]
meta_xml_path = product_dir+EXE_NAME+"Product.meta.xml"
CreateMetafile(self.__processing_paras['META'], self.alg_xml_path, model_xml_path, meta_xml_path).process2(type_id_name, type_id_parent, SrcImageName)
if __name__ == '__main__':
multiprocessing.freeze_support() #解决打包与运行错误
start = datetime.datetime.now()
try:
if len(sys.argv) < 2:
xml_path = 'LandCover_geo.xml'
else:
xml_path = sys.argv[1]
main_handler = LandCoverMain(xml_path)
if main_handler.check_source() is False:
raise Exception('check_source() failed!')
if main_handler.preprocess_handle() is False:
raise Exception('preprocess_handle() failed!')
if main_handler.process_handle(start) is False:
raise Exception('process_handle() failed!')
logger.info('successful production of ' + EXE_NAME + ' products!')
except Exception:
logger.exception("run-time error!")
finally:
main_handler.del_temp_workspace()
pass
end = datetime.datetime.now()
msg = 'running use time: %s ' % (end - start)
logger.info(msg)