microproduct-l-sar/soilMoisture-L-SAR/SoilMoistureALg.py

267 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: UTF-8 -*-
"""
@Project:microproduct
@File:SoilMoistureALg.py
@Function:实现土壤水分计算的算法
@Contact:
@Author:SHJ
@Date:2021/8/10 10:01
@Version:1.0.0
"""
import logging
from tool.algorithm.image.ImageHandle import ImageHandler
import numpy as np
import time
import scipy
import scipy.spatial.transform._rotation_groups # 解决打包的问题
from scipy.optimize import fmin
import math
from tool.algorithm.algtools.oh2004 import oh2004
logger = logging.getLogger("mylog")
class MoistureAlg:
def __init__(self,):
pass
@staticmethod
def cal_vwc(vwc_path, ndwi_path, e1, e2):
"""
:param vwc_path:
:param ndwi_path:
:param e1:
:param e2:
:return: True or False
"""
image_handler = ImageHandler()
proj = image_handler.get_projection(ndwi_path)
geotrans = image_handler.get_geotransform(ndwi_path)
array = image_handler.get_band_array(ndwi_path, 1)
# 原方案的计算方式
vwc = e1 * np.square(array) + e2 * array
# 论文《基于Radarsat-2全极化数据的高原牧草覆盖地表土壤水分反演》的计算方式
#vwc = e1 * array + e2
image_handler.write_img(vwc_path, proj, geotrans, vwc)
return True
@staticmethod
def cal_bare_soil_bsc(bsc_path, tif_path, vwc_path, arrival_angle_path, c, d):
"""
:param bsc_path:裸土后向散射系数(线性值)
:param tif_path:输入影像 会自动完成 dB 到线性值变换
:param vwc_path:vwc影像路径
:param arrival_angle_path:入射角影像文件(单位:弧度)
:param c:经验系数 a
:param d:经验系数 b
"""
image_handler = ImageHandler()
proj = image_handler.get_projection(tif_path)
geotrans = image_handler.get_geotransform(tif_path)
tif_array = image_handler.get_band_array(tif_path, 1)
vwc_array = image_handler.get_band_array(vwc_path, 1)
angle_array = image_handler.get_band_array(arrival_angle_path, 1)
tif_array=np.power(10.0,tif_array/10.0) # dB --> 线性值 后向散射系数
try:
cos_angle = np.cos(angle_array)
tmp1 = tif_array - c * vwc_array * cos_angle
tmp2 = np.exp(-2 * d * vwc_array * (1/cos_angle))
bsc_array =c * vwc_array * cos_angle + tmp1/tmp2 # --- 公式计算错误
except BaseException as msg:
logger.error(msg)
return False
bsc_array = np.where(bsc_array > 0, bsc_array, 0)
image_handler.write_img(bsc_path, proj, geotrans, bsc_array)
return True
@staticmethod
def cal_soil_moisture(soil_moisture_path, hh_bsc_path, vv_bsc_path, arrival_angle_path, mask_path, λ, f=5.3, T=24.5,bd=1, vsand=0.54, vclay=0.19,block_num=0):
"""
:param soil_moisture_path:土壤水分产品路径
:param hh_bsc_path:hh裸土后向散射系数
:param vv_bsc_path:vv裸土后向散射系数
:param arrival_angle_path:入射角影像文件
:param mask_path:掩膜影像文件
:param λ:经验系数
:param f:微波频率单位GHz
:param T:温度,摄氏度
:param bd:土壤容重
:param vsand:沙土含量范围0-1
:param vclay:黏土含量范围0-1
:return: True or False
"""
image_handler = ImageHandler()
proj = image_handler.get_projection(hh_bsc_path)
geotrans = image_handler.get_geotransform(hh_bsc_path)
angle_array = image_handler.get_band_array(arrival_angle_path, 1)
try:
# 计算土壤介电常数
hh_bsc_array = image_handler.get_band_array(hh_bsc_path, 1)
if np.any(hh_bsc_array < 0):
logger.error("hh_bsc_array include negative value!")
return False
tmp = np.power(10, 0.19) * np.power(float(λ), 0.15) * hh_bsc_array
# 处理异常值
where_tmp1_0 = np.where(tmp == 0.0)
tmp[where_tmp1_0] = 1
except BaseException as msg:
logger.error(msg)
return False
try:
vv_bsc_array = image_handler.get_band_array(vv_bsc_path, 1)
if np.any(vv_bsc_array < 0):
logger.error("vv_bsc_array include negative value!")
return False
tmp2 = np.power(np.cos(angle_array), 1.82) * np.power(np.sin(angle_array), 0.93) *\
np.power(vv_bsc_array, 0.786)
# 处理异常值
where_tmp2_0 = np.where(tmp2 == 0.0)
tmp2[where_tmp2_0] = 1
tmp = tmp/tmp2
# 土壤介电常数
soil_dielectric = (1/(0.024 * np.tan(angle_array))) * np.log10(tmp)
soil_dielectric_path = soil_moisture_path.replace("soil_moisture","soil_dielectric")
image_handler.write_img(soil_dielectric_path, proj, geotrans, soil_dielectric)
except BaseException as msg:
logger.error(msg)
return False
mask_array = ImageHandler.get_band_array(mask_path, 1)
soil_dielectric[np.where(mask_array == 0)] = np.nan
try:
# Dobsen模型计算土壤水分
# soil_moisture = dobsen_inverse(f, T, bd, vsand, vclay, soil_dielectric, block_num)
# topp模型计算土壤水分
soil_moisture = -5.3 * np.power(10.0, -2) + 2.92 * np.power(10.0, -2) * soil_dielectric - \
5.5 * np.power(10.0, -4) * np.power(soil_dielectric, 2) + \
4.3 * np.power(10.0, -6) * np.power(soil_dielectric, 3)
# 处理异常值
soil_moisture[where_tmp1_0] = 0
soil_moisture[where_tmp2_0] = 0
except BaseException as msg:
logger.error(msg)
return False
image_handler.write_img(soil_moisture_path, proj, geotrans, soil_moisture)
return True
@staticmethod
def cal_soilM(soil_moisture_path, hh_bsc_path, vv_bsc_path, hv_bsc_path, vh_bsc_path, angle_path,mask_path, wl):
"""
:param soil_moisture_path: 土壤水分路径
:param hh_bsc_path: hh极化路径
:param vv_bsc_path: vv极化路径
:param hv_bsc_path: hv极化路径
:param vh_bsc_path: vh极化路径
:param angle_path: 入射角
:param wl: 波长
:return:
"""
image_handler = ImageHandler()
proj = image_handler.get_projection(hh_bsc_path)
geotrans = image_handler.get_geotransform(hh_bsc_path)
hh_arr = ImageHandler.get_band_array(hh_bsc_path, 1)
hv_arr = ImageHandler.get_band_array(hv_bsc_path, 1)
vh_arr = ImageHandler.get_band_array(vh_bsc_path, 1)
vv_arr = ImageHandler.get_band_array(vv_bsc_path, 1)
angle_arr = ImageHandler.get_band_array(angle_path, 1)
mask_arr = ImageHandler.get_band_array(mask_path, 1)
f = oh2004.lamda2freq(wl/100)/1e9 # wl 原来是cm 得转成m
n = np.array(hh_arr).shape[0] * np.array(hh_arr).shape[1]
hh = np.array(hh_arr).flatten().astype(np.double)
hh[np.where(hh == -9999)] = np.nan
hv = np.array(hv_arr).flatten().astype(np.double)
hv[np.where(hv == -9999)] = np.nan
vv = np.array(vv_arr).flatten().astype(np.double)
vv[np.where(vv == -9999)] = np.nan
angle = np.array(angle_arr).flatten().astype(np.double)
angle[np.where(angle == -9999)] = np.nan
mask = np.array(mask_arr).flatten().astype(np.int32)
# foo_retrieve = inverse_radar(hh_arr, hv_arr, vh_arr, vv_arr, angle_arr, wl)
# mv, h = foo_retrieve.retrieve_oh2004_Cython()
mv = np.zeros(np.array(hh_arr).shape, dtype=np.double).flatten()
h = np.zeros(np.array(hh_arr).shape, dtype=np.double).flatten()
angle=angle*180/3.1415926
oh2004.retrieve_oh2004_main(n, mv, h, mask, vv, hh, hv, hv, angle, f)
soil = np.array(mv).reshape(np.array(hh_arr).shape)
image_handler.write_img(soil_moisture_path, proj, geotrans, soil)
return True
def dobsen_inverse(f, T, bd, vsand, vclay, soil_dielectric_array,block_num, x0=0):
"""
Dobsen模型土壤水分反演
:param f:微波频率单位GHz
:param T:温度,摄氏度
:param bd:土壤容重
:param vsand:沙土含量范围0-1
:param vclay:黏土含量范围0-1
:param soil_dielectric_array:土壤介电常数
:param x0 :土壤水分寻优,初始值
:return: True or False
"""
alpha = 0.65
sd = 2.65
dcs = (1.01 + 0.44 * sd) ** 2 - 0.062
dc0 = 0.008854
dcw0 = 88.045 - 0.4147 * T + 6.295e-4 * (T ** 2) + 1.075e-5 * (T ** 3)
tpt = 0.11109 - 3.824e-3 * T + 6.938e-5 * (T ** 2) - 5.096e-7 * (T ** 3)
dcwinf = 4.9
if f >= 1.4:
sigma = -1.645 + 1.939 * bd - 2.013e-2 * vsand + 1.594e-2 * vclay
else:
sigma = 0.0467 + 0.2204 * bd - 4.111e-3 * vsand + 6.614e-3 * vclay
dcwr = dcwinf + ((dcw0 - dcwinf) / (1 + (tpt * f) ** 2))
betar = 1.2748 - 0.00519 * vsand - 0.00152 * vclay
betai = 1.33797 - 0.00603 * vsand - 0.00166 * vclay
# fun = lambda vwc: np.abs(np.sqrt(
# ((1.0 + (bd / sd) * ((dcs ** alpha) - 1.0) + (vwc ** betar) * (dcwr ** alpha) - vwc) ** (1 / alpha)) ** 2 +
# ((vwc ** (betai / alpha)) * ((tpt * f * (dcw0 - dcwinf)) / (1 + (tpt * f) ** 2) + sigma * (1.0 - (bd / sd)) / (2 * np.pi * dc0 * f * vwc))) ** 2
# ) - soil_dielectric)
fun = lambda vwc: np.abs(np.sqrt(
((1.0 + (bd / sd) * ((dcs ** alpha) - 1.0) + (vwc ** betar) * (dcwr ** alpha) - vwc) ** (1 / alpha)) ** 2 +
((vwc ** (betai / alpha)) * ((tpt * f * (dcw0 - dcwinf)) / (1 + (tpt * f) ** 2) + sigma * (1.0 - (bd / sd)) / (8 * math.atan(1.0) * dc0 * f * vwc))) ** 2
) - soil_dielectric)
soil_dielectric_array[np.isnan(soil_dielectric_array)] = 0
w = np.where((soil_dielectric_array == 0) == False)
num = len(w[0])
soil_mois = np.zeros(num)
bar_list = [0, int(0.1*num), int(0.2*num), int(0.3*num), int(0.4*num), int(0.5*num), int(0.6*num), int(0.7*num), int(0.8*num), int(0.9*num), num-1]
start = time.perf_counter()
for soil_dielectric, n in zip(soil_dielectric_array[w], range(num)):
soil_mois[n] = fmin(fun, x0, disp=0)
if n in bar_list:
logger.info('block:{},cal soil_moisture proggress bar:{}%,use time :{}s'.format(block_num, round(n/num * 100), int(time.perf_counter()-start)))
soil_dielectric_array[w] = soil_mois # 土壤水分
return soil_dielectric_array