Keras autoencoder training (low loss, but reconstructed images are all black)

fkaflof6, posted on 2023-10-19 in Other

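I implemented a (variational) autoencoder (VAE) in Keras, following this guide.
The VAE reads its data from the topomaps folder and its labels from the labels folder. Both contain .npy files, which are essentially NumPy arrays stored on disk.
I split the dataset into 80% training data and 20% test data. Specifically: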

  • x_train has shape (245760, 40, 40, 1)
  • y_train has shape (245760,)
  • x_test has shape (61440, 40, 40, 1)
  • y_test has shape (61440,)

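Unfortunately, even though the (training) loss and val_loss are very low (2.9246e-04 and 4.8249e-04, respectively), when I visually inspect my VAE's reconstructions they are poor: the reconstructed images look nothing like the originals: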

I ran the demo with the following configuration:

  • latent_dim=25. This value must not be changed
  • epochs=2
  • batch_size=512
  • optimizer=Adam()
  • learning_rate=0.001

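I know the number of epochs is very small, but this is just a demo. My goal is to scale it up once I figure out why my VAE does not work even though the loss is very low.
This is the run output: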

Epoch 1/2
2023-08-30 11:35:49.408811: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
384/384 [==============================] - ETA: 0s - loss: 60.0042 - reconstruction_loss: 11.0072 - kl_loss: 0.0989
2023-08-30 11:38:40.538661: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
384/384 [==============================] - 190s 492ms/step - loss: 59.8772 - reconstruction_loss: 11.0072 - kl_loss: 0.0989 - val_loss: 5.5495e-04 - val_reconstruction_loss: 5.4875e-04 - val_kl_loss: 6.1989e-06
Epoch 2/2
384/384 [==============================] - 188s 490ms/step - loss: 3.0879e-04 - reconstruction_loss: 3.0472e-04 - kl_loss: 1.5222e-06 - val_loss: 3.4318e-04 - val_reconstruction_loss: 3.4303e-04 - val_kl_loss: 1.4901e-07
2023-08-30 11:42:17.049 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:26.493 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:37.200 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:41.800433: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
1920/1920 [==============================] - 29s 15ms/step
2023-08-30 11:43:16.276 Python[2419:58392] +[CATransaction synchronize] called within transaction

Process finished with exit code 0
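
A side note on reading this log: the KL term, as defined in the train_step shown further below, is exactly zero when z_mean = 0 and z_log_var = 0 in every latent dimension, that is, when the encoder output equals the standard normal prior regardless of the input. Below is a minimal NumPy sketch of the same formula; the helper name kl_to_standard_normal and the example values are illustrative and not part of the original code.

```python
import numpy as np

def kl_to_standard_normal(z_mean, z_log_var):
    """KL(N(mean, exp(log_var)) || N(0, 1)), summed over latent dims, averaged over the batch."""
    per_dim = -0.5 * (1.0 + z_log_var - np.square(z_mean) - np.exp(z_log_var))
    return float(np.mean(np.sum(per_dim, axis=1)))

# Posterior identical to the prior: the KL term vanishes.
collapsed = kl_to_standard_normal(np.zeros((4, 25)), np.zeros((4, 25)))

# Posterior that differs from the prior: the KL term is clearly positive.
informative = kl_to_standard_normal(np.full((4, 25), 1.5), np.full((4, 25), -2.0))

print(collapsed, informative)
```

A val_kl_loss around 1e-07, as in the log above, would therefore be consistent with the encoder producing almost the same distribution for every input, although that alone does not pinpoint the cause.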

Learning curves:

These are the three model classes:

class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    def call(self, inputs, training=None, mask=None):
        _, _, z = self.encoder(inputs)
        outputs = self.decoder(z)
        return outputs

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
        with tf.GradientTape() as tape:
            # Forward pass
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)

            # Compute losses
            reconstruction_loss = tf.reduce_mean(
                tf.reduce_sum(
                    keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
                )
            )
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            total_loss = reconstruction_loss + kl_loss

        # Compute gradient
        grads = tape.gradient(total_loss, self.trainable_weights)

        # Update weights
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        # Update my own metrics
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def test_step(self, data):
        # Forward pass
        z_mean, z_log_var, z = self.encoder(data)
        reconstruction = self.decoder(z)

        # Compute losses
        reconstruction_loss = tf.reduce_mean(
            tf.reduce_sum(
                keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
            )
        )
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        total_loss = reconstruction_loss + kl_loss

        # Update my own metrics
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

class Encoder(keras.Model):
    def __init__(self, latent_dimension, input_shape):
        super(Encoder, self).__init__()
        self.latent_dim = latent_dimension
        self.conv_block1 = keras.Sequential([
            layers.Input(shape=input_shape),
            layers.Conv2D(filters=64, kernel_size=3, activation="relu", strides=2, padding="same"),
            layers.BatchNormalization()
        ])
        self.conv_block2 = keras.Sequential([
            layers.Conv2D(filters=128, kernel_size=3, activation="relu", strides=2, padding="same"),
            layers.BatchNormalization()
        ])
        self.conv_block3 = keras.Sequential([
            layers.Conv2D(filters=256, kernel_size=3, activation="relu", strides=2, padding="same"),
            layers.BatchNormalization()
        ])
        self.flatten = layers.Flatten()
        self.dense = layers.Dense(units=100, activation="relu")
        self.z_mean = layers.Dense(latent_dimension, name="z_mean")
        self.z_log_var = layers.Dense(latent_dimension, name="z_log_var")
        self.sampling = sample

    def call(self, inputs, training=None, mask=None):
        x = self.conv_block1(inputs)
        x = self.conv_block2(x)
        x = self.conv_block3(x)
        x = self.flatten(x)
        x = self.dense(x)
        z_mean = self.z_mean(x)
        z_log_var = self.z_log_var(x)
        z = self.sampling(z_mean, z_log_var)
        return z_mean, z_log_var, z

class Decoder(keras.Model):
    def __init__(self, latent_dimension):
        super(Decoder, self).__init__()
        self.latent_dim = latent_dimension
        self.dense1 = keras.Sequential([
            layers.Dense(units=100, activation="relu"),
            layers.BatchNormalization()
        ])
        self.dense2 = keras.Sequential([
            layers.Dense(units=1024, activation="relu"),
            layers.BatchNormalization()
        ])
        self.dense3 = keras.Sequential([
            layers.Dense(units=4096, activation="relu"),
            layers.BatchNormalization()
        ])
        self.reshape = layers.Reshape((4, 4, 256))
        self.deconv1 = keras.Sequential([
            layers.Conv2DTranspose(filters=256, kernel_size=3, activation="relu", strides=2, padding="same"),
            layers.BatchNormalization()
        ])
        self.deconv2 = keras.Sequential([
            layers.Conv2DTranspose(filters=128, kernel_size=3, activation="relu", strides=1, padding="same"),
            layers.BatchNormalization()
        ])
        self.deconv3 = keras.Sequential([
            layers.Conv2DTranspose(filters=128, kernel_size=3, activation="relu", strides=2, padding="valid"),
            layers.BatchNormalization()
        ])
        self.deconv4 = keras.Sequential([
            layers.Conv2DTranspose(filters=64, kernel_size=3, activation="relu", strides=1, padding="valid"),
            layers.BatchNormalization()
        ])
        self.deconv5 = keras.Sequential([
            layers.Conv2DTranspose(filters=64, kernel_size=3, activation="relu", strides=2, padding="valid"),
            layers.BatchNormalization()
        ])
        self.deconv6 = layers.Conv2DTranspose(filters=1, kernel_size=2, activation="sigmoid", padding="valid")

    def call(self, inputs, training=None, mask=None):
        x = self.dense1(inputs)
        x = self.dense2(x)
        x = self.dense3(x)
        x = self.reshape(x)
        x = self.deconv1(x)
        x = self.deconv2(x)
        x = self.deconv3(x)
        x = self.deconv4(x)
        x = self.deconv5(x)
        decoder_outputs = self.deconv6(x)
        return decoder_outputs
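
To sanity-check that this decoder really ends at 40x40, the spatial size can be traced layer by layer. The sketch below assumes the usual Keras Conv2DTranspose output-size rule with no output_padding and no dilation (in * stride for padding="same", in * stride + max(kernel - stride, 0) for padding="valid"). The helper deconv_out is made up for illustration.

```python
def deconv_out(size, kernel, stride, padding):
    """Spatial output size of a transposed convolution (no output_padding, no dilation)."""
    if padding == "same":
        return size * stride
    if padding == "valid":
        return size * stride + max(kernel - stride, 0)
    raise ValueError(f"unknown padding: {padding}")

# (kernel_size, strides, padding) for deconv1 .. deconv6 of the Decoder above
decoder_layers = [
    (3, 2, "same"),
    (3, 1, "same"),
    (3, 2, "valid"),
    (3, 1, "valid"),
    (3, 2, "valid"),
    (2, 1, "valid"),
]

size = 4  # spatial size after Reshape((4, 4, 256))
sizes = [size]
for kernel, stride, padding in decoder_layers:
    size = deconv_out(size, kernel, stride, padding)
    sizes.append(size)

print(sizes)  # [4, 8, 8, 17, 19, 39, 40]
```

Under that assumption the output shape matches the (40, 40, 1) input, so a shape mismatch is unlikely to be the cause of the bad reconstructions.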

Note that the VAE implementation comes from the guide I linked above.
This is the main function:

if __name__ == '__main__':
    # Load data
    x_train, x_test, y_train, y_test = load_data("topomaps", "labels", 0.2)

    # Expand dimensions to (None, 40, 40, 1)
    x_train = np.expand_dims(x_train, -1)
    x_test = np.expand_dims(x_test, -1)

    # Print data shapes
    print("x_train shape:", x_train.shape)
    print("y_train shape:", y_train.shape)
    print("x_test shape:", x_test.shape)
    print("y_test shape:", y_test.shape)

    # Normalize the data
    x_train = x_train.astype("float32") / 255.0
    x_test = x_test.astype("float32") / 255.0

    # Compiling the VAE
    latent_dimension = 25  # Do not change
    encoder = Encoder(latent_dimension, (40, 40, 1))
    decoder = Decoder(latent_dimension)
    vae = VAE(encoder, decoder)
    vae.compile(Adam(learning_rate=0.001))

    # Training
    x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)
    print("x_val shape:", x_val.shape)
    print("y_val shape:", y_val.shape)
    epochs = 2
    batch_size = 512
    history = vae.fit(x_train, epochs=epochs, batch_size=batch_size, validation_data=(x_val,))

    # Plot learning curves
    plot_metric(history, "loss")

    # Check reconstruction skills against a random test sample
    image_index = 5
    plt.title(f"Original image {image_index}")
    original_image = x_test[image_index]
    plt.imshow(original_image, cmap="gray")
    plt.show()

    plt.title(f"Reconstructed image {image_index}, latent_dim = {latent_dimension}, epochs = {epochs}, "
              f"batch_size = {batch_size}")
    x_test_reconstructed = vae.predict(x_test)
    reconstructed_image = x_test_reconstructed[image_index]
    plt.imshow(reconstructed_image, cmap="gray")
    plt.show()

Here are some of the functions I use:

def load_data(topomaps_folder: str, labels_folder: str, test_size):
    x, y = _create_dataset(topomaps_folder, labels_folder)
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_size)
    return x_train, x_test, y_train, y_test

def _create_dataset(topomaps_folder, labels_folder):
    topomaps_files = os.listdir(topomaps_folder)
    labels_files = os.listdir(labels_folder)
    topomaps_files.sort()
    labels_files.sort()
    x = []
    y = []
    n_files = len(topomaps_files)
    for topomaps_file, labels_file in tqdm(zip(topomaps_files, labels_files), total=n_files, desc="Loading data set"):
        topomaps_array = np.load(f"{topomaps_folder}/{topomaps_file}")
        labels_array = np.load(f"{labels_folder}/{labels_file}")
        if topomaps_array.shape[0] != labels_array.shape[0]:
            raise Exception("Shapes must be equal")
        for i in range(topomaps_array.shape[0]):
            x.append(topomaps_array[i])
            y.append(labels_array[i])
    x = np.array(x)
    y = np.array(y)
    return x, y

def sample(z_mean, z_log_var):
    batch = tf.shape(z_mean)[0]
    dim = tf.shape(z_mean)[1]
    epsilon = tf.random.normal(shape=(batch, dim))
    stddev = tf.exp(0.5 * z_log_var)
    return z_mean + stddev * epsilon

def plot_metric(history, metric):
    plt.plot(history.history[metric])
    plt.plot(history.history['val_' + metric])
    plt.title(metric)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'])
    plt.show()
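
For readers unfamiliar with it, the sample function above is the reparameterization trick: it draws epsilon from a standard normal and computes z = z_mean + exp(0.5 * z_log_var) * epsilon, so that gradients can flow through z_mean and z_log_var. Below is a NumPy-only sketch of the same computation; the function name, seed, and test values are assumptions chosen for illustration.

```python
import numpy as np

def sample_np(z_mean, z_log_var, rng):
    """Draw z ~ N(z_mean, exp(z_log_var)) via z = mean + stddev * epsilon."""
    epsilon = rng.standard_normal(z_mean.shape)
    stddev = np.exp(0.5 * z_log_var)
    return z_mean + stddev * epsilon

rng = np.random.default_rng(0)
z_mean = np.full((100_000, 1), 2.0)
z_log_var = np.full((100_000, 1), np.log(0.25))  # variance 0.25, so stddev 0.5

z = sample_np(z_mean, z_log_var, rng)
print(z.mean(), z.std())  # empirical mean and stddev, close to 2.0 and 0.5
```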

Edit
**- Can you try a much smaller training set to see whether it overfits and reconstructs a known image?** OK, this is what I did in the main function:

# Reduce DS size
x_train = x_train[:500]
y_train = y_train[:500]
x_test = x_test[:500]
y_test = y_test[:500]

This is what I get:

The learning curves are:

If I run the same configuration but set epochs=100, I get:

Loss:

**- Not sure whether you did this... but did you convert the output back to non-normalized values when plotting?** This is what I did:

plt.title(f"Reconstructed image {image_index}, latent_dim = {latent_dimension}, epochs = {epochs}, "
          f"batch_size = {batch_size}")
x_test_reconstructed = vae.predict(x_test)
reconstructed_image = x_test_reconstructed[image_index]
reconstructed_image = reconstructed_image * 255
plt.imshow(reconstructed_image, cmap="gray")
plt.show()

But I still get:
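
One likely reason the multiplication makes no visible difference: for a single-channel float array, plt.imshow by default rescales the data linearly from its own minimum and maximum onto the colormap (when vmin and vmax are not given), so multiplying by a positive constant does not change the picture. The sketch below re-implements that default scaling in NumPy to illustrate the point; autoscale is a hypothetical helper, not a Matplotlib function, and the random array only stands in for a reconstruction.

```python
import numpy as np

def autoscale(img):
    """Linear min/max scaling to [0, 1]; a constant image maps to all zeros."""
    lo, hi = float(img.min()), float(img.max())
    if hi == lo:
        return np.zeros_like(img, dtype="float64")
    return (img - lo) / (hi - lo)

rng = np.random.default_rng(0)
recon = rng.uniform(0.0, 1e-3, size=(40, 40))  # stand-in for a reconstructed image

# Scaling by 255 leaves the autoscaled picture unchanged.
same_picture = np.allclose(autoscale(recon), autoscale(recon * 255))
print(same_picture)

# A constant image is drawn with the lowest colormap color (black for "gray").
constant = np.full((40, 40), 1e-4)
print(autoscale(constant).max())
```

Passing vmin=0 and vmax=1 to plt.imshow for both the original and the reconstruction makes the two plots directly comparable.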

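**- What does the loss plot look like with a batch size of 4, a learning rate of 0.0002, and 100 epochs?** The reconstructed image is all black, and the loss curve is: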

iibxawm4 #1

I had no problems running my implementation. I ran a "copy" of the VAE tutorial and got the following result after 1 epoch:

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import cv2
import matplotlib.pyplot as plt

class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

latent_dim = 25

encoder_inputs = keras.Input(shape=(40, 40, 1))
x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)
x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Flatten()(x)
x = layers.Dense(16, activation="relu")(x)
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()


latent_inputs = keras.Input(shape=(latent_dim,))
x = layers.Dense(6400, activation="relu")(latent_inputs)
x = layers.Reshape((10,10,64))(x)
x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
decoder_outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()

class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            reconstruction_loss = tf.reduce_mean(
                tf.reduce_sum(
                    keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
                )
            )
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            total_loss = reconstruction_loss + kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

def create_data():
    # "IMAGE" is a placeholder path: a screen clipping was used since the original data source is unavailable
    data = cv2.imread("IMAGE", 0)
    data = cv2.resize(data, (40, 40), interpolation=cv2.INTER_AREA)  # resize to the same 40x40 shape

    # Duplicate the image 1000x as float32 so the added noise is not truncated by the uint8 dtype
    x_train = np.array([data] * 1000).astype("float32")
    # Add independent Gaussian noise to every copy, then keep values inside [0, 255]
    noise = np.random.normal(0, 0.2, x_train.shape).astype("float32")
    x_train = np.clip(x_train + noise, 0, 255)
    x_test = x_train
    return x_train, x_test, data
    
x_train,x_test , data = create_data()
x = x_train
x = np.expand_dims(x,-1).astype("float32") / 255


vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(x, epochs=1, batch_size=8)
data_input = np.expand_dims(data,-1).astype("float32")/255
print(data_input.shape)
latent = vae.encoder.predict(x)
print(latent)
output = vae.decoder.predict(latent[0])
digit = output[0]
plt.figure("Source")
plt.imshow(data,cmap="gray")
plt.figure("output")
plt.imshow(digit,cmap='gray')
plt.show()

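I don't have your data, so I can't check whether your data loader has any bugs. You may want to check the call functions on your encoder and decoder, but following the tutorial code I could not reproduce your problem.
I suggest a full debugging pass over your data:
1. Check the data input, structure, shape, etc. (sanity tests are good)
2. Try removing the test/validation split.
3. Make sure the model works on a known dataset such as MNIST. Try running your VAE on the MNIST dataset; if it does not work, then something is wrong here.
Unless I have some samples of the input data, there is really no way to debug the code properly. I strongly suspect the data input, since the model construction code looks fine.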
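
The first check in the list above could look like the sketch below: a small helper that reports shape, dtype, value range, and non-finite values, which is enough to see whether dividing by 255 is appropriate for the data. The helper name describe and the random stand-in array are assumptions; the real topomaps are not available here.

```python
import numpy as np

def describe(name, x):
    """Print and return basic statistics of an array for a quick sanity check."""
    x = np.asarray(x)
    stats = {
        "shape": x.shape,
        "dtype": str(x.dtype),
        "min": float(np.nanmin(x)),
        "max": float(np.nanmax(x)),
        "mean": float(np.nanmean(x)),
        "non_finite": int(x.size - np.isfinite(x).sum()),
    }
    print(name, stats)
    return stats

# Stand-in data; replace with the arrays returned by load_data.
rng = np.random.default_rng(0)
fake_topomaps = rng.normal(0.0, 1.0, size=(8, 40, 40)).astype("float32")
stats = describe("fake_topomaps", fake_topomaps)
```

If the reported min and max are far from 0 and 255, a fixed division by 255 will not map the data onto [0, 1].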

qgelzfjb #2

The problem is related to data normalization.

x_train = (x_train - np.min(x_train)) / (np.max(x_train) - np.min(x_train))

x_test = (x_test - np.min(x_test)) / (np.max(x_test) - np.min(x_test))

This should solve the problem.
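
To illustrate why the normalization matters: if the topomaps are not 8-bit images in [0, 255] (their real range is not stated in the question, so the range used below is purely an assumption), dividing by 255 squeezes every pixel close to 0. A decoder that outputs almost 0 everywhere then reaches a tiny binary cross-entropy while producing a black image. The sketch also reuses the training-set minimum and maximum for the test split, which is a common variation on the per-split scaling above and not what the answer itself does. The names bce_sum and min_max_scale are hypothetical helpers.

```python
import numpy as np

def bce_sum(target, pred, eps=1e-7):
    """Binary cross-entropy summed over pixels and averaged over the batch."""
    target = np.asarray(target, dtype="float64")
    pred = np.clip(np.asarray(pred, dtype="float64"), eps, 1.0 - eps)
    per_pixel = -(target * np.log(pred) + (1.0 - target) * np.log(1.0 - pred))
    return float(per_pixel.sum(axis=(1, 2)).mean())

def min_max_scale(x, lo, hi):
    """Map values linearly so that lo -> 0 and hi -> 1."""
    return (x - lo) / (hi - lo)

rng = np.random.default_rng(0)
# ASSUMPTION: raw values live in a small range, not in [0, 255].
x_train = rng.uniform(0.0, 1e-4, size=(100, 40, 40)).astype("float32")
x_test = rng.uniform(0.0, 1e-4, size=(20, 40, 40)).astype("float32")

black = np.zeros_like(x_train)  # a "reconstruction" that is black everywhere

# Dividing by 255: the black image already scores a very low loss.
loss_div255 = bce_sum(x_train / 255.0, black)

# Min-max scaling with training statistics: the black image is heavily penalized.
lo, hi = x_train.min(), x_train.max()
x_train_scaled = min_max_scale(x_train, lo, hi)
x_test_scaled = min_max_scale(x_test, lo, hi)
loss_minmax = bce_sum(x_train_scaled, black)

print(loss_div255, loss_minmax)
```

With properly scaled inputs, a low loss can no longer be reached by an all-black output, so the loss value becomes informative again.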
