from typing import List, Dict, Optional

import requests
import pandas as pd
import os
import time
from datetime import datetime, timedelta
from requests.exceptions import ConnectionError, Timeout, RequestException

from app.logging_config import get_logger
from app.config import load_config

config = load_config()
API_KEY = config['DEFAULT']['API_KEY']
logger = get_logger()


class Scraper:
    def __init__(self, faction_id, fetch_interval, run_interval, app):
        self.faction_id = faction_id
        self.fetch_interval = fetch_interval
        self.run_interval = run_interval
        self.end_time = datetime.now() + timedelta(days=run_interval)
        self.data_file_name = os.path.join(
            app.config['DATA']['DATA_DIR'],
            f"{self.faction_id}-{datetime.now().strftime('%Y-%m-%d-%H-%M')}.csv",
        )
        self.scraping_active = False
        logger.debug(f"Writing data to {self.data_file_name}")

    def fetch_faction_data(self) -> Optional[Dict]:
        """Fetches the faction record (including the member list) from the Torn API."""
        url = f"https://api.torn.com/faction/{self.faction_id}?selections=&key={API_KEY}"
        response = requests.get(url)
        if response.status_code == 200:
            return response.json()
        logger.warning(f"Failed to fetch faction data for faction ID {self.faction_id}. Response: {response.text}")
        return None

    def fetch_user_activity(self, user_id) -> Optional[Dict]:
        """Fetches a single member's profile, retrying with exponential backoff."""
        url = f"https://api.torn.com/user/{user_id}?selections=basic,profile&key={API_KEY}"
        retries = 3
        for attempt in range(retries):
            try:
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                return response.json()
            except ConnectionError as e:
                logger.error(f"Connection error while fetching user activity for user ID {user_id}: {e}")
            except Timeout as e:
                logger.error(f"Timeout error while fetching user activity for user ID {user_id}: {e}")
            except RequestException as e:
                logger.error(f"Error while fetching user activity for user ID {user_id}: {e}")
            if attempt < retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, then 2s between attempts
        return None

    def start_scraping(self) -> None:
        """Runs the scraping loop until the end time is reached or it is stopped manually."""
        self.scraping_active = True
        logger.info(f"Starting scraping for faction ID {self.faction_id}")
        logger.debug(
            f"Fetch interval: {self.fetch_interval}s, Run interval: {self.run_interval} days, "
            f"End time: {self.end_time}"
        )
        MAX_FAILURES = 5  # Stop after 5 consecutive failures
        failure_count = 0
        while datetime.now() < self.end_time and self.scraping_active:
            logger.info(f"Fetching data at {datetime.now()}")
            faction_data = self.fetch_faction_data()
            if not faction_data or "members" not in faction_data:
                logger.warning(f"No faction data found for ID {self.faction_id} (Failure {failure_count + 1}/{MAX_FAILURES})")
                failure_count += 1
                if failure_count >= MAX_FAILURES:
                    logger.error(f"Max failures reached ({MAX_FAILURES}). Stopping scraping.")
                    break
                time.sleep(self.fetch_interval)
                continue
            failure_count = 0  # Reset failure count on success
            user_activity_data = self.process_faction_members(faction_data["members"])
            self.save_data(user_activity_data)
            logger.info(f"Data appended to {self.data_file_name}")
            time.sleep(self.fetch_interval)
        self.handle_scraping_end()

    def process_faction_members(self, members: Dict[str, Dict]) -> List[Dict]:
        """Processes and retrieves user activity for all faction members."""
        user_activity_data = []
        for user_id in members:
            user_activity = self.fetch_user_activity(user_id)
            if user_activity:
                user_activity_data.append({
                    "user_id": user_id,
                    "name": user_activity.get("name", ""),
                    "last_action": user_activity.get("last_action", {}).get("timestamp", 0),
                    "status": user_activity.get("status", {}).get("state", ""),
                    "timestamp": datetime.now().timestamp(),
                })
                logger.info(f"Fetched data for user {user_id} ({user_activity.get('name', '')})")
            else:
                logger.warning(f"Failed to fetch data for user {user_id}")
        return user_activity_data

    def save_data(self, user_activity_data: List[Dict]) -> None:
        """Appends user activity data to the CSV file, writing the header only once."""
        if not user_activity_data:
            logger.warning("No data to save.")
            return
        df = pd.DataFrame(user_activity_data)
        df["last_action"] = pd.to_datetime(df["last_action"], unit="s")
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
        file_exists = os.path.isfile(self.data_file_name)
        try:
            # Pass the path directly: to_csv handles opening the file itself,
            # so there is no need for a separate open() with a duplicate mode.
            df.to_csv(
                self.data_file_name,
                mode="a" if file_exists else "w",
                header=not file_exists,
                index=False,
            )
            logger.info(f"Data successfully saved to {self.data_file_name}")
        except Exception as e:
            logger.error(f"Error saving data to {self.data_file_name}: {e}")

    def handle_scraping_end(self) -> None:
        """Handles cleanup and logging when scraping ends."""
        if not self.scraping_active:
            logger.warning(f"Scraping stopped manually at {datetime.now()}")
        elif datetime.now() >= self.end_time:
            logger.warning(f"Scraping stopped due to timeout at {datetime.now()} (Run interval: {self.run_interval} days)")
        else:
            logger.error(f"Unexpected stop at {datetime.now()}")
        logger.info("Scraping completed.")
        self.scraping_active = False

    def stop_scraping(self) -> None:
        self.scraping_active = False
        logger.debug("Scraping stopped by user")
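
# Illustrative helper (an assumption, not part of the original module): reloads
# a CSV written by Scraper.save_data so it can be fed to generate_statistics
# below. The column names match what save_data writes, and parse_dates restores
# the datetime dtype that generate_statistics expects for df['timestamp'].
def load_activity_data(path: str) -> pd.DataFrame:
    """Loads a saved activity CSV with the datetime columns restored."""
    return pd.read_csv(path, parse_dates=["last_action", "timestamp"])
    # e.g. hourly = generate_statistics(load_activity_data(scraper.data_file_name))
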
def generate_statistics(df: pd.DataFrame) -> pd.Series:
    """Counts activity rows per hour of day.

    Expects df['timestamp'] to already be a datetime column (as produced by
    save_data or load_activity_data), so no further conversion is needed.
    """
    df['hour'] = df['timestamp'].dt.hour
    return df.groupby('hour').size()  # Activity count by hour
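

# Minimal usage sketch (an assumption, not part of the original module).
# Scraper only reads app.config['DATA']['DATA_DIR'], so a simple stand-in
# object with that shape is enough to run it here; the faction ID and
# intervals below are placeholder values.
if __name__ == "__main__":
    class _App:
        """Stand-in for the application object normally passed to Scraper."""
        config = {"DATA": {"DATA_DIR": "."}}

    scraper = Scraper(faction_id=12345, fetch_interval=300, run_interval=7, app=_App())
    try:
        scraper.start_scraping()  # Polls every 5 minutes for up to 7 days
    except KeyboardInterrupt:
        scraper.stop_scraping()  # Ctrl+C flags the loop to stop cleanly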