Source code for pymicrostructure.metrics.market

"""Range of metrics to analyze market data."""

from pymicrostructure.markets.base import Market
import pandas as pd
import numpy as np
from numpy.lib.stride_tricks import as_strided
from scipy import stats
from statsmodels.tsa.stattools import adfuller
import seaborn as sns


################ LIQUIDITY METRICS ################


def quoted_spread(market: Market) -> pd.DataFrame:
    """
    Compute the quoted spread for every order book snapshot.

    The quoted spread is the best ask price minus the best bid price.

    Parameters:
    -----------
    market : Market
        Object exposing an 'ob_snapshots' sequence. Each snapshot is read
        through its 'bid', 'ask' and 'time' keys; the first entry of each
        side is taken as the best level and must provide a 'price' key.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with a single 'quoted_spread' column, indexed by snapshot
        time. Snapshots with an empty side contribute a missing value.
    """
    times, bids, asks = [], [], []
    for snap in market.ob_snapshots:
        bid_levels = snap["bid"]
        ask_levels = snap["ask"]
        # An empty side has no best price; None becomes a missing value.
        bids.append(bid_levels[0]["price"] if bid_levels else None)
        asks.append(ask_levels[0]["price"] if ask_levels else None)
        times.append(snap["time"])

    quotes = pd.DataFrame({"best_bid": bids, "best_ask": asks}, index=times)
    quotes["quoted_spread"] = quotes["best_ask"] - quotes["best_bid"]
    return quotes[["quoted_spread"]]
def effective_spread(
    market: Market, volume: float, relative: bool = False
) -> pd.DataFrame:
    """
    Calculate the effective spread of a market for a given order size.

    For every order book snapshot a market order of size ``volume`` is
    simulated against each side of the book. The effective spread is twice
    the distance between the volume-weighted execution price and the
    midpoint of the best bid and best ask.

    Parameters:
    -----------
    market : Market
        An object exposing an 'ob_snapshots' sequence. Each snapshot is read
        through its 'bid', 'ask' and 'time' keys; every level of a side must
        provide 'price' and 'volume' keys (the absolute value of 'volume' is
        used as the available size).
    volume : float
        The size of the market order to simulate. Must be positive.
    relative : bool, optional
        If True, both spreads are divided by the midpoint price.
        Default is False.

    Returns:
    --------
    pd.DataFrame
        A DataFrame indexed by snapshot time with the columns
        'effective_spread_buy_v_{volume}' (non-negative) and
        'effective_spread_sell_v_{volume}' (non-positive). Both values are
        missing for a snapshot when either side is empty or does not hold
        enough volume to fill the simulated order.

    Raises:
    -------
    AssertionError
        If ``volume`` is not positive.
    """
    # Raised explicitly (instead of `assert`) so the check survives `python -O`
    # while callers still see the same exception type.
    if not volume > 0:
        raise AssertionError("Volume must be positive.")

    def calculate_execution_price(orders, volume):
        """Return the average fill price, or None if the side is too thin."""
        remaining_volume = volume
        cost = 0.0
        for order in orders:
            filled = min(abs(order["volume"]), remaining_volume)
            cost += order["price"] * filled
            remaining_volume -= filled
            if remaining_volume <= 0:
                return cost / volume
        # Walked the whole side without filling the order: there is no
        # meaningful execution price (the unfilled part must not count as 0).
        return None

    effective_spreads = []
    ob_times = []
    for snapshot in market.ob_snapshots:
        ob_times.append(snapshot["time"])
        bid_orders = snapshot["bid"]
        ask_orders = snapshot["ask"]
        if not bid_orders or not ask_orders:
            effective_spreads.append((None, None))
            continue

        mid_price = (bid_orders[0]["price"] + ask_orders[0]["price"]) / 2
        buy_execution_price = calculate_execution_price(ask_orders, volume)
        sell_execution_price = calculate_execution_price(bid_orders, volume)
        # Compare against None explicitly so a legitimate price of 0 is kept.
        if buy_execution_price is None or sell_execution_price is None:
            effective_spreads.append((None, None))
            continue

        effective_spread_buy = 2 * abs(buy_execution_price - mid_price)
        # The sell-side spread is reported with a negative sign.
        effective_spread_sell = -2 * abs(mid_price - sell_execution_price)
        if relative:
            effective_spread_buy /= mid_price
            effective_spread_sell /= mid_price
        effective_spreads.append((effective_spread_buy, effective_spread_sell))

    return pd.DataFrame(
        effective_spreads,
        columns=[
            f"effective_spread_buy_v_{volume}",
            f"effective_spread_sell_v_{volume}",
        ],
        index=ob_times,
    )
def amihud_illiquidity(market: Market, window: int = 20) -> pd.DataFrame:
    """
    Calculate the rolling window Amihud illiquidity measure (lambda) based on
    trade history.

    For each trade the absolute percentage return is divided by the trade's
    dollar volume (price * volume); the result is averaged over a rolling
    window of trades.

    Parameters:
    -----------
    market : Market
        An object exposing a 'trade_history' attribute that can be turned
        into a DataFrame with 'price', 'volume' and 'time' columns.
    window : int, optional
        The size of the rolling window (in number of trades). Default is 20.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with an 'amihud_lambda' column, indexed by time.

    Raises:
    -------
    AssertionError
        If any trade has a dollar volume of exactly zero.
    """
    # Convert trade history to DataFrame
    trades_df = pd.DataFrame(market.trade_history)
    trades_df.set_index("time", inplace=True)

    # Percentage return between consecutive trades
    trades_df["return"] = trades_df["price"].pct_change() * 100

    # Dollar volume of each trade
    trades_df["dollar_volume"] = trades_df["price"] * trades_df["volume"]
    # `0 in series` would test the index labels (trade times), not the
    # values, so the values are compared explicitly. Raised directly rather
    # than via `assert` so the check is not stripped under `python -O`.
    if (trades_df["dollar_volume"] == 0).any():
        raise AssertionError("Dollar volume cannot be zero.")

    # Per-trade Amihud ratio
    trades_df["daily_amihud"] = trades_df["return"].abs() / trades_df["dollar_volume"]

    # Rolling window Amihud lambda
    trades_df["amihud_lambda"] = trades_df["daily_amihud"].rolling(window=window).mean()
    return trades_df[["amihud_lambda"]]
def kyle_lambda(market: Market, window: int = 20) -> pd.DataFrame:
    """
    Calculate the rolling window Kyle's Lambda based on trade history.

    Kyle's Lambda is the slope of a linear regression (with intercept) of
    price changes on signed volume: ``price_change = a + lambda * flow``.
    It is estimated over a rolling window as ``cov(price_change, flow) /
    var(flow)``, which is the ordinary-least-squares slope.

    Parameters:
    -----------
    market : Market
        An object exposing a 'trade_history' attribute that can be turned
        into a DataFrame with 'price', 'volume', 'aggressor_side' and 'time'
        columns. 'aggressor_side' is multiplied with 'volume', so it is
        assumed to be numeric (e.g. +1 / -1) -- TODO confirm against Market.
    window : int, optional
        The size of the rolling window (in number of trades). Default is 20.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with a 'kyle_lambda' column, indexed by time. The first
        trade is dropped because it has no price change. Rows without a full
        window, or whose window has zero signed-volume variance, are NaN.
    """
    # Convert trade history to DataFrame
    trades_df = pd.DataFrame(market.trade_history)
    trades_df.set_index("time", inplace=True)

    # Price change between consecutive trades
    trades_df["price_change"] = trades_df["price"].diff()

    # Signed volume (order flow)
    trades_df["signed_volume"] = trades_df["volume"] * trades_df["aggressor_side"]

    # Only the regression inputs decide which rows are usable; unrelated
    # columns of the trade history must not cause rows to be dropped.
    trades_df.dropna(subset=["price_change", "signed_volume"], inplace=True)

    flow = trades_df["signed_volume"]
    rolling_cov = trades_df["price_change"].rolling(window=window).cov(flow)
    rolling_var = flow.rolling(window=window).var()

    # A window with constant order flow has no identifiable slope.
    trades_df["kyle_lambda"] = rolling_cov / rolling_var.where(rolling_var != 0)
    return trades_df[["kyle_lambda"]]
################ INEFFICIENCY METRICS ##################
def returns_autocorrelation(market: Market, window: int = 20) -> pd.DataFrame:
    """
    Compute the rolling lag-1 autocorrelation of trade-to-trade returns.

    Parameters:
    -----------
    market : Market
        Object exposing a 'trade_history' attribute convertible to a
        DataFrame with 'price' and 'time' columns.
    window : int, optional
        Number of trades in the rolling window. Default is 20.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with a 'returns_autocorr' column, indexed by time.
    """
    frame = pd.DataFrame(market.trade_history).set_index("time")

    # Simple return between consecutive trades.
    frame["return"] = frame["price"].pct_change()
    current = frame["return"]
    previous = current.shift(1)

    # Correlate each return with the one before it over the window.
    frame["returns_autocorr"] = current.rolling(window=window).corr(previous)
    return frame[["returns_autocorr"]]
def variance_ratio_test(market: Market, k: int = 5, window: int = 100) -> pd.DataFrame:
    """
    Perform a rolling Variance Ratio test on the trade price series.

    Within each window of log returns the variance of overlapping k-period
    returns (divided by k) is compared with the variance of 1-period returns.
    The reported statistic is ``(VR - 1) / sqrt(phi)`` with
    ``phi = 2 * (2k - 1) * (k - 1) / (3 * k * m)`` and ``m`` the window length.

    Parameters:
    -----------
    market : Market
        An object exposing a 'trade_history' attribute that can be turned
        into a DataFrame with 'price' and 'time' columns.
    k : int, optional
        The number of periods to use for the k-period return. Default is 5.
    window : int, optional
        The size of the rolling window. Default is 100.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with a 'vr_statistic' column, indexed by time (the first
        trade is dropped because it has no log return). The value is NaN
        until a full window is available, when the window's returns have
        zero variance, or when ``k`` is smaller than 2 or larger than the
        window. No p-value column is produced.
    """
    # Convert trade history to DataFrame
    trades_df = pd.DataFrame(market.trade_history)
    trades_df.set_index("time", inplace=True)

    # Calculate log returns
    trades_df["log_return"] = np.log(trades_df["price"]).diff()

    def calculate_vr(returns):
        # `returns` is a plain float array holding one full window.
        m = len(returns)
        if k < 2 or k > m:
            # phi would be zero (k == 1) or no k-period return exists.
            return np.nan

        var_1 = np.var(returns)
        if var_1 == 0:
            # Flat prices: the ratio is undefined.
            return np.nan

        # Overlapping k-period sums via differences of the running total.
        running = np.concatenate(([0.0], np.cumsum(returns)))
        k_period = running[k:] - running[:-k]
        var_k = np.var(k_period) / k

        vr = var_k / var_1
        phi = (2 * (2 * k - 1) * (k - 1)) / (3 * k * m)
        return (vr - 1) / np.sqrt(phi)

    # Only a missing log return (the first trade) makes a row unusable.
    trades_df.dropna(subset=["log_return"], inplace=True)
    trades_df["vr_statistic"] = (
        trades_df["log_return"].rolling(window).apply(calculate_vr, raw=True)
    )
    return trades_df[["vr_statistic"]]
def hurst_exponent(
    market: Market, window: int = 100, max_lag: int = 20
) -> pd.DataFrame:
    """
    Calculate the rolling Hurst exponent for the trade price series.

    Within each window the cumulative sum of log returns is differenced at
    every lag in ``range(2, max_lag)``. For each lag the range of the lagged
    differences is divided by the standard deviation of their first
    differences, and the exponent is the slope of a log-log fit of that
    ratio against the lag.

    Parameters:
    -----------
    market : Market
        An object exposing a 'trade_history' attribute that can be turned
        into a DataFrame with 'price' and 'time' columns.
    window : int, optional
        The size of the rolling window. Default is 100.
    max_lag : int, optional
        Exclusive upper bound of the lags considered. Default is 20.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with a 'hurst_exponent' column, indexed by time. The
        value is NaN until a full window is available and whenever fewer
        than two lags yield a finite, positive ratio (e.g. flat prices).
    """
    # Convert trade history to DataFrame
    trades_df = pd.DataFrame(market.trade_history)
    trades_df.set_index("time", inplace=True)

    # Calculate log returns
    trades_df["log_return"] = np.log(trades_df["price"]).diff()

    def calculate_hurst(returns):
        # `returns` is a plain float array holding one window.
        returns = returns[~np.isnan(returns)]
        if len(returns) < window:
            return np.nan

        cum_sum = returns.cumsum()

        log_lags = []
        log_ratios = []
        for lag in range(2, max_lag):
            # At least two lagged differences are needed for the inner diff.
            if len(cum_sum) - lag < 2:
                break
            lagged = cum_sum[lag:] - cum_sum[:-lag]
            spread = np.max(lagged) - np.min(lagged)
            scale = np.std(np.diff(lagged))
            # Skip degenerate lags: their logarithm would be NaN or -inf and
            # would break the least-squares fit below.
            if scale > 0 and spread > 0:
                log_lags.append(np.log(lag))
                log_ratios.append(np.log(spread / scale))

        if len(log_lags) < 2:
            return np.nan
        return np.polyfit(log_lags, log_ratios, 1)[0]

    # Calculate rolling Hurst exponent
    trades_df["hurst_exponent"] = (
        trades_df["log_return"]
        .rolling(window=window)
        .apply(calculate_hurst, raw=True)
    )
    return trades_df[["hurst_exponent"]]
def rolling_adf_test(
    market: Market, window: int = 100, alpha: float = 0.05
) -> pd.DataFrame:
    """
    Perform a rolling Augmented Dickey-Fuller test on the trade price series.

    Parameters:
    -----------
    market : Market
        An object exposing a 'trade_history' attribute that can be turned
        into a DataFrame with 'price' and 'time' columns.
    window : int, optional
        The size of the rolling window. Default is 100.
    alpha : float, optional
        Significance level. Currently unused: only the test statistic is
        returned, so no stationarity decision is made. Kept for interface
        compatibility. Default is 0.05.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with an 'adf_statistic' column, indexed by time. The
        value is NaN until a full window is available and for windows in
        which the price never changes.
    """
    # Convert trade history to DataFrame
    trades_df = pd.DataFrame(market.trade_history)
    trades_df.set_index("time", inplace=True)

    def perform_adf_test(prices):
        # `prices` is a plain float array holding one full window.
        # The ADF regression is undefined for a constant series (recent
        # statsmodels versions raise ValueError), so report NaN instead of
        # aborting the whole rolling computation.
        if np.ptp(prices) == 0:
            return np.nan
        # adfuller returns (statistic, p-value, ...); rolling.apply can only
        # carry a scalar, so just the statistic is kept.
        return adfuller(prices, autolag="AIC")[0]

    # Perform rolling ADF test
    trades_df["adf_statistic"] = (
        trades_df["price"].rolling(window).apply(perform_adf_test, raw=True)
    )
    return trades_df[["adf_statistic"]]
################ ORDER FLOW METRICS ####################
def rolling_cancellation_rate(market: Market, window: int = 100) -> pd.DataFrame:
    """
    Aggregate cancelled volume per timestamp.

    NOTE(review): despite its name this function does not compute a rate and
    does not apply a rolling window; it returns the total absolute cancelled
    volume for each distinct cancellation time.

    Parameters:
    -----------
    market : Market
        Object exposing a 'cancellations' iterable whose items provide
        'volume' and 'time' attributes.
    window : int, optional
        Currently unused. Default is 100.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with a 'volume' column holding the summed absolute
        cancelled volume, indexed by 'time'.
    """
    records = []
    for cancellation in market.cancellations:
        records.append((abs(cancellation.volume), cancellation.time))

    per_event = pd.DataFrame(records, columns=["volume", "time"])
    per_event.set_index("time", inplace=True)

    # Several cancellations can share a timestamp; collapse them into one row.
    return per_event.groupby("time").sum()
def order_flow_imbalance(market: Market, window: int = 100) -> pd.DataFrame:
    """
    Compute the rolling mean of signed trade volume.

    Each trade's volume is multiplied by its 'aggressor_side' and the product
    is averaged over a rolling window of trades.

    NOTE(review): this is a mean of signed volume, not a ratio of buy volume
    to total volume.

    Parameters:
    -----------
    market : Market
        Object exposing a 'trade_history' attribute convertible to a
        DataFrame with 'volume', 'aggressor_side' and 'time' columns.
    window : int, optional
        Number of trades in the rolling window. Default is 100.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with an 'order_flow_imbalance' column, indexed by time.
    """
    frame = pd.DataFrame(market.trade_history).set_index("time")

    # Signed volume per trade.
    signed_volume = frame["volume"] * frame["aggressor_side"]

    frame["order_flow_imbalance"] = signed_volume.rolling(window=window).mean()
    return frame[["order_flow_imbalance"]]
def trade_sign_autocorrelation(market: Market, window: int = 100) -> pd.DataFrame:
    """
    Compute the rolling lag-1 autocorrelation of trade signs.

    The trade sign is the sign of each trade's 'aggressor_side'.

    Parameters:
    -----------
    market : Market
        Object exposing a 'trade_history' attribute convertible to a
        DataFrame with 'aggressor_side' and 'time' columns.
    window : int, optional
        Number of trades in the rolling window. Default is 100.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with a 'trade_sign_autocorr' column, indexed by time.
    """
    frame = pd.DataFrame(market.trade_history).set_index("time")

    frame["trade_sign"] = np.sign(frame["aggressor_side"])
    signs = frame["trade_sign"]
    lagged_signs = signs.shift(1)

    # Correlate each sign with the previous trade's sign over the window.
    frame["trade_sign_autocorr"] = signs.rolling(window=window).corr(lagged_signs)
    return frame[["trade_sign_autocorr"]]
################ ORDER BOOK METRICS ####################
def order_book_depth(market: Market, window: int = 100) -> pd.DataFrame:
    """
    Compute rolling-mean order book depth from order book snapshots.

    For each snapshot the 'volume' of every level on a side is summed (all
    levels, not only the best one); the per-side totals are then averaged
    over a rolling window of snapshots.

    Parameters:
    -----------
    market : Market
        Object exposing an 'ob_snapshots' sequence. Each snapshot is read
        through its 'bid', 'ask' and 'time' keys; every level must provide a
        'volume' key.
    window : int, optional
        Number of snapshots in the rolling window. Default is 100.

    Returns:
    --------
    pd.DataFrame
        A DataFrame indexed by snapshot time with 'bid_depth', 'ask_depth'
        and 'depth_difference' columns. 'depth_difference' is the sum of the
        two rolling depths -- presumably ask volumes are stored with a
        negative sign so the sum acts as a net difference; verify against
        the Market implementation.
    """
    times, bid_totals, ask_totals = [], [], []
    for snap in market.ob_snapshots:
        # A missing/empty side counts as zero depth.
        bid_totals.append(sum(level["volume"] for level in (snap["bid"] or ())))
        ask_totals.append(sum(level["volume"] for level in (snap["ask"] or ())))
        times.append(snap["time"])

    depth = pd.DataFrame(
        {"bid_depth": bid_totals, "ask_depth": ask_totals}, index=times
    )
    for side in ("bid_depth", "ask_depth"):
        depth[side] = depth[side].rolling(window=window).mean()
    depth["depth_difference"] = depth["ask_depth"] + depth["bid_depth"]
    return depth[["bid_depth", "ask_depth", "depth_difference"]]
def order_book_heatmap(market: Market, frequency: int = 10) -> pd.DataFrame:
    """
    Create a heatmap matrix of cumulative order book volumes over time.

    Parameters:
    -----------
    market : Market
        An object exposing an 'ob_snapshots' sequence. Each snapshot is read
        through its 'bid', 'ask' and 'time' keys; every level must provide
        'price' and 'volume' keys. Positive cumulative volumes are treated
        as bids and negative ones as asks.
    frequency : int, optional
        Only every ``frequency``-th snapshot is included. Larger values
        reduce computation time at the cost of resolution. Default is 10.

    Returns:
    --------
    pd.DataFrame
        A DataFrame whose rows are price levels in descending order and
        whose columns are snapshot times. Each cell holds the absolute
        cumulative volume available at that price, or 0 where there is none.
    """
    slices = []
    for snapshot in market.ob_snapshots[::frequency]:
        bids = snapshot["bid"]
        asks = snapshot["ask"]

        # Cumulative volume when walking each side from its first level.
        bid_volumes = np.cumsum(np.array([level["volume"] for level in bids]))
        ask_volumes = np.cumsum(np.array([level["volume"] for level in asks]))

        prices = np.append(
            np.array([level["price"] for level in bids]),
            np.array([level["price"] for level in asks]),
        )
        volumes = np.append(bid_volumes, ask_volumes)
        slices.append(
            pd.DataFrame(index=[snapshot["time"]], columns=prices, data=[volumes])
        )

    # Concatenate once; growing a DataFrame inside the loop is quadratic.
    df = pd.concat(slices) if slices else pd.DataFrame()
    df = df[df.columns.sort_values()]

    # Bids carry their depth down to lower prices, asks up to higher prices.
    bids = df[df > 0].bfill(axis=1).fillna(0)
    asks = df[df < 0].ffill(axis=1).abs().fillna(0)
    return (bids + asks).T[::-1]
################ PRICE DYNAMICS METRICS ####################
def vwap(market: Market, window: int = 100) -> pd.DataFrame:
    """
    Compute the rolling volume-weighted average price (VWAP) of trades.

    Parameters:
    -----------
    market : Market
        Object exposing a 'trade_history' attribute convertible to a
        DataFrame with 'price', 'volume' and 'time' columns.
    window : int, optional
        Number of trades in the rolling window. Default is 100.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with a 'vwap' column, indexed by time.
    """
    frame = pd.DataFrame(market.trade_history).set_index("time")

    # Traded notional and traded volume, each summed over the window.
    frame["dollar_volume"] = frame["price"] * frame["volume"]
    windowed_notional = frame["dollar_volume"].rolling(window=window).sum()
    windowed_volume = frame["volume"].rolling(window=window).sum()

    frame["vwap"] = windowed_notional / windowed_volume
    return frame[["vwap"]]
def trade_midprice_deviation(market: Market, window: int = 100) -> pd.DataFrame:
    """
    Compute the rolling mean absolute deviation of trade prices from the
    midprice.

    The midprice of a snapshot is the average of its best bid and best ask
    prices. Trades are matched to snapshots by identical time; rows with any
    missing value after the join are discarded.

    Parameters:
    -----------
    market : Market
        Object exposing 'ob_snapshots' and 'trade_history'. Each snapshot is
        read through its 'bid', 'ask' and 'time' keys, with the first level
        of each side providing a 'price' key. The trade history must be
        convertible to a DataFrame with 'price' and 'time' columns.
    window : int, optional
        Number of matched rows in the rolling window. Default is 100.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with a 'trade_midprice_deviation' column, indexed by time.
    """
    mids, snapshot_times = [], []
    for snap in market.ob_snapshots:
        bids, asks = snap["bid"], snap["ask"]
        if bids and asks:
            mids.append((bids[0]["price"] + asks[0]["price"]) / 2)
        else:
            # No midprice without both sides of the book.
            mids.append(None)
        snapshot_times.append(snap["time"])
    mid_frame = pd.DataFrame(mids, index=snapshot_times, columns=["midprice"])

    trades = pd.DataFrame(market.trade_history).set_index("time")

    # Outer join followed by dropna keeps only fully populated rows.
    merged = trades.join(mid_frame, how="outer")
    merged.dropna(inplace=True)

    distance = (merged["price"] - merged["midprice"]).abs()
    merged["trade_midprice_deviation"] = distance.rolling(window=window).mean()
    return merged[["trade_midprice_deviation"]]
def realized_volatility(market: Market, window: int = 100) -> pd.DataFrame:
    """
    Compute the rolling realized volatility of trade prices.

    Realized volatility is the rolling standard deviation of log returns
    between consecutive trades.

    Parameters:
    -----------
    market : Market
        Object exposing a 'trade_history' attribute convertible to a
        DataFrame with 'price' and 'time' columns.
    window : int, optional
        Number of trades in the rolling window. Default is 100.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with a 'realized_volatility' column, indexed by time.
    """
    frame = pd.DataFrame(market.trade_history).set_index("time")

    log_prices = np.log(frame["price"])
    frame["log_return"] = log_prices.diff()

    rolling_returns = frame["log_return"].rolling(window=window)
    frame["realized_volatility"] = rolling_returns.std()
    return frame[["realized_volatility"]]
################ MICROSTRUCTURE METRICS ################
def roll_spread_estimator(
    market: Market, window_size: int = 100, relative: bool = False
) -> pd.DataFrame:
    """
    Estimate the bid-ask spread from trade prices with a rolling
    covariance-based (Roll) estimator.

    Parameters:
    -----------
    market : Market
        Object exposing a 'trade_history' iterable whose items provide
        'price' and 'time' keys.
    window_size : int, optional (default=100)
        Number of trades in the rolling covariance window.
    relative : bool, optional (default=False)
        If True, use relative price changes instead of absolute ones.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with a 'roll_spread' column, indexed by trade time. The
        first two trades are dropped because they lack a price change or a
        lagged price change.

    Notes:
    ------
    roll_spread = mult * sqrt(-cov(delta_price_t, delta_price_t-1))

    where mult is 200 for relative changes and 2 for absolute changes.
    Positive covariances are clipped to zero before the square root, so the
    estimate is 0 in that case.
    """
    rows = []
    for trade in market.trade_history:
        rows.append({"price": trade["price"], "time": trade["time"]})
    frame = pd.DataFrame(rows)
    frame.set_index("time", inplace=True)

    prices = frame["price"]
    frame["price_delta"] = prices.pct_change() if relative else prices.diff()
    frame["price_delta_l1"] = frame["price_delta"].shift(1)
    frame.dropna(inplace=True)

    rolling_delta = frame["price_delta"].rolling(window=window_size)
    frame["rolling_cov"] = rolling_delta.cov(frame["price_delta_l1"])

    scale = 200 if relative else 2
    # Only negative serial covariance carries spread information.
    negative_cov = frame["rolling_cov"].clip(upper=0)
    frame["roll_spread"] = scale * np.sqrt(-negative_cov)
    return frame[["roll_spread"]]
################ OTHER METRICS ################
def news_goodness(market: Market, window: int = 20) -> pd.DataFrame:
    """
    Return the market's news history as a DataFrame.

    NOTE(review): no rolling "goodness" ratio is computed here; the news
    history is converted to a DataFrame and returned unchanged.

    Parameters:
    -----------
    market : Market
        Object exposing a 'news_history' attribute that can be passed to the
        DataFrame constructor.
    window : int, optional
        Currently unused. Default is 20.

    Returns:
    --------
    pd.DataFrame
        The news history in tabular form.
    """
    return pd.DataFrame(market.news_history)