despite may tries and corrections the apis still runs 400. all I want is to download the options chain data for all the FnO instruments and expiries to perform analysis. and store it in a db.
here is the script:
#!/usr/bin/env python3
"""
dhan_option_chain.py
Version: 1.3
Purpose:
- Download instrument master CSV once per day (cached in DB).
- Detect NSE option underlyings (OPTIDX, OPTSTK).
- Fetch expiry list and full option chain for each underlying.
- Store all data in sqlite (append-only), compute intrinsic/time/oi-change/change%/PCR/POP estimate.
- Enforce consistent 4-space indentation (no tabs) to avoid Pylance/IndentationError.
"""
__version__ = "1.3"
import argparse
import csv
import io
import json
import logging
import os
import sqlite3
import sys
import time
from typing import Any, Dict, Iterable, List, Optional
import requests
from datetime import datetime, date
# ---------- Config ----------
INSTRUMENT_MASTER_URL = "https://images.dhan.co/api-data/api-scrip-master-detailed.csv"
DHAN_BASE = "https://api.dhan.co/v2"
DB_PATH = os.environ.get("DHAN_DB_PATH", "dhan_option_chain.db")
LOG_FILE = os.environ.get("DHAN_LOG_FILE", "dhan_option_chain.log")
OPTIONCHAIN_RATE_LIMIT_SECONDS = 3 # sleep between option-chain calls
# ---------- Logging ----------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("dhan_option_chain")
# ---------- Utilities ----------
def now_iso() -> str:
return datetime.utcnow().isoformat()
def today_iso() -> str:
return date.today().isoformat()
def parse_int(v: Any) -> Optional[int]:
try:
if v is None or v == "":
return None
return int(float(v))
except Exception:
return None
def parse_float(v: Any) -> Optional[float]:
try:
if v is None or v == "":
return None
return float(v)
except Exception:
return None
def pick(d: Optional[dict], *keys: str):
if not d:
return None
for k in keys:
if k in d and d[k] not in (None, ""):
return d[k]
return None
def compute_change_percent(last_price: Optional[float], prev_close: Optional[float]) -> Optional[float]:
try:
if last_price is None or prev_close is None or prev_close == 0:
return None
return (last_price - prev_close) / prev_close * 100.0
except Exception:
return None
def estimate_pop_from_delta(delta: Optional[float]) -> Optional[float]:
try:
if delta is None:
return None
return max(0.0, (1.0 - abs(float(delta))) * 100.0)
except Exception:
return None
def compute_intrinsic_and_time(option_last: Optional[float], strike: Optional[float],
spot: Optional[float], side: str, fut_price: Optional[float] = None):
intrinsic_spot = None
intrinsic_fut = None
time_value = None
try:
if spot is not None and strike is not None:
if side == "CE":
intrinsic_spot = max(0.0, spot - strike)
else:
intrinsic_spot = max(0.0, strike - spot)
if fut_price is not None and strike is not None:
if side == "CE":
intrinsic_fut = max(0.0, fut_price - strike)
else:
intrinsic_fut = max(0.0, strike - fut_price)
if intrinsic_spot is not None and option_last is not None:
tv = option_last - intrinsic_spot
time_value = max(0.0, tv)
except Exception:
intrinsic_spot = intrinsic_fut = time_value = None
return intrinsic_spot, intrinsic_fut, time_value
# ---------- conf.py loader ----------
def load_conf() -> Dict[str, str]:
"""Load conf.py if present and extract client_id & access_token."""
cfg: Dict[str, str] = {}
try:
import importlib
conf = importlib.import_module("conf")
except Exception as e:
logger.error("Could not import conf.py: %s", e)
return cfg
for key in ("CLIENT_ID", "client_id", "CLIENTID", "clientId"):
if hasattr(conf, key):
cfg["client_id"] = getattr(conf, key)
break
for key in ("ACCESS_TOKEN", "access_token", "ACCESS_TOKEN_JWT", "token"):
if hasattr(conf, key):
cfg["access_token"] = getattr(conf, key)
break
return cfg
# ---------- DB Schema ----------
CREATE_OPTION_STRIKE_SQL = """
CREATE TABLE IF NOT EXISTS option_strike_snapshot (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chain_snapshot_id INTEGER,
strike REAL,
call_ltp REAL,
call_time_value REAL,
call_intrinsic_spot REAL,
call_intrinsic_fut REAL,
call_bid_price REAL,
call_offer_price REAL,
call_oi INTEGER,
call_oi_change INTEGER,
call_change_percent REAL,
call_volume INTEGER,
call_pop_percent REAL,
call_delta REAL,
call_theta REAL,
call_vega REAL,
call_gamma REAL,
call_iv REAL,
put_ltp REAL,
put_time_value REAL,
put_intrinsic_spot REAL,
put_intrinsic_fut REAL,
put_bid_price REAL,
put_offer_price REAL,
put_oi INTEGER,
put_oi_change INTEGER,
put_change_percent REAL,
put_volume INTEGER,
put_pop_percent REAL,
put_delta REAL,
put_theta REAL,
put_vega REAL,
put_gamma REAL,
put_iv REAL
);
"""
def create_schema(conn: sqlite3.Connection) -> None:
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS instrument_master_snapshot (
id INTEGER PRIMARY KEY AUTOINCREMENT,
fetched_date TEXT,
security_id INTEGER,
raw_json TEXT
);
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS expiry_list_snapshot (
id INTEGER PRIMARY KEY AUTOINCREMENT,
fetched_at TEXT,
security_id INTEGER,
exchange_segment TEXT,
expiry_date TEXT
);
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS option_chain_snapshot (
id INTEGER PRIMARY KEY AUTOINCREMENT,
fetched_at TEXT,
security_id INTEGER,
exchange_segment TEXT,
expiry_date TEXT,
underlying_ltp REAL
);
""")
cur.execute(CREATE_OPTION_STRIKE_SQL)
cur.execute("""
CREATE TABLE IF NOT EXISTS expiry_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chain_snapshot_id INTEGER,
total_put_oi INTEGER,
total_call_oi INTEGER,
pcr REAL
);
""")
conn.commit()
# ---------- Instrument master: daily cache ----------
def fetch_instrument_master_csv() -> List[Dict[str, str]]:
logger.info("Downloading instrument master CSV from %s", INSTRUMENT_MASTER_URL)
r = requests.get(INSTRUMENT_MASTER_URL, timeout=60)
r.raise_for_status()
text = r.content.decode("utf-8", errors="replace")
reader = csv.DictReader(io.StringIO(text))
rows: List[Dict[str, str]] = []
for row in reader:
# normalize keys to uppercase for stable access
clean = {k.strip().upper(): (v.strip() if v is not None else "") for k, v in row.items()}
rows.append(clean)
logger.info("Fetched %d instrument rows", len(rows))
return rows
def load_instrument_master(conn: sqlite3.Connection, force_refresh: bool = False) -> List[Dict[str, str]]:
cur = conn.cursor()
today = today_iso()
cur.execute("SELECT COUNT(*) FROM instrument_master_snapshot WHERE fetched_date = ?", (today,))
count = cur.fetchone()[0]
if count > 0 and not force_refresh:
logger.info("Using cached instrument master for %s (%d rows)", today, count)
cur.execute("SELECT raw_json FROM instrument_master_snapshot WHERE fetched_date = ?", (today,))
rows = [json.loads(r[0]) for r in cur.fetchall()]
return rows
# fetch fresh
rows = fetch_instrument_master_csv()
# remove today's rows if any (start clean)
cur.execute("DELETE FROM instrument_master_snapshot WHERE fetched_date = ?", (today,))
for r in rows:
secid = parse_int(r.get("SECURITY_ID"))
cur.execute("INSERT INTO instrument_master_snapshot (fetched_date, security_id, raw_json) VALUES (?, ?, ?)",
(today, secid, json.dumps(r)))
conn.commit()
logger.info("Stored instrument master snapshot for %s (%d rows)", today, len(rows))
return rows
# ---------- Underlying selection ----------
def collect_nse_option_underlyings(rows: Iterable[Dict[str, str]]) -> List[int]:
uids = set()
for r in rows:
try:
exch = (r.get("EXCH_ID") or r.get("EXCHANGE") or "").strip().upper()
# CSV uses INSTRUMENT (per your header) that contains OPTIDX / OPTSTK
inst = (r.get("INSTRUMENT") or r.get("INSTRUMENT_TYPE") or "").strip().upper()
if exch != "NSE":
continue
if inst not in ("OPTIDX", "OPTSTK"):
continue
uid = parse_int(r.get("UNDERLYING_SECURITY_ID") or r.get("UNDERLYING_SECURITY"))
if uid and uid > 0:
uids.add(uid)
except Exception:
continue
ulist = sorted(list(uids))
logger.info("Collected %d unique NSE option underlying security ids", len(ulist))
return ulist
def find_fut_for_underlying(rows: Iterable[Dict[str, str]], underlying_id: int) -> Optional[int]:
for r in rows:
try:
itype = (r.get("INSTRUMENT") or r.get("INSTRUMENT_TYPE") or "").strip().upper()
if not itype.startswith("FUT"):
continue
uid = parse_int(r.get("UNDERLYING_SECURITY_ID") or r.get("UNDERLYING_SECURITY"))
if uid != underlying_id:
continue
secid = parse_int(r.get("SECURITY_ID"))
if secid and secid > 0:
return secid
except Exception:
continue
return None
# ---------- Dhan client ----------
class DhanClient:
def __init__(self, client_id: str, access_token: str, base: str = DHAN_BASE):
self.base = base.rstrip('/')
self.s = requests.Session()
self.s.headers.update({
"access-token": str(access_token),
"client-id": str(client_id),
"Content-Type": "application/json"
})
def post(self, path: str, payload: dict, retries: int = 3, backoff: float = 1.0) -> Any:
url = f"{self.base}{path}"
attempt = 0
while attempt < retries:
try:
r = self.s.post(url, json=payload, timeout=30)
r.raise_for_status()
return r.json()
except Exception as e:
attempt += 1
logger.warning("POST %s attempt %d/%d error: %s", url, attempt, retries, e)
if attempt < retries:
time.sleep(backoff * (2 ** (attempt-1)))
else:
logger.exception("POST %s final failure", url)
raise
# ---------- Option chain parsing ----------
def parse_option_chain_data(data: Any):
"""Yield normalized entries: {'strike': float, 'ce': dict, 'pe': dict}"""
if not data:
return
# common shape: data['oc'] mapping
if isinstance(data, dict) and 'oc' in data and isinstance(data['oc'], dict):
for sk, obj in data['oc'].items():
try:
strike = float(sk)
except Exception:
try:
strike = float(str(sk).replace(',', ''))
except Exception:
continue
ce = obj.get('ce') or {}
pe = obj.get('pe') or {}
yield {'strike': strike, 'ce': ce, 'pe': pe}
return
# shape: list under common keys
candidate_lists = []
if isinstance(data, dict):
for key in ('optionChain', 'records', 'data', 'result'):
v = data.get(key)
if isinstance(v, list):
candidate_lists.append(v)
if isinstance(data, list):
candidate_lists.append(data)
for lst in candidate_lists:
for row in lst:
strike_raw = pick(row, 'strikePrice', 'strike_price', 'strike')
strike = None
try:
if strike_raw is not None:
strike = float(strike_raw)
except Exception:
strike = None
ce = pick(row, 'CE', 'ce') or row.get('CE') or row.get('ce') or {}
pe = pick(row, 'PE', 'pe') or row.get('PE') or row.get('pe') or {}
yield {'strike': strike, 'ce': ce or {}, 'pe': pe or {}}
return
# fallback: top-level numeric keys
if isinstance(data, dict):
for k, v in data.items():
try:
sk = float(k)
except Exception:
continue
obj = v or {}
ce = obj.get('ce') or obj.get('CE') or {}
pe = obj.get('pe') or obj.get('PE') or {}
yield {'strike': sk, 'ce': ce, 'pe': pe}
# ---------- Store helpers ----------
def store_expiry_list(conn: sqlite3.Connection, security_id: int, exchange_segment: str, expiries: List[str]) -> None:
cur = conn.cursor()
fetched_at = now_iso()
for ex in expiries:
cur.execute("INSERT INTO expiry_list_snapshot (fetched_at, security_id, exchange_segment, expiry_date) VALUES (?,?,?,?)",
(fetched_at, security_id, exchange_segment, ex))
conn.commit()
def store_chain_and_strikes(conn: sqlite3.Connection, security_id: int, exchange_segment: str, expiry: str,
data: Any, fut_price: Optional[float] = None) -> None:
cur = conn.cursor()
fetched_at = now_iso()
underlying_ltp = parse_float(pick(data, 'last_price', 'lastPrice', 'last_price'))
cur.execute("INSERT INTO option_chain_snapshot (fetched_at, security_id, exchange_segment, expiry_date, underlying_ltp) VALUES (?,?,?,?,?)",
(fetched_at, security_id, exchange_segment, expiry, underlying_ltp))
chain_snapshot_id = cur.lastrowid
entries = list(parse_option_chain_data(data))
total_put_oi = 0
total_call_oi = 0
for ent in entries:
strike = ent.get('strike')
ce = ent.get('ce') or {}
pe = ent.get('pe') or {}
# CALL values
call_ltp = parse_float(pick(ce, 'last_price', 'lastPrice', 'ltp', 'lastPrice'))
call_oi = parse_int(pick(ce, 'oi', 'openInterest', 'open_interest'))
call_prev_oi = parse_int(pick(ce, 'previous_oi', 'previousOi', 'previous_oi'))
call_prev_close = parse_float(pick(ce, 'previous_close_price', 'previousClose', 'previous_close'))
call_bid = parse_float(pick(ce, 'top_bid_price', 'topBidPrice', 'bidprice', 'bid_price', 'bid'))
call_offer = parse_float(pick(ce, 'top_ask_price', 'topAskPrice', 'askPrice', 'ask_price', 'ask'))
call_vol = parse_int(pick(ce, 'volume', 'totalTradedVolume', 'total_traded_volume'))
call_iv = parse_float(pick(ce, 'implied_volatility', 'impliedVolatility', 'implied_vol'))
greeks_c = pick(ce, 'greeks') or {}
call_delta = parse_float(pick(greeks_c, 'delta') or pick(ce, 'delta'))
call_theta = parse_float(pick(greeks_c, 'theta') or pick(ce, 'theta'))
call_gamma = parse_float(pick(greeks_c, 'gamma') or pick(ce, 'gamma'))
call_vega = parse_float(pick(greeks_c, 'vega') or pick(ce, 'vega'))
call_oi_change = None
if call_oi is not None and call_prev_oi is not None:
try:
call_oi_change = int(call_oi - call_prev_oi)
except Exception:
call_oi_change = None
call_change_pct = compute_change_percent(call_ltp, call_prev_close)
call_pop = estimate_pop_from_delta(call_delta)
if call_oi:
try:
total_call_oi += int(call_oi)
except Exception:
pass
call_intrinsic_spot, call_intrinsic_fut, call_time_value = compute_intrinsic_and_time(call_ltp, float(strike) if strike is not None else None, underlying_ltp, 'CE', fut_price)
# PUT values
put_ltp = parse_float(pick(pe, 'last_price', 'lastPrice', 'ltp', 'lastPrice'))
put_oi = parse_int(pick(pe, 'oi', 'openInterest', 'open_interest'))
put_prev_oi = parse_int(pick(pe, 'previous_oi', 'previousOi', 'previous_oi'))
put_prev_close = parse_float(pick(pe, 'previous_close_price', 'previousClose', 'previous_close'))
put_bid = parse_float(pick(pe, 'top_bid_price', 'topBidPrice', 'bidprice', 'bid_price', 'bid'))
put_offer = parse_float(pick(pe, 'top_ask_price', 'topAskPrice', 'askPrice', 'ask_price', 'ask'))
put_vol = parse_int(pick(pe, 'volume', 'totalTradedVolume', 'total_traded_volume'))
put_iv = parse_float(pick(pe, 'implied_volatility', 'impliedVolatility', 'implied_vol'))
greeks_p = pick(pe, 'greeks') or {}
put_delta = parse_float(pick(greeks_p, 'delta') or pick(pe, 'delta'))
put_theta = parse_float(pick(greeks_p, 'theta') or pick(pe, 'theta'))
put_gamma = parse_float(pick(greeks_p, 'gamma') or pick(pe, 'gamma'))
put_vega = parse_float(pick(greeks_p, 'vega') or pick(pe, 'vega'))
put_oi_change = None
if put_oi is not None and put_prev_oi is not None:
try:
put_oi_change = int(put_oi - put_prev_oi)
except Exception:
put_oi_change = None
put_change_pct = compute_change_percent(put_ltp, put_prev_close)
put_pop = estimate_pop_from_delta(put_delta)
if put_oi:
try:
total_put_oi += int(put_oi)
except Exception:
pass
put_intrinsic_spot, put_intrinsic_fut, put_time_value = compute_intrinsic_and_time(put_ltp, float(strike) if strike is not None else None, underlying_ltp, 'PE', fut_price)
# insert row
cur.execute("""
INSERT INTO option_strike_snapshot (
chain_snapshot_id, strike,
call_ltp, call_time_value, call_intrinsic_spot, call_intrinsic_fut, call_bid_price, call_offer_price,
call_oi, call_oi_change, call_change_percent, call_volume, call_pop_percent, call_delta, call_theta,
call_vega, call_gamma, call_iv,
put_ltp, put_time_value, put_intrinsic_spot, put_intrinsic_fut, put_bid_price, put_offer_price,
put_oi, put_oi_change, put_change_percent, put_volume, put_pop_percent, put_delta, put_theta, put_vega, put_gamma, put_iv
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
chain_snapshot_id, strike,
call_ltp, call_time_value, call_intrinsic_spot, call_intrinsic_fut, call_bid, call_offer,
call_oi, call_oi_change, call_change_pct, call_vol, call_pop, call_delta, call_theta,
call_vega, call_gamma, call_iv,
put_ltp, put_time_value, put_intrinsic_spot, put_intrinsic_fut, put_bid, put_offer,
put_oi, put_oi_change, put_change_pct, put_vol, put_pop, put_delta, put_theta, put_vega, put_gamma, put_iv
))
# expiry metrics
try:
pcr = None
if total_call_oi and total_call_oi != 0:
pcr = total_put_oi / float(total_call_oi)
except Exception:
pcr = None
cur.execute("INSERT INTO expiry_metrics (chain_snapshot_id, total_put_oi, total_call_oi, pcr) VALUES (?,?,?,?)",
(chain_snapshot_id, total_put_oi, total_call_oi, pcr))
conn.commit()
# ---------- Main orchestration ----------
def run(force_refresh: bool = False, limit: Optional[int] = None) -> None:
started_at = now_iso()
logger.info("dhan_option_chain.py v%s starting at %s", __version__, started_at)
cfg = load_conf()
client_id = cfg.get("client_id")
access_token = cfg.get("access_token")
if not client_id or not access_token:
logger.error("Missing CLIENT_ID/ACCESS_TOKEN in conf.py; please create conf.py next to this script.")
return
conn = sqlite3.connect(DB_PATH)
create_schema(conn)
rows = load_instrument_master(conn, force_refresh=force_refresh)
if not rows:
logger.error("No instrument master rows available. Exiting.")
return
underlyings = collect_nse_option_underlyings(rows)
if not underlyings:
logger.warning("No NSE option underlyings found. Exiting.")
return
if limit and isinstance(limit, int) and limit > 0:
underlyings = underlyings[:limit]
logger.info("Limiting to first %d underlyings for this run", limit)
dh = DhanClient(client_id, access_token)
total = len(underlyings)
# Map instrument id to instrument for expirylist payload
instruments_by_id = {}
for r in rows:
secid = parse_int(r.get("SECURITY_ID"))
if secid:
instruments_by_id[secid] = r
for idx, underlying in enumerate(underlyings, start=1):
logger.info("(%d/%d) Processing underlying %s", idx, total, underlying)
# determine underlying segment from the instrument master row (ANNEXURE values like 'NSE_FNO' or 'IDX_I')
underlying_row = instruments_by_id.get(int(underlying), {})
underlying_seg = underlying_row.get("SEGMENT") or underlying_row.get("UNDERLYING_SEGMENT") or "NSE_FNO"
# fetch expiry list using Dhan v2 parameter names: UnderlyingScrip, UnderlyingSeg
try:
expiry_resp = dh.post(
"/optionchain/expirylist",
{
"UnderlyingScrip": int(underlying),
"UnderlyingSeg": underlying_seg
}
)
except Exception as e:
logger.exception("Failed to fetch expiry list for %s: %s", underlying, e)
continue
# extract expiry list robustly (data may be under 'data' as list)
expiries = None
if isinstance(expiry_resp, dict):
expiries = expiry_resp.get("data") or expiry_resp.get("expiryList") or expiry_resp.get("expiries")
else:
expiries = expiry_resp
# if dict-of-lists flatten
if isinstance(expiries, dict):
tmp = []
for v in expiries.values():
if isinstance(v, list):
tmp.extend(v)
expiries = tmp
if not expiries:
logger.warning("No expiries returned for %s", underlying)
continue
logger.info("Found %d expiries for %s", len(expiries), underlying)
# store expiry list with the actual segment used
store_expiry_list(conn, underlying, underlying_seg, expiries)
# attempt to find FUT secid and fut price for intrinsic_fut calculation
fut_secid = find_fut_for_underlying(rows, underlying)
fut_price = None
if fut_secid:
try:
resp = dh.post("/marketfeed/ltp", {"NSE_FNO": [int(fut_secid)]})
data = resp.get("data") if isinstance(resp, dict) else resp
secmap = data.get("NSE_FNO", {}) if isinstance(data, dict) else {}
secinfo = secmap.get(str(fut_secid)) or secmap.get(fut_secid)
if secinfo:
fut_price = parse_float(pick(secinfo, "last_price", "lastPrice", "last_price"))
logger.info("Fetched FUT price for underlying %s secid %s -> %s", underlying, fut_secid, fut_price)
except Exception:
logger.debug("Could not fetch FUT LTP for secid %s", fut_secid)
# for each expiry fetch optionchain
for expiry in expiries:
logger.info("Fetching option chain for %s expiry %s", underlying, expiry)
try:
oc_resp = dh.post(
"/optionchain",
{
"UnderlyingScrip": int(underlying),
"UnderlyingSeg": underlying_seg,
"Expiry": expiry
}
)
except Exception as e:
logger.exception("Failed optionchain for %s %s: %s", underlying, expiry, e)
continue
data = None
if isinstance(oc_resp, dict):
data = pick(oc_resp, "data") or oc_resp.get("data") or oc_resp
else:
data = oc_resp
if not data:
logger.warning("No option chain data for %s %s", underlying, expiry)
continue
# store chain + strikes
try:
store_chain_and_strikes(conn, underlying, underlying_seg, expiry, data, fut_price)
logger.info("Stored option chain for %s expiry %s", underlying, expiry)
except Exception as e:
logger.exception("Error storing chain/strikes for %s %s: %s", underlying, expiry, e)
# rate-limit
time.sleep(OPTIONCHAIN_RATE_LIMIT_SECONDS)
conn.close()
logger.info("Run completed at %s", now_iso())
# ---------- CLI ----------
def main():
parser = argparse.ArgumentParser(description="Fetch and store Dhan NSE option chains.")
parser.add_argument("--force-refresh", action="store_true", help="Force re-download instrument master CSV for today")
parser.add_argument("--limit", type=int, default=0, help="Limit the number of underlyings processed (for testing)")
args = parser.parse_args()
run(force_refresh=args.force_refresh, limit=(args.limit if args.limit > 0 else None))
if __name__ == "__main__":
main()
any thoughts?