from __future__ import annotations
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['TF_CPP_MIN_VLOG_LEVEL'] = '3'
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
plt.rcParams.update({
"text.usetex": True,
"font.size": 12
})
import xarray as xr
import tensorflow as tf
import keras
from keras import layers
from keras import regularizers
from .utils import AutoEncoders, functions
import datetime
import json
import argparse
from itertools import groupby
import warnings
from typing import Union
import requests
import traceback
import sympy
from scipy.spatial import minkowski_distance
from pathlib import Path
import glob
import seaborn as sns
sns.set(font_scale=1.3, rc={'text.usetex' : True})
sns.set_style(style='white')
[docs]def square_dims(size:Union[int, list[int], np.ndarray[int]], ratio_w_h:Union[int,float]=1):
"""
square_dims
Function that return the needed dimensions for the plots of the encoded, given the latent dimension and the ratio between width and height.
Parameters
----------
size: int of list[int]
The latent dimension size.
ratio_w_h: int or float
Desired ration between width and height.
Returns
-------
: tuple
(width, height) dimensions for grid.
Raises
------
ValueError
If input size is not positive.
If ratio_w_h is <= 0.
Notes
-----
- Uses sympy.divisors for efficient factor calculation
- Always returns smaller dimension first in tuple
- For non-integer ratios, finds closest divisor match
"""
divs = np.array(sympy.divisors(size))
dist_to_root = np.abs(divs-np.sqrt(size)*ratio_w_h)
i = np.argmin(dist_to_root)
x_size = int(divs[i])
y_size = size//x_size
return (x_size, y_size) if x_size < y_size else (y_size, x_size)
[docs]def standardize_dims(data: Union[xr.DataArray, xr.Dataset]):
"""
standardize_dims
Standardize dimension names to 'latitude' and 'longitude' across common variants.
Parameters
----------
data : xarray.DataArray or xarray.Dataset
Input data with spatial dimensions to standardize.
Returns
-------
xarray.DataArray or xarray.Dataset
Data with standardized dimension names.
Raises
------
ValueError
If input is not an xarray object.
Notes
-----
- Handles common dimension name variants:
- Latitude: 'lat', 'latitude'
- Longitude: 'lon', 'long', 'longitude'
- Case-insensitive matching
- Preserves all other dimensions unchanged
- Returns original object if no dimension renaming needed
Examples
--------
>>> ds = xr.Dataset(coords={'LAT': [0, 1], 'LON': [0, 1]})
>>> standardized = standardize_dims(ds)
>>> list(standardized.dims)
['latitude', 'longitude']
"""
dim_map = {}
for dim in data.dims:
dim_lower = dim.lower()
if dim_lower in {'lat', 'latitude'}:
dim_map[dim] = 'latitude'
elif dim_lower in {'lon', 'long', 'longitude'}:
dim_map[dim] = 'longitude'
if dim_map:
data = data.rename(dim_map)
return data
[docs]def runAE(input_dim: Union[int, list[int]], latent_dim: int, arch: int, use_VAE: bool, with_cpu: bool, n_epochs: int, data_pred: Union[np.ndarray, list, xr.DataArray], file_save: str, verbose: bool, compile_params: dict = {}, fit_params : dict() = {}):
"""
runAE
Function that performs the AE traing.
Parameters
----------
input_dim: int or list of int
Contains the shape of the input data to the keras.model.
latent_dim: int
Represent the shape of the latent (code) space.
arch: int
Value that determine which model architecture sould be used to build the model.
use_VAE : bool
Value that determines if the model should be a Variational Autoencoder or not.
with_cpu: bool
Value that determines if the cpu should be used instead of (default) gpu.
n_epochs: int
The number of epochs for the keras.model.
data_pred: np.ndarray
Driver/predictor data (usually) to train the model.
file_save: str
Where to save the .h5 model.
verbose: bool
Value that determines if the execution information should be displayed.
compile_params: dict
Dictionary that contains all the parameters (avaible depending on tensorflow/keras version) to use for the .compile() function.
fit_params: dict
Dictionary that contains all the parametes (avaible depending on tensorflow/keras version) to use for the .fit() function, except for epochs and verbose.
Returns
----------
AE.encoder: keras.model.
Keras object that correspond to the fitted encoder model.
Raises
------
ValueError
If input/output dimensions mismatch.
RuntimeError
If GPU device unavailable when requested.
Notes
-----
- Saves encoder/decoder as separate files
- Generates training loss plots in ./figures/
- Automatically creates directories if missing
"""
verbose = 1 if verbose else 0
if with_cpu:
with tf.device("/cpu:0"):
AE = AutoEncoders.AE_conv(input_dim=input_dim,latent_dim=latent_dim,arch=arch,in_channels=np.shape(data_pred)[-1],out_channels=np.shape(data_pred)[-1],VAE=use_VAE)
AE.compile(**compile_params)
history = AE.fit(data_pred, data_pred, epochs=n_epochs, min_delta=1e-6, patience=50, verbose=verbose, **fit_params)
else:
AE = AutoEncoders.AE_conv(input_dim=input_dim,latent_dim=latent_dim,arch=arch,in_channels=np.shape(data_pred)[-1],out_channels=np.shape(data_pred)[-1],VAE=use_VAE)
AE.compile(**compile_params)
history = AE.fit(data_pred, data_pred, epochs=n_epochs, min_delta=1e-6, patience=50, verbose=verbose, **fit_params)
Path("./figures").mkdir(parents=True, exist_ok=True)
plt.plot(history.history['loss'],label='loss')
plt.plot(history.history['val_loss'],label='val_loss')
plt.yscale('log')
plt.legend()
plt.savefig(f'./figures/history-{file_save[9:-3]}.png')
plt.savefig(f'./figures/history-{file_save[9:-3]}.pdf')
plt.close()
Path(file_save.split("/")[-2]).mkdir(parents=True, exist_ok=True)
AE.encoder.save(file_save)
AE.decoder.save(file_save[:-3]+"_dec_"+file_save[-3:])
return AE.encoder
[docs]def get_AE_stats(with_cpu: bool, use_VAE: bool, AE_pre = None, AE_ind = None, pre_indust_pred: Union[list, np.ndarray] = None, indust_pred: Union[list, np.ndarray] = None, data_of_interest_pred: Union[list, np.ndarray] = None, period : str = 'both') -> Union[np.ndarray, list]:
"""
get_AE_stats
Function used to obtain statistical information about the encoded data by the Autoencoder. It codifies train data based on the period and specific details of the architecture.
Parameters
----------
use_VAE, with_cpu: bool
Booleans values that determines if the model should be VAE, if the cpu should be used instead gpu or if the, respectively.
AE_pre, AE_ind: keras.model
Encoders keras.model for pre and post industrial period.
pre_indust_pred, indust_pred: list or np.ndarray
Driver/predictor data of pre and post industrial period.
data_of_interes_pred:list or np.ndarray
Driver/predictor data of interest.
period: str
Value that handle wich part of the data is used.
Returns
----------
: list or ndarray.
A ndarray containind the data.
Raises
------
ValueError
If invalid period specified.
If missing required models/data for period.
Notes
-----
- Computes absolute differences in latent space
- Concatenates results for multi-period analyses
- Flattens outputs to 1D array
"""
if with_cpu:
with tf.device("/cpu:0"):
if use_VAE:
if period == 'both':
pre_indust_pred_encoded = np.abs(AE_pre.predict(pre_indust_pred)-AE_pre.predict(data_of_interest_pred)).flatten()
indust_pred_encoded = np.abs(AE_pre.predict(indust_pred)-AE_pre.predict(data_of_interest_pred)).flatten()
pre_encoded = np.concatenate((pre_indust_pred_encoded, indust_pred_encoded), axis=0)
pre_indust_pred_encoded = np.abs(AE_ind.predict(pre_indust_pred)-AE_ind.predict(data_of_interest_pred)).flatten()
indust_pred_encoded = np.abs(AE_ind.predict(indust_pred)-AE_ind.predict(data_of_interest_pred)).flatten()
ind_encoded = np.concatenate((pre_indust_pred_encoded, indust_pred_encoded), axis=0)
elif period == 'pre':
pre_encoded = np.abs(AE_pre.predict(pre_indust_pred)-AE_pre.predict(data_of_interest_pred)).flatten()
else:
ind_encoded = np.abs(AE_ind.predict(indust_pred)-AE_ind.predict(data_of_interest_pred)).flatten()
else:
if period == 'both':
pre_indust_pred_encoded = np.abs(AE_pre.predict(pre_indust_pred)-AE_pre.predict(data_of_interest_pred)).flatten()
indust_pred_encoded = np.abs(AE_pre.predict(indust_pred)-AE_pre.predict(data_of_interest_pred)).flatten()
pre_encoded = np.concatenate((pre_indust_pred_encoded, indust_pred_encoded), axis=0)
pre_indust_pred_encoded = np.abs(AE_ind.predict(pre_indust_pred)-AE_ind.predict(data_of_interest_pred)).flatten()
indust_pred_encoded = np.abs(AE_ind.predict(indust_pred)-AE_ind.predict(data_of_interest_pred)).flatten()
ind_encoded = np.concatenate((pre_indust_pred_encoded, indust_pred_encoded), axis=0)
elif period == 'pre':
pre_encoded = np.abs(AE_pre.predict(pre_indust_pred)-AE_pre.predict(data_of_interest_pred)).flatten()
else:
ind_encoded = np.abs(AE_ind.predict(indust_pred)-AE_ind.predict(data_of_interest_pred)).flatten()
else:
if use_VAE:
if period == 'both':
pre_indust_pred_encoded = np.abs(AE_pre.predict(pre_indust_pred)-AE_pre.predict(data_of_interest_pred)).flatten()
indust_pred_encoded = np.abs(AE_pre.predict(indust_pred)-AE_pre.predict(data_of_interest_pred)).flatten()
pre_encoded = np.concatenate((pre_indust_pred_encoded, indust_pred_encoded), axis=0)
pre_indust_pred_encoded = np.abs(AE_ind.predict(pre_indust_pred)-AE_ind.predict(data_of_interest_pred)).flatten()
indust_pred_encoded = np.abs(AE_ind.predict(indust_pred)-AE_ind.predict(data_of_interest_pred)).flatten()
ind_encoded = np.concatenate((pre_indust_pred_encoded, indust_pred_encoded), axis=0)
elif period == 'pre':
pre_encoded = np.abs(AE_pre.predict(pre_indust_pred)-AE_pre.predict(data_of_interest_pred)).flatten()
else:
ind_encoded = np.abs(AE_ind.predict(indust_pred)-AE_ind.predict(data_of_interest_pred)).flatten()
else:
if period == 'both':
pre_indust_pred_encoded = np.abs(AE_pre.predict(pre_indust_pred)-AE_pre.predict(data_of_interest_pred)).flatten()
indust_pred_encoded = np.abs(AE_pre.predict(indust_pred)-AE_pre.predict(data_of_interest_pred)).flatten()
pre_encoded = np.concatenate((pre_indust_pred_encoded, indust_pred_encoded), axis=0)
pre_indust_pred_encoded = np.abs(AE_ind.predict(pre_indust_pred)-AE_ind.predict(data_of_interest_pred)).flatten()
indust_pred_encoded = np.abs(AE_ind.predict(indust_pred)-AE_ind.predict(data_of_interest_pred)).flatten()
ind_encoded = np.concatenate((pre_indust_pred_encoded, indust_pred_encoded), axis=0)
elif period == 'pre':
pre_encoded = np.abs(AE_pre.predict(pre_indust_pred)-AE_pre.predict(data_of_interest_pred)).flatten()
else:
ind_encoded = np.abs(AE_ind.predict(indust_pred)-AE_ind.predict(data_of_interest_pred)).flatten()
if period == 'both':
res = np.concatenate((pre_encoded, ind_encoded), axis=0)
elif period == 'pre':
res = pre_encoded
else:
res = ind_encoded
return res
[docs]def am(file_params_name: str, ident: bool, teleg: bool, save_recons: bool, teleg_file: str = '.secret.txt'):
"""
am
Main function that orchestrates the Analog Method (AM) workflow. It handles configuration loading, preprocessing,
analog search execution, and post-processing. Supports Telegram notifications and result saving.
Parameters
----------
file_params_name : str
Path to the JSON configuration file containing analysis parameters.
ident : bool
Flag to enable heatwave period identification before analysis.
teleg : bool
Flag to enable Telegram notifications for warnings and errors.
save_recons : bool
Flag to save reconstructed data as NetCDF files in the './data/' directory.
teleg_file : str, optional
Path to Telegram credentials file (default is '.secret.txt').
Raises
------
OSError
If the configuration file is not found.
ValueError
If invalid parameters are detected in the configuration.
Notes
-----
- Reads parameters from JSON configuration file
- Handles preprocessing of climate data
- Performs analog search using specified method
- Manages Telegram notifications if enabled
- Saves reconstructions if enabled
- Executes post-processing for result analysis
The function coordinates the entire AM workflow including optional heatwave identification,
data preprocessing, model training, analog search, and result post-processing.
Returns
----------
"""
# Teleg
token = None
chat_id = None
user_name = None
if teleg:
with open(teleg_file) as f:
token = f.readline().strip()
chat_id = f.readline().strip()
user_name = f.readline().strip()
f.close()
# Read params
try:
file_params = open(file_params_name)
except:
message = OSError(f'File {file_params_name} not found. To identify the Heat wave period a configuration parameters file is needed.')
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&parse_mode=HTML&text={'[<b>'+type(message).__name__+'</b>] '+user_name+': '+str(message)}"
print(requests.get(url).json())
raise message
else:
params = json.load(file_params)
file_params.close()
# Perform preprocessing
params, img_size, data_pred, data_target, time_pre_indust_pred, time_indust_pred, data_of_interest_pred, data_of_interest_target, x_train_pre_pred, x_train_ind_pred, x_test_pre_pred, x_test_ind_pred, pre_indust_pred, pre_indust_target, indust_pred, indust_target = perform_preprocess(params)
# Call analogSearch
## Obtain stats
## Stats analog
if params["enhanced_distance"]:
if params["period"] == 'both':
stat_data = np.concatenate(((x_train_ind_pred-data_of_interest_pred).flatten(),(x_train_pre_pred-data_of_interest_pred).flatten()),axis=0)
elif params["period"] == 'pre':
stat_data = (x_train_pre_pred-data_of_interest_pred).flatten()
else:
stat_data = (x_train_ind_pred-data_of_interest_pred).flatten()
stat_mean = np.abs(stat_data).mean()
stat_std = np.abs(stat_data).std()
stat_max = np.abs(stat_data).max()
stat_min = np.abs(stat_data).min()
print(f'Mean analog: {stat_mean}')
print(f'Std analog: {stat_std}')
print(f'Max anlog: {stat_max}')
print(f'Min analog: {stat_min}')
print(f'len analog: {len(stat_data)}')
print(f'len th analog: {len(stat_data[np.abs(stat_data) < (stat_mean-0.3*stat_std)])}')
## Stats AE
if params["enhanced_distance"]:
if params["period"] == 'both':
encoded = get_AE_stats(params["with_cpu"], params["use_VAE"], AE_pre, AE_ind, x_train_pre_pred, x_train_ind_pred, data_of_interest_pred)
elif params["period"] == 'pre':
encoded = get_AE_stats(with_cpu=params["with_cpu"], use_VAE=params["use_VAE"], AE_pre=AE_pre, pre_indust_pred=x_train_pre_pred, data_of_interest_pred=data_of_interest_pred, period=params["period"])
else:
encoded = get_AE_stats(with_cpu=params["with_cpu"], use_VAE=params["use_VAE"], AE_ind=AE_ind, indust_pred=x_train_ind_pred, data_of_interest_pred=data_of_interest_pred, period=params["period"])
print(f'Mean: {encoded.mean()}')
print(f'Std: {encoded.std()}')
print(f'Max AE: {encoded.max()}')
print(f'Min AE: {encoded.min()}')
print(f'len AE: {len(encoded)}')
print(f'len th AE: {len(encoded[encoded < (encoded.mean() - 0.3*encoded.std())])}')
## This is the threshold of difference between driver/predictor maps and target
## to be acepted as low difference
## Only used for the local proximity of enhanced distance
threshold = 0
threshold_AE = 0
if params["enhanced_distance"]:
threshold = stat_mean-0.3*stat_std
threshold_AE = encoded.mean()-0.3*encoded.std()
Path("./comparison-csv").mkdir(parents=True, exist_ok=True)
if params["period"] in ['both', 'pre']:
file_time_name = f'./comparison-csv/analogues-pre-{params["season"]}{params["name"]}x{params["iter"]}-{params["data_of_interest_init"]}-epoch{params["n_epochs"]}-latent{params["latent_dim"]}-k{params["k"]}-arch{params["arch"]}-{"VAE" if params["use_VAE"] else "noVAE"}{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.npy'.replace(" ","").replace("'", "").replace(",","")
analog_pre = analogSearch(params["p"], params["k"], pre_indust_pred, data_of_interest_pred, time_pre_indust_pred, pre_indust_target, params["enhanced_distance"], threshold=threshold, img_size=img_size, iter=params["iter"], replace_choice=params["replace_choice"], target_var_name=params["target_var_name"], file_time_name=file_time_name)
# Analog Post
if params["period"] in ['both', 'post']:
file_time_name = f'./comparison-csv/analogues-post-{params["season"]}{params["name"]}x{params["iter"]}-{params["data_of_interest_init"]}-epoch{params["n_epochs"]}-latent{params["latent_dim"]}-k{params["k"]}-arch{params["arch"]}-{"VAE" if params["use_VAE"] else "noVAE"}{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.npy'.replace(" ","").replace("'", "").replace(",","")
analog_ind = analogSearch(params["p"], params["k"], indust_pred, data_of_interest_pred, time_indust_pred, indust_target, params["enhanced_distance"], threshold=threshold, img_size=img_size, iter=params["iter"], replace_choice=params["replace_choice"], target_var_name=params["target_var_name"], file_time_name=file_time_name)
if not ident:
post_process(file_params_name, is_atribution = period == 'both')
message = f'Post process of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
return
[docs]def analogSearch(p:int, k: int, data_pred: Union[list, np.ndarray], data_of_interest_pred: Union[list, np.ndarray], time_pred: xr.DataArray, data_target: xr.Dataset, enhanced_distance:bool, threshold: Union[int, float], img_size: Union[list, np.ndarray], iter: int, threshold_offset_counter: int = 20, replace_choice: bool = True, target_var_name : str = 'air', file_time_name: str = 'analogues.npy') -> tuple:
"""
analogSearch
Funtion that performs the Analog Search Method for a given diver/predictor and target variable.
Parameters
----------
p: int
The p-order of Minskowski distance to perform.
k: int
Number of near neighbours to search.
data_pred: list or ndarray
Driver/predictor data where to search.
data_of_interes_pred: list or ndarray
Driver/predictor data to be searched.
time_pred: DataArray
Time DataArray corresponding to the driver/predictor data where is searching.
data_target: Dataset
Target Dataset Dataset used to check the target value.
enhanced_distance: bool
Flag that decides if local proximity has to be performed or no.
threshold: int or float
Threshold used in analogSearch to compute local proximity.
img_size: list or ndarray
List that determine the size of the driver/predictor and target images.
iter: int
How many random neighbours to select.
threshold_offset_counter: int
Number used to perform the local proximity. Default 20.
replace_choice: bool
Flag that indicates if iter selected can be replaced.
target_var_name: str
The name of the Target Dataset variable in case of working with different Dataset.
file_time_name: str
The name of the file where to save the found analogues.
Returns
----------
: tuple
A tuple containing selected driver/predictor and target.
Raises
------
ValueError
If k exceeds available analogs.
If p <= 0.
Notes
-----
- Uses scipy.spatial.minkowski_distance
- Saves selected analog times to numpy file
- Handles both encoded and raw predictor data
"""
is_not_encoded = (len(np.shape(data_pred)) == 4)
d_i_s = np.shape([data_of_interest_pred])
d_of_i = np.reshape([data_of_interest_pred], (d_i_s[0], int(np.size(data_of_interest_pred)/d_i_s[0])))
d_p_s = np.shape(data_pred)
d_p = np.reshape(data_pred, (d_p_s[0],int(np.size(data_pred)/d_p_s[0])))
dist = minkowski_distance(d_of_i, d_p, p=p)
if enhanced_distance:
data_diff = np.abs(data_pred-data_of_interest_pred)
d = np.zeros_like(data_diff)
d[data_diff <= threshold] = -1
ax_to_sum = np.arange(len(np.shape(data_diff)))[1:]
dist += np.sum(d, axis=tuple(ax_to_sum)) + threshold_offset_counter
minindex = dist.argsort()
time_prediction = time_pred[minindex[:k]]
prediction = data_target.sel(time=time_prediction[:k])[target_var_name].data
predf = data_pred[minindex[:k]]
idx = np.random.choice(np.arange(predf.shape[0]), size=iter, replace=replace_choice)
selected_target = prediction[idx,:,:]
#print(f'\nTime prediction: \n {np.shape(time_prediction)} \n {time_prediction}')
selected_time = time_prediction[idx]
np.save(file_time_name, selected_time)
if is_not_encoded:
selected_psr = predf[idx,:,:,:]
else:
selected_psr = predf[idx,:]
return selected_psr, selected_target, selected_time
[docs]def calculate_interest_region(interest_region: Union[list, np.ndarray], dims_list: int, resolution: Union[int, float, str] = 2, is_teleg: bool = False, secret_file:str = './secret.txt') -> list:
"""
calculate_interest_region
Method which transform latitude/longitude degrees to index. It is used to increase the speed of the methods by using numpy arrays insted of Dataset or DataArray.
Parameters
----------
interest_region: list or ndarray
List which contains the latitude and longitude degrees to be converted as index.
dims_list: list of int
List that contain, in this order, the minimum latitude, maximum latitude, minumin longitude, maximum longitude. When resolution is 'auto', dims_list should be a tuple with (latitude, longitude).
resolution: int, float or str
Degrees resolution employed. Default value is 2º. If resolution is 'auto' it will infer automatically the resolution (useful when resolution is not constant along the dimensions)
is_teleg: bool
Flag that indicate if the warnings have to be sent to Telegram or not.
secret_file: str
Auxiliar variable only needed if is_teleg True to read token and chat_id values.
Returns
----------
new_interest_region: list
A list that contains the equivalent index values, as [lat_start_idx, lat_end_idx, lon_start_idx, lon_end_idx]
Raises
------
ValueError
If resolution='auto' without coordinate data.
If interest_region outside domain bounds.
Notes
-----
- Handles both 0-360 and -180-180 longitude systems
- Automatically adjusts out-of-bounds regions
- Uses np.isclose for coordinate matching with 'auto' resolution
"""
token = None
chat_id = None
if is_teleg:
with open(secret_file) as f:
token = f.readline().strip()
chat_id = f.readline().strip()
user_name = f.readline().strip()
f.close()
if resolution=='auto':
new_interest_region = np.zeros(4)
lat, lon = dims_list
new_interest_region[0] = np.where(np.isclose(interest_region[0], lat))[0][0]
new_interest_region[1] = np.where(np.isclose(interest_region[1], lat))[0][0]
new_interest_region[2] = np.where(np.isclose(interest_region[2], lon))[0][0]
new_interest_region[3] = np.where(np.isclose(interest_region[3], lon))[0][0] + 1
new_interest_region = new_interest_region.astype(int).tolist()
else:
new_interest_region = []
for idx, elem in enumerate(interest_region):
if idx%2==0:
if elem < dims_list[idx]:
message = 'Your interest region is out of lat or lon minimum limits. I correct it, but be aware'
if is_teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message, stacklevel=2)
new_elem = int(max((elem - dims_list[idx]) // resolution, 0))
new_interest_region.append(new_elem)
else:
if elem > dims_list[idx]:
message = 'Your interest region is out of lat or lon maximum limits. I correct it, but be aware'
if is_teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message, stacklevel=2)
new_elem = int(min((elem - dims_list[idx-1]) // resolution , (dims_list[idx] - dims_list[idx-1]) // resolution) + 1)
new_interest_region.append(new_elem)
return new_interest_region
[docs]def save_reconstruction(params: dict, reconstructions_Pre_Analog: list, reconstructions_Post_Analog: list, reconstructions_Pre_AE: list, reconstructions_Post_AE: list):
"""
save_reconstruction
Method that save the target reconstruction based on the runs maded. It do not return anything, only save the Xarray Datasets on the corresponding file on data folder. Each file have the format [name]-[period]-[method]-[time].nc.
Parameters
----------
params: dict
A dictionary which contains all the needed parameters and configuration. Mainly loaded from the configuration file, with some auxiliar parameters added by other functions.
reconstruction_Pre_Analog: list
A list with the multiple reconstructed pre-industrial data by the Analog Method, for each day (or week).
reconstruction_Post_Analog: list
A list with the multiple reconstructed post-industrial data by the Analog Method, for each day (or week).
reconstruction_Pre_AE: list
A list with the multiple reconstructed pre-industrial data by the AutoEncoder, for each day (or week).
reconstruction_Post_AE: list
A list with the multiple reconstructed post-industrial data by the AutoEncoder, for each day (or week).
Returns
----------
Raises
------
IOError
If NetCDF writing fails.
Notes
-----
- Saves files to ./data/ directory
- Uses xarray for NetCDF export
- Filenames include execution timestamp
- Averages multiple reconstructions
"""
current = datetime.datetime.now()
int_reg = params["interest_region"]
resolution = params["resolution"]
Path("./data").mkdir(parents=True, exist_ok=True)
if params["save_recons"]:
if params["period"] in ["both", "pre"]:
Path("./data").mkdir(parents=True, exist_ok=True)
reconstruction_Pre_Analog = np.mean(reconstructions_Pre_Analog, axis=0)
print('Size Recons Pre Analog: ', np.size(reconstruction_Pre_Analog))
xr_Pre_Analog = xr.Dataset(data_vars=dict(y=(["reconstruction", "latitude", "longitude"], reconstruction_Pre_Analog)),
coords=dict(reconstruction = np.arange(params["iter"]),
latitude = np.arange(int_reg[0], int_reg[1]+resolution, resolution),
longitude = np.arange(int_reg[2], int_reg[3]+resolution, resolution)
))
xr_Pre_Analog.to_netcdf(f'./data/reconstruction-{params["season"]}{params["name"]}x{params["iter"]}-Pre-AM-{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.nc'.replace(" ","").replace("'", "").replace(",",""))
if params["period"] in ["both", "post"]:
Path("./data").mkdir(parents=True, exist_ok=True)
reconstruction_Post_AE = np.mean(reconstructions_Post_AE, axis=0)
print('Size Recons Post AE: ', np.size(reconstruction_Post_AE))
xr_Post_AE = xr.Dataset(data_vars=dict(y=(["reconstruction", "latitude", "longitude"], reconstruction_Post_AE)),
coords=dict(reconstruction = np.arange(params["iter"]),
latitude = np.arange(int_reg[0], int_reg[1]+resolution, resolution),
longitude = np.arange(int_reg[2], int_reg[3]+resolution, resolution)
))
xr_Post_AE.to_netcdf(f'./data/reconstruction-{params["season"]}{params["name"]}x{params["iter"]}-Post-AE-AM-{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.nc'.replace(" ","").replace("'", "").replace(",",""))
reconstruction_Post_Analog = np.mean(reconstructions_Post_Analog, axis=0)
xr_Post_Analog = xr.Dataset(data_vars=dict(y=(["reconstruction", "latitude", "longitude"], reconstruction_Post_Analog)),
coords=dict(reconstruction = np.arange(params["iter"]),
latitude = np.arange(int_reg[0], int_reg[1]+resolution, resolution),
longitude = np.arange(int_reg[2], int_reg[3]+resolution, resolution)
))
xr_Post_Analog.to_netcdf(f'./data/reconstruction-{params["season"]}{params["name"]}x{params["iter"]}-Post-AM-{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.nc'.replace(" ","").replace("'", "").replace(",",""))
[docs]def runComparison(params: dict)-> tuple:
"""
runComparison
Method that perform the preprocessing, use of the others previous methods, and comparison between analogSearch and AE + analogSearch.
Parameters
----------
params: dict
A dictionary with needed parameters and configuration. Mainly loaded from the configuration file, with some auxiliar parameters added by other functions.
Returns
----------
: tuple
A tuple of 4 elemets, each containing the corresponding reconstructions list data.
Raises
------
RuntimeError
If model loading fails.
Notes
-----
- Generates comparison CSV files
- Produces KDE plots of results
- Handles both CPU/GPU execution
- Supports multiple latent dimensions
"""
# Set teleg
is_teleg = False
token = None
chat_id = None
if "teleg" in params:
if params["teleg"]:
is_teleg = True
with open(params["secret_file"]) as f:
token = f.readline().strip()
chat_id = f.readline().strip()
user_name = f.readline().strip()
f.close()
params, img_size, data_pred, data_target, time_pre_indust_pred, time_indust_pred, data_of_interest_pred, data_of_interest_target, x_train_pre_pred, x_train_ind_pred, x_test_pre_pred, x_test_ind_pred, pre_indust_pred, pre_indust_target, indust_pred, indust_target = perform_preprocess(params)
if params["verbose"]:
print('Reshape')
print(np.shape(data_of_interest_pred))
print(np.shape(data_of_interest_target[params["interest_var_name"]].data))
if params["period"] in ['both', 'pre']:
print(np.shape(pre_indust_pred))
print(np.shape(pre_indust_target[params["target_var_name"]].data))
print(np.shape(indust_pred))
print(np.shape(indust_target[params["target_var_name"]].data))
# AutoEncoder
if not "compile_params" in params.keys():
params["compile_params"] = {}
if not "fit_params" in params.keys():
params["fit_params"] = {}
if params["load_AE"]:
if params["period"] in ['both', 'pre']:
AE_pre = keras.models.load_model(params["file_AE_pre"], custom_objects={'keras': keras,'AutoEncoders': AutoEncoders})
if params["period"] in ['both', 'post']:
AE_ind = keras.models.load_model(params["file_AE_post"], custom_objects={'keras': keras,'AutoEncoders': AutoEncoders})
if params["verbose"]:
print('AE loaded')
elif params["load_AE_pre"]:
if params["verbose"]:
print('Start fitting post')
if "kl_factor" in params.keys():
input_dim = [data_pred.dims.get('latitude'),data_pred.dims.get('longitude'),params["kl_factor"]]
elif "cvae_params" in params.keys():
input_dim = [data_pred.dims.get('latitude'),data_pred.dims.get('longitude')] + params["cvae_params"]
else:
input_dim = [data_pred.dims.get('latitude'),data_pred.dims.get('longitude')]
if params["period"] in ['both', 'pre']:
AE_pre = keras.models.load_model(params["file_AE_pre"], custom_objects={'keras': keras,'AutoEncoders': AutoEncoders})
if params["period"] in ['both', 'post']:
AE_ind = runAE(input_dim, params["latent_dim"], params["arch"], params["use_VAE"], params["with_cpu"], params["n_epochs"], x_train_ind_pred, params["file_AE_post"], params["verbose"], params["compile_params"], params["fit_params"])
if params["verbose"]:
print('Fitting finished for post & AE loaded for pre')
else:
if params["verbose"]:
print('Start fitting')
if "kl_factor" in params.keys():
input_dim = [data_pred.dims.get('latitude'),data_pred.dims.get('longitude'),params["kl_factor"]]
elif "cvae_params" in params.keys():
input_dim = [data_pred.dims.get('latitude'),data_pred.dims.get('longitude')] + params["cvae_params"]
else:
input_dim = [data_pred.dims.get('latitude'),data_pred.dims.get('longitude')]
if params["period"] in ['both', 'pre']:
AE_pre = runAE(input_dim, params["latent_dim"], params["arch"], params["use_VAE"], params["with_cpu"], params["n_epochs"], x_train_pre_pred, params["file_AE_pre"], params["verbose"], params["compile_params"], params["fit_params"])
if params["period"] in ['both', 'post']:
AE_ind = runAE(input_dim, params["latent_dim"], params["arch"], params["use_VAE"], params["with_cpu"], params["n_epochs"], x_train_ind_pred, params["file_AE_post"], params["verbose"], params["compile_params"], params["fit_params"])
if params["verbose"]:
print('Fitting finished')
# Analog comparison
## Stats analog
if params["enhanced_distance"]:
if params["period"] == 'both':
stat_data = np.concatenate(((x_train_ind_pred-data_of_interest_pred).flatten(),(x_train_pre_pred-data_of_interest_pred).flatten()),axis=0)
elif params["period"] == 'pre':
stat_data = (x_train_pre_pred-data_of_interest_pred).flatten()
else:
stat_data = (x_train_ind_pred-data_of_interest_pred).flatten()
stat_mean = np.abs(stat_data).mean()
stat_std = np.abs(stat_data).std()
stat_max = np.abs(stat_data).max()
stat_min = np.abs(stat_data).min()
print(f'Mean analog: {stat_mean}')
print(f'Std analog: {stat_std}')
print(f'Max anlog: {stat_max}')
print(f'Min analog: {stat_min}')
print(f'len analog: {len(stat_data)}')
print(f'len th analog: {len(stat_data[np.abs(stat_data) < (stat_mean-0.3*stat_std)])}')
## Stats AE
if params["enhanced_distance"]:
if params["period"] == 'both':
encoded = get_AE_stats(params["with_cpu"], params["use_VAE"], AE_pre, AE_ind, x_train_pre_pred, x_train_ind_pred, data_of_interest_pred)
elif params["period"] == 'pre':
encoded = get_AE_stats(with_cpu=params["with_cpu"], use_VAE=params["use_VAE"], AE_pre=AE_pre, pre_indust_pred=x_train_pre_pred, data_of_interest_pred=data_of_interest_pred, period=params["period"])
else:
encoded = get_AE_stats(with_cpu=params["with_cpu"], use_VAE=params["use_VAE"], AE_ind=AE_ind, indust_pred=x_train_ind_pred, data_of_interest_pred=data_of_interest_pred, period=params["period"])
print(f'Mean: {encoded.mean()}')
print(f'Std: {encoded.std()}')
print(f'Max AE: {encoded.max()}')
print(f'Min AE: {encoded.min()}')
print(f'len AE: {len(encoded)}')
print(f'len th AE: {len(encoded[encoded < (encoded.mean() - 0.3*encoded.std())])}')
## Time and interest region
current = datetime.datetime.now()
int_reg = params["interest_region"]
if params["interest_region_type"] == "coord":
if params["resolution"] == 'auto':
int_reg = calculate_interest_region(params["interest_region"], (data_target.latitude.data, data_target.longitude.data), params["resolution"], is_teleg, params["secret_file"])
else:
int_reg = calculate_interest_region(params["interest_region"], [params["latitude_min"], params["latitude_max"], params["longitude_min"], params["longitude_max"]], params["resolution"], is_teleg, params["secret_file"])
## This is the threshold of difference between driver/predictor maps and target
## to be acepted as low difference
## Only used for the local proximity of enhanced distance
threshold = 0
threshold_AE = 0
if params["enhanced_distance"]:
threshold = stat_mean-0.3*stat_std
threshold_AE = encoded.mean()-0.3*encoded.std()
## Encode
if params["with_cpu"]:
with tf.device("/cpu:0"):
if params["period"] in ['both', 'pre']:
pre_indust_pred_encoded = AE_pre.predict(pre_indust_pred)
data_of_interest_pred_encoded_pre = AE_pre.predict(data_of_interest_pred)
if params["period"] in ['both', 'post']:
indust_pred_encoded = AE_ind.predict(indust_pred)
data_of_interest_pred_encoded_ind = AE_ind.predict(data_of_interest_pred)
else:
if params["period"] in ['both', 'pre']:
pre_indust_pred_encoded = AE_pre.predict(pre_indust_pred)
data_of_interest_pred_encoded_pre = AE_pre.predict(data_of_interest_pred)
if params["period"] in ['both', 'post']:
indust_pred_encoded = AE_ind.predict(indust_pred)
data_of_interest_pred_encoded_ind = AE_ind.predict(data_of_interest_pred)
if params["verbose"]:
print('Starting iterations')
ax = plt.subplot(1, 1, 1)
if params["period"] in ['both', 'post']:
plt.imshow(data_of_interest_pred_encoded_ind.reshape(square_dims(params["latent_dim"])))
else:
plt.imshow(data_of_interest_pred_encoded_pre.reshape(square_dims(params["latent_dim"])))
plt.colorbar()
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
plt.title('Encoded')
plt.savefig(f'./encoded_latent{params["latent_dim"]}.png')
plt.savefig(f'./encoded_latent{params["latent_dim"]}.pdf')
plt.close()
Path("./comparison-csv").mkdir(parents=True, exist_ok=True)
# Analog Pre
if params["period"] in ['both', 'pre']:
file_time_name = f'./comparison-csv/analogues-am-pre-{params["season"]}{params["name"]}x{params["iter"]}-{params["data_of_interest_init"]}-epoch{params["n_epochs"]}-latent{params["latent_dim"]}-k{params["k"]}-arch{params["arch"]}-{"VAE" if params["use_VAE"] else "noVAE"}{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.npy'.replace(" ","").replace("'", "").replace(",","")
analog_pre = analogSearch(params["p"], params["k"], pre_indust_pred, data_of_interest_pred, time_pre_indust_pred, pre_indust_target, params["enhanced_distance"], threshold=threshold, img_size=img_size, iter=params["iter"], replace_choice=params["replace_choice"], target_var_name=params["target_var_name"], file_time_name=file_time_name)
# Analog Post
if params["period"] in ['both', 'post']:
file_time_name = f'./comparison-csv/analogues-am-post-{params["season"]}{params["name"]}x{params["iter"]}-{params["data_of_interest_init"]}-epoch{params["n_epochs"]}-latent{params["latent_dim"]}-k{params["k"]}-arch{params["arch"]}-{"VAE" if params["use_VAE"] else "noVAE"}{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.npy'.replace(" ","").replace("'", "").replace(",","")
analog_ind = analogSearch(params["p"], params["k"], indust_pred, data_of_interest_pred, time_indust_pred, indust_target, params["enhanced_distance"], threshold=threshold, img_size=img_size, iter=params["iter"], replace_choice=params["replace_choice"], target_var_name=params["target_var_name"], file_time_name=file_time_name)
# AE Pre
if params["period"] in ['both', 'pre']:
file_time_name = f'./comparison-csv/analogues-ae-am-pre-{params["season"]}{params["name"]}x{params["iter"]}-{params["data_of_interest_init"]}-epoch{params["n_epochs"]}-latent{params["latent_dim"]}-k{params["k"]}-arch{params["arch"]}-{"VAE" if params["use_VAE"] else "noVAE"}{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.npy'.replace(" ","").replace("'", "").replace(",","")
latent_analog_pre = analogSearch(params["p"], params["k"], pre_indust_pred_encoded, data_of_interest_pred_encoded_pre, time_pre_indust_pred, pre_indust_target, params["enhanced_distance"], threshold=threshold_AE, img_size=img_size, iter=params["iter"], replace_choice=params["replace_choice"], target_var_name=params["target_var_name"], file_time_name=file_time_name)
# AE Post
if params["period"] in ['both', 'post']:
file_time_name = f'./comparison-csv/analogues-ae-am-post-{params["season"]}{params["name"]}x{params["iter"]}-{params["data_of_interest_init"]}-epoch{params["n_epochs"]}-latent{params["latent_dim"]}-k{params["k"]}-arch{params["arch"]}-{"VAE" if params["use_VAE"] else "noVAE"}{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.npy'.replace(" ","").replace("'", "").replace(",","")
latent_analog_ind = analogSearch(params["p"], params["k"], indust_pred_encoded, data_of_interest_pred_encoded_ind, time_indust_pred, indust_target, params["enhanced_distance"], threshold=threshold_AE, img_size=img_size, iter=params["iter"], replace_choice=params["replace_choice"], target_var_name=params["target_var_name"], file_time_name=file_time_name)
dict_stats = {}
reconstruction_Pre_Analog = []
reconstruction_Pre_AE = []
reconstruction_Post_Analog = []
reconstruction_Post_AE = []
for i in range(params["iter"]):
if params["period"] in ['both', 'pre']:
dict_stats[f'WithoutAE-Pre{i}'] = [np.nansum(np.abs(data_of_interest_pred - analog_pre[0][i]))/img_size,
np.nanmean(np.abs(data_of_interest_target[params["interest_var_name"]].data - analog_pre[1][i])[:,int_reg[0]:int_reg[1],int_reg[2]:int_reg[3]]),
np.nanmean(analog_pre[1][i][int_reg[0]:int_reg[1],int_reg[2]:int_reg[3]]),
str(analog_pre[2][i].data)[:10]]
reconstruction_Pre_Analog.append(analog_pre[1][i][int_reg[0]:int_reg[1],int_reg[2]:int_reg[3]])
else:
dict_stats[f'WithoutAE-Pre{i}'] = [np.nan, np.nan, np.nan]
if params["period"] in ['both', 'post']:
dict_stats[f'WithoutAE-Post{i}'] = [np.nansum(np.abs(data_of_interest_pred - analog_ind[0][i]))/img_size,
np.nanmean(np.abs(data_of_interest_target[params["interest_var_name"]].data - analog_ind[1][i])[:,int_reg[0]:int_reg[1],int_reg[2]:int_reg[3]]),
np.nanmean(analog_ind[1][i][int_reg[0]:int_reg[1],int_reg[2]:int_reg[3]]),
str(analog_ind[2][i].data)[:10]]
reconstruction_Post_Analog.append(analog_ind[1][i][int_reg[0]:int_reg[1],int_reg[2]:int_reg[3]])
else:
dict_stats[f'WithoutAE-Post{i}'] = [np.nan, np.nan, np.nan]
if params["period"] in ['both', 'pre']:
dict_stats[f'WithAE-Pre-Pre{i}'] = [np.nansum(np.abs(data_of_interest_pred_encoded_pre - latent_analog_pre[0][i]))/img_size,
np.nanmean(np.abs(data_of_interest_target[params["interest_var_name"]].data - latent_analog_pre[1][i])[:,int_reg[0]:int_reg[1],int_reg[2]:int_reg[3]]),
np.nanmean(latent_analog_pre[1][i][int_reg[0]:int_reg[1],int_reg[2]:int_reg[3]]),
str(latent_analog_pre[2][i].data)[:10]]
reconstruction_Pre_AE.append(latent_analog_pre[1][i][int_reg[0]:int_reg[1],int_reg[2]:int_reg[3]])
else:
dict_stats[f'WithAE-Pre-Pre{i}'] = [np.nan, np.nan, np.nan]
if params["period"] in ['both', 'post']:
dict_stats[f'WithAE-Post-Post{i}'] = [np.nansum(np.abs(data_of_interest_pred_encoded_ind - latent_analog_ind[0][i]))/img_size,
np.nanmean(np.abs(data_of_interest_target[params["interest_var_name"]].data - latent_analog_ind[1][i])[:,int_reg[0]:int_reg[1],int_reg[2]:int_reg[3]]),
np.nanmean(latent_analog_ind[1][i][int_reg[0]:int_reg[1],int_reg[2]:int_reg[3]]),
str(latent_analog_ind[2][i].data)[:10]]
reconstruction_Post_AE.append(latent_analog_ind[1][i][int_reg[0]:int_reg[1],int_reg[2]:int_reg[3]])
else:
dict_stats[f'WithAE-Post-Post{i}'] = [np.nan, np.nan, np.nan]
dict_stats[f'Original{i}']=[0,0,np.nanmean(((data_of_interest_target[params["interest_var_name"]].data)[:,int_reg[0]:int_reg[1],int_reg[2]:int_reg[3]]))]
if params["verbose"]:
print(f'Iteration {i} finished')
df_stats = pd.DataFrame.from_dict(dict_stats,orient='index',columns=['pred-diff','target-diff','target','time'])
Path("./comparison-csv").mkdir(parents=True, exist_ok=True)
df_stats.to_csv(f'./comparison-csv/{params["season"]}{params["name"]}x{params["iter"]}-{params["data_of_interest_init"]}-epoch{params["n_epochs"]}-latent{params["latent_dim"]}-k{params["k"]}-arch{params["arch"]}-{"VAE" if params["use_VAE"] else "noVAE"}-analog-comparision-stats{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.csv'.replace(" ","").replace("'", "").replace(",",""))
return reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE
[docs]def identify_heatwave_days(params: dict) -> Union[list, np.ndarray]:
"""
identify_heatwave_days
Method that perform the identifitacion of the heat wave period, following the definition from http://doi.org/10.1088/1748-9326/10/12/124003.
Parameters
----------
params: dict
A dictionary with needed parameters and configuration. Mainly loaded from the configuration file, with some auxiliar parameters added by other functions.
Returns
----------
heatwave_period: list or ndarray
A list of datetime that contains the heat wave period.
Raises
------
ValueError
If percentile out of [0,100] range.
Notes
-----
- Uses 90th percentile by default
- Follows Russo et al. (2015) methodology
- Generates validation plots in ./figures/
- Handles both single and multi-day events
"""
# Load data
if not "ident_dataset" in params:
params["ident_dataset"] = 'data/data_dailyMax_t2m_1940-2022.nc'
if not "ident_var_name" in params:
params["ident_var_name"] = 't2m_dailyMax'
data_target=xr.load_dataset(params["ident_dataset"], engine='netcdf4') #Target Datasets
if params["verbose"]:
print('Data loaded')
# Preprocess
## Change longitude coordinates from 0 - 359 to -180 - 179
if data_target.longitude.min() == 0:
data_target = data_target.assign_coords(longitude=(((data_target.longitude + 180) % 360) - 180))
data_target = data_target.sortby(data_target.longitude)
data_target = data_target.sortby(data_target.latitude)
if params["verbose"]:
print('Coord changed')
## Load Target Dataset
data_target = data_target.sel(latitude=slice(params["latitude_min"],params["latitude_max"]),longitude=slice(params["longitude_min"],params["longitude_max"]))
time_x = data_target.sel(time=slice(params["data_of_interest_init"],params["data_of_interest_end"])).get_index('time')
## Extract interest data
idx_interest = params["interest_region"]
if params["interest_region_type"] == "coord":
if params["resolution"] == 'auto':
idx_interest = calculate_interest_region(params["interest_region"], (data_target.latitude.data, data_target.longitude.data), params["resolution"], params["teleg"], params["secret_file"])
else:
idx_interest = calculate_interest_region(params["interest_region"], [params["latitude_min"], params["latitude_max"], params["longitude_min"], params["longitude_max"]], params["resolution"], params["teleg"], params["secret_file"])
data_target = data_target.isel(latitude=slice(idx_interest[0], idx_interest[1]),longitude=slice(idx_interest[2], idx_interest[3]))
data_target = data_target.mean(dim=['latitude', 'longitude'])
## Percentil
which_percentile = 90
if "percentile" in params.keys():
which_percentile = params["percentile"]
percentile_threshold = functions.hw_pctl(data_target[params["ident_var_name"]], ['1981', '2010'], which_percentile)
data_target = data_target.sel(time=slice(params["data_of_interest_init"], params["data_of_interest_end"]))
percentile_threshold = percentile_threshold.sel(dayofyear=slice(time_x[0].day_of_year, time_x[-1].day_of_year))
surpass_threshold = functions.isHW_in_ds(data_target, percentile_threshold, params["ident_var_name"]).isHW.data
amount_surpass_threshold = ((surpass_threshold).astype(np.int8)).sum()
data_target_array = data_target[params["ident_var_name"]].data
percentile_threshold_array = percentile_threshold.pctl_th.data
plt.figure()
plt.plot(time_x, (data_target_array - 273.15), marker='o', label='Days studied')
plt.xticks(rotation=45)
plt.plot(time_x, (percentile_threshold_array - 273.15), color='r', label=f'p{which_percentile} threshold')
if params["verbose"]:
print(f'Threshold: {percentile_threshold_array - 273.15}')
print(f'Amount of days that surpass the threshold: {amount_surpass_threshold}' )
if amount_surpass_threshold == len(data_target_array):
heatwave_period = time_x#.astype('datetime64[D]')
elif amount_surpass_threshold == 0:
heatwave_period = []
else:
grouped_surpass = np.array([(val , sum(1 for i in val_to_count)) for val, val_to_count in groupby(surpass_threshold)])
mask = grouped_surpass[:,0]==1
idx = np.arange(len(grouped_surpass))[mask][np.argmax(grouped_surpass[mask], axis=0)[1]]
cumsum = np.cumsum(grouped_surpass, axis=0)
heatwave_period = time_x[(0 if idx == 0 else cumsum[idx-1][1]):cumsum[min(np.shape(cumsum)[0],idx)][1]]#.astype('datetime64[D]')
Path("./figures").mkdir(parents=True, exist_ok=True)
if len(heatwave_period) > 0:
plt.title(f'Heatwave: {heatwave_period[0].date()} - {heatwave_period[-1].date()}')
plt.axvspan(heatwave_period[0], heatwave_period[-1], label='Heatwave period', color='crimson', alpha=0.3)
plt.ylabel('Target Dataset (ºC)')
plt.legend()
plt.tight_layout()
plt.savefig(f'./figures/percentile{which_percentile}-for-{params["season"]}{params["name"][:-1]}.png')
plt.savefig(f'./figures/percentile{which_percentile}-for-{params["season"]}{params["name"][:-1]}.pdf')
if params["verbose"]:
print(f'Heat wave period is {heatwave_period}')
plt.show()
plt.close()
return heatwave_period
def _get_post_information(params: dict):
"""
_get_post_information
Auxiliar function used to obtain target var long_name + units, and if is Mv or not.
Parameters
----------
params : dict
Dictionary with the parameters.
Returns
-------
tuple
A tuple of Strings. First one contains "Mv" or "" if the pred dataset is multivariate
or not. Second one contains long_name and units of target variable.
"""
data_pred = xr.open_dataset(params["pred_dataset"])
is_Mv = "Mv" if len(data_pred.data_vars) > 1 else ""
data_target = xr.open_dataset(params["target_dataset"])
var_name = list(data_target.data_vars)[0]
target_var = rf'{data_target[var_name].attrs["long_name"]} {data_target[var_name].attrs["units"]}'
return is_Mv, target_var
[docs]def post_process(params_file: str, save_stats: bool = True, is_atribution: bool = False, compare_to_am: bool = True, target_stat: str = "max"):
"""
post_process
Function to perform the post-process after the execution of the main code. This method will save a comparative figure and a
statistical summary of the resultss.
Parameters
----------
params_file : str
Path to the parameters file.
save_stats : bool, optional
If the statistical summary needs to be saved or not, by default True. If False, stats are printed but not saved.
is_atribution : bool, optional
Flag in case you are performing Atribution and want to get a comparison between Pre/Post period results, by default False
compare_to_am : bool, optional
Falg in case you performed also the Classical AM and want compare between AE-AM and AM, by default True
target_stat : str, optional
How to obtain the target value, by mean, max, min, etc., by default "max".
Returns
----------
Raises
------
FileNotFoundError
If result files missing.
Notes
-----
- Generates KDE comparison plots
- Produces multi-method statistical summaries
- Handles both attribution and detection modes
- Supports parallel execution results
"""
# Read params
file_params = open(params_file)
params = json.load(file_params)
file_params.close()
# Datasets information
is_Mv, target_var = _get_post_information(params)
current = datetime.datetime.now()
# Perform post-process
Path("./comparison-csv").mkdir(parents=True, exist_ok=True)
Path("./figures").mkdir(parents=True, exist_ok=True)
path = f'./comparison-csv/*{params["name"]}*{params["latent_dim"]}*arch{params["arch"]}*.csv'
files_interest = glob.glob(path)
files_interest = sorted(files_interest)
list_interest = [pd.read_csv(df) for df in files_interest]
## Obtain stats
tar_fun = getattr(np, target_stat)
target = tar_fun([elem.get('target')[4::5] if 'target' in elem.columns else elem.get('temp')[4::5] for elem in list_interest])
AE_Pre = np.array([elem.get('target')[2::5] if 'target' in elem.columns else elem.get('temp')[2::5] for elem in list_interest]).flatten()
AE_Pre_preddiff = np.array([elem.get('pred-diff')[2::5] for elem in list_interest]).flatten()
AE_Ind = np.array([elem.get('target')[3::5] if 'target' in elem.columns else elem.get('temp')[3::5] for elem in list_interest]).flatten()
AE_Ind_preddiff = np.array([elem.get('pred-diff')[3::5] for elem in list_interest]).flatten()
if compare_to_am:
analog_Pre = np.array([elem.get('target')[0::5] if 'target' in elem.columns else elem.get('temp')[0::5] for elem in list_interest]).flatten()
analog_Pre_preddiff = np.array([elem.get('pred-diff')[0::5] for elem in list_interest]).flatten()
analog_Ind = np.array([elem.get('target')[1::5] if 'target' in elem.columns else elem.get('temp')[1::5] for elem in list_interest]).flatten()
analog_Ind_preddiff = np.array([elem.get('pred-diff')[1::5] for elem in list_interest]).flatten()
## Reduce dim
is_execs = np.any(["exec" in file_name for file_name in files_interest])
reduce_dim = params["iter"]
if is_execs:
if "n_execs" in params.keys():
reduce_dim = params["n_execs"] * reduce_dim
else:
reduce_dim = 5 * reduce_dim
AE_Pre = np.reshape(AE_Pre, (int(len(AE_Pre)/reduce_dim), reduce_dim))
AE_Pre = np.mean(AE_Pre, axis=0)
AE_Ind = np.reshape(AE_Ind, (int(len(AE_Ind)/reduce_dim), reduce_dim))
AE_Ind = np.mean(AE_Ind, axis=0)
if compare_to_am:
analog_Pre = np.reshape(analog_Pre, (int(len(analog_Pre)/reduce_dim), reduce_dim))
analog_Pre = np.mean(analog_Pre, axis=0)
analog_Ind = np.reshape(analog_Ind, (int(len(analog_Ind)/reduce_dim), reduce_dim))
analog_Ind = np.mean(analog_Ind, axis=0)
## Make plot
if is_atribution:
if compare_to_am:
matrix_comp = np.array([analog_Pre, analog_Ind, AE_Pre, AE_Ind])
df_comp = pd.DataFrame(matrix_comp.T, columns=[f'{is_Mv}AM in Pre', f'{is_Mv}AM in Post', f'{is_Mv}AE-AM in Pre', f'{is_Mv}AE-AM in Post'])
else:
matrix_comp = np.array([AE_Pre, AE_Ind])
df_comp = pd.DataFrame(matrix_comp.T, columns=[f'{is_Mv}AE-AM in Pre', f'{is_Mv}AE-AM in Post'])
else:
if compare_to_am:
matrix_comp = np.array([analog_Ind, AE_Ind])
df_comp = pd.DataFrame(matrix_comp.T, columns=[f'{is_Mv}AM', f'{is_Mv}AE-AM'])
else:
matrix_comp = np.array([AE_Ind])
df_comp = pd.DataFrame(matrix_comp.T, columns=[f'{is_Mv}AE-AM'])
df_comp_melted = df_comp.melt()
if is_atribution:
a = sns.displot(df_comp_melted, y = 'value', hue = 'variable', kind='kde', fill=True, legend=False)
children = plt.gca().get_children()
l = plt.axhline(target, color='red')
if compare_to_am:
plt.legend(children[:4] + [l], [f'{is_Mv}AE-AM Post', f'{is_Mv}AE-AM Pre', f'{is_Mv}AM Post', f'{is_Mv}AM Pre', 'target'],
loc='upper right', bbox_to_anchor=(1.05,0.9))
else:
plt.legend(children[:2] + [l], [f'{is_Mv}AE-AM Post', f'{is_Mv}AE-AM Pre''target'],
loc='upper right', bbox_to_anchor=(1.05,0.9))
plt.ylabel(target_var)
## Save plot
plt.savefig((f'./figures/distribution-{params["season"]}-automated-functions-LatentSpace{params["latent_dim"]}-{params["k"]}.png').replace('*',''))
plt.savefig((f'./figures/distribution-{params["season"]}-automated-functions-LatentSpace{params["latent_dim"]}-{params["k"]}.pdf').replace('*',''))
else:
a = sns.displot(df_comp_melted, y = 'value', hue = 'variable', kind='kde', fill=True, legend=False)
children = plt.gca().get_children()
l = plt.axhline(target, color='red')
if compare_to_am:
plt.legend(children[:2] + [l], [f'{is_Mv}AE-AM', f'{is_Mv}AM', 'target'], loc='upper right', bbox_to_anchor=(1.,0.9))
else:
plt.legend(children[:1] + [l], [f'{is_Mv}AE-AM', 'target'], loc='upper right', bbox_to_anchor=(1.,0.9))
plt.ylabel(target_var)
## Save plot
plt.savefig((f'./figures/distribution-{params["season"]}-automated-functions-LatentSpace{params["latent_dim"]}-{params["k"]}.png').replace('*',''))
plt.savefig((f'./figures/distribution-{params["season"]}-automated-functions-LatentSpace{params["latent_dim"]}-{params["k"]}.pdf').replace('*',''))
# if save_stats save stats
stats_name = ["Mean", "Std", "Diff with Target", "Diff with Pred"]
if save_stats:
if is_atribution:
if compare_to_am:
stats_nd = np.array(
[
[np.round(np.mean(AE_Pre), decimals=4), np.round(np.mean(analog_Pre), decimals=4),
np.round(np.mean(AE_Ind), decimals=4), np.round(np.mean(analog_Ind), decimals=4), np.round(target, decimals=4)],
[np.round(np.std(AE_Pre), decimals=4), np.round(np.std(analog_Pre), decimals=4),
np.round(np.std(AE_Ind), decimals=4), np.round(np.std(analog_Ind), decimals=4), 0],
[np.round(target-np.mean(AE_Pre), decimals=4), np.round(target-np.mean(analog_Pre), decimals=4),
np.round(np.mean(target-AE_Ind), decimals=4), np.round(target-np.mean(analog_Ind), decimals=4), 0],
[np.round(np.mean(AE_Pre_preddiff), decimals=4), np.round(np.mean(analog_Pre_preddiff), decimals=4),
np.round(np.mean(AE_Ind_preddiff), decimals=4), np.round(np.mean(analog_Ind_preddiff), decimals=4),0],
]
)
df_stats = pd.DataFrame(data=stats_nd.T, columns=stats_name, index=[f'{is_Mv}AE-AM in Pre', f'{is_Mv}AM in Pre', f'{is_Mv}AE-AM in Post', f'{is_Mv}AM in Post', 'Target'])
Path("./comparison-csv").mkdir(parents=True, exist_ok=True)
df_stats.to_csv(f'./comparison-csv/stats-summary-{params["season"]}{params["name"]}x{params["iter"]}-{params["data_of_interest_init"]}-epoch{params["n_epochs"]}-latent{params["latent_dim"]}-k{params["k"]}-arch{params["arch"]}-{"VAE" if params["use_VAE"] else "noVAE"}-analog-comparision-stats{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.csv'.replace(" ","").replace("'", "").replace(",",""))
else:
stats_nd = np.array(
[
[np.round(np.mean(AE_Pre), decimals=4), np.round(np.mean(AE_Ind), decimals=4), np.round(target, decimals=4)],
[np.round(np.std(AE_Pre), decimals=4), np.round(np.std(AE_Ind), decimals=4), 0],
[np.round(target-np.mean(AE_Pre), decimals=4), np.round(np.mean(target-AE_Ind), decimals=4), 0],
[np.round(np.mean(AE_Pre_preddiff), decimals=4), np.round(np.mean(AE_Ind_preddiff), decimals=4), 0],
]
)
df_stats = pd.DataFrame(data=stats_nd.T, columns=stats_name, index=[f'{is_Mv}AE-AM in Pre', f'{is_Mv}AE-AM in Post', 'Target'])
Path("./comparison-csv").mkdir(parents=True, exist_ok=True)
df_stats.to_csv(f'./comparison-csv/stats-summary-{params["season"]}{params["name"]}x{params["iter"]}-{params["data_of_interest_init"]}-epoch{params["n_epochs"]}-latent{params["latent_dim"]}-k{params["k"]}-arch{params["arch"]}-{"VAE" if params["use_VAE"] else "noVAE"}-analog-comparision-stats{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.csv'.replace(" ","").replace("'", "").replace(",",""))
else:
if compare_to_am:
stats_nd = np.array(
[
[np.round(np.mean(AE_Ind), decimals=4), np.round(np.mean(analog_Ind), decimals=4), np.round(target, decimals=4)],
[np.round(np.std(AE_Ind), decimals=4), np.round(np.std(analog_Ind), decimals=4), 0],
[np.round(np.mean(target-AE_Ind), decimals=4), np.round(target-np.mean(analog_Ind), decimals=4), 0],
[np.round(np.mean(AE_Ind_preddiff), decimals=4), np.round(np.mean(analog_Ind_preddiff), decimals=4),0],
]
)
df_stats = pd.DataFrame(data=stats_nd.T, columns=stats_name, index=[f'{is_Mv}AE-AM', f'{is_Mv}AM', 'Target'])
Path("./comparison-csv").mkdir(parents=True, exist_ok=True)
df_stats.to_csv(f'./comparison-csv/stats-summary-{params["season"]}{params["name"]}x{params["iter"]}-{params["data_of_interest_init"]}-epoch{params["n_epochs"]}-latent{params["latent_dim"]}-k{params["k"]}-arch{params["arch"]}-{"VAE" if params["use_VAE"] else "noVAE"}-analog-comparision-stats{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.csv'.replace(" ","").replace("'", "").replace(",",""))
else:
stats_nd = np.array(
[
[np.round(np.mean(AE_Ind), decimals=4), np.round(target, decimals=4)],
[np.round(np.std(AE_Ind), decimals=4), 0],
[np.round(np.mean(target-AE_Ind), decimals=4), 0],
[np.round(np.mean(AE_Ind_preddiff), decimals=4), 0],
]
)
df_stats = pd.DataFrame(data=stats_nd.T, columns=stats_name, index=[f'{is_Mv}AE-AM', 'Target'])
Path("./comparison-csv").mkdir(parents=True, exist_ok=True)
df_stats.to_csv(f'./comparison-csv/stats-summary-{params["season"]}{params["name"]}x{params["iter"]}-{params["data_of_interest_init"]}-epoch{params["n_epochs"]}-latent{params["latent_dim"]}-k{params["k"]}-arch{params["arch"]}-{"VAE" if params["use_VAE"] else "noVAE"}-analog-comparision-stats{current.year}-{current.month}-{current.day}-{current.hour}-{current.minute}-{current.second}.csv'.replace(" ","").replace("'", "").replace(",",""))
# else print stats
else:
if is_atribution:
if compare_to_am:
print(f'Pre {is_Mv}AE-AM mean: {np.mean(AE_Pre):.4f}')
print(f'Pre {is_Mv}AE-AM std: {np.std(AE_Pre):.4f}')
print(f'Pre Diff with Target: {target-np.mean(AE_Pre):.4f}')
print(f'Pre Diff with Pred {np.mean(AE_Pre_preddiff):.4f}')
print(f'Pre {is_Mv}AM mean: {np.mean(analog_Pre):.4f}')
print(f'Pre {is_Mv}AM std: {np.std(analog_Pre):.4f}')
print(f'Pre Diff with Target: {target-np.mean(analog_Pre):.4f}')
print(f'Pre Diff with Pred {np.mean(analog_Pre_preddiff):.4f}\n')
print(f'Post {is_Mv}AE-AM mean: {np.mean(AE_Ind):.4f}')
print(f'Post {is_Mv}AE-AM std: {np.std(AE_Ind):.4f}')
print(f'Post Diff with Target: {target-np.mean(AE_Ind):.4f}')
print(f'Post Diff with Pred {np.mean(AE_Ind_preddiff):.4f}')
print(f'Post {is_Mv}AM mean: {np.mean(analog_Ind):.4f}')
print(f'Post {is_Mv}AM std: {np.std(analog_Ind):.4f}')
print(f'Post Diff with Target: {target-np.mean(analog_Ind):.4f}')
print(f'Post Diff with Pred {np.mean(analog_Ind_preddiff):.4f}\n')
print(f'Target: {target:.4f}')
else:
print(f'Pre {is_Mv}AE-AM mean: {np.mean(AE_Pre):.4f}')
print(f'Pre {is_Mv}AE-AM std: {np.std(AE_Pre):.4f}')
print(f'Pre Diff with Target: {target-np.mean(AE_Pre):.4f}')
print(f'Pre Diff with Pred {np.mean(AE_Pre_preddiff):.4f}')
print(f'Post {is_Mv}AE-AM mean: {np.mean(AE_Ind):.4f}')
print(f'Post {is_Mv}AE-AM std: {np.std(AE_Ind):.4f}')
print(f'Post Diff with Target: {target-np.mean(AE_Ind):.4f}')
print(f'Post Diff with Pred {np.mean(AE_Ind_preddiff):.4f}')
print(f'Target: {target:.4f}')
else:
if compare_to_am:
print(f'{is_Mv}AE-AM mean: {np.mean(AE_Ind):.4f}')
print(f'{is_Mv}AE-AM std: {np.std(AE_Ind):.4f}')
print(f'Diff with Target: {target-np.mean(AE_Ind):.4f}')
print(f'Diff with Pred {np.mean(AE_Ind_preddiff):.4f}')
print(f'{is_Mv}AM mean: {np.mean(analog_Ind):.4f}')
print(f'{is_Mv}AM std: {np.std(analog_Ind):.4f}')
print(f'Diff with Target: {target-np.mean(analog_Ind):.4f}')
print(f'Diff with Pred {np.mean(analog_Ind_preddiff):.4f}\n')
print(f'Target: {target:.4f}')
else:
print(f'{is_Mv}AE-AM mean: {np.mean(AE_Ind):.4f}')
print(f'{is_Mv}AE-AM std: {np.std(AE_Ind):.4f}')
print(f'Diff with Target: {target-np.mean(AE_Ind):.4f}')
print(f'Diff with Pred {np.mean(AE_Ind_preddiff):.4f}')
print(f'Target: {target:.4f}')
return
def _step_loop(params, params_multiple, file_params_name, n_execs, ident, verb, teleg, token, chat_id, user_name, save_recons, args):
"""
_step_loop
Auxiliar method that handle the runs depending on the argsparse options selected.
Parameters
----------
params: dict
Default parameters and configuration dictionary for most of the executions.
params_multiple: dict
Specific parameters and configuration dictionary that overwrite params for some methods.
file_params_name: str
The default name of the params/configuration file.
n_execs: int
The number of repeted executions for some methods.
ident: bool
Value of flag to performs the identification period task or not.
verb: bool
Value of flag that indicates if verbosity information should be show or not.
teleg: bool
Value of flag for sending Exceptions to Telegram bot.
token: str
Token of Telegram bot.
chat_id: str
ID of the chat where the Telegram bot will send the messages.
user_name: str
User name to mention in case of Exceptions.
save_recons: bool
Value of flag for saving or not the reconstrucion information in an .nc file.
args: argsparse
The argsparse object with the activated options from terminal.
Returns
----------
"""
# Period
period = 'both'
if args.period is not None:
period = args.period
reconstructions_Pre_Analog = []
reconstructions_Post_Analog = []
reconstructions_Pre_AE = []
reconstructions_Post_AE = []
# Execution of method
if args.method == 'day' or args.method is None:
if ident:
# Read file
## Default if configfile is not specified
if args.conf is not None:
file_params_name = args.conf
## Try-except-else to open file and re-write params
try:
file_params = open(file_params_name)
except:
message = OSError(f'File {file_params_name} not found. To identify the Heat wave period a configuration parameters file is needed.')
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&parse_mode=HTML&text={'[<b>'+type(message).__name__+'</b>] '+user_name+': '+str(message)}"
print(requests.get(url).json())
raise message
else:
params = json.load(file_params)
file_params.close()
if verb is not None:
params["verbose"] = verb
params["teleg"] = teleg
params["period"] = period
if "p" not in params.keys():
params["p"] = 2
if "enhanced_distance" not in params.keys():
params["enhanced_distance"] = False
if "save_recons" not in params.keys():
params["save_recons"] = save_recons
if "secret_file" not in params.keys():
params["secret_file"] = args.secret
identify_heatwave_days(params)
message = f"Indentify Heat wave period (flag -i --identifyhw) for {params['name'][1:-1]} is not compatible with default 'method' ('day') and this will not be executed"
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
print(requests.get(url).json())
warnings.warn(message)
else:
runComparison(params)
# Everything finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif args.method in ['days', 'seasons', 'execs', 'latents', 'seasons-execs', 'latents-execs', 'latents-seasons-execs']:
# Read file
## Default if configfile is not specified
if args.conf is not None:
file_params_name = args.conf
## Try-except-else to open file and re-write params
try:
file_params = open(file_params_name)
except:
message = OSError(f'File {file_params_name} not found. Your method need a configuration parameters file.')
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&parse_mode=HTML&text={'[<b>'+type(message).__name__+'</b>] '+user_name+': '+str(message)}"
requests.get(url).json()
raise message
else:
params_multiple = json.load(file_params)
params_multiple["teleg"] = teleg
params_multiple["period"] = period
if "secret_file" not in params_multiple.keys():
params_multiple["secret_file"] = args.secret
if ident:
heatwave_period = identify_heatwave_days(params_multiple)
params_multiple["data_of_interest_init"] = heatwave_period
params_multiple["data_of_interest_end"] = heatwave_period
else:
if params_multiple["per_what"] == "per_month":
heatwave_period = (pd.date_range(start=params_multiple["data_of_interest_init"], end=params_multiple["data_of_interest_end"], freq='MS')).to_numpy()
elif params_multiple["per_what"] == "per_week":
heatwave_period = np.arange(datetime.datetime.strptime(params_multiple["data_of_interest_init"], '%Y-%m-%d'), datetime.datetime.strptime(params_multiple["data_of_interest_end"], '%Y-%m-%d')+datetime.timedelta(days=1), datetime.timedelta(weeks=1))
else:
heatwave_period = np.arange(datetime.datetime.strptime(params_multiple["data_of_interest_init"], '%Y-%m-%d'), datetime.datetime.strptime(params_multiple["data_of_interest_end"], '%Y-%m-%d')+datetime.timedelta(days=1), datetime.timedelta(days=1))
heatwave_period = np.array(list(map(pd.Timestamp, heatwave_period)))
params_multiple["data_of_interest_init"] = heatwave_period
params_multiple["data_of_interest_end"] = heatwave_period
params = params_multiple.copy()
file_params.close()
if verb is not None:
params["verbose"] = verb
if "p" not in params.keys():
params["p"] = 2
if "enhanced_distance" not in params.keys():
params["enhanced_distance"] = False
if "save_recons" not in params.keys():
params["save_recons"] = save_recons
# Methods with configfile
if args.method == 'days':
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif args.method == 'seasons':
for season in params_multiple["season"]:
params["season"] = season
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif args.method == 'execs':
if 'n_execs' in params.keys():
n_execs = params['n_execs']
for i in range(n_execs):
params["name"] = f'-exec{i}{params_multiple["name"]}'
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif args.method == 'latents':
for latent in params_multiple["latent_dim"]:
params["latent_dim"] = latent
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif args.method == 'seasons-execs':
if 'n_execs' in params.keys():
n_execs = params["n_execs"]
for season in params_multiple["season"]:
params["season"] = season
for i in range(n_execs):
params["name"] = f'-exec{i}{params_multiple["name"]}'
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif args.method == 'latents-execs':
if 'n_execs' in params.keys():
n_execs = params['n_execs']
for latent in params_multiple["latent_dim"]:
params["latent_dim"] = latent
for i in range(n_execs):
params["name"] = f'-exec{i}{params_multiple["name"]}'
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif args.method == 'latents-seasons-execs':
if 'n_execs' in params.keys():
n_execs = params['n_execs']
for latent in params_multiple["latent_dim"]:
params["latent_dim"] = latent
for season in params_multiple["season"]:
params["season"] = season
for i in range(n_execs):
params["name"] = f'-exec{i}{params_multiple["name"]}'
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
else:
message = ValueError(f"Not recognized {args.method} method. The available methods are 'day' (default), 'days', 'seasons', 'execs', 'latents', 'seasons-execs', 'latents-execs' or 'latents-seasons-execs'")
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&parse_mode=HTML&text={'[<b>'+type(message).__name__+'</b>] '+user_name+': '+str(message)}"
requests.get(url).json()
raise message
# Post-process
if not ident:
post_process(file_params_name, is_atribution = period == 'both')
message = f'Post process of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
return
def _step_loop_without_args(params, params_multiple, file_params_name, n_execs, ident, verb, teleg, token, chat_id, user_name, save_recons, period, method):
"""
_step_loop
Auxiliar method that handle the runs depending on the options specified.
Parameters
----------
params: dict
Default parameters and configuration dictionary for most of the executions.
params_multiple: dict
Specific parameters and configuration dictionary that overwrite params for some methods.
file_params_name: str
The default name of the params/configuration file.
n_execs: int
The number of repeted executions for some methods.
ident: bool
Value of flag to performs the identification period task or not.
verb: bool
Value of flag that indicates if verbosity information should be show or not.
teleg: bool
Value of flag for sending Exceptions to Telegram bot.
token: str
Token of Telegram bot.
chat_id: str
ID of the chat where the Telegram bot will send the messages.
user_name: str
User name to mention in case of Exceptions.
save_recons: bool
Value of flag for saving or not the reconstrucion information in an .nc file.
period: str
Specify the period where to perform the operation between `both` (default), `pre` or `post`.
method: str
Specify an method to execute between: `day` (default), `days`, `seasons`, `execs`, `latents`, `seasons-execs`, `latents-execs` or `latents-seasons-execs`
Returns
----------
"""
reconstructions_Pre_Analog = []
reconstructions_Post_Analog = []
reconstructions_Pre_AE = []
reconstructions_Post_AE = []
# Execution of method
if method == 'day':
if ident:
# Read file
## Try-except-else to open file and re-write params
try:
file_params = open(file_params_name)
except:
message = OSError(f'File {file_params_name} not found. To identify the Heat wave period a configuration parameters file is needed.')
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&parse_mode=HTML&text={'[<b>'+type(message).__name__+'</b>] '+user_name+': '+str(message)}"
print(requests.get(url).json())
raise message
else:
params = json.load(file_params)
file_params.close()
if verb is not None:
params["verbose"] = verb
params["teleg"] = teleg
params["period"] = period
if "p" not in params.keys():
params["p"] = 2
if "enhanced_distance" not in params.keys():
params["enhanced_distance"] = False
if "save_recons" not in params.keys():
params["save_recons"] = save_recons
if "secret_file" not in params.keys():
params["secret_file"] = args.secret
identify_heatwave_days(params)
message = f"Indentify Heat wave period (flag -i --identifyhw) for {params['name'][1:-1]} is not compatible with default 'method' ('day') and this will not be executed"
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
print(requests.get(url).json())
warnings.warn(message)
else:
runComparison(params)
# Everything finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif method in ['days', 'seasons', 'execs', 'latents', 'seasons-execs', 'latents-execs', 'latents-seasons-execs']:
# Read file
## Try-except-else to open file and re-write params
try:
file_params = open(file_params_name)
except:
message = OSError(f'File {file_params_name} not found. Your method need a configuration parameters file.')
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&parse_mode=HTML&text={'[<b>'+type(message).__name__+'</b>] '+user_name+': '+str(message)}"
requests.get(url).json()
raise message
else:
params_multiple = json.load(file_params)
params_multiple["teleg"] = teleg
params_multiple["period"] = period
if "secret_file" not in params_multiple.keys():
params_multiple["secret_file"] = args.secret
if ident:
heatwave_period = identify_heatwave_days(params_multiple)
params_multiple["data_of_interest_init"] = heatwave_period
params_multiple["data_of_interest_end"] = heatwave_period
else:
if params_multiple["per_what"] == "per_month":
heatwave_period = (pd.date_range(start=params_multiple["data_of_interest_init"], end=params_multiple["data_of_interest_end"], freq='MS')).to_numpy()
elif params_multiple["per_what"] == "per_week":
heatwave_period = np.arange(datetime.datetime.strptime(params_multiple["data_of_interest_init"], '%Y-%m-%d'), datetime.datetime.strptime(params_multiple["data_of_interest_end"], '%Y-%m-%d')+datetime.timedelta(days=1), datetime.timedelta(weeks=1))
else:
heatwave_period = np.arange(datetime.datetime.strptime(params_multiple["data_of_interest_init"], '%Y-%m-%d'), datetime.datetime.strptime(params_multiple["data_of_interest_end"], '%Y-%m-%d')+datetime.timedelta(days=1), datetime.timedelta(days=1))
heatwave_period = np.array(list(map(pd.Timestamp, heatwave_period)))
params_multiple["data_of_interest_init"] = heatwave_period
params_multiple["data_of_interest_end"] = heatwave_period
params = params_multiple.copy()
file_params.close()
if verb is not None:
params["verbose"] = verb
if "p" not in params.keys():
params["p"] = 2
if "enhanced_distance" not in params.keys():
params["enhanced_distance"] = False
if "save_recons" not in params.keys():
params["save_recons"] = save_recons
# Methods with configfile
if method == 'days':
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif method == 'seasons':
for season in params_multiple["season"]:
params["season"] = season
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif method == 'execs':
if 'n_execs' in params.keys():
n_execs = params['n_execs']
for i in range(n_execs):
params["name"] = f'-exec{i}{params_multiple["name"]}'
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif method == 'latents':
for latent in params_multiple["latent_dim"]:
params["latent_dim"] = latent
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif method == 'seasons-execs':
if 'n_execs' in params.keys():
n_execs = params["n_execs"]
for season in params_multiple["season"]:
params["season"] = season
for i in range(n_execs):
params["name"] = f'-exec{i}{params_multiple["name"]}'
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif method == 'latents-execs':
if 'n_execs' in params.keys():
n_execs = params['n_execs']
for latent in params_multiple["latent_dim"]:
params["latent_dim"] = latent
for i in range(n_execs):
params["name"] = f'-exec{i}{params_multiple["name"]}'
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
elif method == 'latents-seasons-execs':
if 'n_execs' in params.keys():
n_execs = params['n_execs']
for latent in params_multiple["latent_dim"]:
params["latent_dim"] = latent
for season in params_multiple["season"]:
params["season"] = season
for i in range(n_execs):
params["name"] = f'-exec{i}{params_multiple["name"]}'
for idx, init in enumerate(params_multiple["data_of_interest_init"]):
if idx == 0:
params["load_AE"] = False
if idx > 0 and not params["load_AE"]:
params["load_AE"] = True
params["data_of_interest_init"] = init
params["data_of_interest_end"] = params_multiple["data_of_interest_end"][idx]
reconstruction_Pre_Analog, reconstruction_Post_Analog, reconstruction_Pre_AE, reconstruction_Post_AE = runComparison(params)
if params["save_recons"]:
reconstructions_Pre_Analog.append(reconstruction_Pre_Analog)
reconstructions_Post_Analog.append(reconstruction_Post_Analog)
reconstructions_Pre_AE.append(reconstruction_Pre_AE)
reconstructions_Post_AE.append(reconstruction_Post_AE)
save_reconstruction(params, reconstructions_Pre_Analog, reconstructions_Post_Analog, reconstructions_Pre_AE, reconstructions_Post_AE)
# Exec finished
message = f'Execution of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
else:
message = ValueError(f"Not recognized {method} method. The available methods are 'day' (default), 'days', 'seasons', 'execs', 'latents', 'seasons-execs', 'latents-execs' or 'latents-seasons-execs'")
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&parse_mode=HTML&text={'[<b>'+type(message).__name__+'</b>] '+user_name+': '+str(message)}"
requests.get(url).json()
raise message
# Post-process
if not ident:
post_process(file_params_name, is_atribution = period == 'both')
message = f'Post process of method {args.method} for {params["name"]} with arch {params["arch"]} and latent dim {params["latent_dim"]} has finished successfully'
if teleg:
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={'[WARN]: '+message}"
requests.get(url).json()
warnings.warn(message)
return
[docs]def va_am(ident:bool=False, method:str='day', config_file:str='params.json', secret_file:str='secrets.txt', verbose:bool=False, teleg:bool=False, period:str='both', save_recons:bool=False):
"""
va_am
Equivalent to main function. Its scope is to provide a way to perform the same procedures as `main` function, but by importing it in another python code.
Parameters
----------
ident: bool
Value of flag to performs the identification period task or not.
method: str
Specify an method to execute between: `day` (default), `days`, `seasons`, `execs`, `latents`, `seasons-execs`, `latents-execs` or `latents-seasons-execs`
config_file: str
The default name of the params/configuration file.
secret_file: str
The default name of the Telegram bot informatin file.
verbose: bool
Value of flag that indicates if verbosity information should be show or not.
teleg: bool
Value of flag for sending Exceptions to Telegram bot.
period: str
Specify the period where to perform the operation between `both` (default), `pre` or `post`.
save_recons: bool
Value of flag for saving or not the reconstrucion information in an .nc file.
Returns
----------
"""
# Default parameters
params = {
"season": "all",
"name": "-per_day-france2003",
"latitude_min": 32,
"latitude_max": 70,
"longitude_min": -30,
"longitude_max": 30,
"pre_init": '1851-01-06',
"pre_end": '1950-12-31',
"post_init": '1951-01-01',
"post_end": '2014-12-28',
"data_of_interest_init": '2003-08-09',
"data_of_interest_end": '2003-08-09',
"load_AE": True,
"load_AE_pre": True, # only for AE post comparison
"file_AE_pre": './models/AE_pre.h5',
"file_AE_post": './models/AE_post.h5',
"latent_dim": 600,
"use_VAE": True,
"with_cpu": False,
"n_epochs": 300,
"k": 20,
"iter": 20,
"interest_region": [4,8,12,19],
"resolution": 2,
"interest_region_type": "idx", # idx or coord
"per_what": "per_day", # per_day or per_week
"remove_year": False,
"replace_choice": False,
"arch": 5,
"teleg": False,
"verbose": False
}
params_multiple = None
file_params_name = config_file
n_execs = 5
verb = None
token = None
chat_id = None
user_name = None
if teleg:
with open(secret_file) as f:
token = f.readline().strip()
chat_id = f.readline().strip()
user_name = f.readline().strip()
f.close()
try:
_step_loop_without_args(params, params_multiple, file_params_name, n_execs, ident, verb, teleg, token, chat_id, user_name, save_recons, period, method)
except Exception as ex:
if teleg:
message = traceback.format_exc().replace("<","").replace(">","")
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&parse_mode=HTML&text={'[<b>'+type(ex).__name__+'</b>] '+user_name+': '+str(message)}"
requests.get(url).json()
raise ex
[docs]def main():
"""
Main
Function prepared for runing and managing the program functionality. It use the argparse module to manage the execution of va_am.py as a bash function. To see help use:
.. code-block:: python
python va_am.py -h
"""
# Parser initialization
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--identifyhw", dest='ident', action="store_true", help="Flag. If true, first, identify the heatwave period and, then, apply the 'method' if is one of: \n 'days', 'seasons', 'execs', 'latents', 'seasons-execs', 'latents-execs' or 'latents-seasons-execs'")
parser.add_argument("-m", "--method", dest='method', help="Specify an method to execute between: \n 'day' (default), 'days', 'seasons', 'execs', 'latents', 'seasons-execs', 'latents-execs' or 'latents-seasons-execs'")
parser.add_argument("-f", "--configfile", dest='conf', help="JSON file with configuration of parameters. If not specified and 'method' require the file, it will be searched at 'params.json'")
parser.add_argument("-sf", "--secretfile", dest='secret', help="Path to TXT file with needed information of the Telegram bot to use to WARN and advice about Exceptions. If not specified and 'method' require the file, it will be searched at 'secret.txt'")
parser.add_argument("-v", "--verbose", dest='verb', action="store_true", help="Flag. If true, overwrite verbose param.")
parser.add_argument("-t", "--teleg", dest='teleg', action="store_true", help="Flag. If true, exceptions and warnings will be sent to Telegram Bot.")
parser.add_argument("-p", "--period", dest='period', help="Specify the period where to perform the operations between: \n 'both' (default), 'pre' or 'post'")
parser.add_argument("-sr", "--savereconstruction", dest='save_recons', action="store_true", help="Flag. If true, the reconstruction per iteration would be saved in ./data/ folder as an reconstruction-[name]-[day]-[period]-[AM/VA-AM].nc file.")
args = parser.parse_args()
# Default parameters
params = {
"season": "all",
"name": "-per_day-france2003",
"latitude_min": 32,
"latitude_max": 70,
"longitude_min": -30,
"longitude_max": 30,
"pre_init": '1851-01-06',
"pre_end": '1950-12-31',
"post_init": '1951-01-01',
"post_end": '2014-12-28',
"data_of_interest_init": '2003-08-09',
"data_of_interest_end": '2003-08-09',
"load_AE": True,
"load_AE_pre": True, # only for AE post comparison
"file_AE_pre": './models/AE_pre.h5',
"file_AE_post": './models/AE_post.h5',
"latent_dim": 600,
"use_VAE": True,
"with_cpu": False,
"n_epochs": 300,
"k": 20,
"iter": 20,
"interest_region": [4,8,12,19],
"resolution": 2,
"interest_region_type": "idx", # idx or coord
"per_what": "per_day", # per_day or per_week
"remove_year": False,
"replace_choice": False,
"arch": 5,
"teleg": False,
"verbose": False
}
params_multiple = None
file_params_name = 'params.json'
secret_file = 'secret.txt'
n_execs = 5
ident = False
verb = None
teleg = False
save_recons = False
# Identify Heat wave period
if args.ident is not None:
ident = args.ident
if args.verb is not None:
verb = args.verb
if args.teleg is not None:
teleg = args.teleg
if args.save_recons is not None:
save_recons = args.save_recons
if args.secret is not None:
secret_file = args.secret
else:
args.secret = secret_file
token = None
chat_id = None
user_name = None
if teleg:
with open(secret_file) as f:
token = f.readline().strip()
chat_id = f.readline().strip()
user_name = f.readline().strip()
f.close()
try:
_step_loop(params, params_multiple, file_params_name, n_execs, ident, verb, teleg, token, chat_id, user_name, save_recons, args)
except Exception as ex:
if teleg:
message = traceback.format_exc().replace("<","").replace(">","")
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&parse_mode=HTML&text={'[<b>'+type(ex).__name__+'</b>] '+user_name+': '+str(message)}"
requests.get(url).json()
raise ex
if __name__ == "__main__":
main()