09-TensorFlow: WDCNN-Based Bearing Fault Diagnosis


1. Dataset

The data come from the Case Western Reserve University (CWRU) Bearing Data Center.
Official download page: https://csegroups.case.edu/bearingdatacenter/pages/download-data-file

CWRU rolling-bearing data acquisition system:

The CWRU data acquisition rig is shown in the figure above. The bearing under test is the drive-end bearing, an SKF 6205 deep-groove ball bearing; the faulty bearings were manufactured by electrical discharge machining (EDM), and the sampling rate of the system is 12 kHz. Three fault locations are considered: ball damage, outer-race damage, and inner-race damage, each with fault diameters of 0.007 inch, 0.014 inch, and 0.021 inch, for 9 fault conditions in total. Each diagnosis uses a segment of 2048 data points. To make the convolutional network easier to train, every signal segment x is normalized; the normalization formula appears as a figure in the original post.
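Since the formula itself is only available as an image, here is a minimal sketch assuming per-segment z-score normalization (zero mean, unit variance), which matches the `StandardScaler` standardization used in the project code of Section 3; if the original figure showed a different formula (e.g. min-max scaling), substitute accordingly:

```python
import numpy as np

def normalize(segment):
    """Z-score normalization of one signal segment: zero mean, unit variance."""
    return (segment - segment.mean()) / segment.std()

# Example: a 2048-point segment with an arbitrary offset and scale
seg = 5.0 + 3.0 * np.random.randn(2048)
out = normalize(seg)
print(round(float(out.mean()), 6), round(float(out.std()), 6))
```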

Four datasets were prepared, as shown in Table 2-3. Datasets A, B, and C were collected under motor loads of 1 hp, 2 hp, and 3 hp, respectively. Each contains 6,600 training samples and 250 test samples; the training samples are produced with a data augmentation technique (overlapping slices), while the test samples do not overlap. Dataset D is the union of A, B, and C, covering all three load conditions, with 19,800 training samples and 750 test samples.

Download link for the prepared dataset used in this project: https://download.csdn.net/download/qq_41865229/85200778

Description of the experimental datasets

2. The WDCNN Algorithm

Popular two-dimensional convolutional networks such as VGGNet, ResNet, and Google's Inception-v4 all stack 3×3 convolution kernels. Stacking small kernels deepens the network and obtains a large receptive field with few parameters, which helps suppress overfitting. For one-dimensional vibration signals, however, two stacked 3×1 convolutions spend 6 weights to obtain only a 5×1 receptive field, turning this advantage into a drawback; architectures from the vision domain therefore do not transfer directly to bearing fault diagnosis.
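The receptive-field accounting in this paragraph can be checked with a short helper (this is the generic receptive-field recurrence, not anything WDCNN-specific): each layer grows the receptive field by (k − 1) times the product of all previous strides.

```python
def receptive_field(kernel_sizes, strides=None):
    """Receptive field of a stack of 1-D convolutions.

    Each layer adds (k - 1) * jump to the receptive field, where the
    jump (effective stride) is the product of all previous strides.
    """
    strides = strides or [1] * len(kernel_sizes)
    rf, jump = 1, 1
    for k, s in zip(kernel_sizes, strides):
        rf += (k - 1) * jump
        jump *= s
    return rf

# Two stacked 3x1 convolutions: 3 + 3 = 6 weights for a 5x1 receptive
# field, whereas a single 5x1 convolution needs only 5 weights.
print(receptive_field([3, 3]))  # 5
print(receptive_field([5]))     # 5
```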

Targeting the characteristics of one-dimensional vibration signals, this section designs a model named WDCNN (Deep Convolutional Neural Networks with Wide First-layer Kernel). Its defining feature is a wide convolution kernel in the first layer, with all subsequent convolutional layers using small 3×1 kernels.

The wide first-layer kernel of WDCNN extracts short-time features, playing a role similar to the short-time Fourier transform (STFT). The difference is that the STFT projects the signal onto fixed sinusoidal basis functions, whereas the first-layer kernels of WDCNN are learned by the training algorithm, so they can automatically learn diagnosis-oriented features and discard features that do not help diagnosis.

To strengthen the representational capacity of WDCNN, all convolutional layers except the first use 3×1 kernels. Because these kernels have few parameters, the network can be made deeper while overfitting is suppressed. Each convolution operation is followed by batch normalization (BN) and then 2×1 max pooling. The structure of WDCNN is shown below.

WDCNN architecture diagram:

The WDCNN parameters used in the experiments are listed below. The model has 5 convolution-and-pooling stages; the first convolution kernel is 64×1, and all remaining kernels are 3×1. The hidden fully-connected layer has 100 neurons, and the Softmax layer has 10 outputs corresponding to the 10 bearing conditions.

WDCNN structural parameters:
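As a quick sanity check on these parameters (assuming an input length of 2048 and the strides and paddings used in the project code of Section 3), the output length of each stage can be traced with TensorFlow's padding rules:

```python
import math

def conv_out(n, k, s, padding):
    # Output length of a 1-D convolution under TensorFlow padding rules.
    if padding == 'same':
        return math.ceil(n / s)
    return (n - k) // s + 1

def pool_out(n, p=2):
    # MaxPooling1D with pool_size=2 (stride defaults to pool_size, 'valid').
    return (n - p) // p + 1

n = 2048                          # input segment length
n = conv_out(n, 64, 16, 'same')   # wide first layer: kernel 64, stride 16
n = pool_out(n)
for padding in ('same', 'same', 'same', 'valid'):
    n = conv_out(n, 3, 1, padding)  # small 3x1 kernels in layers 2-5
    n = pool_out(n)
print('final sequence length:', n)    # 3
print('flattened features:', 64 * n)  # 64 filters x 3 = 192
```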

3. Project Code

Dataset preparation code: cwru_preprocess.py

from scipy.io import loadmat
import numpy as np
import os
from sklearn import preprocessing  # one-hot encoding
from sklearn.model_selection import StratifiedShuffleSplit  # stratified random split keeps class proportions

def prepro(d_path, length=864, number=1000, normal=True, rate=[0.5, 0.25, 0.25], enc=True, enc_step=28):
    """Preprocess the data and return train_X, train_Y, valid_X, valid_Y, test_X, test_Y.

    :param d_path: path to the source .mat files
    :param length: signal length per sample; default 864 (about two signal periods)
    :param number: samples per class; 10 classes in total, default 1000 per class
    :param normal: whether to standardize the data. True/False, default True
    :param rate: train/validation/test split ratio, default [0.5, 0.25, 0.25]; must sum to 1
    :param enc: whether to use data augmentation for the training/validation sets. Bool, default True
    :param enc_step: sliding offset between augmented samples
    :return: Train_X, Train_Y, Valid_X, Valid_Y, Test_X, Test_Y

    ```
    import cwru_preprocess as pre

    train_X, train_Y, valid_X, valid_Y, test_X, test_Y = pre.prepro(d_path=path,
                                                                    length=864,
                                                                    number=1000,
                                                                    normal=False,
                                                                    rate=[0.5, 0.25, 0.25],
                                                                    enc=True,
                                                                    enc_step=28)
    ```
    """
    # Collect all .mat file names under the folder (sorted so labels are deterministic)
    filenames = sorted(os.listdir(d_path))

    def capture(original_path):
        """Read the .mat files and return a dict of drive-end (DE) signals.

        :param original_path: directory to read from
        :return: dict mapping file name -> 1-D signal array
        """
        files = {}
        for i in filenames:
            # Full path of the .mat file
            file_path = os.path.join(original_path, i)
            file = loadmat(file_path)
            for key in file.keys():
                if 'DE' in key:  # keep only the drive-end accelerometer channel
                    files[i] = file[key].ravel()
        return files

    def slice_enc(data, slice_rate=rate[1] + rate[2]):
        """Split each signal into a leading training portion and a trailing test portion.

        :param data: dict of single signals
        :param slice_rate: fraction reserved for validation + test
        :return: the sliced samples
        """
        keys = data.keys()
        Train_Samples = {}
        Test_Samples = {}
        for i in keys:
            slice_data = data[i]
            all_length = len(slice_data)
            end_index = int(all_length * (1 - slice_rate))
            samp_train = int(number * (1 - slice_rate))  # e.g. 700 when rate=[0.7, 0.2, 0.1]
            Train_sample = []
            Test_Sample = []
            if enc:
                enc_time = length // enc_step
                samp_step = 0  # counts how many training samples have been drawn
                for j in range(samp_train):
                    random_start = np.random.randint(low=0, high=(end_index - 2 * length))
                    label = 0
                    for h in range(enc_time):
                        samp_step += 1
                        random_start += enc_step
                        sample = slice_data[random_start: random_start + length]
                        Train_sample.append(sample)
                        if samp_step == samp_train:
                            label = 1
                            break
                    if label:
                        break
            else:
                for j in range(samp_train):
                    random_start = np.random.randint(low=0, high=(end_index - length))
                    sample = slice_data[random_start:random_start + length]
                    Train_sample.append(sample)

            # Draw test samples from the trailing portion (no augmentation)
            for h in range(number - samp_train):
                random_start = np.random.randint(low=end_index, high=(all_length - length))
                sample = slice_data[random_start:random_start + length]
                Test_Sample.append(sample)
            Train_Samples[i] = Train_sample
            Test_Samples[i] = Test_Sample
        return Train_Samples, Test_Samples

    # Sampling is done; now attach one class label per file
    def add_labels(train_test):
        X = []
        Y = []
        label = 0
        for i in filenames:
            x = train_test[i]
            X += x
            lenx = len(x)
            Y += [label] * lenx
            label += 1
        return X, Y

    # One-hot encoding
    def one_hot(Train_Y, Test_Y):
        Train_Y = np.array(Train_Y).reshape([-1, 1])
        Test_Y = np.array(Test_Y).reshape([-1, 1])
        Encoder = preprocessing.OneHotEncoder()
        Encoder.fit(Train_Y)
        Train_Y = Encoder.transform(Train_Y).toarray()
        Test_Y = Encoder.transform(Test_Y).toarray()
        Train_Y = np.asarray(Train_Y, dtype=np.int32)
        Test_Y = np.asarray(Test_Y, dtype=np.int32)
        return Train_Y, Test_Y

    def scalar_stand(Train_X, Test_X):
        # Standardize train and test sets with statistics fitted on the training set
        scalar = preprocessing.StandardScaler().fit(Train_X)
        Train_X = scalar.transform(Train_X)
        Test_X = scalar.transform(Test_X)
        return Train_X, Test_X

    def valid_test_slice(Test_X, Test_Y):
        test_size = rate[2] / (rate[1] + rate[2])
        ss = StratifiedShuffleSplit(n_splits=1, test_size=test_size)
        for train_index, test_index in ss.split(Test_X, Test_Y):
            X_valid, X_test = Test_X[train_index], Test_X[test_index]
            Y_valid, Y_test = Test_Y[train_index], Test_Y[test_index]
            return X_valid, Y_valid, X_test, Y_test

    # Read the signals from all .mat files into a dict
    data = capture(original_path=d_path)
    # Split the data into training and test portions
    train, test = slice_enc(data)
    # Attach labels to the training set, returning X, Y
    Train_X, Train_Y = add_labels(train)
    # Attach labels to the test set, returning X, Y
    Test_X, Test_Y = add_labels(test)
    # One-hot encode the training/test labels
    Train_Y, Test_Y = one_hot(Train_Y, Test_Y)
    # Optionally standardize the training/test data
    if normal:
        Train_X, Test_X = scalar_stand(Train_X, Test_X)
    else:
        # Still convert the lists to NumPy arrays
        Train_X = np.asarray(Train_X)
        Test_X = np.asarray(Test_X)

    # Split the held-out portion into validation and test sets
    Valid_X, Valid_Y, Test_X, Test_Y = valid_test_slice(Test_X, Test_Y)
    return Train_X, Train_Y, Valid_X, Valid_Y, Test_X, Test_Y


if __name__ == "__main__":
    path = r'cwru_data,HP'
    train_X, train_Y, valid_X, valid_Y, test_X, test_Y = prepro(d_path=path,
                                                                length=864,
                                                                number=1000,
                                                                normal=False,
                                                                rate=[0.5, 0.25, 0.25],
                                                                enc=False,
                                                                enc_step=28)
    print(train_X[0:5])
    print(train_Y[0:5])

Model training code: cwru_train.py

import os
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Conv1D, Activation, BatchNormalization, MaxPooling1D, Flatten, Dense
from tensorflow.keras.regularizers import l2

import cwru_preprocess as preprocess

# Training parameters
batch_size = 128
epochs = 20
num_classes = 10
length = 2048
BatchNorm = True          # whether to apply batch normalization
number = 1000             # number of samples per class
normal = True             # whether to standardize the data
rate = [0.7, 0.2, 0.1]    # train/validation/test split ratio

path = r'cwru_data,HP'
x_train, y_train, x_valid, y_valid, x_test, y_test = preprocess.prepro(d_path=path,
                                                                       length=length,
                                                                       number=number,
                                                                       normal=normal,
                                                                       rate=rate,
                                                                       enc=True,
                                                                       enc_step=28)
# Conv1D expects a channel dimension, so add one with np.newaxis.
x_train = x_train[:, :, np.newaxis]
x_valid = x_valid[:, :, np.newaxis]
x_test = x_test[:, :, np.newaxis]

# Input shape fed to the first convolutional layer
input_shape = x_train.shape[1:]

print('Training sample shape:', x_train.shape)
print('Number of training samples:', x_train.shape[0])
print('Validation sample shape:', x_valid.shape)
print('Number of validation samples:', x_valid.shape[0])
print('Test sample shape:', x_test.shape)
print('Number of test samples:', x_test.shape[0])
print('Test label shape:', y_test.shape)


class CwruModel(Model):
    def __init__(self):
        super(CwruModel, self).__init__()
        # Block 1: wide first-layer kernel
        self.c1 = Conv1D(filters=16, kernel_size=64, strides=16, padding='same',
                         kernel_regularizer=l2(1e-4), input_shape=input_shape)
        self.b1 = BatchNormalization()
        self.a1 = Activation('relu')
        self.p1 = MaxPooling1D(pool_size=2)
        # Block 2
        self.c2 = Conv1D(filters=32, kernel_size=3, strides=1, padding='same')
        self.b2 = BatchNormalization()
        self.a2 = Activation('relu')
        self.p2 = MaxPooling1D(pool_size=2, padding='valid')
        # Block 3
        self.c3 = Conv1D(filters=64, kernel_size=3, strides=1, padding='same')
        self.b3 = BatchNormalization()
        self.a3 = Activation('relu')
        self.p3 = MaxPooling1D(pool_size=2, padding='valid')
        # Block 4
        self.c4 = Conv1D(filters=64, kernel_size=3, strides=1, padding='same')
        self.b4 = BatchNormalization()
        self.a4 = Activation('relu')
        self.p4 = MaxPooling1D(pool_size=2, padding='valid')
        # Block 5
        self.c5 = Conv1D(filters=64, kernel_size=3, strides=1, padding='valid')
        self.b5 = BatchNormalization()
        self.a5 = Activation('relu')
        self.p5 = MaxPooling1D(pool_size=2, padding='valid')
        # Flatten before the fully connected layers
        self.flatten = Flatten()
        # Fully connected layer
        self.d1 = Dense(units=100, activation='relu', kernel_regularizer=l2(1e-4))
        # Output layer
        self.d2 = Dense(units=num_classes, activation='softmax', kernel_regularizer=l2(1e-4))

    def call(self, x):
        x = self.c1(x)
        x = self.b1(x)
        x = self.a1(x)
        x = self.p1(x)

        x = self.c2(x)
        x = self.b2(x)
        x = self.a2(x)
        x = self.p2(x)

        x = self.c3(x)
        x = self.b3(x)
        x = self.a3(x)
        x = self.p3(x)

        x = self.c4(x)
        x = self.b4(x)
        x = self.a4(x)
        x = self.p4(x)

        x = self.c5(x)
        x = self.b5(x)
        x = self.a5(x)
        x = self.p5(x)

        x = self.flatten(x)
        x = self.d1(x)
        y = self.d2(x)
        return y


model = CwruModel()
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Resume from a checkpoint if one exists
checkpoint_save_path = "./cwru_checkpoint/cwru_cnn.ckpt"
if os.path.exists(checkpoint_save_path + '.index'):
    print('-------------load the model-----------------')
    model.load_weights(checkpoint_save_path)

cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,
                                                 save_weights_only=True,
                                                 save_best_only=True)

history = model.fit(x=x_train, y=y_train, batch_size=batch_size, epochs=epochs,
                    verbose=1, validation_data=(x_valid, y_valid), shuffle=True,
                    callbacks=[cp_callback])
model.summary()  # print the parameters of each layer

# Plot accuracy and loss curves for the training and validation sets.
# The keys in history.history depend on the metrics passed to compile().
train_acc = history.history['accuracy']      # training accuracy
val_acc = history.history['val_accuracy']    # validation accuracy
train_loss = history.history['loss']         # training loss
val_loss = history.history['val_loss']       # validation loss

plt.subplot(1, 2, 1)  # first of two side-by-side subplots
plt.plot(train_loss, label='train_loss')
plt.plot(val_loss, label='val_loss')
plt.xlabel('epochs')
plt.ylabel('loss')
plt.legend()  # legend requires each curve to carry a label
plt.title('Training and Validation loss')

plt.subplot(1, 2, 2)  # second subplot
plt.plot(train_acc, label='train_acc')
plt.plot(val_acc, label='val_acc')
plt.xlabel('epochs')
plt.ylabel('accuracy')
plt.legend()
plt.title('Training and Validation accuracy')
plt.show()

Training output:

Prediction with the trained model: cwru_predict.py

import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Conv1D, Activation, BatchNormalization, MaxPooling1D, Flatten, Dense
from tensorflow.keras.regularizers import l2

import cwru_preprocess as preprocess

# Parameters (must match the values used in cwru_train.py)
batch_size = 128
epochs = 20
num_classes = 10
length = 2048
BatchNorm = True          # whether batch normalization is used
number = 1000             # number of samples per class
normal = True             # whether to standardize the data
rate = [0.7, 0.2, 0.1]    # train/validation/test split ratio

path = r'cwru_data,HP'
x_train, y_train, x_valid, y_valid, x_test, y_test = preprocess.prepro(d_path=path,
                                                                       length=length,
                                                                       number=number,
                                                                       normal=normal,
                                                                       rate=rate,
                                                                       enc=True,
                                                                       enc_step=28)
# Conv1D expects a channel dimension, so add one with np.newaxis.
x_train = x_train[:, :, np.newaxis]
x_valid = x_valid[:, :, np.newaxis]
x_test = x_test[:, :, np.newaxis]

# Input shape fed to the first convolutional layer
input_shape = x_train.shape[1:]

print('Training sample shape:', x_train.shape)
print('Number of training samples:', x_train.shape[0])
print('Validation sample shape:', x_valid.shape)
print('Number of validation samples:', x_valid.shape[0])
print('Test sample shape:', x_test.shape)
print('Number of test samples:', x_test.shape[0])
print('Test label shape:', y_test.shape)


class CwruModel(Model):
    # Identical architecture to the model defined in cwru_train.py
    def __init__(self):
        super(CwruModel, self).__init__()
        # Block 1: wide first-layer kernel
        self.c1 = Conv1D(filters=16, kernel_size=64, strides=16, padding='same',
                         kernel_regularizer=l2(1e-4), input_shape=input_shape)
        self.b1 = BatchNormalization()
        self.a1 = Activation('relu')
        self.p1 = MaxPooling1D(pool_size=2)
        # Block 2
        self.c2 = Conv1D(filters=32, kernel_size=3, strides=1, padding='same')
        self.b2 = BatchNormalization()
        self.a2 = Activation('relu')
        self.p2 = MaxPooling1D(pool_size=2, padding='valid')
        # Block 3
        self.c3 = Conv1D(filters=64, kernel_size=3, strides=1, padding='same')
        self.b3 = BatchNormalization()
        self.a3 = Activation('relu')
        self.p3 = MaxPooling1D(pool_size=2, padding='valid')
        # Block 4
        self.c4 = Conv1D(filters=64, kernel_size=3, strides=1, padding='same')
        self.b4 = BatchNormalization()
        self.a4 = Activation('relu')
        self.p4 = MaxPooling1D(pool_size=2, padding='valid')
        # Block 5
        self.c5 = Conv1D(filters=64, kernel_size=3, strides=1, padding='valid')
        self.b5 = BatchNormalization()
        self.a5 = Activation('relu')
        self.p5 = MaxPooling1D(pool_size=2, padding='valid')
        # Flatten before the fully connected layers
        self.flatten = Flatten()
        # Fully connected layer
        self.d1 = Dense(units=100, activation='relu', kernel_regularizer=l2(1e-4))
        # Output layer
        self.d2 = Dense(units=num_classes, activation='softmax', kernel_regularizer=l2(1e-4))

    def call(self, x):
        x = self.c1(x)
        x = self.b1(x)
        x = self.a1(x)
        x = self.p1(x)

        x = self.c2(x)
        x = self.b2(x)
        x = self.a2(x)
        x = self.p2(x)

        x = self.c3(x)
        x = self.b3(x)
        x = self.a3(x)
        x = self.p3(x)

        x = self.c4(x)
        x = self.b4(x)
        x = self.a4(x)
        x = self.p4(x)

        x = self.c5(x)
        x = self.b5(x)
        x = self.a5(x)
        x = self.p5(x)

        x = self.flatten(x)
        x = self.d1(x)
        y = self.d2(x)
        return y


model = CwruModel()
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Load the trained weights
checkpoint_save_path = "./cwru_checkpoint/cwru_cnn.ckpt"
if os.path.exists(checkpoint_save_path + '.index'):
    print('-------------load the model-----------------')
    model.load_weights(checkpoint_save_path)

loss, accuracy = model.evaluate(x_valid, y_valid)
print('loss=', loss)
print('accuracy=', accuracy)

y_pre = model.predict(x_valid[0:10])
# print("------------Input samples----------")
# print(x_valid[0:10])
print("------------Predicted class probabilities----------")
print(y_pre)
print("------------Predicted classes----------")
# Convert each probability vector to a label, e.g. [0.1, 0.2, 0.7] -> 2
print(np.argmax(y_pre, axis=1))

Prediction output:
