Manual-Labeling-Tool/WindFileData/WindDataHandler.py

101 lines
3.6 KiB
Python
Raw Permalink Normal View History

2025-11-20 09:32:46 +00:00
import ctypes
# Structure corresponding to DataFileInfo in the C++ library
class DataFileInfo(ctypes.Structure):
    """ctypes mirror of the C++ ``DataFileInfo`` struct.

    The order and C types of ``_fields_`` define the binary layout that is
    exchanged with the native library, so they must stay in step with the
    C++ declaration. Every field is 8 bytes wide (int64 or double).
    """

    _fields_ = [
        # NOTE(review): presumably grid dimensions and number of time steps
        # -- confirm against the C++ header.
        ("Height", ctypes.c_int64),
        ("Width", ctypes.c_int64),
        ("Num", ctypes.c_int64),
        ("firstTimestamp", ctypes.c_int64),
        ("lastTimestamp", ctypes.c_int64),
        ("minLon", ctypes.c_double),
        ("maxLon", ctypes.c_double),
        ("minLat", ctypes.c_double),
        ("maxLat", ctypes.c_double),
        # NOTE(review): name looks like a misspelling of "EPSG code"; kept
        # as-is because callers access the field by this name.
        ("ESPGCODE", ctypes.c_int64),
        # NOTE(review): presumably the coefficients of a 2x3 transform
        # matrix -- confirm against the C++ header.
        ("T11", ctypes.c_double),
        ("T12", ctypes.c_double),
        ("T13", ctypes.c_double),
        ("T21", ctypes.c_double),
        ("T22", ctypes.c_double),
        ("T23", ctypes.c_double),
        ("fileSize", ctypes.c_int64),
    ]

    def __repr__(self):
        """Return ``DataFileInfo(field=value, ...)`` listing every field."""
        pairs = ", ".join(
            f"{name}={getattr(self, name)!r}" for name, _ in self._fields_
        )
        return f"{type(self).__name__}({pairs})"
class WindDataHandler:
    """Thin ctypes wrapper around the LAMPWindData native library.

    The library is loaded when the handler is constructed. The methods wrap
    the native entry points used here: ``getDataFileInfo``,
    ``get_WindDataFileTimeArr``, ``Read_WindDataFile`` and
    ``Write_WindDataFile``.
    """

    def __init__(self, dll_path='./LAMPWindData.dll'):
        """Load the native library and register its function prototypes.

        Args:
            dll_path: Path handed to ``ctypes.CDLL``.

        Raises:
            OSError: If ``ctypes.CDLL`` cannot load the library.
        """
        self.dll_path = dll_path
        self.wind_lib = ctypes.CDLL(dll_path)
        self._setup_prototypes()
        # Result of the most recent get_data_file_info() call; None until
        # that method has been called.
        self.current_file_info = None

    def _setup_prototypes(self):
        """Declare argument and return types for the DLL functions."""
        # The argument is a const char*; the struct is returned by value.
        # Without an explicit restype ctypes would treat the return value as
        # a C int.
        self.wind_lib.getDataFileInfo.argtypes = [ctypes.c_char_p]
        self.wind_lib.getDataFileInfo.restype = DataFileInfo
        # NOTE(review): get_WindDataFileTimeArr, Read_WindDataFile and
        # Write_WindDataFile are called without declared prototypes because
        # their native signatures are not visible from this file. Declare
        # argtypes/restype here once they are confirmed.

    def get_data_file_info(self, filepath):
        """Query the metadata of a wind-field data file.

        Args:
            filepath: Path of the data file, as ``str``.

        Returns:
            The ``DataFileInfo`` returned by the DLL. It is also stored in
            ``self.current_file_info``.
        """
        # The DLL takes a C string, so convert the Python str to bytes.
        filepath_bytes = filepath.encode('utf-8')
        info = self.wind_lib.getDataFileInfo(filepath_bytes)
        self.current_file_info = info
        return info

    def get_wind_data_file_time_arr(self, filepath, info):
        """Fetch the timestamp array of a data file.

        Args:
            filepath: Path of the data file, as ``str``.
            info: ``DataFileInfo`` for the file; ``info.Num`` sizes the
                output buffer.

        Returns:
            A list of ``info.Num`` integers on success, or an empty list when
            ``info.Num`` is not positive or the DLL reports failure.
        """
        filepath_bytes = filepath.encode('utf-8')
        if info.Num <= 0:
            return []
        # Buffer large enough for every timestamp; the DLL fills it in place.
        time_arr = (ctypes.c_int64 * info.Num)()
        result = self.wind_lib.get_WindDataFileTimeArr(
            filepath_bytes, info, time_arr
        )
        # Assumption carried over from the original code: 0 means success.
        # NOTE(review): read_wind_data_file treats any value >= 0 as
        # success -- confirm which convention the DLL actually uses.
        if result == 0:
            return list(time_arr)
        print(f"Error getting time array: {result}")
        return []

    def read_wind_data_file(self, filepath, info, time_id):
        """Read the U/V wind components for one time ID.

        Args:
            filepath: Path of the data file, as ``str``.
            info: ``DataFileInfo`` for the file; ``Height * Width`` sizes
                the output buffers.
            time_id: Time ID forwarded unchanged to the DLL.

        Returns:
            ``(u, v)`` as two lists of floats of length ``Height * Width``,
            or ``(None, None)`` when the layer size is not positive or the
            DLL returns a negative status.
        """
        filepath_bytes = filepath.encode('utf-8')
        layer_size = info.Height * info.Width
        # A negative size would make the ctypes array constructor raise, and
        # an empty layer has nothing to read; report both through the same
        # failure value as a DLL error.
        if layer_size <= 0:
            print(f"Error reading wind data: invalid layer size {layer_size}")
            return None, None
        # Output buffers for the U and V components, filled by the DLL.
        u_arr = (ctypes.c_double * layer_size)()
        v_arr = (ctypes.c_double * layer_size)()
        # NOTE(review): with no argtypes declared, time_id is passed as a
        # C int -- confirm this matches the native parameter type.
        result = self.wind_lib.Read_WindDataFile(
            filepath_bytes, info, time_id, u_arr, v_arr
        )
        if result >= 0:
            # Convert the ctypes arrays to plain Python lists.
            return list(u_arr), list(v_arr)
        print(f"Error reading wind data: {result}")
        return None, None

    def write_wind_data_file(self, filepath, info, timeInt64, time_id, u_arr, v_arr):
        """Write one layer of U/V wind data to a file.

        Args:
            filepath: Path of the data file, as ``str``.
            info: ``DataFileInfo`` describing the file; ``Height * Width``
                is the required length of ``u_arr`` and ``v_arr``.
            timeInt64: Timestamp for the layer, passed to the DLL as a
                64-bit integer.
            time_id: Time ID forwarded unchanged to the DLL.
            u_arr: Sequence of U-component values.
            v_arr: Sequence of V-component values.

        Returns:
            The integer status returned by the DLL.

        Raises:
            ValueError: If ``u_arr`` or ``v_arr`` does not have exactly
                ``Height * Width`` elements.
        """
        filepath_bytes = filepath.encode('utf-8')
        layer_size = info.Height * info.Width
        # Make sure the input data has the expected length.
        if len(u_arr) != layer_size or len(v_arr) != layer_size:
            raise ValueError(f"UArr and VArr must have length {layer_size}")
        # Convert the Python sequences to ctypes arrays.
        u_arr_ctypes = (ctypes.c_double * layer_size)(*u_arr)
        v_arr_ctypes = (ctypes.c_double * layer_size)(*v_arr)
        # No argtypes are declared for this function, so a bare Python int
        # would be passed as a C int and masked to fit it, truncating 64-bit
        # timestamps. Wrapping it forces the full 64-bit value through.
        timestamp = ctypes.c_int64(timeInt64)
        result = self.wind_lib.Write_WindDataFile(
            filepath_bytes, info, timestamp, time_id, u_arr_ctypes, v_arr_ctypes
        )
        return result