Initial commit

This commit is contained in:
2025-11-29 14:07:45 -06:00
parent 63518d364c
commit f48ab9869d
5 changed files with 384 additions and 0 deletions

32
requirements.txt Normal file
View File

@@ -0,0 +1,32 @@
beautifulsoup4==4.14.2
certifi==2025.10.5
cffi==2.0.0
charset-normalizer==3.4.3
curl_cffi==0.13.0
exchange_calendars==4.11.1
frozendict==2.4.6
html5lib==1.1
idna==3.10
korean-lunar-calendar==0.3.1
lxml==6.0.2
multitasking==0.0.12
numpy==2.3.3
pandas==2.3.3
peewee==3.18.2
platformdirs==4.4.0
protobuf==6.32.1
pycparser==2.23
pyluach==2.3.0
python-dateutil==2.9.0.post0
pytz==2025.2
requests==2.32.5
scipy==1.16.2
six==1.17.0
soupsieve==2.8
toolz==1.0.0
typing_extensions==4.15.0
tzdata==2025.2
urllib3==2.5.0
webencodings==0.5.1
websockets==15.0.1
yfinance==0.2.66

4
scrap.sh Executable file
View File

@@ -0,0 +1,4 @@
#!/bin/bash
# Daily options-data scrape: activate the project virtualenv and run the scraper.
# Intended to be launched from cron, so the repo path is absolute.
set -e  # abort immediately if cd/activate fails instead of running in the wrong dir
cd /home/westfarn/Documents/repos/smart_trading
. venv/bin/activate
python scrap_options_data.py

152
scrap_data.py Normal file
View File

@@ -0,0 +1,152 @@
import pandas as pd
import yfinance as yf
import requests
import numpy as np
from utils import get_sp500_tickers
# --- 1. Main Function to Create Enhanced DataFrame ---
def create_enhanced_repository(tickers, years=3):
    """
    Download historical OHLCV data for ``tickers``, enrich it with moving
    averages, an adjusted open, and latest fundamentals, and return one
    combined long-format DataFrame.

    Parameters
    ----------
    tickers : list[str]
        Ticker symbols to download. An empty list short-circuits.
    years : int
        Lookback window passed to yfinance as ``period=f"{years}y"``.

    Returns
    -------
    pandas.DataFrame
        Tidy frame with one row per (Ticker, Date) and columns
        Date/Ticker/OHLCV/Adj_Close/Calculated_Adj_Open/SMA_250/SMA_30/
        Market_Cap/PE_Ratio/Earnings_Release_Day. Empty DataFrame on
        empty input or failed download.
    """
    if not tickers:
        print("Ticker list is empty. Cannot fetch data.")
        return pd.DataFrame()
    print(f"Starting data download for {len(tickers)} stocks for {years} years...")
    # 2a. Download all historical OHLCV data.
    # auto_adjust=False keeps both "Close" and "Adj Close" columns; columns
    # come back as a MultiIndex of (metric, ticker).
    # NOTE(review): assumes len(tickers) > 1 — for a single ticker yfinance
    # returns flat columns and the per-ticker indexing below would fail; confirm.
    data = yf.download(tickers, period=f"{years}y", progress=False, auto_adjust=False)
    if data.empty:
        print("Failed to download data.")
        return pd.DataFrame()
    # Isolate the OHLCV data (copy so the metric columns added below don't
    # mutate yfinance's frame).
    df_ohlcv = data[["Open", "High", "Low", "Close", "Adj Close", "Volume"]].copy()
    # --- CALCULATE NEW METRICS ---
    # 250-day (~1 trading year) and 30-day Simple Moving Averages on Adj Close.
    # The first window-1 rows of each SMA are NaN by construction.
    adj_close_data = df_ohlcv["Adj Close"]
    sma_250_data = adj_close_data.rolling(window=250).mean()
    sma_30_data = adj_close_data.rolling(window=30).mean()
    for ticker in tickers:
        df_ohlcv.loc[:, ("SMA_250", ticker)] = sma_250_data[ticker]
        df_ohlcv.loc[:, ("SMA_30", ticker)] = sma_30_data[ticker]
    # Adjusted Open: scale the raw open by the Adj Close / Close ratio, the
    # same split/dividend adjustment factor yfinance applies to Close.
    for ticker in tickers:
        adj_factor = df_ohlcv["Adj Close"][ticker] / df_ohlcv["Close"][ticker]
        df_ohlcv.loc[:, ("Calculated_Adj_Open", ticker)] = (
            df_ohlcv["Open"][ticker] * adj_factor
        )
    # --- RESTRUCTURE DATA ---
    # Stack the ticker level of the MultiIndex to get a tidy (Date, Ticker, metrics)
    # frame. NOTE(review): stack(level=...) emits a FutureWarning on pandas >= 2.1;
    # migrating to future_stack=True changes NaN-row dropping — verify before switching.
    df_long = df_ohlcv.stack(level=1).reset_index()
    # Rename by position. NOTE(review): this assumes the stacked columns keep
    # exactly this order (original six metrics, then the three added above) —
    # confirm against the pandas version in requirements.
    df_long.columns = [
        "Date",
        "Ticker",
        "Open",
        "High",
        "Low",
        "Close",
        "Adj_Close",
        "Volume",
        "SMA_250",
        "SMA_30",
        "Calculated_Adj_Open",
    ]
    # Initialize columns for fundamental data (filled per ticker below).
    df_long["Market_Cap"] = np.nan
    df_long["PE_Ratio"] = np.nan
    df_long["Earnings_Release_Day"] = False
    # --- FETCH FUNDAMENTAL DATA AND EARNINGS DATES ---
    # One yf.Ticker round-trip per symbol; failures are logged and skipped so
    # a single bad ticker doesn't abort the whole build.
    for ticker_symbol in tickers:
        try:
            ticker = yf.Ticker(ticker_symbol)
            # Fundamentals (latest snapshot values only).
            info = ticker.info
            market_cap = info.get("marketCap")
            pe_ratio = info.get("trailingPE")
            # Broadcast the latest fundamentals to every historical row of the
            # stock. NOTE: Market Cap and P/E are the LATEST values, not a
            # historical time series.
            mask = df_long["Ticker"] == ticker_symbol
            df_long.loc[mask, "Market_Cap"] = market_cap
            df_long.loc[mask, "PE_Ratio"] = pe_ratio
            # Earnings release dates: flag rows whose calendar date matches a
            # known earnings date.
            earnings_dates_df = ticker.earnings_dates
            if earnings_dates_df is not None and not earnings_dates_df.empty:
                # Compare on the date part only (index is timestamped).
                earnings_dates = set(earnings_dates_df.index.date)
                df_long.loc[
                    mask & df_long["Date"].dt.date.isin(earnings_dates),
                    "Earnings_Release_Day",
                ] = True
        except Exception as e:
            # Common for delisted tickers or symbols with bad metadata;
            # the price history rows are kept, only fundamentals stay NaN.
            print(
                f"Warning: Could not fetch fundamental/earnings data for {ticker_symbol}. Error: {e}"
            )
    # Final cleanup: fixed column order, sorted by ticker then date.
    final_cols = [
        "Date",
        "Ticker",
        "Open",
        "High",
        "Low",
        "Close",
        "Adj_Close",
        "Calculated_Adj_Open",
        "Volume",
        "SMA_250",
        "SMA_30",
        "Market_Cap",
        "PE_Ratio",
        "Earnings_Release_Day",
    ]
    df_repository_final = (
        df_long[final_cols].sort_values(by=["Ticker", "Date"]).reset_index(drop=True)
    )
    print("\nEnhanced Data Repository created successfully.")
    return df_repository_final
# --- Execution Block ---
# 1. Get the list of tickers (will fetch all 500+ when run locally);
#    capped at 100 here to keep runtime manageable.
sp500_tickers = get_sp500_tickers()
sp500_tickers = sp500_tickers[:100]
# 2. Create the final DataFrame (consider reducing 'years' for the full
#    list to speed up — all 500 stocks over several years takes a while).
enhanced_repository_df = create_enhanced_repository(sp500_tickers, years=4)
# 3. Save the result. BUG FIX: the f-strings previously contained no
#    placeholder, so the unused `filename` variable was ignored and output
#    went to a literal fixed name; interpolate `filename` properly.
if not enhanced_repository_df.empty:
    filename = "SP500_Enhanced_Data_Repository"
    enhanced_repository_df.to_csv(f"{filename}.csv", index=False)
    enhanced_repository_df.to_pickle(f"{filename}.pkl")
    print(f"\n✅ Data saved to: {filename}.csv and {filename}.pkl")
    print("\n--- Sample Data ---")
    print(enhanced_repository_df.head())

168
scrap_options_data.py Normal file
View File

@@ -0,0 +1,168 @@
import yfinance as yf
import pandas as pd
import datetime
import os
import numpy as np
import exchange_calendars as xcals
from scipy.stats import norm
from utils import get_nasdaq100_tickers, get_sp500_tickers
from time import sleep
# --- Delta Calculation Function (Black-Scholes-Merton) ---
def bsm_delta(S, K, T, r, sigma, option_type):
"""
Calculates the option delta using the Black-Scholes-Merton model.
S: Current stock price
K: Strike price
T: Time to expiration (in years)
r: Risk-free rate (annual)
sigma: Volatility (annualized, typically Implied Volatility)
option_type: 'call' or 'put'
"""
if T <= 0: # Handle options that have expired
if option_type == "call":
return 1.0 if S > K else 0.0
else: # put
return 0.0 if S < K else -1.0
# Calculate d1
d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
if option_type == "call":
# Delta for a call option is N(d1)
return norm.cdf(d1)
elif option_type == "put":
# Delta for a put option is N(d1) - 1
return norm.cdf(d1) - 1
return np.nan # Should not happen
def is_trading_day(check_date: datetime.date) -> bool:
    """Return True when check_date is an NYSE session (excludes weekends and exchange holidays)."""
    calendar = xcals.get_calendar("XNYS")
    return calendar.is_session(check_date.strftime("%Y-%m-%d"))
if __name__ == "__main__":
    # Abort early on weekends/holidays — no fresh options data to scrape.
    if not is_trading_day(datetime.date.today()):
        raise UserWarning("Today is not a trading day")
    # --- Main Script Setup ---
    # 1. Annualized risk-free rate fed into the BSM delta calculation.
    # NOTE(review): hard-coded at 1% — keep in sync with the current
    # 3-month T-bill rate when deltas matter.
    RISK_FREE_RATE = 0.01
    date_str = datetime.datetime.now().strftime("%Y_%m_%d")
    base_folder = os.path.join("data", "options")
    # One folder per scrape day; its existence doubles as the
    # "already ran today" marker.
    # NOTE(review): os.mkdir does not create parents — assumes data/options
    # already exists; confirm or switch to os.makedirs.
    if not os.path.isdir(os.path.join(base_folder, date_str)):
        os.mkdir(os.path.join(base_folder, date_str))
    else:
        # Only run if we have not grabbed the data yet today.
        raise UserWarning("We already have the data, no need to get it again")
    base_folder = os.path.join(base_folder, date_str)
    TODAY = datetime.datetime.now()  # Current date/time for T calculation
    # Iterate both index membership fetchers.
    # NOTE(review): tickers in both the S&P 500 and NASDAQ 100 are scraped
    # twice and the second pass overwrites the first day's files — confirm
    # this duplication is acceptable.
    for ticker_list in [get_sp500_tickers, get_nasdaq100_tickers]:
        for ticker_symbol in ticker_list():
            filename_start = f"{date_str}_{ticker_symbol}"
            # Create a Ticker object for all per-symbol requests below.
            ticker = yf.Ticker(ticker_symbol)
            # 2. Get the current stock price (spot S for the delta formula);
            # skip the symbol entirely if it can't be fetched.
            try:
                stock_info = ticker.info
                current_stock_price = stock_info.get("regularMarketPrice")
                if current_stock_price is None:
                    print(f"Could not get current price for {ticker_symbol}. Skipping.")
                    continue
            except Exception as e:
                print(f"Error getting stock price for {ticker_symbol}: {e}. Skipping.")
                continue
            expirations = ticker.options
            all_options_data = []
            # One option chain per listed expiration date.
            for date_str_exp in expirations:
                try:
                    # Calculate T (time to expiration in years).
                    # yfinance expiration strings are YYYY-MM-DD.
                    expiration_date = datetime.datetime.strptime(
                        date_str_exp, "%Y-%m-%d"
                    )
                    time_to_expiration_days = (expiration_date - TODAY).days
                    # 365 calendar days per year is the usual options-pricing
                    # convention (vs 252 trading days).
                    T = time_to_expiration_days / 365.0
                    options_chain = ticker.option_chain(date_str_exp)
                    calls_df = options_chain.calls
                    puts_df = options_chain.puts
                    # Tag rows so calls/puts and expirations survive the concat.
                    calls_df["expiration"] = date_str_exp
                    puts_df["expiration"] = date_str_exp
                    calls_df["option_type"] = "call"
                    puts_df["option_type"] = "put"
                    # 3. Calculate delta for calls, row by row, using each
                    # contract's strike and implied volatility.
                    calls_df["delta"] = calls_df.apply(
                        lambda row: bsm_delta(
                            S=current_stock_price,
                            K=row["strike"],
                            T=T,
                            r=RISK_FREE_RATE,
                            sigma=row["impliedVolatility"],
                            option_type="call",
                        ),
                        axis=1,
                    )
                    # 4. Calculate delta for puts.
                    puts_df["delta"] = puts_df.apply(
                        lambda row: bsm_delta(
                            S=current_stock_price,
                            K=row["strike"],
                            T=T,
                            r=RISK_FREE_RATE,
                            sigma=row["impliedVolatility"],
                            option_type="put",
                        ),
                        axis=1,
                    )
                    all_options_data.append(calls_df)
                    all_options_data.append(puts_df)
                except Exception as e:
                    # A single bad expiration shouldn't kill the whole ticker.
                    print(
                        f"Could not retrieve or calculate delta for {date_str_exp} on {ticker_symbol}: {e}"
                    )
            # Concatenate all expirations and persist one CSV + pickle
            # per ticker per day.
            if all_options_data:
                full_options_df = pd.concat(all_options_data)
                print(
                    f"\nFull Options Chain for {ticker_symbol} across all expirations (with Delta):"
                )
                # Display the columns relevant to the delta calculation.
                print(
                    full_options_df[
                        ["strike", "impliedVolatility", "option_type", "delta"]
                    ].head()
                )
                full_options_df.to_csv(
                    os.path.join(base_folder, f"{filename_start}.csv")
                )
                full_options_df.to_pickle(
                    os.path.join(base_folder, f"{filename_start}.pkl")
                )
            else:
                print(f"No options data retrieved for {ticker_symbol}.")
            # Throttle between tickers to stay polite to Yahoo's API.
            sleep(0.5)

28
utils.py Normal file
View File

@@ -0,0 +1,28 @@
import requests
import pandas as pd
def get_slickchart_tickers(index: str) -> list[str]:
    """
    Scrape the component ticker symbols for *index* (e.g. "sp500",
    "nasdaq100") from slickcharts.com.

    Raises on any failure (timeout, HTTP error, or a page without the
    expected "Symbol" table) — callers provide their own fallback.
    """
    from io import StringIO  # local import: keeps the module import block untouched

    url = f"https://www.slickcharts.com/{index}"
    user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/111.0"  # Default user-agent fails.
    # BUG FIX: add a timeout so a stalled server cannot hang the scraper
    # forever, and fail loudly on 4xx/5xx instead of parsing an error page.
    response = requests.get(url, headers={"User-Agent": user_agent}, timeout=30)
    response.raise_for_status()
    # Wrap in StringIO — passing literal HTML to read_html is deprecated in pandas 2.x.
    data = pd.read_html(StringIO(response.text), match="Symbol", index_col="Symbol")[0]
    return data.index.to_list()
def get_sp500_tickers() -> list[str]:
    """Return the S&P 500 ticker list, or a small hard-coded sample if scraping fails."""
    try:
        tickers = get_slickchart_tickers("sp500")
    except Exception as e:
        print(f"Error fetching S&P 500 tickers. Using a sample list. Error: {e}")
        # Fallback to a sample list if scraping fails
        tickers = ["AAPL", "MSFT", "AMZN", "GOOGL", "NVDA"]
    return tickers
def get_nasdaq100_tickers() -> list[str]:
    """Return the NASDAQ 100 ticker list, or a small hard-coded sample if scraping fails."""
    try:
        tickers = get_slickchart_tickers("nasdaq100")
    except Exception as e:
        print(f"Error fetching NASDAQ 100 tickers. Using a sample list. Error: {e}")
        # Fallback to a sample list if scraping fails
        tickers = ["AAPL", "MSFT", "AMZN", "GOOGL", "NVDA"]
    return tickers