
数据是来自凯斯西储大学(CWRU)滚动轴承数据中心.
官方网站数据下载地址: https://csegroups.case.edu/bearingdatacenter/pages/download-data-file
CWRU滚动轴承数据采集系统:
CWRU轴承中心数据采集系统如上图所示。本试验的试验对象为图中的驱动端轴承,被诊断的轴承型号为深沟球轴承SKF6205,有故障的轴承由电火花加工制作而成,系统的采样频率为12kHz。被诊断的轴承一共有3种缺陷位置,分别是滚动体损伤,外圈损伤与内圈损伤,损伤直径的大小分别为包括0.007inch,0.014inch和0.021inch,共计9种损伤状态。试验中,每次使用2048个数据点进行诊断。为了便于训练卷积神经网络,对每段信号x均做归一化处理,归一化处理的公式如下式
试验一共准备了4个数据集,如表2-3所示。数据集A、B和C分别是在负载为1hp、2hp和3hp下的数据集。每个数据集各包括6600个训练样本与250个测试样本,其中训练样本采用数据集增强技术,测试样本之间无重叠。数据集D是数据集A、B和C的并集,即包括了3种负载状态,一共有19800个训练样本与750个测试样本。
本项目数据集下载地址: https://download.csdn.net/download/qq_41865229/85200778
试验数据集描述
目前常用的二维卷积神经网络,例如VGGnet、ResNet、以及谷歌的Inception V4,这三种网络结构均含有堆叠式的3×3的卷积核。这样既可以加深网络深度,也可以实现以较少的参数,获取较大的感受野,从而抑制过拟合。然而,对于一维振动信号,两层3×1卷积的结构,以6个权值为代价,仅仅获取了5×1的感受野,反而将上述优势变成了劣势,因此视觉领域的网络结构不适用于轴承故障诊断领域。
本节针对一维振动信号的特点,设计出名为“第一层宽卷积核深度卷积神经网WDCNN(Deep Convolutional Neural Networks with Wide First-layer Kernel)模型,其结构特点是第一层为大卷积核,之后卷积层全部为3×1的小卷积核。
WDCNN的第一层为大卷积核,目的是为了提取短时特征,其作用与短时傅里叶变换类似。不同点在于,短时傅里叶变换的窗口函数是正弦函数,而WDCNN的第一层大卷积核,是通过优化算法训练得到,其优点是可以自动学习面向诊断的特征,而自动去除对诊断没有帮助的特征。
为了增强WDCNN的表达能力,除第一层外,其与卷积层的卷积核大小均为3×1。由于卷积核参数少,这样有利于加深网络,同时可以抑制过拟合。每层卷积 *** 作之后均进行批量归一化处理BN(Batch Normalization),然后进行2×1的最大值池化,WDCNN的结构图如下所示。
WDCNN结构图:
试验中使用的WDCNN模型参数如下示。该WDCNN模型共有5层卷积与池化层,第一层卷积核大小为64×1,除第一层外,其余卷积核的大小均为3×1。隐含层神经元个数为100,Softmax层共有10个输出,对应10种轴承状态。
WDCNN的结构参数:
数据集制作代码 cwru_preprocess.py
from scipy.io import loadmat
import numpy as np
import os
from sklearn import preprocessing # 0-1编码
from sklearn.model_selection import StratifiedShuffleSplit # 随机划分,保证每一类比例相同
import tensorflow as tf
def prepro(d_path, length=864, number=1000, normal=True, rate=[0.5, 0.25, 0.25], enc=True, enc_step=28):
"""对数据进行预处理,返回train_X, train_Y, valid_X, valid_Y, test_X, test_Y样本.
:param d_path: 源数据地址
:param length: 信号长度,默认2个信号周期,864
:param number: 每种信号个数,总共10类,默认每个类别1000个数据
:param normal: 是否标准化.True,Fales.默认True
:param rate: 训练集/验证集/测试集比例.默认[0.5,0.25,0.25],相加要等于1
:param enc: 训练集、验证集是否采用数据增强.Bool,默认True
:param enc_step: 增强数据集采样顺延间隔
:return: Train_X, Train_Y, Valid_X, Valid_Y, Test_X, Test_Y
```
import preprocess.preprocess_nonoise as pre
train_X, train_Y, valid_X, valid_Y, test_X, test_Y = pre.prepro(d_path=path,
length=864,
number=1000,
normal=False,
rate=[0.5, 0.25, 0.25],
enc=True,
enc_step=28)
```
"""
# 获得该文件夹下所有.mat文件名
filenames = os.listdir(d_path)
def capture(original_path):
"""读取mat文件,返回字典
:param original_path: 读取路径
:return: 数据字典
"""
files = {}
for i in filenames:
# 文件路径
file_path = os.path.join(d_path, i)
file = loadmat(file_path)
file_keys = file.keys()
for key in file_keys:
if 'DE' in key:
files[i] = file[key].ravel()
return files
def slice_enc(data, slice_rate=rate[1] + rate[2]):
"""将数据切分为前面多少比例,后面多少比例.
:param data: 单挑数据
:param slice_rate: 验证集以及测试集所占的比例
:return: 切分好的数据
"""
keys = data.keys()
Train_Samples = {}
Test_Samples = {}
for i in keys:
slice_data = data[i]
all_lenght = len(slice_data)
end_index = int(all_lenght * (1 - slice_rate))
samp_train = int(number * (1 - slice_rate)) # 700
Train_sample = []
Test_Sample = []
if enc:
enc_time = length // enc_step
samp_step = 0 # 用来计数Train采样次数
for j in range(samp_train):
random_start = np.random.randint(low=0, high=(end_index - 2 * length))
label = 0
for h in range(enc_time):
samp_step += 1
random_start += enc_step
sample = slice_data[random_start: random_start + length]
Train_sample.append(sample)
if samp_step == samp_train:
label = 1
break
if label:
break
else:
for j in range(samp_train):
random_start = np.random.randint(low=0, high=(end_index - length))
sample = slice_data[random_start:random_start + length]
Train_sample.append(sample)
# 抓取测试数据
for h in range(number - samp_train):
random_start = np.random.randint(low=end_index, high=(all_lenght - length))
sample = slice_data[random_start:random_start + length]
Test_Sample.append(sample)
Train_Samples[i] = Train_sample
Test_Samples[i] = Test_Sample
return Train_Samples, Test_Samples
# 仅抽样完成,打标签
def add_labels(train_test):
X = []
Y = []
label = 0
for i in filenames:
x = train_test[i]
X += x
lenx = len(x)
Y += [label] * lenx
label += 1
return X, Y
# one-hot编码
def one_hot(Train_Y, Test_Y):
Train_Y = np.array(Train_Y).reshape([-1, 1])
Test_Y = np.array(Test_Y).reshape([-1, 1])
Encoder = preprocessing.OneHotEncoder()
Encoder.fit(Train_Y)
Train_Y = Encoder.transform(Train_Y).toarray()
Test_Y = Encoder.transform(Test_Y).toarray()
Train_Y = np.asarray(Train_Y, dtype=np.int32)
Test_Y = np.asarray(Test_Y, dtype=np.int32)
return Train_Y, Test_Y
def scalar_stand(Train_X, Test_X):
# 用训练集标准差标准化训练集以及测试集
scalar = preprocessing.StandardScaler().fit(Train_X)
Train_X = scalar.transform(Train_X)
Test_X = scalar.transform(Test_X)
return Train_X, Test_X
def valid_test_slice(Test_X, Test_Y):
test_size = rate[2] / (rate[1] + rate[2])
ss = StratifiedShuffleSplit(n_splits=1, test_size=test_size)
for train_index, test_index in ss.split(Test_X, Test_Y):
X_valid, X_test = Test_X[train_index], Test_X[test_index]
Y_valid, Y_test = Test_Y[train_index], Test_Y[test_index]
return X_valid, Y_valid, X_test, Y_test
# 从所有.mat文件中读取出数据的字典
data = capture(original_path=d_path)
# 将数据切分为训练集、测试集
train, test = slice_enc(data)
# 为训练集制作标签,返回X,Y
Train_X, Train_Y = add_labels(train)
# 为测试集制作标签,返回X,Y
Test_X, Test_Y = add_labels(test)
# 为训练集Y/测试集One-hot标签
Train_Y, Test_Y = one_hot(Train_Y, Test_Y)
# 训练数据/测试数据 是否标准化.
if normal:
Train_X, Test_X = scalar_stand(Train_X, Test_X)
else:
# 需要做一个数据转换,转换成np格式.
Train_X = np.asarray(Train_X)
Test_X = np.asarray(Test_X)
# 将测试集切分为验证集合和测试集.
Valid_X, Valid_Y, Test_X, Test_Y = valid_test_slice(Test_X, Test_Y)
return Train_X, Train_Y, Valid_X, Valid_Y, Test_X, Test_Y
if __name__ == "__main__":
path = r'cwru_data,HP'
train_X, train_Y, valid_X, valid_Y, test_X= test_Y ( prepro=d_path,path=
length864,=
number1000,=
normalFalse,=
rate[0.5,0.25 ,0.25 ],=
encFalse,=
enc_step28)print
([train_X0:5])print
([train_Y0:5])import
模型训练代码 cwru_train.py
import os
. matplotlibaspyplot import plt
as numpy from np
. tensorflow_core.pythonimportkeras from Model
. tensorflow_core.python.kerasimportlayers , Conv1D, Activation, BatchNormalization, MaxPooling1D, Flattenimport Dense
as tensorflow from tf
. tensorflow_core.python.kerasimportregularizers import l2
as cwru_preprocess # 训练参数 preprocess
=
batch_size 128 =
epochs 20 =
num_classes 10 =
length 2048 =
BatchNorm True # 是否批量归一化 =
number 1000 # 每类样本的数量 =
normal True # 是否标准化 =
rate [ 0.7,0.2,0.1]# 测试集验证集划分比例 =
path r'cwru_data,HP' ,
x_train, y_train, x_valid, y_valid= x_test. y_test ( preprocess=prepro,d_path=path,length=length,
number=number,
normal=normal,
rate=rateTrue
enc,=28 enc_step)# 输入卷积的时候还需要修改一下,增加通道数目,
,
x_train= x_valid[ x_test : x_train,:,.]np,newaxis[: x_valid,:,.]np,newaxis[: x_test,:,.]np#[:,:,np.newaxis]是什么意思newaxis# 输入数据的维度 =
.
input_shape [x_train1shape:]print(
'训练样本维度:',.) x_trainprintshape(
'训练样本个数',.[ x_train0shape])print(
'验证样本的维度',.) x_validprintshape(
'验证样本个数',.[ x_valid0shape])print(
'测试样本的维度',.) x_testprintshape(
'测试样本个数',.[ x_test0shape])print(
'测试标签的维度',.) y_testclassshapeCwruModel
( ):Modeldef__init__
( ):self#构造函数super (
,)CwruModel. self()__init__#第1层卷积.
=
self(c1 = Conv1D16filters,=64 kernel_size,=16 strides,='same' padding,=( kernel_regularizer1e-4l2),=) input_shape #卷积层 input_shape.=
self(b1 ) BatchNormalization# BN层. =
self(a1 'relu' Activation)# 激活层. =
self(p1 = MaxPooling1D2pool_size)#池化层#第2层卷积 .
=
self(c2 = Conv1D32filters,=3 kernel_size,=1 strides,='same' padding).=
self(b2 ) BatchNormalization.=
self(a2 'relu' Activation)# 激活层. =
self(p2 = MaxPooling1D2pool_size,='valid' padding)#第3层卷积.
=
self(c3 = Conv1D64filters,=3 kernel_size,=1 strides,='same' padding).=
self(b3 ) BatchNormalization.=
self(a3 'relu' Activation)# 激活层. =
self(p3 = MaxPooling1D2pool_size,='valid' padding)#第4层卷积.
=
self(c4 = Conv1D64filters,=3 kernel_size,=1 strides,='same' padding).=
self(b4 ) BatchNormalization.=
self(a4 'relu' Activation)# 激活层. =
self(p4 = MaxPooling1D2pool_size,='valid' padding)#第5层卷积.
=
self(c5 = Conv1D64filters,=3 kernel_size,=1 strides,='valid' padding).=
self(b5 ) BatchNormalization.=
self(a5 'relu' Activation)# 激活层. =
self(p5 = MaxPooling1D2pool_size,='valid' padding)#从卷积到全连接需要展平.
=
self(flatten ) Flatten# 全连接层.
=
self(d1 = Dense100units,='relu' activation,=( kernel_regularizer1e-4l2))# 输出层.
=
self(d2 = Dense,units=num_classes'softmax' activation,=( kernel_regularizer1e-4l2))defcall
( ,)self: x=.
x ( self)c1=x.
x ( self)b1=x.
x ( self)a1=x.
x ( self)p1=x.
x ( self)c2=x.
x ( self)b2=x.
x ( self)a2=x.
x ( self)p2=x.
x ( self)c3=x.
x ( self)b3=x.
x ( self)a3=x.
x ( self)p3=x.
x ( self)c4=x.
x ( self)b4=x.
x ( self)a4=x.
x ( self)p4=x.
x ( self)c5=x.
x ( self)b5=x.
x ( self)a5=x.
x ( self)p5=x.
x ( self)flatten=x.
x ( self)d1=x.
y ( self)d2returnx=
( y
model ) CwruModel.compile
model(='Adam'optimizer,='categorical_crossentropy' loss,=[ metrics'accuracy'])#加载模型=
"./cwru_checkpoint/cwru_cnn.ckpt"
checkpoint_save_path if .
. os(path+exists'.index'checkpoint_save_path ) :print(
'-------------load the model-----------------').(
model)load_weights=checkpoint_save_path.
cp_callback . tf.keras(callbacks=ModelCheckpoint,filepath=checkpoint_save_pathTrue
save_weights_only,=True
save_best_only)=.
history ( model=fit,x=x_train, y=y_train, batch_size=batch_size, epochs=epochs1 verbose,=( validation_data,)x_valid, y_valid=True shuffle,=[ callbacks])cp_callback.(
model)summary#输出模型各层的参数状况# 显示训练集和验证集的acc和loss曲线 #根据compile参数metrics,history包含不同的内容
=
.
train_acc [ history'accuracy'history]#训练集准确率= .
val_acc [ history'val_accuracy'history]#测试集准确率= .
train_loss [ history'loss'history]#训练集损失率= .
val_loss [ history'val_loss'history]#测试集损失率. (
plt1subplot,2, 1) #画两行一列的第1个子图. (
plt,plot=train_loss'train_loss' label).(
plt,plot=val_loss'val_loss' label).(
plt'epochs'xlabel).(
plt'loss'ylabel).(
plt)legend#函数主要的作用就是给图加上图例, 要求线条有label. (
plt'Training and Validation loss'title).(
plt1subplot,2, 2) #画两行一列的第1个子图. (
plt,plot=train_acc'train_acc' label).(
plt,plot=val_acc'val_acc' label).(
plt'epochs'xlabel).(
plt'accuracy'ylabel).(
plt)legend#函数主要的作用就是给图加上图例, 要求线条有label. (
plt'Training and Validation accuracy'title).(
plt)showimportfrom
运行结果
使用模型进行预测 cwru_predict.py
import os
import datetime . time
as matplotlibimportpyplot as plt
from numpy . np
. tensorflow_coreimportpythonfromkeras . Model
. tensorflow_core.pythonimportkeras,layers , Conv1D, Activation, BatchNormalization, MaxPooling1Dimport Flattenas Dense
from tensorflow . tf
. tensorflow_core.pythonimportkerasimportregularizers as l2
# 训练参数 cwru_preprocess = preprocess
128
batch_size = 20
epochs = 10
num_classes = 2048
length = True
BatchNorm # 是否批量归一化 = 1000
number # 每类样本的数量 = True
normal # 是否标准化 = [
rate 0.7 ,0.2,0.1]# 测试集验证集划分比例= r'cwru_data,HP'
path , ,
x_train, y_train, x_valid= y_valid. x_test( y_test = preprocess,prepro=d_path,path=length,length=
number,number=
normal,normal=
rateTruerate,
enc=28) enc_step# 输入卷积的时候还需要修改一下,增加通道数目,,
=
x_train[ x_valid: x_test , x_train:,.],np[newaxis:, x_valid:,.],np[newaxis:, x_test:,.]#[:,:,np.newaxis]是什么意思np# 输入数据的维度newaxis= .
[
input_shape 1x_train:shape]print('训练样本维度:'
,.)print x_train(shape'训练样本个数'
,.[0 x_train]shape)print('验证样本的维度'
,.)print x_valid(shape'验证样本个数'
,.[0 x_valid]shape)print('测试样本的维度'
,.)print x_test(shape'测试样本个数'
,.[0 x_test]shape)print('测试标签的维度'
,.)class y_testCwruModelshape(
) :defModel__init__(
) :#构造函数selfsuper( ,
).CwruModel( self)#第1层卷积__init__.=
(
self=c1 16 Conv1D,filters=64, kernel_size=16, strides='same', padding=(1e-4 kernel_regularizer)l2,=)#卷积层 input_shape . input_shape=(
self)b1 # BN层 BatchNormalization.= (
self'relu'a1 ) Activation# 激活层.= (
self=p1 2 MaxPooling1D)pool_size#池化层#第2层卷积. =
(
self=c2 32 Conv1D,filters=3, kernel_size=1, strides='same') padding.=(
self)b2 . BatchNormalization=(
self'relu'a2 ) Activation# 激活层.= (
self=p2 2 MaxPooling1D,pool_size='valid') padding#第3层卷积.=
(
self=c3 64 Conv1D,filters=3, kernel_size=1, strides='same') padding.=(
self)b3 . BatchNormalization=(
self'relu'a3 ) Activation# 激活层.= (
self=p3 2 MaxPooling1D,pool_size='valid') padding#第4层卷积.=
(
self=c4 64 Conv1D,filters=3, kernel_size=1, strides='same') padding.=(
self)b4 . BatchNormalization=(
self'relu'a4 ) Activation# 激活层.= (
self=p4 2 MaxPooling1D,pool_size='valid') padding#第5层卷积.=
(
self=c5 64 Conv1D,filters=3, kernel_size=1, strides='valid') padding.=(
self)b5 . BatchNormalization=(
self'relu'a5 ) Activation# 激活层.= (
self=p5 2 MaxPooling1D,pool_size='valid') padding#从卷积到全连接需要展平.=
(
self)flatten # 全连接层 Flatten.=
(
self=d1 100 Dense,units='relu', activation=(1e-4 kernel_regularizer)l2)# 输出层.=
(
self=d2 , Dense=units'softmax'num_classes, activation=(1e-4 kernel_regularizer)l2)defcall(
, ):self= x.(
x ) self=c1.x(
x ) self=b1.x(
x ) self=a1.x(
x ) self=p1.x(
x ) self=c2.x(
x ) self=b2.x(
x ) self=a2.x(
x ) self=p2.x(
x ) self=c3.x(
x ) self=b3.x(
x ) self=a3.x(
x ) self=p3.x(
x ) self=c4.x(
x ) self=b4.x(
x ) self=a4.x(
x ) self=p4.x(
x ) self=c5.x(
x ) self=b5.x(
x ) self=a5.x(
x ) self=p5.x(
x ) self=flatten.x(
x ) self=d1.x(
y ) selfreturnd2=x(
) y
model . CwruModelcompile(
model='Adam',optimizer='categorical_crossentropy', loss=['accuracy' metrics])#加载模型="./cwru_checkpoint/cwru_cnn.ckpt"
if
checkpoint_save_path . .
( os+path'.index'exists)checkpoint_save_path : print('-------------load the model-----------------'
).()
model=load_weights.checkpoint_save_path.
cp_callback . tf(keras=callbacks,ModelCheckpoint=filepathTruecheckpoint_save_path,
save_weights_only=True)
save_best_only,=.
loss( accuracy , model)evaluateprintx_valid( y_valid'loss='
,)print( loss'accuracy='
,)=. accuracy(
y_pre [ model0predict:x_valid10])# print("------------预测样本数据----------")# print(x_valid[0:10])print
(
"------------预测结果概率----------"
)print()
print(y_pre"------------预测结果分类----------"
)#概率数组转化为标签, 如[0.1, 0.2, 0.7]转化为2print(
.
(,np=argmax1y_pre) axis)
运行结果
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)