import json from xml.etree.ElementTree import ElementTree, Element import shutil from tool.algorithm.image.ImageHandle import ImageHandler from tool.algorithm.algtools.PreProcess import PreProcess as pp from osgeo import gdal import numpy as np import datetime import os import glob import xmltodict # os.environ['PROJ_LIB'] = r"E:\soft\Anaconda\envs\micro\Lib\site-packages\osgeo\data\proj" class CreateMetaDict: def __init__(self, image_path, origin_xml, pack_path, out_path1, out_path2): self.ImageHandler = ImageHandler() self.image_path = image_path self.origin_xml = origin_xml self.pack_path = pack_path self.file_size = self.get_file_size() self.out_path1 = out_path1 self.out_path2 = out_path2 self.timeDict = self.get_productTime() pass def calu_nature(self): """ 将productinfo节点需要填写的信息存入字典中 image_path:影像路径 image_pair:输入的压缩包中的极化对 例:hh,hv,vh,vv=【1,1,1,1】 out_path1:地理转平面的输出路径 out_path2:平面转地理的输出路径 """ para_dict = {} proj = self.ImageHandler.get_projection(self.image_path) # 输出的影像若是投影坐标系则先转成地理坐标系 keyword = proj.split("[", 2)[0] # 若是地理坐标系则pass if keyword == "GEOGCS": pass elif keyword == "PROJCS": pp.trans_projcs2geogcs(self.out_path2, self.image_path) image_path = self.out_path2 elif len(keyword) == 0 or keyword.strip() == "" or keyword.isspace() is True: raise Exception('image projection is missing!') pp.trans_geogcs2projcs(self.out_path1, self.image_path) # 坐标投影, 地理转平面投影坐标 imageinfo_widthspace = self.ImageHandler.get_geotransform(self.out_path1)[1] # 投影后的分辨率 # imageinfo_heightspace = -self.ImageHandler.get_geotransform(out_path1)[5] # 投影后的分辨率 # para_dict.update({"imageinfo_widthspace": imageinfo_widthspace}) # para_dict.update({"imageinfo_heightspace": imageinfo_heightspace}) para_dict.update({"imageinfo_ProductResolution": imageinfo_widthspace}) para_dict.update({"imageinfo_ProductFormat": "GEOTIFF"}) para_dict.update({"imageinfo_CompressionMethod": "None"}) para_dict.update({"imageinfo_ProductSize": str(self.file_size) + "MB"}) #todo 产品总大小 get_scope = self.ImageHandler.get_scope(self.image_path) point_upleft, point_upright, point_downleft, point_downright = get_scope[0], get_scope[1], get_scope[2], get_scope[3] para_dict.update({"SpatialCoverageInformation_TopLeftLatitude": point_upleft[1]}) para_dict.update({"SpatialCoverageInformation_TopLeftLongitude": point_upleft[0]}) para_dict.update({"SpatialCoverageInformation_TopRightLatitude": point_upright[1]}) para_dict.update({"SpatialCoverageInformation_TopRightLongitude": point_upright[0]}) para_dict.update({"SpatialCoverageInformation_BottomLeftLatitude": point_downleft[1]}) para_dict.update({"SpatialCoverageInformation_BottomLeftLongitude": point_downleft[0]}) para_dict.update({"SpatialCoverageInformation_BottomRightLatitude": point_downright[1]}) para_dict.update({"SpatialCoverageInformation_BottomRightLongitude": point_downright[0]}) longitude_max = np.array([point_upleft[0], point_upright[0], point_downleft[0], point_downright[0]]).max() longitude_min = np.array([point_upleft[0], point_upright[0], point_downleft[0], point_downright[0]]).min() latitude_max = np.array([point_upleft[1], point_upright[1], point_downleft[1], point_downright[1]]).max() latitude_min = np.array([point_upleft[1], point_upright[1], point_downleft[1], point_downright[1]]).min() imageinfo_center_latitude = (latitude_max + latitude_min) / 2 imageinfo_center_longitude = (longitude_max + longitude_min) / 2 para_dict.update({"SpatialCoverageInformation_CenterLatitude": imageinfo_center_latitude}) para_dict.update({"SpatialCoverageInformation_CenterLongitude": imageinfo_center_longitude}) para_dict.update({"TimeCoverageInformation_StartTime": self.timeDict.get("startTime")}) para_dict.update({"TimeCoverageInformation_CenterTime": self.timeDict.get("centerTime")}) para_dict.update({"TimeCoverageInformation_EndTime": self.timeDict.get("endTime")}) para_dict.update({"CoordinateReferenceSystemInformation_EarthEllipsoid": "WGS84"}) para_dict.update({"CoordinateReferenceSystemInformation_MapProjection": "UTM"}) para_dict.update({"CoordinateReferenceSystemInformation_ZoneNo": "None"}) para_dict.update({"MetaInfo_Unit": "none"}) # 设置单位 para_dict.update({"MetaInfo_UnitDes": "无量纲"}) # 设置单位 # 补充ProductProductionInfo节信息 data_name = os.path.basename(self.image_path) strs = data_name.split("_") para_dict.update({"DataSources_DataSource_Satellite": strs[0]}) para_dict.update({"DataSources_DataSource_Sensor": strs[0]}) para_dict.update({"ObservationGeometry_SatelliteAzimuth": "None"}) para_dict.update({"ObservationGeometry_SatelliteRange": "None"}) para_dict.update({"ProductProductionInfo_BandSelection": "1"}) para_dict.update({"ProductProductionInfo_DataSourceDescription": "None"}) para_dict.update({"ProductProductionInfo_DataSourceProcessingDescription": "参考产品介绍PDF"}) productGentime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") para_dict.update({"ProductProductionInfo_ProductionDate": productGentime}) para_dict.update({"ProductProductionInfo_AuxiliaryDataDescription": ""}) # para_dict.update({"ProductPublishInfo_Processor": "德清院"}) # 生产者 # para_dict.update({"ProductPublishInfo_DistributionUnit": "none"}) # 分发单位 # para_dict.update({"ProductPublishInfo_ContactInformation": "none"}) # 联系信息 return para_dict def get_productTime(self): time_dict = {} tree = ElementTree() tree.parse(self.origin_xml) root = tree.getroot() platform = root.find("platform") if platform is None: centerTime = " " else: centerTime = platform.find("CenterTime").text.split(".")[0] productInfo = root.find("imageinfo") imagingTime = productInfo.find("imagingTime") if imagingTime is None: startTime = " " endTime = " " else: startTime = imagingTime.find("start").text.split(".")[0] endTime = imagingTime.find("end").text.split(".")[0] time_dict.update({"startTime": startTime}) time_dict.update({"centerTime": centerTime}) time_dict.update({"endTime": endTime}) return time_dict def get_file_size(self): in_tif_paths = list(glob.glob(os.path.join(self.pack_path, '*.tif'))) in_tif_paths1 = list(glob.glob(os.path.join(self.pack_path, '*.tiff'))) in_tif_paths += in_tif_paths1 size = 0 for file in in_tif_paths: fsize = os.path.getsize(file) # 返回的是字节大小 size += fsize return round(size / float(1024*1024), 2) class CreateProductXml: def __init__(self, par_dict, model_path, xml_path): self.par_dict = par_dict self.xml_path = xml_path shutil.copy(model_path, xml_path) pass def create_standard_xml(self): """将字典中的信息写入到copy的xml文件中""" tree = ElementTree() tree.parse(self.xml_path) # 影像头文件 root = tree.getroot() productinfo = root.find("ProductBasicInfo") for key, value in self.par_dict.items(): if key.split("_")[0] == "imageinfo": productinfo.find(key.split("_")[1]).text = str(value) elif key.split("_")[0] == "SpatialCoverageInformation": imageinfo = productinfo.find("SpatialCoverageInformation") imageinfo.find(key.split("_")[1]).text = str(value) elif key.split("_")[0] == "TimeCoverageInformation": timeInfo = productinfo.find("TimeCoverageInformation") timeInfo.find(key.split("_")[1]).text = str(value) elif key.split("_")[0] == "CoordinateReferenceSystemInformation": geoInfo = productinfo.find("CoordinateReferenceSystemInformation") geoInfo.find(key.split("_")[1]).text = str(value) elif key.split("_")[0] == "MetaInfo": metaInfo = productinfo.find("MetaInfo") metaInfo.find(key.split("_")[1]).text = str(value) ProductProductionInfo = root.find("ProductProductionInfo") # 原始数据信息 for key, value in self.par_dict.items(): if key.split("_")[0] == "DataSources": dataSources = ProductProductionInfo.find("DataSources") dataSource = dataSources.find("DataSource") dataSource.find(key.split("_")[2]).text = str(value) elif key.split("_")[0] == "ObservationGeometry": ObservationGeometry = ProductProductionInfo.find("ObservationGeometry") ObservationGeometry.find(key.split("_")[1]).text = str(value) elif key.split("_")[0] == "ProductProductionInfo": ProductProductionInfo.find(key.split("_")[1]).text = str(value) # ProductPublishInfo = root.find("ProductPublishInfo") # 发布者信息 # for key, value in self.par_dict.items(): # if key.split("_")[0] == "ProductPublishInfo": # ProductPublishInfo.find(key.split("_")[1]).text = str(value) tree.write(self.xml_path, encoding="utf-8", xml_declaration=True) class OrthoAzimuth: @staticmethod def FindInfomationFromJson(HeaderFile_dom_json, node_path_list): """ 在Json文件中,按照指定路径解析出制定节点 """ result_node = HeaderFile_dom_json for nodename in node_path_list: result_node = result_node[nodename] return result_node @staticmethod def get_Azimuth_incidence(Azimuth_path): Azimuth_incidence = 0 if not os.path.exists(Azimuth_path): return Azimuth_incidence with open(Azimuth_path) as f: Azimuth_incidence = f.readline() return Azimuth_incidence @staticmethod def read_Azimuth_incidence(xml_path): # tree = ElementTree() # tree.parse(xml_path) # root = tree.getroot() # Azimuth_incidence = float(root.find('ProductProductionInfo').find('ObservationGeometry').find('SatelliteAzimuth').text) # return Azimuth_incidence with open(xml_path, 'r', encoding='utf-8') as fp: HeaderFile_dom_str = fp.read() HeaderFile_dom = xmltodict.parse(HeaderFile_dom_str) # 将XML转成json文本 HeaderFile_dom_json = json.loads(json.dumps(HeaderFile_dom)) node_path_list = ['Root', 'ProductProductionInfo', 'ObservationGeometry', 'SatelliteAzimuth'] Azimuth_incidence = OrthoAzimuth.FindInfomationFromJson(HeaderFile_dom_json, node_path_list) return Azimuth_incidence if __name__ == '__main__': image_path = r'D:\Micro\WorkSpace\test\GF3B_MYC_QPSI_003581_E120.6_N31.3_20220729_L1B_h_h_L10000073024_db_RD_geo.tif' origin_xml = r'D:\Micro\WorkSpace\Ortho\Temporary\package\GF3B_MYC_QPSI_003581_E120.6_N31.3_20220729_L1A_AHV_L10000073024.meta.xml' tem_folder = r'D:\Micro\WorkSpace\test' pack_path = r'D:\Micro\WorkSpace\Ortho\Temporary\package' out_dem_path1 = os.path.join(tem_folder, "trans_dem_geo_projcs.tif") out_dem_path2 = os.path.join(tem_folder, "trans_dem_projcs_geo.tif") para_dict = CreateMetaDict(image_path, origin_xml, pack_path, out_dem_path1, out_dem_path2).calu_nature() model_path = r'D:\Project\microproduct\Ortho\product.xml' xml_path = r'D:\Micro\WorkSpace\test\test.xml' CreateProductXml(para_dict, model_path, xml_path).create_standard_xml()