optiver-realized-volatility-prediction: Day 1
I can't stay motivated without a target, so for now I'll aim at the bronze medal line of 0.1955. The method: read the Discussions and the most upvoted Notebooks and copy whatever works.
Today's summary
- Built a baseline from a highly upvoted notebook that uses LightGBM. Current cross-validated score: 0.2344.
- realized_volatility seems to have by far the most explanatory power, so the plan is probably to keep building features related to it (a quick sketch of the computation follows this list).
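For my own reference, here is what that feature actually is. A minimal sketch on synthetic WAP values, not the competition pipeline; the realized_volatility() helper in the code section implements the same formula.

```python
import numpy as np
import pandas as pd

# A made-up 10-minute bucket of WAP values (1 per second), NOT competition data.
np.random.seed(0)
wap = pd.Series(1.0 + np.random.normal(0, 1e-4, size=600).cumsum())

# Realized volatility = sqrt of the sum of squared log returns within the bucket.
log_ret = np.log(wap).diff().dropna()
realized_vol = np.sqrt(np.sum(log_ret ** 2))
print(realized_vol)
```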
Today's goal
Build a baseline
Baseline creation
Building everything from scratch would take all day, so I reused this highly upvoted notebook almost as-is. I understand the flow as: load data → build features → train → predict, so the one change I made was to reorganize the code into functions following that order.
LightGBM starter with feature engineering idea | Kaggle
Results
Reference sites
【機械学習実践】LightGBMで回帰してみる【Python】
Compared feature importance computed with gain and with split.
With gain, the realized_volatility calculated from wap has the largest value; with split, stock_id does. Gain measures how much a feature contributed to reducing the objective (rmse), while split counts how often a feature appears in splits. Which means:
- realized_volatility: appears in relatively few splits, but has high explanatory power.
- stock_id: appears in many splits, but each split does not explain much. (Still, it is chosen more often than the other features, so it must carry some information?)
If so, my mental image is that realized_volatility dominates the early splits, and stock_id then comes in afterwards and improves accuracy bit by bit? Also, going forward the plan is probably to add more realized_volatility-related features (lags, or the average volatility of a cluster of similar stocks?). The sketch below shows the kind of gain/split comparison I mean.
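A minimal sketch of that comparison against a single trained Booster; `model` and `feature_names` are assumed to come from the training loop in the code below.

```python
import pandas as pd

# Both importance types from the same trained LightGBM Booster.
gain_imp = pd.Series(model.feature_importance(importance_type='gain'),
                     index=feature_names, name='gain')
split_imp = pd.Series(model.feature_importance(importance_type='split'),
                      index=feature_names, name='split')

# Side by side, sorted by gain: a feature with high gain but few splits
# (like log_return_realized_volatility) explains a lot per split.
print(pd.concat([gain_imp, split_imp], axis=1)
        .sort_values('gain', ascending=False)
        .head(10))
```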
Code
```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import lightgbm as lgbm
from joblib import Parallel, delayed  # parallel computing to save time
from sklearn.model_selection import KFold

data_dir = '../input/optiver-realized-volatility-prediction/'


def get_data():
    '''Load the train/test/sample_submission CSVs.'''
    train = pd.read_csv(data_dir + "train.csv")
    test = pd.read_csv(data_dir + "test.csv")
    sample_sub = pd.read_csv(data_dir + "sample_submission.csv")
    train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
    # # book and trade data are loaded per stock_id
    # stock_id = 0
    # book_train = pd.read_parquet(data_dir + "book_train.parquet/stock_id=" + str(stock_id))
    # trade_train = pd.read_parquet(data_dir + "trade_train.parquet/stock_id=" + str(stock_id))
    # book_test = pd.read_parquet(data_dir + "book_test.parquet/stock_id=" + str(stock_id))
    # trade_test = pd.read_parquet(data_dir + "trade_test.parquet/stock_id=" + str(stock_id))
    return train, test, sample_sub


def calc_wap(df):
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap


def calc_wap2(df):
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap


def log_return(list_stock_prices):
    return np.log(list_stock_prices).diff()


def realized_volatility(series):
    return np.sqrt(np.sum(series ** 2))


def count_unique(series):
    return len(np.unique(series))


def preprocessor_book(file_path):
    df = pd.read_parquet(file_path)

    # calculate returns etc.
    df['wap'] = calc_wap(df)
    df['log_return'] = df.groupby('time_id')['wap'].apply(log_return)
    df['wap2'] = calc_wap2(df)
    df['log_return2'] = df.groupby('time_id')['wap2'].apply(log_return)
    df['wap_balance'] = abs(df['wap'] - df['wap2'])
    df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1']) / 2)
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))

    # dict for aggregation
    create_feature_dict = {
        'log_return': [realized_volatility],
        'log_return2': [realized_volatility],
        'wap_balance': [np.mean],
        'price_spread': [np.mean],
        'bid_spread': [np.mean],
        'ask_spread': [np.mean],
        'volume_imbalance': [np.mean],
        'total_volume': [np.mean],
        'wap': [np.mean],
    }

    # groupby / all seconds
    df_feature = pd.DataFrame(df.groupby(['time_id']).agg(create_feature_dict)).reset_index()
    df_feature.columns = ['_'.join(col) for col in df_feature.columns]  # time_id is changed to time_id_

    # groupby / last XX seconds
    last_seconds = [300]
    for second in last_seconds:
        second = 600 - second
        df_feature_sec = pd.DataFrame(df.query(f'seconds_in_bucket >= {second}').groupby(['time_id']).agg(create_feature_dict)).reset_index()
        df_feature_sec.columns = ['_'.join(col) for col in df_feature_sec.columns]  # time_id is changed to time_id_
        df_feature_sec = df_feature_sec.add_suffix('_' + str(second))
        df_feature = pd.merge(df_feature, df_feature_sec, how='left', left_on='time_id_', right_on=f'time_id__{second}')
        df_feature = df_feature.drop([f'time_id__{second}'], axis=1)

    # create row_id
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['time_id_'].apply(lambda x: f'{stock_id}-{x}')
    df_feature = df_feature.drop(['time_id_'], axis=1)
    return df_feature


# trade_train = pd.read_parquet(data_dir + "trade_train.parquet/stock_id=0")
# trade_train.head(15)


def preprocessor_trade(file_path):
    df = pd.read_parquet(file_path)
    df['log_return'] = df.groupby('time_id')['price'].apply(log_return)

    aggregate_dictionary = {
        'log_return': [realized_volatility],
        'seconds_in_bucket': [count_unique],
        'size': [np.sum],
        'order_count': [np.mean],
    }

    df_feature = df.groupby('time_id').agg(aggregate_dictionary)
    df_feature = df_feature.reset_index()
    df_feature.columns = ['_'.join(col) for col in df_feature.columns]

    # groupby / last XX seconds
    last_seconds = [300]
    for second in last_seconds:
        second = 600 - second
        df_feature_sec = df.query(f'seconds_in_bucket >= {second}').groupby('time_id').agg(aggregate_dictionary)
        df_feature_sec = df_feature_sec.reset_index()
        df_feature_sec.columns = ['_'.join(col) for col in df_feature_sec.columns]
        df_feature_sec = df_feature_sec.add_suffix('_' + str(second))
        df_feature = pd.merge(df_feature, df_feature_sec, how='left', left_on='time_id_', right_on=f'time_id__{second}')
        df_feature = df_feature.drop([f'time_id__{second}'], axis=1)

    df_feature = df_feature.add_prefix('trade_')
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['trade_time_id_'].apply(lambda x: f'{stock_id}-{x}')
    df_feature = df_feature.drop(['trade_time_id_'], axis=1)
    return df_feature


def preprocessor(list_stock_ids, is_train=True):
    # joblib.Parallel(n_jobs: number of CPUs to use, -1 means all; verbose: log verbosity, 1 is terse)(
    #     joblib.delayed(<function to run>)(<arguments>) for var in iterable
    # )
    df = pd.DataFrame()

    def for_joblib(stock_id):
        if is_train:
            file_path_book = data_dir + "book_train.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)
        else:
            file_path_book = data_dir + "book_test.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_test.parquet/stock_id=" + str(stock_id)
        df_tmp = pd.merge(preprocessor_book(file_path_book), preprocessor_trade(file_path_trade), on='row_id', how='left')
        return pd.concat([df, df_tmp])

    df = Parallel(n_jobs=-1, verbose=1)(
        delayed(for_joblib)(stock_id) for stock_id in list_stock_ids
    )
    df = pd.concat(df, ignore_index=True)
    return df


def calc_model_importance(model, feature_names=None, importance_type='gain'):
    importance_df = pd.DataFrame(model.feature_importance(importance_type=importance_type),
                                 index=feature_names,
                                 columns=['importance']).sort_values('importance')
    return importance_df


def plot_importance(importance_df, title='', save_filepath=None, figsize=(8, 12)):
    fig, ax = plt.subplots(figsize=figsize)
    importance_df.plot.barh(ax=ax)
    if title:
        plt.title(title)
    plt.tight_layout()
    if save_filepath is None:
        plt.show()
    else:
        plt.savefig(save_filepath)
    plt.close()


def calc_mean_importance(importance_df_list):
    mean_importance = np.mean(
        np.array([df['importance'].values for df in importance_df_list]), axis=0)
    mean_df = importance_df_list[0].copy()
    mean_df['importance'] = mean_importance
    return mean_df


def rmspe(y_true, y_pred):
    return np.sqrt(np.mean(np.square((y_true - y_pred) / y_true)))


def feval_RMSPE(preds, lgbm_train):
    labels = lgbm_train.get_label()
    return 'RMSPE', round(rmspe(y_true=labels, y_pred=preds), 5), False


def lgbm_learning(train, params, kf):
    DO_FEAT_IMP = True

    train['stock_id'] = train['stock_id'].astype(int)
    test['stock_id'] = test['stock_id'].astype(int)  # uses the global test DataFrame

    X = train.drop(['row_id', 'target'], axis=1)
    y = train['target']

    # kf = KFold(n_splits=5, random_state=19901028, shuffle=True)

    # outputs
    # oof = pd.DataFrame()        # out-of-fold result
    models = []                   # models
    scores = 0.0                  # validation score
    gain_importance_list = []
    split_importance_list = []

    # run training
    for fold, (trn_idx, val_idx) in enumerate(kf.split(X, y)):
        print("Fold :", fold + 1)

        # create dataset
        X_train, y_train = X.loc[trn_idx], y[trn_idx]
        X_valid, y_valid = X.loc[val_idx], y[val_idx]

        # RMSPE weight
        weights = 1 / np.square(y_train)
        lgbm_train = lgbm.Dataset(X_train, y_train, weight=weights)

        weights = 1 / np.square(y_valid)
        lgbm_valid = lgbm.Dataset(X_valid, y_valid, reference=lgbm_train, weight=weights)

        # model
        model = lgbm.train(params=params,
                           train_set=lgbm_train,
                           valid_sets=[lgbm_train, lgbm_valid],
                           num_boost_round=5000,
                           feval=feval_RMSPE,
                           verbose_eval=100,
                           categorical_feature=['stock_id']
                           )

        # validation
        y_pred = model.predict(X_valid, num_iteration=model.best_iteration)
        RMSPE = round(rmspe(y_true=y_valid, y_pred=y_pred), 3)
        print(f'Performance of the prediction: , RMSPE: {RMSPE}')

        # keep scores and models
        scores += RMSPE / 5  # because KFold = 5
        models.append(model)
        print("*" * 100)

        # --- calc model feature importance ---
        if DO_FEAT_IMP:
            feature_names = X_train.columns.values.tolist()
            gain_importance_df = calc_model_importance(
                model, feature_names=feature_names, importance_type='gain')
            gain_importance_list.append(gain_importance_df)

            split_importance_df = calc_model_importance(
                model, feature_names=feature_names, importance_type='split')
            split_importance_list.append(split_importance_df)

    output = {
        "score": scores,
        "model": models,
        "gain_importance": gain_importance_list,
        "split_importance": split_importance_list
    }
    return output


# ----------------------------------
# load data
train, test, sample_sub = get_data()

# ----------------------------------
# feature engineering
train_features = preprocessor(train.stock_id.unique(), is_train=True)
test_features = preprocessor(test.stock_id.unique(), is_train=False)

train = pd.merge(train, train_features, on="row_id", how="left")
test = pd.merge(test, test_features, on="row_id", how="left")

# ----------------------------------
# machine learning
lgbm_params = {
    "objective": "rmse",
    "metric": "rmse",
    "boosting_type": "gbdt",
    'early_stopping_rounds': 30,
    'learning_rate': 0.01,
    'lambda_l1': 1,
    'lambda_l2': 1,
    'feature_fraction': 0.8,
    'bagging_fraction': 0.8,
}

kf_ = KFold(n_splits=5, random_state=19901028, shuffle=True)
outputs = lgbm_learning(train=train, params=lgbm_params, kf=kf_)
```
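The baseline above stops after training: plot_importance and calc_mean_importance are defined but never called, and no submission file is written. A rough sketch of how I would use the returned outputs; averaging the five fold models for the test prediction is my own assumption, not something the referenced notebook prescribes.

```python
# Fold-averaged feature importance (gain and split), plotted with the helpers above.
mean_gain = calc_mean_importance(outputs["gain_importance"])
plot_importance(mean_gain, title='mean gain importance')

mean_split = calc_mean_importance(outputs["split_importance"])
plot_importance(mean_split, title='mean split importance')

# Average the predictions of the 5 fold models on the test features
# and write a submission keyed by row_id (assumed ensembling scheme).
X_test = test.drop(['row_id'], axis=1)
preds = np.mean([m.predict(X_test, num_iteration=m.best_iteration)
                 for m in outputs["model"]], axis=0)

sub = test[['row_id']].copy()
sub['target'] = preds
sub.to_csv('submission.csv', index=False)
```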