Commit a56634d7 authored by Davide Lagoa's avatar Davide Lagoa
Browse files

logs debug system created

parent d3a7d447
from zipfile import ZipFile
import os
def read_conf_file(path):
    """Parse a simple ``key = value`` configuration file into a dict.

    Lines without an ``=`` are ignored. All spaces and newlines are
    removed, so neither keys nor values may contain meaningful spaces.

    :param path: path to the configuration file.
    :return: dict mapping property names to their string values.
    """
    res = {}
    with open(path) as file:
        for line in file:
            if '=' in line:
                prop = line.replace(" ", "").replace("\n", "")
                # Split only on the FIRST '=' so values that themselves
                # contain '=' (URLs, query strings) are kept intact; the
                # old unbounded split dropped everything after a second '='.
                key, value = prop.split('=', 1)
                res[key] = value
    return res
def read_workers_conf_file(path):
    """Read the workers-address file and return the addresses as a list.

    Lines whose first character is ``#`` are treated as comments and
    skipped; every other line is stripped of spaces/newlines and kept.

    :param path: path to the workers configuration file.
    :return: list of worker address strings, in file order.
    """
    res = []
    with open(path) as file:
        for line in file:
            # Only a leading '#' marks a comment (matches the original
            # first-character check).
            if not line.startswith("#"):
                # Addresses contain neither spaces nor newlines; the local
                # no longer shadows the builtin ``str``.
                address = line.replace(" ", "").replace("\n", "")
                res.append(address)
    return res
def compressFiles(path, savePath):
    """Zip every file found under *path* into a flat archive at *savePath*.

    Directory structure is NOT preserved: each file is stored under its
    base name only, so files with identical names in different
    subdirectories will collide inside the archive.

    :param path: directory whose files are collected (recursively).
    :param savePath: destination path of the zip archive.
    :return: the path of the written archive (same as *savePath*).
    """
    file_paths = get_all_file_paths(path)
    # ``archive`` instead of ``zip`` — avoid shadowing the builtin.
    with ZipFile(savePath, 'w') as archive:
        for file_path in file_paths:
            # Flatten: keep only the last path component as the entry name.
            archive.write(file_path, arcname=file_path.split("/")[-1])
    return savePath
def get_all_file_paths(directory):
    """Recursively collect the full paths of all files under *directory*.

    :param directory: root directory to crawl.
    :return: list of file paths (root-relative to *directory* as given).
    """
    # os.walk visits every subdirectory; join each file name with the
    # directory it was found in to form a full path.
    return [
        os.path.join(walk_root, name)
        for walk_root, _subdirs, names in os.walk(directory)
        for name in names
    ]
# -*- coding: utf-8 -*-
from flask import Flask, render_template, request, send_file, jsonify
from ReadConf import read_conf_file
from logging.handlers import TimedRotatingFileHandler
import FilesUtilities
import FilesUtilities
import os
import socket
import shutil
import logging
import re
import time
import requests
CONFIGURATIONS = read_conf_file('/configs/docker_config.conf')
CONFIGURATIONS = FilesUtilities.read_conf_file('/configs/docker_config.conf')
logPath = CONFIGURATIONS["logs"]
......@@ -16,10 +20,19 @@ if not os.path.exists(logPath):
logPath = logPath + 'manager.log'
logging.basicConfig(filename=logPath, level=logging.DEBUG, format='%(asctime)s: %(levelname)s: >>>%(message)s')
logging.info("Docker configurations: " + str(CONFIGURATIONS))
# format the log entries
formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')
logging.info('Starting server')
handler = TimedRotatingFileHandler(logPath, when='midnight', backupCount=20)
handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.info("Docker configurations: " + str(CONFIGURATIONS))
logger.info('Starting server')
app = Flask(__name__)
hostname = socket.gethostname()
......@@ -33,16 +46,22 @@ last_submission_ID = 0
print('SERVER WORKING')
@app.after_request
def after_request(response):
    """Stamp every outgoing response with no-cache headers so clients
    and proxies never serve stale submission/status pages.

    :param response: the Flask response about to be sent.
    :return: the same response, with caching disabled.
    """
    no_cache_headers = {
        "Cache-Control": "no-cache, no-store, must-revalidate, public, max-age=0",
        "Expires": 0,
        "Pragma": "no-cache",
    }
    for header_name, header_value in no_cache_headers.items():
        response.headers[header_name] = header_value
    return response
@app.route("/")
def index():
    """Render the public welcome/home page."""
    # Stale duplicate of this log line (pre-refactor ``logging.info``)
    # removed; the module-level ``logger`` is the single logging path now.
    logger.info('Welcome page rendering...')
    return render_template("home.html")
@app.route("/submit", methods=["POST"])
def submit():
logging.info('HTML submission')
logger.info('HTML submission')
files = request.files.getlist("file")
taxonomyId = request.form["text"]
......@@ -53,16 +72,16 @@ def submit():
"Model": "Model must be in .xml format or a list of metabolites in .txt format (one metabolite per line)",
"Taxonomy ID": "Input must only contain digits"}), 406
logging.info("Taxonomy identifier: " + taxonomyId)
logger.info("Taxonomy identifier: " + taxonomyId)
return run(files, taxonomyId, False)
@app.route("/submitMerlinPlugin/<taxonomyId>", methods=["POST"])
def submitMerlinPlugin(taxonomyId):
logging.info('Merlin plugin submission')
logger.info('Merlin plugin submission')
files = request.files.getlist("file")
logging.info("files in submission " + str(files))
logger.info("files in submission " + str(files))
validInput = verifyInput(taxonomyId)
......@@ -85,16 +104,16 @@ def getReactionInfo(reactionId):
return jsonify({"Result": "Success",
"Purpose": "Future page for transyt reaction " + reactionId}), 200
else:
logging.info('Bad request to get reaction. Request done: ' + reactionId)
logger.info('Bad request to get reaction. Request done: ' + reactionId)
return jsonify({"Result": "Bad request, page does not exist!"}), 400
def run(files, taxonomyId, isRest):
logging.debug('New submission in progress...')
logger.debug('New submission in progress...')
total = len(os.listdir(SUBMISSIONS_PATH))
if total > SUBMISSIONS_LIMIT:
logging.error('Server capacity overload! Returning error 503 to client!')
logger.error('Server capacity overload! Returning error 503 to client!')
return jsonify({"result": "Server overload",
"message": "The server cannot handle the submission due to capacity overload. Please try again later!"}), 503
global last_submission_ID
......@@ -109,7 +128,7 @@ def run(files, taxonomyId, isRest):
os.makedirs(submission_path)
logging.debug("Submission " + str(last_submission_ID) + " folder created!")
logger.debug("Submission " + str(last_submission_ID) + " folder created!")
inputCheck = []
......@@ -130,15 +149,15 @@ def run(files, taxonomyId, isRest):
inputCheck.append("metabolites.txt")
destination = "/".join([submission_path, fileName])
logging.debug('Saving file ' + fileName + ' at ' + destination)
logger.debug('Saving file ' + fileName + ' at ' + destination)
file.save(destination)
validInput = ["genome.faa", "model.xml"]
validInput2 = ["genome.faa", "metabolites.txt"]
validInputs = [validInput, validInput2]
logging.info("taxID -> " + taxonomyId)
logging.info("inputCheck -> " + str(inputCheck))
logger.info("taxID -> " + taxonomyId)
logger.info("inputCheck -> " + str(inputCheck))
if inputCheck in validInputs:
with open("/".join([submission_path, "taxID.txt"]), "w") as taxFile:
......@@ -174,11 +193,11 @@ def display_msg(submissionID, isRest):
elif isRest == 'True':
isRest = True
logging.debug("Request to check submission " + submissionID + " status")
logger.debug("Request to check submission " + submissionID + " status")
folders = os.listdir(SUBMISSIONS_PATH)
logging.info("submissions " + str(folders))
logger.info("submissions " + str(folders))
if submissionID in folders:
total = len(folders)
......@@ -196,10 +215,10 @@ def display_msg(submissionID, isRest):
folders = os.listdir(PROCESSING_PATH)
logging.info("processing " + str(folders))
logger.info("processing " + str(folders))
if submissionID in folders: # up and running response
logging.debug("Submission " + submissionID + " still running")
logger.debug("Submission " + submissionID + " still running")
if isRest:
return jsonify({"result": "processing", "message": "accepted but process still running, please wait."}), 202
......@@ -208,11 +227,11 @@ def display_msg(submissionID, isRest):
folders = os.listdir(RESULTS_PATH)
logging.info("results " + str(folders))
logger.info("results " + str(folders))
if submissionID in folders:
files = os.listdir(RESULTS_PATH + submissionID)
logging.debug("Submission " + submissionID + " results complete")
logger.debug("Submission " + submissionID + " results complete")
result = "error"
message = "an error occurred while processing the submission"
......@@ -245,10 +264,81 @@ def display_msg(submissionID, isRest):
@app.route("/download/<submissionID>")
def download(submissionID):
    """Send the results archive of a finished submission to the client.

    :param submissionID: identifier of the submission whose results
        directory under RESULTS_PATH is read.
    :return: the zip file as an attachment named ``results.zip``.
    """
    logger.debug("Request to download submission " + submissionID + " results")
    timestr = time.strftime("%Y%m%d_%H%M%S")
    # NOTE(review): this reads "results<timestamp>.zip", but elsewhere the
    # worker-side code saves the archive as plain "results.zip" — confirm
    # the producer was renamed to match, otherwise this path never exists.
    return send_file(RESULTS_PATH + submissionID + "/results" + timestr + ".zip", as_attachment=True,
                     attachment_filename='results.zip')
@app.route("/debug/logs/<level>")
def logsRetriever(level):
    """Collect log archives and return them as a single zip download.

    :param level: which logs to collect — ``manager``, ``workers`` or
        ``all``; any other value yields 400.
    :return: the combined zip as an attachment, or a JSON error
        (400 unknown level, 503 when the archive could not be produced).
    """
    logger.info("Request to download logs reports. Level: " + level)
    managerLogsPath = CONFIGURATIONS["logs"]
    saveFilesPath = '/tempLogs/'
    logsZip = '/logs.zip'
    # Drop artifacts of a previous request so stale logs are never served.
    if os.path.exists(saveFilesPath):
        logger.debug("Removing old returned logs.")
        shutil.rmtree(saveFilesPath, ignore_errors=True)
    if os.path.exists(logsZip):
        os.remove(logsZip)
    os.makedirs(saveFilesPath)
    if level == 'manager':
        logger.info("Collecting manager logs.")
        FilesUtilities.compressFiles(managerLogsPath, saveFilesPath + 'managerLogs.zip')
    elif level == 'workers':
        logger.info("Collecting workers logs.")
        getWorkersLogs(saveFilesPath)
    elif level == 'all':
        logger.info("Collecting workers logs.")
        getWorkersLogs(saveFilesPath)
        logger.info("Collecting manager logs.")
        FilesUtilities.compressFiles(managerLogsPath, saveFilesPath + 'managerLogs.zip')
    else:
        logger.info('Bad request to get logs. Request done: ' + level)
        return jsonify({"Result": "Bad request, this level is not recognized!"}), 400
    FilesUtilities.compressFiles(saveFilesPath, logsZip)
    # BUG FIX: the original tested ``saveFilesPath`` here, a directory
    # unconditionally created above, so the 503 branch was unreachable.
    # The availability check must target the archive we are about to send.
    if not os.path.exists(logsZip):
        return jsonify({"Result": "Request unavailable!"}), 503
    timestr = time.strftime("%Y%m%d_%H%M%S")
    return send_file(logsZip, as_attachment=True, attachment_filename='logs' + timestr + '.zip')
def getWorkersLogs(savePath):
    """Ask every configured worker for its log archive and save each
    archive under *savePath* as ``<host-without-scheme>.zip``.

    Best-effort: a worker that is down, unreachable, or answers with a
    non-200 status is logged and skipped; the remaining workers are
    still queried.

    :param savePath: directory (with trailing slash) to write zips into.
    """
    logsWorkerPage = CONFIGURATIONS.get('workerLog')
    handshake = CONFIGURATIONS.get('handshake')
    workers = FilesUtilities.read_workers_conf_file('/configs/workers_addresses.conf')
    for host in workers:
        # BUG FIX: requests.get RAISES (it does not return a status code)
        # when a host is unreachable; previously one dead worker aborted
        # log collection from every remaining worker.
        try:
            alive = requests.get(host + handshake).status_code == 200
        except requests.RequestException:
            alive = False
        if not alive:
            logger.debug("Worker " + host + " not alive!")
            continue
        logger.debug("Worker " + host + " is awake. Asking for it for logs.")
        try:
            response = requests.get(host + logsWorkerPage)
        except requests.RequestException:
            logger.debug("Worker " + host + " not alive!")
            continue
        if response.status_code == 200:
            # File name is the host with scheme and slashes stripped.
            name = re.sub("https*:", "", host).replace("/", "")
            filesavePath = savePath + name + '.zip'
            # Context manager guarantees the file handle is closed even
            # if the write fails (the original used open/close pairs).
            with open(filesavePath, 'wb') as zfile:
                zfile.write(response.content)
            logger.debug("Logs report saved at: " + filesavePath)
        else:
            logger.debug("Logs not retrieved. Response code: " + str(response.status_code))
def countEntriesBelow(l, id):
id = int(id)
......@@ -277,7 +367,9 @@ def verifyInput(userInput):
return validTaxID
logging.info('Server running!')
logger.info('Server running!')
if __name__ == "__main__":
app.run(host=CONFIGURATIONS["flask_host"], port=int(CONFIGURATIONS["flask_port"]), threaded=True, debug=True)
app.run(host=CONFIGURATIONS["flask_host"], port=int(CONFIGURATIONS["flask_port"]), threaded=True, debug=False)
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0
from ReadConf import read_conf_file, read_workers_conf_file
from FilesUtilities import read_conf_file, read_workers_conf_file
from logging.handlers import TimedRotatingFileHandler
import logging
import time
import os
......@@ -16,10 +17,16 @@ if not os.path.exists(logPath):
logPath = logPath + 'managerWorkers.log'
print(logPath)
# format the log entries
formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')
logging.basicConfig(filename=logPath, level=logging.DEBUG, format='%(asctime)s: %(levelname)s: >>>%(message)s')
logging.info("Docker configurations: " + str(CONFIGURATIONS))
handler = TimedRotatingFileHandler(logPath, when='midnight', backupCount=20)
handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.info("Docker configurations: " + str(CONFIGURATIONS))
RESULTS_LIMIT = int(CONFIGURATIONS["records_limit"])
SUBMISSIONS_PATH = CONFIGURATIONS["submissions"]
......@@ -39,6 +46,7 @@ os.makedirs(SUBMISSIONS_PATH)
os.makedirs(PROCESSING_PATH)
os.makedirs(RESULTS_PATH)
def run():
mapping = initializeWorkers()
......@@ -50,14 +58,14 @@ def run():
try:
submissions = os.listdir(SUBMISSIONS_PATH)
# logging.info("Submissions len = " + str(len(submissions)))
# logger.info("Submissions len = " + str(len(submissions)))
if len(submissions) > 0:
submissionID = getNextInQueue(submissions)
logging.info("Next in queue = " + submissionID)
logger.info("Next in queue = " + submissionID)
workerID = findAvailableWorker(mapping)
logging.info("Available workerID = " + str(workerID))
logger.info("Available workerID = " + str(workerID))
if workerID is not None:
mapping[workerID] = submissionID
......@@ -77,13 +85,13 @@ def run():
'taxonomyId': open(processingPath + 'taxID.txt', 'rb'),
'genome': open(processingPath + 'genome.faa', 'rb')}
logging.info("Sending request to start to " + str(workerID))
logging.debug("Sending files: " + str(files))
logging.info(str(workerID) + starter)
logger.info("Sending request to start to " + str(workerID))
logger.debug("Sending files: " + str(files))
logger.info(str(workerID) + starter)
res = requests.post(workerID + starter, files = files) # request worker docker to run
if res.status_code is not 102:
logging.error("An error occurred while starting the worker!")
logger.error("An error occurred while starting the worker!")
mapping = checkSubmissionsStatus(mapping)
# results = os.listdir(RESULTS_PATH)
......@@ -97,7 +105,7 @@ def run():
except:
path = SUBMISSIONS_PATH + str(submissionID)
logging.error('An error occurred while processing submission ' + str(submissionID) + '!')
logger.error('An error occurred while processing submission ' + str(submissionID) + '!')
if os.path.exists(path):
shutil.rmtree(path)
......@@ -114,12 +122,12 @@ def checkSubmissionsStatus(mapping):
if not os.path.exists(savePath):
os.makedirs(savePath)
savePath = savePath + "/results.zip"
logging.debug("Save path: " + savePath)
logger.debug("Save path: " + savePath)
zfile = open(savePath, 'wb')
zfile.write(response.content)
zfile.close()
logging.debug("folder contents: " + str(os.listdir(RESULTS_PATH + mapping[key])))
logger.debug("folder contents: " + str(os.listdir(RESULTS_PATH + mapping[key])))
elif response.status_code == 202:
continue
......@@ -167,19 +175,19 @@ def initializeWorkers():
for host in CONFIGURATIONS_WORKERS:
res = requests.get(host + handshake)
if res.status_code == 200:
logging.info('new worker instantiated -> ' + host)
logger.info('new worker instantiated -> ' + host)
dic[host] = None
else:
logging.error('declared worker is not running -> ' + host)
logger.error('declared worker is not running -> ' + host)
logging.info(str(len(dic)) + " workers initialized")
logger.info(str(len(dic)) + " workers initialized")
return dic
def findAvailableWorker(mapping):
handshake = CONFIGURATIONS.get('handshake')
logging.info('Workers status: ' + str(mapping))
logger.info('Workers status: ' + str(mapping))
print(mapping)
for key in mapping.keys():
if mapping[key] is None:
......@@ -187,7 +195,7 @@ def findAvailableWorker(mapping):
if res.status_code == 200:
return key
else:
logging.error('worker not responsive -> ' + key)
logger.error('worker not responsive -> ' + key)
return None
......@@ -197,14 +205,14 @@ def cleanResultsDirectory():
while len(results) > RESULTS_LIMIT:
directoryID = getNextInQueue(results)
logging.info('Deleting directory ' + directoryID + ' from results!')
logger.info('Deleting directory ' + directoryID + ' from results!')
shutil.rmtree(RESULTS_PATH + directoryID, ignore_errors=True)
if os.path.exists(PROCESSING_PATH):
shutil.rmtree(PROCESSING_PATH + directoryID, ignore_errors=True)
results.remove(directoryID)
if __name__ == '__main__':
logging.info("Starting server!")
logger.info("Starting server!")
subprocess.Popen(["python", "/home/manager.py"])
logging.info("Starting service!")
logger.info("Starting service!")
run()
......@@ -31,4 +31,7 @@ status = /status
workers = /configs/workerConfigs.conf
#15 - check worker connection
handshake = /handshake
\ No newline at end of file
handshake = /handshake
#16 - check worker connection
workerLog = /retrieveLogs
\ No newline at end of file
http://transyt_worker_1:80
http://transyt_worker_2:80
http://transyt_worker_3:80
#http://rosalind.di.uminho.pt:8085
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment