diff --git a/flask/app.py b/flask/app.py index 445d364..763326f 100644 --- a/flask/app.py +++ b/flask/app.py @@ -13,12 +13,16 @@ db.startup() app = Flask(__name__) ########## CUSTOM FILTERS +## converts a unix timestamp to a human-readable format +## OUTPUT: str="2023-11-02 12:33" @app.template_filter('ctime') -def timectime(s): +def timectime(s:int): return datetime.utcfromtimestamp(s).strftime('%Y-%m-%d %H:%M') +## converts a bytes to a human readable scale +## OUTPUT: str="XXX.XXYB" @app.template_filter('spacer') -def convsize(s): +def convsize(s:int): sizes=("B","KB","MB","GB","TB") n=0 while s >= 1000: @@ -31,19 +35,21 @@ def convsize(s): @app.route('/') def homepage(): # try to get userdata, else logout state - logged_in,userdata=get_login_info(request.cookies.get('session')) + logged_in,userdata=get_login_info(request.cookies.get('session-id')) archives=db.get_n_archives() return render_template("home.html", title="Homepage",userdata=userdata,login=logged_in,archives=archives) @app.route('/user') @app.route('/user/', methods=['GET','POST']) def userpage(page_userid:int=0): - logged_in,userdata=get_login_info(request.cookies.get('session')) + logged_in,userdata=get_login_info(request.cookies.get('session-id')) if not logged_in: return make_response(redirect('/')) if page_userid == 0: return make_response(redirect(f"/user/{userdata[0]}")) - page_userdata=db.get_user_info(page_userid) + res,page_userdata=db.get_user_info(page_userid) + if not res: + return errorpage(page_userdata) # page_userdata is error # POST: Update display name or password if request.method == 'POST': @@ -66,11 +72,13 @@ def userpage(page_userid:int=0): return make_response(redirect('/')) # GET: return normal info page - return render_template("user.html", title="User Details",userdata=page_userdata,login_userid=userdata[0],userid=page_userid) + return render_template("user.html", title="User Details",login=logged_in,userdata=page_userdata,login_userid=userdata[0],userid=page_userid) @app.route('/user/') def user_redirect(uname:str): - userdata=db.get_user_info_from_uname(uname) + res,userdata=db.get_user_info(uname) + if not res: + return errorpage(userdata) # userdata is error if not userdata: return make_response(redirect(f"/user")) return make_response(redirect(f"/user/{userdata[0]}")) @@ -78,23 +86,21 @@ def user_redirect(uname:str): @app.route('/add', methods=['GET','POST']) def addpage(): # try to get userdata, else yeet to the homepage - logged_in,userdata=get_login_info(request.cookies.get('session')) + logged_in,userdata=get_login_info(request.cookies.get('session-id')) if not logged_in: return make_response(redirect('/')) # POST: check and add archive, show confirmation/error message at the end if request.method == 'POST': postdict={} # get and save all inputs, error if one doesn't exist or is wrong type - for i,itype in [("name",str),("hash",str),("category",int),("size",float)]: + for i,itype in [("name",str),("hash",str),("category",int),("size",float),("size_multiplier",int)]: try: postdict[i]=itype(request.form[i]) + if i == "size_multiplier": + postdict["size"]+=i except Exception as e: return errorpage("All fields need to be filled and don't play with their names!") - try: - postdict["size"]=postdict["size"]*int(request.form['size_multiplier']) - except Exception as e: - return errorpage("All fields need to be filled and don't play with their names!") postdict["owner"]=userdata[0] res,archid=db.add_archive(postdict) if res: @@ -111,16 +117,16 @@ def loginpage(): # POST: Process login request if request.method == 'POST': username=request.form['username'] - password=sha256(request.form['password'].encode()).hexdigest() - code,userid,passhash=db.get_passhash(username) - if code != 200: - return passhash + login_passhash=sha256(request.form['password'].encode()).hexdigest() + res,userid,db_passhash=db.get_passhash(username) + if not res: + return errorpage(userid)# userid is the error # if passwords match, create session and return cookie - if password.upper() == passhash.upper(): + if login_passhash.upper() == db_passhash.upper(): lifetime=RAR_COOKIE_LIFETIME # lifetime of the sesskey in seconds sesskey=str(uuid()) db.set_sesskey(sesskey,userid,lifetime) - resp=setcookie("session",sesskey,lifetime) + resp=setcookie("session-id",sesskey,lifetime) return resp else: return errorpage("You've entered the wrong password. This incident will be reported.") @@ -130,7 +136,7 @@ def loginpage(): @app.route('/logout') def logout(): - sesskey=request.cookies.get('session') + sesskey=request.cookies.get('session-id') logged_in,userdata=get_login_info(sesskey) if not logged_in: return make_response(redirect('/login')) @@ -139,16 +145,21 @@ def logout(): @app.route('/view/') def viewpage(archid:int): - logged_in,userdata=get_login_info(request.cookies.get('session')) - archive,category,labels=db.get_archive_info(archid) + logged_in,userdata=get_login_info(request.cookies.get('session-id')) + res,archive,category,labels=db.get_archive_info(archid) + if not res: + return errorpage(archive) # archive is error return render_template("view.html", title="View Archive",userdata=userdata,login=logged_in,archive=archive,category=category,labels=labels) @app.route('/delete/', methods=["GET","POST"]) def deletepage(archid:int): - logged_in,userdata=get_login_info(request.cookies.get('session')) - archive,category,labels=db.get_archive_info(archid) + logged_in,userdata=get_login_info(request.cookies.get('session-id')) + res,archive,category,labels=db.get_archive_info(archid) + if not res: + return errorpage(archive) # archive is error if not logged_in or userdata[0] != archive[8]: return make_response(redirect(f"/view/{archid}")) + # POST: check if input is correct and delete relevant data if request.method == 'POST': if not request.form['archname'] == archive[1]: return errorpage("The name input doesn't match!") @@ -159,8 +170,10 @@ def deletepage(archid:int): @app.route('/labels/', methods=["GET","POST"]) def labeleditpage(archid:int): - logged_in,userdata=get_login_info(request.cookies.get('session')) - archive,category,labels=db.get_archive_info(archid) + logged_in,userdata=get_login_info(request.cookies.get('session-id')) + res,archive,category,labels=db.get_archive_info(archid) + if not res: + return errorpage(archive) # archive is error label_dict=db.get_label_labeltypes(category[3]) if not logged_in or userdata[0] != archive[8]: return make_response(redirect(f"/view/{archid}")) @@ -168,7 +181,6 @@ def labeleditpage(archid:int): # POST: parse everything and update labels if request.method == 'POST': on_labels=[] - print(request.form) for i in request.form: on_labels.append(i) res, data=db.update_labels(archid, on_labels) @@ -185,7 +197,7 @@ def labeleditpage(archid:int): @app.route('/search') def searchpage(): # try to get userdata, else logout state - logged_in,userdata=get_login_info(request.cookies.get('session')) + logged_in,userdata=get_login_info(request.cookies.get('session-id')) # try to set all required variables, else defaults try: @@ -199,8 +211,7 @@ def searchpage(): category=0 label_dict={} try: - keywords=request.args['q'] - keywords="".join(keywords).split(" ") + keywords="".join(request.args['q']).split(" ") except Exception as e: keywords=[] try: @@ -226,7 +237,7 @@ def searchpage(): ########## FUNCTIONS def errorpage(message): - return "

ERROR: " + str(message) + "

Go back and try again" + return "

ERROR: " + str(message) + "

Go back and try again", 400 ## Checks if given sesskey is valid and returns user data ## OUTPUT: (if sesskey valid) logged_in:bool=True, userdata:tuple @@ -236,7 +247,9 @@ def get_login_info(sesskey:str): return False,() logged_in,userid=db.check_sesskey(sesskey) if logged_in: - userdata=db.get_user_info(userid) + res,userdata=db.get_user_info(userid) + if not res: + return errorpage(userdata) # userdata is error else: userdata=() return logged_in,userdata @@ -256,11 +269,11 @@ def get_category_selection(include_parents:bool=True): if not cat[2]: if include_parents: htmlcatlist.append((cat[0],cat[1])) - parent=cat[1] + parentname=cat[1] parentid=cat[0] for i in catlist: if i[2] == parentid: - htmlcatlist.append((i[0],f"{parent}/{i[1]}")) + htmlcatlist.append((i[0],f"{parentname}/{i[1]}")) return htmlcatlist ## API CALLS (NO THANKS) diff --git a/flask/func.py b/flask/func.py index 2dcca7d..0d36892 100644 --- a/flask/func.py +++ b/flask/func.py @@ -4,7 +4,7 @@ from os import environ import time,re from config import * -## params populated with environment variables, defaults can be changed for a permanent solution +## params populated with environment variables, get data from config if environment variables not set conn_params={ "user" : environ.get('MARIADB_USER') if environ.get('MARIADB_USER') else MARIADB_USER, "password" : environ.get('MARIADB_PASSWORD') if environ.get('MARIADB_PASSWORD') else MARIADB_PASSWORD, @@ -28,93 +28,91 @@ class db: SIZE bigint NOT NULL, IMPORTED int, CATEGORY int, - OWNER int - );""") + OWNER int);""") self.cur.execute("""CREATE TABLE IF NOT EXISTS Users( ID int PRIMARY KEY AUTO_INCREMENT, UNAME text NOT NULL UNIQUE, DNAME text NOT NULL, CREATED int NOT NULL, STATE text, - PASSHASH text NOT NULL - );""") + PASSHASH text NOT NULL);""") self.cur.execute("""CREATE TABLE IF NOT EXISTS Sessions( ID int PRIMARY KEY AUTO_INCREMENT, SESSKEY text NOT NULL UNIQUE, USERID int NOT NULL, CREATED int NOT NULL, - LIFE int - );""") + LIFE int);""") self.cur.execute("""CREATE TABLE IF NOT EXISTS Cats( ID int PRIMARY KEY AUTO_INCREMENT, CATEGORY text NOT NULL, PARENT int, - DESCRIPTION text - );""") + DESCRIPTION text);""") self.cur.execute("""CREATE TABLE IF NOT EXISTS ArchLab( ID int PRIMARY KEY AUTO_INCREMENT, ARCHID int NOT NULL, - LABID int NOT NULL - );""") + LABID int NOT NULL);""") self.cur.execute("""CREATE TABLE IF NOT EXISTS CatLabType( ID int PRIMARY KEY AUTO_INCREMENT, CATID int NOT NULL, - LABID int NOT NULL - );""") + LABID int NOT NULL);""") self.cur.execute("""CREATE TABLE IF NOT EXISTS Labs( ID int PRIMARY KEY AUTO_INCREMENT, LABEL text NOT NULL, - TYPE int NOT NULL - );""") + TYPE int NOT NULL);""") self.cur.execute("""CREATE TABLE IF NOT EXISTS LabType( ID int PRIMARY KEY AUTO_INCREMENT, NAME text NOT NULL, - DESCRIPTION text - );""") + DESCRIPTION text);""") ## Gets the passhash from a specific user - ## OUTPUT: (If user exists) int=200, ID:int, passhash:str - ## (If user does not exist) int=400, Exception:str + ## OUTPUT: (If user exists) bool=True, ID:int, passhash:str + ## (If user does not exist) bool=False, Exception:str def get_passhash(self, username:str): self.cur.execute(f"SELECT ID,PASSHASH FROM Users WHERE UNAME='{username}'") - try: - resp=self.cur.fetchone() - except Exception as e: - return 400, e, NULL - return 200, resp[0], resp[1] + resp=self.cur.fetchone() + if not resp: + return False, "The user does not exist!", None + return True, resp[0], resp[1] ## Checks if sesskey exists and is not expired ## OUTPUT: (if valiid) bool=True, USERID:str ## (in invalid) bool=False, str="" def check_sesskey(self, sesskey:str): - self.cur.execute(f"SELECT SESSKEY,USERID FROM Sessions WHERE SESSKEY='{sesskey}'") + self.cur.execute(f"SELECT SESSKEY,USERID,CREATED,LIFE FROM Sessions WHERE SESSKEY='{sesskey}'") entry=self.cur.fetchone() - if entry and sesskey in entry: + if entry and sesskey in entry and time.time() < entry[2]+entry[3]: return True, entry[1] else: - return False, "" + return False, "The session key is expired or does not exist!" ## Sets a session key. That's it. def set_sesskey(self, sesskey:str, userid:int, lifetime:int): self.cur.execute(f"INSERT INTO Sessions(SESSKEY,USERID,CREATED,LIFE) VALUES('{sesskey}',{userid},{time.time()},{lifetime})") + ## Removes the session key from the database (doesn't delete the cookie) def logout_user(self, sesskey:str): self.cur.execute(f"DELETE FROM Sessions WHERE SESSKEY='{sesskey}'") ## Gets and returns all user info about one (1) user - ## OUTPUT: tuple=(ID:int,UNAME:str,DNAME:str,CREATED:int,STATE:text,PASSHASH:text) - def get_user_info(self, userid:int): - self.cur.execute(f"SELECT * FROM Users WHERE ID='{userid}'") - return self.cur.fetchone() + ## OUTPUT: (if successful) bool=True, tuple=(ID:int,UNAME:str,DNAME:str,CREATED:int,STATE:text,PASSHASH:text) + ## (if unsuccessful) bool=False, error:str + def get_user_info(self, identifier): + match identifier: + case int(): + query=f"ID={identifier}" + case str(): + query=f"UNAME='{identifier}'" + case _: + return False, "Wrong identifier type!" - ## like above, just with uname - ## OUTPUT: tuple=(ID:int,UNAME:str,DNAME:str,CREATED:int,STATE:text,PASSHASH:text) - def get_user_info_from_uname(self, uname:str): - self.cur.execute(f"SELECT * FROM Users WHERE UNAME='{uname}'") - return self.cur.fetchone() + self.cur.execute(f"SELECT * FROM Users WHERE {query}") + return True, self.cur.fetchone() - def update_user_info(self, userid, update_type:str,value): - allowed_types={"DNAME":str,"PASSHASH":str} + ## updates a value of a user in the database (if allowed in the dictionary) + ## OUTPUT: (if successful) bool=True, str="Updated" + ## (if unsuccessful) bool=False, str="Not allowed" + def update_user_info(self, userid:int, update_type:str,value): + allowed_types={"DNAME":str,"PASSHASH":str} # only allow to edit these columns! if update_type.upper() not in allowed_types: return False, "Not allowed" self.cur.execute(f"""UPDATE Users SET {update_type}={value if allowed_types[update_type]==int else f"'{value}'"} WHERE ID={userid}""") @@ -141,13 +139,16 @@ class db: archid=self.cur.fetchone() return True,archid[0] + ## Deletes all relevant entries from Archs and ArchLab def delete_archive(self, archid:int): self.cur.execute(f"""DELETE FROM Archs WHERE ID={archid}""") self.cur.execute(f"""DELETE FROM ArchLab WHERE ARCHID={archid}""") ## Returns all relevant information about one (1) archive - ## OUTPUT: archive:tuple=(ID:int,NAME:str,HASH:str,SIZE:int,IMPORTED[UNIX]:int,CATEGORY.ID:int,CATEGORY,str,CATEGORY.DESCRIPTION:str,USER.ID:int,DNAME:str), - ## category:tuple=(ID:int,CATEGORY:str,DESCRIPTION:str,PID:int,PCAT:str,PDESC:str) + ## OUTPUT: (if archive doesn't exist) bool=False, error:str + ## (if archive exists) bool=True, + ## archive:tuple=(ID:int,NAME:str,HASH:str,SIZE:int,IMPORTED[UNIX]:int,CATEGORY.ID:int,CATEGORY,str,CATEGORY.DESCRIPTION:str,USER.ID:int,DNAME:str), + ## category:tuple=(ID:int,CATEGORY:str,DESCRIPTION:str,PID:int,PCAT:str,PDESC:str), ## labels:list=[…,(ID:int,LABEL:str,LABTYPE:int,LABDESC:str),…] def get_archive_info(self, archid:int): # get info about archive itself @@ -156,6 +157,8 @@ class db: JOIN Users ON Users.ID=Archs.OWNER WHERE Archs.ID='{archid}'""") archive=self.cur.fetchone() + if not archive: + return False, "The archive does to exist!", None, None # get info about category and it's parent self.cur.execute(f"""SELECT c.ID,c.CATEGORY,c.DESCRIPTION,p.ID AS PID,p.CATEGORY as PCAT,p.DESCRIPTION AS PDESC FROM Cats c, Cats p WHERE c.ID={archive[5]} AND c.PARENT=p.ID""") @@ -163,8 +166,10 @@ class db: # get info about labels of archive labels=self.get_label_info(archid) - return archive, category, labels + return True, archive, category, labels + ## Gets all labels and their parents of an archive + ## OUTPUT: list=[…,(ID:int,LABEL:str,LABTYPE:int,LABDESC:str),…] def get_label_info(self, archid:int): self.cur.execute(f"""SELECT Labs.ID,Labs.LABEL,LabType.ID AS LABTYPE,LabType.DESCRIPTION AS LABDESC FROM ArchLab JOIN Archs ON Archs.ID=ArchLab.ARCHID @@ -193,11 +198,11 @@ class db: for w,e in labtypes_list: labtypes_ids.append(str(w)) labtypes_names.append(e) - ltid_string="(" + ",".join(labtypes_ids) + ")" + ltids_string="(" + ",".join(labtypes_ids) + ")" # gets all relevant labs: […,(ID:int,NAME:str,LTNAME:str),…] self.cur.execute(f"""SELECT Labs.ID,Labs.LABEL,LabType.NAME AS LTNAME FROM Labs JOIN LabType ON Labs.TYPE=LabType.ID - WHERE LabType.ID IN {ltid_string} + WHERE LabType.ID IN {ltids_string} ORDER BY Labs.LABEL ASC""") labs_list=self.cur.fetchall() res_dict={} @@ -211,7 +216,7 @@ class db: ## get a list of enabled labels and update the DB to reflect that state ## OUTPUT: (if on_labels empty) bool=False, str - ## (else) + ## (else) bool=True, str="" def update_labels(self, archid:int, on_labels:list): # fail if no labels passed if len(on_labels) == 0: @@ -232,13 +237,14 @@ class db: # remove all labels which are not on self.cur.execute(f"""DELETE FROM ArchLab WHERE ARCHID={archid} AND LABID NOT IN ({",".join(on_labels)})""") to_add_list=[] + # creates all entries as strings and puts them into a list for i in to_add: to_add_list.append("(" + str(archid) + "," + str(i) + ")") # add all new labels self.cur.execute(f"""INSERT INTO ArchLab(ARCHID,LABID) VALUES{",".join(to_add_list)}""") return True, "" - ## Returns n archives, sorted by (imported )time or size + ## Returns n archives, sorted by a column ## OUTPUT: archives:array=[…,(ID:int,NAME:str,SIZE:str,IMPORTED[UNIX]:int),…] def get_n_archives(self, sorttype:str="time",category:int=0, keywords:list=[], count:int=20,labels:list=[]): # TODO: CLEANN!!!!! match sorttype: @@ -271,21 +277,6 @@ class db: {keyword_string} ORDER BY {sorttype} LIMIT {count if count else 20}""") archives=self.cur.fetchall() - ## WARNING: JANK - #positive_archives=[] - #print("LABELS:", labels) - #if len(labels) >= 1: - # for arch in archives: - # archid=arch[0] - # archive_labels=self.get_label_info(archid) - # success=True - # for label in archive_labels: - # if not label[0] in labels: - # success=False - # if success: - # positive_archives.append(arch) - # if len(positive_archives) >= count: - # break return archives