动机
学了好长时间的机器学习,一直找不到实践的机会:工作上的场景都被算法大拿们占据了,小菜鸟只能自谋出路。恰逢前些时候中概股大跌,损失惨重,一怒之下干脆研究研究能否用机器学习来预测股价。
数据获取
历史股价数据可以从雅虎金融上获取,而且已经有现成的python lib来做这个。
import pandas_datareader.data as reader
import fix_yahoo_finance as yf


def fetch_stock_list_from_web():
    """Load the constituent list and download price history per symbol.

    The CSV at SP500_LIST_PATH is read into the module-level COMPANIES
    frame.  If _data_already_loaded() returns True nothing is downloaded.
    Otherwise yf.pdr_override() is called and every entry of the 'Symbol'
    column is fetched with reader.get_data_yahoo() and written to
    ./stock_data/<symbol>.csv.  A failure for one symbol is printed and
    the loop continues with the next one.
    """
    global COMPANIES
    COMPANIES = pd.read_csv(SP500_LIST_PATH)
    if not _data_already_loaded():
        yf.pdr_override()
        for symbol in COMPANIES['Symbol']:
            try:
                history = reader.get_data_yahoo(symbol)
                history.to_csv('./stock_data/' + symbol + '.csv')
            except Exception as err:
                # Best effort: report the failure and keep downloading.
                print(err)
其中 SP500_LIST_PATH = 'constituents.csv',是标普500成分股列表文件,Google 一下就能找到。
打印预测结果
跳跃一下先把周边的工具准备完毕,有了股票预测数据之后一般都要打印一个预测和真实数据的对比图,这个可以用matplotlib.pyplot搞定。
import matplotlib.pyplot as plt


def _print_comparison_gui(pred, real):
    """Plot predicted against real values for the last 200 points.

    Args:
        pred: nested sequence of predictions, flattened with _flatten().
        real: nested sequence of ground-truth values, flattened the same way.

    Both flattened series are truncated to their last 200 entries and drawn
    on a shared x axis that counts points from 0.
    """
    truths = _flatten(real)[-200:]
    preds = _flatten(pred)[-200:]
    # truths is already at most 200 long, so no further slicing is needed.
    days = range(len(truths))

    plt.figure(figsize=(12, 6))
    plt.plot(days, truths, label='truth')
    plt.plot(days, preds, label='pred')
    plt.legend(loc='upper left', frameon=False)
    plt.xlabel("day")
    plt.ylabel("normalized price")
    # Span both series; mixing min(preds) with max(truths) could clip
    # either curve whenever one series extends past the other.
    lower = min(min(preds), min(truths))
    upper = max(max(preds), max(truths))
    plt.ylim((lower, upper))
    plt.grid(ls='--')
    plt.show()
naive版本
预测股价的模型应该是怎样的呢?初始想法是把股价表达成时间的函数,因为某些时间具有特殊性比如周五,年底等等,所以把时间拆成年,月,日而不是作为一个整体,其实我们预测股价的时候一般会看公司的基本面,大环境,一些相关消息等等,但搞定这些东西还远超出我的能力所以就先忽略了。最终x是(year, month, day, day_of_week), y就是price。
def preprocess_data(symbols):
    """Build (year, month, day, weekday) -> open-price datasets per symbol.

    For every symbol the 'Date' and 'Open' columns of
    ORIGINAL_STOCK_DATA[symbol] are turned into feature tuples and
    single-element price lists, then split chronologically: the first
    (1 - TEST_RATIO) share is the training set and the remainder the test
    set.  The result is stored in PROCESSED_STOCK_DATA[symbol] as
    (train_X, train_y, test_X, test_y).

    Args:
        symbols: iterable of ticker symbols present in ORIGINAL_STOCK_DATA.
    """
    # Function-scope import so the snippet does not depend on a
    # module-level `import datetime` that is not shown alongside it.
    import datetime

    for symbol in symbols:
        print('Loading ' + symbol)
        frame = ORIGINAL_STOCK_DATA[symbol]

        # Each target is wrapped in a list so y has one column per row.
        price = [[each] for each in frame['Open']]
        dates = [datetime.datetime.strptime(each, '%Y-%m-%d')
                 for each in frame['Date']]
        x = [(d.year, d.month, d.day, d.weekday()) for d in dates]

        train_size = int(len(x) * (1.0 - TEST_RATIO))
        train_X, test_X = x[:train_size], x[train_size:]
        train_y, test_y = price[:train_size], price[train_size:]
        PROCESSED_STOCK_DATA[symbol] = (train_X, train_y, test_X, test_y)
def build_model(x):
    """Build a small fully connected regression network on top of `x`.

    Two ReLU hidden layers (30 and 40 units) feed one linear output unit.
    Every weight matrix is initialised with tf.random_uniform between 0
    and 1 and every bias starts at zero.

    Args:
        x: input tensor whose last dimension is INPUT_VARIABLE_COUNT.

    Returns:
        The output tensor of the final linear layer (one column).
    """
    def _affine(inputs, fan_in, fan_out):
        # Weight is created before the bias, layer by layer.
        weights = tf.Variable(tf.random_uniform([fan_in, fan_out], 0, 1))
        bias = tf.Variable(tf.zeros([1, fan_out]))
        return tf.matmul(inputs, weights) + bias

    hidden1_units = 30
    hidden1 = tf.nn.relu(_affine(x, INPUT_VARIABLE_COUNT, hidden1_units))

    hidden2_units = 40
    hidden2 = tf.nn.relu(_affine(hidden1, hidden1_units, hidden2_units))

    return _affine(hidden2, hidden2_units, 1)
可惜一跑就杯具了:跑了一小会儿,所有的预测值就都在12上下跳动了。感觉像是梯度消失,只好搬出batchnorm来救场:
def build_model(x, is_test, iteration):
    """Build the fully connected network with batch norm on both hidden layers.

    Layout: affine(30) -> batchnorm -> ReLU -> affine(40) -> batchnorm ->
    ReLU -> affine(1).  Weights are initialised with tf.random_uniform
    between 0 and 1, scales with ones and biases with zeros.

    Args:
        x: input tensor whose last dimension is INPUT_VARIABLE_COUNT.
        is_test: forwarded unchanged to batchnorm().
        iteration: forwarded unchanged to batchnorm().

    Returns:
        (output, update_ema): the output tensor of the last linear layer and
        a grouped op combining the two update ops returned by batchnorm().
    """
    hidden1_units = 30
    hidden2_units = 40

    # Hidden layer 1.
    weights1 = tf.Variable(
        tf.random_uniform([INPUT_VARIABLE_COUNT, hidden1_units], 0, 1))
    scale1 = tf.Variable(tf.ones([hidden1_units]))
    bias1 = tf.Variable(tf.zeros([1, hidden1_units]))
    pre_act1 = tf.matmul(x, weights1) + bias1
    normed1, ema_update1 = batchnorm(pre_act1, bias1, scale1, is_test, iteration)
    hidden1 = tf.nn.relu(normed1)

    # Hidden layer 2.
    weights2 = tf.Variable(
        tf.random_uniform([hidden1_units, hidden2_units], 0, 1))
    scale2 = tf.Variable(tf.ones([hidden2_units]))
    bias2 = tf.Variable(tf.zeros([1, hidden2_units]))
    pre_act2 = tf.matmul(hidden1, weights2) + bias2
    normed2, ema_update2 = batchnorm(pre_act2, bias2, scale2, is_test, iteration)
    hidden2 = tf.nn.relu(normed2)

    # Linear output layer, no normalisation.
    weights3 = tf.Variable(tf.random_uniform([hidden2_units, 1], 0, 1))
    bias3 = tf.Variable(tf.zeros([1, 1]))
    output = tf.matmul(hidden2, weights3) + bias3

    return output, tf.group(ema_update1, ema_update2)
最终拿到的结果是这样的,
感觉相当的不靠谱,程序似乎学到了一点大的趋势,但在基准值和连续性上面都差距较大。连续性差感觉可能是变量不够,网络层次太浅,可惜加大层数后在我的macbook pro上面跑了一晚上也没有一点要收敛的样子,只能放弃了。
LSTM版本
回想一下,股价预测的方法中有一个流派叫技术分析,也就是看k线图,这个似乎比较符合我现在只有股价做输入的状况,而k线图不就是和LSTM模型的场景一样一样的么?不过从头写一个LSTM还是有点超出本菜鸟的能力了,所以就在别人的基础上改吧,借鉴了《使用rnn预测股票价格》这篇文章。
rnn文章里面用的面向对象编程风格,而且把训练过程和测试过程放到一起,都让程序的理解难度加大了,所以我改写成了简单的过程式风格而且分开了训练和测试过程。
一些要点
1. LSTM版本除了模型不一样之外比较重要的是对股价做了规范化处理,把绝对股价换算成股价变动百分比,这样就不会碰到预测不出训练时没训练过的股价的问题了。
# Turn absolute prices into relative changes.  The first window is measured
# against its own first price; every later window is measured against the
# last price of the window that precedes it.
first_window = data[0] / data[0][0] - 1.0
later_windows = [curr / prev[-1] - 1.0 for prev, curr in zip(data, data[1:])]
data = [first_window] + later_windows
# Cut the series into consecutive, non-overlapping windows of INPUT_SIZE
# values; a trailing partial window is dropped.
data = [
    np.array(data[start: start + INPUT_SIZE])
    for start in range(0, (len(data) // INPUT_SIZE) * INPUT_SIZE, INPUT_SIZE)
]
3. 预测的模型实际上是用连续多天(比如30天)的股价预测后面一天的股价,所以x和y是下面的样子
# Each sample is NUMBER_OF_STEPS consecutive windows; its target is the
# window that immediately follows them.
window_starts = range(len(data) - NUMBER_OF_STEPS)
X = np.array([data[start: start + NUMBER_OF_STEPS] for start in window_starts])
y = np.array([data[start + NUMBER_OF_STEPS] for start in window_starts])
4. rnn文章对股票价格的input还做了一个向量转换(tf.nn.embedding_lookup),但我实际测试的结果不理想,而且感觉做了数据规范化以后的股价已经是向量了,似乎没必要再做一层转换,所以我的模型是这样的
def build_model(symbols, inputs, keep_prob):
    """Build the LSTM regression graph.

    Args:
        symbols: accepted but not used by this graph.
        inputs: float tensor fed to tf.nn.dynamic_rnn; per the shape comments
            below it is (batch_size, num_steps, input_size).
        keep_prob: output keep probability for the dropout wrapper on
            every LSTM cell.

    Returns:
        (pred, cell): the prediction tensor with INPUT_SIZE columns and the
        RNN cell that was constructed.
    """
    def _create_one_cell():
        base_cell = tf.contrib.rnn.LSTMCell(LSTM_SIZE, state_is_tuple=True)
        return tf.contrib.rnn.DropoutWrapper(base_cell, output_keep_prob=keep_prob)

    if NUMBER_OF_LAYERS > 1:
        cell = tf.contrib.rnn.MultiRNNCell(
            [_create_one_cell() for _ in range(NUMBER_OF_LAYERS)],
            state_is_tuple=True
        )
    else:
        cell = _create_one_cell()

    # Run dynamic RNN
    val, state_ = tf.nn.dynamic_rnn(cell, inputs, dtype=tf.float32, scope="dynamic_rnn")

    # Before transpose, val.get_shape() = (batch_size, num_steps, lstm_size)
    # After transpose, val.get_shape() = (num_steps, batch_size, lstm_size)
    val = tf.transpose(val, [1, 0, 2])

    # Take the output of the last time step, which is what the prediction
    # is computed from.  Moving the step axis to the front (the transpose
    # above) is what lets tf.gather select it by index.
    last_step = int(val.get_shape()[0]) - 1
    last = tf.gather(val, last_step, name="lstm_state")

    ws = tf.Variable(tf.truncated_normal([LSTM_SIZE, INPUT_SIZE]), name="w")
    bias = tf.get_variable("b", [INPUT_SIZE])
    pred = tf.matmul(last, ws) + bias
    return pred, cell
结果
从上面的图来看,预测结果还是有点靠谱的,有几个较大的波动都给预测出来了,这是不是也说明K线图法是靠谱的呢?
代码
import os
import random

import fix_yahoo_finance as yf
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pandas_datareader.data as reader
import tensorflow as tf

# --- Paths -----------------------------------------------------------------
SP500_LIST_PATH = './stock_data/constituents.csv'
DATA_PATH = './stock_data'
CHECK_POINTS_PATH = "./checkpoints/stock_check_points"

# --- Data selection --------------------------------------------------------
TRAIN_STOCK_LIST = ['AAPL', 'AMZN', 'GOOG', 'FB', 'MSFT', 'NFLX', 'NVDA', 'ORCL']
# TRAIN_STOCK_LIST = ['AAPL']

# Populated by fetch_stock_list_from_web / load_stock_data / preprocess_data.
COMPANIES = None
ORIGINAL_STOCK_DATA = {}
PROCESSED_STOCK_DATA = {}

# --- Model hyper-parameters ------------------------------------------------
# Number of consecutive prices grouped into one window (one RNN input vector).
INPUT_SIZE = 5
# Number of consecutive windows fed to the RNN per sample (the sequence length).
NUMBER_OF_STEPS = 30
LSTM_SIZE = 128
EMBED_SIZE = 128
NUMBER_OF_LAYERS = 2
STOCK_COUNT = 10

# --- Training hyper-parameters ---------------------------------------------
MAX_EPOCH = 50000
INIT_EPOCH = 5
LEARNING_RATE_BASE = 0.001
LEARNING_RATE_DECAY_STEP = 1000
LEARNING_RATE_DECAY_RATE = 0.95
INIT_LEARNING_RATE = 0.001
LEARNING_RATE_DECAY = 0.99
TEST_RATIO = 0.1
BATCH_SIZE = 64
KEEP_PROB = 0.8
SAVE_STEP = 1000


def _data_already_loaded():
    """Return True when DATA_PATH already contains more than 100 entries."""
    return len(os.listdir(DATA_PATH)) > 100


def fetch_stock_list_from_web():
    """Load the constituent list and download price history per symbol.

    The CSV at SP500_LIST_PATH is read into the module-level COMPANIES
    frame.  If the data directory already looks populated nothing is
    downloaded.  Otherwise every entry of the 'Symbol' column is fetched
    with reader.get_data_yahoo() and saved as DATA_PATH/<symbol>.csv.
    """
    global COMPANIES
    COMPANIES = pd.read_csv(SP500_LIST_PATH)
    if _data_already_loaded():
        return

    yf.pdr_override()
    for cell in COMPANIES['Symbol']:
        try:
            data = reader.get_data_yahoo(cell)
            data.to_csv(os.path.join(DATA_PATH, cell + '.csv'))
        except Exception as e:
            # Best effort: report the failure and move on to the next symbol.
            print(e)


def load_stock_data():
    """Read every downloaded CSV into ORIGINAL_STOCK_DATA, keyed by symbol.

    Symbols listed in COMPANIES that have no CSV under DATA_PATH are skipped.
    """
    for cell in COMPANIES['Symbol']:
        csv_file = os.path.join(DATA_PATH, cell + '.csv')
        if not os.path.exists(csv_file):
            continue
        ORIGINAL_STOCK_DATA[cell] = pd.read_csv(csv_file)


def preprocess_data(symbols):
    """Turn raw open prices into normalised sliding-window datasets.

    For each symbol the 'Open' column is cut into non-overlapping windows of
    INPUT_SIZE prices, each window is converted to relative change, and
    samples of NUMBER_OF_STEPS consecutive windows (X) are paired with the
    window that follows them (y).  The samples are split chronologically by
    TEST_RATIO and stored in PROCESSED_STOCK_DATA[symbol] as
    (train_X, train_y, test_X, test_y).

    Args:
        symbols: iterable of ticker symbols present in ORIGINAL_STOCK_DATA.
    """
    for symbol in symbols:
        print('Loading ' + symbol)
        opens = ORIGINAL_STOCK_DATA[symbol]['Open']

        # Split into windows of INPUT_SIZE; a trailing partial window is dropped.
        windows = [np.array(opens[i * INPUT_SIZE: (i + 1) * INPUT_SIZE])
                   for i in range(len(opens) // INPUT_SIZE)]

        # Relative change: the first window against its own first price,
        # every later window against the last price of the previous window.
        normalized = [windows[0] / windows[0][0] - 1.0] + [
            curr / prev[-1] - 1.0 for prev, curr in zip(windows, windows[1:])]

        sample_starts = range(len(normalized) - NUMBER_OF_STEPS)
        X = np.array([normalized[i: i + NUMBER_OF_STEPS] for i in sample_starts])
        y = np.array([normalized[i + NUMBER_OF_STEPS] for i in sample_starts])

        train_size = int(len(X) * (1.0 - TEST_RATIO))
        train_X, test_X = X[:train_size], X[train_size:]
        train_y, test_y = y[:train_size], y[train_size:]
        PROCESSED_STOCK_DATA[symbol] = (train_X, train_y, test_X, test_y)


def _generate_batch(symbol):
    """Yield (batch_X, batch_y) training batches for `symbol` in random order.

    Batches are contiguous slices of BATCH_SIZE samples (the last one may be
    shorter); only the order in which the batches are yielded is shuffled.
    """
    train_X, train_y, _, _ = PROCESSED_STOCK_DATA[symbol]

    # Ceiling division so a trailing partial batch is included.
    num_batches = len(train_X) // BATCH_SIZE
    if BATCH_SIZE * num_batches < len(train_X):
        num_batches += 1

    batch_indices = list(range(num_batches))
    random.shuffle(batch_indices)
    for j in batch_indices:
        batch_X = train_X[j * BATCH_SIZE: (j + 1) * BATCH_SIZE]
        batch_y = train_y[j * BATCH_SIZE: (j + 1) * BATCH_SIZE]
        assert set(map(len, batch_X)) == {NUMBER_OF_STEPS}
        yield batch_X, batch_y


def build_model(symbols, inputs, keep_prob):
    """Build the LSTM regression graph.

    Args:
        symbols: stock-label placeholder; accepted for interface
            compatibility but not used, because the per-stock embedding
            lookup was dropped from the model.
        inputs: float tensor of shape (batch, NUMBER_OF_STEPS, INPUT_SIZE).
        keep_prob: output keep probability for dropout on every LSTM cell.

    Returns:
        (pred, cell): the prediction tensor with INPUT_SIZE columns and the
        RNN cell that was constructed.
    """
    def _create_one_cell():
        lstm_cell = tf.contrib.rnn.LSTMCell(LSTM_SIZE, state_is_tuple=True)
        return tf.contrib.rnn.DropoutWrapper(lstm_cell, output_keep_prob=keep_prob)

    if NUMBER_OF_LAYERS > 1:
        cell = tf.contrib.rnn.MultiRNNCell(
            [_create_one_cell() for _ in range(NUMBER_OF_LAYERS)],
            state_is_tuple=True
        )
    else:
        cell = _create_one_cell()

    # A per-stock embedding (tf.nn.embedding_lookup over the stock label,
    # concatenated onto the inputs) was tried here and removed because its
    # benefit was doubtful; the inputs are passed through unchanged.
    inputs_with_embed = tf.identity(inputs)
    print("inputs.shape:", inputs.shape)
    print("inputs_with_embed.shape:", inputs_with_embed.shape)

    # Run dynamic RNN
    val, _ = tf.nn.dynamic_rnn(cell, inputs_with_embed, dtype=tf.float32,
                               scope="dynamic_rnn")

    # Before transpose, val.get_shape() = (batch_size, num_steps, lstm_size)
    # After transpose, val.get_shape() = (num_steps, batch_size, lstm_size)
    val = tf.transpose(val, [1, 0, 2])

    # Take the output of the last time step, which is what the prediction
    # is computed from.  Moving the step axis to the front (the transpose
    # above) is what lets tf.gather select it by index.
    last = tf.gather(val, int(val.get_shape()[0]) - 1, name="lstm_state")

    ws = tf.Variable(tf.truncated_normal([LSTM_SIZE, INPUT_SIZE]), name="w")
    bias = tf.get_variable("b", [INPUT_SIZE])
    pred = tf.matmul(last, ws) + bias
    return pred, cell


def train():
    """Train the model on every stock in PROCESSED_STOCK_DATA.

    Restores the latest checkpoint under CHECK_POINTS_PATH when one exists,
    then runs MAX_EPOCH epochs over all stocks and saves a checkpoint each
    time the step counter satisfies g_step % SAVE_STEP == SAVE_STEP - 1.
    """
    print('start training ...')
    keep_prob = tf.placeholder(tf.float32, None, name="keep_prob")
    # Stock symbols are mapped to integers.
    symbols = tf.placeholder(tf.int32, [None, 1], name='stock_labels')
    inputs = tf.placeholder(tf.float32, [None, NUMBER_OF_STEPS, INPUT_SIZE],
                            name="inputs")
    targets = tf.placeholder(tf.float32, [None, INPUT_SIZE], name="targets")

    pred, _ = build_model(symbols, inputs, keep_prob)

    # Mean squared error loss.
    loss = tf.reduce_mean(tf.square(pred - targets), name="loss_mse_train")

    global_step = tf.Variable(0, trainable=False)
    add_global_step = global_step.assign_add(1)

    trainable_variables = tf.trainable_variables()
    # Prevent loss divergence caused by gradient explosion.
    grads, _ = tf.clip_by_global_norm(tf.gradients(loss, trainable_variables), 5)

    # NOTE(review): this tensor is also fed through feed_dict below, which
    # replaces the value the decay schedule would compute.  The effective
    # learning rate is therefore the per-epoch `each_turn_learning_rate`.
    # This matches what the script has always done; confirm which schedule
    # is actually wanted before changing it.
    learning_rate = tf.train.exponential_decay(
        LEARNING_RATE_BASE,
        global_step=global_step,
        decay_steps=LEARNING_RATE_DECAY_STEP,
        decay_rate=LEARNING_RATE_DECAY_RATE)
    optimizer = tf.train.AdamOptimizer(learning_rate)
    optim = optimizer.apply_gradients(zip(grads, trainable_variables))

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        saver = tf.train.Saver()
        if not os.path.exists(CHECK_POINTS_PATH):
            os.makedirs(CHECK_POINTS_PATH)
        check_point = tf.train.get_checkpoint_state(CHECK_POINTS_PATH)
        # Resume from the latest checkpoint when there is one.
        if check_point and check_point.model_checkpoint_path:
            saver.restore(sess, check_point.model_checkpoint_path)
            print("restored %s" % check_point.model_checkpoint_path)
        else:
            print("no checkpoint found!")

        g_step = 0
        for epoch in range(MAX_EPOCH):
            epoch_step = 0
            # Constant for the first INIT_EPOCH epochs, then decays per epoch.
            each_turn_learning_rate = INIT_LEARNING_RATE * (
                LEARNING_RATE_DECAY ** max(float(epoch + 1 - INIT_EPOCH), 0.0)
            )

            # The integer label of a stock is its position in the dict.
            for label_pos, label_ in enumerate(PROCESSED_STOCK_DATA):
                for batch_x, batch_y in _generate_batch(label_):
                    g_step += 1
                    epoch_step += 1
                    batch_labels = np.array([[label_pos]] * len(batch_x))
                    train_data_feed = {
                        learning_rate: each_turn_learning_rate,
                        keep_prob: KEEP_PROB,
                        inputs: batch_x,
                        targets: batch_y,
                        symbols: batch_labels,
                    }
                    train_loss, _, _ = sess.run(
                        [loss, optim, add_global_step], train_data_feed)
                    print("epoch: %d, steps: %d, loss: %3f"
                          % (epoch + 1, epoch_step, train_loss))

                    # SAVE_STEP - 1 rather than 0 prevents a save at the beginning.
                    if g_step % SAVE_STEP == SAVE_STEP - 1:
                        print("save model")
                        saver.save(sess,
                                   os.path.join(CHECK_POINTS_PATH, 'stock.model'),
                                   global_step=g_step)


def test():
    """Evaluate the latest checkpoint on the test split of every stock.

    Prints the loss per stock, shows a comparison plot and prints the raw
    predicted and real values.  Exits with status 1 when no checkpoint is
    found under CHECK_POINTS_PATH.
    """
    print('start test ...')
    keep_prob = tf.placeholder(tf.float32, None, name="keep_prob")
    # Stock symbols are mapped to integers.
    symbols = tf.placeholder(tf.int32, [None, 1], name='stock_labels')
    inputs = tf.placeholder(tf.float32, [None, NUMBER_OF_STEPS, INPUT_SIZE],
                            name="inputs")
    targets = tf.placeholder(tf.float32, [None, INPUT_SIZE], name="targets")

    pred, _ = build_model(symbols, inputs, keep_prob)

    # Mean squared error loss.
    loss = tf.reduce_mean(tf.square(pred - targets), name="loss_mse_train")

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        saver = tf.train.Saver()
        check_point = tf.train.get_checkpoint_state(CHECK_POINTS_PATH)
        if check_point and check_point.model_checkpoint_path:
            saver.restore(sess, check_point.model_checkpoint_path)
            print("restored %s" % check_point.model_checkpoint_path)
        else:
            print("no checkpoint found!")
            # SystemExit instead of the site-provided exit() helper, which
            # is not guaranteed to exist in every interpreter setup.
            raise SystemExit(1)

        for label_pos, label_ in enumerate(PROCESSED_STOCK_DATA):
            _, _, test_x, test_y = PROCESSED_STOCK_DATA[label_]
            batch_labels = np.array([[label_pos]] * len(test_x))
            test_data_feed = {
                keep_prob: 1.0,  # no dropout at test time
                inputs: test_x,
                targets: test_y,
                symbols: batch_labels,
            }
            test_loss, test_pred = sess.run([loss, pred], test_data_feed)
            print("stock: %s, loss: %3f" % (label_, test_loss))
            _print_comparison_gui(test_pred, test_y, label_)
            _print_comparison_text(test_pred, test_y)


def _print_comparison_gui(pred, real, stock_sym):
    """Plot predicted against real values for the last 200 points.

    Args:
        pred: nested sequence of predictions, flattened with _flatten().
        real: nested sequence of ground-truth values, flattened the same way.
        stock_sym: ticker symbol used in the plot title.
    """
    truths = _flatten(real)[-200:]
    preds = _flatten(pred)[-200:]
    days = range(len(truths))

    plt.figure(figsize=(12, 6))
    plt.plot(days, truths, label='truth')
    plt.plot(days, preds, label='pred')
    plt.legend(loc='upper left', frameon=False)
    plt.xlabel("day")
    plt.ylabel("normalized price")
    # The y axis is scaled to the truth series only, so predictions that
    # fall outside that range are cut off in the plot.
    plt.ylim((min(truths), max(truths)))
    plt.grid(ls='--')
    plt.title(stock_sym + " | Last %d days in test" % len(truths))
    plt.show()


def _print_comparison_text(pred, real):
    """Print the flattened predictions, then the flattened real values.

    Each series is printed on one line, values separated by spaces, and is
    followed by an empty line.
    """
    for series in (_flatten(pred), _flatten(real)):
        for each in series:
            print("%5f" % each, end=' ')
        print('')
        print('')


def _flatten(seq):
    """Flatten one level of nesting in `seq` into a 1-D numpy array."""
    return np.array([x for y in seq for x in y])


# Script entry: runs on import as well as on direct execution.
# NOTE(review): train() and test() each build the model in the default
# graph; presumably only one of them can be run per process -- confirm
# before enabling both.
fetch_stock_list_from_web()
load_stock_data()
preprocess_data(TRAIN_STOCK_LIST)
# train()
test()