full refactor fml
This commit is contained in:
85
app/util.py
85
app/util.py
@@ -0,0 +1,85 @@
|
||||
import os
|
||||
import zipfile
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from app.state import data_file_name, log_file_name
|
||||
|
||||
from app.config import load_config
|
||||
|
||||
config = load_config()
|
||||
|
||||
def create_zip(file_paths, zip_name, app):
|
||||
temp_dir = os.path.abspath(app.config['TEMP']['TEMP_DIR'])
|
||||
zip_path = os.path.join(temp_dir, zip_name)
|
||||
with zipfile.ZipFile(zip_path, 'w') as zipf:
|
||||
for file_path in file_paths:
|
||||
zipf.write(file_path, os.path.basename(file_path))
|
||||
print(f"Zip file created: {zip_path}")
|
||||
return zip_path
|
||||
|
||||
def delete_old_zips():
|
||||
temp_dir = os.path.abspath(config['TEMP']['TEMP_DIR'])
|
||||
now = datetime.now()
|
||||
for filename in os.listdir(temp_dir):
|
||||
if filename.endswith('.zip'):
|
||||
file_path = os.path.join(temp_dir, filename)
|
||||
if now - datetime.fromtimestamp(os.path.getmtime(file_path)) > timedelta(hours=1):
|
||||
os.remove(file_path)
|
||||
|
||||
def tail(filename, n):
|
||||
stat = os.stat(filename)
|
||||
n = int(n)
|
||||
if stat.st_size == 0 or n == 0:
|
||||
yield ''
|
||||
return
|
||||
|
||||
page_size = int(config['LOGGING']['TAIL_PAGE_SIZE'])
|
||||
offsets = []
|
||||
count = _n = n if n >= 0 else -n
|
||||
|
||||
last_byte_read = last_nl_byte = starting_offset = stat.st_size - 1
|
||||
|
||||
with open(filename, 'r') as f:
|
||||
while count > 0:
|
||||
starting_byte = last_byte_read - page_size
|
||||
if last_byte_read == 0:
|
||||
offsets.append(0)
|
||||
break
|
||||
elif starting_byte < 0:
|
||||
f.seek(0)
|
||||
text = f.read(last_byte_read)
|
||||
else:
|
||||
f.seek(starting_byte)
|
||||
text = f.read(page_size)
|
||||
|
||||
for i in range(-1, -1*len(text)-1, -1):
|
||||
last_byte_read -= 1
|
||||
if text[i] == '\n':
|
||||
last_nl_byte = last_byte_read
|
||||
starting_offset = last_nl_byte + 1
|
||||
offsets.append(starting_offset)
|
||||
count -= 1
|
||||
|
||||
offsets = offsets[len(offsets)-_n:]
|
||||
offsets.reverse()
|
||||
|
||||
with open(filename, 'r') as f:
|
||||
for i, offset in enumerate(offsets):
|
||||
f.seek(offset)
|
||||
|
||||
if i == len(offsets) - 1:
|
||||
yield f.read()
|
||||
else:
|
||||
bytes_to_read = offsets[i+1] - offset
|
||||
yield f.read(bytes_to_read)
|
||||
|
||||
def get_size(path):
|
||||
size = os.path.getsize(path)
|
||||
if size < 1024:
|
||||
return f"{size} bytes"
|
||||
elif size < pow(1024,2):
|
||||
return f"{round(size/1024, 2)} KB"
|
||||
elif size < pow(1024,3):
|
||||
return f"{round(size/(pow(1024,2)), 2)} MB"
|
||||
elif size < pow(1024,4):
|
||||
return f"{round(size/(pow(1024,3)), 2)} GB"
|
||||
Reference in New Issue
Block a user