"""
@Project :microproduct
@File :AHVToPolsarpro.PY
@Function :Convert the four polarization channels into S2 matrix files
@Author :LMM
@Date :2021/10/19 14:39
@Version :1.0.0
"""
import os
import numpy as np
import glob
import struct
from tool.algorithm.image.ImageHandle import ImageHandler


class AHVToPolsarproS2:
    """
    Convert full-polarization images into bin-format S2 matrix files
    that can be processed by PolSARpro.
    """

    def __init__(self):
        pass

    @staticmethod
    def __ahv_to_s2(ahv_dir):
        """
        Build the S2 scattering matrix from a folder of full-polarization images.

        Files are matched by the polarization tag ('HH', 'HV', 'VH', 'VV')
        contained in their base name; the tags are tested in that order and
        the first tag found wins. If several files carry the same tag, the
        last one listed by glob is kept.

        :param ahv_dir: folder holding the full-polarization tif/tiff images
        :return: tuple (s11, s12, s21, s22) of complex arrays (HH, HV, VH, VV)
        :raises Exception: if any of the four polarizations is missing
        """
        in_tif_paths = glob.glob(os.path.join(ahv_dir, '*.tif'))
        if not in_tif_paths:
            in_tif_paths = glob.glob(os.path.join(ahv_dir, '*.tiff'))

        # Insertion order doubles as the tag-matching priority.
        channels = {'HH': None, 'HV': None, 'VH': None, 'VV': None}
        for in_tif_path in in_tif_paths:
            file_name = os.path.basename(in_tif_path)
            pol = next((tag for tag in channels if tag in file_name), None)
            if pol is None:
                # Not one of the four channels: skip without reading the image.
                continue
            # Read the original SAR image.
            proj, geotrans, data = ImageHandler.read_img(in_tif_path)
            # First band is used as the real part, second band as the imaginary part.
            channels[pol] = data[0, :, :] + 1j * data[1, :, :]

        missing = [tag for tag, value in channels.items() if value is None]
        if missing:
            raise Exception(
                'tif of HH or HV or VH or VV is not in path :%s (missing: %s)'
                % (ahv_dir, ', '.join(missing)))
        return channels['HH'], channels['HV'], channels['VH'], channels['VV']

    def __s2_to_bin(self, out_dir, s11, s12, s21, s22):
        """
        Write the S2 matrix as bin files plus headers and a config file.

        For each element a '<name>.bin' and '<name>.bin.hdr' file is written
        into out_dir, followed by a single 'config.txt'.

        :param out_dir: output folder (created if it does not exist)
        :param s11: S11 (HH) complex matrix; its shape defines rows/cols
        :param s12: S12 (HV) complex matrix
        :param s21: S21 (VH) complex matrix
        :param s22: S22 (VV) complex matrix
        """
        os.makedirs(out_dir, exist_ok=True)

        rows = s11.shape[0]
        cols = s11.shape[1]
        bins_dict = {'s11.bin': s11,
                     's12.bin': s12,
                     's21.bin': s21,
                     's22.bin': s22}
        for name, data in bins_dict.items():
            bin_path = os.path.join(out_dir, name)
            self.__write_slc_img_bin(data, bin_path, name)
            out_hdr_path = bin_path + '.hdr'
            self.__write_bin_hdr(out_hdr_path, bin_path, rows, cols)
        self.__write_config_file(out_dir, rows, cols)

    @staticmethod
    def __write_slc_img_bin(im, file_path, name):
        """
        Write one image to a bin file as interleaved float32 values.

        Each row is stored as real0, imag0, real1, imag1, ... in the
        machine's native byte order.

        :param im: 2-D image matrix (single channel only)
        :param file_path: full path of the bin file to write
        :param name: bin file name (unused; kept for interface compatibility)
        """
        rows = im.shape[0]
        cols = im.shape[1]
        # Build the interleaved buffer directly in float32 and dump it in one
        # call instead of struct-packing element by element (which relied on
        # the deprecated implicit conversion of 1-element arrays to scalars).
        interleaved = np.empty((rows, 2 * cols), dtype=np.float32)
        interleaved[:, ::2] = im.real   # real part
        interleaved[:, 1::2] = im.imag  # imaginary part
        with open(file_path, 'wb') as f:
            interleaved.tofile(f)

    @staticmethod
    def read_slc_bin_to_img(bin_path):
        """
        Read a bin file written in the interleaved float32 layout.

        The image size is taken from 'config.txt' located in the same folder
        as the bin file (second line: rows, fifth line: cols).

        :param bin_path: path of the bin file
        :return: float32 array of shape (2, rows, cols); index 0 is the real
                 part and index 1 the imaginary part
        :raises Exception: if the bin file is smaller than rows*cols*2*4 bytes
        """
        bin_dir = os.path.dirname(bin_path)
        config_path = os.path.join(bin_dir, 'config.txt')
        with open(config_path, 'r') as config_file:
            config = config_file.read().split('\n')
        rows = int(config[1])
        cols = int(config[4])

        value_count = rows * cols * 2      # real + imag per pixel
        size = os.path.getsize(bin_path)   # file size in bytes
        if size < value_count * 4:
            raise Exception(
                'bin size less than rows*cols*2*4! size: %d byte, rows: %d, cols: %d'
                % (size, rows, cols))

        # Only the first rows*cols*2 values are read; trailing bytes are ignored.
        bin_data = np.fromfile(bin_path, dtype=np.float32, count=value_count)
        bin_data = bin_data.reshape(rows, 2 * cols)

        img_array = np.empty((2, rows, cols), dtype=np.float32)
        img_array[0] = bin_data[:, ::2]   # real
        img_array[1] = bin_data[:, 1::2]  # imag
        return img_array

    @staticmethod
    def __write_bin_hdr(out_hdr_path, bin_path, rows, cols):
        """
        Write the ENVI header file of a bin image.

        :param out_hdr_path: path of the header file to write
        :param bin_path: path of the bin file (unused; kept for interface
                         compatibility)
        :param rows: number of image rows
        :param cols: number of image columns
        """
        header_lines = [
            'ENVI',
            'description = {',
            'ENVI File, Created [] }',
            'samples = ' + str(cols),  # columns
            'lines = ' + str(rows),    # rows
            'bands = 1 ',              # number of bands
            'header offset = 0',
            'file type = ENVI Standard',
            'data type = 6',           # data format, 6 stands for complex
            'interleave = bsq',        # storage layout
            'sensor type = Unknown',
            'byte order = 0',
            'wavelength units = Unknown',
            'complex function = Power',
        ]
        with open(out_hdr_path, 'w') as doc:
            doc.write('\n'.join(header_lines) + '\n')

    @staticmethod
    def __write_config_file(out_config_dir, rows, cols):
        """
        Write the PolSARpro 'config.txt' file.

        :param out_config_dir: folder in which 'config.txt' is written
        :param rows: number of image rows
        :param cols: number of image columns
        """
        config_lines = [
            'Nrow',
            str(rows),
            '---------',
            'Ncol',
            str(cols),
            '---------',
            'PolarCase',
            'bistatic',  # alternative considered by the author: 'monostatic'
            '---------',
            'PolarType',
            'full',
        ]
        out_config_path = os.path.join(out_config_dir, 'config.txt')
        with open(out_config_path, 'w') as doc:
            doc.write('\n'.join(config_lines) + '\n')

    def api_ahv_to_polsarpro_s2(self, out_file_dir, in_ahv_dir):
        """
        Convert the images in in_ahv_dir to S2 bin files in out_file_dir.

        :param out_file_dir: output folder for the bin, hdr and config files
        :param in_ahv_dir: folder holding the full-polarization tif/tiff images
        """
        s11, s12, s21, s22 = self.__ahv_to_s2(in_ahv_dir)
        self.__s2_to_bin(out_file_dir, s11, s12, s21, s22)


# if __name__ == '__main__':
#     # test()
#     atp = AHVToPolsarproS2()
#     ahv_path = r'D:\DATA\GAOFEN3\2-GF3_MYN_WAV_020086_E107.2_N27.6_20200603_L1A_AHV_L10004843087'
#     # ahv_path = 'D:\\DATA\\GAOFEN3\\2598957_Paris\\'
#     out_file_path = r'D:\DATA\GAOFEN3\2-GF3_MYN_WAV_020086_E107.2_N27.6_20200603_L1A_AHV_L10004843087\SLC_SHJ_2'
#     atp.api_ahv_to_polsarpro_s2(out_file_path, ahv_path)
#     bin_path = r'D:\DATA\GAOFEN3\2-GF3_MYN_WAV_020086_E107.2_N27.6_20200603_L1A_AHV_L10004843087\SLC_SHJ\s11.bin'
#     # data = atp.read_slc_bin_to_img(bin_path)
#     print("done")