首页   注册   登录
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX  ›  TensorFlow

Tensorflow 笔记 RNN 预测时间序列

  •  
  •   LittleUqeer · 2017-02-10 13:22:33 +08:00 · 11389 次点击
    这是一个创建于 739 天前的主题,其中的信息可能已经有所发展或是发生改变。

    最近重看了一遍大作手回忆录,就想能否让机器识别一下 M 顶、 W 底这种相当主观的技术分析。

    假设各种技术分析图形可以看作是各种技术分析因子在一段时间内的相对比例变化,

    这样可以认为将技术分析因子进行标准化之后不会损失太多的信息。

    将因子随交易时间推进的变换看作时间序列 in_length 表示时间序列的长度,

    in_width 看作时间序列的宽度也就是并列的多种技术分析因子数值。

    不考虑时间推进因素的影响可以使用 CNN 等非时间序列 DNN 处理。

    考虑时间推进对因子变化有影响,此时刻因子变化和股价涨跌受到前面状态的影响的时间序列可以使用 HMM 或 RNN 处理。

    HMM 常见的假设为状态 i_t 只受到状态 i_{t-1}时刻的影响, RNN 则泛泛认为可以使用训练集来学习到时间序列的非线性关系。

    本帖对于 RNN 结构进行简单探索。

    定长时间序列

    训练 RNN 模型定长序列,假设在 T_i 交易日可以通过前 m 交易日技术分析走势预判后第 n 交易日股价涨跌幅度,并且 RNN 模型可以自动从 T_{i-m}到 T_i 时间序列学习到这种预测关系。

    输入数据格式[批次,步长,多因子] 其中步长表示从 T_{i-m}到 T_i 时间序列

    class.fit(trainX, trainY)训练模型

    clf.pred_prob(trainX) 预测返回概率矩阵

    clf.pred_signal(trainX) 预测返回标签

    trainX 输入格式 [row, in_length, in_width]

    trainY 输入格式 [row]

    batch_size=128 喂入批次大小

    display_step=5 显示步长

    layer_units_num=2000 隐藏层单元数目

    training_epoch=100 训练次数

    class test_1(object):
        def __init__(self,
                    batch_size = 128,
                    learning_rate = 0.001,
                    training_epoch = 10,
                    display_step = 5,
                    layer_units_num = 100):
            self.batch_size = batch_size
            self.learning_rate = learning_rate
            self.training_epoch = training_epoch
            self.display_step = display_step
            self.layer_units_num = layer_units_num
            
        def dense_to_one_hot(self,labels_dense):
            """标签 转换 one hot 编码
            输入 labels_dense 必须为非负数
            2016-11-21
            """
            num_classes = len(np.unique(labels_dense)) # np.unique 去掉重复函数
            raws_labels = labels_dense.shape[0]
            index_offset = np.arange(raws_labels) * num_classes
            labels_one_hot = np.zeros((raws_labels, num_classes))
            labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
            return labels_one_hot  
            
        def Preprocessing(self, trainX, trainY, seed=False):
            trainY = self.dense_to_one_hot(trainY)
            self.in_length= in_length= trainX.shape[1]
            self.in_width= in_width= trainX.shape[2]
            self.out_classes= out_classes= trainY.shape[1]
            
            if seed:
                tf.set_random_seed(20170204)
            weights = {
                'out': tf.Variable(tf.truncated_normal(shape=[self.layer_units_num, out_classes], 
                                                    mean=0., stddev=1., seed=None, dtype=tf.float32),
                                   trainable=True, name='Weight_full_out')
            }
            
            biases = {
                'out': tf.Variable(tf.truncated_normal([out_classes]), trainable=True, name= 'Biases_full_out')
            }    
            
            self.weights = weights
            self.biases = biases        
            
            X = tf.placeholder(dtype=tf.float32, shape=[None, in_length, in_width], name='trainX') # 批次,时间序列,多因子
            Y = tf.placeholder(dtype= tf.float32, shape=[None, out_classes], name='trainY') 
            keep_prob = tf.placeholder(dtype= tf.float32)
            self.X = X
            self.Y = Y
            self.keep_prob = keep_prob  
            
        def Network(self):
            
            with tf.name_scope('layer_1'):            
                monolayer_1 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num, 
                                                           forget_bias=1., state_is_tuple=True, activation=tf.tanh)            
                monolayer_1 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_1, output_keep_prob= keep_prob)
    
            
            with tf.name_scope('layer_2'):        
                monolayer_2 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
                                                           forget_bias=1., state_is_tuple=True, activation=tf.tanh)
                monolayer_2 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_2, output_keep_prob= keep_prob)
            
            with tf.name_scope('layer_3'):                  
                monolayer_3 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
                                                           forget_bias=1., state_is_tuple=True, activation=tf.tanh)
                monolayer_3 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_3, output_keep_prob= keep_prob)
            
            with tf.name_scope('layer_Final'):
                monolayer_final = tf.nn.rnn_cell.BasicLSTMCell(num_units=self.layer_units_num, 
                                                           forget_bias=1., state_is_tuple=True, activation=tf.tanh)
            
            with tf.name_scope('Layer_Structure_Combination'):
                layer_units_num = self.layer_units_num
                Layers = tf.nn.rnn_cell.MultiRNNCell(cells=[monolayer_1,monolayer_2,monolayer_3,monolayer_final],
                                                    state_is_tuple = True)    
            self.Layers = Layers
            return Layers
            
        def Model(self):
            X = self.X
            keep_prob = self.keep_prob
            
            X = tf.transpose(X, [1, 0, 2])
            X = tf.reshape(X, [-1,self.in_width])
            X = tf.split(split_dim=0, num_split=self.in_length, value=X)
            
            with tf.name_scope('layer_1'):            
                monolayer_1 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num, 
                                                           forget_bias=1., state_is_tuple=True, activation=tf.tanh)            
                monolayer_1 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_1, output_keep_prob= keep_prob)
    
            
            with tf.name_scope('layer_2'):        
                monolayer_2 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
                                                           forget_bias=1., state_is_tuple=True, activation=tf.tanh)
                monolayer_2 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_2, output_keep_prob= keep_prob)
            
            with tf.name_scope('layer_3'):                  
                monolayer_3 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
                                                           forget_bias=1., state_is_tuple=True, activation=tf.tanh)
                monolayer_3 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_3, output_keep_prob= keep_prob)
            
            with tf.name_scope('layer_Final'):
                monolayer_final = tf.nn.rnn_cell.BasicLSTMCell(num_units=self.layer_units_num, 
                                                           forget_bias=1., state_is_tuple=True, activation=tf.tanh)
            
            with tf.name_scope('Layer_Structure_Combination'):
                layer_units_num = self.layer_units_num
                Layers = tf.nn.rnn_cell.MultiRNNCell(cells=[monolayer_1,monolayer_2,monolayer_3,monolayer_final],
                                                    state_is_tuple = True)    
            
            outputs,_ = tf.nn.rnn(cell=monolayer_final, inputs=X, dtype=tf.float32)
            output = outputs[-1]
    
            return tf.nn.bias_add(value= tf.matmul(output, self.weights['out']), bias= self.biases['out'])  
            
        def train(self, trainX, trainY, seed=False):
            self.sess = tf.InteractiveSession()
            
            self.Preprocessing(trainX, trainY, seed)
            
            tmp = self.Model()
            
            self.predict = tf.nn.softmax(tmp)
            
            self.cost = tf.reduce_sum(tf.nn.softmax_cross_entropy_with_logits(tmp, self.Y))
            
            optimizer = tf.train.AdamOptimizer(learning_rate= self.learning_rate) # 0 设置训练器
            grads_and_vars = optimizer.compute_gradients(self.cost)
            for i, (grid, var) in enumerate(grads_and_vars):
                if grid != None:
                    grid = tf.clip_by_value(grid, -1., 1.)
                    grads_and_vars[i] = (grid, var)
            optimizer = optimizer.apply_gradients(grads_and_vars)
            self.optimizer = optimizer
            
            self.correct_pred = tf.equal(tf.argmax(tmp,1), tf.argmax(self.Y,1))
            accuracy = tf.reduce_mean(tf.cast(self.correct_pred, tf.float32))
            self.accuracy = accuracy
            
            #self.init = tf.global_variables_initializer()   
            self.init = tf.initialize_all_variables()
        def fit(self,trainX, trainY, dropout = 0.3, seed=True):
            self.train(trainX, trainY, seed=True)
            sess = self.sess
            sess.run(self.init)
            batch_size = self.batch_size
            trainY = self.dense_to_one_hot(trainY)
            for ep in range(self.training_epoch):
                for i in range(int(len(trainX)/batch_size)+1):
                    if i < int(len(trainX)/batch_size)+1:
                        batch_x = trainX[i*batch_size : (i+1)*batch_size]
                        batch_y = trainY[i*batch_size : (i+1)*batch_size]
                    elif i== int(len(trainX)/batch_size)+1:
                        batch_x = trainX[-batch_size:]
                        batch_y = trainY[-batch_size:]
                    sess.run(self.optimizer, feed_dict={self.X:batch_x, self.Y:batch_y, self.keep_prob:(1.-dropout)})
                if ep%self.display_step==0:                
                    loss, acc = sess.run([self.cost,self.accuracy], feed_dict={self.X:trainX, self.Y:trainY, self.keep_prob:1.})
                    print (str(ep)+"th "+'Epoch Loss = {:.5f}'.format(loss)+" Training Accuracy={:.5f}".format(acc))
            self.sess= sess
            print("Optimization Finished!") 
        
        def pred_prob(self, testX):
            sess = self.sess
            batch_size = self.batch_size
            trainX = testX
            predict_output = np.zeros([1,self.out_classes])
            for i in range(int(len(trainX)/batch_size)+1):
                if i < int(len(trainX)/batch_size)+1:
                    batch_x = trainX[i*batch_size : (i+1)*batch_size]
                    batch_y = trainY[i*batch_size : (i+1)*batch_size]
                elif i== int(len(trainX)/batch_size)+1:
                    batch_x = trainX[-batch_size:]
                    batch_y = trainY[-batch_size:]
                tp = sess.run(self.predict,feed_dict={self.X:batch_x, self.keep_prob:1.})
                predict_output = np.row_stack([predict_output, tp])
            predict_output = np.delete(predict_output, obj=0, axis=0)
            return predict_output
        
        def pred(self, testX):
            pred_prob = self.pred_prob(testX)
            return np.argmax(pred_prob, axis=1)
    
    

    变长时间序列

    训练 RNN 模型定长序列,假设在 T_i 交易日可以通过前 m 交易日技术分析走势预判后第 n 交易日股价涨跌幅度, RNN 模型可以从 T_{i-m}到 T_i 时间序列学习到这种预测关系,但是无法训练得到准确的稀疏权重。对应 T 交易日进行预判的时候考虑前 2 周走势或者前 2 个月的走势,这个时候模型无法隐式学习,需要指定训练集为变长序列对应涨跌标签。

    输入 对于变长时间序列 设格式为

    [batch_size, real_length, in_width]

    其中 real_length 表示变长时间序列输入步长,为一个变化值,使用 LSTM 进行预测的时候将输入数据修改格式为

    batch_size, in_length, in_width

    其中 in_length 使用 real_length 中的最大步长值,空余部分使用 0 填充

    修改原始稠密矩阵为稀疏矩阵统一格式进行输入。

    输出裁剪 由于输入 tensor 为稀疏矩阵,则对应的 RNN 网络计算得到的矩阵为稀疏矩阵(对于全为 0 的填充稀疏部分进行数值优化的时候梯度为 0 实际不变化)

    将拓扑结构图得到的矩阵进行裁剪,使得输出 tensor 格式从

    [batch_size, in_length, layer_units_num]

    转换为

    [batch_size, 1, layer_units_num]

    softmax [batch_size, out_classes]

    这里 1 表示这里对输入步长取实际输入步长最后一步。

    import functools
    from functools import reduce  
    import numpy as np
    import tensorflow as tf
    
    def lazy_property(function):
        attribute = '_' + function.__name__
    
        @property
        @functools.wraps(function)
        def wrapper(self):
            if not hasattr(self, attribute):
                setattr(self, attribute, function(self))
            return getattr(self, attribute)
        return wrapper
    
    class test(object):
        def __init__(self,
                    batch_size = 128,
                    learning_rate = 0.001,
                    error = .01,
                    display_step = 5,
                    layer_units_num = 200):
            self.batch_size = batch_size
            self.learning_rate = learning_rate
            self.error = error
            self.display_step = display_step
            self.layer_units_num = layer_units_num
            
        def dense_to_one_hot(self,labels_dense):
            """标签 转换 one hot 编码
            输入 labels_dense 必须为非负数
            2016-11-21
            """
            num_classes = len(np.unique(labels_dense)) # np.unique 去掉重复函数
            raws_labels = labels_dense.shape[0]
            index_offset = np.arange(raws_labels) * num_classes
            labels_one_hot = np.zeros((raws_labels, num_classes))
            labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
            return labels_one_hot  
        
        # 获得权重和偏置
        @staticmethod
        def _weight_and_bias(in_size, out_size):
            weight = tf.truncated_normal([in_size, out_size], stddev=0.01)
            bias = tf.constant(0.1, shape=[out_size])
            return tf.Variable(weight), tf.Variable(bias)
        
        @lazy_property
        def length(self):
            dense_sign = tf.sign(tf.reduce_max(tf.abs(self.X),reduction_indices=2))
            length = tf.reduce_sum(input_tensor=dense_sign, reduction_indices=1)
            length = tf.cast(length, tf.int32)
            return length
        
        @staticmethod
        def _final_relevant(output, length):
            # length 输入时间序列的实际长度
            # in_length 表示输入时间序列长度
            # max_length 表示最大时间序列长度,也就是稀疏矩阵 最大时间序列长度
            batch_size = tf.shape(output)[0]
            max_length = int(output.get_shape()[1])
            output_size = int(output.get_shape()[2])
            index = tf.range(start=0, limit=batch_size)*max_length + (length-1) # 这里使用 max_length 开创间隔,使用 length-1 表示实际位置,最后一个输出的位置
            flat = tf.reshape(output, [-1,output_size]) # 将输出展平, batch_size*length in_width
            relevant = tf.gather(flat, index) # 根据实际长度选出最后一个输出 output 状态使用
            return relevant  
        
        def Preprocessing(self, trainX, trainY):
            self.in_length= in_length= trainX.shape[1]
            self.in_width= in_width= trainX.shape[2]
            self.out_classes= out_classes= trainY.shape[1]
            
            self.X = tf.placeholder(dtype=tf.float32, shape=[None, in_length, in_width], name='trainX') # 批次,时间序列,多因子
            self.Y = tf.placeholder(dtype= tf.float32, shape=[None, out_classes], name='trainY') 
            self.keep_prob = tf.placeholder(dtype= tf.float32)
        
        def str2float(self,s):  
            def char2num(s):  
                return {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9}[s]  
            n = s.index('.')  
            return reduce(lambda x,y:x*10+y,map(char2num,s[:n]+s[n+1:]))/(10**n)  
        
        def Interface(self):        
            # 4 层 GRU 结构描述
            monolayer = tf.nn.rnn_cell.GRUCell(num_units= self.layer_units_num)
            monolayer = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer, output_keep_prob=self.keep_prob)
            monolayer_final = tf.nn.rnn_cell.GRUCell(num_units= self.layer_units_num)
            layers = tf.nn.rnn_cell.MultiRNNCell([monolayer]*3+[monolayer_final])
            # 激活 注意 in_length 表示输入序列步长, length 表示实际步长
            output,_ = tf.nn.dynamic_rnn(cell= layers, inputs= self.X, dtype= tf.float32, sequence_length= self.length)        
            output = self._final_relevant(output, self.length)
            
            weights, biases = self._weight_and_bias(self.layer_units_num, self.out_classes)        
            Prediction = tf.nn.bias_add(tf.matmul(output, weights),biases)
            return Prediction
        
        def Graph(self, trainX, trainY):
            try:
                tf.InteractiveSession.close()
            except:
                pass
            self.sess = tf.InteractiveSession()
            tf.get_default_session()
            self.Preprocessing(trainX, trainY)
            tmp = self.Interface()
            
            self.pred = tf.nn.softmax(tmp)
            self.cost = tf.reduce_sum(tf.nn.softmax_cross_entropy_with_logits(tmp, self.Y))
            
            optimizer = tf.train.AdamOptimizer(learning_rate= self.learning_rate) # 0 设置训练器
            grads_and_vars = optimizer.compute_gradients(self.cost)
            for i, (grid, var) in enumerate(grads_and_vars):
                if grid != None:
                    grid = tf.clip_by_value(grid, -1., 1.)
                    grads_and_vars[i] = (grid, var)
            optimizer = optimizer.apply_gradients(grads_and_vars)
            self.optimizer = optimizer
            
            self.correct_pred = tf.equal(tf.argmax(tmp,1), tf.argmax(self.Y,1))
            self.accuracy = tf.reduce_mean(tf.cast(self.correct_pred, tf.float32))
            #self.init = tf.global_variables_initializer()
            self.init = tf.initialize_all_variables()
        def fit(self, trainX, trainY, dropout= 0.618):
            # 对标签 one_hot 编码
            trainY = self.dense_to_one_hot(trainY)
            
            self.Graph(trainX, trainY)
            self.sess.run(self.init)
            batch_size = self.batch_size
            sig =10.
            ep = 0
            while (sig > self.error):
                for i in range(int(len(trainX)/batch_size)+1):
                    if i < int(len(trainX)/batch_size)+1:
                        batch_x = trainX[i*batch_size : (i+1)*batch_size]
                        batch_y = trainY[i*batch_size : (i+1)*batch_size]
                    elif i== int(len(trainX)/batch_size)+1:
                        batch_x = trainX[-batch_size:]
                        batch_y = trainY[-batch_size:]
                    self.sess.run(self.optimizer,feed_dict={self.X:batch_x, self.Y:batch_y, self.keep_prob:(1.-dropout)})
                sig = self.sess.run(self.accuracy, feed_dict={self.X:trainX, self.Y:trainY, self.keep_prob:1.})
                if ep%self.display_step==0:                
                    loss = self.sess.run(self.cost, feed_dict={self.X:trainX, self.Y:trainY, self.keep_prob:1.})
                    print (str(ep)+"th "+'Epoch Loss = {:.5f}'.format(loss)+" Training Accuracy={:.5f}".format(sig))
                ep += 1
            print("Optimization Finished!")                
            
        def pred_prob(self, testX):
            batch_size = self.batch_size
            trainX = testX
            predict_output = np.zeros([1,self.out_classes])
            for i in range(int(len(trainX)/batch_size)+1):
                if i < int(len(trainX)/batch_size)+1:
                    batch_x = trainX[i*batch_size : (i+1)*batch_size]
                    batch_y = trainY[i*batch_size : (i+1)*batch_size]
                elif i== int(len(trainX)/batch_size)+1:
                    batch_x = trainX[-batch_size:]
                    batch_y = trainY[-batch_size:]
                tp = self.sess.run(self.pred, feed_dict={self.X:batch_x, self.keep_prob:1.})
                predict_output = np.row_stack([predict_output, tp])
            predict_output = np.delete(predict_output, obj=0, axis=0)
            return predict_output
        
        def pred_signal(self, testX):
            pred_prob = self.pred_prob(testX)
            return np.argmax(pred_prob, axis=1)
    
    

    因限制原因,图片与源代码戳这里: https://uqer.io/community/share/589d3cc2c1e3cc00567fdbea

    3 回复  |  直到 2017-02-10 21:54:54 +08:00
        1
    tomleader0828   2017-02-10 14:17:33 +08:00
    mark
        2
    pming1   2017-02-10 14:19:37 +08:00
    不明觉厉
        3
    pathbox   2017-02-10 21:54:54 +08:00
    在考虑要不要学 Python
    关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   813 人在线   最高记录 4385   ·  
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.3 · 19ms · UTC 21:45 · PVG 05:45 · LAX 13:45 · JFK 16:45
    ♥ Do have faith in what you're doing.
    沪ICP备16043287号-1