Manual-Labeling-Tool/WindFileData/WindDataHandler.py

101 lines
3.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import ctypes
# ctypes mirror of the DataFileInfo struct defined on the C++ side.
class DataFileInfo(ctypes.Structure):
    """Metadata record describing one wind data file.

    The native ``getDataFileInfo`` function returns this structure by value,
    so the order and C types of ``_fields_`` must stay in sync with the C++
    declaration; reordering or retyping a field changes the memory layout.
    """

    _fields_ = [
        # Grid dimensions: one layer holds Height * Width values.
        ("Height", ctypes.c_int64),
        ("Width", ctypes.c_int64),
        # Number of time steps; used to size the timestamp array.
        ("Num", ctypes.c_int64),
        ("firstTimestamp", ctypes.c_int64),
        ("lastTimestamp", ctypes.c_int64),
        # Geographic bounding box -- presumably in degrees; TODO confirm.
        ("minLon", ctypes.c_double),
        ("maxLon", ctypes.c_double),
        ("minLat", ctypes.c_double),
        ("maxLat", ctypes.c_double),
        # NOTE(review): presumably an EPSG code; the spelling "ESPGCODE" is
        # kept because callers access the field under this name.
        ("ESPGCODE", ctypes.c_int64),
        # NOTE(review): T11..T23 look like the six coefficients of a 2x3
        # transform -- confirm against the C++ header.
        ("T11", ctypes.c_double),
        ("T12", ctypes.c_double),
        ("T13", ctypes.c_double),
        ("T21", ctypes.c_double),
        ("T22", ctypes.c_double),
        ("T23", ctypes.c_double),
        ("fileSize", ctypes.c_int64),
    ]

    def __repr__(self):
        """Return ``DataFileInfo(field=value, ...)`` in declaration order."""
        rendered = ", ".join(
            f"{name}={getattr(self, name)!r}" for name, _ctype in self._fields_
        )
        return f"{type(self).__name__}({rendered})"
class WindDataHandler:
    """Thin ctypes wrapper around the native wind data library.

    The shared library is loaded once in ``__init__``; each public method
    forwards to one exported function (``getDataFileInfo``,
    ``get_WindDataFileTimeArr``, ``Read_WindDataFile``,
    ``Write_WindDataFile``) and converts between Python lists and ctypes
    buffers.
    """

    def __init__(self, dll_path='./LAMPWindData.dll'):
        """Load the shared library at *dll_path* and declare prototypes.

        Args:
            dll_path: Path handed to ``ctypes.CDLL``.

        Raises:
            OSError: If the library cannot be loaded.
        """
        self.dll_path = dll_path
        self.wind_lib = ctypes.CDLL(dll_path)
        self._setup_prototypes()
        # getDataFileInfo takes a ``const char*`` path and returns the
        # DataFileInfo struct by value.
        self.wind_lib.getDataFileInfo.argtypes = [ctypes.c_char_p]
        self.wind_lib.getDataFileInfo.restype = DataFileInfo
        # Most recent result of get_data_file_info(); None until first call.
        self.current_file_info = None

    def _setup_prototypes(self):
        """Set the argument and return types of the DLL functions."""
        # ... the prototype-setup code is given in the previous section and
        # is omitted here ...

    @staticmethod
    def _encode_path(filepath):
        """Return *filepath* as the byte string handed to the library.

        ``str`` paths are encoded as UTF-8; ``bytes`` are passed through
        untouched and ``os.PathLike`` objects are resolved first.
        """
        import os  # local import keeps this helper self-contained

        raw = os.fspath(filepath)
        if isinstance(raw, bytes):
            return raw
        # NOTE(review): UTF-8 is what this wrapper has always sent; whether
        # the native side expects UTF-8 or the ANSI code page is not visible
        # from here -- confirm for non-ASCII paths.
        return raw.encode('utf-8')

    def get_data_file_info(self, filepath):
        """Query the metadata of a wind data file.

        Args:
            filepath: Path of the data file (``str``, ``bytes`` or PathLike).

        Returns:
            Whatever ``getDataFileInfo`` returns (a ``DataFileInfo`` per the
            restype declared in ``__init__``). The value is also cached in
            ``self.current_file_info``.
        """
        info = self.wind_lib.getDataFileInfo(self._encode_path(filepath))
        self.current_file_info = info
        return info

    def get_wind_data_file_time_arr(self, filepath, info):
        """Fetch the timestamps stored in a wind data file.

        Args:
            filepath: Path of the data file.
            info: Metadata for the file; ``info.Num`` sizes the output buffer.

        Returns:
            A list of ``info.Num`` integers, or ``[]`` when ``info.Num`` is
            not positive or the library reports a non-zero status.
        """
        filepath_bytes = self._encode_path(filepath)
        if info.Num <= 0:
            return []
        # Output buffer large enough for every timestamp.
        time_arr = (ctypes.c_int64 * info.Num)()
        result = self.wind_lib.get_WindDataFileTimeArr(
            filepath_bytes, info, time_arr
        )
        # Assumes 0 means success.
        # NOTE(review): read_wind_data_file treats any value >= 0 as success;
        # confirm which convention the library really uses.
        if result != 0:
            print(f"Error getting time array: {result}")
            return []
        return list(time_arr)

    def read_wind_data_file(self, filepath, info, time_id):
        """Read the U/V wind components for one time index.

        Args:
            filepath: Path of the data file.
            info: Metadata for the file; one layer holds
                ``info.Height * info.Width`` values.
            time_id: Index of the time step, forwarded unchanged.

        Returns:
            ``(u, v)`` as two lists of floats, or ``(None, None)`` when the
            library returns a negative status.
        """
        filepath_bytes = self._encode_path(filepath)
        layer_size = info.Height * info.Width
        # Output buffers the library fills with the U and V components.
        u_arr = (ctypes.c_double * layer_size)()
        v_arr = (ctypes.c_double * layer_size)()
        result = self.wind_lib.Read_WindDataFile(
            filepath_bytes, info, time_id, u_arr, v_arr
        )
        if result < 0:
            print(f"Error reading wind data: {result}")
            return None, None
        return list(u_arr), list(v_arr)

    def write_wind_data_file(self, filepath, info, timeInt64, time_id, u_arr, v_arr):
        """Write one layer of U/V wind components to a file.

        Args:
            filepath: Path of the data file.
            info: Metadata for the file; fixes the required layer length.
            timeInt64: Timestamp of the layer, sent as a 64-bit integer.
            time_id: Index of the time step, forwarded unchanged.
            u_arr: Sequence of ``info.Height * info.Width`` U values.
            v_arr: Sequence of ``info.Height * info.Width`` V values.

        Returns:
            The status value returned by ``Write_WindDataFile``.

        Raises:
            ValueError: If ``u_arr`` or ``v_arr`` has the wrong length.
        """
        filepath_bytes = self._encode_path(filepath)
        layer_size = info.Height * info.Width
        # Reject wrongly sized input before any native call.
        if len(u_arr) != layer_size or len(v_arr) != layer_size:
            raise ValueError(f"UArr and VArr must have length {layer_size}")
        layer_type = ctypes.c_double * layer_size
        u_arr_ctypes = layer_type(*u_arr)
        v_arr_ctypes = layer_type(*v_arr)
        # Without a declared prototype ctypes passes a bare Python int as a
        # C ``int`` and masks it to that width, which would corrupt 64-bit
        # timestamps. Wrapping it pins the width.
        timestamp = ctypes.c_int64(timeInt64)
        return self.wind_lib.Write_WindDataFile(
            filepath_bytes, info, timestamp, time_id, u_arr_ctypes, v_arr_ctypes
        )