refactors to use redis & celery

2025-02-20 19:56:37 +01:00
parent f68ada7204
commit 5994d8ae7b
10 changed files with 427 additions and 154 deletions


@@ -5,21 +5,60 @@ import os
 import time
 from datetime import datetime, timedelta
 from requests.exceptions import ConnectionError, Timeout, RequestException
+import redis
+import threading
+from flask import current_app

 class Scraper:
-    def __init__(self, faction_id, fetch_interval, run_interval, app):
-        self.faction_id = faction_id
-        self.fetch_interval = fetch_interval
-        self.run_interval = run_interval
-        self.end_time = datetime.now() + timedelta(days=run_interval)
-        self.data_file_name = os.path.join(app.config['DATA']['DATA_DIR'], f"{self.faction_id}-{datetime.now().strftime('%Y-%m-%d-%H-%M')}.csv")
-        self.scraping_active = False
-        self.API_KEY = app.config['DEFAULT']['API_KEY']
-        self.logger = app.logger
-        print(self.data_file_name)
+    _instances = {}  # Track all instances by faction_id
+    _lock = threading.Lock()
+
+    def __new__(cls, faction_id, *args, **kwargs):
+        with cls._lock:
+            # Stop any existing instance for this faction
+            if faction_id in cls._instances:
+                old_instance = cls._instances[faction_id]
+                old_instance.stop_scraping()
+            instance = super().__new__(cls)
+            cls._instances[faction_id] = instance
+            return instance
+
+    def __init__(self, faction_id, fetch_interval, run_interval, config):
+        # Only initialize if not already initialized
+        if not hasattr(self, 'faction_id'):
+            self.redis_client = redis.StrictRedis(
+                host='localhost', port=6379, db=0, decode_responses=True
+            )
+            self.faction_id = faction_id
+            self.fetch_interval = fetch_interval
+            self.run_interval = run_interval
+            self.API_KEY = config['DEFAULT']['API_KEY']
+            self.data_file_name = os.path.join(
+                config['DATA']['DATA_DIR'],
+                f"{faction_id}-{datetime.now().strftime('%Y-%m-%d-%H-%M')}.csv"
+            )
+            self.end_time = datetime.now() + timedelta(days=int(run_interval))
+
+            # Store scraper state in Redis
+            self.redis_client.hmset(f"scraper:{faction_id}", {
+                "faction_id": faction_id,
+                "fetch_interval": fetch_interval,
+                "run_interval": run_interval,
+                "end_time": self.end_time.isoformat(),
+                "data_file_name": self.data_file_name,
+                "scraping_active": "0",
+                "api_key": self.API_KEY
+            })
+
+    @property
+    def scraping_active(self):
+        return bool(int(self.redis_client.hget(f"scraper:{self.faction_id}", "scraping_active")))
+
+    @scraping_active.setter
+    def scraping_active(self, value):
+        self.redis_client.hset(f"scraper:{self.faction_id}", "scraping_active", "1" if value else "0")

     def fetch_faction_data(self):
         url = f"https://api.torn.com/faction/{self.faction_id}?selections=&key={self.API_KEY}"
@@ -48,40 +87,38 @@ class Scraper:
                 time.sleep(2 ** attempt)  # Exponential backoff
         return None

-    def start_scraping(self, app) -> None:
+    def start_scraping(self) -> None:
         """Starts the scraping process until the end time is reached or stopped manually."""
         self.scraping_active = True
+        current_app.logger.info(f"Starting scraping for faction ID {self.faction_id}")
+        current_app.logger.debug(f"Fetch interval: {self.fetch_interval}s, Run interval: {self.run_interval} days, End time: {self.end_time}")

-        # Set the application context explicitly
-        with app.app_context():
-            current_app.logger.info(f"Starting scraping for faction ID {self.faction_id}")
-            current_app.logger.debug(f"Fetch interval: {self.fetch_interval}s, Run interval: {self.run_interval} days, End time: {self.end_time}")
-            MAX_FAILURES = 5
-            failure_count = 0
+        MAX_FAILURES = 5  # Stop after 5 consecutive failures
+        failure_count = 0

-            while datetime.now() < self.end_time and self.scraping_active:
-                current_app.logger.info(f"Fetching data at {datetime.now()}")
-                faction_data = self.fetch_faction_data()
+        while datetime.now() < self.end_time and self.scraping_active:
+            current_app.logger.info(f"Fetching data at {datetime.now()}")
+            faction_data = self.fetch_faction_data()

-                if not faction_data or "members" not in faction_data:
-                    current_app.logger.warning(f"No faction data found for ID {self.faction_id} (Failure {failure_count + 1}/{MAX_FAILURES})")
-                    failure_count += 1
-                    if failure_count >= MAX_FAILURES:
-                        current_app.logger.error(f"Max failures reached ({MAX_FAILURES}). Stopping scraping.")
-                        break
-                    time.sleep(self.fetch_interval)
-                    continue
-
-                current_app.logger.info(f"Fetched {len(faction_data['members'])} members for faction {self.faction_id}")
-                failure_count = 0  # Reset failure count on success
-                user_activity_data = self.process_faction_members(faction_data["members"])
-                self.save_data(user_activity_data)
-                current_app.logger.info(f"Data appended to {self.data_file_name}")
-
-            self.handle_scraping_end()
+            if not faction_data or "members" not in faction_data:
+                current_app.logger.warning(f"No faction data found for ID {self.faction_id} (Failure {failure_count + 1}/{MAX_FAILURES})")
+                failure_count += 1
+                if failure_count >= MAX_FAILURES:
+                    current_app.logger.error(f"Max failures reached ({MAX_FAILURES}). Stopping scraping.")
+                    break
+                time.sleep(self.fetch_interval)
+                continue
+
+            current_app.logger.info(f"Fetched {len(faction_data['members'])} members for faction {self.faction_id}")
+            failure_count = 0  # Reset failure count on success
+            user_activity_data = self.process_faction_members(faction_data["members"])
+            self.save_data(user_activity_data)
+            current_app.logger.info(f"Data appended to {self.data_file_name}")
+
+            time.sleep(self.fetch_interval)
+
+        self.handle_scraping_end()

     def process_faction_members(self, members: Dict[str, Dict]) -> List[Dict]:
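The commit message also mentions Celery, though this file only shows the Redis side. Note that start_scraping now logs via current_app without entering an application context itself, so whatever invokes it must provide one. A hypothetical sketch of a Celery task wrapping the loop above; the broker URL, module paths, and app factory are assumptions, not part of this diff:

    from celery import Celery
    from app import create_app   # hypothetical Flask app factory
    from scraper import Scraper  # hypothetical module for the class above

    celery_app = Celery('scraper', broker='redis://localhost:6379/0')  # assumed broker URL

    @celery_app.task
    def run_scraper(faction_id, fetch_interval, run_interval):
        app = create_app()
        with app.app_context():  # start_scraping logs via current_app
            # Assumes app.config carries the DEFAULT/DATA sections used in __init__
            Scraper(faction_id, fetch_interval, run_interval, app.config).start_scraping()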
@@ -122,6 +159,18 @@ class Scraper:
         except Exception as e:
             current_app.logger.error(f"Error saving data to {self.data_file_name}: {e}")

+    def cleanup_redis_state(self):
+        """Clean up all Redis state for this scraper instance"""
+        if hasattr(self, 'faction_id'):
+            self.redis_client.delete(f"scraper:{self.faction_id}")
+            current_id = self.redis_client.get("current_faction_id")
+            if current_id and current_id == str(self.faction_id):
+                self.redis_client.delete("current_faction_id")
+            # Remove from instances tracking
+            with self._lock:
+                if self.faction_id in self._instances:
+                    del self._instances[self.faction_id]
+
     def handle_scraping_end(self) -> None:
         """Handles cleanup and logging when scraping ends."""
         if not self.scraping_active:
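Because all scraper state lives in the scraper:<faction_id> hash, any process can inspect a running scraper without holding the instance; a minimal sketch, assuming a local Redis and a placeholder faction id:

    import redis

    r = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    state = r.hgetall("scraper:12345")  # 12345 is a placeholder faction id
    print(state.get("scraping_active"), state.get("end_time"))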
@@ -133,7 +182,13 @@ class Scraper:
             current_app.logger.info("Scraping completed.")
         self.scraping_active = False
+        self.cleanup_redis_state()

     def stop_scraping(self):
         self.scraping_active = False
-        current_app.logger.debug("Scraping stopped by user")
+        self.cleanup_redis_state()
+        current_app.logger.debug(f"Scraping stopped for faction {self.faction_id}")
+
+    def __del__(self):
+        """Ensure Redis cleanup on object destruction"""
+        self.cleanup_redis_state()
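__del__ only runs if and when the interpreter collects the object, so deterministic cleanup should go through stop_scraping explicitly; a usage sketch with placeholder values, assuming a config object carrying the DEFAULT and DATA sections used above:

    # Assumes an active Flask application context, since both start_scraping
    # and stop_scraping log via current_app.
    scraper = Scraper(12345, fetch_interval=60, run_interval=7, config=config)  # placeholder args
    try:
        scraper.start_scraping()
    finally:
        scraper.stop_scraping()  # clears scraping_active and deletes the Redis keys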