Preface

  • TimesNet: Temporal 2D-Variation Modeling for General Time Series Analysis (paper download link; GitHub project link; part of my paper-walkthrough series)
  • This post walks through the TimesNet model parameters and the open-source model-architecture code. My expertise is limited; if you spot any mistakes in the interpretation, please point them out.
  • The open-source code implements long/short-term series forecasting, imputation, anomaly detection, and classification; this post uses long/short-term forecasting as the running example.
  • Follow-up posts will cover custom projects and tuning key model parameters with the NNI framework, and possibly model compression as well. If you are interested, please follow, like, and bookmark. Thanks!

Parameter Settings Module (run)

  • First open run.py and make sure the code runs without modifying any parameters. On Windows you need to delete the required=True option from the --is_training, --model_id, --model, and --data arguments, otherwise it will error out. The --num_workers argument must be set to 0.
  • Next, create a data subfolder under the project directory to hold the training data. You can use the ETTh1 dataset; a download link is provided here.
  • Run run.py; if training completes without errors, you are all set. Training on a GPU is strongly recommended; on a CPU it is practically infeasible.

Parameter Meanings

  • The meaning of each argument is annotated below.
parser = argparse.ArgumentParser(description='TimesNet')

    # basic config
    # task type
    parser.add_argument('--task_name', type=str, default='long_term_forecast',
                        help='task name, options:[long_term_forecast, short_term_forecast, imputation, classification, anomaly_detection]')
    # training mode flag
    parser.add_argument('--is_training', type=int, default=1, help='status')
    # model identifier
    parser.add_argument('--model_id', type=str, default='test', help='model id')
    # model selection
    parser.add_argument('--model', type=str, default='TimesNet',
                        help='model name, options: [Autoformer, Transformer, TimesNet]')

    # data loader
    # dataset name
    parser.add_argument('--data', type=str, default='ETTh1', help='dataset type')
    # folder containing the data file
    parser.add_argument('--root_path', type=str, default='./data/', help='root path of the data file')
    # data file name
    parser.add_argument('--data_path', type=str, default='ETTh1.csv', help='data file')
    # forecasting mode: M, S, or MS (see help string below)
    parser.add_argument('--features', type=str, default='M',
                        help='forecasting task, options:[M, S, MS]; M:multivariate predict multivariate, S:univariate predict univariate, MS:multivariate predict univariate')
    # name of the target column
    parser.add_argument('--target', type=str, default='OT', help='target feature in S or MS task')
    # sampling granularity of the timestamps
    parser.add_argument('--freq', type=str, default='h',
                        help='freq for time features encoding, options:[s:secondly, t:minutely, h:hourly, d:daily, b:business days, w:weekly, m:monthly], you can also use more detailed freq like 15min or 3h')
    # folder for saving model checkpoints
    parser.add_argument('--checkpoints', type=str, default='./checkpoints/', help='location of model checkpoints')

    # forecasting task
    # look-back window length
    parser.add_argument('--seq_len', type=int, default=96, help='input sequence length')
    # start-token length (known prefix fed to the decoder)
    parser.add_argument('--label_len', type=int, default=48, help='start token length')
    # prediction window length
    parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
    # seasonal pattern (only for the M4 dataset)
    parser.add_argument('--seasonal_patterns', type=str, default='Monthly', help='subset for M4')

    # imputation task
    # fraction of values masked out in the imputation task
    parser.add_argument('--mask_rate', type=float, default=0.25, help='mask ratio')

    # anomaly detection task
    # prior proportion of anomalies in anomaly detection
    parser.add_argument('--anomaly_ratio', type=float, default=0.25, help='prior anomaly ratio (%)')

    # model define
    # top-k periods (ranked by FFT amplitude) used in TimesBlock
    parser.add_argument('--top_k', type=int, default=5, help='for TimesBlock')
    # number of kernels in the Inception block
    parser.add_argument('--num_kernels', type=int, default=6, help='for Inception')
    # number of encoder input features
    parser.add_argument('--enc_in', type=int, default=7, help='encoder input size')
    # number of decoder input features
    parser.add_argument('--dec_in', type=int, default=7, help='decoder input size')
    # number of output channels
    parser.add_argument('--c_out', type=int, default=7, help='output size')
    # model (embedding) dimension
    parser.add_argument('--d_model', type=int, default=512, help='dimension of model')
    # number of attention heads
    parser.add_argument('--n_heads', type=int, default=8, help='num of heads')
    # number of encoder layers
    parser.add_argument('--e_layers', type=int, default=2, help='num of encoder layers')
    # number of decoder layers
    parser.add_argument('--d_layers', type=int, default=1, help='num of decoder layers')
    # hidden dimension of the FFN layer
    parser.add_argument('--d_ff', type=int, default=2048, help='dimension of fcn')
    # moving-average window size
    parser.add_argument('--moving_avg', type=int, default=25, help='window size of moving average')
    # attention factor (query-sampling factor in Informer-style attention)
    parser.add_argument('--factor', type=int, default=1, help='attn factor')
    # whether to use distilling (downsampling) in the encoder
    parser.add_argument('--distil', action='store_false',
                        help='whether to use distilling in encoder, using this argument means not using distilling',
                        default=True)
    # dropout rate
    parser.add_argument('--dropout', type=float, default=0.1, help='dropout')
    # time-feature embedding method
    parser.add_argument('--embed', type=str, default='timeF',
                        help='time features encoding, options:[timeF, fixed, learned]')
    # activation function
    parser.add_argument('--activation', type=str, default='gelu', help='activation')
    # whether to output attention weights
    parser.add_argument('--output_attention', action='store_true', help='whether to output attention in encoder')

    # optimization
    # number of data-loader workers
    parser.add_argument('--num_workers', type=int, default=0, help='data loader num workers')
    # number of experiment repetitions
    parser.add_argument('--itr', type=int, default=1, help='experiments times')
    # number of training epochs
    parser.add_argument('--train_epochs', type=int, default=10, help='train epochs')
    # batch size
    parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')
    # early-stopping patience
    parser.add_argument('--patience', type=int, default=3, help='early stopping patience')
    # learning rate
    parser.add_argument('--learning_rate', type=float, default=0.0001, help='optimizer learning rate')
    parser.add_argument('--des', type=str, default='test', help='exp description')
    # loss function
    parser.add_argument('--loss', type=str, default='MSE', help='loss function')
    # learning-rate decay strategy
    parser.add_argument('--lradj', type=str, default='type1', help='adjust learning rate')
    # use automatic mixed-precision training
    parser.add_argument('--use_amp', action='store_true', help='use automatic mixed precision training', default=False)

    # GPU
    parser.add_argument('--use_gpu', type=bool, default=False, help='use gpu')
    parser.add_argument('--gpu', type=int, default=0, help='gpu')
    parser.add_argument('--use_multi_gpu', action='store_true', help='use multiple gpus', default=False)
    parser.add_argument('--devices', type=str, default='0,1,2,3', help='device ids of multiple gpus')

    # de-stationary projector params
    parser.add_argument('--p_hidden_dims', type=int, nargs='+', default=[128, 128],
                        help='hidden layer dimensions of projector (List)')
    parser.add_argument('--p_hidden_layers', type=int, default=2, help='number of hidden layers in projector')

Set a breakpoint on the exp.train(setting) line and step into the main training routine in exp_long_term_forecasting.py.

Data Processing Module

In _get_data, locate the dataset factory and open data_factory.py; you can see the handler for each standard dataset:

data_dict = {
    'ETTh1': Dataset_ETT_hour,
    'ETTh2': Dataset_ETT_hour,
    'ETTm1': Dataset_ETT_minute,
    'ETTm2': Dataset_ETT_minute,
    'custom': Dataset_Custom,
    'm4': Dataset_M4,
    'PSM': PSMSegLoader,
    'MSL': MSLSegLoader,
    'SMAP': SMAPSegLoader,
    'SMD': SMDSegLoader,
    'SWAT': SWATSegLoader,
    'UEA': UEAloader
}
  • Since our dataset is ETTh1, it is handled by Dataset_ETT_hour. Open data_loader.py and find Dataset_ETT_hour.
  • __init__ mainly just receives the various parameters, so I won't dwell on it; the focus here is __read_data__.
     def __read_data__(self):
        # StandardScaler instance for normalization
        self.scaler = StandardScaler()
        # read the raw CSV
        df_raw = pd.read_csv(os.path.join(self.root_path,
                                          self.data_path))
        # split boundaries: start/end indices for train, val, and test
        border1s = [0, 12 * 30 * 24 - self.seq_len, 12 * 30 * 24 + 4 * 30 * 24 - self.seq_len]
        border2s = [12 * 30 * 24, 12 * 30 * 24 + 4 * 30 * 24, 12 * 30 * 24 + 8 * 30 * 24]
        border1 = border1s[self.set_type]
        border2 = border2s[self.set_type]

        # multivariate -> multivariate (M) or multivariate -> univariate (MS)
        if self.features == 'M' or self.features == 'MS':
            # take every column except the date column
            cols_data = df_raw.columns[1:]
            df_data = df_raw[cols_data]
        # mode S: univariate -> univariate
        elif self.features == 'S':
            # take only the target column
            df_data = df_raw[[self.target]]

        # standardize the data (scaler fitted on the training split only)
        if self.scale:
            train_data = df_data[border1s[0]:border2s[0]]
            self.scaler.fit(train_data.values)
            data = self.scaler.transform(df_data.values)
        else:
            data = df_data.values
        # take the date column for this split
        df_stamp = df_raw[['date']][border1:border2]
        # parse the dates with pandas
        df_stamp['date'] = pd.to_datetime(df_stamp.date)
        # build time features
        if self.timeenc == 0:
            df_stamp['month'] = df_stamp.date.apply(lambda row: row.month, 1)
            df_stamp['day'] = df_stamp.date.apply(lambda row: row.day, 1)
            df_stamp['weekday'] = df_stamp.date.apply(lambda row: row.weekday(), 1)
            df_stamp['hour'] = df_stamp.date.apply(lambda row: row.hour, 1)
            data_stamp = df_stamp.drop(['date'], 1).values
        elif self.timeenc == 1:
            # time-feature construction helper (see timefeatures.py)
            data_stamp = time_features(pd.to_datetime(df_stamp['date'].values), freq=self.freq)
            # transpose to [T, num_time_features]
            data_stamp = data_stamp.transpose(1, 0)
        
        # slice out this split's inputs and targets
        self.data_x = data[border1:border2]
        self.data_y = data[border1:border2]
        self.data_stamp = data_stamp
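  • To make the border arithmetic concrete: for ETTh1 (hourly data) the splits are 12 months of training, then 4 months of validation and 4 months of test (months approximated as 30 days), and each later split starts seq_len steps early so its first sample has a full look-back window. Illustrative numbers with the default seq_len=96:
# illustrative split boundaries for ETTh1, seq_len = 96
seq_len = 96
border1s = [0, 12 * 30 * 24 - seq_len, 16 * 30 * 24 - seq_len]  # [0, 8544, 11424]
border2s = [12 * 30 * 24, 16 * 30 * 24, 20 * 30 * 24]           # [8640, 11520, 14400]
# train: rows [0, 8640), val: rows [8544, 11520), test: rows [11424, 14400)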
  • Note the time_features function, which extracts calendar features; for example 't': ['month','day','weekday','hour','minute'] means it extracts month, day, weekday, hour, and minute. See timefeatures.py for the details; you can also add extra date encodings there later.
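  • As a rough sketch (not the repo's exact implementation), time_features maps each timestamp to calendar features scaled to roughly [-0.5, 0.5]; for freq='h' that is 4 features per step, consistent with freq_map['h'] = 4 in Embed.py below:
import numpy as np
import pandas as pd

# hypothetical approximation of time_features for freq='h'
dates = pd.DatetimeIndex(pd.date_range('2016-07-01', periods=4, freq='h'))
hour = dates.hour / 23.0 - 0.5
weekday = dates.weekday / 6.0 - 0.5
day = (dates.day - 1) / 30.0 - 0.5
month = (dates.month - 1) / 11.0 - 0.5
data_stamp = np.stack([hour, weekday, day, month], axis=1)
print(data_stamp.shape)  # (4, 4) -- [T, num_time_features]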
  • Likewise, a few notes on __getitem__:
    def __getitem__(self, index):
        # index is the start of this sample's look-back window (supplied, possibly shuffled, by the DataLoader)
        s_begin = index

        # input (look-back) interval
        s_end = s_begin + self.seq_len
        # labeled interval (start token) + unlabeled interval (prediction steps)
        r_begin = s_end - self.label_len
        r_end = r_begin + self.label_len + self.pred_len

        # slice the input window and the target window
        seq_x = self.data_x[s_begin:s_end]
        seq_y = self.data_y[r_begin:r_end]
        # time features for the input window
        seq_x_mark = self.data_stamp[s_begin:s_end]
        # time features for the label + prediction window
        seq_y_mark = self.data_stamp[r_begin:r_end]

        return seq_x, seq_y, seq_x_mark, seq_y_mark
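  • A worked example with the defaults seq_len=96, label_len=48, pred_len=96, taking index = 0:
index = 0
seq_len, label_len, pred_len = 96, 48, 96
s_begin, s_end = index, index + seq_len  # [0, 96): input window
r_begin = s_end - label_len              # 48
r_end = r_begin + label_len + pred_len   # 192
# seq_x covers rows [0, 96); seq_y covers rows [48, 192).
# The first 48 rows of seq_y overlap seq_x (the start token);
# the last 96 rows are the targets to forecast.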

  • This part of the data handling can be a bit winding; you can look at the data-processing section of my SCInet code walkthrough, where I drew a diagram of the dataset split.

Network Architecture

  • Open TimesNet.py under the models folder; it contains the TimesBlock and Model classes.

TimesBlock

  • First comes the 1-D discrete Fourier transform in FFT_for_Period; see the comments for a line-by-line walkthrough:
def FFT_for_Period(x, k=2):
    # [B, T, C]
    # 1-D real FFT along the T dimension: [B, T, C] --> [B, T//2+1, C]
    xf = torch.fft.rfft(x, dim=1)
    # find the periods via the amplitudes:
    # mean amplitude over the batch dim, then over the channel dim: [B, T//2+1, C] --> [T//2+1]
    frequency_list = abs(xf).mean(0).mean(-1)
    # zero out the first entry (the DC component)
    frequency_list[0] = 0
    # indices of the k largest amplitudes in the frequency list
    _, top_list = torch.topk(frequency_list, k)
    # move top_list to a numpy array
    top_list = top_list.detach().cpu().numpy()
    # period = total sequence length divided by each top frequency index
    period = x.shape[1] // top_list
    # return the periods, plus the amplitude at the selected frequencies averaged over C: [B, T//2+1, C] --> [B, k]
    return period, abs(xf).mean(-1)[:, top_list]
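  • A quick sanity check (illustrative, not from the repo): a batch of clean sinusoids with period 24 should put 24 among the detected periods, since a 24-step cycle over T=96 lands on frequency index 4 and 96 // 4 = 24:
import math
import torch

t = torch.arange(96).float()
x = torch.sin(2 * math.pi * t / 24).reshape(1, 96, 1).repeat(2, 1, 3)  # [B=2, T=96, C=3]
period, weight = FFT_for_Period(x, k=2)
print(period)        # contains 24 (the second entry is arbitrary for a pure sine)
print(weight.shape)  # torch.Size([2, 2])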
  • Turning to the TimesBlock class: the parameter-efficient block mentioned in the paper is an Inception-style architecture borrowed from vision models.
  • Inception uses a multi-branch structure, running 1×1 convolutions, 3×3 convolutions, and max pooling side by side. This widens the network and makes it robust to features at different scales.
  • Open Conv_Blocks.py under the layers folder. It implements two classes, Inception_Block_V1 and Inception_Block_V2; TimesBlock only uses the V1 version.
  • See the inline comments for a line-by-line walkthrough; pay particular attention to how the kernel size and padding change from branch to branch.
class Inception_Block_V1(nn.Module):
    def __init__(self, in_channels, out_channels, num_kernels=6, init_weight=True):
        super(Inception_Block_V1, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.num_kernels = num_kernels
        kernels = []
        for i in range(self.num_kernels):
            # 2-D conv branches with kernel sizes 1, 3, 5, 7, ...; padding = i keeps the spatial size unchanged
            kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=2 * i + 1, padding=i))
        self.kernels = nn.ModuleList(kernels)
        if init_weight:
            self._initialize_weights()

    # initialize weights
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x):
        # collect the branch outputs
        res_list = []
        # run each 2-D conv branch
        for i in range(self.num_kernels):
            # append this branch's output
            res_list.append(self.kernels[i](x))
        # stack along a new last dimension, then average over it
        res = torch.stack(res_list, dim=-1).mean(-1)
        return res


class Inception_Block_V2(nn.Module):
    def __init__(self, in_channels, out_channels, num_kernels=6, init_weight=True):
        super(Inception_Block_V2, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.num_kernels = num_kernels
        kernels = []
        for i in range(self.num_kernels // 2):
            # 2-D convs with kernels (1,3), (1,5), (1,7), ...
            kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=[1, 2 * i + 3], padding=[0, i + 1]))
            # 2-D convs with kernels (3,1), (5,1), (7,1), ...
            kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=[2 * i + 3, 1], padding=[i + 1, 0]))
        # plus one more 2-D conv with a 1x1 kernel
        kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=1))
        self.kernels = nn.ModuleList(kernels)
        if init_weight:
            self._initialize_weights()

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x):
        res_list = []
        for i in range(self.num_kernels + 1):
            # output of each branch
            res_list.append(self.kernels[i](x))
        # stack along a new last dimension, then average over it
        res = torch.stack(res_list, dim=-1).mean(-1)
        return res
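  • A shape sanity check (illustrative): with kernel_size = 2 * i + 1 and padding = i, every branch of Inception_Block_V1 preserves the spatial dimensions, so the averaged output only changes the channel count:
import torch

block = Inception_Block_V1(in_channels=16, out_channels=32, num_kernels=6)
x = torch.randn(8, 16, 4, 24)  # e.g. [B, d_model, length // period, period]
print(block(x).shape)          # torch.Size([8, 32, 4, 24])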
  • Back in the TimesBlock class: after the 1-D FFT picks the top-k periods, the sequence is padded so its length divides evenly by each period, then reshaped into 2-D for the convolutions. The per-period results are stacked, adaptively aggregated using softmax-normalized frequency weights, and a residual connection gives the output. Details are in the code comments.
class TimesBlock(nn.Module):
    def __init__(self, configs):
        super(TimesBlock, self).__init__()
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.k = configs.top_k
        # the parameter-efficient (Inception-style) block
        self.conv = nn.Sequential(
            Inception_Block_V1(configs.d_model, configs.d_ff,
                               num_kernels=configs.num_kernels),
            nn.GELU(),
            Inception_Block_V1(configs.d_ff, configs.d_model,
                               num_kernels=configs.num_kernels)
        )

    def forward(self, x):
        # unpack B, T, N
        B, T, N = x.size()
        # get the top-k periods and their amplitude weights
        period_list, period_weight = FFT_for_Period(x, self.k)

        res = []
        for i in range(self.k):
            # current period
            period = period_list[i]
            # pad so the length is divisible by the period
            if (self.seq_len + self.pred_len) % period != 0:
                length = (((self.seq_len + self.pred_len) // period) + 1) * period
                # padding: [batch, length - seq_len - pred_len, feature]
                padding = torch.zeros([x.shape[0], (length - (self.seq_len + self.pred_len)), x.shape[2]]).to(x.device)
                # concatenate along the length dimension
                out = torch.cat([x, padding], dim=1)
            # no padding needed
            else:
                length = (self.seq_len + self.pred_len)
                out = x
            # reshape to [batch, length // period, period, feature] --> [batch, feature, length // period, period] (contiguous copy)
            out = out.reshape(B, length // period, period,
                              N).permute(0, 3, 1, 2).contiguous()
            # run the Inception conv block
            out = self.conv(out)
            # reshape back to (batch, length, feature)
            out = out.permute(0, 2, 3, 1).reshape(B, -1, N)
            res.append(out[:, :(self.seq_len + self.pred_len), :])
        # stack the k per-period results along a new last dimension
        res = torch.stack(res, dim=-1)
        # adaptive aggregation
        period_weight = F.softmax(period_weight, dim=1)
        # add two dims and repeat so the weights broadcast against res
        period_weight = period_weight.unsqueeze(
            1).unsqueeze(1).repeat(1, T, N, 1)
        # weighted sum over the k periods (last dimension)
        res = torch.sum(res * period_weight, -1)
        # residual connection
        res = res + x
        return res
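  • The core 1D-to-2D folding is easy to see in isolation (illustrative): a [B, T, N] series with period p becomes [B, N, T // p, p], where each row is one full cycle and each column is a fixed position within the cycle:
import torch

B, T, N, p = 2, 96, 7, 24
x = torch.randn(B, T, N)
x2d = x.reshape(B, T // p, p, N).permute(0, 3, 1, 2).contiguous()
print(x2d.shape)  # torch.Size([2, 7, 4, 24])
# intra-period variation runs along the last dim (within a cycle),
# inter-period variation along the second-to-last dim (across cycles).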

TimesNet

  • The Model class defines one function per task type: forecast (forecasting), imputation, anomaly_detection (anomaly detection), and classification.
  • The internal processing is detailed in the code comments.
# forecasting function
    def forecast(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
        # normalization from the Non-stationary Transformer
        # compute the mean
        means = x_enc.mean(1, keepdim=True).detach()
        # subtract the mean
        x_enc = x_enc - means
        # compute the standard deviation
        stdev = torch.sqrt(
            torch.var(x_enc, dim=1, keepdim=True, unbiased=False) + 1e-5)
        # divide by the standard deviation
        x_enc /= stdev

        # embedding
        enc_out = self.enc_embedding(x_enc, x_mark_enc)  # [B,T,C]
        # [B,T,C] --> [B,C,T] --> [B,T,C]
        enc_out = self.predict_linear(enc_out.permute(0, 2, 1)).permute(
            0, 2, 1)  # align temporal dimension

        # TimesNet
        for i in range(self.layer):
            # layer_norm
            enc_out = self.layer_norm(self.model[i](enc_out))
        # projection layer
        dec_out = self.projection(enc_out)

        # de-normalization from the Non-stationary Transformer
        dec_out = dec_out * \
                  (stdev[:, 0, :].unsqueeze(1).repeat(
                      1, self.pred_len + self.seq_len, 1))
        dec_out = dec_out + \
                  (means[:, 0, :].unsqueeze(1).repeat(
                      1, self.pred_len + self.seq_len, 1))
        return dec_out

    def imputation(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask):
        # normalization from the Non-stationary Transformer
        # mean over the observed (unmasked) positions
        means = torch.sum(x_enc, dim=1) / torch.sum(mask == 1, dim=1)
        means = means.unsqueeze(1).detach()
        # subtract the mean
        x_enc = x_enc - means
        x_enc = x_enc.masked_fill(mask == 0, 0)
        # standard deviation over the observed positions
        stdev = torch.sqrt(torch.sum(x_enc * x_enc, dim=1) /
                           torch.sum(mask == 1, dim=1) + 1e-5)
        stdev = stdev.unsqueeze(1).detach()
        # divide by the standard deviation
        x_enc /= stdev

        # embedding
        enc_out = self.enc_embedding(x_enc, x_mark_enc)  # [B,T,C]
        # TimesNet
        for i in range(self.layer):
            enc_out = self.layer_norm(self.model[i](enc_out))
        # project back
        dec_out = self.projection(enc_out)

        # de-normalization from the Non-stationary Transformer
        dec_out = dec_out * \
                  (stdev[:, 0, :].unsqueeze(1).repeat(
                      1, self.pred_len + self.seq_len, 1))
        dec_out = dec_out + \
                  (means[:, 0, :].unsqueeze(1).repeat(
                      1, self.pred_len + self.seq_len, 1))
        return dec_out

    def anomaly_detection(self, x_enc):
        # Normalization from Non-stationary Transformer
        means = x_enc.mean(1, keepdim=True).detach()
        x_enc = x_enc - means
        stdev = torch.sqrt(
            torch.var(x_enc, dim=1, keepdim=True, unbiased=False) + 1e-5)
        x_enc /= stdev

        # embedding
        enc_out = self.enc_embedding(x_enc, None)  # [B,T,C]
        # TimesNet
        for i in range(self.layer):
            enc_out = self.layer_norm(self.model[i](enc_out))
        # project back
        dec_out = self.projection(enc_out)

        # De-Normalization from Non-stationary Transformer
        dec_out = dec_out * \
                  (stdev[:, 0, :].unsqueeze(1).repeat(
                      1, self.pred_len + self.seq_len, 1))
        dec_out = dec_out + \
                  (means[:, 0, :].unsqueeze(1).repeat(
                      1, self.pred_len + self.seq_len, 1))
        return dec_out

    def classification(self, x_enc, x_mark_enc):
        # embedding
        enc_out = self.enc_embedding(x_enc, None)  # [B,T,C]
        # TimesNet
        for i in range(self.layer):
            enc_out = self.layer_norm(self.model[i](enc_out))

        # Output
        # the output transformer encoder/decoder embeddings don't include non-linearity
        output = self.act(enc_out)
        output = self.dropout(output)
        # zero-out padding embeddings
        output = output * x_mark_enc.unsqueeze(-1)
        # (batch_size, seq_length * d_model)
        output = output.reshape(output.shape[0], -1)
        output = self.projection(output)  # (batch_size, num_classes)
        return output

  • Finally, look at the Model class's __init__ method and forward function.
class Model(nn.Module):

    def __init__(self, configs):
        super(Model, self).__init__()
        self.configs = configs
        self.task_name = configs.task_name
        self.seq_len = configs.seq_len
        self.label_len = configs.label_len
        self.pred_len = configs.pred_len
        # stack of TimesBlocks
        self.model = nn.ModuleList([TimesBlock(configs)
                                    for _ in range(configs.e_layers)])
        self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,
                                           configs.dropout)
        self.layer = configs.e_layers
        self.layer_norm = nn.LayerNorm(configs.d_model)
        # long- or short-term forecasting
        if self.task_name == 'long_term_forecast' or self.task_name == 'short_term_forecast':
            # linear layer extending the temporal dimension from seq_len to seq_len + pred_len
            self.predict_linear = nn.Linear(
                self.seq_len, self.pred_len + self.seq_len)
            # projection: linear layer with c_out outputs
            self.projection = nn.Linear(
                configs.d_model, configs.c_out, bias=True)
        # imputation or anomaly detection
        if self.task_name == 'imputation' or self.task_name == 'anomaly_detection':
            # projection: linear layer with c_out outputs
            self.projection = nn.Linear(
                configs.d_model, configs.c_out, bias=True)
        # classification
        if self.task_name == 'classification':
            # GELU activation
            self.act = F.gelu
            # dropout layer
            self.dropout = nn.Dropout(configs.dropout)
            # projection: linear layer with num_class outputs
            self.projection = nn.Linear(
                configs.d_model * configs.seq_len, configs.num_class)
    def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):
        if self.task_name == 'long_term_forecast' or self.task_name == 'short_term_forecast':
            dec_out = self.forecast(x_enc, x_mark_enc, x_dec, x_mark_dec)
            return dec_out[:, -self.pred_len:, :]  # [B, L, D]
        if self.task_name == 'imputation':
            dec_out = self.imputation(
                x_enc, x_mark_enc, x_dec, x_mark_dec, mask)
            return dec_out  # [B, L, D]
        if self.task_name == 'anomaly_detection':
            dec_out = self.anomaly_detection(x_enc)
            return dec_out  # [B, L, D]
        if self.task_name == 'classification':
            dec_out = self.classification(x_enc, x_mark_enc)
            return dec_out  # [B, N]
        return None
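  • A minimal forward-pass sketch for forecasting (assuming the repo's imports are available; the configs fields mirror the argparse defaults above, with smaller dimensions for speed):
from types import SimpleNamespace
import torch

configs = SimpleNamespace(
    task_name='long_term_forecast', seq_len=96, label_len=48, pred_len=96,
    e_layers=2, enc_in=7, c_out=7, d_model=32, d_ff=32, top_k=5,
    num_kernels=6, embed='timeF', freq='h', dropout=0.1)
model = Model(configs)
x_enc = torch.randn(4, 96, 7)        # [B, seq_len, enc_in]
x_mark_enc = torch.randn(4, 96, 4)   # 4 time features for freq='h'
x_dec = torch.randn(4, 144, 7)       # [B, label_len + pred_len, dec_in]
x_mark_dec = torch.randn(4, 144, 4)
out = model(x_enc, x_mark_enc, x_dec, x_mark_dec)
print(out.shape)  # torch.Size([4, 96, 7]) -- [B, pred_len, c_out]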

Embed

  • TimesNet uses DataEmbedding, defined in Embed.py under the layers folder. The embedding modules of all the other Informer-family models live in the same file, so this is a good place to explain them all at once.
class PositionalEmbedding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEmbedding, self).__init__()
        # compute the positional encodings once, in log space
        pe = torch.zeros(max_len, d_model).float()
        pe.requires_grad = False

        position = torch.arange(0, max_len).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float()
                    * -(math.log(10000.0) / d_model)).exp()

        # sine on even indices, cosine on odd indices
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return self.pe[:, :x.size(1)]


class TokenEmbedding(nn.Module):
    def __init__(self, c_in, d_model):
        super(TokenEmbedding, self).__init__()
        padding = 1 if torch.__version__ >= '1.5.0' else 2
        # token embedding via a 1-D convolution (circular padding)
        self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model,
                                   kernel_size=3, padding=padding, padding_mode='circular', bias=False)
        for m in self.modules():
            if isinstance(m, nn.Conv1d):
                nn.init.kaiming_normal_(
                    m.weight, mode='fan_in', nonlinearity='leaky_relu')

    def forward(self, x):
        x = self.tokenConv(x.permute(0, 2, 1)).transpose(1, 2)
        return x


class FixedEmbedding(nn.Module):
    def __init__(self, c_in, d_model):
        super(FixedEmbedding, self).__init__()

        w = torch.zeros(c_in, d_model).float()
        w.requires_grad = False

        position = torch.arange(0, c_in).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float()
                    * -(math.log(10000.0) / d_model)).exp()

        # sine/cosine encodings, as in PositionalEmbedding
        w[:, 0::2] = torch.sin(position * div_term)
        w[:, 1::2] = torch.cos(position * div_term)
        # frozen embedding table initialized with these encodings
        self.emb = nn.Embedding(c_in, d_model)
        self.emb.weight = nn.Parameter(w, requires_grad=False)

    def forward(self, x):
        return self.emb(x).detach()


class TemporalEmbedding(nn.Module):
    def __init__(self, d_model, embed_type='fixed', freq='h'):
        super(TemporalEmbedding, self).__init__()

        minute_size = 4
        hour_size = 24
        weekday_size = 7
        day_size = 32
        month_size = 13

        Embed = FixedEmbedding if embed_type == 'fixed' else nn.Embedding
        # only minutely data (freq 't') gets a minute embedding
        if freq == 't':
            self.minute_embed = Embed(minute_size, d_model)
        # hour embedding
        self.hour_embed = Embed(hour_size, d_model)
        # weekday embedding
        self.weekday_embed = Embed(weekday_size, d_model)
        # day-of-month embedding
        self.day_embed = Embed(day_size, d_model)
        # month embedding
        self.month_embed = Embed(month_size, d_model)

    def forward(self, x):
        x = x.long()
        # the minute embedding only exists when freq == 't'
        minute_x = self.minute_embed(x[:, :, 4]) if hasattr(
            self, 'minute_embed') else 0.
        hour_x = self.hour_embed(x[:, :, 3])
        weekday_x = self.weekday_embed(x[:, :, 2])
        day_x = self.day_embed(x[:, :, 1])
        month_x = self.month_embed(x[:, :, 0])

        # sum the component embeddings to get the final temporal embedding
        return hour_x + weekday_x + day_x + month_x + minute_x
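  • Note that the column order assumed here matches the timeenc == 0 branch of __read_data__ above: [month, day, weekday, hour] (plus a minute column when freq='t'). A tiny illustrative lookup:
import torch

emb = TemporalEmbedding(d_model=16, embed_type='fixed', freq='h')
x_mark = torch.tensor([[[7, 1, 4, 13]]])  # month=7, day=1, weekday=4 (Friday), hour=13
print(emb(x_mark).shape)                  # torch.Size([1, 1, 16])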


class TimeFeatureEmbedding(nn.Module):
    def __init__(self, d_model, embed_type='timeF', freq='h'):
        super(TimeFeatureEmbedding, self).__init__()

        freq_map = {'h': 4, 't': 5, 's': 6,
                    'm': 1, 'a': 1, 'w': 2, 'd': 3, 'b': 3}
        d_inp = freq_map[freq]
        # a single linear layer embeds the time-feature vector
        self.embed = nn.Linear(d_inp, d_model, bias=False)

    def forward(self, x):
        return self.embed(x)


class DataEmbedding(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding, self).__init__()

        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        if x_mark is None:
            # no time marks: value embedding + positional embedding only
            x = self.value_embedding(x) + self.position_embedding(x)
        else:
            # with time marks: value + temporal + positional embeddings
            x = self.value_embedding(
                x) + self.temporal_embedding(x_mark) + self.position_embedding(x)
        return self.dropout(x)


class DataEmbedding_wo_pos(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding_wo_pos, self).__init__()

        # value embedding: maps the input features into a d_model-dimensional space
        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        # positional embedding (defined here but not applied in forward -- this is the "without position" variant)
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        # temporal embedding: TemporalEmbedding unless embed_type == 'timeF',
        # in which case TimeFeatureEmbedding is used
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        # dropout for regularization during training
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        if x_mark is None:
            # no time marks: value embedding only
            x = self.value_embedding(x)
        else:
            # otherwise, add the value and temporal embeddings
            x = self.value_embedding(x) + self.temporal_embedding(x_mark)
        return self.dropout(x)


class PatchEmbedding(nn.Module):
    def __init__(self, d_model, patch_len, stride, padding, dropout):
        super(PatchEmbedding, self).__init__()
        # Patching
        # patch_len: length of each patch
        self.patch_len = patch_len
        # stride: step between consecutive patches
        self.stride = stride
        # padding_patch_layer replication-pads the end of the sequence
        self.padding_patch_layer = nn.ReplicationPad1d((0, padding))

        # Backbone, Input encoding: projection of feature vectors onto a d-dim vector space
        # value_embedding projects each patch onto a d_model-dimensional space
        self.value_embedding = nn.Linear(patch_len, d_model, bias=False)

        # Positional embedding
        self.position_embedding = PositionalEmbedding(d_model)

        # Residual dropout
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # patch the input
        n_vars = x.shape[1]
        # replication-pad the end of the sequence
        x = self.padding_patch_layer(x)
        # sliding window splits the input into patches
        x = x.unfold(dimension=-1, size=self.patch_len, step=self.stride)
        # merge the batch and variable dimensions
        x = torch.reshape(x, (x.shape[0] * x.shape[1], x.shape[2], x.shape[3]))
        # input encoding
        x = self.value_embedding(x) + self.position_embedding(x)
        return self.dropout(x), n_vars
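  • The key call is unfold, which slices the length dimension into windows of size patch_len taken every stride steps; a toy example:
import torch

x = torch.arange(10.).reshape(1, 1, 10)
patches = x.unfold(dimension=-1, size=4, step=2)
print(patches.shape)  # torch.Size([1, 1, 4, 4]) -- 4 patches of length 4
print(patches[0, 0])  # rows: [0..3], [2..5], [4..7], [6..9]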

Model Training

  • Here I mainly annotate the train function; the vali and test functions are much the same, just with some training-only operations removed.
    def train(self, setting):
        # get the train/val/test datasets and data loaders
        train_data, train_loader = self._get_data(flag='train')
        vali_data, vali_loader = self._get_data(flag='val')
        test_data, test_loader = self._get_data(flag='test')

        path = os.path.join(self.args.checkpoints, setting)
        # create the checkpoint directory
        if not os.path.exists(path):
            os.makedirs(path)

        # record the current time
        time_now = time.time()

        # number of batches per epoch
        train_steps = len(train_loader)
        # set up early stopping
        early_stopping = EarlyStopping(patience=self.args.patience, verbose=True)

        # choose the optimizer
        model_optim = self._select_optimizer()
        # choose the loss function
        criterion = self._select_criterion()

        # gradient scaler for automatic mixed-precision training
        if self.args.use_amp:
            scaler = torch.cuda.amp.GradScaler()

        # epoch loop
        for epoch in range(self.args.train_epochs):
            iter_count = 0
            train_loss = []

            self.model.train()
            epoch_time = time.time()
            for i, (batch_x, batch_y, batch_x_mark, batch_y_mark) in enumerate(train_loader):
                iter_count += 1
                # zero the gradients
                model_optim.zero_grad()

                # move the batch to the device
                batch_x = batch_x.float().to(self.device)

                batch_y = batch_y.float().to(self.device)
                batch_x_mark = batch_x_mark.float().to(self.device)
                batch_y_mark = batch_y_mark.float().to(self.device)

                # decoder input: label_len known steps followed by pred_len zero placeholders
                dec_inp = torch.zeros_like(batch_y[:, -self.args.pred_len:, :]).float()
                dec_inp = torch.cat([batch_y[:, :self.args.label_len, :], dec_inp], dim=1).float().to(self.device)

                # encoder - decoder
                if self.args.use_amp:
                    with torch.cuda.amp.autocast():
                        if self.args.output_attention:
                            outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
                        else:
                            outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)

                        f_dim = -1 if self.args.features == 'MS' else 0
                        outputs = outputs[:, -self.args.pred_len:, f_dim:]
                        batch_y = batch_y[:, -self.args.pred_len:, f_dim:].to(self.device)
                        loss = criterion(outputs, batch_y)
                        train_loss.append(loss.item())
                else:
                    if self.args.output_attention:
                        outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
                    else:
                        outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)

                    # in MS mode keep only the last column; otherwise keep all columns
                    f_dim = -1 if self.args.features == 'MS' else 0
                    outputs = outputs[:, -self.args.pred_len:, f_dim:]
                    batch_y = batch_y[:, -self.args.pred_len:, f_dim:].to(self.device)
                    # compute the loss
                    loss = criterion(outputs, batch_y)
                    # record the loss
                    train_loss.append(loss.item())

                # log progress every 100 iterations
                if (i + 1) % 100 == 0:
                    print("\titers: {0}, epoch: {1} | loss: {2:.7f}".format(i + 1, epoch + 1, loss.item()))
                    speed = (time.time() - time_now) / iter_count
                    left_time = speed * ((self.args.train_epochs - epoch) * train_steps - i)
                    print('\tspeed: {:.4f}s/iter; left time: {:.4f}s'.format(speed, left_time))
                    iter_count = 0
                    time_now = time.time()

                if self.args.use_amp:
                    scaler.scale(loss).backward()
                    scaler.step(model_optim)
                    scaler.update()
                else:
                    # backpropagate
                    loss.backward()
                    # update the parameters
                    model_optim.step()

            print("Epoch: {} cost time: {}".format(epoch + 1, time.time() - epoch_time))
            train_loss = np.average(train_loss)
            vali_loss = self.vali(vali_data, vali_loader, criterion)
            test_loss = self.vali(test_data, test_loader, criterion)

            print("Epoch: {0}, Steps: {1} | Train Loss: {2:.7f} Vali Loss: {3:.7f} Test Loss: {4:.7f}".format(
                epoch + 1, train_steps, train_loss, vali_loss, test_loss))
            early_stopping(vali_loss, self.model, path)
            if early_stopping.early_stop:
                print("Early stopping")
                break

            # adjust the learning rate
            adjust_learning_rate(model_optim, epoch + 1, self.args)
        # load the best checkpoint (saved by early stopping)
        best_model_path = path + '/' + 'checkpoint.pth'
        self.model.load_state_dict(torch.load(best_model_path))

        return self.model
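  • The decoder input built above is worth seeing on its own (illustrative shapes): the last label_len steps of ground truth followed by pred_len zero placeholders. Note that TimesNet's forecast never actually reads x_dec; the shared training loop builds it because other models in the repo do use it.
import torch

label_len, pred_len = 48, 96
batch_y = torch.randn(4, label_len + pred_len, 7)
dec_inp = torch.cat(
    [batch_y[:, :label_len, :],                     # known start token
     torch.zeros_like(batch_y[:, -pred_len:, :])],  # placeholders to predict
    dim=1)
print(dec_inp.shape)  # torch.Size([4, 144, 7])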