2023-12-06 09:44:22 +00:00
#!/usr/bin/env python3
# David Bekaert
import os
import glob
import argparse
from uncompressFile import uncompressfile
import shutil
import xml . etree . ElementTree as etree
def createParser ( ) :
'''
Create command line parser .
'''
parser = argparse . ArgumentParser ( description = ' Prepare LT1AB SLC processing (unzip/untar files, organize in date folders, generate script to unpack into isce formats). For now, it cannot merge multiple scenes ' )
parser . add_argument ( ' -i ' , ' --input ' , dest = ' input ' , type = str , required = False ,
help = ' directory with the slc data ' )
parser . add_argument ( ' -rmfile ' , ' --rmfile ' , dest = ' rmfile ' , action = ' store_true ' , default = False ,
help = ' Optional: remove zip/tar/compressed files after unpacking into date structure (default is to keep in archive folder) ' )
parser . add_argument ( ' -o ' , ' --output ' , dest = ' output ' , type = str , required = False ,
help = ' output directory where data needs to be unpacked into isce format (for script generation). ' )
2025-02-05 01:37:37 +00:00
# parser.add_argument('--linux',dest="linux", action='store_true', default=True, help='run in linux')
2023-12-06 09:44:22 +00:00
parser . add_argument ( ' -t ' , ' --text_cmd ' , dest = ' text_cmd ' , type = str , required = False , default = ' source ~/.bash_profile; ' ,
help = ' text command to be added to the beginning of each line of the run files. Default: source ~/.bash_profile; ' )
return parser
def cmdLineParse ( iargs = None ) :
'''
Command line parser .
'''
parser = createParser ( )
return parser . parse_args ( args = iargs )
def get_Date ( LT1ABfolder ) :
# will search for different version of workreport to be compatible with ASf, WInSAR etc
LT1ABfile = glob . glob ( os . path . join ( LT1ABfolder , ' *.meta.xml ' ) )
# if nothing is found return a failure
if len ( LT1ABfile ) > 0 :
LT1ABfile = LT1ABfile [ 0 ]
# loading the date information from the product.xml file
tree = etree . parse ( LT1ABfile )
root = tree . getroot ( )
# for attributes in root.iter('{http://www.rsi.ca/rs2/prod/xml/schemas}sensor'):
# attribute_list = list(attributes)
# for attribute in attribute_list:
# if attribute.tag=='{http://www.rsi.ca/rs2/prod/xml/schemas}rawDataStartTime':
# date = attribute.text
# UTC = date[11:16]
# acquisitionDate = date[0:4]+date[5:7]+date[8:10]
image_time = root . find ( ' productInfo ' ) . find ( ' sceneInfo ' ) . find ( ' start ' ) . find ( ' timeUTC ' ) . text
if image_time != None :
acquisitionDate = image_time [ 0 : 4 ] + image_time [ 5 : 7 ] + image_time [ 8 : 10 ]
if len ( acquisitionDate ) == 8 :
successflag = True
return successflag , acquisitionDate
# if it reached here it could not find the acqusiitionDate
successflag = False
acquisitionDate = ' FAIL '
return successflag , acquisitionDate
def main ( iargs = None ) :
'''
The main driver .
'''
inps = cmdLineParse ( iargs )
# parsing required inputs
inputDir = os . path . abspath ( inps . input )
# parsing optional inputs
if inps . output :
outputDir = os . path . abspath ( inps . output )
else :
outputDir = None
rmfile = inps . rmfile
2025-02-05 01:37:37 +00:00
# inputDirs = r'/mnt/e/MicroWorkspace/HJ2-Deformation/download/'
2023-12-06 09:44:22 +00:00
# inputDir = os.path.abspath(inputDirs)
2025-02-05 01:37:37 +00:00
# outputDirs = r'/mnt/e/MicroWorkspace/HJ2-Deformation/SLC'
2023-12-06 09:44:22 +00:00
# outputDir = os.path.abspath(outputDirs)
# rmfile = False
# filename of the runfile
run_unPack = os . path . join ( inputDir , ' run_unPackLT1AB.txt ' )
# loop over the different folder, LT1AB zip/tar files and unzip them, make the names consistent
LT1AB_extensions = ( os . path . join ( inputDir , ' LT1*.zip ' ) , os . path . join ( inputDir , ' LT1*.tar ' ) , os . path . join ( inputDir , ' LT1*.gz ' ) )
for LT1AB_extension in LT1AB_extensions :
LT1AB_filesfolders = glob . glob ( LT1AB_extension )
for LT1AB_infilefolder in LT1AB_filesfolders :
## the path to the folder/zip
workdir = os . path . dirname ( LT1AB_infilefolder )
## get the output name folder without any extensions
temp = os . path . basename ( LT1AB_infilefolder )
# trim the extensions and keep only very first part
parts = temp . split ( " .tar.gz " )
parts = parts [ 0 ] . split ( ' - ' )
LT1AB_outfolder = parts [ 0 ]
# add the path back in
LT1AB_outfolder = os . path . join ( workdir , LT1AB_outfolder )
# loop over two cases (either file or folder):
### this is a file, try to unzip/untar it
if os . path . isfile ( LT1AB_infilefolder ) :
# unzip the file in the outfolder
successflag_unzip = uncompressfile ( LT1AB_infilefolder , LT1AB_outfolder )
# put failed files in a seperate directory
if not successflag_unzip :
os . makedirs ( os . path . join ( workdir , ' FAILED_FILES ' ) , exist_ok = True )
os . rename ( LT1AB_infilefolder , os . path . join ( workdir , ' FAILED_FILES ' , ' . ' ) )
else :
# check if file needs to be removed or put in archive folder
if rmfile :
os . remove ( LT1AB_infilefolder )
# print('Deleting: ' + LT1AB_infilefolder)
else :
os . makedirs ( os . path . join ( workdir , ' ARCHIVED_FILES ' ) , exist_ok = True )
# cmd = 'mv ' + LT1AB_infilefolder + ' ' + os.path.join(workdir,'ARCHIVED_FILES','.')
# os.system(cmd)
shutil . move ( LT1AB_infilefolder , os . path . join ( workdir , ' ARCHIVED_FILES ' , ' . ' ) )
# loop over the different LT1AB folders and make sure the folder names are consistent.
# this step is not needed unless the user has manually unzipped data before.
LT1AB_folders = glob . glob ( os . path . join ( inputDir , ' LT1* ' ) )
for LT1AB_folder in LT1AB_folders :
# in case the user has already unzipped some files, make sure they are unzipped similar like the uncompressfile code
temp = os . path . basename ( LT1AB_folder )
parts = temp . split ( " .tar.gz " )
parts = parts [ 0 ] . split ( ' - ' )
LT1AB_outfolder_temp = parts [ 0 ]
LT1AB_outfolder_temp = os . path . join ( os . path . dirname ( LT1AB_folder ) , LT1AB_outfolder_temp )
# check if the folder (LT1AB_folder) has a different filename as generated from the uncompressFile code (LT1AB_outfolder_temp)
if not ( LT1AB_outfolder_temp == LT1AB_folder ) :
# it is different, check if the LT1AB_outfolder_temp already exists, if yes, delete the current folder
if os . path . isdir ( LT1AB_outfolder_temp ) :
# print('Remove ' + LT1AB_folder + ' as ' + LT1AB_outfolder_temp + ' exists...')
# check if this folder already exist, if so overwrite it
shutil . rmtree ( LT1AB_folder )
# loop over the different LT1AB folders and organize in date folders
LT1AB_folders = glob . glob ( os . path . join ( inputDir , ' LT1* ' ) )
for LT1AB_folder in LT1AB_folders :
# get the date
successflag , imgDate = get_Date ( LT1AB_folder )
workdir = os . path . dirname ( LT1AB_folder )
if successflag :
# move the file into the date folder
SLC_dir = os . path . join ( workdir , imgDate , ' ' )
if os . path . isdir ( SLC_dir ) :
shutil . rmtree ( SLC_dir )
# cmd = 'mv ' + LT1AB_folder + ' ' + SLC_dir
# os.system(cmd)
shutil . move ( LT1AB_folder , SLC_dir )
print ( ' Succes: ' + imgDate )
else :
print ( ' Failed: ' + LT1AB_folder )
2025-02-05 01:37:37 +00:00
# now generate the unpacking script for all the date dirs
dateDirs = glob . glob ( os . path . join ( inputDir , ' 2* ' ) )
if outputDir is not None :
f = open ( run_unPack , ' w ' )
for dateDir in dateDirs :
LT1ABFiles = glob . glob ( os . path . join ( dateDir , ' LT1*.tiff ' ) )
if len ( LT1ABFiles ) < = 0 :
LT1ABFiles = glob . glob ( os . path . join ( dateDir , ' LT1*.tif ' ) )
if len ( LT1ABFiles ) > 0 :
acquisitionDate = os . path . basename ( dateDir )
slcDir = os . path . join ( outputDir , acquisitionDate )
os . makedirs ( slcDir , exist_ok = True )
cmd = ' unpackFrame_LT1AB.py -i ' + os . path . abspath ( dateDir ) + ' -o ' + slcDir
# result = os.system(cmd)
# print(cmd, result)
f . write ( cmd + ' \n ' )
f . close ( )
2023-12-06 09:44:22 +00:00
if __name__ == ' __main__ ' :
main ( )