Subscription error: 'ClientConnection' object has no attribute 'closed'

# import psycopg2
import json
import redis
from dhanhq import marketfeed
from dhanhq import dhanhq
from src.dataaggregators.candleAggregator import OHLCBuilder
import os
from dotenv import load_dotenv
from src.connections.cache import r
load_dotenv(".env")

# Redis pub/sub handle; run_feed() polls it for runtime subscription requests
# arriving on the "subscribe_queue" channel.
pubsub = r.pubsub()
pubsub.subscribe("subscribe_queue")

# Dhan credentials are read from the environment (.env is loaded above).
# NOTE(review): os.getenv returns None when a variable is missing - confirm
# whether the Dhan clients should be constructed at all in that case.
client_id = os.getenv("CLIENT_ID")
access_token = os.getenv("ACCESS_TOKEN")

# Instruments requested when the feed is created. Each entry is presumably
# (exchange segment, security id, feed request type) - confirm against the
# dhanhq marketfeed documentation.
instruments = [
    (marketfeed.IDX, "13", marketfeed.Ticker),
    (marketfeed.MCX, "439487", marketfeed.Ticker),
    (marketfeed.NSE_FNO, "44472", marketfeed.Ticker),  # Example: Nifty Index
]
version = "v2"
dhan = dhanhq(client_id, access_token)

nifty = OHLCBuilder("Nifty", interval_minutes=5)
fir = nifty.get_first_candle()
las = nifty.get_last_candle()
sym = str(44473)

# Pass the `version` constant instead of repeating the "v2" literal so the
# feed version is defined in exactly one place.
data = marketfeed.DhanFeed(client_id, access_token, instruments, version)

def run_feed():
    """Relay Dhan feed ticks to Redis and serve runtime subscription requests.

    Runs forever. Each iteration:

    1. calls ``data.run_forever()`` followed by ``data.get_data()`` to obtain
       one packet from the feed;
    2. calls ``nifty.save_to_db(las)``;
    3. when the packet has a truthy ``"LTP"``, publishes a JSON object with
       the keys ``sec_id``, ``ltp`` and ``ltt`` on the Redis ``"ticks"``
       channel;
    4. polls ``pubsub`` once and, if a message arrived, decodes it as a JSON
       list of security ids and subscribes each one as an ``NSE_FNO`` ticker.

    Exceptions raised while handling a subscription request are printed and
    the loop continues. Exceptions raised by the feed, the database write or
    the Redis publish are not caught here and propagate to the caller.
    """
    import asyncio

    print("[Feed] Starting Dhan feed...")
    while True:
        data.run_forever()
        res = data.get_data()

        # Build the tick only for packets that actually carry a price.
        # Resetting to None each iteration prevents a NameError on the first
        # price-less packet and stops the previous tick being republished.
        tick_data = None
        if res and res.get("LTP"):
            tick_data = {
                "sec_id": str(res["security_id"]),
                "ltp": float(res["LTP"]),
                "ltt": str(res["LTT"]),
            }

        nifty.save_to_db(las)

        # Publish JSON string
        if tick_data is not None:
            r.publish("ticks", json.dumps(tick_data))

        # Check Redis for new instruments
        msg = pubsub.get_message()
        if msg and msg["type"] == "message":
            try:
                new_sec_ids = json.loads(msg["data"])  # e.g., [47161, 47162]
                sub_instruments = [
                    (marketfeed.NSE_FNO, str(s), marketfeed.Ticker)
                    for s in new_sec_ids
                ]

                # The dhanhq README calls subscribe_symbols() as a plain
                # method. Only hand its result to asyncio when it really is
                # awaitable; wrapping a None return in ensure_future() raises
                # TypeError and would report every request as a failure.
                pending = data.subscribe_symbols(sub_instruments)
                if asyncio.iscoroutine(pending) or asyncio.isfuture(pending):
                    asyncio.ensure_future(pending)

                print(f"[Feed] Subscribed new instruments: {sub_instruments}")
            except Exception as e:
                # Best-effort: a bad request must not stop the tick relay.
                print("[Feed] Subscription error:", e)




# Start the feed loop only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    run_feed()

[quote="AJITHKUMAR_P, post:1, topic:54754, full:true"]
# import psycopg2
import json
import redis
from dhanhq import marketfeed
from dhanhq import dhanhq
from src.dataaggregators.candleAggregator import OHLCBuilder
import os
from dotenv import load_dotenv
from src.connections.cache import r
load_dotenv(".env")

# Redis pub/sub handle; run_feed() polls it for runtime subscription requests
# arriving on the "subscribe_queue" channel.
pubsub = r.pubsub()
pubsub.subscribe("subscribe_queue")

# Dhan credentials are read from the environment (.env is loaded above).
# NOTE(review): os.getenv returns None when a variable is missing - confirm
# whether the Dhan clients should be constructed at all in that case.
client_id = os.getenv("CLIENT_ID")
access_token = os.getenv("ACCESS_TOKEN")

# Instruments requested when the feed is created. Each entry is presumably
# (exchange segment, security id, feed request type) - confirm against the
# dhanhq marketfeed documentation.
instruments = [
    (marketfeed.IDX, "13", marketfeed.Ticker),
    (marketfeed.MCX, "439487", marketfeed.Ticker),
    (marketfeed.NSE_FNO, "44472", marketfeed.Ticker),  # Example: Nifty Index
]
version = "v2"
dhan = dhanhq(client_id, access_token)

nifty = OHLCBuilder("Nifty", interval_minutes=5)
fir = nifty.get_first_candle()
las = nifty.get_last_candle()
sym = str(44473)

# Pass the `version` constant instead of repeating the "v2" literal so the
# feed version is defined in exactly one place.
data = marketfeed.DhanFeed(client_id, access_token, instruments, version)

def run_feed():
    """Relay Dhan feed ticks to Redis and serve runtime subscription requests.

    Runs forever. Each iteration:

    1. calls ``data.run_forever()`` followed by ``data.get_data()`` to obtain
       one packet from the feed;
    2. calls ``nifty.save_to_db(las)``;
    3. when the packet has a truthy ``"LTP"``, publishes a JSON object with
       the keys ``sec_id``, ``ltp`` and ``ltt`` on the Redis ``"ticks"``
       channel;
    4. polls ``pubsub`` once and, if a message arrived, decodes it as a JSON
       list of security ids and subscribes each one as an ``NSE_FNO`` ticker.

    Exceptions raised while handling a subscription request are printed and
    the loop continues. Exceptions raised by the feed, the database write or
    the Redis publish are not caught here and propagate to the caller.
    """
    import asyncio

    print("[Feed] Starting Dhan feed...")
    while True:
        data.run_forever()
        res = data.get_data()

        # Build the tick only for packets that actually carry a price.
        # Resetting to None each iteration prevents a NameError on the first
        # price-less packet and stops the previous tick being republished.
        tick_data = None
        if res and res.get("LTP"):
            tick_data = {
                "sec_id": str(res["security_id"]),
                "ltp": float(res["LTP"]),
                "ltt": str(res["LTT"]),
            }

        nifty.save_to_db(las)

        # Publish JSON string
        if tick_data is not None:
            r.publish("ticks", json.dumps(tick_data))

        # Check Redis for new instruments
        msg = pubsub.get_message()
        if msg and msg["type"] == "message":
            try:
                new_sec_ids = json.loads(msg["data"])  # e.g., [47161, 47162]
                sub_instruments = [
                    (marketfeed.NSE_FNO, str(s), marketfeed.Ticker)
                    for s in new_sec_ids
                ]

                # The dhanhq README calls subscribe_symbols() as a plain
                # method. Only hand its result to asyncio when it really is
                # awaitable; wrapping a None return in ensure_future() raises
                # TypeError and would report every request as a failure.
                pending = data.subscribe_symbols(sub_instruments)
                if asyncio.iscoroutine(pending) or asyncio.isfuture(pending):
                    asyncio.ensure_future(pending)

                print(f"[Feed] Subscribed new instruments: {sub_instruments}")
            except Exception as e:
                # Best-effort: a bad request must not stop the tick relay.
                print("[Feed] Subscription error:", e)




# Start the feed loop only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    run_feed()


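[/quote]

You're probably using the latest `websockets` library. From version 14 its default client connection object no longer has a `closed` attribute, which is what the `'ClientConnection' object has no attribute 'closed'` error is reporting. Revert to 13.0 (for example, `pip install websockets==13.0`).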