Commit 265113d9 authored by Davide Lagoa's avatar Davide Lagoa
Browse files

webserver major update

parent c853fd71
FROM python:3-onbuild
RUN mkdir /configs
COPY ./configs /configs
COPY ./code /home
WORKDIR /workdir
EXPOSE 80
......
def read_conf_file(path):
res = {}
with open(path) as file:
lines = file.readlines()
for line in lines:
if '=' in line:
prop = line.replace(" ", "").replace("\n", "")
l_str = prop.split('=')
res[l_str[0]] = l_str[1]
return res
def read_workers_conf_file(path):
res = []
with open(path) as file:
lines = file.readlines()
for line in lines:
str = line.replace(" ", "").replace("\n", "")
res.append(str)
return res
# -*- coding: utf-8 -*-
from flask import Flask, render_template, request, send_file, jsonify
from ReadConf import read_conf_file
import os
import socket
import shutil
import logging
logPath = '/usr/src/app/logs/'
CONFIGURATIONS = read_conf_file('/configs/docker_config.conf')
logPath = CONFIGURATIONS["logs"]
if not os.path.exists(logPath):
os.makedirs(logPath)
......@@ -13,24 +16,22 @@ if not os.path.exists(logPath):
logPath = logPath + 'manager.log'
logging.basicConfig(filename=logPath, level=logging.DEBUG, format='%(asctime)s: %(levelname)s: >>>%(message)s')
logging.info("Docker configurations: " + str(CONFIGURATIONS))
logging.info('Starting server')
# subprocess.Popen(["python", "/home/workersManager.py"])
app = Flask(__name__)
hostname = socket.gethostname()
SUBMISSIONS_LIMIT = 100
SUBMISSIONS_PATH = '/workdir/submissions_pool/'
PROCESSING_PATH = '/workdir/processing_pool/'
RESULTS_PATH = '/workdir/results_pool/'
SUBMISSIONS_LIMIT = int(CONFIGURATIONS["records_limit"])
SUBMISSIONS_PATH = CONFIGURATIONS["submissions"]
PROCESSING_PATH = CONFIGURATIONS["processing"]
RESULTS_PATH = CONFIGURATIONS["results"]
last_submission_ID = 0
print('SERVER WORKING')
@app.route("/")
def index():
logging.info('Welcome page rendering...')
......@@ -53,11 +54,12 @@ def submit():
logging.info("Taxonomy identifier: " + taxonomyId)
return run(files, taxonomyId, False)
@app.route("/submitMerlinPlugin/<taxonomyId>", methods=["POST"])
def submitMerlinPlugin(taxonomyId):
logging.info('Merlin plugin submission')
files = request.files.getlist("file")
logging.info("files in submission " + str(files))
validInput = verifyInput(taxonomyId)
......@@ -69,6 +71,7 @@ def submitMerlinPlugin(taxonomyId):
return run(files, taxonomyId, True)
def run(files, taxonomyId, isRest):
logging.debug('New submission in progress...')
......@@ -96,7 +99,7 @@ def run(files, taxonomyId, isRest):
for file in files:
formats = {".faa":"genome.faa", ".fasta":"genome.faa", ".xml":"model.xml"}
formats = {".faa": "genome.faa", ".fasta": "genome.faa", ".xml": "model.xml"}
fileName = file.filename
inFormats = False
......@@ -107,7 +110,6 @@ def run(files, taxonomyId, isRest):
inputCheck.append(formats[key])
if not inFormats and "genome.faa" in inputCheck and fileName.endswith(".txt"):
logging.info("ESTOU A ENTRAR AQUI")
fileName = "metabolites.txt"
inputCheck.append("metabolites.txt")
......@@ -115,20 +117,21 @@ def run(files, taxonomyId, isRest):
logging.debug('Saving file ' + fileName + ' at ' + destination)
file.save(destination)
validInput = ["genome.faa","model.xml"]
validInput2 = ["genome.faa","metabolites.txt"]
validInput = ["genome.faa", "model.xml"]
validInput2 = ["genome.faa", "metabolites.txt"]
validInputs = [validInput, validInput2]
logging.info("taxID -> " + taxonomyId)
logging.info("inputCheck -> " + str(inputCheck))
if inputCheck in validInputs:
with open("/".join([submission_path,"taxID.txt"]), "w") as taxFile:
taxFile.write(taxonomyId) # write TaxonomyID file
with open("/".join([submission_path, "taxID.txt"]), "w") as taxFile:
taxFile.write(taxonomyId) # write TaxonomyID file
else:
return jsonify({"Genome":"Input genome must be in .faa or .fasta format", "Model":"Model must be in .xml format or a list of metabolites in .txt format (one metabolite per line)", "Taxonomy ID":"Input must only contain digits"}), 415
return jsonify({"Genome": "Input genome must be in .faa or .fasta format",
"Model": "Model must be in .xml format or a list of metabolites in .txt format (one metabolite per line)",
"Taxonomy ID": "Input must only contain digits"}), 415
with open(submission_path + "/submissionComplete", "w") as f:
f.write("Submission of files complete: ")
......@@ -140,37 +143,35 @@ def run(files, taxonomyId, isRest):
if isRest:
return jsonify({"result": "submitted",
"message": "submission accepted. Waiting in queue with " + str(total + 1) + " submissions!",
"queue": total,
"submissionID": str(last_submission_ID)}), 201
"message": "submission accepted. Waiting in queue with " + str(total + 1) + " submissions!",
"queue": total,
"submissionID": str(last_submission_ID)}), 201
#return render_template("status.html", subId = str(last_submission_ID))
# return render_template("status.html", subId = str(last_submission_ID))
return display_msg(str(last_submission_ID), isRest)
@app.route('/status/<submissionID>/<isRest>')
def display_msg(submissionID, isRest):
if isRest == 'False':
isRest = False
elif isRest == 'True':
isRest = True
logging.info("isRest request: " + str(isRest) + " > type: " + str(type(isRest)))
logging.debug("Request to check submission " + submissionID + " status")
folders = os.listdir(SUBMISSIONS_PATH)
logging.info("submissions " + str(folders))
if submissionID in folders:
total = len(folders)
inQueue = countEntriesBelow(folders, submissionID)
if isRest:
return jsonify({"result": "queued",
"message": "Waiting in queue with " + str(total + 1) + " submissions!",
"queue": str(inQueue)}), 201
"message": "Waiting in queue with " + str(total + 1) + " submissions!",
"queue": str(inQueue)}), 201
subStatus = {"result": "queued",
"message": "Waiting in queue",
......@@ -182,25 +183,45 @@ def display_msg(submissionID, isRest):
logging.info("processing " + str(folders))
if submissionID in folders: # up and running response
logging.debug("Submission " + submissionID + "still running")
logging.debug("Submission " + submissionID + " still running")
if isRest:
return jsonify({"result": "processing", "message": "accepted but process still running, please wait."}), 202
subStatus = {"result": "processing", "message": "accepted but process still running, please wait."}
return render_template("status.html", subId=submissionID, status=0), 202
return render_template("status.html", subId=submissionID, status=subStatus), 202
folders = os.listdir(RESULTS_PATH)
logging.info("results " + str(folders))
if submissionID in folders:
files = os.listdir(RESULTS_PATH + submissionID)
logging.debug("Submission " + submissionID + " results complete")
if isRest:
result = "error"
message = "an error occurred while processing the submission"
code = 500
if "results.zip" in files:
result = "success"
message = "the process is now complete. Download the results at ./download"
code = 200
elif "408" in files:
result = "time out"
message = "the process timed out. Please try again"
code = 408
elif "500" in files:
code = 500
if isRest or code is not 200:
return jsonify(
{"result": "success", "message": "the process is now complete. Download the results at ./downloads"}), 200
{"result": result,
"message": message}), code
#return download(submissionID)
return render_template("download.html", subId = submissionID), 200
# return download(submissionID)
return render_template("download.html", subId=submissionID), 200
return jsonify({"result": "Server error",
"message": "Something went wrong while processing the request, please try again", }), 500
......@@ -223,8 +244,8 @@ def countEntriesBelow(l, id):
res += 1
return res
def verifyInput(userInput):
userInput = str(userInput)
validTaxID = False
......@@ -239,10 +260,7 @@ def verifyInput(userInput):
return validTaxID
logging.info('Server running!')
if __name__ == "__main__":
app.run(host="0.0.0.0", port=80, threaded=True, debug=True)
app.run(host=CONFIGURATIONS["flask_host"], port=int(CONFIGURATIONS["flask_port"]), threaded=True, debug=True)
......@@ -6,9 +6,7 @@
<link rel="stylesheet" href="{{ url_for('static', filename='css/center.css') }}">
<link rel="stylesheet" href="//maxcdn.bootstrapcdn.com/bootstrap/3.2.0/css/bootstrap.min.css">
<link rel="stylesheet" href="//maxcdn.bootstrapcdn.com/bootstrap/3.2.0/css/bootstrap-theme.min.css">
<link rel="stylesheet" href="{{ url_for('static', filename='css/header.css') }}">
<link href="//fonts.googleapis.com/css?family=Oxygen&subset=latin,latin-ext" rel="stylesheet" type="text/css">
<link rel="stylesheet" href="{{ url_for('static', filename='css/nav.css') }}">
<link rel="stylesheet" href="{{ url_for('static', filename='css/downloadButton.css') }}">
<body style="background: #fefcfe">
......
......@@ -7,9 +7,7 @@
<link rel="stylesheet" href="{{ url_for('static', filename='css/center.css') }}">
<link rel="stylesheet" href="//maxcdn.bootstrapcdn.com/bootstrap/3.2.0/css/bootstrap.min.css">
<link rel="stylesheet" href="//maxcdn.bootstrapcdn.com/bootstrap/3.2.0/css/bootstrap-theme.min.css">
<link rel="stylesheet" href="{{ url_for('static', filename='css/header.css') }}">
<link href="//fonts.googleapis.com/css?family=Oxygen&subset=latin,latin-ext" rel="stylesheet" type="text/css">
<link rel="stylesheet" href="{{ url_for('static', filename='css/nav.css') }}">
<body style="background: #fefcfe">
<div class="center" align="center" style="border: 0">
......
from ReadConf import read_conf_file, read_workers_conf_file
import logging
import time
import os
......@@ -5,12 +6,25 @@ import requests
import shutil
import subprocess
RESULTS_LIMIT = 30
SUBMISSIONS_PATH = '/workdir/submissions_pool/'
PROCESSING_PATH = '/workdir/processing_pool/'
RESULTS_PATH = '/workdir/results_pool/'
CONFIGURATIONS = read_conf_file('/configs/docker_config.conf')
CONFIGURATIONS_WORKERS = read_workers_conf_file('/configs/workers_addresses.conf')
CONFIGURATION_FILE = '/home/docker_config.conf'
logPath = CONFIGURATIONS["logs"]
if not os.path.exists(logPath):
os.makedirs(logPath)
logPath = logPath + 'managerWorkers.log'
print(logPath)
logging.basicConfig(filename=logPath, level=logging.DEBUG, format='%(asctime)s: %(levelname)s: >>>%(message)s')
logging.info("Docker configurations: " + str(CONFIGURATIONS))
RESULTS_LIMIT = int(CONFIGURATIONS["records_limit"])
SUBMISSIONS_PATH = CONFIGURATIONS["submissions"]
PROCESSING_PATH = CONFIGURATIONS["processing"]
RESULTS_PATH = CONFIGURATIONS["results"]
if os.path.exists(SUBMISSIONS_PATH):
shutil.rmtree(SUBMISSIONS_PATH, ignore_errors=True)
......@@ -25,16 +39,6 @@ os.makedirs(SUBMISSIONS_PATH)
os.makedirs(PROCESSING_PATH)
os.makedirs(RESULTS_PATH)
logPath = '/usr/src/app/logs/'
if not os.path.exists(logPath):
os.makedirs(logPath)
logPath = logPath + 'managerWorkers.log'
logging.basicConfig(filename=logPath, level=logging.DEBUG, format='%(asctime)s: %(levelname)s: >>>%(message)s')
def run():
mapping = initializeWorkers()
......@@ -58,29 +62,36 @@ def run():
if workerID is not None:
mapping[workerID] = submissionID
port = configs.get('port')
protocol = configs.get('protocol')
starter = configs.get('starter')
processingPath = PROCESSING_PATH + submissionID
starter = CONFIGURATIONS.get('starter')
processingPath = PROCESSING_PATH + submissionID + "/"
shutil.move(SUBMISSIONS_PATH + submissionID, processingPath)
processingPath = processingPath.replace('/', '$')
modelFile = "model.xml"
if not os.path.isfile(processingPath + modelFile):
modelFile = "metabolites.txt"
files = {
'model': open(processingPath + modelFile, 'rb'),
'taxonomyId': open(processingPath + 'taxID.txt', 'rb'),
'genome': open(processingPath + 'genome.faa', 'rb')}
logging.info("Sending request to start to " + str(workerID))
logging.info(
protocol + str(workerID) + ":" + str(port) + starter + "/" + processingPath + "/" + RESULTS_PATH.replace('/',
'$') + submissionID)
requests.get(
protocol + workerID + ":" + port + starter + "/" + processingPath + "/" + RESULTS_PATH.replace('/',
'$') + submissionID) # request worker docker to run
results = os.listdir(RESULTS_PATH)
processes = os.listdir(PROCESSING_PATH)
for key in mapping.keys(): # reset workers that finished
if mapping[key] in results:
subResult = os.listdir(RESULTS_PATH + mapping[key])
if 'processComplete' in subResult or mapping[key] not in processes:
mapping[key] = None
logging.debug("Sending files: " + str(files))
logging.info(str(workerID) + starter)
res = requests.post(workerID + starter, files = files) # request worker docker to run
if res.status_code is not 102:
logging.error("An error occurred while starting the worker!")
mapping = checkSubmissionsStatus(mapping)
# results = os.listdir(RESULTS_PATH)
# for key in mapping.keys(): # reset workers that finished
# if mapping[key] in results:
# shutil.rmtree(RESULTS_PATH + mapping[key], ignore_errors=True)
# mapping[key] = None
cleanResultsDirectory()
......@@ -92,6 +103,47 @@ def run():
time.sleep(5)
def checkSubmissionsStatus(mapping):
status = CONFIGURATIONS.get('status')
for key in mapping.keys(): # reset workers that finished
if mapping[key] is not None:
response = requests.get(key + status)
savePath = RESULTS_PATH + mapping[key]
if response.status_code == 200:
if not os.path.exists(savePath):
os.makedirs(savePath)
savePath = savePath + "/results.zip"
logging.debug("Save path: " + savePath)
zfile = open(savePath, 'wb')
zfile.write(response.content)
zfile.close()
logging.debug("folder contents: " + str(os.listdir(RESULTS_PATH + mapping[key])))
elif response.status_code == 202:
continue
elif response.status_code == 408:
if not os.path.exists(savePath):
os.makedirs(savePath)
with open(savePath + "/408", "w") as f:
f.write("Submission failed, time out")
elif response.status_code == 408:
if not os.path.exists(savePath):
os.makedirs(savePath)
with open(savePath + "/410", "w") as f:
f.write("Submission failed")
else:
if not os.path.exists(savePath):
os.makedirs(savePath)
with open(savePath + "/500", "w") as f:
f.write("Submission failed, unknown error")
if os.path.exists(RESULTS_PATH + mapping[key]):
shutil.rmtree(PROCESSING_PATH + mapping[key], ignore_errors=True)
mapping[key] = None
return mapping
def getNextInQueue(l):
integerList = [int(i) for i in l]
......@@ -109,26 +161,33 @@ def getNextInQueue(l):
def initializeWorkers():
handshake = CONFIGURATIONS.get('handshake')
dic = {}
time.sleep(10) #give time to initialize workers
for host in CONFIGURATIONS_WORKERS:
res = requests.get(host + handshake)
if res.status_code == 200:
logging.info('new worker instantiated -> ' + host)
dic[host] = None
else:
logging.error('declared worker is not running -> ' + host)
workersLimit = int(configs.get('limit'))
for i in range(1, workersLimit + 1):
key = configs['name'] + str(i)
logging.info('new worker instantiated -> ' + key)
dic[key] = None
logging.info(str(workersLimit) + " workers initialized")
logging.info(str(len(dic)) + " workers initialized")
return dic
def findAvailableWorker(mapping):
handshake = CONFIGURATIONS.get('handshake')
logging.info('Workers status: ' + str(mapping))
print(mapping)
for key in mapping.keys():
if mapping[key] is None:
return key
res = requests.get(key + handshake)
if res.status_code == 200:
return key
else:
logging.error('worker not responsive -> ' + key)
return None
......@@ -144,26 +203,6 @@ def cleanResultsDirectory():
shutil.rmtree(PROCESSING_PATH + directoryID, ignore_errors=True)
results.remove(directoryID)
def read_conf_file():
res = {}
with open(CONFIGURATION_FILE) as file:
lines = file.readlines()
for line in lines:
if '=' in line:
l = line.replace(" ", "").replace("\n", "")
l_str = l.split('=')
res[l_str[0]] = l_str[1]
logging.info('Configurations: ' + str(res))
return res
configs = read_conf_file()
logging.info("Docker configurations: " + str(configs))
if __name__ == '__main__':
logging.info("Starting server!")
subprocess.Popen(["python", "/home/manager.py"])
......
#docker workers configuration
#5 - restful worker starter
starter = /start
#6 - submissions directory path
submissions = /workdir/submissions_pool/
#7 - processing directory path
processing = /workdir/processing_pool/
#8 - results directory path
results = /workdir/results_pool/
#9 - logs directory path
logs = /workdir/logs/
#10 - number of submissions that can stay in queue waiting and in cache in results directory
records_limit = 50
#11 - flask webserver port
flask_port = 80
#12 - flask webserver host
flask_host = 0.0.0.0
#13 - restful worker status
status = /status
#14 - workers config file path
workers = /configs/workerConfigs.conf
#15 - check worker connection
handshake = /handshake
\ No newline at end of file
http://transyt_worker_1:80
http://transyt_worker_2:80
http://transyt_worker_3:80
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment