microproduct/atmosphericDelay/AtmosphericDelayDT.py

198 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
@Project microproduct
@File AtmosphericDelayDT.py
@Function :主函数
@Author LMM
@Date 2021/10/19 14:39
@Version 1.0.0
"""
from osgeo import gdal, gdalconst, osr
from tool.algorithm.image.ImageHandle import ImageHandler
class DataTransform:
"""数据转换"""
def __int__(self):
pass
def Cover(self, filepath):
"""读取数据"""
array = ImageHandler().get_data(filepath)
return array
def get_arr(self, filepath):
"""读取数据"""
testDst = gdal.Open(filepath, gdalconst.GA_Update)
width = testDst.RasterXSize
height = testDst.RasterYSize
band_count = testDst.RasterCount
band1 = testDst.GetRasterBand(1)
img_arr = band1.ReadAsArray(0, 0, width, height)
return img_arr
def write_tif(self, out_path, lat_path, lon_path, in_path):
lat_arr = self.Cover(lat_path) # 读取lat_rdr
lon_arr = self.Cover(lon_path) # 读取lon_rdr
in_arr = self.Cover(in_path) # 读取hgt_rdr
band = ImageHandler().get_bands(in_path)
if band == 2:
save_array = in_arr[1, :, :]
else:
save_array = in_arr
"""读取数据"""
# 转换参数
# lat_res = abs(lat_arr[1, 0]-lat_arr[0, 0])
# lon_res = abs(lon_arr[0, 1] - lon_arr[0, 0])
import numpy as np
lon_leftup = np.min(lon_arr[np.nonzero(lon_arr)])
lon_rightup = np.max(lon_arr[np.nonzero(lon_arr)])
lat_leftup = np.max(lat_arr[np.nonzero(lat_arr)])
lat_leftdown = np.min(lat_arr[np.nonzero(lat_arr)])
lon_res1 = (lon_rightup - lon_leftup) / lon_arr.shape[1]
lat_res1 = (lat_leftup - lat_leftdown) / lat_arr.shape[0]
# 定义投影系
dst_srs = osr.SpatialReference()
dst_srs.ImportFromEPSG(4326) # 定义一个WGS84坐标系
im_proj = dst_srs.ExportToWkt()
im_geo = [lon_leftup, lon_res1, 0, lat_leftup, 0, -lat_res1]
# [(左上角像元的位置), (像元的宽度), (如果影像是指北的0),(左上角像元的位置),如果影像是指北的0, (像元的高度)]
ImageHandler().write_img(out_path, im_proj, im_geo, save_array, no_data='null')
def unw_write_tif(self, out_path, lat_path, lon_path, in_path):
lat_arr = self.Cover(lat_path) # 读取lat_rdr
lon_arr = self.Cover(lon_path) # 读取lon_rdr
in_arr = self.Cover(in_path) # 读取hgt_rdr
band = ImageHandler().get_bands(in_path)
save_array = in_arr
"""读取数据"""
# 转换参数
lat_res = abs(lat_arr[1, 0]-lat_arr[0, 0])
lon_res = abs(lon_arr[0, 1] - lon_arr[0, 0])
# 定义投影系
dst_srs = osr.SpatialReference()
dst_srs.ImportFromEPSG(4326) # 定义一个WGS84坐标系
im_proj = dst_srs.ExportToWkt()
im_geo = [lon_arr[0][0], lon_res, 0, lat_arr[0][0], 0, -lat_res]
# [(左上角像元的位置), (像元的宽度), (如果影像是指北的0),(左上角像元的位置),如果影像是指北的0, (像元的高度)]
ImageHandler().write_img(out_path, im_proj, im_geo, save_array, no_data='null')
def write_tif1(self, out_path, lat_path, lon_path, in_path):
"""输入的in_path数据翻转输入的lat_path数据翻转"""
lat_arr_src = self.Cover(lat_path) # 读取lat_rdr
lon_arr_src = self.Cover(lon_path) # 读取lon_rdr
in_arr_src = self.Cover(in_path) # 读取hgt_rdr
lat_arr2 = lat_arr_src[::-1]
lon_arr2 = lon_arr_src
band = ImageHandler().get_bands(in_path)
if band == 2:
save_array = in_arr_src[1, :, :]
else:
save_array = in_arr_src
"""读取数据"""
# geo = [0, 0, 0, 0, 0, 0]
lon_width = ImageHandler().get_img_width(lon_path) # 栅格矩阵的行数
lat_width = ImageHandler().get_img_height(lat_path) # 栅格矩阵的行数
# 转换参数
lat_res = abs(lat_arr2[-1, 0] - lat_arr2[0, 0])/lat_width
lon_res = abs(lon_arr2[0, -1] - lon_arr2[0, 0])/lon_width
# 定义投影系
dst_srs = osr.SpatialReference()
dst_srs.ImportFromEPSG(4326) # 定义一个WGS84坐标系
im_proj = dst_srs.ExportToWkt()
im_geo = [lon_arr2[0][0], lon_res, 0, lat_arr2[0][0], 0, -lat_res]
# [(左上角像元的位置), (像元的宽度), (如果影像是指北的0),(左上角像元的位置),如果影像是指北的0, (像元的高度)]
ImageHandler().write_img(out_path, im_proj, im_geo, save_array, no_data='null')
def tif_vrt(self, tif_path, vrt_path):
"""
干涉图.tif重采样为.int的大小
"""
delayDst = gdal.Open(tif_path)
if delayDst is None:
return False
delayWidth = delayDst.RasterXSize
delayHeight = delayDst.RasterYSize
delayBands = delayDst.RasterCount
delayArray = delayDst.ReadAsArray(0, 0, delayWidth, delayHeight)
testDst = gdal.Open(vrt_path, gdalconst.GA_Update)
if testDst is None:
return False
width = testDst.RasterXSize
height = testDst.RasterYSize
Bands = testDst.RasterCount
band1 = testDst.GetRasterBand(1)
# img_arr = band1.ReadAsArray(0, 0, width, height) # 1
if delayWidth!=width or delayHeight!=height or delayBands!=Bands:
return False
# print("修改前:\t", img_arr[0, 0]) # 1
# print("修改后:\t", delayArray[0, 0]) # 1
band1.WriteArray(delayArray)
band1 = None
testDst = None
img_arr = None
# testDst2 = gdal.Open(vrt_path, gdalconst.GA_Update) # 1
# width2 = testDst2.RasterXSize # 1
# height2 = testDst2.RasterYSize # 1
# band12 = testDst2.GetRasterBand(1) # 1
# img_arr2 = band12.ReadAsArray(0, 0, width2, height2) # 1
# print("重新读取后:\t", img_arr2[0, 0]) # 1
# print("over") # 1
return True
def read_tif(self, Atmosp_path):
testDst=gdal.Open(Atmosp_path,gdalconst.GA_Update)
width=testDst.RasterXSize
height=testDst.RasterYSize
band_count=testDst.RasterCount
band1=testDst.GetRasterBand(1)
img_arr=band1.ReadAsArray(0,0,width,height)
return img_arr
def tran(self, vrt_path,Atmo_arr1):
testDst=gdal.Open(vrt_path,gdalconst.GA_Update)
width=testDst.RasterXSize
height=testDst.RasterYSize
band_count=testDst.RasterCount
band1=testDst.GetRasterBand(1)
img_arr=band1.ReadAsArray(0,0,width,height)
# print("修改前:\t",img_arr.dtype)
# print("修改前:\t",img_arr[500,500])
# print("修改后:\t",Atmo_arr1[500,500])
# print("修改后:\t",Atmo_arr1.dtype)
band1.WriteArray(Atmo_arr1)
# band1.WriteArray(img_arr)
band1.FlushCache()
del testDst
band1=None
testDst=None
img_arr=None
def tran_af(self, vrt_path):
testDst2=gdal.Open(vrt_path, gdalconst.GA_Update)
width2=testDst2.RasterXSize
height2=testDst2.RasterYSize
band_count2=testDst2.RasterCount
band12=testDst2.GetRasterBand(1)
img_arr2=band12.ReadAsArray(0,0,width2,height2)
# print("重新读取后:\t",img_arr2[500,500])
# print("重新读取后:\t", img_arr2.dtype)
# print("over")