microproduct/tool/algorithm/ml/machineLearning.py

515 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import shutil
import sklearn # 用于解决打包错误
import sklearn.utils # 用于解决打包错误
import sklearn.utils._cython_blas # 用于解决打包错误
import sklearn.utils._weight_vector # 用于解决打包错误
import sklearn.neighbors # 用于解决打包错误
import sklearn.neighbors._typedefs # 用于解决打包错误
import sklearn.neighbors._partition_nodes # 用于解决打包错误
import sklearn.neighbors._quad_tree # 用于解决打包错误
import sklearn.tree._utils # 用于解决打包错误
from sklearn.cross_decomposition import PLSRegression
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.svm import SVC
import numpy as np
from scipy.stats import pearsonr
from tool.algorithm.image.ImageHandle import ImageHandler
from tool.algorithm.block.blockprocess import BlockProcess
import logging
import os
import glob
from PIL import Image
from tool.file.fileHandle import fileHandle
import multiprocessing
logger = logging.getLogger("mylog")
file = fileHandle()
class MachineLeaning:
"""
机器学习库
"""
def __init__(self):
pass
@staticmethod
def gene_optimal_train_set(train_data_dic, feature_tif_dir, important_threshold=0.3, correlation_threshold=0.7): # todo 修改特征重要性
ml = MachineLeaning()
name_list = ml.get_name_list(feature_tif_dir)
X_train, Y_train = ml.gene_train_set(train_data_dic, feature_tif_dir)
optimal_feature = ml.sel_optimal_feature_set(X_train, Y_train, threshold=important_threshold)
optimal_feature = ml.remove_correlation_feature(X_train, optimal_feature, threshold=correlation_threshold)
X_train = X_train[:, optimal_feature]
logger.info('train_feature:%s', np.array(name_list)[optimal_feature])
return X_train, Y_train, optimal_feature
@ staticmethod
def sel_optimal_feature(X_train, Y_train, name_list,important_threshold=0.3, correlation_threshold=0.7):
ml = MachineLeaning()
optimal_feature = ml.sel_optimal_feature_set(X_train, Y_train, threshold=important_threshold)
optimal_feature = ml.remove_correlation_feature(X_train, optimal_feature, threshold=correlation_threshold)
X_train = X_train[:, optimal_feature]
logger.info('train_feature:%s', np.array(name_list)[optimal_feature])
return X_train, Y_train, optimal_feature
@staticmethod
def gene_test_set(feature_tif_dir, optimal_feature):
"""
生成测试集
:param feature_tif_dir : 特征影像路径字典
:param optimal_feature : 最优特征子集
:return X_test_list : 分块测试集影像路径
"""
in_tif_paths = list(glob.glob(os.path.join(feature_tif_dir, '*.tif')))
cols = ImageHandler.get_img_width(in_tif_paths[0])
rows = ImageHandler.get_img_height(in_tif_paths[0])
workspace_block_tif_path = os.path.join(feature_tif_dir, 'block')
workspace_block_feature_path = os.path.join(feature_tif_dir, 'feature')
feature_cut_dir = os.path.join(feature_tif_dir, 'feature_cut')
file.creat_dirs([workspace_block_tif_path, workspace_block_feature_path, feature_cut_dir])
for m, feature_file in zip(range(len(in_tif_paths)), in_tif_paths):
if m in optimal_feature:
shutil.copy(feature_file, os.path.join(feature_cut_dir, os.path.basename(feature_file)))
# 特征分块
bp = BlockProcess()
block_size = bp.get_block_size(rows, cols)
logger.info('block size is :%s', block_size)
bp.cut(feature_cut_dir, workspace_block_tif_path, ['tif', 'tiff'], 'tif', block_size)
img_dir, img_name = bp.get_file_names(workspace_block_tif_path, ['tif'])
dir_dict_all = bp.get_same_img(img_dir, img_name)
# 选择最优特征子集特征影像
dir_dict = {}
for n, key in zip(range(len(dir_dict_all)), dir_dict_all):
# if n in optimal_feature:
dir_dict.update({key: dir_dict_all[key]})
logger.info('test_feature:%s', dir_dict.keys())
logger.info('blocking tifs success!')
X_test_list = []
# 特征维度合并
for key in dir_dict:
key_name = key
block_num = len(dir_dict[key])
break
for i in range(block_num):
file_list = []
for value in dir_dict.values():
file_list.append(os.path.basename(value[i]))
name = os.path.basename(file_list[0])
suffix = 'features_' + name.split('_')[-4] + "_" + name.split('_')[-3] + "_" + name.split('_')[-2] + "_" + \
name.split('_')[-1]
out_str = os.path.join(workspace_block_feature_path, suffix)
X_test_list.append(out_str)
input_str1 = os.path.dirname(value[0])
input_str2 = ','.join(file_list)
input_str3 = out_str
cmd = r".\baseTool\tifMerge\x64\Release\tifMerge.exe {} {} {}".format(input_str1, input_str2, input_str3)
# print(cmd)
os.system(cmd)
# for n in range(block_num):
# name = os.path.basename(dir_dict[key_name][n])
# suffix = '_' + name.split('_')[-4] + "_" + name.split('_')[-3] + "_" + name.split('_')[-2] + "_" + name.split('_')[-1]
# features_path = os.path.join(workspace_block_feature_path, "features" + suffix) # + "\\features" + suffix
# X_test_list.append(features_path)
# features_array = np.zeros((len(dir_dict), block_size, block_size), dtype='float32')
# for m, value in zip(range(len(dir_dict)), dir_dict.values()):
# features_array[m, :, :] = ImageHandler.get_band_array(value[n])
# features_array[np.isnan(features_array)] = 0.0 # 异常值转为0
# ImageHandler.write_img(features_path, '', [0, 0, 0, 0, 0, 0], features_array)
logger.info('create features matrix success!')
# file.del_folder(workspace_block_tif_path)
# file.del_folder(workspace_block_feature_path)
return X_test_list
@staticmethod
def predict_blok(clf, X_test, rows, cols, img_path, row_begin, col_begin, block_sum, n):
logger.info('total:%s,block:%s testing data !path:%s', block_sum, n, img_path)
Y_test = clf.predict(X_test)
img = Y_test.reshape(rows, cols)
out_image = Image.fromarray(img)
out_image.save(img_path)
# bp = BlockProcess()
# bp.assign_spatial_reference_bypoint(row_begin, col_begin, self.__proj, self.__geo, img_path)
# sr = osr.SpatialReference()
# sr.ImportFromWkt(self.__proj)
# geo_transform = (self.__geo[0] + col_begin * self.__geo[1] + row_begin * self.__geo[2],
# self.__geo[1],
# self.__geo[2],
# self.__geo[3] + col_begin * self.__geo[4] + row_begin * self.__geo[5],
# self.__geo[4],
# self.__geo[5]
# )
# dst_ds = gdal.Open(img_path, gdal.GA_Update)
# if dst_ds is None:
# return False
# dst_ds.SetProjection(sr.ExportToWkt())
# dst_ds.SetGeoTransform(geo_transform)
# del dst_ds
if not os.path.exists(img_path):
logger.error('total:%s,block:%s test data failed !path:%s', block_sum, n, img_path)
logger.info('total:%s,block:%s test data finished !path:%s', block_sum, n, img_path)
return True
@staticmethod
def predict(clf, X_test_list, out_tif_name, workspace_processing_path,rows, cols):
"""
预测数据
:param clf : svm模型
:return X_test_list: 分块测试集影像路径
"""
ml = MachineLeaning()
# 开启多进程处理
bp = BlockProcess()
block_size = bp.get_block_size(rows, cols)
block_features_dir = X_test_list
bp_cover_dir = os.path.join(workspace_processing_path, out_tif_name + '\\') # workspace_processing_path + out_tif_name + '\\'
file.creat_dirs([bp_cover_dir])
processes_num = min([len(block_features_dir), multiprocessing.cpu_count() - 1])
pool = multiprocessing.Pool(processes=processes_num)
for path, n in zip(block_features_dir, range(len(block_features_dir))):
name = os.path.split(path)[1]
# features_array = ImageHandler.get_data(path)
band = ImageHandler.get_bands(path)
if band == 1:
features_array = np.zeros((1, 1024, 1024), dtype=float)
feature_array = ImageHandler.get_data(path)
features_array[0, :, :] = feature_array
else:
features_array = ImageHandler.get_data(path)
X_test = np.reshape(features_array, (features_array.shape[0], features_array[0].size)).T
suffix = '_' + name.split('_')[-4] + "_" + name.split('_')[-3] + "_" + name.split('_')[-2] + "_" + name.split('_')[-1]
img_path = os.path.join(bp_cover_dir, out_tif_name + suffix) # bp_cover_dir + out_tif_name + suffix
row_begin = int(name.split('_')[-4])
col_begin = int(name.split('_')[-2])
pool.apply_async(ml.predict_blok, (clf, X_test, block_size, block_size, img_path, row_begin, col_begin, len(block_features_dir), n))
pool.close()
pool.join()
# 合并影像
data_dir = bp_cover_dir
out_path = workspace_processing_path[0:-1]
bp.combine(data_dir, cols, rows, out_path, file_type=['tif'], datetype='float32')
# 添加地理信息
cover_path = os.path.join(workspace_processing_path, out_tif_name + ".tif") # workspace_processing_path + out_tif_name + ".tif"
# bp.assign_spatial_reference_byfile(self.__ref_img_path, cover_path)
return cover_path
@staticmethod
def predict_VP(clf, X_test_list, out_tif_name, workspace_processing_path, rows, cols):
"""
预测数据
:param clf : svm模型
:return X_test_list: 分块测试集影像路径
"""
ml = MachineLeaning()
# 开启多进程处理
bp = BlockProcess()
block_size = bp.get_block_size(rows, cols)
name_d = out_tif_name.split('_')[6] + '_VTH'
block_features_dir = X_test_list
bp_cover_dir = os.path.join(workspace_processing_path, name_d,
'pre_result\\') # workspace_processing_path + out_tif_name + '\\'
file.creat_dirs([bp_cover_dir])
processes_num = min([len(block_features_dir), multiprocessing.cpu_count() - 7])
pool = multiprocessing.Pool(processes=processes_num)
for path, n in zip(block_features_dir, range(len(block_features_dir))):
name = os.path.split(path)[1]
band = ImageHandler.get_bands(path)
if band == 1:
features_array = np.zeros((1, block_size, block_size), dtype=float)
feature_array = ImageHandler.get_data(path)
features_array[0, :, :] = feature_array
else:
features_array = ImageHandler.get_data(path)
X_test = np.reshape(features_array, (features_array.shape[0], features_array[0].size)).T
suffix = '_' + name.split('_')[-4] + "_" + name.split('_')[-3] + "_" + name.split('_')[-2] + "_" + \
name.split('_')[-1]
img_path = os.path.join(bp_cover_dir, name_d + suffix) # bp_cover_dir + out_tif_name + suffix
row_begin = int(name.split('_')[-4])
col_begin = int(name.split('_')[-2])
pool.apply_async(ml.predict_blok, (clf, X_test, block_size, block_size, img_path, row_begin, col_begin, len(block_features_dir), n))
# ml.predict_blok(clf, X_test, block_size, block_size, img_path, row_begin, col_begin, len(block_features_dir), n)
pool.close()
pool.join()
del pool
# 合并影像
data_dir = bp_cover_dir
out_path = workspace_processing_path[0:-1]
bp.combine(data_dir, cols, rows, out_path, file_type=['tif'], datetype='float32')
# 添加地理信息
cover_path = os.path.join(workspace_processing_path,
name_d + ".tif") # workspace_processing_path + out_tif_name + ".tif"
# bp.assign_spatial_reference_byfile(self.__ref_img_path, cover_path)
return cover_path
@staticmethod
def get_name_list(feature_tif_dir):
in_tif_paths = list(glob.glob(os.path.join(feature_tif_dir, '*.tif')))
name_list = []
dim = len(in_tif_paths)
for n, path in zip(range(dim), in_tif_paths):
name_list.append(str(n)+': '+os.path.split(path)[1])
logger.info('feature_list:%s', name_list)
return name_list
@staticmethod
def gene_train_set(train_data_dic, feature_tif_dir):
"""
生成训练集
:param train_data_dic : 从csv读取的训练数据
:param feature_tif_dir : 特征影像路径路径
:return X_train, Y_train : 训练数据
"""
in_tif_paths = list(glob.glob(os.path.join(feature_tif_dir, '*.tif')))
dim = len(in_tif_paths)
X_train = np.empty(shape=(0, dim))
Y_train = np.empty(shape=(0, 1))
ids = train_data_dic['ids']
positions = train_data_dic['positions']
for id, points in zip(ids, positions):
# for data in train_data_list:
if points == []:
raise Exception('data is empty!')
row, col = zip(*points)
l = len(points)
X = np.empty(shape=(l, dim))
for n, tif_path in zip(range(dim), in_tif_paths):
feature_array = ImageHandler.get_data(tif_path)
feature_array[np.isnan(feature_array)] = 0 # 异常值填充为0
x = feature_array[row, col].T
X[:, n] = x
Y = np.full((l, 1), id)
X_train = np.vstack((X_train, X))
Y_train = np.vstack((Y_train, Y))
Y_train = Y_train.T[0, :]
logger.info("gene_train_set success!")
return X_train, Y_train
@staticmethod
def gene_train_set_deLandcover(train_data_dic, feature_tif_dir, land_cover_tif, coverId):
"""
生成训练集
:param train_data_dic : 从csv读取的训练数据
:param feature_tif_dir : 特征影像路径路径
:return X_train, Y_train : 训练数据
"""
in_tif_paths = list(glob.glob(os.path.join(feature_tif_dir, '*.tif')))
land_arr = ImageHandler.get_band_array(land_cover_tif, 1)
dim = len(in_tif_paths)
X_train = np.empty(shape=(0, dim))
Y_train = np.empty(shape=(0, 1))
ids = train_data_dic['ids']
positions = train_data_dic['positions']
for id, points in zip(ids, positions):
# for data in train_data_list:
if points == []:
raise Exception('data is empty!')
row, col = zip(*points)
l = len(points)
X = np.empty(shape=(l, dim))
for n, tif_path in zip(range(dim), in_tif_paths):
feature_array = ImageHandler.get_data(tif_path)
feature_array[np.isnan(feature_array)] = 0 # 异常值填充为0
for id in coverId:
feature_array[np.where(land_arr == id)] = 0
x = feature_array[row, col].T
X[:, n] = x
Y = np.full((l, 1), id)
X_train = np.vstack((X_train, X))
Y_train = np.vstack((Y_train, Y))
Y_train = Y_train.T[0, :]
logger.info("gene_train_set success!")
return X_train, Y_train
@staticmethod
def standardization(data, num=1):
# 矩阵标准化到[0,1]
min = np.nanmin(data)
max = np.nanmax(data)
data[np.isnan(data)] = min # 异常值填充为0
_range = max - min
return (data - min) / _range * num
@staticmethod
def sel_optimal_feature_set(X_train, Y_train, threshold=0.01):
"""
筛选最优特征组合(极度随机树)
"""
model = ExtraTreesClassifier()
max = np.max(Y_train)
if max < 0.1:
Y_train = (Y_train*10000).astype('int')
model.fit(X_train, Y_train.astype('int'))
# select the relative importance of each attribute
importances = model.feature_importances_
logger.info('importances:%s,threshold=%s', importances, threshold)
importances_resort = -np.sort(-importances) # 从大到小排序
imp_argsort = np.argsort(-importances) # 输出从大到小的序号
optimal_feature = list(imp_argsort[np.where(importances_resort > threshold)]) # 过滤重要性低的特征
logger.info('optimal_feature:%s', optimal_feature)
if len(optimal_feature)==0:
logger.error('optimal_feature is empty')
optimal_feature = list(imp_argsort)
return optimal_feature
@staticmethod
def correlation_map(x, y):
# https://blog.csdn.net/weixin_39836726/article/details/110783640
# cc matrix based on scipy pearsonr
n_row_x = x.shape[0]
n_row_y = x.shape[0]
ccmtx_xy = np.empty((n_row_x, n_row_y))
for n in range(n_row_x):
for m in range(n_row_y):
ccmtx_xy[n, m] = pearsonr(x[n, :], y[m, :])[0]
return ccmtx_xy
@staticmethod
def remove_correlation_feature(X_train,validity_list, threshold=0.85):
"""
相关性抑制,去除相关性
:param X_train : 训练集
:param validity_list : 最优特征子集
:param threshold: 相关性阈值
:return validity_list : 最优特征子集
"""
ccmtx = MachineLeaning().correlation_map(X_train[:, validity_list].T, X_train[:, validity_list].T)
ccmtx = np.abs(ccmtx)
for r in range(len(validity_list)):
for c in range(len(validity_list)):
if c <= r:
ccmtx[r, c] = 0
logger.info('correlation_map:\n %s', ccmtx)
# 相关性大于0.85的特征删除com_sep_coef较大的特征
high_corr = np.unique(np.where(ccmtx > threshold)[1]) # 删除的特征序号
validity_list = np.delete(validity_list, high_corr)
logger.info('validity_list_corr:%s', validity_list)
logger.info(validity_list)
return validity_list
@staticmethod
def gene_train_data(block_features_dir,rows,cols,block_size,measured_data_img):
# 生成训练集
X_train = []
Y_train = []
block_rows = int(np.ceil(rows/block_size))
block_cols = int(np.ceil(cols/block_size))
for data, n in zip(measured_data_img, range(len(measured_data_img))):
row = data[0]
col = data[1]
block_row = row//block_size
block_col = col//block_size
if block_row == block_rows-1:
# part_img_row = row - (rows - block_size)
part_img_row = row - (block_row * block_size)
else:
part_img_row = row % block_size
if block_col == block_cols-1:
# part_img_col = col - (cols-block_size)
part_img_col = col - (block_col * block_size)
else:
part_img_col = col % block_size
# features_path = block_features_dir[block_row*(block_rows-1) + block_col]
features_path = block_features_dir[block_row*block_cols + block_col]
features_array = ImageHandler().get_data(features_path)
feature = features_array[:, part_img_row, part_img_col]
if not np.isnan(feature).any() or np.isinf(feature).any():
X_train.append(list(feature))
Y_train.append([data[2]])
logger.info('total:%s,num:%s create train set success!', len(measured_data_img), n)
return np.array(X_train), np.array(Y_train)
@staticmethod
def get_train_data(features_array, measured_data):
X_train = []
Y_train = []
for data in measured_data:
row = data[0]
col = data[1]
feature = features_array[:, row, col]
if not np.isnan(feature).any() or np.isinf(feature).any():
X_train.append(list(feature))
Y_train.append([data[2]])
return np.array(X_train), np.array(Y_train)
@staticmethod
def trainRF(X_train, Y_train):
#随机森林
logger.info('RF trainning')
clf = RandomForestClassifier()
clf.fit(X_train, Y_train)
return clf
@staticmethod
def trainSVM(X_train, Y_train, cost=1, kernel='rbf'):
logger.info('svm trainning')
clf = SVC(decision_function_shape='ovo')
clf.fit(X_train, Y_train)
SVC(C=cost, cache_size=1000, class_weight='balanced', coef0=0.0, decision_function_shape='ovr',
degree=3, gamma='auto', kernel=kernel, max_iter=-1, probability=False, random_state=None,
shrinking=True, tol=0.001, verbose=True)
return clf
@staticmethod
def vegetationPhenology_combine_feature(feature_dir,workspace_processing_path, name, rows, cols, debug =False):
ml = MachineLeaning()
path_list = list(glob.glob(os.path.join(feature_dir, '*.tif')))
#多维矩阵合并为一个
name_featuresPath_dic = {}
dim = len(path_list)
features_path = workspace_processing_path + name + "/"+ name +'_features.tif'
if debug== False:
features_array = np.zeros((dim, rows, cols), dtype='float16')
for m, path in zip(range(dim), path_list):
data = ImageHandler.get_data(path)
data = ml.standardization(data)
features_array[m, :, :] = data
# 异常值转为0
features_array[np.isnan(features_array)] = 0.0
features_array[np.isinf(features_array)] = 0.0
ImageHandler.write_img(features_path, '', [0, 0, 0, 0, 0, 0], features_array)
name_featuresPath_dic.update({name: features_path})
return name_featuresPath_dic