import json
from xml.etree.ElementTree import ElementTree, Element
import shutil
from tool.algorithm.image.ImageHandle import ImageHandler
from tool.algorithm.algtools.PreProcess import PreProcess as pp
from osgeo import gdal
import numpy as np
import datetime
import os
import glob
import xmltodict
# os.environ['PROJ_LIB'] = r"E:\soft\Anaconda\envs\micro\Lib\site-packages\osgeo\data\proj"


class CreateMetaDict:
    def __init__(self, image_path, origin_xml, pack_path, out_path1, out_path2):
        self.ImageHandler = ImageHandler()
        self.image_path = image_path
        self.origin_xml = origin_xml
        self.pack_path = pack_path
        self.file_size = self.get_file_size()
        self.out_path1 = out_path1
        self.out_path2 = out_path2
        self.timeDict = self.get_productTime()

    def calu_nature(self):
        """
        Collect the values needed for the productinfo node into a dictionary.
        image_path: path of the image
        image_pair: polarization pairs in the input archive, e.g. hh,hv,vh,vv = [1,1,1,1]
        out_path1: output path of the geographic-to-projected conversion
        out_path2: output path of the projected-to-geographic conversion
        """

        para_dict = {}

        image_path = self.image_path
        proj = self.ImageHandler.get_projection(self.image_path)  # if the image is in a projected CRS, convert it to a geographic CRS first
        keyword = proj.split("[", 2)[0]
        if keyword == "GEOGCS":
            pass  # already a geographic CRS, nothing to do
        elif keyword == "PROJCS":
            pp.trans_projcs2geogcs(self.out_path2, self.image_path)
            image_path = self.out_path2  # geographic-CRS copy, used below for the corner coordinates
        elif not keyword.strip():
            raise Exception('image projection is missing!')

        pp.trans_geogcs2projcs(self.out_path1, self.image_path)  # project coordinates: geographic to projected (planar)
        imageinfo_widthspace = self.ImageHandler.get_geotransform(self.out_path1)[1]  # resolution after projection
        # imageinfo_heightspace = -self.ImageHandler.get_geotransform(out_path1)[5]  # resolution after projection
        # para_dict.update({"imageinfo_widthspace": imageinfo_widthspace})
        # para_dict.update({"imageinfo_heightspace": imageinfo_heightspace})

        para_dict.update({"imageinfo_ProductResolution": imageinfo_widthspace})

        para_dict.update({"imageinfo_ProductFormat": "GEOTIFF"})
        para_dict.update({"imageinfo_CompressionMethod": "None"})
        para_dict.update({"imageinfo_ProductSize": str(self.file_size) + "MB"})  # TODO: total product size

        get_scope = self.ImageHandler.get_scope(image_path)  # corner coordinates come from the geographic-CRS image
        point_upleft, point_upright, point_downleft, point_downright = get_scope[0], get_scope[1], get_scope[2], get_scope[3]
        para_dict.update({"SpatialCoverageInformation_TopLeftLatitude": point_upleft[1]})
        para_dict.update({"SpatialCoverageInformation_TopLeftLongitude": point_upleft[0]})
        para_dict.update({"SpatialCoverageInformation_TopRightLatitude": point_upright[1]})
        para_dict.update({"SpatialCoverageInformation_TopRightLongitude": point_upright[0]})
        para_dict.update({"SpatialCoverageInformation_BottomLeftLatitude": point_downleft[1]})
        para_dict.update({"SpatialCoverageInformation_BottomLeftLongitude": point_downleft[0]})
        para_dict.update({"SpatialCoverageInformation_BottomRightLatitude": point_downright[1]})
        para_dict.update({"SpatialCoverageInformation_BottomRightLongitude": point_downright[0]})
        longitude_max = np.array([point_upleft[0], point_upright[0], point_downleft[0], point_downright[0]]).max()
        longitude_min = np.array([point_upleft[0], point_upright[0], point_downleft[0], point_downright[0]]).min()
        latitude_max = np.array([point_upleft[1], point_upright[1], point_downleft[1], point_downright[1]]).max()
        latitude_min = np.array([point_upleft[1], point_upright[1], point_downleft[1], point_downright[1]]).min()
        imageinfo_center_latitude = (latitude_max + latitude_min) / 2
        imageinfo_center_longitude = (longitude_max + longitude_min) / 2
        para_dict.update({"SpatialCoverageInformation_CenterLatitude": imageinfo_center_latitude})
        para_dict.update({"SpatialCoverageInformation_CenterLongitude": imageinfo_center_longitude})

        para_dict.update({"TimeCoverageInformation_StartTime": self.timeDict.get("startTime")})
        para_dict.update({"TimeCoverageInformation_CenterTime": self.timeDict.get("centerTime")})
        para_dict.update({"TimeCoverageInformation_EndTime": self.timeDict.get("endTime")})

        para_dict.update({"CoordinateReferenceSystemInformation_EarthEllipsoid": "WGS84"})
        para_dict.update({"CoordinateReferenceSystemInformation_MapProjection": "UTM"})
        para_dict.update({"CoordinateReferenceSystemInformation_ZoneNo": "None"})

        para_dict.update({"MetaInfo_Unit": "none"})  # unit
        para_dict.update({"MetaInfo_UnitDes": "无量纲"})  # unit description ("无量纲" = dimensionless)

        # Fill in the ProductProductionInfo section
        data_name = os.path.basename(self.image_path)
        strs = data_name.split("_")
        para_dict.update({"DataSources_DataSource_Satellite": strs[0]})
        para_dict.update({"DataSources_DataSource_Sensor": strs[0]})

        para_dict.update({"ObservationGeometry_SatelliteAzimuth": "None"})
        para_dict.update({"ObservationGeometry_SatelliteRange": "None"})

        para_dict.update({"ProductProductionInfo_BandSelection": "1"})
        para_dict.update({"ProductProductionInfo_DataSourceDescription": "None"})
        para_dict.update({"ProductProductionInfo_DataSourceProcessingDescription": "参考产品介绍PDF"})  # "refer to the product description PDF"
        productGentime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        para_dict.update({"ProductProductionInfo_ProductionDate": productGentime})
        para_dict.update({"ProductProductionInfo_AuxiliaryDataDescription": ""})

        # para_dict.update({"ProductPublishInfo_Processor": "德清院"})  # producer
        # para_dict.update({"ProductPublishInfo_DistributionUnit": "none"})  # distribution unit
        # para_dict.update({"ProductPublishInfo_ContactInformation": "none"})  # contact information
        return para_dict

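    # The keys of para_dict follow a "Section_Field" (or "DataSources_DataSource_Field")
    # naming convention; CreateProductXml.create_standard_xml below splits each key on "_"
    # and writes the value into the matching node of the template XML. For illustration
    # only (the values shown are hypothetical), the dictionary looks roughly like:
    #   {"imageinfo_ProductFormat": "GEOTIFF",
    #    "SpatialCoverageInformation_CenterLatitude": 31.3,
    #    "TimeCoverageInformation_StartTime": "2022-07-29 10:00:00",
    #    "DataSources_DataSource_Satellite": "GF3B"}
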
    def get_productTime(self):
        time_dict = {}
        tree = ElementTree()
        tree.parse(self.origin_xml)
        root = tree.getroot()

        platform = root.find("platform")
        if platform is None:
            centerTime = " "
        else:
            centerTime = platform.find("CenterTime").text.split(".")[0]

        productInfo = root.find("imageinfo")
        imagingTime = productInfo.find("imagingTime")
        if imagingTime is None:
            startTime = " "
            endTime = " "
        else:
            startTime = imagingTime.find("start").text.split(".")[0]
            endTime = imagingTime.find("end").text.split(".")[0]

        time_dict.update({"startTime": startTime})
        time_dict.update({"centerTime": centerTime})
        time_dict.update({"endTime": endTime})
        return time_dict

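    # get_productTime assumes the original metadata XML contains nodes like the sketch below
    # (element names taken from the find() calls above; the values shown are hypothetical).
    # Fractional seconds are dropped by the split(".")[0] calls.
    #   <root>
    #     <platform><CenterTime>2022-07-29 10:00:05.500000</CenterTime></platform>
    #     <imageinfo>
    #       <imagingTime>
    #         <start>2022-07-29 10:00:00.123456</start>
    #         <end>2022-07-29 10:00:10.123456</end>
    #       </imagingTime>
    #     </imageinfo>
    #   </root>
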
    def get_file_size(self):
        in_tif_paths = list(glob.glob(os.path.join(self.pack_path, '*.tif')))
        in_tif_paths1 = list(glob.glob(os.path.join(self.pack_path, '*.tiff')))
        in_tif_paths += in_tif_paths1
        size = 0
        for file in in_tif_paths:
            fsize = os.path.getsize(file)  # size in bytes
            size += fsize
        return round(size / float(1024 * 1024), 2)

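    # Rough worked example (hypothetical sizes): three GeoTIFFs of 52,428,800 bytes each
    # total 157,286,400 bytes, and 157286400 / (1024 * 1024) = 150.0, so get_file_size()
    # returns 150.0 and calu_nature reports it as "150.0MB".
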

class CreateProductXml:
    def __init__(self, par_dict, model_path, xml_path):
        self.par_dict = par_dict
        self.xml_path = xml_path
        shutil.copy(model_path, xml_path)

    def create_standard_xml(self):
        """Write the values from the dictionary into the copied XML template."""
        tree = ElementTree()
        tree.parse(self.xml_path)  # the copied product XML (image header file)
        root = tree.getroot()

        productinfo = root.find("ProductBasicInfo")
        for key, value in self.par_dict.items():
            if key.split("_")[0] == "imageinfo":
                productinfo.find(key.split("_")[1]).text = str(value)
            elif key.split("_")[0] == "SpatialCoverageInformation":
                imageinfo = productinfo.find("SpatialCoverageInformation")
                imageinfo.find(key.split("_")[1]).text = str(value)
            elif key.split("_")[0] == "TimeCoverageInformation":
                timeInfo = productinfo.find("TimeCoverageInformation")
                timeInfo.find(key.split("_")[1]).text = str(value)
            elif key.split("_")[0] == "CoordinateReferenceSystemInformation":
                geoInfo = productinfo.find("CoordinateReferenceSystemInformation")
                geoInfo.find(key.split("_")[1]).text = str(value)
            elif key.split("_")[0] == "MetaInfo":
                metaInfo = productinfo.find("MetaInfo")
                metaInfo.find(key.split("_")[1]).text = str(value)
        ProductProductionInfo = root.find("ProductProductionInfo")  # source data information
        for key, value in self.par_dict.items():
            if key.split("_")[0] == "DataSources":
                dataSources = ProductProductionInfo.find("DataSources")
                dataSource = dataSources.find("DataSource")
                dataSource.find(key.split("_")[2]).text = str(value)
            elif key.split("_")[0] == "ObservationGeometry":
                ObservationGeometry = ProductProductionInfo.find("ObservationGeometry")
                ObservationGeometry.find(key.split("_")[1]).text = str(value)
            elif key.split("_")[0] == "ProductProductionInfo":
                ProductProductionInfo.find(key.split("_")[1]).text = str(value)

        # ProductPublishInfo = root.find("ProductPublishInfo")  # publisher information
        # for key, value in self.par_dict.items():
        #     if key.split("_")[0] == "ProductPublishInfo":
        #         ProductPublishInfo.find(key.split("_")[1]).text = str(value)

        tree.write(self.xml_path, encoding="utf-8", xml_declaration=True)

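# Illustration of the key-to-node routing in create_standard_xml (node names are those the
# product.xml template is expected to contain; the value is hypothetical): an entry such as
#   {"SpatialCoverageInformation_CenterLatitude": 31.3}
# is written to Root/ProductBasicInfo/SpatialCoverageInformation/CenterLatitude, while a
# three-part key such as "DataSources_DataSource_Satellite" goes to
# Root/ProductProductionInfo/DataSources/DataSource/Satellite.
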

class OrthoAzimuth:

    @staticmethod
    def FindInfomationFromJson(HeaderFile_dom_json, node_path_list):
        """
        Walk the JSON (dict) document along the given node path and return the target node.
        """
        result_node = HeaderFile_dom_json
        for nodename in node_path_list:
            result_node = result_node[nodename]
        return result_node

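    # Minimal usage sketch on a hand-written dict (not real product data):
    #   doc = {"Root": {"ProductProductionInfo": {"ObservationGeometry": {"SatelliteAzimuth": "12.3"}}}}
    #   OrthoAzimuth.FindInfomationFromJson(
    #       doc, ["Root", "ProductProductionInfo", "ObservationGeometry", "SatelliteAzimuth"])  # -> "12.3"
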
    @staticmethod
    def get_Azimuth_incidence(Azimuth_path):
        Azimuth_incidence = 0
        if not os.path.exists(Azimuth_path):
            return Azimuth_incidence  # default 0 when the azimuth file is missing
        with open(Azimuth_path) as f:
            Azimuth_incidence = f.readline()  # first line of the file, returned as a string
        return Azimuth_incidence

    @staticmethod
    def read_Azimuth_incidence(xml_path):
        # tree = ElementTree()
        # tree.parse(xml_path)
        # root = tree.getroot()
        # Azimuth_incidence = float(root.find('ProductProductionInfo').find('ObservationGeometry').find('SatelliteAzimuth').text)
        # return Azimuth_incidence
        with open(xml_path, 'r', encoding='utf-8') as fp:
            HeaderFile_dom_str = fp.read()
            HeaderFile_dom = xmltodict.parse(HeaderFile_dom_str)  # convert the XML into a JSON-style (dict) document
            HeaderFile_dom_json = json.loads(json.dumps(HeaderFile_dom))
        node_path_list = ['Root', 'ProductProductionInfo', 'ObservationGeometry', 'SatelliteAzimuth']
        Azimuth_incidence = OrthoAzimuth.FindInfomationFromJson(HeaderFile_dom_json, node_path_list)
        return Azimuth_incidence

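# Usage sketch: xmltodict/FindInfomationFromJson return the node text as a string (or the
# literal "None" written by calu_nature), so callers typically convert it, e.g.
#   az = OrthoAzimuth.read_Azimuth_incidence(r'D:\Micro\WorkSpace\test\test.xml')
#   az = 0.0 if az in (None, "None") else float(az)
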

if __name__ == '__main__':

    image_path = r'D:\Micro\WorkSpace\test\GF3B_MYC_QPSI_003581_E120.6_N31.3_20220729_L1B_h_h_L10000073024_db_RD_geo.tif'
    origin_xml = r'D:\Micro\WorkSpace\Ortho\Temporary\package\GF3B_MYC_QPSI_003581_E120.6_N31.3_20220729_L1A_AHV_L10000073024.meta.xml'
    tem_folder = r'D:\Micro\WorkSpace\test'
    pack_path = r'D:\Micro\WorkSpace\Ortho\Temporary\package'
    out_dem_path1 = os.path.join(tem_folder, "trans_dem_geo_projcs.tif")
    out_dem_path2 = os.path.join(tem_folder, "trans_dem_projcs_geo.tif")
    para_dict = CreateMetaDict(image_path, origin_xml, pack_path, out_dem_path1, out_dem_path2).calu_nature()

    model_path = r'D:\Project\microproduct\Ortho\product.xml'
    xml_path = r'D:\Micro\WorkSpace\test\test.xml'
    CreateProductXml(para_dict, model_path, xml_path).create_standard_xml()