动机
学了很长时间的机器学习,却一直找不到实践的机会:工作上的场景都被算法大拿们占据了,小菜鸟只能自谋出路。恰逢前些时候中概股大跌,损失惨重,一怒之下干脆研究一下能否用机器学习来预测股价。
数据获取
历史股价数据可以从雅虎金融(Yahoo Finance)上获取,而且已经有现成的Python库(pandas_datareader配合fix_yahoo_finance)可以完成这件事。
import pandas_datareader.data as reader
import fix_yahoo_finance as yf
def fetch_stock_list_from_web():
    """Load the S&P 500 constituent list and download each symbol's history.

    Reads the table at ``SP500_LIST_PATH`` into the module-level
    ``COMPANIES``.  Unless ``_data_already_loaded()`` reports that the data
    directory is already populated, every entry of the ``Symbol`` column is
    fetched with ``reader.get_data_yahoo`` and written to
    ``<DATA_PATH>/<symbol>.csv``.  A failed download is reported together
    with its symbol and skipped, so the remaining symbols are still fetched.
    """
    global COMPANIES
    COMPANIES = pd.read_csv(SP500_LIST_PATH)
    if _data_already_loaded():
        return
    # Presumably patches pandas_datareader's Yahoo reader through
    # fix_yahoo_finance -- see that library's documentation.
    yf.pdr_override()
    for symbol in COMPANIES['Symbol']:
        try:
            history = reader.get_data_yahoo(symbol)
            # Write next to the other data files instead of a hard-coded path.
            history.to_csv(os.path.join(DATA_PATH, symbol + '.csv'))
        except Exception as e:  # best effort: one bad symbol must not abort the run
            print('Failed to fetch %s: %s' % (symbol, e))
其中 SP500_LIST_PATH = './stock_data/constituents.csv',是标普500成分股列表,这个文件google一下就能找到。
打印预测结果
先跳跃一下,把周边的工具准备好:有了股票预测数据之后,一般都要画一张预测值和真实值的对比图,这个可以用matplotlib.pyplot搞定。
import matplotlib.pyplot as plt
def _print_comparison_gui(pred, real):
    """Plot predicted against true values for the most recent 200 points.

    Both inputs are flattened with ``_flatten`` and only the trailing 200
    entries of each are drawn on a shared axis.

    Args:
        pred: nested sequence of predicted values.
        real: nested sequence of ground-truth values.
    """
    truths = _flatten(real)[-200:]
    preds = _flatten(pred)[-200:]
    # `truths` is already capped at 200 entries, so no second slice is needed.
    days = range(len(truths))
    plt.figure(figsize=(12, 6))
    plt.plot(days, truths, label='truth')
    plt.plot(days, preds, label='pred')
    plt.legend(loc='upper left', frameon=False)
    plt.xlabel("day")
    plt.ylabel("normalized price")
    # Take both y-limits from the ground truth. Mixing min(preds) with
    # max(truths) could cut off part of the truth curve; this also matches the
    # three-argument version of this helper in the full listing.
    plt.ylim((min(truths), max(truths)))
    plt.grid(ls='--')
    plt.show()
naive版本
预测股价的模型应该是怎样的呢?初始想法是把股价表达成时间的函数。因为某些时间点具有特殊性(比如周五、年底等),所以把时间拆成年、月、日和星期几,而不是作为一个整体。其实我们预测股价的时候一般还会看公司的基本面、大环境、相关消息等等,但搞定这些东西远超出我的能力,所以就先忽略了。最终x是(year, month, day, day_of_week),y就是price(开盘价)。
def preprocess_data(symbols):
    """Build calendar-feature datasets for the naive model.

    For every symbol, each row of ``ORIGINAL_STOCK_DATA[symbol]`` becomes a
    feature tuple ``(year, month, day, weekday)`` parsed from the ``Date``
    column (format ``%Y-%m-%d``) and a one-element target ``[open_price]``
    from the ``Open`` column.  The leading ``1 - TEST_RATIO`` share of rows is
    the training split, the rest the test split.  The result is stored in
    ``PROCESSED_STOCK_DATA[symbol]`` as ``(train_X, train_y, test_X, test_y)``.
    """
    global PROCESSED_STOCK_DATA
    for symbol in symbols:
        print('Loading ' + symbol)
        targets = [[value] for value in ORIGINAL_STOCK_DATA[symbol]['Open']]
        features = []
        for text in ORIGINAL_STOCK_DATA[symbol]['Date']:
            day = datetime.datetime.strptime(text, '%Y-%m-%d')
            features.append((day.year, day.month, day.day, day.weekday()))
        # Chronological split: the earliest rows train, the latest rows test.
        cut = int(len(features) * (1.0 - TEST_RATIO))
        PROCESSED_STOCK_DATA[symbol] = (
            features[:cut], targets[:cut], features[cut:], targets[cut:])
def build_model(x):
    """Build the naive fully connected regressor: 30 -> 40 -> 1 units.

    Every layer is ``matmul(inputs, W) + b`` with ``W`` drawn uniformly from
    [0, 1) and ``b`` initialised to zero; the two hidden layers are followed
    by ReLU, the output layer is linear.

    Args:
        x: input tensor whose last dimension is ``INPUT_VARIABLE_COUNT``.

    Returns:
        The output tensor of the final (linear) layer.
    """
    def _affine(inputs, fan_in, fan_out):
        # Weights first, then bias, so variables are created in the same order
        # for every layer.
        weights = tf.Variable(tf.random_uniform([fan_in, fan_out], 0, 1))
        offsets = tf.Variable(tf.zeros([1, fan_out]))
        return tf.matmul(inputs, weights) + offsets

    hidden1_units, hidden2_units = 30, 40
    hidden1 = tf.nn.relu(_affine(x, INPUT_VARIABLE_COUNT, hidden1_units))
    hidden2 = tf.nn.relu(_affine(hidden1, hidden1_units, hidden2_units))
    return _affine(hidden2, hidden2_units, 1)
可惜一跑就悲剧了:跑了一小会儿,所有的预测值就都在12上下跳动了。感觉像是梯度消失,只好搬出batchnorm来救场:
def build_model(x, is_test, iteration):
    """Build the 30 -> 40 -> 1 regressor with batch normalisation on both hidden layers.

    Args:
        x: input tensor whose last dimension is ``INPUT_VARIABLE_COUNT``.
        is_test: forwarded unchanged to ``batchnorm``.
        iteration: forwarded unchanged to ``batchnorm``.

    Returns:
        ``(prediction, update_ema)``: the linear output tensor and a grouped
        op combining the two update ops returned by ``batchnorm``.
    """
    def _bn_relu_layer(inputs, fan_in, fan_out):
        weights = tf.Variable(tf.random_uniform([fan_in, fan_out], 0, 1))
        scale = tf.Variable(tf.ones([fan_out]))
        offset = tf.Variable(tf.zeros([1, fan_out]))
        logits = tf.matmul(inputs, weights) + offset
        # NOTE(review): `batchnorm` is not defined in this listing. The offset
        # is both added to the logits above and passed to the helper -- confirm
        # against the helper that this is intended.
        normalized, ema_update = batchnorm(logits, offset, scale, is_test, iteration)
        return tf.nn.relu(normalized), ema_update

    hidden1_units, hidden2_units = 30, 40
    hidden1, ema_update1 = _bn_relu_layer(x, INPUT_VARIABLE_COUNT, hidden1_units)
    hidden2, ema_update2 = _bn_relu_layer(hidden1, hidden1_units, hidden2_units)

    # Plain linear output layer, no normalisation.
    out_weights = tf.Variable(tf.random_uniform([hidden2_units, 1], 0, 1))
    out_bias = tf.Variable(tf.zeros([1, 1]))
    prediction = tf.matmul(hidden2, out_weights) + out_bias
    return prediction, tf.group(ema_update1, ema_update2)
最终拿到的结果是这样的:
感觉相当不靠谱:程序似乎学到了一点大的趋势,但在基准值和连续性上都差距较大。连续性差可能是因为输入变量不够、网络层次太浅;可惜加大层数之后,在我的macbook pro上跑了一晚上也没有一点要收敛的样子,只能放弃了。
LSTM版本
回想一下,股价预测的方法中有一个流派叫技术分析,也就是看K线图,这比较符合我现在只有股价做输入的状况;而K线图这种按时间排列的序列,不正是LSTM模型擅长的场景么?不过从头写一个LSTM还是有点超出本菜鸟的能力,所以就在别人的基础上改吧,这里借鉴了《使用rnn预测股票价格》一文。
rnn文章里面用的是面向对象的编程风格,而且把训练过程和测试过程放到了一起,这些都加大了程序的理解难度,所以我改写成了简单的过程式风格,并且把训练和测试过程分开了。
一些要点
1. LSTM版本除了模型不一样之外,比较重要的一点是对股价做了规范化处理:先把价格序列按INPUT_SIZE切成一组一组,再把绝对股价换算成相对于前一组最后一个价格的变动比例(第一组相对于自己的第一个价格),这样就不会碰到预测不出训练时没见过的股价的问题了。
# Turn absolute prices into relative changes. `data` must already be a list of
# INPUT_SIZE-long chunks at this point (it is indexed as data[0][0] and
# data[i][-1]); in the complete preprocess_data listing the chunking statement
# shown below runs BEFORE this one.
# The first chunk is scaled by its own first price; every later chunk is scaled
# by the last price of the chunk before it (enumerate restarts at 0 over data[1:]).
data = [data[0] / data[0][0] - 1.0] + [
curr / data[i][-1] - 1.0 for i, curr in enumerate(data[1:])]
# Split the price series into consecutive chunks of INPUT_SIZE values; a
# trailing remainder shorter than INPUT_SIZE is dropped by the floor division.
data = [np.array(data[i * INPUT_SIZE: (i + 1) * INPUT_SIZE])
for i in range(len(data) // INPUT_SIZE)]
3. 预测的模型实际上是用连续多天(比如30天)的股价预测后面一天的股价,所以x和y是下面的样子
# Each sample is NUMBER_OF_STEPS consecutive chunks; its target is the chunk
# that immediately follows the window.
window_count = len(data) - NUMBER_OF_STEPS
X = np.array([data[start: start + NUMBER_OF_STEPS] for start in range(window_count)])
y = np.array([data[start + NUMBER_OF_STEPS] for start in range(window_count)])
4. rnn文章还给输入加了一层向量转换(tf.nn.embedding_lookup,从后面完整代码里被注释掉的部分看,是把股票代码映射成向量再拼接到价格输入上),但我实际测试的结果不理想,而且感觉做了数据规范化以后的股价本身已经是向量了,似乎没必要再做一层转换,所以我的模型是这样的:
def build_model(symbols, inputs, keep_prob):
    """Stack dropout-wrapped LSTM layers and project the last step to a prediction.

    Args:
        symbols: stock-label tensor; accepted but not used by this version.
        inputs: input tensor handed to ``tf.nn.dynamic_rnn``.
        keep_prob: output keep probability for the dropout wrapper.

    Returns:
        ``(pred, cell)``: the prediction tensor (``INPUT_SIZE`` values per
        row) and the RNN cell that was built.
    """
    def _make_layer():
        return tf.contrib.rnn.DropoutWrapper(
            tf.contrib.rnn.LSTMCell(LSTM_SIZE, state_is_tuple=True),
            output_keep_prob=keep_prob)

    if NUMBER_OF_LAYERS > 1:
        cell = tf.contrib.rnn.MultiRNNCell(
            [_make_layer() for _ in range(NUMBER_OF_LAYERS)],
            state_is_tuple=True)
    else:
        cell = _make_layer()

    # Run dynamic RNN
    rnn_outputs, _final_state = tf.nn.dynamic_rnn(
        cell, inputs, dtype=tf.float32, scope="dynamic_rnn")
    # (batch_size, num_steps, lstm_size) -> (num_steps, batch_size, lstm_size),
    # so that gathering along axis 0 selects a time step.
    time_major = tf.transpose(rnn_outputs, [1, 0, 2])
    # Keep only the output of the final step: that is what the prediction is
    # computed from, and the reason for the transpose above.
    last_step = int(time_major.get_shape()[0]) - 1
    last = tf.gather(time_major, last_step, name="lstm_state")
    ws = tf.Variable(tf.truncated_normal([LSTM_SIZE, INPUT_SIZE]), name="w")
    bias = tf.get_variable("b", [INPUT_SIZE])
    pred = tf.matmul(last, ws) + bias
    return pred, cell
结果
从上面的图来看,预测结果还是有点靠谱的,有几个较大的波动都给预测出来了,这是不是也说明K线图法是靠谱的呢?
代码
import pandas_datareader.data as reader
import fix_yahoo_finance as yf
import numpy as np
import os
import pandas as pd
import random
import tensorflow as tf
import matplotlib.pyplot as plt
# --- Paths and symbol selection ---
# CSV listing the S&P 500 constituents; its 'Symbol' column drives download and load.
SP500_LIST_PATH = './stock_data/constituents.csv'
# Data directory; _data_already_loaded() counts its entries.
DATA_PATH = './stock_data'
# Directory where checkpoints are written and looked up.
CHECK_POINTS_PATH = "./checkpoints/stock_check_points"
# Symbols handed to preprocess_data() by the script statements at the end of the file.
TRAIN_STOCK_LIST = ['AAPL','AMZN','GOOG','FB','MSFT','NFLX','NVDA','ORCL']
# Single-symbol alternative, currently disabled:
#TRAIN_STOCK_LIST = ['AAPL']
# --- Mutable module state ---
# Constituent table, assigned by fetch_stock_list_from_web().
COMPANIES = None
# symbol -> raw table read from that symbol's CSV.
ORIGINAL_STOCK_DATA = {}
# symbol -> (train_X, train_y, test_X, test_y).
PROCESSED_STOCK_DATA = {}
# --- Model shape ---
# Number of consecutive prices grouped into one chunk; also the width of each
# model input step and of the prediction.
INPUT_SIZE = 5
# Number of chunks per input window (can be read as the sequence length).
NUMBER_OF_STEPS = 30
# Units per LSTM cell.
LSTM_SIZE = 128
# Only referenced by the commented-out embedding code in build_model().
EMBED_SIZE = 128
# Number of stacked LSTM layers.
NUMBER_OF_LAYERS = 2
# Only referenced by the commented-out embedding code in build_model().
STOCK_COUNT = 10
# --- Training schedule ---
# Number of epochs the training loop iterates.
MAX_EPOCH = 50000
# Epochs before the per-epoch learning-rate decay starts.
INIT_EPOCH = 5
# Parameters of the step-based exponential-decay schedule.
LEARNING_RATE_BASE = 0.001
LEARNING_RATE_DECAY_STEP = 1000
LEARNING_RATE_DECAY_RATE = 0.95
# Parameters of the per-epoch learning-rate schedule.
INIT_LEARNING_RATE = 0.001
LEARNING_RATE_DECAY = 0.99
# Share of each symbol's windows held out as the test split.
TEST_RATIO = 0.1
# Samples per training batch.
BATCH_SIZE = 64
# Dropout output keep probability fed during training.
KEEP_PROB = 0.8
# A checkpoint is saved once per SAVE_STEP training steps.
SAVE_STEP = 1000
def _data_already_loaded():
    """Return True when ``DATA_PATH`` already contains more than 100 entries.

    The entry count is used as a cheap signal that the per-symbol CSV files
    were downloaded by an earlier run.
    """
    return len(os.listdir(DATA_PATH)) > 100
def fetch_stock_list_from_web():
    """Load the S&P 500 constituent list and download each symbol's history.

    Reads the table at ``SP500_LIST_PATH`` into the module-level
    ``COMPANIES``.  Unless ``_data_already_loaded()`` reports that the data
    directory is already populated, every entry of the ``Symbol`` column is
    fetched with ``reader.get_data_yahoo`` and written to
    ``<DATA_PATH>/<symbol>.csv``.  A failed download is reported together
    with its symbol and skipped, so the remaining symbols are still fetched.
    """
    global COMPANIES
    COMPANIES = pd.read_csv(SP500_LIST_PATH)
    if _data_already_loaded():
        return
    # Presumably patches pandas_datareader's Yahoo reader through
    # fix_yahoo_finance -- see that library's documentation.
    yf.pdr_override()
    for symbol in COMPANIES['Symbol']:
        try:
            history = reader.get_data_yahoo(symbol)
            # Write into DATA_PATH, the directory _data_already_loaded() inspects.
            history.to_csv(os.path.join(DATA_PATH, symbol + '.csv'))
        except Exception as e:  # best effort: one bad symbol must not abort the run
            print('Failed to fetch %s: %s' % (symbol, e))
def load_stock_data():
    """Read every available per-symbol CSV into ``ORIGINAL_STOCK_DATA``.

    Iterates the ``Symbol`` column of ``COMPANIES`` (which must have been
    populated first) and loads ``<DATA_PATH>/<symbol>.csv`` for each one.
    Symbols without a CSV file are skipped silently, so a failed download
    only removes that symbol from the dataset.
    """
    # The dict is mutated in place, so no `global` declaration is required.
    for symbol in COMPANIES['Symbol']:
        # Build the path from DATA_PATH rather than repeating the literal.
        csv_file = os.path.join(DATA_PATH, symbol + '.csv')
        if not os.path.exists(csv_file):
            continue
        ORIGINAL_STOCK_DATA[symbol] = pd.read_csv(csv_file)
def preprocess_data(symbols):
    """Convert raw opening prices into windowed relative-change datasets.

    For every symbol the ``Open`` column is cut into consecutive chunks of
    ``INPUT_SIZE`` prices (a shorter trailing remainder is dropped).  Each
    chunk is then expressed as a relative change: the first chunk against its
    own first price, every later chunk against the last price of the chunk
    before it.  Samples are sliding windows of ``NUMBER_OF_STEPS`` chunks and
    the target is the chunk right after the window.  The leading
    ``1 - TEST_RATIO`` share of samples is the training split.

    The result is stored in ``PROCESSED_STOCK_DATA[symbol]`` as
    ``(train_X, train_y, test_X, test_y)``.
    """
    global PROCESSED_STOCK_DATA
    for symbol in symbols:
        print('Loading ' + symbol)
        prices = ORIGINAL_STOCK_DATA[symbol]['Open']

        # Split into INPUT_SIZE-long chunks.
        chunks = []
        for k in range(len(prices) // INPUT_SIZE):
            chunks.append(np.array(prices[k * INPUT_SIZE: (k + 1) * INPUT_SIZE]))

        # Relative change; each later chunk is paired with its predecessor.
        changes = [chunks[0] / chunks[0][0] - 1.0]
        for previous, current in zip(chunks, chunks[1:]):
            changes.append(current / previous[-1] - 1.0)

        window_count = len(changes) - NUMBER_OF_STEPS
        X = np.array([changes[s: s + NUMBER_OF_STEPS] for s in range(window_count)])
        y = np.array([changes[s + NUMBER_OF_STEPS] for s in range(window_count)])

        # Chronological split: the earliest windows train, the latest test.
        cut = int(len(X) * (1.0 - TEST_RATIO))
        PROCESSED_STOCK_DATA[symbol] = (X[:cut], y[:cut], X[cut:], y[cut:])
def _generate_batch(symbol):
    """Yield the training split of ``symbol`` in batches, in random batch order.

    The training samples are cut into consecutive slices of ``BATCH_SIZE``
    (the last one may be shorter).  The order in which the slices are yielded
    is shuffled; the samples inside a slice keep their original order.

    Yields:
        ``(batch_X, batch_y)`` pairs.
    """
    train_X, train_y, _, _ = PROCESSED_STOCK_DATA[symbol]
    # Ceiling division: a partial final batch still counts as a batch.
    num_batches = (len(train_X) + BATCH_SIZE - 1) // BATCH_SIZE
    order = list(range(num_batches))
    random.shuffle(order)
    for batch_no in order:
        start = batch_no * BATCH_SIZE
        stop = start + BATCH_SIZE
        batch_X = train_X[start:stop]
        batch_y = train_y[start:stop]
        # Every sample in the batch must be a window of NUMBER_OF_STEPS chunks.
        assert set(map(len, batch_X)) == {NUMBER_OF_STEPS}
        yield batch_X, batch_y
def build_model(symbols, inputs, keep_prob):
    """Stack dropout-wrapped LSTM layers and project the last step to a prediction.

    Args:
        symbols: stock-label tensor; only referenced by the disabled embedding
            code below, so it is currently unused.
        inputs: input tensor handed to ``tf.nn.dynamic_rnn``.
        keep_prob: output keep probability for the dropout wrapper.

    Returns:
        ``(pred, cell)``: the prediction tensor (``INPUT_SIZE`` values per
        row) and the RNN cell that was built.
    """
    def _make_layer():
        return tf.contrib.rnn.DropoutWrapper(
            tf.contrib.rnn.LSTMCell(LSTM_SIZE, state_is_tuple=True),
            output_keep_prob=keep_prob)

    if NUMBER_OF_LAYERS > 1:
        cell = tf.contrib.rnn.MultiRNNCell(
            [_make_layer() for _ in range(NUMBER_OF_LAYERS)],
            state_is_tuple=True)
    else:
        cell = _make_layer()

    # Disabled experiment: embed the stock label and concatenate it to every
    # input step. The author doubted the benefit of the embedding lookup.
    # embed_matrix = tf.Variable(
    #     tf.random_uniform([STOCK_COUNT, EMBED_SIZE], -1.0, 1.0),
    #     name="embed_matrix"
    # )
    # stock_label_embeds.shape = (batch_size, embedding_size)
    # stacked_symbols = tf.tile(symbols, [1, NUMBER_OF_STEPS], name='stacked_stock_labels')
    # stacked_embeds = tf.nn.embedding_lookup(embed_matrix, stacked_symbols)
    # After concat, inputs.shape = (batch_size, num_steps, input_size + embed_size)
    # inputs_with_embed = tf.concat([inputs, stacked_embeds], axis=2, name="inputs_with_embed")
    inputs_with_embed = tf.identity(inputs)
    print("inputs.shape:", inputs.shape)
    print("inputs_with_embed.shape:", inputs_with_embed.shape)

    # Run dynamic RNN
    rnn_outputs, _final_state = tf.nn.dynamic_rnn(
        cell, inputs_with_embed, dtype=tf.float32, scope="dynamic_rnn")
    # (batch_size, num_steps, lstm_size) -> (num_steps, batch_size, lstm_size),
    # so that gathering along axis 0 selects a time step.
    time_major = tf.transpose(rnn_outputs, [1, 0, 2])
    # Keep only the output of the final step: that is what the prediction is
    # computed from, and the reason for the transpose above.
    last_step = int(time_major.get_shape()[0]) - 1
    last = tf.gather(time_major, last_step, name="lstm_state")
    ws = tf.Variable(tf.truncated_normal([LSTM_SIZE, INPUT_SIZE]), name="w")
    bias = tf.get_variable("b", [INPUT_SIZE])
    pred = tf.matmul(last, ws) + bias
    return pred, cell
def train():
    """Build the training graph and run the optimisation loop with checkpointing.

    Restores the latest checkpoint under ``CHECK_POINTS_PATH`` when one
    exists, then runs ``MAX_EPOCH`` epochs over every symbol in
    ``PROCESSED_STOCK_DATA``.  The loss is the mean squared error between
    prediction and target; gradients are clipped to a global norm of 5 and
    applied with Adam.  The model is saved whenever
    ``g_step % SAVE_STEP == SAVE_STEP - 1``.
    """
    print('start training ...')
    # This placeholder stays in the graph under the name "learning_rate", but
    # no op consumes it: the optimizer below is built on the decay tensor.
    _unused_lr_placeholder = tf.placeholder(tf.float32, None, name="learning_rate")
    keep_prob = tf.placeholder(tf.float32, None, name="keep_prob")
    # Stock symbols are mapped to integers.
    symbols = tf.placeholder(tf.int32, [None, 1], name='stock_labels')
    inputs = tf.placeholder(tf.float32, [None, NUMBER_OF_STEPS, INPUT_SIZE], name="inputs")
    targets = tf.placeholder(tf.float32, [None, INPUT_SIZE], name="targets")
    pred, cell = build_model(symbols, inputs, keep_prob)

    # Mean squared error.
    loss = tf.reduce_mean(tf.square(pred - targets), name="loss_mse_train")
    global_step = tf.Variable(0, trainable=False)
    add_global_step = global_step.assign_add(1)
    trainable_variables = tf.trainable_variables()
    # Clip by global norm to prevent loss divergence caused by gradient explosion.
    raw_grads = tf.gradients(loss, trainable_variables)
    grads, _ = tf.clip_by_global_norm(raw_grads, 5)
    decayed_lr = tf.train.exponential_decay(
        LEARNING_RATE_BASE, global_step=global_step,
        decay_steps=LEARNING_RATE_DECAY_STEP, decay_rate=LEARNING_RATE_DECAY_RATE)
    optimizer = tf.train.AdamOptimizer(decayed_lr)
    optim = optimizer.apply_gradients(zip(grads, trainable_variables))
    #optim = tf.train.RMSPropOptimizer(learning_rate).minimize(loss, name="rmsprop_optim")

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver()
        if not os.path.exists(CHECK_POINTS_PATH):
            os.makedirs(CHECK_POINTS_PATH)
        check_point = tf.train.get_checkpoint_state(CHECK_POINTS_PATH)
        # Resume from the latest checkpoint when there is one.
        if check_point and check_point.model_checkpoint_path:
            saver.restore(sess, check_point.model_checkpoint_path)
            print("restored %s" % check_point.model_checkpoint_path)
        else:
            print("no checkpoint found!")

        g_step = 0
        for epoch in range(MAX_EPOCH):
            epoch_step = 0
            # Per-epoch schedule: constant for the first INIT_EPOCH epochs,
            # then multiplied by LEARNING_RATE_DECAY each epoch.
            decay_power = max(float(epoch + 1 - INIT_EPOCH), 0.0)
            each_turn_learning_rate = INIT_LEARNING_RATE * (LEARNING_RATE_DECAY ** decay_power)
            # The integer label of a symbol is its position in the dict.
            for label_pos, label_ in enumerate(PROCESSED_STOCK_DATA):
                for batch_x, batch_y in _generate_batch(label_):
                    g_step += 1
                    epoch_step += 1
                    batch_labels = np.array([[label_pos]] * len(batch_x))
                    # NOTE(review): this entry is keyed by the exponential-decay
                    # tensor itself, so the fed per-epoch value replaces the
                    # step-based decay whenever the training op runs -- confirm
                    # which of the two schedules is intended.
                    train_data_feed = {
                        decayed_lr: each_turn_learning_rate,
                        keep_prob: KEEP_PROB,
                        inputs: batch_x,
                        targets: batch_y,
                        symbols: batch_labels,
                    }
                    train_loss, train_pred, _, _ = sess.run(
                        [loss, pred, optim, add_global_step], train_data_feed)
                    print("epoch: %d, steps: %d, loss: %3f" % (epoch + 1, epoch_step, train_loss))
                    # Save periodically; the "- 1" offset prevents a save at the
                    # very beginning.
                    if g_step % SAVE_STEP == SAVE_STEP - 1:
                        print("save model")
                        saver.save(sess, os.path.join(CHECK_POINTS_PATH, 'stock.model'), global_step=g_step)
def test():
    """Restore the latest checkpoint and evaluate every symbol's test split.

    Rebuilds the same graph as training, restores the most recent checkpoint
    under ``CHECK_POINTS_PATH`` and, for each symbol in
    ``PROCESSED_STOCK_DATA``, prints the mean squared error on the test split
    and shows the prediction/truth comparison as a plot and as text.

    Raises:
        SystemExit: with status 1 when no checkpoint can be found.
    """
    print('start test ...')
    # Not consumed by any op here; created and fed as in the original graph setup.
    learning_rate = tf.placeholder(tf.float32, None, name="learning_rate")
    keep_prob = tf.placeholder(tf.float32, None, name="keep_prob")
    # Stock symbols are mapped to integers.
    symbols = tf.placeholder(tf.int32, [None, 1], name='stock_labels')
    inputs = tf.placeholder(tf.float32, [None, NUMBER_OF_STEPS, INPUT_SIZE], name="inputs")
    targets = tf.placeholder(tf.float32, [None, INPUT_SIZE], name="targets")
    pred, cell = build_model(symbols, inputs, keep_prob)
    # Mean squared error.
    loss = tf.reduce_mean(tf.square(pred - targets), name="loss_mse_train")

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver()
        check_point = tf.train.get_checkpoint_state(CHECK_POINTS_PATH)
        if not (check_point and check_point.model_checkpoint_path):
            print("no checkpoint found!")
            # `exit` is only injected by the `site` module; raising SystemExit
            # directly ends the run with the same status everywhere.
            raise SystemExit(1)
        saver.restore(sess, check_point.model_checkpoint_path)
        print("restored %s" % check_point.model_checkpoint_path)

        # The integer label of a symbol is its position in the dict.
        for label_pos, label_ in enumerate(PROCESSED_STOCK_DATA):
            _, _, test_x, test_y = PROCESSED_STOCK_DATA[label_]
            batch_labels = np.array([[label_pos]] * len(test_x))
            test_data_feed = {
                learning_rate: 0.0,
                keep_prob: 1.0,  # no dropout while evaluating
                inputs: test_x,
                targets: test_y,
                symbols: batch_labels,
            }
            test_loss, test_pred = sess.run([loss, pred], test_data_feed)
            print("stock: %s, loss: %3f" % (label_, test_loss))
            _print_comparison_gui(test_pred, test_y, label_)
            _print_comparison_text(test_pred, test_y)
        # writer=tf.summary.FileWriter('./logs',sess.graph)
        # writer.close()
# Helpers for printing / plotting the results.
def _print_comparison_gui(pred, real, stock_sym):
    """Plot predicted against true values for the most recent 200 points.

    Args:
        pred: nested sequence of predicted values.
        real: nested sequence of ground-truth values.
        stock_sym: symbol shown at the start of the figure title.
    """
    truths = _flatten(real)[-200:]
    # Unit scale factor kept from the original; it leaves the values unchanged.
    preds = 1 * (_flatten(pred)[-200:])
    point_count = len(truths)
    days = range(point_count)

    plt.figure(figsize=(12, 6))
    plt.plot(days, truths, label='truth')
    plt.plot(days, preds, label='pred')
    plt.legend(loc='upper left', frameon=False)
    plt.xlabel("day")
    plt.ylabel("normalized price")
    # The visible y-range is taken from the ground truth only.
    lowest, highest = min(truths), max(truths)
    plt.ylim((lowest, highest))
    plt.grid(ls='--')
    title_suffix = " | Last %d days in test" % point_count
    plt.title(stock_sym + title_suffix)
    plt.show()
def _print_comparison_text(pred, real):
    """Print the flattened predictions, then the flattened truths, as text.

    Each series is written on one line as space-separated ``%5f`` values and
    followed by an empty line.
    """
    # Flatten both inputs up front, before anything is printed.
    for series in (_flatten(pred), _flatten(real)):
        for value in series:
            print("%5f" % value, end=' ')
        print('')
        print('')
def _flatten(seq):
return np.array([x for y in seq for x in y])
# Script entry: the whole pipeline runs as top-level statements, in this order.
# 1. Load the constituent list and download the price CSVs if not yet present.
fetch_stock_list_from_web()
# 2. Read the per-symbol CSVs into ORIGINAL_STOCK_DATA.
load_stock_data()
# 3. Build the train/test windows for the symbols in TRAIN_STOCK_LIST.
preprocess_data(TRAIN_STOCK_LIST)
# Training is currently disabled; uncomment the next line to train and write checkpoints.
#train()
# 4. Restore the latest checkpoint and evaluate each symbol's test split.
test()

