我在Keras中实现了一个(变分)自动编码器(VAE),遵循this guide。
VAE从topomaps
文件夹读取数据,并从labels
文件夹读取标签。它们都包含.npy
文件,which are essentially numpy arrays stored on disk。
我将数据集分为80%的训练数据和20%的测试数据。具体而言:
x_train
形状为(245760, 40, 40, 1)
y_train
形状为(245760,)
x_test
形状为(61440, 40, 40, 1)
y_test
形状为(61440,)
不幸的是,即使(训练)loss
和val_loss
非常低(分别为2.9246e-04
和-4.8249e-04
),如果我直观地检查我的VAE的“重建技能”,我可以注意到它们很差,因为重建的图像与原始图像完全不相似:
我使用以下配置运行了演示:
latent_dim=25
。此值不得更改epochs=2
batch_size=512
optimizer=Adam()
learning_rate=0.001
我知道epochs
非常小,但它只是一个演示。我的目标是扩大它,当我弄清楚为什么我的VAE不会工作,即使损失非常低。
这是运行输出:
Epoch 1/2
2023-08-30 11:35:49.408811: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
384/384 [==============================] - ETA: 0s - loss: 60.0042 - reconstruction_loss: 11.0072 - kl_loss: 0.09892023-08-30 11:38:40.538661: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
384/384 [==============================] - 190s 492ms/step - loss: 59.8772 - reconstruction_loss: 11.0072 - kl_loss: 0.0989 - val_loss: 5.5495e-04 - val_reconstruction_loss: 5.4875e-04 - val_kl_loss: 6.1989e-06
Epoch 2/2
384/384 [==============================] - 188s 490ms/step - loss: 3.0879e-04 - reconstruction_loss: 3.0472e-04 - kl_loss: 1.5222e-06 - val_loss: 3.4318e-04 - val_reconstruction_loss: 3.4303e-04 - val_kl_loss: 1.4901e-07
2023-08-30 11:42:17.049 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:26.493 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:37.200 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:41.800433: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
1920/1920 [==============================] - 29s 15ms/step
2023-08-30 11:43:16.276 Python[2419:58392] +[CATransaction synchronize] called within transaction
Process finished with exit code 0
学习曲线:
这是三个模型类:
class VAE(keras.Model):
def __init__(self, encoder, decoder, **kwargs):
super().__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder
self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
def call(self, inputs, training=None, mask=None):
_, _, z = self.encoder(inputs)
outputs = self.decoder(z)
return outputs
@property
def metrics(self):
return [
self.total_loss_tracker,
self.reconstruction_loss_tracker,
self.kl_loss_tracker,
]
def train_step(self, data):
with tf.GradientTape() as tape:
# Forward pass
z_mean, z_log_var, z = self.encoder(data)
reconstruction = self.decoder(z)
# Compute losses
reconstruction_loss = tf.reduce_mean(
tf.reduce_sum(
keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
)
)
kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
total_loss = reconstruction_loss + kl_loss
# Compute gradient
grads = tape.gradient(total_loss, self.trainable_weights)
# Update weights
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
# Update my own metrics
self.total_loss_tracker.update_state(total_loss)
self.reconstruction_loss_tracker.update_state(reconstruction_loss)
self.kl_loss_tracker.update_state(kl_loss)
return {
"loss": self.total_loss_tracker.result(),
"reconstruction_loss": self.reconstruction_loss_tracker.result(),
"kl_loss": self.kl_loss_tracker.result(),
}
def test_step(self, data):
# Forward pass
z_mean, z_log_var, z = self.encoder(data)
reconstruction = self.decoder(z)
# Compute losses
reconstruction_loss = tf.reduce_mean(
tf.reduce_sum(
keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
)
)
kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
total_loss = reconstruction_loss + kl_loss
# Update my own metrics
self.total_loss_tracker.update_state(total_loss)
self.reconstruction_loss_tracker.update_state(reconstruction_loss)
self.kl_loss_tracker.update_state(kl_loss)
return {
"loss": self.total_loss_tracker.result(),
"reconstruction_loss": self.reconstruction_loss_tracker.result(),
"kl_loss": self.kl_loss_tracker.result(),
}
class Encoder(keras.Model):
def __init__(self, latent_dimension, input_shape):
super(Encoder, self).__init__()
self.latent_dim = latent_dimension
self.conv_block1 = keras.Sequential([
layers.Input(shape=input_shape),
layers.Conv2D(filters=64, kernel_size=3, activation="relu", strides=2, padding="same"),
layers.BatchNormalization()
])
self.conv_block2 = keras.Sequential([
layers.Conv2D(filters=128, kernel_size=3, activation="relu", strides=2, padding="same"),
layers.BatchNormalization()
])
self.conv_block3 = keras.Sequential([
layers.Conv2D(filters=256, kernel_size=3, activation="relu", strides=2, padding="same"),
layers.BatchNormalization()
])
self.flatten = layers.Flatten()
self.dense = layers.Dense(units=100, activation="relu")
self.z_mean = layers.Dense(latent_dimension, name="z_mean")
self.z_log_var = layers.Dense(latent_dimension, name="z_log_var")
self.sampling = sample
def call(self, inputs, training=None, mask=None):
x = self.conv_block1(inputs)
x = self.conv_block2(x)
x = self.conv_block3(x)
x = self.flatten(x)
x = self.dense(x)
z_mean = self.z_mean(x)
z_log_var = self.z_log_var(x)
z = self.sampling(z_mean, z_log_var)
return z_mean, z_log_var, z
class Decoder(keras.Model):
def __init__(self, latent_dimension):
super(Decoder, self).__init__()
self.latent_dim = latent_dimension
self.dense1 = keras.Sequential([
layers.Dense(units=100, activation="relu"),
layers.BatchNormalization()
])
self.dense2 = keras.Sequential([
layers.Dense(units=1024, activation="relu"),
layers.BatchNormalization()
])
self.dense3 = keras.Sequential([
layers.Dense(units=4096, activation="relu"),
layers.BatchNormalization()
])
self.reshape = layers.Reshape((4, 4, 256))
self.deconv1 = keras.Sequential([
layers.Conv2DTranspose(filters=256, kernel_size=3, activation="relu", strides=2, padding="same"),
layers.BatchNormalization()
])
self.deconv2 = keras.Sequential([
layers.Conv2DTranspose(filters=128, kernel_size=3, activation="relu", strides=1, padding="same"),
layers.BatchNormalization()
])
self.deconv3 = keras.Sequential([
layers.Conv2DTranspose(filters=128, kernel_size=3, activation="relu", strides=2, padding="valid"),
layers.BatchNormalization()
])
self.deconv4 = keras.Sequential([
layers.Conv2DTranspose(filters=64, kernel_size=3, activation="relu", strides=1, padding="valid"),
layers.BatchNormalization()
])
self.deconv5 = keras.Sequential([
layers.Conv2DTranspose(filters=64, kernel_size=3, activation="relu", strides=2, padding="valid"),
layers.BatchNormalization()
])
self.deconv6 = layers.Conv2DTranspose(filters=1, kernel_size=2, activation="sigmoid", padding="valid")
def call(self, inputs, training=None, mask=None):
x = self.dense1(inputs)
x = self.dense2(x)
x = self.dense3(x)
x = self.reshape(x)
x = self.deconv1(x)
x = self.deconv2(x)
x = self.deconv3(x)
x = self.deconv4(x)
x = self.deconv5(x)
decoder_outputs = self.deconv6(x)
return decoder_outputs
请注意,VAE的实现属于我已经发布的指南。
这是主要功能:
if __name__ == '__main__':
# Load data
x_train, x_test, y_train, y_test = load_data("topomaps", "labels", 0.2)
# Expand dimensions to (None, 40, 40, 1)
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
# Print data shapes
print("x_train shape:", x_train.shape)
print("y_train shape:", y_train.shape)
print("x_test shape:", x_test.shape)
print("y_test shape:", y_test.shape)
# Normalize the data
x_train = x_train.astype("float32") / 255.0
x_test = x_test.astype("float32") / 255.0
# Compiling the VAE
latent_dimension = 25 # Do not change
encoder = Encoder(latent_dimension, (40, 40, 1))
decoder = Decoder(latent_dimension)
vae = VAE(encoder, decoder)
vae.compile(Adam(learning_rate=0.001))
# Training
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)
print("x_val shape:", x_val.shape)
print("y_val shape:", y_val.shape)
epochs = 2
batch_size = 512
history = vae.fit(x_train, epochs=epochs, batch_size=batch_size, validation_data=(x_val,))
# Plot learning curves
plot_metric(history, "loss")
# Check reconstruction skills against a random test sample
image_index = 5
plt.title(f"Original image {image_index}")
original_image = x_test[image_index]
plt.imshow(original_image, cmap="gray")
plt.show()
plt.title(f"Reconstructed image {image_index}, latent_dim = {latent_dimension}, epochs = {epochs}, "
f"batch_size = {batch_size}")
x_test_reconstructed = vae.predict(x_test)
reconstructed_image = x_test_reconstructed[image_index]
plt.imshow(reconstructed_image, cmap="gray")
plt.show()
以下是我使用的一些函数:
def load_data(topomaps_folder: str, labels_folder: str, test_size):
x, y = _create_dataset(topomaps_folder, labels_folder)
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_size)
return x_train, x_test, y_train, y_test
def _create_dataset(topomaps_folder, labels_folder):
topomaps_files = os.listdir(topomaps_folder)
labels_files = os.listdir(labels_folder)
topomaps_files.sort()
labels_files.sort()
x = []
y = []
n_files = len(topomaps_files)
for topomaps_file, labels_file in tqdm(zip(topomaps_files, labels_files), total=n_files, desc="Loading data set"):
topomaps_array = np.load(f"{topomaps_folder}/{topomaps_file}")
labels_array = np.load(f"{labels_folder}/{labels_file}")
if topomaps_array.shape[0] != labels_array.shape[0]:
raise Exception("Shapes must be equal")
for i in range(topomaps_array.shape[0]):
x.append(topomaps_array[i])
y.append(labels_array[i])
x = np.array(x)
y = np.array(y)
return x, y
def sample(z_mean, z_log_var):
batch = tf.shape(z_mean)[0]
dim = tf.shape(z_mean)[1]
epsilon = tf.random.normal(shape=(batch, dim))
stddev = tf.exp(0.5 * z_log_var)
return z_mean + stddev * epsilon
def plot_metric(history, metric):
plt.plot(history.history[metric])
plt.plot(history.history['val_' + metric])
plt.title(metric)
plt.ylabel(metric)
plt.xlabel('epoch')
plt.legend(['train', 'validation'])
plt.show()
编辑
**-你能用低得多的训练集来看看它是否过拟合并重建一个已知的图像吗?**好的,这是我在main
函数中所做的
# Reduce DS size
x_train = x_train[:500]
y_train = y_train[:500]
x_test = x_test[:500]
y_test = y_test[:500]
这就是我得到的:
学习曲线是:
如果我运行相同的配置,但设置epochs=100
,我得到:
损失:
**-不确定你是否做到了......但你在绘图时是否将输出转换回非标准化值?**这就是我所做的:
plt.title(f"Reconstructed image {image_index}, latent_dim = {latent_dimension}, epochs = {epochs}, "
f"batch_size = {batch_size}")
x_test_reconstructed = vae.predict(x_test)
reconstructed_image = x_test_reconstructed[image_index]
reconstructed_image = reconstructed_image * 255
plt.imshow(reconstructed_image, cmap="gray")
plt.show()
但我仍然得到:
**-当批量大小为4,学习率为0.0002,100个epoch时,损失图是什么样子?**重建图像全黑,损失曲线为:
2条答案
按热度按时间iibxawm41#
我运行我的实现没有问题。我在VAE教程的“副本”上运行,并在1个epoch后得到以下结果:
我没有你的数据,所以我不能检查你的数据加载器是否有任何错误。你可能想检查你的自动编码器和解码器上的调用函数,但是按照教程代码,我没有发现你的问题。
我建议对您的数据进行全面的调试:
1.检查数据输入、结构、形状等。(健康测试是好的)
1.尝试删除测试验证。
1.确保模型适用于已知数据集,即Mnist。尝试在MNIST数据集上调用您的VAE,如果它不起作用,那么这里有问题。
除非我有一些输入数据的样本,否则真的没有办法正确地调试代码。我高度怀疑数据输入,因为模型构造代码看起来很好。
qgelzfjb2#
这个问题涉及到数据规范化。
和
应该能解决问题