import yfinance as yf import pandas as pd import datetime import os import numpy as np import exchange_calendars as xcals from scipy.stats import norm from utils import get_nasdaq100_tickers, get_sp500_tickers from time import sleep # --- Delta Calculation Function (Black-Scholes-Merton) --- def bsm_delta(S, K, T, r, sigma, option_type): """ Calculates the option delta using the Black-Scholes-Merton model. S: Current stock price K: Strike price T: Time to expiration (in years) r: Risk-free rate (annual) sigma: Volatility (annualized, typically Implied Volatility) option_type: 'call' or 'put' """ if T <= 0: # Handle options that have expired if option_type == "call": return 1.0 if S > K else 0.0 else: # put return 0.0 if S < K else -1.0 # Calculate d1 d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T)) if option_type == "call": # Delta for a call option is N(d1) return norm.cdf(d1) elif option_type == "put": # Delta for a put option is N(d1) - 1 return norm.cdf(d1) - 1 return np.nan # Should not happen def is_trading_day(check_date: datetime.date) -> bool: nyse = xcals.get_calendar("XNYS") # Check if the date is a valid trading day (excludes weekends and holidays) is_trading = nyse.is_session(check_date.strftime("%Y-%m-%d")) return is_trading if __name__ == "__main__": # Only run if it is a trading day if not is_trading_day(datetime.date.today()): raise UserWarning("Today is not a trading day") # --- Main Script Modifications --- # 1. Set the risk-free rate (e.g., current 3-month T-bill rate) RISK_FREE_RATE = 0.01 # Use a current rate (e.g., 5%) date_str = datetime.datetime.now().strftime("%Y_%m_%d") base_folder = os.path.join("data", "options") if not os.path.isdir(os.path.join(base_folder, date_str)): os.mkdir(os.path.join(base_folder, date_str)) else: # only run if we have not grabbed the data yet today raise UserWarning("We already have the data, no need to get it again") base_folder = os.path.join(base_folder, date_str) TODAY = datetime.datetime.now() # Current date/time for T calculation for ticker_list in [get_sp500_tickers, get_nasdaq100_tickers]: for ticker_symbol in ticker_list(): filename_start = f"{date_str}_{ticker_symbol}" # ... (rest of setup) # Create a Ticker object ticker = yf.Ticker(ticker_symbol) # 2. Get the current stock price try: stock_info = ticker.info current_stock_price = stock_info.get("regularMarketPrice") if current_stock_price is None: print(f"Could not get current price for {ticker_symbol}. Skipping.") continue except Exception as e: print(f"Error getting stock price for {ticker_symbol}: {e}. Skipping.") continue expirations = ticker.options all_options_data = [] for date_str_exp in expirations: try: # Calculate T (Time to Expiration in years) # Note: yfinance date format is YYYY-MM-DD expiration_date = datetime.datetime.strptime( date_str_exp, "%Y-%m-%d" ) time_to_expiration_days = (expiration_date - TODAY).days # Use 252 or 365 as convention, 252 for trading days, 365 for calendar days # 365 is often used for options pricing T = time_to_expiration_days / 365.0 options_chain = ticker.option_chain(date_str_exp) calls_df = options_chain.calls puts_df = options_chain.puts # ... (Add expiration and option_type columns as before) calls_df["expiration"] = date_str_exp puts_df["expiration"] = date_str_exp calls_df["option_type"] = "call" puts_df["option_type"] = "put" # 3. Calculate Delta for Calls calls_df["delta"] = calls_df.apply( lambda row: bsm_delta( S=current_stock_price, K=row["strike"], T=T, r=RISK_FREE_RATE, sigma=row["impliedVolatility"], option_type="call", ), axis=1, ) # 4. Calculate Delta for Puts puts_df["delta"] = puts_df.apply( lambda row: bsm_delta( S=current_stock_price, K=row["strike"], T=T, r=RISK_FREE_RATE, sigma=row["impliedVolatility"], option_type="put", ), axis=1, ) all_options_data.append(calls_df) all_options_data.append(puts_df) except Exception as e: print( f"Could not retrieve or calculate delta for {date_str_exp} on {ticker_symbol}: {e}" ) # ... (Concatenate and save data as before) if all_options_data: full_options_df = pd.concat(all_options_data) print( f"\nFull Options Chain for {ticker_symbol} across all expirations (with Delta):" ) # Display columns relevant to delta calculation print( full_options_df[ ["strike", "impliedVolatility", "option_type", "delta"] ].head() ) full_options_df.to_csv( os.path.join(base_folder, f"{filename_start}.csv") ) full_options_df.to_pickle( os.path.join(base_folder, f"{filename_start}.pkl") ) else: print(f"No options data retrieved for {ticker_symbol}.") sleep(0.5)