microproduct-l-sar/vegetationPhenology-L-SAR/VegetationPhenologyMain.py

631 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: UTF-8 -*-
"""
@Project microproduct
@File PhenologyMain.py
@Author SHJ
@Contact土壤盐碱度算法主函数
@Date 2021/9/6
@Version 1.0.0
"""
import csv
import glob
import logging
import os
import datetime
import shutil
import pyproj._compat
import cv2
import numpy as np
from osgeo import gdal, osr, ogr
from tool.algorithm.algtools.PreProcess import PreProcess as pp
from tool.algorithm.image.ImageHandle import ImageHandler
from tool.algorithm.xml.AlgXmlHandle import ManageAlgXML, CheckSource, InitPara
from tool.algorithm.algtools.logHandler import LogHandler
from tool.algorithm.xml.AnalysisXml import xml_extend
from tool.algorithm.algtools.ROIAlg import ROIAlg as alg
from VegetationPhenologyAuxData import PhenoloyMeasCsv_geo
from tool.algorithm.polsarpro.AHVToPolsarpro import AHVToPolsarpro
from tool.algorithm.xml.CreateMetaDict import CreateMetaDict, CreateProductXml
from tool.file.fileHandle import fileHandle
from tool.algorithm.algtools.ROIAlg import ROIAlg as roi
import sys
import multiprocessing
from tool.algorithm.transforml1a.transHandle import TransImgL1A
from tool.csv.csvHandle import csvHandle
from tool.algorithm.polsarpro.createfeature import CreateFeature
import multiprocessing
from tool.algorithm.ml.machineLearning import MachineLeaning as ml
from tool.config.ConfigeHandle import Config as cf
csvh = csvHandle()
if cf.get('debug') == 'True':
DEBUG = True
else:
DEBUG = False
EXE_NAME = cf.get('exe_name')
LogHandler.init_log_handler('run_log\\' + EXE_NAME)
logger = logging.getLogger("mylog")
tar = r'-' + cf.get('tar')
productLevel = cf.get('productLevel')
FILTER_SIZE = int(cf.get('filter_size'))
MAX_TRAN_NUM = int(cf.get('max_tran__num_per_class'))
multiprocessing_num = int(cf.get('multiprocessing_num'))
file =fileHandle(DEBUG)
# env_str = os.path.split(os.path.realpath(__file__))[0]
env_str =os.path.dirname(os.path.abspath(sys.argv[0]))
os.environ['PROJ_LIB'] = env_str
class PhenologyMain:
"""
土壤粗糙度处理主函数
"""
def __init__(self, alg_xml_path):
self.alg_xml_path = alg_xml_path
self.imageHandler = ImageHandler()
self.__alg_xml_handler = ManageAlgXML(alg_xml_path)
self.__check_handler = CheckSource(self.__alg_xml_handler)
self.__workspace_path, self.__out_para = None, None
self.__input_paras, self.__processing_paras, self.__preprocessed_paras = {}, {}, {}
self.processinfo = [1, 1, 1, 1]
self._name_tr_dic = {} #l1a转换geo的对象
self.__FeatureMap = {}
# 特征索引对应的文件名
self.___FeatureFileNameMap = {}
# 初始化特征参数
self.__FeatureParaInit()
def check_source(self):
"""
检查算法相关的配置文件,图像,辅助文件是否齐全
"""
self._env_str = os.getcwd()
logger.info("sysdir: %s", self._env_str)
self.__check_handler.check_alg_xml()
self.__check_handler.check_run_env()
# 检查景影像是否为全极化
self.__input_paras = self.__alg_xml_handler.get_input_paras()
checkFlag, self.__parameters_dic = self.__check_handler.check_input_paras(self.__input_paras)
if checkFlag is False:
return False
# if self.__check_handler.check_input_paras(self.__input_paras) is False:
# return False
self.__workspace_path = self.__alg_xml_handler.get_workspace_path()
self.__create_work_space()
self.__processing_paras = InitPara.init_processing_paras(self.__input_paras, self.__workspace_preprocessed_path)
self.__processing_paras.update(InitPara(DEBUG).get_mult_tar_gz_infs(self.__processing_paras, self.__workspace_preprocessing_path))
SrcImagePath = self.__input_paras["AHVS"]['ParaValue']
paths = SrcImagePath.split(';')
SrcImageName = os.path.split(paths[0])[1].split('.tar.gz')[0]
result_name = SrcImageName + tar + ".tar.gz"
self.__out_para = os.path.join(self.__workspace_path, EXE_NAME, 'Output', result_name)
self.__alg_xml_handler.write_out_para("VegetationPhenologyProduct", self.__out_para) #写入输出参数
logger.info('check_source success!')
logger.info('progress bar: 10%')
return True
def __create_work_space(self):
"""
删除原有工作区文件夹,创建新工作区文件夹
"""
self.__workspace_preprocessing_path = self.__workspace_path + EXE_NAME + '\\Temporary\\preprocessing\\'
self.__workspace_preprocessed_path = self.__workspace_path + EXE_NAME + '\\Temporary\\preprocessed\\'
self.__workspace_processing_path = self.__workspace_path + EXE_NAME + '\\Temporary\\processing\\'
self.__product_dic = self.__workspace_processing_path + 'product\\'
path_list = [self.__workspace_preprocessing_path, self.__workspace_preprocessed_path,
self.__workspace_processing_path, self.__product_dic]
file.creat_dirs(path_list)
logger.info('create new workspace success!')
def del_temp_workspace(self):
"""
临时工作区
"""
if DEBUG is True:
return
path = self.__workspace_path + EXE_NAME + r'\Temporary'
if os.path.exists(path):
file.del_folder(path)
def preprocess_single_tar(self,name,scopes_roi):
key_list = [key for key in self.__processing_paras.keys() if((name in key) and ('inc_angle' not in key) and ('LocalIncidenceAngle' not in key)and ('pola' not in key)
and ('paraMeter' not in key) and ('ori_sim' not in key)and ('Origin_META' not in key)and ('META' not in key))]
ori_sim_key = [key for key in key_list if ('sim_ori' in key)][0]
ori_sim_path = self.__processing_paras[ori_sim_key]
key_list.remove(ori_sim_key)
hh_key = [key for key in key_list if ('HH' in key)][0]
l1a_width = ImageHandler.get_img_width(self.__processing_paras[hh_key])
l1a_height = ImageHandler.get_img_height(self.__processing_paras[hh_key])
tr = TransImgL1A(ori_sim_path, scopes_roi, l1a_height, l1a_width)
for k in key_list:
out_path = os.path.join(self.__workspace_preprocessed_path, k + "_preprocessed.tif")
tr.cut_L1A(self.__processing_paras[k], out_path)
self.__preprocessed_paras.update({k: out_path})
self._name_tr_dic.update({name: tr})
def preprocess_handle(self):
"""
预处理
"""
# scopes_roi = pp().cal_scopes_roi(self.__processing_paras)
p = pp()
for name in self.__processing_paras['name_list']:
para_names_geo = [name + '_sim_ori', 'Covering']
p.check_img_projection(self.__workspace_preprocessing_path, para_names_geo, self.__processing_paras)
# 计算roi
scopes = ()
scopes += (xml_extend(self.__processing_paras[name + '_META']).get_extend(),)
scopes += p.box2scope(self.__processing_paras['box'])
# 计算图像的轮廓,并求相交区域
intersect_shp_path = self.__workspace_preprocessing_path + 'IntersectPolygon.shp'
scopes_roi = p.cal_intersect_shp(intersect_shp_path, para_names_geo, self.__processing_paras, scopes)
cutted_img_paths = p.cut_imgs_VP(self.__workspace_preprocessing_path, para_names_geo, self.__processing_paras,
intersect_shp_path, name)
self.__preprocessed_paras.update({name + '_sim_ori': cutted_img_paths.get(name + '_sim_ori')})
self.__preprocessed_paras.update({name + '_Covering': cutted_img_paths.get('Covering')})
self.preprocess_single_tar(name, scopes_roi)
# self.__feature_name_list = para_names_geo
# cutted_img_paths, scopes_roi = p.cut_geoimg(self.__workspace_preprocessing_path, para_names_geo,
# self.__processing_paras)
# self.__preprocessed_paras.update({name + '_sim_ori': cutted_img_paths.get(name + '_sim_ori')})
# self.__preprocessed_paras.update({name + '_Covering': cutted_img_paths.get('Covering')})
logger.info('preprocess_handle success!')
logger.info('progress bar: 10%')
def gene_test_set(self, test_data, features_path_dic, name):
# 生成测试集
X_test_dic = {} # 同时测试多张影像
for tif_name in features_path_dic:
feature_array = self.imageHandler.get_data(features_path_dic[tif_name])
dim = feature_array.shape[0]
row_list = []
col_list = []
x_test = np.empty(shape=(0, dim))
for data in test_data:
if data[0] == tif_name:
row, col = zip(*data[2])
row_list = row_list + list(row)
col_list = col_list + list(col)
x_test = np.vstack((x_test, feature_array[:, row, col].T))
if row_list != []:
X_test_dic.update({tif_name: [x_test, row_list, col_list]})
return X_test_dic
def get_name_rows_cols(self, name):
key_list = [key for key in self.__preprocessed_paras.keys() if
((name in key) and ('inc_angle' not in key) and ('LocalIncidenceAngle' not in key))]
for key in key_list:
if "_HH" in key:
cols = self.imageHandler.get_img_width(self.__preprocessed_paras[key])
rows = self.imageHandler.get_img_height(self.__preprocessed_paras[key])
return rows, cols
def __getInputFeatures(self):
"""
获取输入特征组合
1.Freeman表面散射p_s、偶次散射p_d、体散射p_v 对应序号:[0,1,2]
2.Touzi散射角α_s、散射相位ϕ_α、目标散射对称度τ、相对能量λ_i 对应序号:[3,4,5,6]
3.Yamaguchi表面散射f_s、二次散射f_d、体散射f_v、螺旋体散射f_h 对应序号:[7,8,9,10]
4.Cloude - Pottier分解散射熵H、反熵A、平均散射角α 对应序号:[11,12,13]
"""
decomposeList = self.__parameters_dic["FeatureCombination"].split(',')
stepSet = set()
featureInput = set()
for item in decomposeList:
stepSet.add(int(item))
for i in stepSet:
if 0 <= i < 3:
featureInput.add('Freeman')
elif 3 <= i < 7:
featureInput.add('Yamaguchi')
elif 7 <= i < 10:
featureInput.add('Cloude')
else:
logger.warning("__decomposeSteps Invalid data:%s!", i)
return featureInput
def __FeatureParaInit(self):
self.__FeatureMap["Freeman"] = [0, 1, 2]
self.__FeatureMap["Yamaguchi"] = [3, 4, 5, 6]
self.__FeatureMap["Cloude"] = [7, 8, 9]
self.___FeatureFileNameMap[0] = ['Freeman', "Freeman_Odd.bin"]
self.___FeatureFileNameMap[1] = ['Freeman', "Freeman_Dbl.bin"]
self.___FeatureFileNameMap[2] = ['Freeman', "Freeman_Vol.bin"]
self.___FeatureFileNameMap[3] = ['Yamaguchi', "Yamaguchi4_Odd.bin"]
self.___FeatureFileNameMap[4] = ['Yamaguchi', "Yamaguchi4_Dbl.bin"]
self.___FeatureFileNameMap[5] = ['Yamaguchi', "Yamaguchi4_Vol.bin"]
self.___FeatureFileNameMap[6] = ['Yamaguchi', "Yamaguchi4_Hlx.bin"]
self.___FeatureFileNameMap[7] = ['Cloude', "anisotropy.bin"]
self.___FeatureFileNameMap[8] = ['Cloude', "entropy.bin"]
self.___FeatureFileNameMap[9] = ['Cloude', "alpha.bin"]
def calInterpolation_bil_Wgs84_rc_sar_sigma(self, parameter_path, dem_rc, in_sar, out_sar):
'''
# std::cout << "mode 11";
# std::cout << "SIMOrthoProgram-L-SAR.exe 11 in_parameter_path in_rc_wgs84_path in_ori_sar_path out_orth_sar_path";
'''
exe_path = r".\baseTool\x64\Release\SIMOrthoProgram-L-SAR.exe"
exe_cmd = r"set PROJ_LIB=.\baseTool\x64\Release; & {0} {1} {2} {3} {4} {5}".format(exe_path, 11, parameter_path,
dem_rc, in_sar, out_sar)
# print(exe_cmd)
os.system(exe_cmd)
# print(os.system(exe_cmd))
# print("==========================================================================")
def create_feature_single_tar(self, name):
key_list = [key for key in self.__preprocessed_paras.keys() if((name in key) and ('inc_angle' not in key) and ('LocalIncidenceAngle' not in key))]
# ori_sim_key = [key for key in key_list if ('ori_sim' in key)][0]
sim_ori_key = [key for key in key_list if ('sim_ori' in key)][0]
# ori_sim_path = self.__preprocessed_paras[ori_sim_key]
sim_ori_path = self.__preprocessed_paras[sim_ori_key]
hh_path = self.__preprocessed_paras[name + "_HH"]
hh_geo_path = os.path.join(self.__workspace_processing_path, name.split('-')[0] + '_geo.tif')
paramter = self.__processing_paras[name + "paraMeter"]
self.calInterpolation_bil_Wgs84_rc_sar_sigma(paramter, sim_ori_path, hh_path, hh_geo_path)
# 读取实测值,获取多边形区域内所有的点,分为训练集数据和测试集数据
pm = PhenoloyMeasCsv_geo(self.__processing_paras['MeasuredData'], hh_geo_path, MAX_TRAN_NUM)
train_data_list = pm.api_read_measure_by_name(name)
train_data_dic = csvh.trans_landCover_list2dic(train_data_list)
# train_data, test_data, type_map = csvh.trans_VegePhenology_measdata_dic(csvh.readcsv(self.__processing_paras['MeasuredData']), ori_sim_path)
logger.info("read phenology Measure.csv success!")
# 添加四极化后向散射系数到特征图中
feature_tif_dir = os.path.join(self.__workspace_processing_path, name.split('_')[7], 'features')
if not os.path.exists(feature_tif_dir):
os.makedirs(feature_tif_dir)
origin_xml = self.__processing_paras[name + "_Origin_META"]
# 特征分解
hh_hv_vh_vv_list = ["","","",""]
for key in key_list:
if "_HH" in key:
hh_hv_vh_vv_list[0] = self.__preprocessed_paras[key]
outPath = os.path.join(feature_tif_dir, 'cal_HH.tif')
AHVToPolsarpro.sar_backscattering_sigma(hh_hv_vh_vv_list[0], origin_xml, outPath)
if "_HV" in key:
hh_hv_vh_vv_list[1] = self.__preprocessed_paras[key]
outPath = os.path.join(feature_tif_dir, 'cal_HV.tif')
AHVToPolsarpro.sar_backscattering_sigma(hh_hv_vh_vv_list[1], origin_xml, outPath)
if "_VH" in key:
hh_hv_vh_vv_list[2] = self.__preprocessed_paras[key]
outPath = os.path.join(feature_tif_dir, 'cal_VH.tif')
AHVToPolsarpro.sar_backscattering_sigma(hh_hv_vh_vv_list[2], origin_xml, outPath)
if "_VV" in key:
hh_hv_vh_vv_list[3] = self.__preprocessed_paras[key]
outPath = os.path.join(feature_tif_dir, 'cal_VV.tif')
AHVToPolsarpro.sar_backscattering_sigma(hh_hv_vh_vv_list[3], origin_xml, outPath)
cols = self.imageHandler.get_img_width(hh_hv_vh_vv_list[0])
rows = self.imageHandler.get_img_height(hh_hv_vh_vv_list[0])
featureInput = self.__getInputFeatures()
feature_dir = CreateFeature.decompose_single_tar(hh_hv_vh_vv_list, self.__workspace_processing_path, self.__workspace_preprocessing_path, name, self._env_str, rows, cols, FILTER_SIZE=3, debug=DEBUG, FeatureInput=featureInput)
logger.info("features slc to wgs84 start!")
feature_geo_dir = self.features_geo(feature_dir, paramter, sim_ori_path, name)
logger.info("features to wgs84 end!")
# # 获取训练集提取特征的信息
# ids = []
# class_ids = []
# ch_names = []
# positions = []
# for data in train_data:
# if data[0] == name:
# class_ids.append(data[1])
# positions.append(data[2])
# ch_names.append(data[3])
# class_id = [map for map in type_map if (data[1] == map[1])][0]
# ids.append(class_id[0])
# train_data_dic = {}
# train_data_dic.update({"ids": ids})
# train_data_dic.update({"class_ids": class_ids})
# train_data_dic.update({"ch_names": ch_names})
# train_data_dic.update({"positions": positions})
# name_test_data =[]
# for dt in test_data:
# if dt[0] == name:
# name_test_data.append(dt)
logger.info("create_features success!")
logger.info('progress bar: 20%')
# return feature_dir, train_data_dic, name_test_data, type_map
return feature_geo_dir, train_data_dic
def features_geo(self, features_path, paraMeter, sim_ori, sar_name):
dir = os.path.join(self.__workspace_processing_path, sar_name.split('_')[7], 'features_geo')
if not os.path.exists(dir):
os.mkdir(dir)
in_tif_paths = list(glob.glob(os.path.join(features_path, '*.tif')))
processes_num = min([len(in_tif_paths), multiprocessing_num, multiprocessing.cpu_count() - 1])
pool = multiprocessing.Pool(processes=processes_num)
pl = []
for file in in_tif_paths:
name = os.path.basename(file).split('.')[0] + '_geo.tif'
out_path = os.path.join(dir, name)
pl.append(pool.apply_async(self.calInterpolation_bil_Wgs84_rc_sar_sigma, (paraMeter, sim_ori, file, out_path))) # topp
pool.close()
pool.join()
return dir
def process_handle(self, start):
"""
算法主处理函数
:return: True or False
"""
# 生成每个时相的特征, 并提取训练集和测试集
# 每个时相的影像生成特征图
X_train = []
Y_train = []
flag = True
total_name_list = []
X_test_dic = {}
for name in self.__processing_paras['name_list']:
feature_dir, train_data_dic = self.create_feature_single_tar(name)
# 生成训练集
# X_train_part, Y_train_part, optimal_feature = ml.gene_optimal_train_set(train_data_dic, feature_dir, 0.08, 0.7)
X_train_part, Y_train_part = ml.gene_train_set(train_data_dic, feature_dir)
name_list = ml.get_name_list(feature_dir)
# 生成测试集合
# X_test_path_list = ml.gene_test_set(feature_dir, optimal_feature)
# X_test_dic.update({name: X_test_path_list})
X_test_dic.update({name + '_features': feature_dir})
if X_train == []:
X_train = X_train_part
Y_train = Y_train_part
total_name_list = name_list
else:
X_train = np.vstack((X_train, X_train_part))
Y_train = np.hstack((Y_train, Y_train_part))
total_name_list = total_name_list + name_list
logger.info("generate train and test set success!")
logger.info('progress bar: 30%')
optimal_X_train, optimal_Y_train, optimal_feature = ml.sel_optimal_feature(X_train, Y_train, total_name_list, 0.07, 0.85)
# RF
clf = ml.trainRF(optimal_X_train, optimal_Y_train)
logger.info('RF train success!')
# 测试数据
logger.info('mode testing')
for name in self.__processing_paras['name_list']:
feature_dir = X_test_dic.get(name + '_features')
X_test_path_list = ml.gene_test_set(feature_dir, optimal_feature)
in_tif_paths = list(glob.glob(os.path.join(feature_dir, '*.tif')))
rows = ImageHandler.get_img_height(in_tif_paths[0])
cols = ImageHandler.get_img_width(in_tif_paths[0])
proj_geo, geo_geo, cover_data_geo = self.imageHandler.read_img(in_tif_paths[0])
product_path = ml.predict_VP(clf, X_test_path_list, name, self.__workspace_processing_path, rows, cols)
# product_path = ml.predict_VP(clf, X_test_dic.get(name), name, self.__workspace_processing_path, rows, cols)
proj, geo, cover_data = self.imageHandler.read_img(product_path)
# 形态学闭运算去roi区域噪点
cover_data = np.uint8(cover_data)
kernel = np.ones((5, 5), np.uint8)
cover_data = cv2.erode(cv2.dilate(cover_data, kernel), kernel)
cover_data = np.int16(cover_data)
train_dic = self.get_train_dic(self.__processing_paras['MeasuredData'])
for id, class_id in zip(train_dic['ids'], train_dic['class_ids']):
cover_data[np.where(cover_data == id)] = class_id
roi_img = self.imageHandler.get_band_array(self.create_roi(in_tif_paths[0]))
# 获取影像roi区域
cover_data_pro = cover_data * roi_img
cover_geo_path = os.path.join(self.__workspace_processing_path, os.path.basename(product_path).split('.tif')[0] + '-VPtemp.tif')
self.imageHandler.write_img(cover_geo_path, proj_geo, geo_geo, cover_data_pro)
self.resampleImgs(name, cover_geo_path)
para_names = ['Covering']
mask_dir = os.path.join(self.__workspace_processing_path, name + '\\')
bare_land_mask_path = roi().roi_process_VP(para_names, mask_dir,
self.__processing_paras, self.__preprocessed_paras, name)
product_path_pro = os.path.join(self.__product_dic, os.path.basename(product_path).split('.tif')[0] + tar + '.tif')
# 获取影像roi区域
roi.cal_roi(product_path_pro, cover_geo_path, bare_land_mask_path, background_value=0)
self.imageHandler.write_quick_view(product_path_pro, color_img=True)
meta_xml_path = self.create_meta_file(product_path_pro)
temp_folder = os.path.join(self.__workspace_path, EXE_NAME, 'Output')
out_xml = os.path.join(temp_folder, os.path.basename(meta_xml_path))
if os.path.exists(temp_folder) is False:
os.mkdir(temp_folder)
# CreateProductXml(para_dict, model_path, out_xml).create_standard_xml()
shutil.copy(meta_xml_path, out_xml)
logger.info('mode test success!')
logger.info('progress bar: 80%')
# 文件夹打包
file.make_targz(self.__out_para, self.__product_dic)
logger.info('progress bar: 100%')
def get_train_dic(self, csv_path):
reader = csv.reader(open(csv_path, newline=''))
ids = []
class_ids = []
csv_list = []
for line_data in reader:
csv_list.append(line_data)
for data in csv_list[1:]:
ids.append(data[0])
class_ids.append(data[1])
train_data_dic = {}
train_data_dic.update({"ids": ids})
train_data_dic.update({"class_ids": class_ids})
return train_data_dic
def predict(self, mode, X_test_dic, validity_list, type_map, start):
# 测试数据
clf = mode
for img_name, value in X_test_dic.items():
X_test = value[0]
row_list = value[1]
col_list = value[2]
# a = X_test[:, validity_list]
# Y_test = clf.predict(a)
Y_test = clf.predict(X_test[:, validity_list])
# for type in type_map:
# train_type = type[0]
# type_id = type[1]
# Y_test[np.where(Y_test == train_type)] = type_id
# rows, cols = self.get_name_rows_cols(img_name)
# array = np.zeros((rows, cols), dtype='float16')
# array[row_list, col_list] = Y_test
# array[np.where(array == 0)] = np.nan
# product_path = self.__workspace_processing_path + img_name + '_VegetationPhenologyProduct.tif'
# product_geo_path = self.__product_dic + img_name + '_VegetationPhenologyProduct.tif'
# int_16_array = np.array(array, dtype=np.int16)
rows, cols = self.get_name_rows_cols(img_name)
array = np.zeros((rows, cols), dtype='float16')
array[row_list, col_list] = Y_test
array[np.where(array == 0)] = np.nan
temp_path = self.__workspace_processing_path + img_name + '_temp.tif'
self.imageHandler.write_img(temp_path, '', [0, 0, 0, 0, 0, 0], array)
proj, geo, cover_data = self.imageHandler.read_img(temp_path)
cover_data = np.uint8(cover_data)
kernel = np.ones((3, 3), np.uint8)
cover_data = cv2.erode(cv2.dilate(cover_data, kernel), kernel)
for type in type_map:
train_type = type[0]
type_id = type[1]
cover_data[np.where(cover_data == train_type)] = type_id
product_path = self.__workspace_processing_path + img_name + '_VegetationPhenologyProduct.tif'
product_geo_path = self.__product_dic + img_name + '_VegetationPhenologyProduct.tif'
# int_16_array = np.array(array, dtype=np.int16)
ImageHandler.write_img(product_path, '', [0, 0, 0, 0, 0, 0], cover_data)
tr = self._name_tr_dic[img_name]
# l1a图像坐标转换地理坐标
tr.l1a_2_geo_int(self.__preprocessed_paras[img_name + '_ori_sim'], product_path, product_geo_path, 'nearest')
# 生成快视图
self.imageHandler.write_quick_view(product_geo_path, color_img=True)
return product_geo_path
def resampleImgs(self, name, refer_img_path):
cover_rampling_path = os.path.join(self.__workspace_processing_path, name + "_cover.tif")
pp.resampling_by_scale(self.__preprocessed_paras[name + "_Covering"], cover_rampling_path, refer_img_path)
self.__preprocessed_paras[name + "_Covering"] = cover_rampling_path
def create_roi(self, img_path):
"""
计算ROI掩膜
:return:掩膜路径
"""
processing_path = self.__workspace_processing_path
# 利用影像的有效范围生成MASK
tif_mask_path = os.path.join(processing_path, "tif_mask.tif") # processing_path + "tif_mask.tif"
alg.trans_tif2mask(tif_mask_path, img_path, np.nan)
logger.info('create ROI image success!')
return tif_mask_path
# def test_resamp():
# # 形态学闭运算去roi区域噪点
# # cover_data = np.uint8(cover_data)
# # kernel = np.ones((5, 5), np.uint8)
# # cover_data = cv2.erode(cv2.dilate(cover_data, kernel), kernel)
# pass
def create_meta_file(self, product_path):
# 生成元文件案例
# xml_path = "./model_meta.xml"
tem_folder = self.__workspace_path + EXE_NAME + r"\Temporary""\\"
image_path = product_path
out_path1 = os.path.join(tem_folder, "trans_geo_projcs.tif")
out_path2 = os.path.join(tem_folder, "trans_projcs_geo.tif")
# par_dict = CreateDict(image_path, self.processinfo, out_path1, out_path2).calu_nature(start)
# model_xml_path = os.path.join(tem_folder, "creat_standard.meta.xml") # 输出xml路径
# CreateStadardXmlFile(xml_path, self.alg_xml_path, par_dict, model_xml_path).create_standard_xml()
#
# SrcImagePath = self.__input_paras["AHVS"]['ParaValue']
# paths = SrcImagePath.split(';')
# SrcImageName = os.path.split(paths[0])[1].split('.tar.gz')[0]
# if len(paths) >= 2:
# for i in range(1, len(paths)):
# SrcImageName = SrcImageName + ";" + os.path.split(paths[i])[1].split('.tar.gz')[0]
# meta_xml_path = os.path.join(self.__product_dic, EXE_NAME + "Product.meta.xml")
# self.type_id_name = csvh.vegePhenology_class_list(self.__processing_paras['MeasuredData'])
#
# CreateMetafile(self.__processing_paras['META'], self.alg_xml_path, model_xml_path, meta_xml_path).process2(
# self.type_id_name, None, SrcImageName)
SrcImageName = os.path.basename(product_path).split('.tif')[0]
model_path = "./product.xml"
meta_xml_path = os.path.join(self.__product_dic, SrcImageName + ".meta.xml")
key = os.path.basename(product_path).split('-VP.tif')[0] + '_Origin_META'
para_dict = CreateMetaDict(image_path, self.__processing_paras[key], self.__workspace_processing_path,
out_path1, out_path2).calu_nature()
para_dict.update({"imageinfo_ProductName": "植被物候"})
para_dict.update({"imageinfo_ProductIdentifier": "VegetationPhenology"})
para_dict.update({"imageinfo_ProductLevel": productLevel})
para_dict.update({"ProductProductionInfo_BandSelection": "1,2"})
para_dict.update({"ProductProductionInfo_AuxiliaryDataDescription": "Label"})
CreateProductXml(para_dict, model_path, meta_xml_path).create_standard_xml()
return meta_xml_path
if __name__ == '__main__':
multiprocessing.freeze_support()
start = datetime.datetime.now()
try:
if len(sys.argv) < 2:
xml_path = 'VegetationPhenology.xml'
else:
xml_path = sys.argv[1]
main_handler = PhenologyMain(xml_path)
if main_handler.check_source() is False:
raise Exception('check_source() failed!')
if main_handler.preprocess_handle() is False:
raise Exception('preprocess_handle() failed!')
if main_handler.process_handle(start) is False:
raise Exception('process_handle() failed!')
logger.info('successful production of ' + EXE_NAME + ' products!')
except Exception:
logger.exception("run-time error!")
finally:
main_handler.del_temp_workspace()
end = datetime.datetime.now()
msg = 'running use time: %s ' % (end - start)
logger.info(msg)