242 lines
12 KiB
Python
242 lines
12 KiB
Python
# -*- coding: UTF-8 -*-
|
||
"""
|
||
@Project:microproduct
|
||
@File:ROIAlg.py
|
||
@Function:
|
||
@Contact:
|
||
@Author:SHJ
|
||
@Date:2021/11/17
|
||
@Version:1.0.0
|
||
"""
|
||
import logging
|
||
from tool.algorithm.image.ImageHandle import ImageHandler
|
||
from tool.algorithm.algtools.PreProcess import PreProcess as pp
|
||
import numpy as np
|
||
|
||
|
||
logger = logging.getLogger("mylog")
|
||
|
||
|
||
class ROIAlg:
|
||
def __init__(self,):
|
||
pass
|
||
|
||
@staticmethod
|
||
def roi_process(names, processing_path, processing_paras, preprocessed_paras):
|
||
roi_paths = []
|
||
roi = ROIAlg()
|
||
for name in names:
|
||
if 'LocalIncidenceAngle' in name:
|
||
# 利用角度为nan生成Mask
|
||
pp.check_LocalIncidenceAngle(preprocessed_paras[name],preprocessed_paras[name])
|
||
angle_nan_mask_path = processing_path + 'angle_nan_mask.tif'
|
||
roi.trans_tif2mask(angle_nan_mask_path, preprocessed_paras[name], np.nan)
|
||
roi_paths.append(angle_nan_mask_path)
|
||
elif ("HH" in name) or ("HV" in name) or ("VH" in name) or ("VV" in name):
|
||
# 利用影像的有效范围生成MASK
|
||
tif_mask_path = processing_path + name + "_tif_mask.tif"
|
||
roi.trans_tif2mask(tif_mask_path, preprocessed_paras[name], np.nan)
|
||
roi_paths.append(tif_mask_path)
|
||
elif name == 'Covering':
|
||
# 利用cover计算植被覆盖范围
|
||
cover_mask_path = processing_path + "cover_mask.tif"
|
||
if processing_paras['CoveringIDs'] == 'empty':
|
||
cover_data = ImageHandler.get_data(preprocessed_paras[name])
|
||
cover_data[np.where(np.isnan(cover_data))] = 0
|
||
cover_id_list = list(np.unique(cover_data))
|
||
else:
|
||
cover_id_list = list(processing_paras['CoveringIDs'].split(';'))
|
||
cover_id_list = [int(num) for num in cover_id_list]
|
||
roi.trans_cover2mask(cover_mask_path, preprocessed_paras[name], cover_id_list)
|
||
roi_paths.append(cover_mask_path)
|
||
elif name == "NDVI":
|
||
# 利用NDVI计算裸土范围该指数的输出值在 -1.0 和 1.0 之间,大部分表示植被量,
|
||
# 负值主要根据云、水和雪而生成
|
||
# 接近零的值则主要根据岩石和裸土而生成。
|
||
# 较低的(小于等于 0.1)NDVI 值表示岩石、沙石或雪覆盖的贫瘠区域。
|
||
# 中等值(0.2 至 0.3)表示灌木丛和草地
|
||
# 较高的值(0.6 至 0.8)表示温带雨林和热带雨林。
|
||
ndvi_mask_path = processing_path + "ndvi_mask.tif"
|
||
ndvi_scope = list(processing_paras['NDVIScope'].split(';'))
|
||
threshold_of_ndvi_min = float(ndvi_scope[0])
|
||
threshold_of_ndvi_max = float(ndvi_scope[1])
|
||
roi.trans_tif2mask(ndvi_mask_path, preprocessed_paras[name], threshold_of_ndvi_min, threshold_of_ndvi_max)
|
||
roi_paths.append(ndvi_mask_path)
|
||
# else:
|
||
# # 其他特征影像
|
||
# tif_mask_path = processing_path + name + "_mask.tif"
|
||
# roi.trans_tif2mask(tif_mask_path, preprocessed_paras[name], np.nan)
|
||
# roi_paths.append(tif_mask_path)
|
||
|
||
|
||
bare_land_mask_path = processing_path + "bare_land_mask.tif"
|
||
for roi_path in roi_paths:
|
||
roi.combine_mask(bare_land_mask_path, roi_path, bare_land_mask_path)
|
||
return bare_land_mask_path
|
||
|
||
@staticmethod
|
||
def roi_process_VP(names, processing_path, processing_paras, preprocessed_paras, file_name):
|
||
roi_paths = []
|
||
roi = ROIAlg()
|
||
for name in names:
|
||
if 'LocalIncidenceAngle' in name:
|
||
# 利用角度为nan生成Mask
|
||
pp.check_LocalIncidenceAngle(preprocessed_paras[name], preprocessed_paras[name])
|
||
angle_nan_mask_path = processing_path + 'angle_nan_mask.tif'
|
||
roi.trans_tif2mask(angle_nan_mask_path, preprocessed_paras[name], np.nan)
|
||
roi_paths.append(angle_nan_mask_path)
|
||
elif ("HH" in name) or ("HV" in name) or ("VH" in name) or ("VV" in name):
|
||
# 利用影像的有效范围生成MASK
|
||
tif_mask_path = processing_path + name + "_tif_mask.tif"
|
||
roi.trans_tif2mask(tif_mask_path, preprocessed_paras[name], np.nan)
|
||
roi_paths.append(tif_mask_path)
|
||
elif name == 'Covering':
|
||
# 利用cover计算植被覆盖范围
|
||
cover_mask_path = processing_path + "cover_mask.tif"
|
||
if processing_paras['CoveringIDs'] == 'empty':
|
||
cover_data = ImageHandler.get_data(preprocessed_paras[file_name + '_' + name])
|
||
cover_data[np.where(np.isnan(cover_data))] = 0
|
||
cover_id_list = list(np.unique(cover_data))
|
||
else:
|
||
cover_id_list = list(processing_paras['CoveringIDs'].split(';'))
|
||
cover_id_list = [int(num) for num in cover_id_list]
|
||
|
||
roi.trans_cover2mask(cover_mask_path, preprocessed_paras[file_name + '_' + name], cover_id_list)
|
||
roi_paths.append(cover_mask_path)
|
||
elif name == "NDVI":
|
||
# 利用NDVI计算裸土范围该指数的输出值在 -1.0 和 1.0 之间,大部分表示植被量,
|
||
# 负值主要根据云、水和雪而生成
|
||
# 接近零的值则主要根据岩石和裸土而生成。
|
||
# 较低的(小于等于 0.1)NDVI 值表示岩石、沙石或雪覆盖的贫瘠区域。
|
||
# 中等值(0.2 至 0.3)表示灌木丛和草地
|
||
# 较高的值(0.6 至 0.8)表示温带雨林和热带雨林。
|
||
ndvi_mask_path = processing_path + "ndvi_mask.tif"
|
||
ndvi_scope = list(processing_paras['NDVIScope'].split(';'))
|
||
threshold_of_ndvi_min = float(ndvi_scope[0])
|
||
threshold_of_ndvi_max = float(ndvi_scope[1])
|
||
roi.trans_tif2mask(ndvi_mask_path, preprocessed_paras[name], threshold_of_ndvi_min,
|
||
threshold_of_ndvi_max)
|
||
roi_paths.append(ndvi_mask_path)
|
||
# else:
|
||
# # 其他特征影像
|
||
# tif_mask_path = processing_path + name + "_mask.tif"
|
||
# roi.trans_tif2mask(tif_mask_path, preprocessed_paras[name], np.nan)
|
||
# roi_paths.append(tif_mask_path)
|
||
|
||
bare_land_mask_path = processing_path + "bare_land_mask.tif"
|
||
for roi_path in roi_paths:
|
||
roi.combine_mask(bare_land_mask_path, roi_path, bare_land_mask_path)
|
||
return bare_land_mask_path
|
||
|
||
@staticmethod
|
||
def trans_tif2mask(out_mask_path, in_tif_path, threshold_min, threshold_max = None):
|
||
"""
|
||
:param out_mask_path:mask输出路径
|
||
:param in_tif_path:输入路径
|
||
:param threshold_min:最小阈值
|
||
:param threshold_max:最大阈值
|
||
:return: True or False
|
||
"""
|
||
image_handler = ImageHandler()
|
||
proj = image_handler.get_projection(in_tif_path)
|
||
geotrans = image_handler.get_geotransform(in_tif_path)
|
||
array = image_handler.get_band_array(in_tif_path, 1)
|
||
if threshold_max == None and np.isnan(threshold_min)==True:
|
||
nan = np.isnan(array)
|
||
mask = (nan.astype(int) == 0).astype(int)
|
||
mask1 = ((array == -9999).astype(int) == 0).astype(int)
|
||
mask *= mask1
|
||
image_handler.write_img(out_mask_path, proj, geotrans, mask)
|
||
else:
|
||
if threshold_min < threshold_max:
|
||
mask = ((array > threshold_min) & (array < threshold_max)).astype(int)
|
||
image_handler.write_img(out_mask_path, proj, geotrans, mask)
|
||
elif threshold_min > threshold_max:
|
||
mask = ((array < threshold_min) & (array > threshold_max)).astype(int)
|
||
image_handler.write_img(out_mask_path, proj, geotrans, mask)
|
||
elif threshold_max == threshold_min:
|
||
mask = ((array == threshold_min).astype(int) == 0).astype(int)
|
||
image_handler.write_img(out_mask_path, proj, geotrans, mask)
|
||
|
||
logger.info("trans_tif2mask success, path: %s", out_mask_path)
|
||
return True
|
||
|
||
@staticmethod
|
||
def trans_cover2mask(out_mask_path, in_tif_path, cover_id_list):
|
||
"""
|
||
:param out_mask_path:mask输出路径
|
||
:param in_tif_path:输入路径
|
||
:param cover_id_list 地表覆盖类型数据的id
|
||
:return: True or False
|
||
"""
|
||
image_handler = ImageHandler()
|
||
proj = image_handler.get_projection(in_tif_path)
|
||
geotrans = image_handler.get_geotransform(in_tif_path)
|
||
array = image_handler.get_band_array(in_tif_path, 1)
|
||
|
||
mask = np.zeros(array.shape, dtype=bool)
|
||
for id in cover_id_list:
|
||
mask_tmp = (array == id)
|
||
mask = mask | mask_tmp
|
||
|
||
mask = mask.astype(int)
|
||
image_handler.write_img(out_mask_path, proj, geotrans, mask)
|
||
|
||
@staticmethod
|
||
def combine_mask(out_mask_path, in_main_mask_path, in_sub_mask_path):
|
||
"""
|
||
:param out_mask_path:输出路径
|
||
:param in_main_mask_path:主mask路径,输出影像采用主mask的地理信息
|
||
:param in_sub_mask_path:副mask路径
|
||
"""
|
||
image_handler = ImageHandler()
|
||
proj = image_handler.get_projection(in_main_mask_path)
|
||
geotrans = image_handler.get_geotransform(in_main_mask_path)
|
||
main_array = image_handler.get_band_array(in_main_mask_path, 1)
|
||
if image_handler.get_dataset(in_sub_mask_path) != None:
|
||
sub_array = image_handler.get_band_array(in_sub_mask_path, 1)
|
||
main_array = main_array * sub_array
|
||
image_handler.write_img(out_mask_path, proj, geotrans, main_array)
|
||
logger.info("combine_mask success, path: %s", out_mask_path)
|
||
return True
|
||
|
||
@staticmethod
|
||
def cal_roi(out_tif_path, in_tif_path, mask_path, background_value=1):
|
||
"""
|
||
:param out_tif_path:ROI的影像
|
||
:param in_tif_path:计算ROI的影像
|
||
:param mask_path:掩模
|
||
:param background_value:无效区域设置的背景值
|
||
:return: True or False
|
||
"""
|
||
image_handler = ImageHandler()
|
||
proj = image_handler.get_projection(in_tif_path)
|
||
geotrans = image_handler.get_geotransform(in_tif_path)
|
||
tif_array = image_handler.get_data(in_tif_path) # 读取所有波段的像元值存为数组
|
||
mask_array = image_handler.get_band_array(mask_path, 1)
|
||
if len(tif_array.shape) == 3:
|
||
im_bands, im_height, im_width = tif_array.shape
|
||
else:
|
||
im_bands, (im_height, im_width) = 1, tif_array.shape
|
||
if im_bands == 1:
|
||
tif_array[np.isnan(mask_array)] = background_value
|
||
tif_array[mask_array == 0] = background_value
|
||
elif im_bands>1:
|
||
for i in range(0, im_bands):
|
||
tif_array[i, :, :][np.isnan(mask_array)] = background_value
|
||
tif_array[i, :, :][mask_array == 0] = background_value
|
||
image_handler.write_img(out_tif_path, proj, geotrans, tif_array, background_value)
|
||
logger.info("cal_roi success, path: %s", out_tif_path)
|
||
return True
|
||
|
||
if __name__ == '__main__':
|
||
# dir = r'G:\MicroWorkspace\C-SAR\SoilMoisture\Temporary\processing/'
|
||
out_tif_path = r"D:\BaiduNetdiskDownload\veg_landcover_re_mask.tif"
|
||
in_tif_path = r"D:\BaiduNetdiskDownload\veg_landcover_re.tif"
|
||
# mask_path = r'D:\micro\WorkSpace\SurfaceRoughness\Temporary\processing\roi\ndvi_mask.tif'
|
||
# background_value = 0
|
||
# ROIAlg.cal_roi(out_tif_path, in_tif_path, mask_path, background_value)
|
||
cover_id_list = [10, 40, 90]
|
||
|
||
ROIAlg.trans_cover2mask(out_tif_path, in_tif_path, cover_id_list)
|
||
pass |