# 裁剪 解缠后干涉图 # 注意 这个裁剪步骤并未纳入到标准的 干涉处理流程中,属于单独执行步骤 # 不考虑处理异常 # author :chenzenghui # time : 2023.06.04 # 影像裁剪干涉图,这里使用 四至坐标进行裁剪 # 影像裁剪流程 # 1. 获取范围 # 2. 获取裁剪影像范围 # 3. 逐像素裁剪 # 4. 数据格式为isce专用格式 ############################## # 需要裁剪数据类型 # a. geom_reference # 1. hgt.hdr # 2. incLocal.hdr # 3. lat.hdr # 4. lon.hdr # 5. los.hdr # 6. shadowMask.hdr # b. interferograms # 1. filt_fine.unw.vrt # c. SLC ############################## import os import sys import numpy as np from osgeo import gdal import argparse import shutil import isce import isceobj from isce import logging from iscesys.Component.Component import Component from isceobj.Util.ImageUtil import ImageLib as IML from isceobj.Image import createUnwImage from osgeo import gdal,osr,ogr logger = logging.getLogger('isce.insar') def gdal2isce_xml(fname): """ Generate ISCE xml file from gdal supported file Example: import isce from applications.gdal2isce_xml import gdal2isce_xml xml_file = gdal2isce_xml(fname+'.vrt') """ # open the GDAL file and get typical data informationi GDAL2ISCE_DATATYPE = { 1 : 'BYTE', 2 : 'uint16', 3 : 'SHORT', 4 : 'uint32', 5 : 'INT', 6 : 'FLOAT', 7 : 'DOUBLE', 10: 'CFLOAT', 11: 'complex128', } # GDAL2NUMPY_DATATYPE = { # 1 : np.uint8, # 2 : np.uint16, # 3 : np.int16, # 4 : np.uint32, # 5 : np.int32, # 6 : np.float32, # 7 : np.float64, # 10: np.complex64, # 11: np.complex128, # } # check if the input file is a vrt fbase, fext = os.path.splitext(fname) print(fext) if fext == ".vrt": outname = fbase else: outname = fname print(outname) # open the GDAL file and get typical ds information ds = gdal.Open(fname, gdal.GA_ReadOnly) width = ds.RasterXSize length = ds.RasterYSize bands = ds.RasterCount print("width: " + "\t" + str(width)) print("length: " + "\t" + str(length)) print("num of bands:" + "\t" + str(bands)) # getting the datatype information raster = ds.GetRasterBand(1) dataTypeGdal = raster.DataType # user look-up dictionary from gdal to isce format dataType= GDAL2ISCE_DATATYPE[dataTypeGdal] print("dataType: " + "\t" + str(dataType)) # transformation contains gridcorners (lines/pixels or lonlat and the spacing 1/-1 or deltalon/deltalat) transform = ds.GetGeoTransform() # if a complex data type, then create complex image # if a real data type, then create a regular image img = isceobj.createImage() img.setFilename(os.path.abspath(outname)) img.setWidth(width) img.setLength(length) img.setAccessMode('READ') img.bands = bands img.dataType = dataType # interleave md = ds.GetMetadata('IMAGE_STRUCTURE') sch = md.get('INTERLEAVE', None) if sch == 'LINE': img.scheme = 'BIL' elif sch == 'PIXEL': img.scheme = 'BIP' elif sch == 'BAND': img.scheme = 'BSQ' else: print('Unrecognized interleaving scheme, {}'.format(sch)) if bands < 2: print('Assuming default, BIP') img.scheme = 'BIP' else: print('Assuming default, BSQ') img.scheme = 'BSQ' img.firstLongitude = transform[0] img.firstLatitude = transform[3] img.deltaLatitude = transform[5] img.deltaLongitude = transform[1] xml_file = outname + ".xml" img.dump(xml_file) return xml_file def get_geom_mask(bbox,lon_path,lat_path): # bbox=[NSWE] # 使用gdal 读取 # lon lon_ds=gdal.Open(lon_path,gdal.GA_ReadOnly) lat_ds=gdal.Open(lat_path,gdal.GA_ReadOnly) # lon_height=lon_ds.RasterYSize lon_width=lon_ds.RasterYSize lon_count=lon_ds.RasterCount lat_height=lat_ds.RasterYSize lat_width=lat_ds.RasterYSize lat_count=lat_ds.RasterCount if lon_height==lat_height and lon_width==lat_width and lon_count==lat_count: print("lon and lat check pass!!!") else: logger.error("lon and lat check don't pass!!") sys.exit(1) lon_data=lon_ds.ReadAsArray() lat_data=lat_ds.ReadAsArray() S,N,W,E=bbox mask=None # 判断是否跨越+-180 if W-E>180: # 跨 mask=(lat_dataS) & (lon_dataE) else: mask=(lat_dataS) & (lon_data>W) & (lon_data=0: shutil.copyfile( os.path.join(src_unw_path,filename), os.path.join(to_unw_path,filename) ) filenames.remove(filename) else: continue # 处理其他文件 for fobjname in ["fine.int","filt_fine.cor","filt_fine.int","filt_fine.unw.conncomp"]: # band num = 1 repairGeom_refrence(os.path.join(src_unw_path,fobjname), os.path.join(to_unw_path,fobjname),clipmask,ext_vrt=".vrt",ext_xml=".aux.xml") for fobjname in ["fine.cor","filt_fine.unw"]: # band num=2 repairGeom_refrence(os.path.join(src_unw_path,fobjname), os.path.join(to_unw_path,fobjname),clipmask,ext_vrt=".vrt",ext_xml=".aux.xml") pass def ifgProcess(src_ifg_path,to_ifg_path,clipmask): for unw_foldername in os.listdir(src_ifg_path): src_unw_path=os.path.join(src_ifg_path,unw_foldername) if os.path.isdir(src_unw_path): to_unw_path=os.path.join(to_ifg_path,unw_foldername) # 清理文件夹 if os.path.exists(to_unw_path): shutil.rmtree(to_unw_path) else: pass os.makedirs(to_unw_path) # 判断文件夹是否可用 ifgfileProcess(src_unw_path,to_unw_path,clipmask) else: continue pass def slcProcess(src_slc_path,to_slc_path): if os.path.exists(to_slc_path): shutil.rmtree(to_slc_path) shutil.copytree(src_slc_path,to_slc_path) def CutProcess(inps): source_path=inps.source_path outpath=inps.outpath NoData=inps.Nodatavalue bbox=inps.bbox # 路径构建 src_geom_path=os.path.join(source_path,"geom_reference") src_ifg_path=os.path.join(source_path,"interferograms") src_slc_path=os.path.join(source_path,"SLC") to_geom_path=os.path.join(outpath,"geom_reference") to_ifg_path=os.path.join(outpath,"interferograms") to_slc_path=os.path.join(outpath,"SLC") for folder_path in [to_geom_path,to_ifg_path,to_slc_path]: if os.path.exists(folder_path): shutil.rmtree(folder_path) os.makedirs(folder_path) # geom_reference clipmask=geom_referenceProcess(src_geom_path,to_geom_path,bbox) # ifg ifgProcess(src_ifg_path,to_ifg_path,clipmask) # SLC 直接复制 slcProcess(src_slc_path,to_slc_path) def main(): #if not argument provided force the --help flag if(len(sys.argv) == 1): sys.argv.append('-h') # Use the epilog to add usage examples epilog = '将格式为tif 的DEM 转换为ISCE 支持的DEM格式:\n\n' epilog += 'Usage examples:\n\n' epilog += 'cutUnWrapper.py -i merge_folder_path -o new_merge_folder_path -b "37.8 38 109.2 109.3" \n\n' #set the formatter_class=argparse.RawDescriptionHelpFormatter otherwise it splits the epilog lines with its own default format parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,epilog=epilog) parser.add_argument('-i', '--source', type = str, default ="/mnt/e/isce2/cygwin_sice2/merged", dest = 'source_path', help = '输入 merged 文件夹地址') parser.add_argument('-o', '--outpath', type = str, default = '/mnt/e/isce2/cygwin_sice2/new_merged', dest = 'outpath', help = '输出文件 merged 文件夹地址') parser.add_argument('-b', '--bbox', type = str, default = "37.8 38 109.2 109.3", dest = 'bbox', help = '裁剪bbox') parser.add_argument('-Nodata', '--Nodata', type = float, default = 0.0, dest = 'Nodatavalue', help = '解缠无效值填充') args = parser.parse_args() args.bbox=[float(i) for i in args.bbox.split()] CutProcess(args) # processDEM2ISCE("DEM2ISCE",args.source_path,args.outpath,args.fillvalue,args.Nodatavalue) return -1 if __name__=="__main__": # 测试 sys.exit(main())