Files
Example-TCG-Site/store/management/commands/populate_mtg_cards.py
Ryan Westfall 9040021d1b MASSIVE UPDATE:
bounty board feature

buyers to see bounty boards

seller profile page (like have theme chooser)

Have the game and set name be filters.

Add cards to vault manually

update card inventory add to have the autocomplete for the card  -

store analytics, clicks, views, link to store (url/QR code)

bulk item inventory creation --

Make the banner feature flag driven so I can have a beta site setup like the primary site

don't use primary key values in urls - update to use uuid4 values

site analytics. tianji is being sent

idempotent on the mtg and lorcana populate scripts

Card item images for specific listings

check that when you buy a card it is in the vault

Buyers should be able to search on store inventories

More pie charts for the seller!

post bounty board is slow to load

seller reviews/ratings - show a histogram - need a way for someone to rate

Report a seller feature for buyer to report

Make sure the styling is consistent based on the theme chosen

smart minimum order quantity and shipping amounts (defined by the store itself)

put virtual packs behind a feature flag like bounty board

proxy service feature flag

Terms of Service

new description for TCGKof

store SSN, ITIN, and EIN

optimize for SEO
2026-01-23 12:28:20 -06:00

232 lines
9.5 KiB
Python

import requests
import sys
from django.core.management.base import BaseCommand
from django.utils.dateparse import parse_date
from store.models import Game, Set, Card
class Command(BaseCommand):
    """Populate Magic: The Gathering sets and cards from the Scryfall API.

    With ``--duration all`` the full ``default_cards`` bulk export is
    downloaded; otherwise only sets and cards released within the last
    N days are fetched through the search endpoint. ``--clear`` deletes
    the existing MTG cards and sets before populating.
    """

    help = 'Populates the database with Magic: The Gathering sets and cards from Scryfall.'

    SETS_URL = 'https://api.scryfall.com/sets'
    BULK_DATA_URL = 'https://api.scryfall.com/bulk-data'
    SEARCH_URL = 'https://api.scryfall.com/cards/search'
    # (connect, read) timeouts in seconds so a stalled connection cannot
    # hang the command forever.
    REQUEST_TIMEOUT = (10, 60)
    # Pause between paginated search requests, to be polite to the API.
    REQUEST_DELAY = 0.1

    def add_arguments(self, parser):
        """Register the ``--clear`` and ``--duration`` command-line options."""
        parser.add_argument(
            '--clear',
            action='store_true',
            help='Clear existing Magic: The Gathering cards and sets before populating.'
        )
        parser.add_argument(
            '--duration',
            default='7',
            help='Duration in days to look back for new cards/sets. Use "all" to fetch everything. Default is 7 days.'
        )

    def handle(self, *args, **options):
        """Run the population: ensure the Game row, sync sets, then sync cards.

        Errors are reported on stdout and end the run early; the method
        returns ``None`` in every case.
        """
        from datetime import timedelta
        from django.utils import timezone

        self.stdout.write(self.style.SUCCESS('Starting MTG population...'))

        # Validate --duration before anything destructive happens, so a
        # typo combined with --clear cannot wipe the data and then abort.
        duration = options['duration']
        days = None
        if duration != 'all':
            try:
                days = int(duration)
            except (TypeError, ValueError):
                self.stdout.write(self.style.ERROR('Invalid duration. Must be an integer or "all".'))
                return

        # 1. Ensure Game exists
        game, created = Game.objects.get_or_create(
            name="Magic: The Gathering",
            defaults={'slug': 'magic-the-gathering'},
        )
        if created:
            self.stdout.write(self.style.SUCCESS(f'Created Game: {game.name}'))
        else:
            self.stdout.write(f'Found Game: {game.name}')

        # Handle --clear
        if options['clear']:
            self.stdout.write(self.style.WARNING('Clearing existing MTG data...'))
            Card.objects.filter(set__game=game).delete()
            Set.objects.filter(game=game).delete()
            self.stdout.write(self.style.SUCCESS('Cleared MTG data.'))

        # Handle --duration
        start_date = None
        if days is not None:
            start_date = timezone.now().date() - timedelta(days=days)
            self.stdout.write(f'Fetching data from the last {days} days (since {start_date})...')

        # 2. Fetch Sets
        self.stdout.write('Fetching sets from Scryfall...')
        response = self._request(self.SETS_URL, 'Failed to fetch sets')
        if response is None:
            return
        if response.status_code != 200:
            self.stdout.write(self.style.ERROR('Failed to fetch sets'))
            return

        sets_data = response.json().get('data', [])
        self.stdout.write(f'Found {len(sets_data)} sets total. Filtering and processing...')

        sets_processed = 0
        for set_data in sets_data:
            released_at = set_data.get('released_at')
            release_date = parse_date(released_at) if released_at else None

            # The sets list is filtered client-side: with a look-back
            # window, sets released before it are skipped. Sets without
            # a release date are always kept.
            if start_date and release_date and release_date < start_date:
                continue

            Set.objects.update_or_create(
                code=set_data.get('code'),
                game=game,
                defaults={
                    'name': set_data.get('name'),
                    'release_date': release_date,
                },
            )
            sets_processed += 1
        self.stdout.write(self.style.SUCCESS(f'Processed {sets_processed} sets.'))

        # 3. Fetch Cards
        if duration == 'all':
            self.fetch_bulk_data(game)
        else:
            self.fetch_recent_cards(game, start_date)

        self.stdout.write(self.style.SUCCESS('Finished MTG population!'))

    def _request(self, url, failure_message, **kwargs):
        """GET ``url`` with the shared timeout.

        Returns the response, or ``None`` after writing ``failure_message``
        (plus the exception text) when the request fails at the transport
        level. HTTP status checks are left to the caller.
        """
        try:
            return requests.get(url, timeout=self.REQUEST_TIMEOUT, **kwargs)
        except requests.RequestException as exc:
            self.stdout.write(self.style.ERROR(f'{failure_message}: {exc}'))
            return None

    def _load_sets_map(self, game):
        """Return ``{set code: Set}`` for every stored set of ``game``."""
        return {s.code: s for s in Set.objects.filter(game=game)}

    @staticmethod
    def _extract_image_url(card_data):
        """Return the card's ``normal`` image URL, or ``''`` if none exists.

        The card-level ``image_uris`` is preferred; otherwise the first
        entry of ``card_faces`` is consulted.
        """
        image_uris = card_data.get('image_uris') or {}
        if 'normal' in image_uris:
            return image_uris['normal']

        faces = card_data.get('card_faces') or []
        if faces:
            face_uris = faces[0].get('image_uris') or {}
            if 'normal' in face_uris:
                return face_uris['normal']
        return ''

    def fetch_bulk_data(self, game):
        """Download the ``default_cards`` bulk export and store every card.

        The whole export is parsed into memory in one go. Failures are
        reported on stdout and end the method early.
        """
        self.stdout.write('Fetching Bulk Data info (Full Update)...')
        response = self._request(self.BULK_DATA_URL, 'Failed to fetch bulk data info')
        if response is None:
            return
        if response.status_code != 200:
            self.stdout.write(self.style.ERROR('Failed to fetch bulk data info'))
            return

        download_uri = next(
            (
                item.get('download_uri')
                for item in response.json().get('data', [])
                if item.get('type') == 'default_cards'
            ),
            None,
        )
        if not download_uri:
            self.stdout.write(self.style.ERROR('Could not find "default_cards" bulk data.'))
            return

        self.stdout.write(f'Downloading card data from {download_uri} ...')
        download = self._request(download_uri, 'Failed to download card data')
        if download is None:
            return
        # .json() buffers the full body regardless, so no streaming is
        # requested; the context manager releases the connection before
        # the long database phase starts.
        with download:
            try:
                download.raise_for_status()
                cards_data = download.json()
            except (ValueError, requests.RequestException) as e:
                self.stdout.write(self.style.ERROR(f'Failed to load JSON: {e}'))
                return

        self.process_cards(cards_data, game)

    def fetch_recent_cards(self, game, start_date):
        """Store every card released on or after ``start_date``.

        Pages through the search endpoint, following ``next_page`` while
        the response reports ``has_more``. ``start_date`` must provide
        ``isoformat()``.
        """
        import time

        self.stdout.write(f'Fetching cards released since {start_date}...')

        # Use Search API
        # date>=YYYY-MM-DD
        query = f"date>={start_date.isoformat()}"
        url = self.SEARCH_URL
        params = {'q': query, 'order': 'released'}

        # Load the sets once for the whole run rather than once per page.
        sets_map = self._load_sets_map(game)

        next_url = url
        total_processed = 0
        while next_url:
            first_page = next_url == url
            # The next_page URL already embeds the query, so params are
            # only sent (and logged) for the first request.
            page_params = params if first_page else None
            self.stdout.write(f' Requesting: {next_url} (params: {page_params})')

            response = self._request(next_url, 'Failed to search cards', params=page_params)
            if response is None:
                return
            if first_page and response.status_code == 404:
                # Scryfall answers a search that matches no cards with a
                # 404; that is an empty result, not a failure.
                self.stdout.write('No cards matched the search.')
                break
            if response.status_code != 200:
                self.stdout.write(self.style.ERROR(f'Failed to search cards: {response.text}'))
                return

            data = response.json()
            cards_data = data.get('data', [])
            self.process_cards(cards_data, game, verbose=False, sets_map=sets_map)
            total_processed += len(cards_data)

            # Stop when the API reports no further pages, or when it
            # claims more but supplies no URL to follow.
            next_url = data.get('next_page') if data.get('has_more', False) else None
            if next_url:
                # Scryfall requests being nice
                time.sleep(self.REQUEST_DELAY)

        self.stdout.write(f'fetched {total_processed} cards via search.')

    def process_cards(self, cards_data, game, verbose=True, sets_map=None):
        """Create or update a Card row for each entry in ``cards_data``.

        Args:
            cards_data: iterable of Scryfall card dicts (must support
                ``len()`` when ``verbose`` is true).
            game: Game whose sets the cards are matched against.
            verbose: when true, write progress messages to stdout.
            sets_map: optional ``{set code: Set}`` mapping to reuse
                across calls; loaded from the database when omitted.

        Returns:
            The number of cards written. Cards whose set is not stored
            for ``game``, or that carry no Scryfall ``id``, are skipped.
        """
        if verbose:
            self.stdout.write(f'Processing {len(cards_data)} cards...')

        # Cache sets
        if sets_map is None:
            sets_map = self._load_sets_map(game)

        count = 0
        for card_data in cards_data:
            # Sets outside the look-back window were deliberately not
            # imported, so cards pointing at an unknown set are skipped
            # rather than creating the set lazily.
            set_obj = sets_map.get(card_data.get('set'))
            if set_obj is None:
                continue

            # Without an id the upsert below would key on None and fold
            # unrelated cards into a single row.
            scryfall_id = card_data.get('id')
            if not scryfall_id:
                continue

            Card.objects.update_or_create(
                scryfall_id=scryfall_id,
                defaults={
                    'set': set_obj,
                    'name': card_data.get('name'),
                    'rarity': card_data.get('rarity'),
                    'image_url': self._extract_image_url(card_data),
                    'tcgplayer_id': card_data.get('tcgplayer_id'),
                    'collector_number': card_data.get('collector_number', ''),
                    'external_url': card_data.get('scryfall_uri', ''),
                },
            )
            count += 1
            if verbose and count % 1000 == 0:
                self.stdout.write(f' Processed {count}...')

        if verbose:
            self.stdout.write(f'Batch processed {count} cards.')
        return count