
Generative Adversarial Networks (GAN), Part 2: Code (a TensorFlow Implementation)

Published 2021-03-07 20:53



I. Program Code

[Steps]

  1. Define the GAN architecture:
    D (discriminator) network: learns to recognize the fake data produced by G
    G (generator) network: maps random noise to fake samples
    input format and parameter initialization
  2. Training:
    feed in two streams of data: real data x, drawn from a Gaussian distribution,
    and randomly initialized noise z (the exact losses used are written out below)
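
For reference, the losses defined later in the code (loss_d and loss_g in _create_model) are the standard GAN objectives, with the generator trained on the non-saturating loss rather than minimizing $\log(1 - D(G(z)))$:

$$\mathcal{L}_D = -\,\mathbb{E}_{x \sim p_{\mathrm{data}}}\left[\log D(x)\right] - \mathbb{E}_{z \sim p_z}\left[\log\left(1 - D(G(z))\right)\right]$$

$$\mathcal{L}_G = -\,\mathbb{E}_{z \sim p_z}\left[\log D(G(z))\right]$$

In the code, each expectation is replaced by a mean over a batch of 12 samples.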
# [1] Import the required packages
import argparse  # command-line argument and option parsing
import numpy as np  # scientific computing; provides matrix operations
from scipy.stats import norm  # Gaussian pdf from scipy
import tensorflow as tf
import matplotlib.pyplot as plt  # plotting
import seaborn as sns  # statistical data visualization
from matplotlib import animation  # imported in the original example but unused here

sns.set(color_codes=True)  # set the seaborn theme and color palette
seed = 42  # fix the seed so every run generates the same random numbers

np.random.seed(seed)
tf.set_random_seed(seed)

# [2] The real data distribution (a Gaussian)
class DataDistribution(object):
    def __init__(self):
        # mean
        self.mu = 4
        # standard deviation
        self.sigma = 0.5

    def sample(self, N):
        samples = np.random.normal(self.mu, self.sigma, N)
        samples.sort()
        return samples

# [3] A randomly initialized distribution, used as the input to the G network
class GeneratorDistribution(object):
    def __init__(self, range):
        self.range = range

    def sample(self, N):
        return np.linspace(-self.range, self.range, N) + np.random.random(N) * 0.01
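
# Design note: the linspace-plus-small-jitter sampling in GeneratorDistribution
# stratifies the noise over the generator's input range instead of drawing it
# purely at random, which tends to make this 1-D toy example converge more smoothly.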

# [4] Linear (fully connected) layer; in the discriminator, output_dim = h_dim * 2 = 8
def linear(input, output_dim, scope=None, stddev=1.0):
    # Gaussian initializer for the weights
    norm = tf.random_normal_initializer(stddev=stddev)
    # initialize b to 0
    const = tf.constant_initializer(0.0)
    with tf.variable_scope(scope or 'linear'):
        # w has shape (input_dim, output_dim); a (12, 1) input times w requires w to be (1, output_dim)
        w = tf.get_variable('w', [input.get_shape()[1], output_dim], initializer=norm)
        # b is initialized to a constant
        b = tf.get_variable('b', [output_dim], initializer=const)
        # the affine transformation
        return tf.matmul(input, w) + b

# Generator network: a two-layer MLP (softplus hidden layer, then a linear output)
def generator(input, h_dim):
    h0 = tf.nn.softplus(linear(input, h_dim, 'g0'))
    h1 = linear(h0, 1, 'g1')
    return h1

# [5] Discriminator network; h0 to h3 are the layer outputs, and mlp_hidden_size = 4 is passed in as h_dim
def discriminator(input, h_dim):
    # first layer; linear() handles the w and b initialization, and the hidden width is 4 * 2 = 8
    h0 = tf.tanh(linear(input, h_dim * 2, 'd0'))
    # second layer, also 8 hidden units
    h1 = tf.tanh(linear(h0, h_dim * 2, 'd1'))
    # third layer output
    h2 = tf.tanh(linear(h1, h_dim * 2, scope='d2'))
    # final output: a probability in (0, 1)
    h3 = tf.sigmoid(linear(h2, 1, scope='d3'))
    return h3

# [6] Optimizer with learning-rate decay
def optimizer(loss, var_list, initial_learning_rate):
    # decay factor
    decay = 0.95
    # decay the learning rate once every 150 iterations
    num_decay_steps = 150
    batch = tf.Variable(0)
    # exponential decay schedule; with staircase=True this computes
    # learning_rate = initial_learning_rate * decay ** (global_step // num_decay_steps)
    learning_rate = tf.train.exponential_decay(initial_learning_rate, batch, num_decay_steps, decay, staircase=True)
    # plain gradient descent on the given variable list
    optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss, global_step=batch, var_list=var_list)
    # return the training op
    return optimizer

# [7] Build the model
class GAN(object):
    def __init__(self, data, gen, num_steps, batch_size, log_every):
        self.data = data
        self.gen = gen
        self.num_steps = num_steps
        self.batch_size = batch_size
        self.log_every = log_every
        # number of hidden units
        self.mlp_hidden_size = 4
        # learning rate
        self.learning_rate = 0.03
        # build the graph; inputs are fed in through placeholders
        self._create_model()

    def _create_model(self):
        # D_pre: a scope holding a pre-training copy of the discriminator, used to
        # learn good initial parameters for the real D network
        with tf.variable_scope('D_pre'):
            # training proceeds batch by batch; each batch holds 12 one-dimensional
            # points, so the input shape is (12, 1)
            self.pre_input = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            self.pre_labels = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            # discriminator() initializes the w and b parameters;
            # self.mlp_hidden_size = 4 is its second argument
            D_pre = discriminator(self.pre_input, self.mlp_hidden_size)
            # mean squared error between predictions and labels
            self.pre_loss = tf.reduce_mean(tf.square(D_pre - self.pre_labels))
            # optimizer for the pre-training stage
            self.pre_opt = optimizer(self.pre_loss, None, self.learning_rate)

        # This defines the real generator network: it takes samples from a noise
        # distribution as input and passes them through an MLP
        with tf.variable_scope('Gen'):
            self.z = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            # the generator has only two layers
            self.G = generator(self.z, self.mlp_hidden_size)

        # The discriminator tries to tell the difference between samples from
        # the true data distribution (self.x) and the generated samples (self.G).
        # Two copies of the discriminator network are created here (sharing
        # parameters), since the same network cannot be fed two different
        # inputs in TensorFlow.
        with tf.variable_scope('Disc') as scope:
            self.x = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            # D1: the discriminator on real data
            self.D1 = discriminator(self.x, self.mlp_hidden_size)
            # reuse the existing variables rather than defining new ones
            scope.reuse_variables()
            # D2: the discriminator on generated data
            self.D2 = discriminator(self.G, self.mlp_hidden_size)

        # Define the losses for the discriminator and generator networks
        # (see the original paper for details) and create optimizers for both
        # discriminator loss
        self.loss_d = tf.reduce_mean(-tf.log(self.D1) - tf.log(1 - self.D2))
        # generator loss
        self.loss_g = tf.reduce_mean(-tf.log(self.D2))

        self.d_pre_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='D_pre')
        self.d_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='Disc')
        self.g_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='Gen')
        # optimize the two parameter groups separately
        self.opt_d = optimizer(self.loss_d, self.d_params, self.learning_rate)
        self.opt_g = optimizer(self.loss_g, self.g_params, self.learning_rate)

    def train(self):
        with tf.Session() as session:
            tf.global_variables_initializer().run()
            # pre-train the D_pre network first
            num_pretrain_steps = 1000
            for step in range(num_pretrain_steps):
                # random points with the true Gaussian pdf value as the label
                d = (np.random.random(self.batch_size) - 0.5) * 10.0
                labels = norm.pdf(d, loc=self.data.mu, scale=self.data.sigma)
                pretrain_loss, _ = session.run([self.pre_loss, self.pre_opt],
                                               {
                                                   self.pre_input: np.reshape(d, (self.batch_size, 1)),
                                                   self.pre_labels: np.reshape(labels, (self.batch_size, 1))
                                               })
            # fetch the pre-trained parameters once pre-training is done
            self.weightsD = session.run(self.d_pre_params)
            # copy weights from pre-training over to the new D network
            for i, v in enumerate(self.d_params):
                session.run(v.assign(self.weightsD[i]))

            # train the real networks
            for step in range(self.num_steps):
                # update discriminator
                x = self.data.sample(self.batch_size)
                # z is randomly generated noise
                z = self.gen.sample(self.batch_size)
                # optimize the discriminator network
                loss_d, _ = session.run([self.loss_d, self.opt_d],
                                        {
                                            self.x: np.reshape(x, (self.batch_size, 1)),
                                            self.z: np.reshape(z, (self.batch_size, 1))
                                        })

                # update generator with a fresh batch of noise
                z = self.gen.sample(self.batch_size)
                # optimize the generator network
                loss_g, _ = session.run([self.loss_g, self.opt_g],
                                        {
                                            self.z: np.reshape(z, (self.batch_size, 1))
                                        })
                # log the losses
                if step % self.log_every == 0:
                    print('{}: {}\t{}'.format(step, loss_d, loss_g))
                # plot the distributions periodically
                if step % 100 == 0 or step == 0 or step == self.num_steps - 1:
                    self._plot_distributions(session)

    def _samples(self, session, num_points=10000, num_bins=100):
        xs = np.linspace(-self.gen.range, self.gen.range, num_points)
        bins = np.linspace(-self.gen.range, self.gen.range, num_bins)

        # data distribution
        d = self.data.sample(num_points)
        pd, _ = np.histogram(d, bins=bins, density=True)

        # generated samples
        zs = np.linspace(-self.gen.range, self.gen.range, num_points)
        g = np.zeros((num_points, 1))
        for i in range(num_points // self.batch_size):
            g[self.batch_size * i: self.batch_size * (i+1)] = session.run(self.G,{
                self.z: np.reshape(
                    zs[self.batch_size * i: self.batch_size * (i+1)],(self.batch_size, 1)
                )
            })
        pg, _ = np.histogram(g, bins=bins, density=True)
        return pd, pg

    def _plot_distributions(self, session):
        pd, pg = self._samples(session)
        p_x = np.linspace(-self.gen.range, self.gen.range, len(pd))
        f, ax = plt.subplots(1)
        ax.set_ylim(0,1)
        plt.plot(p_x, pd, label='real data')
        plt.plot(p_x, pg, label='generated data')
        plt.title('1D Generative Adversarial Network')
        plt.xlabel('Data values')
        plt.ylabel('Probability density')
        plt.legend()
        plt.show()

# Build the model and run training
def main(args):
    model = GAN(
        # the real data distribution
        DataDistribution(),
        # the noise points fed to the generator
        GeneratorDistribution(range=8),
        # number of training iterations
        args.num_steps,
        # each iteration uses a batch of 12 points
        args.batch_size,
        # print the current losses every log_every steps
        args.log_every)
    model.train()

# Parse command-line arguments
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--num-steps', type=int, default=1200, help='the number of training steps to take')
    parser.add_argument('--batch-size', type=int, default=12, help='the batch size')
    parser.add_argument('--log-every', type=int, default=10, help='print loss after this many steps')
    return parser.parse_args()

if __name__ == '__main__':
    main(parse_args())
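
The script targets TensorFlow 1.x (tf.placeholder, tf.variable_scope, tf.train.*); under TensorFlow 2 it would need the compatibility layer (import tensorflow.compat.v1 as tf followed by tf.disable_v2_behavior()). Assuming the listing is saved as gan_1d.py (an arbitrary filename, not from the original post), a typical run with the argparse defaults is:

python gan_1d.py --num-steps 1200 --batch-size 12 --log-every 10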

II. Run Results

1. Initial state of the program
[Figure: real vs. generated distributions at step 0]
Note: the yellow curve is the randomly initialized (generated) data, and the blue curve is the real, Gaussian-distributed data.
2. State after 1200 training iterations
[Figure: real vs. generated distributions after 1200 iterations]

Original article: https://blog.csdn.net/ITCLSJ/article/details/114384201
