#!/usr/bin/env python3
"""
Service IA Temps Réel - Analyse en streaming via WebSockets Binance
Utilise le GPU pour des prédictions instantanées à chaque nouvelle bougie
"""

import json
import threading
import time
import logging
import asyncio
import websockets
from datetime import datetime
from typing import Dict, List, Callable, Optional
from collections import deque
import os

# Logging configuration: INFO level, dedicated "AI_Realtime" logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("AI_Realtime")

# Import the AI predictor; AI_AVAILABLE records whether the import succeeded
# so the rest of the module can check it before using the predictor.
try:
    from ai_predictor import AIPredictor, get_ai_predictor
    AI_AVAILABLE = True
except ImportError:
    AI_AVAILABLE = False
    logger.warning("Module AI Predictor non disponible")

# Directory containing this script (used to locate the JSON files next to it)
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))


class RealtimeKlineBuffer:
    """Rolling buffer of real-time klines (candles) for one symbol.

    Closed candles are stored in ``klines``, a bounded deque that drops the
    oldest entry once ``max_size`` is reached. The candle still being formed,
    if any, is kept separately in ``current_kline``.
    """

    def __init__(self, symbol: str, max_size: int = 100):
        """Create an empty buffer for *symbol* holding up to *max_size* closed candles."""
        self.symbol = symbol
        self.klines = deque(maxlen=max_size)
        self.current_kline = None
        self.last_update = None

    def update(self, kline_data: dict):
        """Apply one WebSocket kline event to the buffer.

        Args:
            kline_data: Event payload whose ``'k'`` entry carries the candle
                fields ``t``, ``o``, ``h``, ``l``, ``c``, ``v``, ``T`` and ``x``.

        Raises:
            KeyError: If one of the expected fields is missing.
            ValueError: If a price or volume cannot be converted to float.
        """
        k = kline_data['k']

        kline = {
            'open_time': k['t'],
            'open': float(k['o']),
            'high': float(k['h']),
            'low': float(k['l']),
            'close': float(k['c']),
            'volume': float(k['v']),
            'close_time': k['T'],
            'is_closed': k['x']
        }

        if k['x']:  # Closed candle
            newest = self.klines[-1] if self.klines else None
            if newest is not None and newest['open_time'] == kline['open_time']:
                # Same candle delivered again: overwrite it instead of
                # appending a duplicate that would distort the series.
                self.klines[-1] = kline
            else:
                self.klines.append(kline)
            self.current_kline = None
        else:
            self.current_kline = kline

        self.last_update = datetime.now()

    def _series(self, field: str) -> List[float]:
        """Return *field* for every closed candle, then for the in-progress one."""
        values = [k[field] for k in self.klines]
        if self.current_kline:
            values.append(self.current_kline[field])
        return values

    def get_prices(self) -> List[float]:
        """Return closing prices, oldest first, ending with the in-progress candle if any."""
        return self._series('close')

    def get_volumes(self) -> List[float]:
        """Return volumes, oldest first, ending with the in-progress candle if any."""
        return self._series('volume')


class AIRealtimeService:
    """
    Service d'analyse IA en temps réel via WebSockets Binance
    
    Fonctionnement:
    1. Connexion WebSocket aux streams kline de tous les symboles
    2. À chaque nouvelle bougie fermée → Analyse IA GPU
    3. Mise à jour instantanée des opportunités
    4. Notification callback si signal détecté
    """
    
    def __init__(self):
        """Build an idle service: no symbols, no worker thread, zeroed statistics."""
        # The predictor only exists when the AI module could be imported
        if AI_AVAILABLE:
            self.predictor = get_ai_predictor()
        else:
            self.predictor = None

        # Watched symbols and their per-symbol kline buffers
        self.symbols: List[str] = []
        self.buffers: Dict[str, RealtimeKlineBuffer] = {}

        # Runtime state of the background worker
        self.is_running = False
        self.ws_task = None
        self.loop = None
        self._thread = None

        # Optional user hooks, both invoked as callback(symbol, item)
        self.on_signal_callback: Optional[Callable] = None
        self.on_update_callback: Optional[Callable] = None

        # Configuration
        self.interval = "5m"              # Candle interval
        self.analysis_on_close = True     # Analyse when a candle closes
        self.analysis_on_tick = False     # Analyse on every tick (more intensive)
        self.min_analysis_interval = 1.0  # Minimum seconds between tick analyses

        # Live statistics
        self.stats = dict(
            connected=False,
            symbols_streaming=0,
            analyses_per_minute=0,
            last_analysis=None,
            signals_detected=0,
            ws_messages_received=0,
        )
        # Timestamps of the most recent analyses (bounded to 60 entries)
        self._analysis_times = deque(maxlen=60)

        # Lock intended for thread-safety
        self._lock = threading.Lock()
    
    def set_symbols(self, symbols: List[str]):
        """Set the symbols to watch and create a fresh, empty buffer for each.

        Symbols are upper-cased and de-duplicated, keeping the first occurrence
        order. Without this, a repeated symbol (or one differing only by case)
        would have its history loaded twice into the same buffer and be
        subscribed twice on the stream.

        Args:
            symbols: Trading pair names, in any letter case.
        """
        unique = list(dict.fromkeys(s.upper() for s in symbols))
        self.symbols = unique
        self.buffers = {s: RealtimeKlineBuffer(s) for s in unique}
        logger.info(f"📊 {len(unique)} symboles configurés pour streaming")
    
    def set_on_signal(self, callback: Callable):
        """Register the hook run as ``callback(symbol, item)`` when a buy signal is detected."""
        self.on_signal_callback = callback
    
    def set_on_update(self, callback: Callable):
        """Register the hook run as ``callback(symbol, item)`` after every analysis."""
        self.on_update_callback = callback
    
    async def _fetch_initial_klines(self, symbol: str):
        """Récupère les klines historiques pour initialiser le buffer"""
        import aiohttp
        url = f"https://api.binance.com/api/v3/klines"
        params = {"symbol": symbol, "interval": self.interval, "limit": 100}
        
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        klines = await response.json()
                        buffer = self.buffers.get(symbol)
                        if buffer:
                            for k in klines[:-1]:  # Toutes sauf la dernière (en cours)
                                buffer.klines.append({
                                    'open_time': k[0],
                                    'open': float(k[1]),
                                    'high': float(k[2]),
                                    'low': float(k[3]),
                                    'close': float(k[4]),
                                    'volume': float(k[5]),
                                    'close_time': k[6],
                                    'is_closed': True
                                })
                        logger.debug(f"✓ {symbol}: {len(buffer.klines)} klines chargées")
        except Exception as e:
            logger.warning(f"Erreur chargement klines {symbol}: {e}")
    
    async def _ws_handler(self):
        """Load history, then stream klines for every configured symbol.

        The stream names are split into chunks of 200 and each chunk gets its
        own combined-stream connection, all running concurrently. Previously
        only the first chunk was connected, so symbols beyond the first 200
        were never streamed. Returns once ``self.is_running`` is cleared and
        every connection loop has exited.
        """
        if not self.symbols:
            logger.warning("Aucun symbole configuré")
            return

        # Load historical data first
        logger.info("📥 Chargement des données historiques...")
        tasks = [self._fetch_initial_klines(s) for s in self.symbols]
        await asyncio.gather(*tasks, return_exceptions=True)
        logger.info(f"✅ Données historiques chargées pour {len(self.symbols)} symboles")

        # Build the combined stream names, split so each URL stays bounded
        streams = [f"{s.lower()}@kline_{self.interval}" for s in self.symbols]
        chunk_size = 200
        stream_chunks = [streams[i:i + chunk_size] for i in range(0, len(streams), chunk_size)]

        # Number of streams currently live, keyed by chunk index
        live_streams: Dict[int, int] = {}

        await asyncio.gather(*(
            self._stream_chunk(index, chunk, live_streams)
            for index, chunk in enumerate(stream_chunks)
        ))

    def _publish_connection_state(self, live_streams: Dict[int, int]):
        """Refresh the connection stats from *live_streams* and persist them."""
        self.stats['connected'] = bool(live_streams)
        self.stats['symbols_streaming'] = sum(live_streams.values())
        self._save_status()

    async def _stream_chunk(self, index: int, streams: List[str], live_streams: Dict[int, int]):
        """Keep one combined-stream connection alive for *streams* until the service stops."""
        ws_url = f"wss://stream.binance.com:9443/stream?streams={'/'.join(streams)}"

        while self.is_running:
            try:
                logger.info("🔌 Connexion WebSocket Binance...")

                async with websockets.connect(ws_url, ping_interval=20, ping_timeout=10) as ws:
                    live_streams[index] = len(streams)
                    self._publish_connection_state(live_streams)
                    logger.info(f"✅ WebSocket connecté - {len(streams)} symboles en streaming")

                    while self.is_running:
                        try:
                            # The timeout lets the loop re-check is_running
                            # even when the stream is quiet.
                            msg = await asyncio.wait_for(ws.recv(), timeout=30)
                            data = json.loads(msg)

                            self.stats['ws_messages_received'] += 1

                            if 'data' in data:
                                await self._process_kline_message(data['data'])

                        except asyncio.TimeoutError:
                            # No message: this is normal
                            continue
                        except websockets.ConnectionClosed:
                            logger.warning("⚠️ WebSocket déconnecté, reconnexion...")
                            break

            except Exception as e:
                logger.error(f"Erreur WebSocket: {e}")
            finally:
                # Whatever ended the connection, stop advertising it as live
                # so the stats and status file reflect the disconnect.
                if live_streams.pop(index, None) is not None:
                    self._publish_connection_state(live_streams)

            if self.is_running:
                logger.info("🔄 Reconnexion dans 5 secondes...")
                await asyncio.sleep(5)
    
    async def _process_kline_message(self, data: dict):
        """Traite un message kline WebSocket"""
        if data.get('e') != 'kline':
            return
        
        symbol = data['s']
        kline = data['k']
        
        buffer = self.buffers.get(symbol)
        if not buffer:
            return
        
        # Mettre à jour le buffer
        buffer.update(data)
        
        # Analyser si bougie fermée OU si mode tick actif
        should_analyze = False
        if kline['x'] and self.analysis_on_close:
            should_analyze = True
        elif self.analysis_on_tick:
            # Vérifier l'intervalle minimum
            now = time.time()
            if not hasattr(self, '_last_tick_analysis'):
                self._last_tick_analysis = {}
            last = self._last_tick_analysis.get(symbol, 0)
            if now - last >= self.min_analysis_interval:
                should_analyze = True
                self._last_tick_analysis[symbol] = now
        
        if should_analyze:
            await self._analyze_symbol(symbol)
    
    async def _analyze_symbol(self, symbol: str):
        """Analyse un symbole avec l'IA"""
        if not self.predictor:
            return
        
        buffer = self.buffers.get(symbol)
        if not buffer or len(buffer.get_prices()) < 50:
            return
        
        try:
            prices = buffer.get_prices()
            volumes = buffer.get_volumes()
            
            # Analyse IA (GPU-accélérée)
            item = self.predictor.analyze_symbol(symbol, prices, volumes)
            self.predictor.update_watchlist(symbol, item)
            
            # Stats
            self._analysis_times.append(time.time())
            self.stats['last_analysis'] = datetime.now().isoformat()
            self.stats['analyses_per_minute'] = len([t for t in self._analysis_times if time.time() - t < 60])
            
            # Callbacks
            if self.on_update_callback:
                self.on_update_callback(symbol, item)
            
            # Signal détecté?
            if item.status == "ready":
                self.stats['signals_detected'] += 1
                logger.info(f"🎯 SIGNAL: {symbol} - Score:{item.score} Pattern:{item.pattern}")
                if self.on_signal_callback:
                    self.on_signal_callback(symbol, item)
                    
        except Exception as e:
            logger.warning(f"Erreur analyse {symbol}: {e}")
    
    def _run_loop(self):
        """Thread entry point: run the WebSocket handler on a dedicated event loop.

        A new loop is created, stored in ``self.loop`` and installed as the
        current loop for this thread. Errors escaping the handler are logged,
        and the loop is always closed on exit.
        """
        loop = asyncio.new_event_loop()
        self.loop = loop
        asyncio.set_event_loop(loop)

        try:
            loop.run_until_complete(self._ws_handler())
        except Exception as e:
            logger.error(f"Erreur loop: {e}")
        finally:
            loop.close()
    
    def start(self):
        """Start the real-time service in a background daemon thread.

        No-op when the service is already running; refuses to start (with an
        error log) when the AI module is unavailable.
        """
        if self.is_running:
            return

        if not AI_AVAILABLE:
            logger.error("❌ Module IA non disponible")
            return

        # Flag first: the worker's loops are conditioned on is_running
        self.is_running = True
        worker = threading.Thread(target=self._run_loop, daemon=True)
        self._thread = worker
        worker.start()
        self._save_status()  # Persist the new status
        logger.info("🚀 Service IA Temps Réel démarré")
    
    def stop(self):
        """Stop the service: clear the running flag, persist the status, wait for the worker.

        The worker thread is joined for at most 5 seconds.
        """
        self.is_running = False
        self.stats['connected'] = False
        self._save_status()  # Persist the new status

        worker = self._thread
        if worker:
            worker.join(timeout=5)
        logger.info("⏹️ Service IA Temps Réel arrêté")
    
    def _save_status(self):
        """Persist the service status to ``ai_realtime_status.json`` for other processes.

        The JSON is written to a temporary file in the same directory and then
        moved over the target with ``os.replace``, so a concurrent reader never
        sees an empty or half-written file. Failures are logged, never raised.
        """
        status_file = os.path.join(SCRIPT_DIR, "ai_realtime_status.json")
        # Unique per process and thread: this method is called from both the
        # caller's thread and the worker thread.
        tmp_file = f"{status_file}.{os.getpid()}.{threading.get_ident()}.tmp"
        try:
            status = {
                'is_running': self.is_running,
                'mode': 'realtime_websocket' if self.is_running else 'stopped',
                'connected': self.stats.get('connected', False),
                'symbols_streaming': self.stats.get('symbols_streaming', 0),
                'last_update': datetime.now().isoformat()
            }
            with open(tmp_file, 'w') as f:
                json.dump(status, f)
            os.replace(tmp_file, status_file)
        except Exception as e:
            logger.warning(f"Erreur sauvegarde statut: {e}")
            # Best-effort cleanup of a leftover temporary file
            try:
                os.remove(tmp_file)
            except OSError:
                pass
    
    def get_status(self) -> Dict:
        """Retourne le statut du service"""
        return {
            'is_running': self.is_running,
            'mode': 'realtime_websocket',
            'interval': self.interval,
            **self.stats,
            'watchlist_count': len(self.predictor.watchlist) if self.predictor else 0
        }
    
    def get_opportunities(self, min_score: int = 50) -> List[Dict]:
        """Retourne les opportunités actuelles"""
        if not self.predictor:
            return []
        
        watchlist = self.predictor.get_watchlist()
        return [w for w in watchlist if w['score'] >= min_score]
    
    def get_ready_signals(self) -> List[Dict]:
        """Retourne les signaux prêts à acheter (doublement validés)"""
        if not self.predictor:
            return []
        
        watchlist = self.predictor.get_watchlist()
        # 🚨 Double filtre: status == 'ready' ET smart_eligible == True
        # Cela garantit que les signaux BEARISH sont bloqués
        return [w for w in watchlist if w['status'] == 'ready' and w.get('smart_eligible', False) == True]


# Module-wide singleton, created on first request
_realtime_service = None

def get_realtime_service() -> AIRealtimeService:
    """Return the shared real-time service, creating it on the first call."""
    global _realtime_service
    if _realtime_service is not None:
        return _realtime_service
    _realtime_service = AIRealtimeService()
    return _realtime_service


def _load_watchlist_symbols(watchlist_file: str) -> List[str]:
    """Return the de-duplicated symbols stored in *watchlist_file*, or [] if unusable."""
    if not os.path.exists(watchlist_file):
        return []
    try:
        with open(watchlist_file, 'r') as f:
            data = json.load(f)
        manual = data.get('symbols', [])
        auto_added = list(data.get('auto_added', {}).keys())
        spy_inj = list(data.get('spy_injected', {}).keys())
        # dict.fromkeys de-duplicates while keeping first-seen order
        return list(dict.fromkeys(manual + auto_added + spy_inj))
    except (OSError, ValueError, AttributeError, TypeError) as e:
        # Unreadable, invalid JSON, or unexpected structure: report it and let
        # the caller fall back to its defaults instead of failing startup.
        logger.warning(f"Erreur lecture watchlist {watchlist_file}: {e}")
        return []


def start_realtime_service(symbols: Optional[List[str]] = None) -> AIRealtimeService:
    """Configure the shared service with *symbols* and start it.

    Args:
        symbols: Symbols to stream. When ``None``, they are read from
            ``watchlist.json`` next to this script (the ``symbols`` list plus
            the keys of ``auto_added`` and ``spy_injected``). If that yields
            nothing, including when the file is missing or cannot be parsed,
            a default set of five major pairs is used.

    Returns:
        The shared ``AIRealtimeService`` instance.
    """
    service = get_realtime_service()

    if symbols is None:
        symbols = _load_watchlist_symbols(os.path.join(SCRIPT_DIR, "watchlist.json"))

    if not symbols:
        symbols = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'SOLUSDT', 'XRPUSDT']

    service.set_symbols(symbols)
    service.start()
    return service


if __name__ == "__main__":
    # Manual smoke test: start the service and print its status every 10 seconds
    import signal

    def on_signal(symbol, item):
        """Print the details of a detected signal."""
        print(f"🎯 SIGNAL DÉTECTÉ: {symbol}")
        print(f"   Score: {item.score}, Pattern: {item.pattern}")
        print(f"   Gain prédit: {item.predicted_gain:.2f}%")

    service = start_realtime_service()
    service.set_on_signal(on_signal)

    banner = "=" * 50
    print("\n" + banner)
    print("🧠 Service IA Temps Réel - Test Mode")
    print(banner)
    print("Appuyez sur Ctrl+C pour arrêter\n")

    try:
        while True:
            time.sleep(10)
            snapshot = service.get_status()
            ready = service.get_ready_signals()
            summary = (
                f"\n📊 Status: Connected={snapshot['connected']}, "
                f"Analyses/min={snapshot['analyses_per_minute']}, "
                f"Signaux={len(ready)}"
            )
            print(summary)

            if not ready:
                continue

            print("🎯 Signaux actifs:")
            # Show at most the first five ready signals
            for entry in ready[:5]:
                print(f"   • {entry['symbol']}: Score={entry['score']}, Pattern={entry['pattern']}")

    except KeyboardInterrupt:
        service.stop()
        print("\n✅ Service arrêté")
