Python 粒子群算法

Python 粒子群算法,第1张

因为个人比较熟悉 PyTorch,所以在做优化问题的时候多是用梯度下降法 —— But,梯度下降法不是万能的。对于一些自变量与因变量无直接运算 (加减乘除......) 的时候,不会产生梯度信息,所以这个时候梯度下降法是没有办法优化的。所以迫不得已学了粒子群算法,在实现的过程中加了自己的一些 idea

大致的思路如下:

  • 根据各个基因的上下限生成指定规模的粒子群 (解向量的集合)
  • 粒子群算法类似鱼群觅食,当有一条鱼找到食物的时候,一定范围内的鱼都会朝着食物移动。我在网上看的多是 2 个最优粒子引领群体移动,我这里做的是一定百分比的优粒子引领群体移动
  • 适应度函数由用户定义。使用适应度函数计算出每个粒子的适应度,根据适应度选出一定百分比的优粒子,并根据适应度计算这些优粒子的权值;计算粒子间的欧式距离,结合优粒子的权值生成优粒子的“影响力”。再以影响力为权值,与 解向量差值 (趋势)、上一轮移动量 (惯性) 加权得到每个粒子的移动量
  • 为保证粒子群搜索范围的广度,在求解的过程中使用随求解轮次衰减的随机量权值,使得粒子群在朝着最优解搜索之外还具有一定的随机性

优化器对象

整数求解的话,直接对最优解取整就行了

import numpy as np

ARRAY_DTYPE = np.float32


class Particle_Swarm_Optimizer:
    ''' 粒子群优化器
        particle_size: 粒子群规模
        gene_pool: 每个基因的取值上下限
        fitness_cal: 适应度函数 (max -> best)
        well_percent: 优粒子百分比
        learn_pace: 学习率
        best_unit: 最优个体'''
    def __init__(self, particle_size, gene_pool, fitness_cal,
                 well_percent=0.05, learn_rate=1., best_unit=None):
        assert particle_size * well_percent > 2, '优粒子百分比 / 群体规模 过小'
        # 记录系统参数
        self._particle_size = particle_size
        self._gene_pool = np.array(gene_pool, dtype=ARRAY_DTYPE)
        self._fitness_cal = fitness_cal
        self._learn_rate = learn_rate
        self._well_percent = (1 - well_percent) * 100
        # 记录最优个体
        if best_unit:
            self.best_unit = np.array(best_unit, ARRAY_DTYPE)
            self.best_fitness = self._fitness_cal(self.best_unit)
        else:
            self.best_unit = None
            self.best_fitness = - np.inf

    def train(self, epochs, inertia_weight=0.2,
              random_weight_init=0.8, random_epochs_percent=0.2):
        ''' inertia_weight: 惯性权值
            random_weight: 随机移动量初始权值
            random_epochs_percent: 随机搜索轮次百分比'''
        # 生成粒子群
        self.particle = self._new_unit(self._particle_size)
        self.inertia = np.zeros_like(self.particle)
        # 随机搜索轮次数
        random_epochs = int(random_epochs_percent * epochs)
        for epoch in range(epochs):
            # 越界处理
            for gene_idx, (amin, amax) in enumerate(self._gene_pool):
                self.particle[:, gene_idx] = np.clip(self.particle[:, gene_idx],
                                                       a_min=amin, a_max=amax)
            # 粒子互相影响下产生的移动量
            move_pace = self._move_pace()
            # 根据轮次生成随机移动量
            if epoch < random_epochs:
                random_weight = (random_epochs - epoch) / random_epochs * random_weight_init
                random_pace = np.random.random(self.particle.shape) * self._learn_rate
                move_pace = random_weight * random_pace + (1 - random_weight) * move_pace
            self.particle += (move_pace + inertia_weight * self.inertia) * self._learn_rate
            self.inertia = move_pace
            # 空值检测, 重叠检测
            normal = ~ np.isnan(self.particle).sum(axis=1).astype(np.bool_)
            self._particle_filter(normal)
            _, unique = np.unique(self.particle, return_index=True, axis=0)
            self._particle_filter(unique)
            # 群体补全
            need = self._particle_size - len(self.particle)
            if need:
                self.particle = np.concatenate([self.particle, self._new_unit(need)], axis=0)
                self.inertia = np.concatenate([self.inertia, np.zeros([need, len(self._gene_pool)])])
            # 展示进度
            progress = get_progress(epoch + 1, epochs, bar_len=20)
            print(f'\rPSO: {progress}, best_fitness: {self.best_fitness}', end='')
        print()
        return self.best_unit

    def _new_unit(self, num):
        ''' 产生指定规模的群体'''
        # 变换为: 下限, 幅值上限
        gene_pool = self._gene_pool.copy()
        gene_pool[:, 1] = gene_pool[:, 1] - gene_pool[:, 0]
        scale = np.random.random([num, len(gene_pool)])
        group = gene_pool[:, 0] + gene_pool[:, 1] * scale
        return group

    def _particle_filter(self, cond):
        ''' 粒子筛选'''
        self.particle = self.particle[cond]
        self.inertia = self.inertia[cond]

    def _move_pace(self):
        ''' 根据 距离、适应度 产生的移动量'''
        # 适应度因子
        fitness_factor = self._fitness_factor()
        # 粒子间的距离
        particle = self.particle[:, np.newaxis, :]
        refer = np.append(self.particle, self.best_unit.reshape(1, -1),
                          axis=0)[np.newaxis, :, :]
        dist = refer - particle
        # 归一化: 距离因子
        dist_factor = (dist ** 2).sum(axis=2)
        dist_factor_max = dist_factor.max()
        dist_factor = (dist_factor_max - dist_factor) / dist_factor_max
        # 粒子间的影响力
        influence = fitness_factor * dist_factor
        move_pace = (dist * influence[..., np.newaxis]).sum(axis=1)
        move_pace /= fitness_factor.sum()
        return move_pace

    def _fitness_factor(self):
        ''' 适应度因子'''
        fitness = np.array(list(map(self._fitness_cal, self.particle)), dtype=ARRAY_DTYPE)
        # 只保留优粒子的适应度
        well_bound = np.percentile(fitness, self._well_percent)
        fitness *= fitness >= well_bound
        # 局部最优的个体
        cur_best_index = fitness.argmax()
        cur_best_fitness = fitness[cur_best_index]
        # 更新全局最优的个体
        if cur_best_fitness >= self.best_fitness:
            self.best_fitness = cur_best_fitness
            self.best_unit = self.particle[cur_best_index]
        # 转化为与最优适应度的比例
        fitness = normalize(np.append(fitness, self.best_fitness))
        return fitness

求解示例

自变量:

适应度函数:

其中:

import matplotlib.pyplot as plt


fitness_fun = lambda i: np.sin(i).sum()
pso = Particle_Swarm_Optimizer(50, gene_pool=[[0, 2], [2, 4], [4, 7]],
                               fitness_cal=fitness_fun)
best = pso.train(200)
print(best)

t = np.linspace(0, 7, 100)
# 绘制正弦函数
plt.plot(t, np.sin(t), color='deepskyblue')
# 绘制自变量边界
for bound in 0, 2, 4, 7:
    plt.plot([bound] * 2, [-1, 1], color='aqua')
# 绘制最优解
plt.scatter(best, np.sin(best), marker='p', c='orange')
plt.show()

最优解:[1.570796, 2.000000, 7.000000]

模型求解:[1.570619, 2.000000, 7.000000]

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/715453.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-04-25
下一篇2022-04-25

发表评论

登录后才能评论

评论列表(0条)

    保存