microproduct-l-sar/tool/algorithm/algtools/ROIAlg.py

237 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: UTF-8 -*-
"""
@Project:microproduct
@File:ROIAlg.py
@Function:
@Contact:
@Author:SHJ
@Date:2021/11/17
@Version:1.0.0
"""
import logging
from tool.algorithm.image.ImageHandle import ImageHandler
from tool.algorithm.algtools.PreProcess import PreProcess as pp
import numpy as np
logger = logging.getLogger("mylog")
class ROIAlg:
def __init__(self,):
pass
@staticmethod
def roi_process(names, processing_path, processing_paras, preprocessed_paras):
roi_paths = []
roi = ROIAlg()
for name in names:
if 'LocalIncidenceAngle' in name:
# 利用角度为nan生成Mask
pp.check_LocalIncidenceAngle(preprocessed_paras[name],preprocessed_paras[name])
angle_nan_mask_path = processing_path + 'angle_nan_mask.tif'
roi.trans_tif2mask(angle_nan_mask_path, preprocessed_paras[name], np.nan)
roi_paths.append(angle_nan_mask_path)
elif ("HH" in name) or ("HV" in name) or ("VH" in name) or ("VV" in name):
# 利用影像的有效范围生成MASK
tif_mask_path = processing_path + name + "_tif_mask.tif"
roi.trans_tif2mask(tif_mask_path, preprocessed_paras[name], np.nan)
roi_paths.append(tif_mask_path)
elif name == 'Covering':
# 利用cover计算植被覆盖范围
if processing_paras['CoveringIDs'] == 'empty':
cover_data = ImageHandler.get_data(preprocessed_paras[name])
cover_data[np.where(np.isnan(cover_data))] = 0
cover_id_list = list(np.unique(cover_data))
else:
cover_id_list = list(processing_paras['CoveringIDs'].split(';'))
cover_id_list = [int(num) for num in cover_id_list]
cover_mask_path = processing_path + "cover_mask.tif"
roi.trans_cover2mask(cover_mask_path, preprocessed_paras[name], cover_id_list)
roi_paths.append(cover_mask_path)
elif name == "NDVI":
# 利用NDVI计算裸土范围该指数的输出值在 -1.0 和 1.0 之间,大部分表示植被量,
# 负值主要根据云、水和雪而生成
# 接近零的值则主要根据岩石和裸土而生成。
# 较低的(小于等于 0.1NDVI 值表示岩石、沙石或雪覆盖的贫瘠区域。
# 中等值0.2 至 0.3)表示灌木丛和草地
# 较高的值0.6 至 0.8)表示温带雨林和热带雨林。
ndvi_mask_path = processing_path + "ndvi_mask.tif"
ndvi_scope = list(processing_paras['NDVIScope'].split(';'))
threshold_of_ndvi_min = float(ndvi_scope[0])
threshold_of_ndvi_max = float(ndvi_scope[1])
roi.trans_tif2mask(ndvi_mask_path, preprocessed_paras[name], threshold_of_ndvi_min, threshold_of_ndvi_max)
roi_paths.append(ndvi_mask_path)
# else:
# # 其他特征影像
# tif_mask_path = processing_path + name + "_mask.tif"
# roi.trans_tif2mask(tif_mask_path, preprocessed_paras[name], np.nan)
# roi_paths.append(tif_mask_path)
bare_land_mask_path = processing_path + "bare_land_mask.tif"
for roi_path in roi_paths:
roi.combine_mask(bare_land_mask_path, roi_path, bare_land_mask_path)
return bare_land_mask_path
@staticmethod
def roi_process_VP(names, processing_path, processing_paras, preprocessed_paras, file_name):
roi_paths = []
roi = ROIAlg()
for name in names:
if 'LocalIncidenceAngle' in name:
# 利用角度为nan生成Mask
pp.check_LocalIncidenceAngle(preprocessed_paras[name], preprocessed_paras[name])
angle_nan_mask_path = processing_path + 'angle_nan_mask.tif'
roi.trans_tif2mask(angle_nan_mask_path, preprocessed_paras[name], np.nan)
roi_paths.append(angle_nan_mask_path)
elif ("HH" in name) or ("HV" in name) or ("VH" in name) or ("VV" in name):
# 利用影像的有效范围生成MASK
tif_mask_path = processing_path + name + "_tif_mask.tif"
roi.trans_tif2mask(tif_mask_path, preprocessed_paras[name], np.nan)
roi_paths.append(tif_mask_path)
elif name == 'Covering':
# 利用cover计算植被覆盖范围
cover_mask_path = processing_path + "cover_mask.tif"
if processing_paras['CoveringIDs'] == 'empty':
cover_data = ImageHandler.get_data(preprocessed_paras[file_name + '_' + name])
cover_data[np.where(np.isnan(cover_data))] = 0
cover_id_list = list(np.unique(cover_data))
else:
cover_id_list = list(processing_paras['CoveringIDs'].split(';'))
cover_id_list = [int(num) for num in cover_id_list]
roi.trans_cover2mask(cover_mask_path, preprocessed_paras[file_name + '_' + name], cover_id_list)
roi_paths.append(cover_mask_path)
elif name == "NDVI":
# 利用NDVI计算裸土范围该指数的输出值在 -1.0 和 1.0 之间,大部分表示植被量,
# 负值主要根据云、水和雪而生成
# 接近零的值则主要根据岩石和裸土而生成。
# 较低的(小于等于 0.1NDVI 值表示岩石、沙石或雪覆盖的贫瘠区域。
# 中等值0.2 至 0.3)表示灌木丛和草地
# 较高的值0.6 至 0.8)表示温带雨林和热带雨林。
ndvi_mask_path = processing_path + "ndvi_mask.tif"
ndvi_scope = list(processing_paras['NDVIScope'].split(';'))
threshold_of_ndvi_min = float(ndvi_scope[0])
threshold_of_ndvi_max = float(ndvi_scope[1])
roi.trans_tif2mask(ndvi_mask_path, preprocessed_paras[name], threshold_of_ndvi_min,
threshold_of_ndvi_max)
roi_paths.append(ndvi_mask_path)
# else:
# # 其他特征影像
# tif_mask_path = processing_path + name + "_mask.tif"
# roi.trans_tif2mask(tif_mask_path, preprocessed_paras[name], np.nan)
# roi_paths.append(tif_mask_path)
bare_land_mask_path = processing_path + "bare_land_mask.tif"
for roi_path in roi_paths:
roi.combine_mask(bare_land_mask_path, roi_path, bare_land_mask_path)
return bare_land_mask_path
@staticmethod
def trans_tif2mask(out_mask_path, in_tif_path, threshold_min, threshold_max = None):
"""
:param out_mask_path:mask输出路径
:param in_tif_path:输入路径
:param threshold_min:最小阈值
:param threshold_max:最大阈值
:return: True or False
"""
image_handler = ImageHandler()
proj = image_handler.get_projection(in_tif_path)
geotrans = image_handler.get_geotransform(in_tif_path)
array = image_handler.get_band_array(in_tif_path, 1)
if threshold_max == None and np.isnan(threshold_min)==True:
nan = np.isnan(array)
mask = (nan.astype(int) == 0).astype(int)
mask1 = ((array == -9999).astype(int) == 0).astype(int)
mask *= mask1
image_handler.write_img(out_mask_path, proj, geotrans, mask)
else:
if threshold_min < threshold_max:
mask = ((array > threshold_min) & (array < threshold_max)).astype(int)
image_handler.write_img(out_mask_path, proj, geotrans, mask)
elif threshold_min > threshold_max:
mask = ((array < threshold_min) & (array > threshold_max)).astype(int)
image_handler.write_img(out_mask_path, proj, geotrans, mask)
elif threshold_max == threshold_min:
mask = ((array == threshold_min).astype(int) == 0).astype(int)
image_handler.write_img(out_mask_path, proj, geotrans, mask)
logger.info("trans_tif2mask success, path: %s", out_mask_path)
return True
@staticmethod
def trans_cover2mask(out_mask_path, in_tif_path, cover_id_list):
"""
:param out_mask_path:mask输出路径
:param in_tif_path:输入路径
:param cover_id_list 地表覆盖类型数据的id
:return: True or False
"""
image_handler = ImageHandler()
proj = image_handler.get_projection(in_tif_path)
geotrans = image_handler.get_geotransform(in_tif_path)
array = image_handler.get_band_array(in_tif_path, 1)
mask = np.zeros(array.shape, dtype=bool)
for id in cover_id_list:
mask_tmp = (array == id)
mask = mask | mask_tmp
mask = mask.astype(int)
image_handler.write_img(out_mask_path, proj, geotrans, mask)
@staticmethod
def combine_mask(out_mask_path, in_main_mask_path, in_sub_mask_path):
"""
:param out_mask_path:输出路径
:param in_main_mask_path:主mask路径输出影像采用主mask的地理信息
:param in_sub_mask_path:副mask路径
"""
image_handler = ImageHandler()
proj = image_handler.get_projection(in_main_mask_path)
geotrans = image_handler.get_geotransform(in_main_mask_path)
main_array = image_handler.get_band_array(in_main_mask_path, 1)
if image_handler.get_dataset(in_sub_mask_path) != None:
sub_array = image_handler.get_band_array(in_sub_mask_path, 1)
main_array = main_array * sub_array
image_handler.write_img(out_mask_path, proj, geotrans, main_array)
logger.info("combine_mask success, path: %s", out_mask_path)
return True
@staticmethod
def cal_roi(out_tif_path, in_tif_path, mask_path, background_value=1):
"""
:param out_tif_path:ROI的影像
:param in_tif_path:计算ROI的影像
:param mask_path:掩模
:param background_value:无效区域设置的背景值
:return: True or False
"""
image_handler = ImageHandler()
proj = image_handler.get_projection(in_tif_path)
geotrans = image_handler.get_geotransform(in_tif_path)
tif_array = image_handler.get_data(in_tif_path) # 读取所有波段的像元值存为数组
mask_array = image_handler.get_band_array(mask_path, 1)
if len(tif_array.shape) == 3:
im_bands, im_height, im_width = tif_array.shape
else:
im_bands, (im_height, im_width) = 1, tif_array.shape
if im_bands == 1:
tif_array[np.isnan(mask_array)] = background_value
tif_array[mask_array == 0] = background_value
elif im_bands>1:
for i in range(0, im_bands):
tif_array[i, :, :][np.isnan(mask_array)] = background_value
tif_array[i, :, :][mask_array == 0] = background_value
image_handler.write_img(out_tif_path, proj, geotrans, tif_array, background_value)
logger.info("cal_roi success, path: %s", out_tif_path)
return True
# if __name__ == '__main__':
# dir = r'G:\MicroWorkspace\C-SAR\SoilMoisture\Temporary\processing/'
# out_tif_path = dir + 'soil_moisture_roi.tif'
# in_tif_path = dir + 'soil_moisture.tif'
# mask_path = dir + 'bare_land_mask.tif'
# background_value = np.nan
# ROIAlg.cal_roi(out_tif_path, in_tif_path, mask_path, background_value)
# pass