# -*- coding: UTF-8 -*- """ @Project:__init__.py @File:AHVToPolsarpro.py @Function:全极化影像转成polsarpro格式T3数据 @Contact: @Author:SHJ @Date:2021/9/18 16:44 @Version:1.0.0 """ import os import numpy as np import glob import pickle import struct from tool.algorithm.algtools.MetaDataHandler import MetaDataHandler from tool.algorithm.image.ImageHandle import ImageHandler class AHVToPolsarpro: """ 全极化影像转换为bin格式T3矩阵,支持polsarpro处理 """ def __init__(self, hh_hv_vh_vv_path_list=[]): self._hh_hv_vh_vv_path_list = hh_hv_vh_vv_path_list pass @staticmethod def __ahv_to_s2_veg(ahv_dir): """ 全极化影像转S2矩阵 :param ahv_dir: 全极化影像文件夹路径 :return: 极化散射矩阵S2 """ global s11 in_tif_paths = list(glob.glob(os.path.join(ahv_dir, '*.tif'))) in_tif_paths1 = list(glob.glob(os.path.join(ahv_dir, '*.tiff'))) in_tif_paths += in_tif_paths1 s11, s12, s21, s22 = None, None, None, None flag_list = [0, 0, 0, 0] for in_tif_path in in_tif_paths: # 读取原始SAR影像 proj, geotrans, data = ImageHandler.read_img(in_tif_path) # 获取极化类型 if '_HH' in os.path.basename(in_tif_path): data_real = data[0, :, :] data_imag = data[1, :, :] s11 = data_real + 1j * data_imag flag_list[0] = 1 elif '_HV' in os.path.basename(in_tif_path): data_real = data[0, :, :] data_imag = data[1, :, :] s12 = data_real + 1j * data_imag flag_list[1] = 1 elif '_VH' in os.path.basename(in_tif_path): data_real = data[0, :, :] data_imag = data[1, :, :] s21 = data_real + 1j * data_imag flag_list[2] = 1 elif '_VV' in os.path.basename(in_tif_path): data_real = data[0, :, :] data_imag = data[1, :, :] s22 = data_real + 1j * data_imag flag_list[3] = 1 else: continue if not flag_list == [1, 1, 1, 1]: raise Exception('HH or HV or VH or VV is not in path :%s', ahv_dir) return s11, s12, s21, s22 @staticmethod def __ahv_to_s2_soil(ahv_dir): """ 全极化影像转S2矩阵 :param ahv_dir: 全极化影像文件夹路径 :return: 极化散射矩阵S2 """ global s11 in_tif_paths = list(glob.glob(os.path.join(ahv_dir, '*.tif'))) in_tif_paths1 = list(glob.glob(os.path.join(ahv_dir, '*.tiff'))) in_tif_paths += in_tif_paths1 s11, s12, s21, s22 = None, None, None, None flag_list = [0, 0, 0, 0] for in_tif_path in in_tif_paths: # 读取原始SAR影像 proj, geotrans, data = ImageHandler.read_img(in_tif_path) # 获取极化类型 if 'HH' in os.path.basename(in_tif_path): data_real = data[0, :, :] data_imag = data[1, :, :] s11 = data_real + 1j * data_imag flag_list[0] = 1 elif 'HV' in os.path.basename(in_tif_path): data_real = data[0, :, :] data_imag = data[1, :, :] s12 = data_real + 1j * data_imag flag_list[1] = 1 elif 'VH' in os.path.basename(in_tif_path): data_real = data[0, :, :] data_imag = data[1, :, :] s21 = data_real + 1j * data_imag flag_list[2] = 1 elif 'VV' in os.path.basename(in_tif_path): data_real = data[0, :, :] data_imag = data[1, :, :] s22 = data_real + 1j * data_imag flag_list[3] = 1 else: continue if not flag_list == [1, 1, 1, 1]: raise Exception('HH or HV or VH or VV is not in path :%s', ahv_dir) return s11, s12, s21, s22 @staticmethod def __ahv_to_s2_list(ahv_path_list): """ 全极化影像转S2矩阵 :param ahv_dir: 全极化影像文件夹路径 :return: 极化散射矩阵S2 """ global s11 in_tif_paths = ahv_path_list s11, s12, s21, s22 = None, None, None, None flag_list = [0, 0, 0, 0] for in_tif_path in in_tif_paths: # 读取原始SAR影像 proj, geotrans, data = ImageHandler.read_img(in_tif_path) # 获取极化类型 if 'HH' in os.path.basename(in_tif_path): data_real = data[0, :, :] data_imag = data[1, :, :] s11 = data_real + 1j * data_imag flag_list[0] = 1 elif 'HV' in os.path.basename(in_tif_path): data_real = data[0, :, :] data_imag = data[1, :, :] s12 = data_real + 1j * data_imag flag_list[1] = 1 elif 'VH' in os.path.basename(in_tif_path): data_real = data[0, :, :] data_imag = data[1, :, :] s21 = data_real + 1j * data_imag flag_list[2] = 1 elif 'VV' in os.path.basename(in_tif_path): data_real = data[0, :, :] data_imag = data[1, :, :] s22 = data_real + 1j * data_imag flag_list[3] = 1 else: continue if not flag_list == [1, 1, 1, 1]: raise Exception('HH or HV or VH or VV is not in path') return s11, s12, s21, s22 @staticmethod def __ahv_to_s2_list_2(hh_hv_vh_vv_path_list): """ 全极化影像转S2矩阵 :param ahv_dir: 全极化影像文件夹路径 :return: 极化散射矩阵S2 """ global s11 in_tif_paths = hh_hv_vh_vv_path_list s11, s12, s21, s22 = None, None, None, None flag_list = [0, 0, 0, 0] for in_tif_path, n in zip(in_tif_paths, range(len(in_tif_paths))): # 读取原始SAR影像 proj, geotrans, data = ImageHandler.read_img(in_tif_path) # 获取极化类型 if n == 0: data_real = data[0, :, :] data_imag = data[1, :, :] s11 = data_real + 1j * data_imag flag_list[0] = 1 elif n == 1: data_real = data[0, :, :] data_imag = data[1, :, :] s12 = data_real + 1j * data_imag flag_list[1] = 1 elif n == 2: data_real = data[0, :, :] data_imag = data[1, :, :] s21 = data_real + 1j * data_imag flag_list[2] = 1 elif n == 3: data_real = data[0, :, :] data_imag = data[1, :, :] s22 = data_real + 1j * data_imag flag_list[3] = 1 else: continue if not flag_list == [1, 1, 1, 1]: raise Exception('HH or HV or VH or VV is not in path') return s11, s12, s21, s22 @staticmethod def __s2_to_t3(s11, s12, s21, s22): """ S2矩阵转T3矩阵 :param s11: HH极化数据 :param s12: HV极化数据 :param s21: VH极化数据 :param s22: VV极化数据 :return: 极化相干矩阵T3 """ HH = s11 HV = s12 VH = s21 VV = s22 t11 = (np.abs(HH + VV)) ** 2 / 2 t12 = (HH + VV) * np.conj(HH - VV) / 2 t13 = (HH + VV) * np.conj(HV + VH) t21 = (HH - VV) * np.conj(HH + VV) / 2 t22 = np.abs(HH - VV) ** 2 / 2 t23 = (HH - VV) * np.conj(HV + VH) t31 = (HV + VH) * np.conj(HH + VV) t32 = (HV + VH) * np.conj(HH - VV) t33 = 2 * np.abs(HV + VH) ** 2 return t11, t12, t13, t21, t22, t23, t31, t32, t33 def __t3_to_polsarpro_t3(self, out_dir, t11, t12, t13, t22, t23, t33): """ T3矩阵转bin格式,支持 polsarpro处理 :param out_dir: 输出的文件夹路径 :param t11: :param t12: :param t13: :param t22: :param t23: :param t33: :return: bin格式矩阵T3和头文件 """ if not os.path.exists(out_dir): os.makedirs(out_dir) rows = t11.shape[0] cols = t11.shape[1] bins_dict = { 'T11.bin': t11, 'T12_real.bin': t12.real, 'T12_imag.bin': t12.imag, 'T13_real.bin': t13.real, 'T13_imag.bin': t13.imag, 'T22.bin': t22, 'T23_real.bin': t23.real, 'T23_imag.bin': t23.imag, 'T33.bin': t33} for name, data in bins_dict.items(): bin_path = os.path.join(out_dir, name) self.__write_img_bin(data, bin_path) # todo 修改T3阵保存方式 # data.tofile(bin_path) out_hdr_path = bin_path + '.hdr' self.__write_bin_hdr(out_hdr_path, bin_path, rows, cols) self.__write_config_file(out_dir, rows, cols) def rows(self): """获取影像行数""" return self._rows def cols(self): """获取影像列数""" return self._cols def __write_img_bin(self, im, file_path): """ 写入影像到bin文件中,保存为float32类型 :param im : 影像矩阵数据,暂支持单通道影像数据 :param file_path: bin文件的完整路径 """ with open(file_path, 'wb') as f: self._rows = im.shape[0] self._cols = im.shape[1] for row in range(self._rows): im_bin = struct.pack("f" * self._cols, *np.reshape(im[row, :], (self._cols, 1), order='F')) f.write(im_bin) f.close() # self._rows = im.shape[0] # self._cols = im.shape[1] # with open(file_path, 'wb') as f: # pickle.dump(im, f) @staticmethod def __write_bin_hdr(out_hdr_path, bin_path, rows, cols): """ 写入影像的头文件 :param out_hdr_path : 头文件的路径 :param bin_path: bin文件的路径 :param rows: 影像的行数 :param cols: 影像的列数 """ h1 = 'ENVI' h2 = 'description = {' h3 = 'File Imported into ENVI. }' h4 = 'samples = ' + str(cols) # 列 h5 = 'lines = ' + str(rows) # 行 h6 = 'bands = 1 ' # 波段数 h7 = 'header offset = 0' h8 = 'file type = ENVI Standard' h9 = 'data type = 4' # 数据格式 h10 = 'interleave = bsq' # 存储格式 h11 = 'sensor type = Unknown' h12 = 'byte order = 0' h13 = 'band names = {' h14 = bin_path + '}' # h = [h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11, h12, h13, h14] # doc = open(out_hdr_path, 'w') # for i in range(0, 14): # print(h[i], end='', file=doc) # print('\n', end='', file=doc) h = [h1, h4, h5, h6, h7, h8, h9, h10, h12] doc = open(out_hdr_path, 'w') for i in range(0, 9): print(h[i], end='', file=doc) print('\n', end='', file=doc) doc.close() @staticmethod def __write_config_file(out_config_dir, rows, cols): """ 写入polsarpro配置文件 :param out_config_dir : 配置文件路径 :param rows: 影像的行数 :param cols: 影像的列数 """ h1 = 'Nrow' h2 = str(rows) h3 = '---------' h4 = 'Ncol' h5 = str(cols) h6 = '---------' h7 = 'PolarCase' h8 = 'monostatic' h9 = '---------' h10 = 'PolarType' h11 = 'full' h = [h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11] out_config_path = os.path.join(out_config_dir, 'config.txt') doc = open(out_config_path, 'w') for i in range(0, 11): print(h[i], end='', file=doc) print('\n', end='', file=doc) doc.close() def incidence_tif2bin(self, incidence_file, out_path): if not os.path.exists(out_path): os.mkdir(out_path) incidence_bin = os.path.join(out_path, 'incidence.bin') data = ImageHandler().get_data(incidence_file) rows = data.shape[0] cols = data.shape[1] self.__write_img_bin(data, incidence_bin) if not os.path.exists(incidence_bin): raise Exception('incidence to bin failed') out_hdr_path = incidence_bin + '.hdr' self.__write_bin_hdr(out_hdr_path, incidence_bin, rows, cols) return incidence_bin def ahv_to_polsarpro_t3_veg(self, out_file_dir, in_ahv_dir=''): if self._hh_hv_vh_vv_path_list == [] : s11, s12, s21, s22 = self.__ahv_to_s2_veg(in_ahv_dir) else: s11, s12, s21, s22 = self.__ahv_to_s2_list_2(self._hh_hv_vh_vv_path_list) t11, t12, t13, t21, t22, t23, t31, t32, t33 = self.__s2_to_t3( s11, s12, s21, s22) self.__t3_to_polsarpro_t3(out_file_dir, t11, t12, t13, t22, t23, t33) def ahv_to_polsarpro_t3_soil(self, out_file_dir, in_ahv_dir=''): if self._hh_hv_vh_vv_path_list == [] : s11, s12, s21, s22 = self.__ahv_to_s2_soil(in_ahv_dir) else: s11, s12, s21, s22 = self.__ahv_to_s2_list_2(self._hh_hv_vh_vv_path_list) t11, t12, t13, t21, t22, t23, t31, t32, t33 = self.__s2_to_t3( s11, s12, s21, s22) self.__t3_to_polsarpro_t3(out_file_dir, t11, t12, t13, t22, t23, t33) def calibration(self, calibration_value, in_ahv_dir='', name=''): if name == '': out_dir = os.path.join(in_ahv_dir, 'calibration') else: out_dir = os.path.join(in_ahv_dir, name, 'calibration') flag_list = [0, 0, 0, 0] if self._hh_hv_vh_vv_path_list == []: # 地表覆盖、土壤盐碱度 in_tif_paths = list(glob.glob(os.path.join(in_ahv_dir, '*.tif'))) in_tif_paths1 = list(glob.glob(os.path.join(in_ahv_dir, '*.tiff'))) in_tif_paths += in_tif_paths1 for in_tif_path in in_tif_paths: # 读取原始SAR影像 proj, geotrans, data = ImageHandler.read_img(in_tif_path) name = os.path.basename(in_tif_path) data_new = np.zeros(data.shape) # 获取极化类型 if 'HH' in os.path.basename(in_tif_path): data_new[0, :, :] = data[0, :, :] * calibration_value[0] data_new[1, :, :] = data[1, :, :] * calibration_value[0] ImageHandler.write_img(os.path.join(out_dir, name), proj, geotrans, data_new) flag_list[0] = 1 elif 'HV' in os.path.basename(in_tif_path): data_new[0, :, :] = data[0, :, :] * calibration_value[1] data_new[1, :, :] = data[1, :, :] * calibration_value[1] ImageHandler.write_img(os.path.join(out_dir, name), proj, geotrans, data_new) flag_list[1] = 1 elif 'VH' in os.path.basename(in_tif_path): data_new[0, :, :] = data[0, :, :] * calibration_value[2] data_new[1, :, :] = data[1, :, :] * calibration_value[2] ImageHandler.write_img(os.path.join(out_dir, name), proj, geotrans, data_new) flag_list[2] = 1 elif 'VV' in os.path.basename(in_tif_path): data_new[0, :, :] = data[0, :, :] * calibration_value[3] data_new[1, :, :] = data[1, :, :] * calibration_value[3] ImageHandler.write_img(os.path.join(out_dir, name), proj, geotrans, data_new) flag_list[3] = 1 if not flag_list == [1, 1, 1, 1]: raise Exception('calibration error! ') else: for in_tif_path in self._hh_hv_vh_vv_path_list: # 植被物候 # 读取原始SAR影像 proj, geotrans, data = ImageHandler.read_img(in_tif_path) name = os.path.basename(in_tif_path) data_new = np.zeros(data.shape) # 获取极化类型 if '_HH' in os.path.basename(in_tif_path): data_new[0, :, :] = data[0, :, :] * calibration_value[0] data_new[1, :, :] = data[1, :, :] * calibration_value[0] ImageHandler.write_img(os.path.join(out_dir, name), proj, geotrans, data_new) flag_list[0] = 1 elif '_HV' in os.path.basename(in_tif_path): data_new[0, :, :] = data[0, :, :] * calibration_value[1] data_new[1, :, :] = data[1, :, :] * calibration_value[1] ImageHandler.write_img(os.path.join(out_dir, name), proj, geotrans, data_new) flag_list[1] = 1 elif '_VH' in os.path.basename(in_tif_path): data_new[0, :, :] = data[0, :, :] * calibration_value[2] data_new[1, :, :] = data[1, :, :] * calibration_value[2] ImageHandler.write_img(os.path.join(out_dir, name), proj, geotrans, data_new) flag_list[2] = 1 elif '_VV' in os.path.basename(in_tif_path): data_new[0, :, :] = data[0, :, :] * calibration_value[3] data_new[1, :, :] = data[1, :, :] * calibration_value[3] ImageHandler.write_img(os.path.join(out_dir, name), proj, geotrans, data_new) flag_list[3] = 1 if not flag_list == [1, 1, 1, 1]: raise Exception('calibration error! ') self._hh_hv_vh_vv_path_list = [] return out_dir @staticmethod def sar_backscattering_sigma(in_sar_tif, meta_file_path, out_sar_tif, replece_VV=False, is_DB=True): # 读取原始SAR影像 proj, geotrans, in_data = ImageHandler.read_img(in_sar_tif) # 计算强度信息 I = np.array(in_data[0], dtype="float32") Q = np.array(in_data[1], dtype="float32") where_9999_0 = np.where(I == -9999) where_9999_1 = np.where(Q == -9999) I[where_9999_0] = 1.0 Q[where_9999_1] = 1.0 I2 = np.square(I) Q2 = np.square(Q) intensity_arr = I2 + Q2 # 获取极化类型 if 'HH' in os.path.basename(in_sar_tif): polarization = 'HH' elif 'HV' in os.path.basename(in_sar_tif): polarization = 'HV' elif 'VH' in os.path.basename(in_sar_tif): polarization = 'VH' elif 'VV' in os.path.basename(in_sar_tif): polarization = 'VV' if replece_VV: polarization = 'HV' # 土壤水分算法中可能会用HV替换VV elif 'DH' in os.path.basename(in_sar_tif): polarization = 'HH' else: raise Exception('there are not HH、HV、VH、VV in path:', in_sar_tif) # 获取参数 QualifyValue = MetaDataHandler.get_QualifyValue(meta_file_path, polarization) Kdb = MetaDataHandler.get_Kdb(meta_file_path, polarization) # 计算后向散射系数 # 对数形式 coef_arr = 10 * (np.log10(intensity_arr * ((QualifyValue / 32767) ** 2))) - Kdb coef_arr[np.isnan(coef_arr)] = -9999 coef_arr[np.isinf(coef_arr)] = -9999 coef_arr[where_9999_0] = -9999 coef_arr[where_9999_1] = -9999 ## 输出的SAR后向散射系数产品 # ImageHandler.write_img(out_sar_tif, proj, geotrans, coef_arr, 0) tif_array = np.power(10.0, coef_arr / 10.0) # dB --> 线性值 后向散射系数 tif_array[np.isnan(tif_array)] = 0 tif_array[np.isinf(tif_array)] = 0 tif_array[where_9999_0] = 0 tif_array[where_9999_1] = 0 ImageHandler.write_img(out_sar_tif, proj, geotrans, tif_array, 0) return True if __name__ == '__main__': #实例1: # atp = AHVToPolsarpro() # ahv_path = 'D:\\DATA\\GAOFEN3\\2-GF3_MYN_WAV_020086_E107.2_N27.6_20200603_L1A_AHV_L10004843087\\' # # ahv_path = 'D:\\DATA\\GAOFEN3\\2598957_Paris\\' # out_file_path = 'D:\\bintest0923\\' # atp.ahv_to_polsarpro_t3(out_file_path, ahv_path) # # 极化分解得到T3矩阵 # atp = AHVToPolsarpro() # ahv_path = r"I:\MicroWorkspace\product\C-SAR\SoilSalinity\GF3B_MYC_QPSI_003581_E120.6_N31.3_20220729_L1A_AHV_L10000073024_RPC" # t3_path = ahv_path + 'psp_t3\\' # atp.ahv_to_polsarpro_t3(t3_path, ahv_path) #实例2: # dir = r'D:\MicroWorkspace\product\C-SAR\VegetationPhenology\Temporary\preprocessed/' # path_list = [dir +'GF3_SAY_QPSI_011444_E118.9_N31.4_20181012_L1A_AHV_L10003515422_RPC_HH_preprocessed.tif', # dir +'GF3_SAY_QPSI_011444_E118.9_N31.4_20181012_L1A_AHV_L10003515422_RPC_HV_preprocessed.tif', # dir +'GF3_SAY_QPSI_011444_E118.9_N31.4_20181012_L1A_AHV_L10003515422_RPC_VH_preprocessed.tif', # dir +'GF3_SAY_QPSI_011444_E118.9_N31.4_20181012_L1A_AHV_L10003515422_RPC_VV_preprocessed.tif'] # # # atp = AHVToPolsarpro(path_list) # atp.ahv_to_polsarpro_t3(r'D:\MicroWorkspace\product\C-SAR\VegetationPhenology\Temporary\processing\GF3_SAY_QPSI_011444_E118.9_N31.4_20181012_L1A_AHV_L10003515422_RPC/t3') print("done")