## MAIN FUNCTIONS FILE FOR BACK-BACKEND OF FLASK import mariadb as sql from os import environ import time ## params populated with environment variables, defaults can be changed for a permanent solution conn_params={ "user" : environ.get('MARIADB_USER') if environ.get('MARIADB_USER') else "rar_index_app", "password" : environ.get('MARIADB_PASSWORD') if environ.get('MARIADB_PASSWORD') else "password", "host" : environ.get('MARIADB_HOST') if environ.get('MARIADB_HOST') else "marcelsite.com", "database" : environ.get('MARIADB_DB') if environ.get('MARIADB_DB') else "rar_index" } class db: def __init__(self): self.conn=sql.connect(**conn_params) self.conn.autocommit=True self.cur=self.conn.cursor() ## Creates all archives, if they don't exist already ## Called only on startup, hence the name def startup(self): self.cur.execute("""CREATE TABLE IF NOT EXISTS Archs( ID int PRIMARY KEY AUTO_INCREMENT, NAME text NOT NULL, HASH text NOT NULL, SIZE int NOT NULL, IMPORTED int, CATEGORY int, OWNER int );""") self.cur.execute("""CREATE TABLE IF NOT EXISTS Users( ID int PRIMARY KEY AUTO_INCREMENT, UNAME text NOT NULL, DNAME text NOT NULL, CREATED int NOT NULL, STATE text, PASSHASH text NOT NULL );""") self.cur.execute("""CREATE TABLE IF NOT EXISTS Sessions( ID int PRIMARY KEY AUTO_INCREMENT, SESSKEY text NOT NULL, USERID int NOT NULL, CREATED int NOT NULL, LIFE int );""") self.cur.execute("""CREATE TABLE IF NOT EXISTS Cats( ID int PRIMARY KEY AUTO_INCREMENT, CATEGORY text NOT NULL, PARENT int, DESCRIPTION text );""") self.cur.execute("""CREATE TABLE IF NOT EXISTS ArchLab( ID int PRIMARY KEY AUTO_INCREMENT, ARCHID int NOT NULL, LABID int NOT NULL );""") self.cur.execute("""CREATE TABLE IF NOT EXISTS Labs( ID int PRIMARY KEY AUTO_INCREMENT, LABEL text NOT NULL, CATEGORY text, TYPE int NOT NULL );""") self.cur.execute("""CREATE TABLE IF NOT EXISTS LabType( ID int PRIMARY KEY AUTO_INCREMENT, NAME text NOT NULL, DESCRIPTION text );""") ## Gets the passhash from a specific user ## OUTPUT: (If user exists) int=200, passhash:str ## (If user does not exist) int=400, Exception:str def get_passhash(self, username:str): self.cur.execute(f"SELECT ID,PASSHASH FROM Users WHERE UNAME='{username}'") try: resp=self.cur.fetchone() except Exception as e: return 400, e, NULL return 200, resp[0], resp[1] ## Checks if sesskey exists and is not expired ## OUTPUT: (if valiid) bool=True, USERID:str ## (in invalid) bool=False, str="" def check_sesskey(self, sesskey:str): self.cur.execute(f"SELECT SESSKEY,USERID FROM Sessions WHERE SESSKEY='{sesskey}'") entry=self.cur.fetchone() if sesskey in entry: return True, entry[1] else: return False, "" ## Sets a session key. That's it. def set_sesskey(self, sesskey:str, userid:int, lifetime:int): self.cur.execute(f"INSERT INTO Sessions(SESSKEY,USERID,CREATED,LIFE) VALUES('{sesskey}',{userid},{time.time()},{lifetime})") ## Gets and returns all user info about one (1) user ## OUTPUT: tuple=(ID:int,UNAME:str,DNAME:str,CREATED:int,STATE:text,PASSHASH:text) def get_user_info(self, userid:int): self.cur.execute(f"SELECT * FROM Users WHERE ID='{userid}'") return self.cur.fetchone() ## Returns all relevant information about one (1) archive ## OUTPUT: archive:tuple=(NAME:str,HASH:str,IMPORTED[UNIX]:int,CATEGORY,str,CATEGORY.DESCRIPTION:str,UNAME:str,DNAME:str) ## labels:array=[…,(LABEL:str,CATEGORY:str,CATDESC:str,LABTYPE:str,LABDESC:str),…] def get_archive_info(self, hash:str): self.cur.execute(f"""SELECT Archs.NAME,Archs.HASH,Archs.IMPORTED,Cats.CATEGORY,Cats.DESCRIPTION,Users.UNAME,Users.DNAME FROM Archs JOIN Cats ON Cats.ID=Archs.CATEGORY JOIN Users ON Users.ID=Archs.OWNER WHERE hash='{hash}'""") archive=self.cur.fetchone() self.cur.execute(f"""SELECT Labs.LABEL,Cats.CATEGORY,Cats.DESCRIPTION AS CATDESC,LabType.NAME AS LABTYPE,LabType.DESCRIPTION AS LABDESC FROM ArchLab JOIN Archs ON Archs.ID=ArchLab.ARCHID JOIN Labs ON Labs.ID=ArchLab.LABID JOIN Cats ON Labs.CATEGORY=Cats.ID JOIN LabType ON Labs.TYPE=LabType.ID WHERE ARCHID=1;""") labels=self.cur.fetchall() return archive, labels ## Returns all categories. ## OUTPUT: array=[…,(ID:int,CATEGORY:str,PARENT:int,DESCRIPTION:str),…] def get_all_categories(self): self.cur.execute("SELECT * FROM Cats;") return self.cur.fetchall() ## Returns n archives, sorted by (imported )time or size ## OUTPUT: archives:array=[…,(ID:int,NAME:str,SIZE:str,IMPORTED[UNIX]:int),…] def get_n_archives(self, sorttype:str="time",category:int=0, keywords:list=[], count:int=20): match sorttype: case "size": sorttype="SIZE DESC" case "time": sorttype="IMPORTED DESC" case "za": sorttype="NAME DESC" case _: sorttype="NAME ASC" # create SQL query for keywords keyword_string="" for w in keywords: keyword_string+=f"AND NAME LIKE '%{w}%' " if len(keywords) == 1: for w in keywords: keyword_string+=f"OR HASH = '{w}' " # Get all children of category (if exist) and put into query string categories=self.get_all_categories() catlist=[category] for i in categories: if i[2] == int(category): catlist.append(str(i[0])) category="(" + ",".join(catlist) + ")" self.cur.execute(f"""SELECT ID,NAME,SIZE,IMPORTED FROM Archs {"WHERE 1=1" if category==0 else " WHERE CATEGORY IN " + category} {keyword_string} ORDER BY {sorttype} LIMIT {count};""") archives=self.cur.fetchall() return archives if __name__ == "__main__": #startup() db=db(conn_params) db.cur.close() db.conn.close() exit()