import ctypes
from typing import List, Optional, Sequence, Tuple


class DataFileInfo(ctypes.Structure):
    """ctypes mirror of the ``DataFileInfo`` struct declared on the C++ side.

    The field order and types below define the binary layout shared with the
    DLL, so they must stay in sync with the C++ declaration.
    """

    _fields_ = [
        ("Height", ctypes.c_int64),
        ("Width", ctypes.c_int64),
        ("Num", ctypes.c_int64),
        ("firstTimestamp", ctypes.c_int64),
        ("lastTimestamp", ctypes.c_int64),
        ("minLon", ctypes.c_double),
        ("maxLon", ctypes.c_double),
        ("minLat", ctypes.c_double),
        ("maxLat", ctypes.c_double),
        # NOTE(review): presumably the EPSG code (the spelling follows the
        # original field name) -- confirm against the C++ header.
        ("ESPGCODE", ctypes.c_int64),
        # NOTE(review): T11..T23 look like a 2x3 transform matrix -- confirm.
        ("T11", ctypes.c_double),
        ("T12", ctypes.c_double),
        ("T13", ctypes.c_double),
        ("T21", ctypes.c_double),
        ("T22", ctypes.c_double),
        ("T23", ctypes.c_double),
        ("fileSize", ctypes.c_int64),
    ]


class WindDataHandler:
    """Thin Python wrapper around the wind-data functions exported by a DLL.

    Attributes:
        dll_path: Path the library was loaded from.
        wind_lib: The loaded ``ctypes.CDLL`` handle.
        current_file_info: The ``DataFileInfo`` returned by the most recent
            ``get_data_file_info`` call, or ``None`` if it was never called.
    """

    def __init__(self, dll_path='./LAMPWindData.dll'):
        """Load the DLL at *dll_path* and declare its function prototypes."""
        self.dll_path = dll_path
        self.wind_lib = ctypes.CDLL(dll_path)
        self._setup_prototypes()
        # getDataFileInfo takes a ``const char*`` and returns the struct by value.
        self.wind_lib.getDataFileInfo.argtypes = [ctypes.c_char_p]
        self.wind_lib.getDataFileInfo.restype = DataFileInfo
        self.current_file_info = None

    def _setup_prototypes(self):
        """Declare argument and return types for the DLL functions."""
        # The prototype declarations were omitted from the original listing
        # ("see the previous section").
        # NOTE(review): until argtypes are declared here, ctypes passes plain
        # Python ints as the platform C ``int``, which would truncate 64-bit
        # values such as ``timeInt64`` -- confirm the prototypes are restored.

    @staticmethod
    def _encode_path(filepath: str) -> bytes:
        """Return *filepath* as UTF-8 bytes for a ``const char*`` argument."""
        return filepath.encode('utf-8')

    def get_data_file_info(self, filepath: str) -> DataFileInfo:
        """Query the DLL for the metadata of a wind-data file.

        The result is also stored in ``self.current_file_info``.

        Args:
            filepath: Path of the wind-data file.

        Returns:
            The ``DataFileInfo`` structure returned by ``getDataFileInfo``.
        """
        info = self.wind_lib.getDataFileInfo(self._encode_path(filepath))
        self.current_file_info = info
        return info

    def get_wind_data_file_time_arr(self, filepath: str, info: DataFileInfo) -> List[int]:
        """Fetch the timestamp array of a wind-data file.

        Args:
            filepath: Path of the wind-data file.
            info: File metadata; ``info.Num`` sizes the output buffer.

        Returns:
            A list of ``info.Num`` integers on success. An empty list when
            ``info.Num <= 0`` or when the DLL returns a non-zero code (an
            error line is printed in the latter case).
        """
        path_bytes = self._encode_path(filepath)
        if info.Num <= 0:
            return []

        # Output buffer that the DLL fills with one int64 per time step.
        timestamps = (ctypes.c_int64 * info.Num)()
        result = self.wind_lib.get_WindDataFileTimeArr(path_bytes, info, timestamps)
        # NOTE(review): treating 0 as success is an assumption carried over
        # from the original code -- confirm against the DLL documentation.
        if result == 0:
            return list(timestamps)

        print(f"Error getting time array: {result}")
        return []

    def read_wind_data_file(
        self, filepath: str, info: DataFileInfo, time_id: int
    ) -> Tuple[Optional[List[float]], Optional[List[float]]]:
        """Read the U/V wind components for one time index.

        Args:
            filepath: Path of the wind-data file.
            info: File metadata; ``info.Height * info.Width`` sizes each buffer.
            time_id: Time index forwarded to ``Read_WindDataFile``.

        Returns:
            ``(u, v)`` as two lists of floats when the DLL returns a
            non-negative code, otherwise ``(None, None)`` after printing an
            error line.
        """
        path_bytes = self._encode_path(filepath)
        layer_size = info.Height * info.Width

        # Output buffers for the U and V components of a single layer.
        u_buf = (ctypes.c_double * layer_size)()
        v_buf = (ctypes.c_double * layer_size)()

        result = self.wind_lib.Read_WindDataFile(path_bytes, info, time_id, u_buf, v_buf)
        # NOTE(review): success here is ``>= 0`` whereas the time-array call
        # checks ``== 0``; both are kept as in the original -- confirm.
        if result >= 0:
            return list(u_buf), list(v_buf)

        print(f"Error reading wind data: {result}")
        return None, None

    def write_wind_data_file(
        self,
        filepath: str,
        info: DataFileInfo,
        timeInt64: int,
        time_id: int,
        u_arr: Sequence[float],
        v_arr: Sequence[float],
    ) -> int:
        """Write one layer of U/V wind components to a file.

        Args:
            filepath: Path of the wind-data file.
            info: File metadata; ``info.Height * info.Width`` is the required
                length of *u_arr* and *v_arr*.
            timeInt64: Timestamp value forwarded to ``Write_WindDataFile``.
            time_id: Time index forwarded to ``Write_WindDataFile``.
            u_arr: U component values.
            v_arr: V component values.

        Returns:
            The code returned by ``Write_WindDataFile``, unmodified.

        Raises:
            ValueError: If *u_arr* or *v_arr* does not have exactly
                ``info.Height * info.Width`` elements.
        """
        path_bytes = self._encode_path(filepath)
        layer_size = info.Height * info.Width

        # Reject mismatched input before handing raw buffers to native code.
        if len(u_arr) != layer_size or len(v_arr) != layer_size:
            raise ValueError(f"UArr and VArr must have length {layer_size}")

        # Copy the Python sequences into contiguous C double arrays.
        layer_type = ctypes.c_double * layer_size
        u_buf = layer_type(*u_arr)
        v_buf = layer_type(*v_arr)

        return self.wind_lib.Write_WindDataFile(
            path_bytes, info, timeInt64, time_id, u_buf, v_buf
        )