# import psycopg2
import json
import redis
from dhanhq import marketfeed
from dhanhq import dhanhq
from src.dataaggregators.candleAggregator import OHLCBuilder
import os
from dotenv import load_dotenv
from src.connections.cache import r
load_dotenv(".env")
pubsub = r.pubsub()
pubsub.subscribe("subscribe_queue")
client_id = os.getenv("CLIENT_ID")
access_token = os.getenv("ACCESS_TOKEN")
instruments = [(marketfeed.IDX, "13", marketfeed.Ticker),
(marketfeed.MCX, "439487", marketfeed.Ticker),
(marketfeed.NSE_FNO, "44472", marketfeed.Ticker)] # Example: Nifty Index
version = "v2"
dhan = dhanhq(client_id,access_token)
nifty = OHLCBuilder("Nifty",interval_minutes=5)
fir = nifty.get_first_candle()
las = nifty.get_last_candle()
sym = str(44473)
data = marketfeed.DhanFeed(client_id, access_token, instruments, "v2")
def run_feed():
print("[Feed] Starting Dhan feed...")
while True:
data.run_forever()
res = data.get_data()
# print(res)
if res and res.get("LTP"):
tick_data = {
"sec_id": str(res["security_id"]),
"ltp": float(res["LTP"]),
"ltt": str(res["LTT"])
}
nifty.save_to_db(las)
# Publish JSON string
r.publish("ticks", json.dumps(tick_data))
# Check Redis for new instruments
msg = pubsub.get_message()
if msg and msg["type"] == "message":
try:
new_sec_ids = json.loads(msg["data"]) # e.g., [47161, 47162]
sub_instruments = [(marketfeed.NSE_FNO, str(s), marketfeed.Ticker) for s in new_sec_ids]
# Use asyncio to safely subscribe while feed is running
import asyncio
asyncio.ensure_future(data.subscribe_symbols(sub_instruments))
print(f"[Feed] Subscribed new instruments: {sub_instruments}")
except Exception as e:
print("[Feed] Subscription error:", e)
if __name__ == "__main__":
run_feed()
[quote="AJITHKUMAR_P, post:1, topic:54754, full:true"]
# import psycopg2
import json
import redis
from dhanhq import marketfeed
from dhanhq import dhanhq
from src.dataaggregators.candleAggregator import OHLCBuilder
import os
from dotenv import load_dotenv
from src.connections.cache import r
load_dotenv(".env")
pubsub = r.pubsub()
pubsub.subscribe("subscribe_queue")
client_id = os.getenv("CLIENT_ID")
access_token = os.getenv("ACCESS_TOKEN")
instruments = [(marketfeed.IDX, "13", marketfeed.Ticker),
(marketfeed.MCX, "439487", marketfeed.Ticker),
(marketfeed.NSE_FNO, "44472", marketfeed.Ticker)] # Example: Nifty Index
version = "v2"
dhan = dhanhq(client_id,access_token)
nifty = OHLCBuilder("Nifty",interval_minutes=5)
fir = nifty.get_first_candle()
las = nifty.get_last_candle()
sym = str(44473)
data = marketfeed.DhanFeed(client_id, access_token, instruments, "v2")
def run_feed():
print("[Feed] Starting Dhan feed...")
while True:
data.run_forever()
res = data.get_data()
# print(res)
if res and res.get("LTP"):
tick_data = {
"sec_id": str(res["security_id"]),
"ltp": float(res["LTP"]),
"ltt": str(res["LTT"])
}
nifty.save_to_db(las)
# Publish JSON string
r.publish("ticks", json.dumps(tick_data))
# Check Redis for new instruments
msg = pubsub.get_message()
if msg and msg["type"] == "message":
try:
new_sec_ids = json.loads(msg["data"]) # e.g., [47161, 47162]
sub_instruments = [(marketfeed.NSE_FNO, str(s), marketfeed.Ticker) for s in new_sec_ids]
# Use asyncio to safely subscribe while feed is running
import asyncio
asyncio.ensure_future(data.subscribe_symbols(sub_instruments))
print(f"[Feed] Subscribed new instruments: {sub_instruments}")
except Exception as e:
print("[Feed] Subscription error:", e)
if __name__ == "__main__":
run_feed()
you’re probably using the latest websockets library, revert to 13.0.