Many fixes and improvements

Fixes: #20,#22,#23,#24,#25,#26,#27,#28,#29,#30,#31,#32,#33,#34,#35,#36,#37,#38,#39,#40,#41,#42,#43,#46,#48
This commit is contained in:
Michael Rodin 2023-11-02 15:20:03 +01:00
parent d263701297
commit 78ed396c36
2 changed files with 98 additions and 94 deletions

View file

@ -13,12 +13,16 @@ db.startup()
app = Flask(__name__)
########## CUSTOM FILTERS
## converts a unix timestamp to a human-readable format
## OUTPUT: str="2023-11-02 12:33"
@app.template_filter('ctime')
def timectime(s):
def timectime(s:int):
return datetime.utcfromtimestamp(s).strftime('%Y-%m-%d %H:%M')
## converts a bytes to a human readable scale
## OUTPUT: str="XXX.XXYB"
@app.template_filter('spacer')
def convsize(s):
def convsize(s:int):
sizes=("B","KB","MB","GB","TB")
n=0
while s >= 1000:
@ -31,19 +35,21 @@ def convsize(s):
@app.route('/')
def homepage():
# try to get userdata, else logout state
logged_in,userdata=get_login_info(request.cookies.get('session'))
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
archives=db.get_n_archives()
return render_template("home.html", title="Homepage",userdata=userdata,login=logged_in,archives=archives)
@app.route('/user')
@app.route('/user/<int:page_userid>', methods=['GET','POST'])
def userpage(page_userid:int=0):
logged_in,userdata=get_login_info(request.cookies.get('session'))
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
if not logged_in:
return make_response(redirect('/'))
if page_userid == 0:
return make_response(redirect(f"/user/{userdata[0]}"))
page_userdata=db.get_user_info(page_userid)
res,page_userdata=db.get_user_info(page_userid)
if not res:
return errorpage(page_userdata) # page_userdata is error
# POST: Update display name or password
if request.method == 'POST':
@ -66,11 +72,13 @@ def userpage(page_userid:int=0):
return make_response(redirect('/'))
# GET: return normal info page
return render_template("user.html", title="User Details",userdata=page_userdata,login_userid=userdata[0],userid=page_userid)
return render_template("user.html", title="User Details",login=logged_in,userdata=page_userdata,login_userid=userdata[0],userid=page_userid)
@app.route('/user/<uname>')
def user_redirect(uname:str):
userdata=db.get_user_info_from_uname(uname)
res,userdata=db.get_user_info(uname)
if not res:
return errorpage(userdata) # userdata is error
if not userdata:
return make_response(redirect(f"/user"))
return make_response(redirect(f"/user/{userdata[0]}"))
@ -78,23 +86,21 @@ def user_redirect(uname:str):
@app.route('/add', methods=['GET','POST'])
def addpage():
# try to get userdata, else yeet to the homepage
logged_in,userdata=get_login_info(request.cookies.get('session'))
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
if not logged_in:
return make_response(redirect('/'))
# POST: check and add archive, show confirmation/error message at the end
if request.method == 'POST':
postdict={}
# get and save all inputs, error if one doesn't exist or is wrong type
for i,itype in [("name",str),("hash",str),("category",int),("size",float)]:
for i,itype in [("name",str),("hash",str),("category",int),("size",float),("size_multiplier",int)]:
try:
postdict[i]=itype(request.form[i])
if i == "size_multiplier":
postdict["size"]+=i
except Exception as e:
return errorpage("All fields need to be filled and don't play with their names!")
try:
postdict["size"]=postdict["size"]*int(request.form['size_multiplier'])
except Exception as e:
return errorpage("All fields need to be filled and don't play with their names!")
postdict["owner"]=userdata[0]
res,archid=db.add_archive(postdict)
if res:
@ -111,16 +117,16 @@ def loginpage():
# POST: Process login request
if request.method == 'POST':
username=request.form['username']
password=sha256(request.form['password'].encode()).hexdigest()
code,userid,passhash=db.get_passhash(username)
if code != 200:
return passhash
login_passhash=sha256(request.form['password'].encode()).hexdigest()
res,userid,db_passhash=db.get_passhash(username)
if not res:
return errorpage(userid)# userid is the error
# if passwords match, create session and return cookie
if password.upper() == passhash.upper():
if login_passhash.upper() == db_passhash.upper():
lifetime=RAR_COOKIE_LIFETIME # lifetime of the sesskey in seconds
sesskey=str(uuid())
db.set_sesskey(sesskey,userid,lifetime)
resp=setcookie("session",sesskey,lifetime)
resp=setcookie("session-id",sesskey,lifetime)
return resp
else:
return errorpage("You've entered the wrong password. This incident will be reported.")
@ -130,7 +136,7 @@ def loginpage():
@app.route('/logout')
def logout():
sesskey=request.cookies.get('session')
sesskey=request.cookies.get('session-id')
logged_in,userdata=get_login_info(sesskey)
if not logged_in:
return make_response(redirect('/login'))
@ -139,16 +145,21 @@ def logout():
@app.route('/view/<int:archid>')
def viewpage(archid:int):
logged_in,userdata=get_login_info(request.cookies.get('session'))
archive,category,labels=db.get_archive_info(archid)
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
res,archive,category,labels=db.get_archive_info(archid)
if not res:
return errorpage(archive) # archive is error
return render_template("view.html", title="View Archive",userdata=userdata,login=logged_in,archive=archive,category=category,labels=labels)
@app.route('/delete/<int:archid>', methods=["GET","POST"])
def deletepage(archid:int):
logged_in,userdata=get_login_info(request.cookies.get('session'))
archive,category,labels=db.get_archive_info(archid)
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
res,archive,category,labels=db.get_archive_info(archid)
if not res:
return errorpage(archive) # archive is error
if not logged_in or userdata[0] != archive[8]:
return make_response(redirect(f"/view/{archid}"))
# POST: check if input is correct and delete relevant data
if request.method == 'POST':
if not request.form['archname'] == archive[1]:
return errorpage("The name input doesn't match!")
@ -159,8 +170,10 @@ def deletepage(archid:int):
@app.route('/labels/<int:archid>', methods=["GET","POST"])
def labeleditpage(archid:int):
logged_in,userdata=get_login_info(request.cookies.get('session'))
archive,category,labels=db.get_archive_info(archid)
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
res,archive,category,labels=db.get_archive_info(archid)
if not res:
return errorpage(archive) # archive is error
label_dict=db.get_label_labeltypes(category[3])
if not logged_in or userdata[0] != archive[8]:
return make_response(redirect(f"/view/{archid}"))
@ -168,7 +181,6 @@ def labeleditpage(archid:int):
# POST: parse everything and update labels
if request.method == 'POST':
on_labels=[]
print(request.form)
for i in request.form:
on_labels.append(i)
res, data=db.update_labels(archid, on_labels)
@ -185,7 +197,7 @@ def labeleditpage(archid:int):
@app.route('/search')
def searchpage():
# try to get userdata, else logout state
logged_in,userdata=get_login_info(request.cookies.get('session'))
logged_in,userdata=get_login_info(request.cookies.get('session-id'))
# try to set all required variables, else defaults
try:
@ -199,8 +211,7 @@ def searchpage():
category=0
label_dict={}
try:
keywords=request.args['q']
keywords="".join(keywords).split(" ")
keywords="".join(request.args['q']).split(" ")
except Exception as e:
keywords=[]
try:
@ -226,7 +237,7 @@ def searchpage():
########## FUNCTIONS
def errorpage(message):
return "<h2>ERROR: " + str(message) + "</h2>Go back and try again"
return "<h2>ERROR: " + str(message) + "</h2>Go back and try again", 400
## Checks if given sesskey is valid and returns user data
## OUTPUT: (if sesskey valid) logged_in:bool=True, userdata:tuple
@ -236,7 +247,9 @@ def get_login_info(sesskey:str):
return False,()
logged_in,userid=db.check_sesskey(sesskey)
if logged_in:
userdata=db.get_user_info(userid)
res,userdata=db.get_user_info(userid)
if not res:
return errorpage(userdata) # userdata is error
else:
userdata=()
return logged_in,userdata
@ -256,11 +269,11 @@ def get_category_selection(include_parents:bool=True):
if not cat[2]:
if include_parents:
htmlcatlist.append((cat[0],cat[1]))
parent=cat[1]
parentname=cat[1]
parentid=cat[0]
for i in catlist:
if i[2] == parentid:
htmlcatlist.append((i[0],f"{parent}/{i[1]}"))
htmlcatlist.append((i[0],f"{parentname}/{i[1]}"))
return htmlcatlist
## API CALLS (NO THANKS)

View file

@ -4,7 +4,7 @@ from os import environ
import time,re
from config import *
## params populated with environment variables, defaults can be changed for a permanent solution
## params populated with environment variables, get data from config if environment variables not set
conn_params={
"user" : environ.get('MARIADB_USER') if environ.get('MARIADB_USER') else MARIADB_USER,
"password" : environ.get('MARIADB_PASSWORD') if environ.get('MARIADB_PASSWORD') else MARIADB_PASSWORD,
@ -28,93 +28,91 @@ class db:
SIZE bigint NOT NULL,
IMPORTED int,
CATEGORY int,
OWNER int
);""")
OWNER int);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS Users(
ID int PRIMARY KEY AUTO_INCREMENT,
UNAME text NOT NULL UNIQUE,
DNAME text NOT NULL,
CREATED int NOT NULL,
STATE text,
PASSHASH text NOT NULL
);""")
PASSHASH text NOT NULL);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS Sessions(
ID int PRIMARY KEY AUTO_INCREMENT,
SESSKEY text NOT NULL UNIQUE,
USERID int NOT NULL,
CREATED int NOT NULL,
LIFE int
);""")
LIFE int);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS Cats(
ID int PRIMARY KEY AUTO_INCREMENT,
CATEGORY text NOT NULL,
PARENT int,
DESCRIPTION text
);""")
DESCRIPTION text);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS ArchLab(
ID int PRIMARY KEY AUTO_INCREMENT,
ARCHID int NOT NULL,
LABID int NOT NULL
);""")
LABID int NOT NULL);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS CatLabType(
ID int PRIMARY KEY AUTO_INCREMENT,
CATID int NOT NULL,
LABID int NOT NULL
);""")
LABID int NOT NULL);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS Labs(
ID int PRIMARY KEY AUTO_INCREMENT,
LABEL text NOT NULL,
TYPE int NOT NULL
);""")
TYPE int NOT NULL);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS LabType(
ID int PRIMARY KEY AUTO_INCREMENT,
NAME text NOT NULL,
DESCRIPTION text
);""")
DESCRIPTION text);""")
## Gets the passhash from a specific user
## OUTPUT: (If user exists) int=200, ID:int, passhash:str
## (If user does not exist) int=400, Exception:str
## OUTPUT: (If user exists) bool=True, ID:int, passhash:str
## (If user does not exist) bool=False, Exception:str
def get_passhash(self, username:str):
self.cur.execute(f"SELECT ID,PASSHASH FROM Users WHERE UNAME='{username}'")
try:
resp=self.cur.fetchone()
except Exception as e:
return 400, e, NULL
return 200, resp[0], resp[1]
resp=self.cur.fetchone()
if not resp:
return False, "The user does not exist!", None
return True, resp[0], resp[1]
## Checks if sesskey exists and is not expired
## OUTPUT: (if valiid) bool=True, USERID:str
## (in invalid) bool=False, str=""
def check_sesskey(self, sesskey:str):
self.cur.execute(f"SELECT SESSKEY,USERID FROM Sessions WHERE SESSKEY='{sesskey}'")
self.cur.execute(f"SELECT SESSKEY,USERID,CREATED,LIFE FROM Sessions WHERE SESSKEY='{sesskey}'")
entry=self.cur.fetchone()
if entry and sesskey in entry:
if entry and sesskey in entry and time.time() < entry[2]+entry[3]:
return True, entry[1]
else:
return False, ""
return False, "The session key is expired or does not exist!"
## Sets a session key. That's it.
def set_sesskey(self, sesskey:str, userid:int, lifetime:int):
self.cur.execute(f"INSERT INTO Sessions(SESSKEY,USERID,CREATED,LIFE) VALUES('{sesskey}',{userid},{time.time()},{lifetime})")
## Removes the session key from the database (doesn't delete the cookie)
def logout_user(self, sesskey:str):
self.cur.execute(f"DELETE FROM Sessions WHERE SESSKEY='{sesskey}'")
## Gets and returns all user info about one (1) user
## OUTPUT: tuple=(ID:int,UNAME:str,DNAME:str,CREATED:int,STATE:text,PASSHASH:text)
def get_user_info(self, userid:int):
self.cur.execute(f"SELECT * FROM Users WHERE ID='{userid}'")
return self.cur.fetchone()
## OUTPUT: (if successful) bool=True, tuple=(ID:int,UNAME:str,DNAME:str,CREATED:int,STATE:text,PASSHASH:text)
## (if unsuccessful) bool=False, error:str
def get_user_info(self, identifier):
match identifier:
case int():
query=f"ID={identifier}"
case str():
query=f"UNAME='{identifier}'"
case _:
return False, "Wrong identifier type!"
## like above, just with uname
## OUTPUT: tuple=(ID:int,UNAME:str,DNAME:str,CREATED:int,STATE:text,PASSHASH:text)
def get_user_info_from_uname(self, uname:str):
self.cur.execute(f"SELECT * FROM Users WHERE UNAME='{uname}'")
return self.cur.fetchone()
self.cur.execute(f"SELECT * FROM Users WHERE {query}")
return True, self.cur.fetchone()
def update_user_info(self, userid, update_type:str,value):
allowed_types={"DNAME":str,"PASSHASH":str}
## updates a value of a user in the database (if allowed in the dictionary)
## OUTPUT: (if successful) bool=True, str="Updated"
## (if unsuccessful) bool=False, str="Not allowed"
def update_user_info(self, userid:int, update_type:str,value):
allowed_types={"DNAME":str,"PASSHASH":str} # only allow to edit these columns!
if update_type.upper() not in allowed_types:
return False, "Not allowed"
self.cur.execute(f"""UPDATE Users SET {update_type}={value if allowed_types[update_type]==int else f"'{value}'"} WHERE ID={userid}""")
@ -141,13 +139,16 @@ class db:
archid=self.cur.fetchone()
return True,archid[0]
## Deletes all relevant entries from Archs and ArchLab
def delete_archive(self, archid:int):
self.cur.execute(f"""DELETE FROM Archs WHERE ID={archid}""")
self.cur.execute(f"""DELETE FROM ArchLab WHERE ARCHID={archid}""")
## Returns all relevant information about one (1) archive
## OUTPUT: archive:tuple=(ID:int,NAME:str,HASH:str,SIZE:int,IMPORTED[UNIX]:int,CATEGORY.ID:int,CATEGORY,str,CATEGORY.DESCRIPTION:str,USER.ID:int,DNAME:str),
## category:tuple=(ID:int,CATEGORY:str,DESCRIPTION:str,PID:int,PCAT:str,PDESC:str)
## OUTPUT: (if archive doesn't exist) bool=False, error:str
## (if archive exists) bool=True,
## archive:tuple=(ID:int,NAME:str,HASH:str,SIZE:int,IMPORTED[UNIX]:int,CATEGORY.ID:int,CATEGORY,str,CATEGORY.DESCRIPTION:str,USER.ID:int,DNAME:str),
## category:tuple=(ID:int,CATEGORY:str,DESCRIPTION:str,PID:int,PCAT:str,PDESC:str),
## labels:list=[…,(ID:int,LABEL:str,LABTYPE:int,LABDESC:str),…]
def get_archive_info(self, archid:int):
# get info about archive itself
@ -156,6 +157,8 @@ class db:
JOIN Users ON Users.ID=Archs.OWNER
WHERE Archs.ID='{archid}'""")
archive=self.cur.fetchone()
if not archive:
return False, "The archive does to exist!", None, None
# get info about category and it's parent
self.cur.execute(f"""SELECT c.ID,c.CATEGORY,c.DESCRIPTION,p.ID AS PID,p.CATEGORY as PCAT,p.DESCRIPTION AS PDESC FROM Cats c, Cats p
WHERE c.ID={archive[5]} AND c.PARENT=p.ID""")
@ -163,8 +166,10 @@ class db:
# get info about labels of archive
labels=self.get_label_info(archid)
return archive, category, labels
return True, archive, category, labels
## Gets all labels and their parents of an archive
## OUTPUT: list=[…,(ID:int,LABEL:str,LABTYPE:int,LABDESC:str),…]
def get_label_info(self, archid:int):
self.cur.execute(f"""SELECT Labs.ID,Labs.LABEL,LabType.ID AS LABTYPE,LabType.DESCRIPTION AS LABDESC FROM ArchLab
JOIN Archs ON Archs.ID=ArchLab.ARCHID
@ -193,11 +198,11 @@ class db:
for w,e in labtypes_list:
labtypes_ids.append(str(w))
labtypes_names.append(e)
ltid_string="(" + ",".join(labtypes_ids) + ")"
ltids_string="(" + ",".join(labtypes_ids) + ")"
# gets all relevant labs: […,(ID:int,NAME:str,LTNAME:str),…]
self.cur.execute(f"""SELECT Labs.ID,Labs.LABEL,LabType.NAME AS LTNAME FROM Labs
JOIN LabType ON Labs.TYPE=LabType.ID
WHERE LabType.ID IN {ltid_string}
WHERE LabType.ID IN {ltids_string}
ORDER BY Labs.LABEL ASC""")
labs_list=self.cur.fetchall()
res_dict={}
@ -211,7 +216,7 @@ class db:
## get a list of enabled labels and update the DB to reflect that state
## OUTPUT: (if on_labels empty) bool=False, str
## (else)
## (else) bool=True, str=""
def update_labels(self, archid:int, on_labels:list):
# fail if no labels passed
if len(on_labels) == 0:
@ -232,13 +237,14 @@ class db:
# remove all labels which are not on
self.cur.execute(f"""DELETE FROM ArchLab WHERE ARCHID={archid} AND LABID NOT IN ({",".join(on_labels)})""")
to_add_list=[]
# creates all entries as strings and puts them into a list
for i in to_add:
to_add_list.append("(" + str(archid) + "," + str(i) + ")")
# add all new labels
self.cur.execute(f"""INSERT INTO ArchLab(ARCHID,LABID) VALUES{",".join(to_add_list)}""")
return True, ""
## Returns n archives, sorted by (imported )time or size
## Returns n archives, sorted by a column
## OUTPUT: archives:array=[…,(ID:int,NAME:str,SIZE:str,IMPORTED[UNIX]:int),…]
def get_n_archives(self, sorttype:str="time",category:int=0, keywords:list=[], count:int=20,labels:list=[]): # TODO: CLEANN!!!!!
match sorttype:
@ -271,21 +277,6 @@ class db:
{keyword_string}
ORDER BY {sorttype} LIMIT {count if count else 20}""")
archives=self.cur.fetchall()
## WARNING: JANK
#positive_archives=[]
#print("LABELS:", labels)
#if len(labels) >= 1:
# for arch in archives:
# archid=arch[0]
# archive_labels=self.get_label_info(archid)
# success=True
# for label in archive_labels:
# if not label[0] in labels:
# success=False
# if success:
# positive_archives.append(arch)
# if len(positive_archives) >= count:
# break
return archives