import tensorflow as tf
from tensorflow import keras
from keras import layers, regularizers
import keras.backend as K
import numpy as np
[docs]def sampling(args):
"""
Auxiliar function to perform the VAE sampling
Parameters
----------
args: list of keras.layer
A list wich contains the mean layer, standard deviation layer and de dimension of the latent space.
Returns
----------
: keras.layer
The sampled latent space of the VAE.
"""
z_mean, z_log_var, latent_dim = args
print(z_mean)
print(z_log_var)
print(latent_dim)
epsilon = keras.backend.random_normal(shape=(keras.backend.shape(z_mean)[0], latent_dim),
mean=0., stddev=1.)
return z_mean + keras.backend.exp(0.5 * z_log_var) * epsilon
#return z_mean + keras.backend.exp(z_log_var) * epsilon
[docs]def masking(args):
"""
Auxiliar function to apply a spacial mask to a layer
Parameters
----------
args: list of keras.layer
A list wich contains the layer to be masked and the mask to be used.
Returns
----------
: keras.layer
The input layer masked by mask.
"""
input, mask = args
return tf.where(mask, input, 0)
[docs]def masking2(args):
"""
Auxiliar function to apply a spacial conditional mask to a layer
Parameters
----------
args: list of keras.layer
A list wich contains the layer to be masked and the mask to be used.
Returns
----------
: keras.layer
The input layer masked by mask.
"""
input, mask_shape = args
mask = (tf.random.normal(shape=mask_shape) < 0.6)
return tf.where(condition=mask, x=input, y=0)
[docs]class AE_conv():
"""
Class of AutoEncoders
Parameters
----------
input_dim: list or ndarray
A 2-array with the dimensions of the image.
latent_dim: int
The dimension of the latent (coded) space.
arch: int
The architecture of AutoEncoder to be used. The default is the arch=4.
in_channels: int
Input channels to be used. That is, if you are using a RGB image as input, it has 3 input channels. Also, if you use a ndarray with differents variables (wind, temperature, pressure, etc.), you could use each variable as a channel.
out_channels: int
Output channels to be computed. In the example of a RBG image as input, you can generate a RGB image (3 output channels) or a gray-scale image (1 output channel).
Returns
----------
: AE_conv
After initialization, it generates an object with the AutoEncoder architecture and methods
"""
def __init__(self, input_dim, latent_dim, arch=4, in_channels=1, out_channels=1, VAE=False, mask=None):
self.input_dim = input_dim
self.latent_dim = latent_dim
self.in_channels = in_channels
self.out_channels = out_channels
self.encoder = None
self.decoder = None
self.autoencoder = None
self.VAE = VAE
self.arch = arch
if mask is None and arch==6:
self.mask = tf.ones(tf.concat([self.input_dim, [self.in_channels]], 0), dtype=bool)
if mask is not None:
self.mask = tf.convert_to_tensor(mask, dtype=bool)
if self.arch==1:
# default architecture
self.def_arch_build()
elif self.arch==2:
# default simplified
self.simple_arch_build()
elif self.arch==3:
# batch normalized & simetric
self.batched_simetric_build()
elif self.arch==4:
# heatwave adapted to 20x30 with KL loss
self.kl_heatwave_arch_build()
elif self.arch==5:
# default adapted to 20x30
self.heatwave_arch_build()
elif self.arch==6:
# heatwave with mask
self.mask_heatwave_arch_build()
elif self.arch==7:
# heatwave arch simplified
self.simple_heatwave_arch_build()
elif self.arch==8:
# dense architecture
self.dense_build()
elif self.arch==9:
self.ae_arch_build()
elif self.arch==10:
self.ae_relu_arch_build()
elif self.arch==11:
self.mask_ae_relu_arch_build()
elif self.arch==12:
self.very_simple_arch_build()
elif self.arch==13:
self.sparse_arch_build()
elif self.arch==14:
self.keras_vae_build()
elif self.arch==15:
self.vae_relu_arch_build()
elif self.arch==16:
self.ae_relu_arch_build2()
elif self.arch==17:
self.cvae_arch_build()
# Default architecture of anti-simetric AutoEncoder
[docs] def def_arch_build(self):
"""
Easy example of architecture definition. Input have to be divisible by 18 in both dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Encoder
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(input_img)
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=3)(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=3)(x)
x = layers.Conv2D(128, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(128, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=2)(x)
x = layers.Flatten()(x)
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(x)
# Latent space:
x = layers.Dropout(0.3)(x)
encoded = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(x)
# Decoder
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(encoded)
x = layers.Dropout(0.3)(x)
dim_row, dim_col = self.input_dim[0]/18, self.input_dim[1]/18
x = layers.Dense(int(dim_row * dim_col), activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Reshape((int(dim_row), int(dim_col), 1))(x)
x = layers.Conv2DTranspose(128, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=2)(x)
x = layers.Conv2DTranspose(64, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=3)(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=3)(x)
# Output
x = layers.Conv2DTranspose(3, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
# Simplified architecture of anti-simetric AutoEncoder
[docs] def simple_arch_build(self):
"""
A very short architecture. Input have to be divisible by 18 in both dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Encoder
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(input_img)
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=3)(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=3)(x)
x = layers.Conv2D(128, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=2)(x)
x = layers.Flatten()(x)
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(x)
# Latent space:
x = layers.Dropout(0.3)(x)
encoded = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(x)
# Decoder
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(encoded)
x = layers.Dropout(0.3)(x)
dim_row, dim_col = self.input_dim[0]/18, self.input_dim[1]/18
x = layers.Dense(int(dim_row * dim_col), activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Reshape((int(dim_row), int(dim_col), 1))(x)
x = layers.Conv2DTranspose(128, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=6)(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=3)(x)
# Output
x = layers.Conv2DTranspose(3, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
# Batch normalization and simetric architecture of AutoEncoder
[docs] def batched_simetric_build(self):
"""
Simetric architecture with batch normalization. Input have to be divisible by 16 in both dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Encoder
x = layers.Conv2D(64, 4, activation=layers.LeakyReLU(alpha=0.2), padding='same', strides=2)(input_img)
x = layers.BatchNormalization()(x)
x = layers.Conv2D(128, 4, activation=layers.LeakyReLU(alpha=0.2), padding='same', strides=2)(x)
x = layers.BatchNormalization()(x)
x = layers.Conv2D(256, 4, activation=layers.LeakyReLU(alpha=0.2), padding='same', strides=2)(x)
x = layers.BatchNormalization()(x)
x = layers.Conv2D(512, 4, activation=layers.LeakyReLU(alpha=0.2), padding='same', strides=2)(x)
x = layers.BatchNormalization()(x)
x = layers.Flatten()(x)
x = layers.Dense(128, activation=layers.LeakyReLU(alpha=0.3))(x)
# Latent space:
x = layers.Dropout(0.3)(x)
encoded = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(x)
# Decoder
x = layers.Dense(128, activation=layers.LeakyReLU(alpha=0.3))(encoded)
x = layers.Dropout(0.3)(x)
dim_row, dim_col = self.input_dim[0]/16, self.input_dim[1]/16
x = layers.Dense(int(dim_row * dim_col), activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Reshape((int(dim_row), int(dim_col), 1))(x)
x = layers.Conv2DTranspose(512, 4, activation=layers.LeakyReLU(alpha=0.2), padding='same', strides=2)(x)
x = layers.BatchNormalization()(x)
x = layers.Conv2DTranspose(256, 4, activation=layers.LeakyReLU(alpha=0.2), padding='same', strides=2)(x)
x = layers.BatchNormalization()(x)
x = layers.Conv2DTranspose(128, 4, activation=layers.LeakyReLU(alpha=0.2), padding='same', strides=2)(x)
x = layers.BatchNormalization()(x)
x = layers.Conv2DTranspose(64, 4, activation=layers.LeakyReLU(alpha=0.2), padding='same', strides=2)(x)
x = layers.BatchNormalization()(x)
# Output
x = layers.Conv2DTranspose(3, 4, activation=layers.LeakyReLU(alpha=0.2), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='tanh', padding='same', strides=1)(x)
# Model
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
# Special architecture for the Heatwave with KL loss
[docs] def kl_heatwave_arch_build(self):
"""
Example of architecture for HW with KL-loss function. Input have to be divisible by (4,6) respectivelly in dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
kl_factor = 0.02
if len(self.input_dim)==3:
kl_factor = self.input_dim[2]
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Encoder
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(input_img)
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=(2,3))(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=2)(x)
x = layers.Conv2D(128, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(128, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Flatten()(x)
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Dropout(0.3)(x)
# Latent space:
if self.VAE:
x = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(x)
z_mean = layers.Dense(self.latent_dim)(x)
z_log_var = layers.Dense(self.latent_dim)(x)
encoded = layers.Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, self.latent_dim])
latent_inputs = layers.Input(shape=(self.latent_dim,), name='z_sampling')
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(latent_inputs)
else:
encoded = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(encoded)
# Decoder
x = layers.Dropout(0.3)(x)
dim_row, dim_col = self.input_dim[0]/4, self.input_dim[1]/6
x = layers.Dense(int(dim_row * dim_col), activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Reshape((int(dim_row), int(dim_col), 1))(x)
x = layers.Conv2DTranspose(128, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2DTranspose(64, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=2)(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=(2,3))(x)
# Output
x = layers.Conv2DTranspose(3, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
if self.VAE:
self.encoder = keras.Model(input_img, encoded, name='encoder')
self.decoder = keras.Model(latent_inputs, decoded)
outputs = self.decoder(self.encoder(input_img))#[2])
self.autoencoder = keras.Model(input_img, outputs)
else:
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
# KL loss
reconstruction_loss = tf.reduce_mean(tf.reduce_sum(keras.losses.mse(input_img, outputs), axis=(1, 2)))
kl_loss = -0.5*(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=-1))
vae_loss = reconstruction_loss + kl_factor*kl_loss
self.autoencoder.add_loss(vae_loss)
# Special architecture for the Heatwave
[docs] def heatwave_arch_build(self):
"""
Example of architecture for HW. Input have to be divisible by (4,6) respectivelly in dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Encoder
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(input_img)
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=(2,3))(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=2)(x)
x = layers.Conv2D(128, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(128, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Flatten()(x)
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Dropout(0.3)(x)
# Latent space:
if self.VAE:
x = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(x)
z_mean = layers.Dense(self.latent_dim)(x)
z_log_var = layers.Dense(self.latent_dim)(x)
encoded = layers.Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, self.latent_dim])
latent_inputs = layers.Input(shape=(self.latent_dim,), name='z_sampling')
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(latent_inputs)
else:
encoded = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(encoded)
# Decoder
x = layers.Dropout(0.3)(x)
dim_row, dim_col = self.input_dim[0]/4, self.input_dim[1]/6
x = layers.Dense(int(dim_row * dim_col), activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Reshape((int(dim_row), int(dim_col), 1))(x)
x = layers.Conv2DTranspose(128, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2DTranspose(64, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=2)(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=(2,3))(x)
# Output
x = layers.Conv2DTranspose(3, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
if self.VAE:
self.encoder = keras.Model(input_img, encoded, name='encoder')
self.decoder = keras.Model(latent_inputs, decoded)
outputs = self.decoder(self.encoder(input_img))#[2])
self.autoencoder = keras.Model(input_img, outputs)
else:
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
# Special architecture for the Heatwave with mask
[docs] def mask_heatwave_arch_build(self):
"""
Example of architecture for HW with masked input. Input have to be divisible by (4,6) respectivelly in dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Mask
m = layers.Lambda(masking, output_shape = (len(self.mask)))([input_img, self.mask])
# Encoder
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(m)
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=(2,3))(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=2)(x)
x = layers.Conv2D(128, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(128, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Flatten()(x)
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Dropout(0.3)(x)
# Latent space:
if self.VAE:
x = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(x)
z_mean = layers.Dense(self.latent_dim)(x)
z_log_var = layers.Dense(self.latent_dim)(x)
encoded = layers.Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, self.latent_dim])
latent_inputs = layers.Input(shape=(self.latent_dim,), name='z_sampling')
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(latent_inputs)
else:
encoded = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(encoded)
# Decoder
x = layers.Dropout(0.3)(x)
dim_row, dim_col = self.input_dim[0]/4, self.input_dim[1]/6
x = layers.Dense(int(dim_row * dim_col), activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Reshape((int(dim_row), int(dim_col), 1))(x)
x = layers.Conv2DTranspose(128, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2DTranspose(64, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=2)(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=(2,3))(x)
# Output
x = layers.Conv2DTranspose(3, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
if self.VAE:
self.encoder = keras.Model(input_img, encoded, name='encoder')
self.decoder = keras.Model(latent_inputs, decoded)
outputs = self.decoder(self.encoder(input_img))#[2])
self.autoencoder = keras.Model(input_img, outputs)
else:
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
# Simple special architecture for the Heatwave
[docs] def simple_heatwave_arch_build(self):
"""
Simplified architecture for HW. Input have to be divisible by (4,6) respectivelly in dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Encoder
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(input_img)
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=(2,3))(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(64, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=2)(x)
#x = layers.Conv2D(128, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
#x = layers.Conv2D(128, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
#x = layers.Conv2D(16, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Flatten()(x)
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(x)
# Latent space:
x = layers.Dropout(0.3)(x)
encoded = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(x)
# Decoder
x = layers.Dense(256, activation=layers.LeakyReLU(alpha=0.3))(encoded)
x = layers.Dropout(0.3)(x)
dim_row, dim_col = self.input_dim[0]/4, self.input_dim[1]/6
x = layers.Dense(int(dim_row * dim_col), activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Reshape((int(dim_row), int(dim_col), 1))(x)
#x = layers.Conv2DTranspose(128, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2DTranspose(64, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=2)(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=(2,3))(x)
# Output
x = layers.Conv2DTranspose(3, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
[docs] def dense_build(self):
"""
Example of full-dense architecture.
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1]))
# Encoder
x = layers.Flatten()(input_img)
x = layers.Dense(1024, activation='selu')(x)
x = layers.Dense(512, activation='selu')(x)
x = layers.Dense(128, activation='selu')(x)
x = layers.Dense(32, activation='selu')(x)
# Latent space
#x = layers.Dropout(0.3)(x)
encoded = layers.Dense(self.latent_dim, activation='selu')(x)
# Decoder
x = layers.Dense(32, activation='selu')(encoded)
x = layers.Dense(128, activation='selu')(x)
# Output
x = layers.Dense(self.input_dim[0] * self.input_dim[1], activation='sigmoid')(x)
decoded = layers.Reshape((self.input_dim[0], self.input_dim[1]))(x)
# Model
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
[docs] def ae_arch_build(self):
"""
Example of Autoencoder architecture for HW. Input have to be divisible by (4,6) respectivelly in dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
# kl_factor = 1
# if len(self.input_dim)==3:
# kl_factor = self.input_dim[2]
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Encoder
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(input_img)
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=(2,3))(x)
x = layers.Conv2D(16, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=2)(x)
x = layers.Flatten()(x)
# Latent space:
if self.VAE:
x = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(x)
z_mean = layers.Dense(self.latent_dim)(x)
z_log_var = layers.Dense(self.latent_dim)(x)
encoded = layers.Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, self.latent_dim])
latent_inputs = layers.Input(shape=(self.latent_dim,), name='z_sampling')
x = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(latent_inputs)
else:
encoded = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Dense(self.latent_dim, activation=layers.LeakyReLU(alpha=0.3))(encoded)
# Decoder
dim_row, dim_col = self.input_dim[0]/4, self.input_dim[1]/6
#x = layers.Dense(int(dim_row * dim_col), activation=layers.LeakyReLU(alpha=0.3))(x)
x = layers.Reshape((int(dim_row), int(dim_col), 16))(x)
x = layers.Conv2DTranspose(16, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=2)(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=(2,3))(x)
# Output
#x = layers.Conv2DTranspose(3, 4, activation=layers.LeakyReLU(alpha=0.3), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
if self.VAE:
self.encoder = keras.Model(input_img, encoded, name='encoder')
self.decoder = keras.Model(latent_inputs, decoded)
outputs = self.decoder(self.encoder(input_img))#[2])
self.autoencoder = keras.Model(input_img, outputs)
else:
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
# # KL loss
# reconstruction_loss = tf.reduce_mean(tf.reduce_sum(keras.losses.mse(input_img, outputs), axis=(1, 2)))
# kl_loss = -0.5*(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
# kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=-1))
# vae_loss = reconstruction_loss + kl_factor*kl_loss
# self.autoencoder.add_loss(vae_loss)
[docs] def ae_relu_arch_build(self):
"""
Example of AE architecture for HW with relu activation function. Input have to be divisible by (4,6) respectivelly in dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Encoder
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=1)(input_img)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Flatten()(x)
dim_row, dim_col = self.input_dim[0]/4, self.input_dim[1]/6
# Latent space:
if self.VAE:
x = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
z_mean = layers.Dense(self.latent_dim)(x)
z_log_var = layers.Dense(self.latent_dim)(x)
encoded = layers.Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, self.latent_dim])
latent_inputs = layers.Input(shape=(self.latent_dim,), name='z_sampling')
x = layers.Dense(int(16 * dim_row * dim_col), activation=layers.ReLU())(latent_inputs)
else:
encoded = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
x = layers.Dense(int(16 * dim_row * dim_col), activation=layers.ReLU())(encoded)
# Decoder
#x = layers.Dense(int(dim_row * dim_col), activation=layers.ReLU())(x)
x = layers.Reshape((int(dim_row), int(dim_col), 16))(x)
x = layers.Conv2DTranspose(16, 4, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
# Output
#x = layers.Conv2DTranspose(3, 4, activation=layers.ReLU(), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
if self.VAE:
self.encoder = keras.Model(input_img, encoded, name='encoder')
self.decoder = keras.Model(latent_inputs, decoded)
outputs = self.decoder(self.encoder(input_img))#[2])
self.autoencoder = keras.Model(input_img, outputs)
else:
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
[docs] def mask_ae_relu_arch_build(self):
"""
Example of AE architecture for HW with relu and spatial mask. Input have to be divisible by (4,6) respectivelly in dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Mask
x = layers.Lambda(masking2, output_shape=(self.input_dim[0], self.input_dim[1], self.in_channels))([input_img, (self.input_dim[0], self.input_dim[1], self.in_channels)])
# Encoder
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Flatten()(x)
dim_row, dim_col = self.input_dim[0]/4, self.input_dim[1]/6
# Latent space:
if self.VAE:
x = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
z_mean = layers.Dense(self.latent_dim)(x)
z_log_var = layers.Dense(self.latent_dim)(x)
encoded = layers.Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, self.latent_dim])
latent_inputs = layers.Input(shape=(self.latent_dim,), name='z_sampling')
x = layers.Dense(int(16 * dim_row * dim_col), activation=layers.ReLU())(latent_inputs)
else:
encoded = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
x = layers.Dense(int(16 * dim_row * dim_col), activation=layers.ReLU())(encoded)
# Decoder
#x = layers.Dense(int(dim_row * dim_col), activation=layers.ReLU())(x)
x = layers.Reshape((int(dim_row), int(dim_col), 16))(x)
x = layers.Conv2DTranspose(16, 4, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
# Output
#x = layers.Conv2DTranspose(3, 4, activation=layers.ReLU(), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
if self.VAE:
self.encoder = keras.Model(input_img, encoded, name='encoder')
self.decoder = keras.Model(latent_inputs, decoded)
outputs = self.decoder(self.encoder(input_img))#[2])
self.autoencoder = keras.Model(input_img, outputs)
else:
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
[docs] def very_simple_arch_build(self):
"""
Simplified architecture of AE with relu. Input have to be divisible by (4,6) respectivelly in dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Encoder
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=(2,3))(input_img)
x = layers.Conv2D(8, 3, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Flatten()(x)
dim_row, dim_col = self.input_dim[0]/4, self.input_dim[1]/6
# Latent space:
if self.VAE:
x = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
z_mean = layers.Dense(self.latent_dim)(x)
z_log_var = layers.Dense(self.latent_dim)(x)
encoded = layers.Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, self.latent_dim])
latent_inputs = layers.Input(shape=(self.latent_dim,), name='z_sampling')
x = layers.Dense(self.latent_dim, activation=layers.ReLU())(latent_inputs)
else:
encoded = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
x = layers.Dense(int(8 * dim_row * dim_col), activation=layers.ReLU())(encoded)
# Decoder
#x = layers.Dense(int(dim_row * dim_col), activation=layers.ReLU())(x)
x = layers.Reshape((int(dim_row), int(dim_col), 8))(x)
x = layers.Conv2DTranspose(8, 4, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Conv2DTranspose(16, 4, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
# Output
#x = layers.Conv2DTranspose(3, 4, activation=layers.ReLU(), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
if self.VAE:
self.encoder = keras.Model(input_img, encoded, name='encoder')
self.decoder = keras.Model(latent_inputs, decoded)
outputs = self.decoder(self.encoder(input_img))#[2])
self.autoencoder = keras.Model(input_img, outputs)
else:
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
[docs] def sparse_arch_build(self):
"""
Example of Sparse Autoencoder architecture. Input have to be divisible by (4,6) respectivelly in dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Encoder
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=1)(input_img)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Flatten()(x)
dim_row, dim_col = self.input_dim[0]/4, self.input_dim[1]/6
# Latent space:
if self.VAE:
x = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
z_mean = layers.Dense(self.latent_dim)(x)
z_log_var = layers.Dense(self.latent_dim)(x)
encoded = layers.Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, self.latent_dim])
latent_inputs = layers.Input(shape=(self.latent_dim,), name='z_sampling')
x = layers.Dense(self.latent_dim, activation=layers.ReLU())(latent_inputs)
else:
encoded = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
x = layers.Dense(int(16 * dim_row * dim_col), activity_regularizer=regularizers.l1(10e-5), activation=layers.ReLU())(encoded)
# Decoder
#x = layers.Dense(int(dim_row * dim_col), activation=layers.ReLU())(x)
x = layers.Reshape((int(dim_row), int(dim_col), 16))(x)
x = layers.Conv2DTranspose(16, 4, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
# Output
#x = layers.Conv2DTranspose(3, 4, activation=layers.ReLU(), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
if self.VAE:
self.encoder = keras.Model(input_img, encoded, name='encoder')
self.decoder = keras.Model(latent_inputs, decoded)
outputs = self.decoder(self.encoder(input_img))#[2])
self.autoencoder = keras.Model(input_img, outputs)
else:
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
[docs] def keras_vae_build(self):
"""
Keras example architecture. Input have to be divisible by (4,6) respectivelly in dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Encoder
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=(2,3))(input_img)
x = layers.Conv2D(64, 3, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Flatten()(x)
dim_row, dim_col = self.input_dim[0]/4, self.input_dim[1]/6
# Latent space:
if self.VAE:
x = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
z_mean = layers.Dense(self.latent_dim)(x)
z_log_var = layers.Dense(self.latent_dim)(x)
encoded = layers.Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, self.latent_dim])
latent_inputs = layers.Input(shape=(self.latent_dim,), name='z_sampling')
x = layers.Dense(int(16 * dim_row * dim_col), activation=layers.ReLU())(latent_inputs)
else:
encoded = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
x = layers.Dense(int(16 * dim_row * dim_col), activity_regularizer=regularizers.l1(10e-5), activation=layers.ReLU())(encoded)
# Decoder
#x = layers.Dense(int(dim_row * dim_col), activation=layers.ReLU())(x)
x = layers.Reshape((int(dim_row), int(dim_col), 16))(x)
x = layers.Conv2DTranspose(64, 4, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
# Output
#x = layers.Conv2DTranspose(3, 4, activation=layers.ReLU(), padding='same', strides=1)(x)
decoded = layers.Conv2DTranspose(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
if self.VAE:
self.encoder = keras.Model(input_img, encoded, name='encoder')
self.decoder = keras.Model(latent_inputs, decoded)
outputs = self.decoder(self.encoder(input_img))#[2])
self.autoencoder = keras.Model(input_img, outputs)
else:
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
# KL loss
reconstruction_loss = tf.reduce_mean(tf.reduce_sum(keras.losses.mse(input_img, outputs), axis=(1, 2)))
kl_loss = -0.5*(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=-1))
vae_loss = reconstruction_loss + kl_loss
self.autoencoder.add_loss(vae_loss)
[docs] def vae_relu_arch_build(self):
"""
VAE architecture with relu activation function. Input have to be divisible by (4,6) respectivelly in dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Encoder
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=1)(input_img)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Flatten()(x)
dim_row, dim_col = self.input_dim[0]/4, self.input_dim[1]/6
# Latent space:
if self.VAE:
x = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
z_mean = layers.Dense(self.latent_dim)(x)
z_log_var = layers.Dense(self.latent_dim)(x)
encoded = layers.Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, self.latent_dim])
latent_inputs = layers.Input(shape=(self.latent_dim,), name='z_sampling')
x = layers.Dense(int(32 * dim_row * dim_col), activation=layers.ReLU())(latent_inputs)
else:
encoded = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
x = layers.Dense(int(32 * dim_row * dim_col), activation=layers.ReLU())(encoded)
# Decoder
#x = layers.Dense(int(dim_row * dim_col), activation=layers.ReLU())(x)
x = layers.Reshape((int(dim_row), int(dim_col), 32))(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Conv2DTranspose(16, 4, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
# Output
#x = layers.Conv2DTranspose(3, 4, activation=layers.ReLU(), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
if self.VAE:
self.encoder = keras.Model(input_img, encoded, name='encoder')
self.decoder = keras.Model(latent_inputs, decoded)
outputs = self.decoder(self.encoder(input_img))#[2])
self.autoencoder = keras.Model(input_img, outputs)
else:
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
# KL loss
reconstruction_loss = tf.reduce_mean(tf.reduce_sum(keras.losses.mse(input_img, outputs), axis=(1, 2)))
kl_loss = -0.5*(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=-1))
vae_loss = reconstruction_loss + kl_loss
self.autoencoder.add_loss(vae_loss)
[docs] def ae_relu_arch_build2(self):
"""
Example of AE architecture with relu and BatchNormalization. Input have to be divisible by (4,6) respectivelly in dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Encoder
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=1)(input_img)
x = layers.BatchNormalization()(x)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.BatchNormalization()(x)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
x = layers.BatchNormalization()(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.BatchNormalization()(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.BatchNormalization()(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.BatchNormalization()(x)
x = layers.Flatten()(x)
dim_row, dim_col = self.input_dim[0]/4, self.input_dim[1]/6
# Latent space:
if self.VAE:
x = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
z_mean = layers.Dense(self.latent_dim)(x)
z_log_var = layers.Dense(self.latent_dim)(x)
encoded = layers.Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, self.latent_dim])
latent_inputs = layers.Input(shape=(self.latent_dim,), name='z_sampling')
x = layers.Dense(int(16 * dim_row * dim_col), activation=layers.ReLU())(latent_inputs)
else:
encoded = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
x = layers.Dense(int(16 * dim_row * dim_col), activation=layers.ReLU())(encoded)
# Decoder
#x = layers.Dense(int(dim_row * dim_col), activation=layers.ReLU())(x)
x = layers.Reshape((int(dim_row), int(dim_col), 16))(x)
x = layers.Conv2DTranspose(16, 4, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.BatchNormalization()(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
x = layers.BatchNormalization()(x)
# Output
#x = layers.Conv2DTranspose(3, 4, activation=layers.ReLU(), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
if self.VAE:
self.encoder = keras.Model(input_img, encoded, name='encoder')
self.decoder = keras.Model(latent_inputs, decoded)
outputs = self.decoder(self.encoder(input_img))#[2])
self.autoencoder = keras.Model(input_img, outputs)
else:
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
[docs] def cvae_arch_build(self):
"""
VAE architecture with relu activation function. Input have to be divisible by (4,6) respectivelly in dimensions (latitude, longitude).
Parameters
----------
Returns
----------
"""
# Input
kl_factor = 0.5
focus = 0.6
class_factor = 0.5
if len(self.input_dim)==5:
kl_factor = self.input_dim[2]
focus = self.input_dim[3]
class_factor = self.input_dim[4]
input_img = keras.Input(shape=(self.input_dim[0], self.input_dim[1], self.in_channels))
# Classifier
clf_inputs=keras.Input(shape=(self.latent_dim,))
c =layers.Dense(32, activation=keras.activations.relu,input_shape=[self.latent_dim])(clf_inputs)
c =layers.Dense(1, activation=keras.activations.sigmoid)(c)
# Encoder
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=1)(input_img)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(16, 3, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=1)(x)
x = layers.Conv2D(32, 3, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Flatten()(x)
dim_row, dim_col = self.input_dim[0]/4, self.input_dim[1]/6
# Latent space:
if self.VAE:
x = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
z_mean = layers.Dense(self.latent_dim)(x)
z_log_var = layers.Dense(self.latent_dim)(x)
encoded = layers.Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, self.latent_dim])
latent_inputs = layers.Input(shape=(self.latent_dim,), name='z_sampling')
x = layers.Dense(int(32 * dim_row * dim_col), activation=layers.ReLU())(latent_inputs)
else:
encoded = layers.Dense(self.latent_dim, activation=layers.ReLU())(x)
x = layers.Dense(int(32 * dim_row * dim_col), activation=layers.ReLU())(encoded)
# Decoder
#x = layers.Dense(int(dim_row * dim_col), activation=layers.ReLU())(x)
x = layers.Reshape((int(dim_row), int(dim_col), 32))(x)
x = layers.Conv2DTranspose(32, 4, activation=layers.ReLU(), padding='same', strides=2)(x)
x = layers.Conv2DTranspose(16, 4, activation=layers.ReLU(), padding='same', strides=(2,3))(x)
# Output
#x = layers.Conv2DTranspose(3, 4, activation=layers.ReLU(), padding='same', strides=1)(x)
decoded = layers.Conv2D(self.out_channels, 3, activation='selu', padding='same', strides=1)(x)
# Model
if self.VAE:
self.classifier = keras.Model(clf_inputs, c, name="classifier")
self.encoder = keras.Model(input_img, encoded, name='encoder')
self.decoder = keras.Model(latent_inputs, decoded)
outputs = self.decoder(self.encoder(input_img))#[2])
self.autoencoder = keras.Model(input_img, outputs)
else:
self.classifier = keras.Model(clf_inputs, c, name="classifier")
self.encoder = keras.Model(input_img, encoded)
self.decoder = keras.Model(encoded, decoded)
self.autoencoder = keras.Model(input_img, self.decoder(self.encoder(input_img)))
print(self.autoencoder.summary())
# Reconstruction loss
reconstruction_loss = tf.reduce_mean(tf.reduce_sum(keras.losses.mse(input_img, outputs), axis=(1, 2)))
# KL loss
kl_loss = -0.5*(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=-1))
# Classifier loss
y_preds = self.classifier(latent_inputs)
y_preds=y_preds[:, 0]
bce_loss_function= tf.keras.losses.BinaryFocalCrossentropy(gamma=focus, reduction=tf.keras.losses.Reduction.NONE)
cross_entropy_loss = tf.reduce_mean(
bce_loss_function(labels, y_preds)
)
vae_loss = reconstruction_loss + kl_factor*kl_loss + class_factor*cross_entropy_loss
self.autoencoder.add_loss(vae_loss)
[docs] def compile(self, **kwargs):
"""
Compilation of the AutoEncoder.
Here we specify the method to optimize, the loss function and,
if we want, some metrics to show together with the loss.
Parameters
----------
**kwargs: dict
all available parameters for fit method at the keras/tensorflow version. If not specified, the default paramaters are optimizer=Adam, loss=mse, metrics=[mae, mape].
Returns
----------
:
No output, only compile the model.
"""
default_compile_params = {"optimizer":"adam", "loss":"mse", "metrics":["mae", "mape"]}
if kwargs:
default_compile_params.update(kwargs)
if self.arch in [4, 14, 15]:
if "loss" in kwargs.keys():
kwargs.pop("loss")
self.autoencoder.compile(**default_compile_params)
else:
self.autoencoder.compile(**default_compile_params)
[docs] def fit(self, x, y, epochs=100, verbose=1, min_delta=3e-6, patience=10, restore_best_weights=True, mask=None, **kwargs):
"""
Training of the AutoEncoder.
Here we specify the parameters for our training.
Parameters
----------
x:
input data.
y:
output data (data to be compared and trained).
epochs: int
number of epochs to train the model.
verbose: 'auto', 0, 1 or 2.
differents modes of verbosity when the model is trained.
min_delta: int or float
minimum change in the monitored quantity to qualify as an improvement.
patience: int
number of epochs with no improvement adter wich training will be stoped.
restore_best_weigths: bool
whether to restore model weights from the epoch with the best value of the monitoredquantity
**kwargs: dict
all available parameters for fit method at the keras/tensorflow version, except verbose and epochs
Returns
----------
: History
A History object. See History.history.
"""
default_fit_params = {"batch_size":64, "validation_split":0.15}
if kwargs:
default_fit_params.update(kwargs)
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=min_delta, patience=patience,
restore_best_weights=restore_best_weights)
if mask is not None:
x[:,mask,0] = 0
if (x!=y).any():
y[:,mask,0] = 0
else:
y = x
history = self.autoencoder.fit(x, y,
epochs=epochs,
callbacks=callback,
verbose=verbose,
**default_fit_params
)
return history