Files
smart_trading/scrap_options_data.py
2025-11-29 14:07:45 -06:00

169 lines
6.3 KiB
Python

import yfinance as yf
import pandas as pd
import datetime
import os
import numpy as np
import exchange_calendars as xcals
from scipy.stats import norm
from utils import get_nasdaq100_tickers, get_sp500_tickers
from time import sleep
# --- Delta Calculation Function (Black-Scholes-Merton) ---
def bsm_delta(S, K, T, r, sigma, option_type):
"""
Calculates the option delta using the Black-Scholes-Merton model.
S: Current stock price
K: Strike price
T: Time to expiration (in years)
r: Risk-free rate (annual)
sigma: Volatility (annualized, typically Implied Volatility)
option_type: 'call' or 'put'
"""
if T <= 0: # Handle options that have expired
if option_type == "call":
return 1.0 if S > K else 0.0
else: # put
return 0.0 if S < K else -1.0
# Calculate d1
d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
if option_type == "call":
# Delta for a call option is N(d1)
return norm.cdf(d1)
elif option_type == "put":
# Delta for a put option is N(d1) - 1
return norm.cdf(d1) - 1
return np.nan # Should not happen
def is_trading_day(check_date: datetime.date) -> bool:
nyse = xcals.get_calendar("XNYS")
# Check if the date is a valid trading day (excludes weekends and holidays)
is_trading = nyse.is_session(check_date.strftime("%Y-%m-%d"))
return is_trading
if __name__ == "__main__":
# Only run if it is a trading day
if not is_trading_day(datetime.date.today()):
raise UserWarning("Today is not a trading day")
# --- Main Script Modifications ---
# 1. Set the risk-free rate (e.g., current 3-month T-bill rate)
RISK_FREE_RATE = 0.01 # Use a current rate (e.g., 5%)
date_str = datetime.datetime.now().strftime("%Y_%m_%d")
base_folder = os.path.join("data", "options")
if not os.path.isdir(os.path.join(base_folder, date_str)):
os.mkdir(os.path.join(base_folder, date_str))
else:
# only run if we have not grabbed the data yet today
raise UserWarning("We already have the data, no need to get it again")
base_folder = os.path.join(base_folder, date_str)
TODAY = datetime.datetime.now() # Current date/time for T calculation
for ticker_list in [get_sp500_tickers, get_nasdaq100_tickers]:
for ticker_symbol in ticker_list():
filename_start = f"{date_str}_{ticker_symbol}"
# ... (rest of setup)
# Create a Ticker object
ticker = yf.Ticker(ticker_symbol)
# 2. Get the current stock price
try:
stock_info = ticker.info
current_stock_price = stock_info.get("regularMarketPrice")
if current_stock_price is None:
print(f"Could not get current price for {ticker_symbol}. Skipping.")
continue
except Exception as e:
print(f"Error getting stock price for {ticker_symbol}: {e}. Skipping.")
continue
expirations = ticker.options
all_options_data = []
for date_str_exp in expirations:
try:
# Calculate T (Time to Expiration in years)
# Note: yfinance date format is YYYY-MM-DD
expiration_date = datetime.datetime.strptime(
date_str_exp, "%Y-%m-%d"
)
time_to_expiration_days = (expiration_date - TODAY).days
# Use 252 or 365 as convention, 252 for trading days, 365 for calendar days
# 365 is often used for options pricing
T = time_to_expiration_days / 365.0
options_chain = ticker.option_chain(date_str_exp)
calls_df = options_chain.calls
puts_df = options_chain.puts
# ... (Add expiration and option_type columns as before)
calls_df["expiration"] = date_str_exp
puts_df["expiration"] = date_str_exp
calls_df["option_type"] = "call"
puts_df["option_type"] = "put"
# 3. Calculate Delta for Calls
calls_df["delta"] = calls_df.apply(
lambda row: bsm_delta(
S=current_stock_price,
K=row["strike"],
T=T,
r=RISK_FREE_RATE,
sigma=row["impliedVolatility"],
option_type="call",
),
axis=1,
)
# 4. Calculate Delta for Puts
puts_df["delta"] = puts_df.apply(
lambda row: bsm_delta(
S=current_stock_price,
K=row["strike"],
T=T,
r=RISK_FREE_RATE,
sigma=row["impliedVolatility"],
option_type="put",
),
axis=1,
)
all_options_data.append(calls_df)
all_options_data.append(puts_df)
except Exception as e:
print(
f"Could not retrieve or calculate delta for {date_str_exp} on {ticker_symbol}: {e}"
)
# ... (Concatenate and save data as before)
if all_options_data:
full_options_df = pd.concat(all_options_data)
print(
f"\nFull Options Chain for {ticker_symbol} across all expirations (with Delta):"
)
# Display columns relevant to delta calculation
print(
full_options_df[
["strike", "impliedVolatility", "option_type", "delta"]
].head()
)
full_options_df.to_csv(
os.path.join(base_folder, f"{filename_start}.csv")
)
full_options_df.to_pickle(
os.path.join(base_folder, f"{filename_start}.pkl")
)
else:
print(f"No options data retrieved for {ticker_symbol}.")
sleep(0.5)