rar-index-py/flask/app.py

295 lines
11 KiB
Python

from flask import Flask,redirect,url_for,request,render_template,make_response
from datetime import datetime
from hashlib import sha256
from uuid import uuid4 as uuid
## Import db class from func.py and initialise it
from func import db
## import all config variables
from config import *
db=db()
db.startup()
app = Flask(__name__)
########## CUSTOM FILTERS
## converts a unix timestamp to a human-readable format
## OUTPUT: str="2023-11-02 12:33"
@app.template_filter('ctime')
def timectime(s:int):
return datetime.utcfromtimestamp(s).strftime('%Y-%m-%d %H:%M')
## converts a bytes to a human readable scale
## OUTPUT: str="XXX.XXYB"
@app.template_filter('spacer')
def convsize(s:int):
sizes=("B","KB","MB","GB","TB")
n=0
while s >= 1000:
n+=1
s=s/1000
return str("%.2f" % s)+sizes[n]
########## WEB FRONTEND
@app.route('/')
def homepage():
# try to get userdata, else logout state
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
archives,count=db.get_n_archives()
return render_template("home.html", title="Homepage",userdata=userdata,login=logged_in,archives=archives)
@app.route('/user')
@app.route('/user/<int:page_userid>', methods=['GET','POST'])
def userpage(page_userid:int=0):
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
if not logged_in:
return make_response(redirect('/'))
if page_userid == 0:
return make_response(redirect(f"/user/{userdata[0]}"))
res,page_userdata=db.get_user_info(page_userid)
if not res:
return errorpage(page_userdata) # page_userdata is error
# POST: Update display name or password
if request.method == 'POST':
match request.form['edit-type']:
case "password":
old_passhash=sha256(request.form['old-pass'].encode()).hexdigest()
if not old_passhash == db.get_passhash(userdata[1])[2]:
return errorpage("The old password does not match!")
new_password=request.form['new-pass']
conf_password=request.form['conf-pass']
if not new_password == conf_password:
return errorpage("The new passwords do not match!")
res,data=db.update_user_info(userdata[0],"PASSHASH",sha256(new_password.encode()).hexdigest().upper())
case "dname":
res,data=db.update_user_info(userdata[0],"DNAME",request.form['display-name'])
case _:
return make_response(redirect(f"/user/{page_userid}"))
if not res:
return errorpage("Something went wrong: " + data)
return make_response(redirect('/'))
# GET: return normal info page
return render_template("user.html", title="User Details",login=logged_in,userdata=page_userdata,login_userid=userdata[0],userid=page_userid)
@app.route('/user/<uname>')
def user_redirect(uname:str):
res,userdata=db.get_user_info(uname)
if not res:
return errorpage(userdata) # userdata is error
if not userdata:
return make_response(redirect(f"/user"))
return make_response(redirect(f"/user/{userdata[0]}"))
@app.route('/add', methods=['GET','POST'])
def addpage():
# try to get userdata, else yeet to the homepage
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
if not logged_in:
return make_response(redirect('/'))
# POST: check and add archive, show confirmation/error message at the end
if request.method == 'POST':
postdict={}
# get and save all inputs, error if one doesn't exist or is wrong type
for i,itype in [("name",str),("hash",str),("category",int),("size",float),("size_multiplier",int)]:
try:
postdict[i]=itype(request.form[i])
if i == "size_multiplier":
postdict["size"]+=i
except Exception as e:
return errorpage("All fields need to be filled and don't play with their names!")
postdict["owner"]=userdata[0]
res,archid=db.add_archive(postdict)
if res:
return make_response(redirect(f"/view/{str(archid)}"))
else:
return errorpage(archid), 400
# GET: return normal page
htmlcatlist=get_category_selection(False)
return render_template("add.html", title="Add Archive",categories=htmlcatlist)
@app.route('/login', methods=["GET","POST"])
def loginpage():
# POST: Process login request
if request.method == 'POST':
username=request.form['username']
login_passhash=sha256(request.form['password'].encode()).hexdigest()
res,userid,db_passhash=db.get_passhash(username)
if not res:
return errorpage(userid)# userid is the error
# if passwords match, create session and return cookie
if login_passhash.upper() == db_passhash.upper():
lifetime=RAR_COOKIE_LIFETIME # lifetime of the sesskey in seconds
sesskey=str(uuid())
db.set_sesskey(sesskey,userid,lifetime)
resp=setcookie("session-id",sesskey,lifetime)
return resp
else:
return errorpage("You've entered the wrong password. This incident will be reported.")
# GET: Login form
else:
return render_template("login.html", title="Login")
@app.route('/logout')
def logout():
sesskey=request.cookies.get('session-id')
logged_in,userdata=get_login_info(sesskey)
if not logged_in:
return make_response(redirect('/login'))
db.logout_user(sesskey)
return make_response(redirect('/'))
@app.route('/view/<int:archid>')
def viewpage(archid:int):
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
res,archive,category,labels=db.get_archive_info(archid)
if not res:
return errorpage(archive) # archive is error
return render_template("view.html", title="View Archive",userdata=userdata,login=logged_in,archive=archive,category=category,labels=labels)
@app.route('/delete/<int:archid>', methods=["GET","POST"])
def deletepage(archid:int):
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
res,archive,category,labels=db.get_archive_info(archid)
if not res:
return errorpage(archive) # archive is error
if not logged_in or userdata[0] != archive[8]:
return make_response(redirect(f"/view/{archid}"))
# POST: check if input is correct and delete relevant data
if request.method == 'POST':
if not request.form['archname'] == archive[1]:
return errorpage("The name input doesn't match!")
db.delete_archive(archid)
return make_response(redirect('/'))
# GET: return normal deletion page
return render_template("delete.html", title="Delete Archive",userdata=userdata,login=logged_in,archive=archive)
@app.route('/labels/<int:archid>', methods=["GET","POST"])
def labeleditpage(archid:int):
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
res,archive,category,labels=db.get_archive_info(archid)
if not res:
return errorpage(archive) # archive is error
label_dict=db.get_label_labeltypes(category[3])
if not logged_in or userdata[0] != archive[8]:
return make_response(redirect(f"/view/{archid}"))
# POST: parse everything and update labels
if request.method == 'POST':
on_labels=[]
for i in request.form:
on_labels.append(i)
res, data=db.update_labels(archid, on_labels)
if not res:
return errorpage(data)
return make_response(redirect(f"/view/{archid}"))
# GET: return normal labels page
labels_name_list=[]
for i in labels:
labels_name_list.append(i[1])
return render_template("labels.html", title="Edit Labels",userdata=userdata,login=logged_in,archive=archive,res_labels=label_dict,labels_names=labels_name_list)
@app.route('/search')
@app.route('/search/<int:page>')
def searchpage(page:int=1):
# try to get userdata, else logout state
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
# try to set all required variables, else defaults
try:
sorttype=request.args['sort']
except Exception as e:
sorttype="time"
try:
category=int(request.args['category'])
catparentid=db.get_category_info(category)[2]
if catparentid:
label_dict=db.get_label_labeltypes(catparentid)
else:
label_dict=db.get_label_labeltypes(category)
except Exception as e:
category=0
label_dict={}
try:
keywords="".join(request.args['q']).split(" ")
except Exception as e:
keywords=[]
try:
count=int(request.args['count'])
except Exception as e:
count=20
labels=[]
try:
for i in request.args:
try:
int(i)
labels.append(i)
except Exception as e:
continue
except Exception as e:
labels=[]
archives,total_count=db.get_n_archives(sorttype,category,keywords,count,labels,page)
# if the requested page is larger than the maximum, return page 1
if page*count > total_count and (page-1)*count >= total_count:
page=1
archives,total_count=db.get_n_archives(sorttype,category,keywords,count,labels,page)
htmlcatlist=get_category_selection()
return render_template("search.html", title="Advanced Search",page=page,count=count,total_count=total_count,categories=htmlcatlist,userdata=userdata,login=logged_in,archives=archives,res_labels=label_dict,labels=labels)
########## FUNCTIONS
def errorpage(message):
return "<h2>ERROR: " + str(message) + "</h2>Go back and try again", 400
## Checks if given sesskey is valid and returns user data
## OUTPUT: (if sesskey valid) logged_in:bool=True, userdata:tuple
## (if sesskey invalid) logged_in:bool=False, userdata:tuple=()
def get_login_info(sesskey:str):
if not sesskey:
return False,()
logged_in,userid=db.check_sesskey(sesskey)
if logged_in:
res,userdata=db.get_user_info(userid)
if not res:
return errorpage(userdata) # userdata is error
else:
userdata=()
return logged_in,userdata
## returns a cookie and redirects to the homepage
def setcookie(name:str,value:str,lifetime:int=10000):
resp = make_response(redirect('/'))
resp.set_cookie(name, value, max_age=lifetime)
return resp
## Gets all categories and returns them (with or without parents)
## OUTPUT: […,(ID:int,NAME:str),…]
def get_category_selection(include_parents:bool=True):
catlist=db.get_categories()
htmlcatlist=[]
# parse all categories and sort them into list
for cat in catlist:
if not cat[2]:
if include_parents:
htmlcatlist.append((cat[0],cat[1]))
parentname=cat[1]
parentid=cat[0]
for i in catlist:
if i[2] == parentid:
htmlcatlist.append((i[0],f"{parentname}/{i[1]}"))
return htmlcatlist
## API CALLS (NO THANKS)
# main driver function
if __name__ == '__main__':
# run app if executed directly
app.run()