# -*- coding: UTF-8 -*- """ @Project:microproduct @File:ROIAlg.py @Function: @Contact: @Author:SHJ @Date:2021/11/17 @Version:1.0.0 """ import logging from tool.algorithm.image.ImageHandle import ImageHandler from tool.algorithm.algtools.PreProcess import PreProcess as pp import numpy as np logger = logging.getLogger("mylog") class ROIAlg: def __init__(self,): pass @staticmethod def roi_process(names, processing_path, processing_paras, preprocessed_paras): roi_paths = [] roi = ROIAlg() for name in names: if 'LocalIncidenceAngle' in name: # 利用角度为nan生成Mask pp.check_LocalIncidenceAngle(preprocessed_paras[name],preprocessed_paras[name]) angle_nan_mask_path = processing_path + 'angle_nan_mask.tif' roi.trans_tif2mask(angle_nan_mask_path, preprocessed_paras[name], np.nan) roi_paths.append(angle_nan_mask_path) elif ("HH" in name) or ("HV" in name) or ("VH" in name) or ("VV" in name): # 利用影像的有效范围生成MASK tif_mask_path = processing_path + name + "_tif_mask.tif" roi.trans_tif2mask(tif_mask_path, preprocessed_paras[name], np.nan) roi_paths.append(tif_mask_path) elif name == 'Covering': # 利用cover计算植被覆盖范围 cover_mask_path = processing_path + "cover_mask.tif" if processing_paras['CoveringIDs'] == 'empty': cover_data = ImageHandler.get_data(preprocessed_paras[name]) cover_data[np.where(np.isnan(cover_data))] = 0 cover_id_list = list(np.unique(cover_data)) else: cover_id_list = list(processing_paras['CoveringIDs'].split(';')) cover_id_list = [int(num) for num in cover_id_list] roi.trans_cover2mask(cover_mask_path, preprocessed_paras[name], cover_id_list) roi_paths.append(cover_mask_path) elif name == "NDVI": # 利用NDVI计算裸土范围该指数的输出值在 -1.0 和 1.0 之间,大部分表示植被量, # 负值主要根据云、水和雪而生成 # 接近零的值则主要根据岩石和裸土而生成。 # 较低的(小于等于 0.1)NDVI 值表示岩石、沙石或雪覆盖的贫瘠区域。 # 中等值(0.2 至 0.3)表示灌木丛和草地 # 较高的值(0.6 至 0.8)表示温带雨林和热带雨林。 ndvi_mask_path = processing_path + "ndvi_mask.tif" ndvi_scope = list(processing_paras['NDVIScope'].split(';')) threshold_of_ndvi_min = float(ndvi_scope[0]) threshold_of_ndvi_max = float(ndvi_scope[1]) roi.trans_tif2mask(ndvi_mask_path, preprocessed_paras[name], threshold_of_ndvi_min, threshold_of_ndvi_max) roi_paths.append(ndvi_mask_path) # else: # # 其他特征影像 # tif_mask_path = processing_path + name + "_mask.tif" # roi.trans_tif2mask(tif_mask_path, preprocessed_paras[name], np.nan) # roi_paths.append(tif_mask_path) bare_land_mask_path = processing_path + "bare_land_mask.tif" for roi_path in roi_paths: roi.combine_mask(bare_land_mask_path, roi_path, bare_land_mask_path) return bare_land_mask_path @staticmethod def roi_process_VP(names, processing_path, processing_paras, preprocessed_paras, file_name): roi_paths = [] roi = ROIAlg() for name in names: if 'LocalIncidenceAngle' in name: # 利用角度为nan生成Mask pp.check_LocalIncidenceAngle(preprocessed_paras[name], preprocessed_paras[name]) angle_nan_mask_path = processing_path + 'angle_nan_mask.tif' roi.trans_tif2mask(angle_nan_mask_path, preprocessed_paras[name], np.nan) roi_paths.append(angle_nan_mask_path) elif ("HH" in name) or ("HV" in name) or ("VH" in name) or ("VV" in name): # 利用影像的有效范围生成MASK tif_mask_path = processing_path + name + "_tif_mask.tif" roi.trans_tif2mask(tif_mask_path, preprocessed_paras[name], np.nan) roi_paths.append(tif_mask_path) elif name == 'Covering': # 利用cover计算植被覆盖范围 cover_mask_path = processing_path + "cover_mask.tif" if processing_paras['CoveringIDs'] == 'empty': cover_data = ImageHandler.get_data(preprocessed_paras[file_name + '_' + name]) cover_data[np.where(np.isnan(cover_data))] = 0 cover_id_list = list(np.unique(cover_data)) else: cover_id_list = list(processing_paras['CoveringIDs'].split(';')) cover_id_list = [int(num) for num in cover_id_list] roi.trans_cover2mask(cover_mask_path, preprocessed_paras[file_name + '_' + name], cover_id_list) roi_paths.append(cover_mask_path) elif name == "NDVI": # 利用NDVI计算裸土范围该指数的输出值在 -1.0 和 1.0 之间,大部分表示植被量, # 负值主要根据云、水和雪而生成 # 接近零的值则主要根据岩石和裸土而生成。 # 较低的(小于等于 0.1)NDVI 值表示岩石、沙石或雪覆盖的贫瘠区域。 # 中等值(0.2 至 0.3)表示灌木丛和草地 # 较高的值(0.6 至 0.8)表示温带雨林和热带雨林。 ndvi_mask_path = processing_path + "ndvi_mask.tif" ndvi_scope = list(processing_paras['NDVIScope'].split(';')) threshold_of_ndvi_min = float(ndvi_scope[0]) threshold_of_ndvi_max = float(ndvi_scope[1]) roi.trans_tif2mask(ndvi_mask_path, preprocessed_paras[name], threshold_of_ndvi_min, threshold_of_ndvi_max) roi_paths.append(ndvi_mask_path) # else: # # 其他特征影像 # tif_mask_path = processing_path + name + "_mask.tif" # roi.trans_tif2mask(tif_mask_path, preprocessed_paras[name], np.nan) # roi_paths.append(tif_mask_path) bare_land_mask_path = processing_path + "bare_land_mask.tif" for roi_path in roi_paths: roi.combine_mask(bare_land_mask_path, roi_path, bare_land_mask_path) return bare_land_mask_path @staticmethod def trans_tif2mask(out_mask_path, in_tif_path, threshold_min, threshold_max = None): """ :param out_mask_path:mask输出路径 :param in_tif_path:输入路径 :param threshold_min:最小阈值 :param threshold_max:最大阈值 :return: True or False """ image_handler = ImageHandler() proj = image_handler.get_projection(in_tif_path) geotrans = image_handler.get_geotransform(in_tif_path) array = image_handler.get_band_array(in_tif_path, 1) if threshold_max == None and np.isnan(threshold_min)==True: nan = np.isnan(array) mask = (nan.astype(int) == 0).astype(int) mask1 = ((array == -9999).astype(int) == 0).astype(int) mask *= mask1 image_handler.write_img(out_mask_path, proj, geotrans, mask) else: if threshold_min < threshold_max: mask = ((array > threshold_min) & (array < threshold_max)).astype(int) image_handler.write_img(out_mask_path, proj, geotrans, mask) elif threshold_min > threshold_max: mask = ((array < threshold_min) & (array > threshold_max)).astype(int) image_handler.write_img(out_mask_path, proj, geotrans, mask) elif threshold_max == threshold_min: mask = ((array == threshold_min).astype(int) == 0).astype(int) image_handler.write_img(out_mask_path, proj, geotrans, mask) logger.info("trans_tif2mask success, path: %s", out_mask_path) return True @staticmethod def trans_cover2mask(out_mask_path, in_tif_path, cover_id_list): """ :param out_mask_path:mask输出路径 :param in_tif_path:输入路径 :param cover_id_list 地表覆盖类型数据的id :return: True or False """ image_handler = ImageHandler() proj = image_handler.get_projection(in_tif_path) geotrans = image_handler.get_geotransform(in_tif_path) array = image_handler.get_band_array(in_tif_path, 1) mask = np.zeros(array.shape, dtype=bool) for id in cover_id_list: mask_tmp = (array == id) mask = mask | mask_tmp mask = mask.astype(int) image_handler.write_img(out_mask_path, proj, geotrans, mask) @staticmethod def combine_mask(out_mask_path, in_main_mask_path, in_sub_mask_path): """ :param out_mask_path:输出路径 :param in_main_mask_path:主mask路径,输出影像采用主mask的地理信息 :param in_sub_mask_path:副mask路径 """ image_handler = ImageHandler() proj = image_handler.get_projection(in_main_mask_path) geotrans = image_handler.get_geotransform(in_main_mask_path) main_array = image_handler.get_band_array(in_main_mask_path, 1) if image_handler.get_dataset(in_sub_mask_path) != None: sub_array = image_handler.get_band_array(in_sub_mask_path, 1) main_array = main_array * sub_array image_handler.write_img(out_mask_path, proj, geotrans, main_array) logger.info("combine_mask success, path: %s", out_mask_path) return True @staticmethod def cal_roi(out_tif_path, in_tif_path, mask_path, background_value=1): """ :param out_tif_path:ROI的影像 :param in_tif_path:计算ROI的影像 :param mask_path:掩模 :param background_value:无效区域设置的背景值 :return: True or False """ image_handler = ImageHandler() proj = image_handler.get_projection(in_tif_path) geotrans = image_handler.get_geotransform(in_tif_path) tif_array = image_handler.get_data(in_tif_path) # 读取所有波段的像元值存为数组 mask_array = image_handler.get_band_array(mask_path, 1) if len(tif_array.shape) == 3: im_bands, im_height, im_width = tif_array.shape else: im_bands, (im_height, im_width) = 1, tif_array.shape if im_bands == 1: tif_array[np.isnan(mask_array)] = background_value tif_array[mask_array == 0] = background_value elif im_bands>1: for i in range(0, im_bands): tif_array[i, :, :][np.isnan(mask_array)] = background_value tif_array[i, :, :][mask_array == 0] = background_value image_handler.write_img(out_tif_path, proj, geotrans, tif_array, '-9999') logger.info("cal_roi success, path: %s", out_tif_path) return True # if __name__ == '__main__': # dir = r'G:\MicroWorkspace\C-SAR\SoilMoisture\Temporary\processing/' # out_tif_path = dir + 'soil_moisture_roi.tif' # in_tif_path = dir + 'soil_moisture.tif' # mask_path = dir + 'bare_land_mask.tif' # background_value = np.nan # ROIAlg.cal_roi(out_tif_path, in_tif_path, mask_path, background_value) # pass