microproduct/vegetationPhenology/VegetationPhenologyMain.py

395 lines
16 KiB
Python
Raw Permalink Normal View History

2023-08-28 10:17:29 +00:00
# -*- coding: UTF-8 -*-
"""
@Project microproduct
@File PhenologyMain.py
@Author SHJ
@Contact土壤盐碱度算法主函数
@Date 2021/9/6
@Version 1.0.0
"""
import logging
import os
import datetime
import pyproj._compat
import cv2
import numpy as np
from osgeo import gdal, osr, ogr
from tool.algorithm.algtools.PreProcess import PreProcess as pp
from tool.algorithm.image.ImageHandle import ImageHandler
from tool.algorithm.xml.AlgXmlHandle import ManageAlgXML, CheckSource, InitPara
from tool.algorithm.algtools.logHandler import LogHandler
from tool.algorithm.xml.CreatMetafile import CreateMetafile
from VegetationPhenologyXmlInfo import CreateDict, CreateStadardXmlFile
from tool.file.fileHandle import fileHandle
import sys
from tool.algorithm.transforml1a.transHandle import TransImgL1A
from tool.csv.csvHandle import csvHandle
from tool.algorithm.polsarpro.createfeature import CreateFeature
import multiprocessing
from tool.algorithm.ml.machineLearning import MachineLeaning as ml
from tool.config.ConfigeHandle import Config as cf
csvh = csvHandle()
if cf.get('debug') == 'True':
DEBUG = True
else:
DEBUG = False
EXE_NAME = cf.get('exe_name')
LogHandler.init_log_handler('run_log\\' + EXE_NAME)
logger = logging.getLogger("mylog")
FILTER_SIZE = int(cf.get('filter_size'))
file =fileHandle(DEBUG)
env_str = os.path.split(os.path.realpath(__file__))[0]
os.environ['PROJ_LIB'] = env_str
class PhenologyMain:
"""
土壤粗糙度处理主函数
"""
def __init__(self, alg_xml_path):
self.alg_xml_path = alg_xml_path
self.imageHandler = ImageHandler()
self.__alg_xml_handler = ManageAlgXML(alg_xml_path)
self.__check_handler = CheckSource(self.__alg_xml_handler)
self.__workspace_path, self.__out_para = None, None
self.__input_paras, self.__processing_paras, self.__preprocessed_paras = {}, {}, {}
self.processinfo = [1, 1, 1, 1]
self._name_tr_dic = {} #l1a转换geo的对象
def check_source(self):
"""
检查算法相关的配置文件图像辅助文件是否齐全
"""
self._env_str = os.getcwd()
logger.info("sysdir: %s", self._env_str)
self.__check_handler.check_alg_xml()
self.__check_handler.check_run_env()
# 检查景影像是否为全极化
self.__input_paras = self.__alg_xml_handler.get_input_paras()
if self.__check_handler.check_input_paras(self.__input_paras) is False:
return False
self.__workspace_path = self.__alg_xml_handler.get_workspace_path()
self.__create_work_space()
self.__processing_paras = InitPara.init_processing_paras(self.__input_paras)
self.__processing_paras.update(InitPara(DEBUG).get_mult_tar_gz_infs(self.__processing_paras, self.__workspace_preprocessing_path))
self.__out_para = os.path.join(self.__workspace_path, EXE_NAME, 'Output', r"VegetationPhenologyProduct.tar.gz")
self.__alg_xml_handler.write_out_para("VegetationPhenologyProduct", self.__out_para) #写入输出参数
logger.info('check_source success!')
logger.info('progress bar: 10%')
return True
def __create_work_space(self):
"""
删除原有工作区文件夹,创建新工作区文件夹
"""
self.__workspace_preprocessing_path = self.__workspace_path + EXE_NAME + '\\Temporary\\preprocessing\\'
self.__workspace_preprocessed_path = self.__workspace_path + EXE_NAME + '\\Temporary\\preprocessed\\'
self.__workspace_processing_path = self.__workspace_path + EXE_NAME + '\\Temporary\\processing\\'
self.__product_dic = self.__workspace_processing_path + 'product\\'
path_list = [self.__workspace_preprocessing_path, self.__workspace_preprocessed_path,
self.__workspace_processing_path, self.__product_dic]
file.creat_dirs(path_list)
logger.info('create new workspace success!')
def del_temp_workspace(self):
"""
临时工作区
"""
if DEBUG is True:
return
path = self.__workspace_path + EXE_NAME + r'\Temporary'
if os.path.exists(path):
file.del_folder(path)
def preprocess_single_tar(self,name,scopes_roi):
key_list = [key for key in self.__processing_paras.keys() if((name in key) and ('inc_angle' not in key) and ('LocalIncidenceAngle' not in key)and ('pola' not in key))]
ori_sim_key = [key for key in key_list if ('ori_sim' in key)][0]
ori_sim_path = self.__processing_paras[ori_sim_key]
tr = TransImgL1A(ori_sim_path, scopes_roi)
for k in key_list:
out_path = os.path.join(self.__workspace_preprocessed_path, k + "_preprocessed.tif")
tr.cut_L1A(self.__processing_paras[k], out_path)
self.__preprocessed_paras.update({k: out_path})
self._name_tr_dic.update({name: tr})
def preprocess_handle(self):
"""
预处理
"""
scopes_roi = pp().cal_scopes_roi(self.__processing_paras)
for name in self.__processing_paras['name_list']:
self.preprocess_single_tar(name, scopes_roi)
logger.info('preprocess_handle success!')
logger.info('progress bar: 10%')
def gene_test_set(self, test_data, features_path_dic, name):
# 生成测试集
X_test_dic = {} # 同时测试多张影像
for tif_name in features_path_dic:
feature_array = self.imageHandler.get_data(features_path_dic[tif_name])
dim = feature_array.shape[0]
row_list = []
col_list = []
x_test = np.empty(shape=(0, dim))
for data in test_data:
if data[0] == tif_name:
row, col = zip(*data[2])
row_list = row_list + list(row)
col_list = col_list + list(col)
x_test = np.vstack((x_test, feature_array[:, row, col].T))
if row_list != []:
X_test_dic.update({tif_name: [x_test, row_list, col_list]})
return X_test_dic
def get_name_rows_cols(self, name):
key_list = [key for key in self.__preprocessed_paras.keys() if
((name in key) and ('inc_angle' not in key) and ('LocalIncidenceAngle' not in key))]
for key in key_list:
if "_HH" in key:
cols = self.imageHandler.get_img_width(self.__preprocessed_paras[key])
rows = self.imageHandler.get_img_height(self.__preprocessed_paras[key])
return rows, cols
def create_feature_single_tar(self, name):
key_list = [key for key in self.__preprocessed_paras.keys() if((name in key) and ('inc_angle' not in key) and ('LocalIncidenceAngle' not in key))]
ori_sim_key = [key for key in key_list if ('ori_sim' in key)][0]
ori_sim_path = self.__preprocessed_paras[ori_sim_key]
# 读取实测值,获取多边形区域内所有的点,分为训练集数据和测试集数据
train_data, test_data, type_map = csvh.trans_VegePhenology_measdata_dic(csvh.readcsv(self.__processing_paras['MeasuredData']), ori_sim_path)
logger.info("read phenology Measure.csv success!")
# 特征分解
hh_hv_vh_vv_list = ["","","",""]
for key in key_list:
if "_HH" in key :
hh_hv_vh_vv_list[0] = self.__preprocessed_paras[key]
if "_HV" in key:
hh_hv_vh_vv_list[1] = self.__preprocessed_paras[key]
if "_VH" in key:
hh_hv_vh_vv_list[2] = self.__preprocessed_paras[key]
if "_VV" in key:
hh_hv_vh_vv_list[3] = self.__preprocessed_paras[key]
cols = self.imageHandler.get_img_width(hh_hv_vh_vv_list[0])
rows = self.imageHandler.get_img_height(hh_hv_vh_vv_list[0])
feature_dir = CreateFeature.decompose_single_tar(hh_hv_vh_vv_list, self.__workspace_processing_path, self.__workspace_preprocessing_path, name,self._env_str, rows, cols, FILTER_SIZE=3, debug=DEBUG)
# 获取训练集提取特征的信息
ids = []
class_ids = []
ch_names = []
positions = []
for data in train_data:
if data[0] == name:
class_ids.append(data[1])
positions.append(data[2])
ch_names.append(data[3])
class_id = [map for map in type_map if (data[1] == map[1])][0]
ids.append(class_id[0])
train_data_dic = {}
train_data_dic.update({"ids": ids})
train_data_dic.update({"class_ids": class_ids})
train_data_dic.update({"ch_names": ch_names})
train_data_dic.update({"positions": positions})
name_test_data =[]
for dt in test_data:
if dt[0] == name:
name_test_data.append(dt)
return feature_dir, train_data_dic, name_test_data, type_map
def process_handle(self, start):
"""
算法主处理函数
:return: True or False
"""
# 生成每个时相的特征, 并提取训练集和测试集
# 每个时相的影像生成特征图
X_train, Y_train = None, None
flag = True
total_name_list = []
test_data = []
X_test_dic = {}
for name in self.__processing_paras['name_list']:
feature_dir, train_data_dic, test_data_part, type_map = self.create_feature_single_tar(name)
#生成训练集
X_train_part, Y_train_part = ml.gene_train_set(train_data_dic, feature_dir)
name_list = ml.get_name_list(feature_dir)
# 生成测试集合
rows, cols = self.get_name_rows_cols(name)
name_featuresPath_dic_part = ml.vegetationPhenology_combine_feature(feature_dir, self.__workspace_processing_path, name, rows, cols, DEBUG)
X_test_dic_part = self.gene_test_set(test_data_part, name_featuresPath_dic_part, name)
X_test_dic.update(X_test_dic_part)
if flag:
X_train = X_train_part
Y_train = Y_train_part
total_name_list = name_list
flag = False
test_data = test_data_part
else:
X_train = np.vstack((X_train, X_train_part))
Y_train = np.hstack((Y_train, Y_train_part))
total_name_list = total_name_list + name_list
test_data = test_data + test_data_part
logger.info("create_features success!")
logger.info('progress bar: 20%')
# if DEBUG:
# optimal_feature = [1, 2, 3]
# optimal_X_train = X_train[:, optimal_feature]
# optimal_Y_train = Y_train
# optimal_X_train = optimal_X_train[0:100, :]
# optimal_Y_train = optimal_Y_train[0:100]
# optimal_Y_train[0:100] = 1
# optimal_Y_train[50:100] = 2
# else:
optimal_X_train, optimal_Y_train, optimal_feature = ml.sel_optimal_feature(X_train, Y_train, total_name_list, correlation_threshold=0.7)
logger.info("generate train and test set success!")
logger.info('progress bar: 30%')
#RF
clf = ml.trainRF(optimal_X_train, optimal_Y_train)
logger.info('svm train success!')
logger.info('progress bar: 80%')
# 测试数据
logger.info('mode testing')
product_path = self.predict(clf, X_test_dic, optimal_feature, type_map, start)
logger.info('mode test success!')
self.create_meta_file(product_path)
# 文件夹打包
file.make_targz(self.__out_para, self.__product_dic)
logger.info('progress bar: 100%')
def predict(self, mode, X_test_dic, validity_list, type_map, start):
# 测试数据
clf = mode
for img_name, value in X_test_dic.items():
X_test = value[0]
row_list = value[1]
col_list = value[2]
# a = X_test[:, validity_list]
# Y_test = clf.predict(a)
Y_test = clf.predict(X_test[:, validity_list])
# for type in type_map:
# train_type = type[0]
# type_id = type[1]
# Y_test[np.where(Y_test == train_type)] = type_id
# rows, cols = self.get_name_rows_cols(img_name)
# array = np.zeros((rows, cols), dtype='float16')
# array[row_list, col_list] = Y_test
# array[np.where(array == 0)] = np.nan
# product_path = self.__workspace_processing_path + img_name + '_VegetationPhenologyProduct.tif'
# product_geo_path = self.__product_dic + img_name + '_VegetationPhenologyProduct.tif'
# int_16_array = np.array(array, dtype=np.int16)
rows, cols = self.get_name_rows_cols(img_name)
array = np.zeros((rows, cols), dtype='float16')
array[row_list, col_list] = Y_test
array[np.where(array == 0)] = np.nan
temp_path = self.__workspace_processing_path + img_name + '_temp.tif'
self.imageHandler.write_img(temp_path, '', [0, 0, 0, 0, 0, 0], array)
proj, geo, cover_data = self.imageHandler.read_img(temp_path)
cover_data = np.uint8(cover_data)
kernel = np.ones((3, 3), np.uint8)
cover_data = cv2.erode(cv2.dilate(cover_data, kernel), kernel)
for type in type_map:
train_type = type[0]
type_id = type[1]
cover_data[np.where(cover_data == train_type)] = type_id
product_path = self.__workspace_processing_path + img_name + '_VegetationPhenologyProduct.tif'
product_geo_path = self.__product_dic + img_name + '_VegetationPhenologyProduct.tif'
# int_16_array = np.array(array, dtype=np.int16)
ImageHandler.write_img(product_path, '', [0, 0, 0, 0, 0, 0], cover_data)
tr = self._name_tr_dic[img_name]
# l1a图像坐标转换地理坐标
tr.l1a_2_geo_int(self.__preprocessed_paras[img_name + '_ori_sim'], product_path, product_geo_path, 'nearest')
# 生成快视图
self.imageHandler.write_quick_view(product_geo_path, color_img=True)
return product_geo_path
# def test_resamp():
# # 形态学闭运算去roi区域噪点
# # cover_data = np.uint8(cover_data)
# # kernel = np.ones((5, 5), np.uint8)
# # cover_data = cv2.erode(cv2.dilate(cover_data, kernel), kernel)
# pass
def create_meta_file(self, product_path):
# 生成元文件案例
xml_path = "./model_meta.xml"
tem_folder = self.__workspace_path + EXE_NAME + r"\Temporary""\\"
image_path = product_path
out_path1 = os.path.join(tem_folder, "trans_geo_projcs.tif")
out_path2 = os.path.join(tem_folder, "trans_projcs_geo.tif")
par_dict = CreateDict(image_path, self.processinfo, out_path1, out_path2).calu_nature(start)
model_xml_path = os.path.join(tem_folder, "creat_standard.meta.xml") # 输出xml路径
CreateStadardXmlFile(xml_path, self.alg_xml_path, par_dict, model_xml_path).create_standard_xml()
SrcImagePath = self.__input_paras["AHVS"]['ParaValue']
paths = SrcImagePath.split(';')
SrcImageName = os.path.split(paths[0])[1].split('.tar.gz')[0]
if len(paths) >= 2:
for i in range(1, len(paths)):
SrcImageName = SrcImageName + ";" + os.path.split(paths[i])[1].split('.tar.gz')[0]
meta_xml_path = os.path.join(self.__product_dic, EXE_NAME + "Product.meta.xml")
self.type_id_name = csvh.vegePhenology_class_list(self.__processing_paras['MeasuredData'])
CreateMetafile(self.__processing_paras['META'], self.alg_xml_path, model_xml_path, meta_xml_path).process2(
self.type_id_name, None, SrcImageName)
if __name__ == '__main__':
multiprocessing.freeze_support()
start = datetime.datetime.now()
try:
if len(sys.argv) < 2:
xml_path = 'VegetationPhenology.xml'
else:
xml_path = sys.argv[1]
main_handler = PhenologyMain(xml_path)
if main_handler.check_source() is False:
raise Exception('check_source() failed!')
if main_handler.preprocess_handle() is False:
raise Exception('preprocess_handle() failed!')
if main_handler.process_handle(start) is False:
raise Exception('process_handle() failed!')
logger.info('successful production of ' + EXE_NAME + ' products!')
except Exception:
logger.exception("run-time error!")
finally:
main_handler.del_temp_workspace()
end = datetime.datetime.now()
msg = 'running use time: %s ' % (end - start)
logger.info(msg)