301 lines
14 KiB
Python
301 lines
14 KiB
Python
## MAIN FUNCTIONS FILE FOR BACK-BACKEND OF FLASK
|
|
import mariadb as sql
|
|
from os import environ
|
|
import time,re
|
|
from config import *
|
|
|
|
## params populated with environment variables, get data from config if environment variables not set
|
|
conn_params={
|
|
"user" : environ.get('MARIADB_USER') if environ.get('MARIADB_USER') else MARIADB_USER,
|
|
"password" : environ.get('MARIADB_PASSWORD') if environ.get('MARIADB_PASSWORD') else MARIADB_PASSWORD,
|
|
"host" : environ.get('MARIADB_HOST') if environ.get('MARIADB_HOST') else MARIADB_HOST,
|
|
"database" : environ.get('MARIADB_DB') if environ.get('MARIADB_DB') else MARIADB_DB
|
|
}
|
|
|
|
class db:
|
|
def __init__(self):
|
|
self.conn=sql.connect(**conn_params)
|
|
self.conn.autocommit=True
|
|
self.cur=self.conn.cursor()
|
|
|
|
## Creates all archives, if they don't exist already
|
|
## Called only on startup, hence the name
|
|
def startup(self):
|
|
self.cur.execute("""CREATE TABLE IF NOT EXISTS Archs(
|
|
ID int PRIMARY KEY AUTO_INCREMENT,
|
|
NAME text NOT NULL,
|
|
HASH text NOT NULL UNIQUE,
|
|
SIZE bigint NOT NULL,
|
|
IMPORTED int,
|
|
CATEGORY int,
|
|
OWNER int);""")
|
|
self.cur.execute("""CREATE TABLE IF NOT EXISTS Users(
|
|
ID int PRIMARY KEY AUTO_INCREMENT,
|
|
UNAME text NOT NULL UNIQUE,
|
|
DNAME text NOT NULL,
|
|
CREATED int NOT NULL,
|
|
STATE text,
|
|
PASSHASH text NOT NULL);""")
|
|
self.cur.execute("""CREATE TABLE IF NOT EXISTS Sessions(
|
|
ID int PRIMARY KEY AUTO_INCREMENT,
|
|
SESSKEY text NOT NULL UNIQUE,
|
|
USERID int NOT NULL,
|
|
CREATED int NOT NULL,
|
|
LIFE int);""")
|
|
self.cur.execute("""CREATE TABLE IF NOT EXISTS Cats(
|
|
ID int PRIMARY KEY AUTO_INCREMENT,
|
|
CATEGORY text NOT NULL,
|
|
PARENT int,
|
|
DESCRIPTION text);""")
|
|
self.cur.execute("""CREATE TABLE IF NOT EXISTS ArchLab(
|
|
ID int PRIMARY KEY AUTO_INCREMENT,
|
|
ARCHID int NOT NULL,
|
|
LABID int NOT NULL);""")
|
|
self.cur.execute("""CREATE TABLE IF NOT EXISTS CatLabType(
|
|
ID int PRIMARY KEY AUTO_INCREMENT,
|
|
CATID int NOT NULL,
|
|
LABID int NOT NULL);""")
|
|
self.cur.execute("""CREATE TABLE IF NOT EXISTS Labs(
|
|
ID int PRIMARY KEY AUTO_INCREMENT,
|
|
LABEL text NOT NULL,
|
|
TYPE int NOT NULL);""")
|
|
self.cur.execute("""CREATE TABLE IF NOT EXISTS LabType(
|
|
ID int PRIMARY KEY AUTO_INCREMENT,
|
|
NAME text NOT NULL,
|
|
DESCRIPTION text);""")
|
|
|
|
## Gets the passhash from a specific user
|
|
## OUTPUT: (If user exists) bool=True, ID:int, passhash:str
|
|
## (If user does not exist) bool=False, Exception:str
|
|
def get_passhash(self, username:str):
|
|
self.cur.execute(f"SELECT ID,PASSHASH FROM Users WHERE UNAME='{username}'")
|
|
resp=self.cur.fetchone()
|
|
if not resp:
|
|
return False, "The user does not exist!", None
|
|
return True, resp[0], resp[1]
|
|
|
|
## Checks if sesskey exists and is not expired
|
|
## OUTPUT: (if valiid) bool=True, USERID:str
|
|
## (in invalid) bool=False, str=""
|
|
def check_sesskey(self, sesskey:str):
|
|
self.cur.execute(f"SELECT SESSKEY,USERID,CREATED,LIFE FROM Sessions WHERE SESSKEY='{sesskey}'")
|
|
entry=self.cur.fetchone()
|
|
if entry and sesskey in entry and time.time() < entry[2]+entry[3]:
|
|
return True, entry[1]
|
|
else:
|
|
return False, "The session key is expired or does not exist!"
|
|
|
|
## Sets a session key. That's it.
|
|
def set_sesskey(self, sesskey:str, userid:int, lifetime:int):
|
|
self.cur.execute(f"INSERT INTO Sessions(SESSKEY,USERID,CREATED,LIFE) VALUES('{sesskey}',{userid},{time.time()},{lifetime})")
|
|
|
|
## Removes the session key from the database (doesn't delete the cookie)
|
|
def logout_user(self, sesskey:str):
|
|
self.cur.execute(f"DELETE FROM Sessions WHERE SESSKEY='{sesskey}'")
|
|
|
|
## Gets and returns all user info about one (1) user
|
|
## OUTPUT: (if successful) bool=True, tuple=(ID:int,UNAME:str,DNAME:str,CREATED:int,STATE:text,PASSHASH:text)
|
|
## (if unsuccessful) bool=False, error:str
|
|
def get_user_info(self, identifier):
|
|
match identifier:
|
|
case int():
|
|
query=f"ID={identifier}"
|
|
case str():
|
|
query=f"UNAME='{identifier}'"
|
|
case _:
|
|
return False, "Wrong identifier type!"
|
|
|
|
self.cur.execute(f"SELECT * FROM Users WHERE {query}")
|
|
return True, self.cur.fetchone()
|
|
|
|
## updates a value of a user in the database (if allowed in the dictionary)
|
|
## OUTPUT: (if successful) bool=True, str="Updated"
|
|
## (if unsuccessful) bool=False, str="Not allowed"
|
|
def update_user_info(self, userid:int, update_type:str,value):
|
|
allowed_types={"DNAME":str,"PASSHASH":str} # only allow to edit these columns!
|
|
if update_type.upper() not in allowed_types:
|
|
return False, "Not allowed"
|
|
self.cur.execute(f"""UPDATE Users SET {update_type}={value if allowed_types[update_type]==int else f"'{value}'"} WHERE ID={userid}""")
|
|
return True, "Updated"
|
|
|
|
## Checks information for errors and adds archive to the DB
|
|
## OUTPUT: (if successful) res:bool=True, ID:int
|
|
## (if unsuccessful) res:bool=False, str
|
|
def add_archive(self, archive:dict):
|
|
# Check everything for errors or malicious things
|
|
archive["hash"]=archive["hash"].upper()
|
|
if not re.match('[A-Z0-9]{40}', archive["hash"]):
|
|
return False, "Hash needs to be 40 characters in hexadecimal (SHA-1)."
|
|
if re.match('.*[^A-Za-z0-9\. +_-].*', archive["name"]):
|
|
return False, "The name contains illegal characters. Allowed chars: '[A-Za-z0-9\. _-]'"
|
|
print(archive["name"])
|
|
|
|
curtime=time.time()
|
|
try:
|
|
self.cur.execute(f"INSERT INTO Archs(NAME,HASH,SIZE,IMPORTED,CATEGORY,OWNER) VALUES('{archive['name']}','{archive['hash']}',{archive['size']},{curtime},{archive['category']},{archive['owner']})")
|
|
except Exception as e: # hash needs to be unique
|
|
return False, e
|
|
self.cur.execute(f"SELECT ID FROM Archs WHERE HASH='{archive['hash']}'")
|
|
archid=self.cur.fetchone()
|
|
return True,archid[0]
|
|
|
|
## Deletes all relevant entries from Archs and ArchLab
|
|
def delete_archive(self, archid:int):
|
|
self.cur.execute(f"""DELETE FROM Archs WHERE ID={archid}""")
|
|
self.cur.execute(f"""DELETE FROM ArchLab WHERE ARCHID={archid}""")
|
|
|
|
## Returns all relevant information about one (1) archive
|
|
## OUTPUT: (if archive doesn't exist) bool=False, error:str
|
|
## (if archive exists) bool=True,
|
|
## archive:tuple=(ID:int,NAME:str,HASH:str,SIZE:int,IMPORTED[UNIX]:int,CATEGORY.ID:int,CATEGORY,str,CATEGORY.DESCRIPTION:str,USER.ID:int,DNAME:str),
|
|
## category:tuple=(ID:int,CATEGORY:str,DESCRIPTION:str,PID:int,PCAT:str,PDESC:str),
|
|
## labels:list=[…,(ID:int,LABEL:str,LABTYPE:int,LABDESC:str),…]
|
|
def get_archive_info(self, archid:int):
|
|
# get info about archive itself
|
|
self.cur.execute(f"""SELECT Archs.ID,Archs.NAME,Archs.HASH,Archs.SIZE,Archs.IMPORTED,Cats.ID,Cats.CATEGORY,Cats.DESCRIPTION,Users.ID,Users.DNAME FROM Archs
|
|
JOIN Cats ON Cats.ID=Archs.CATEGORY
|
|
JOIN Users ON Users.ID=Archs.OWNER
|
|
WHERE Archs.ID='{archid}'""")
|
|
archive=self.cur.fetchone()
|
|
if not archive:
|
|
return False, "The archive does to exist!", None, None
|
|
# get info about category and it's parent
|
|
self.cur.execute(f"""SELECT c.ID,c.CATEGORY,c.DESCRIPTION,p.ID AS PID,p.CATEGORY as PCAT,p.DESCRIPTION AS PDESC FROM Cats c, Cats p
|
|
WHERE c.ID={archive[5]} AND c.PARENT=p.ID""")
|
|
category=self.cur.fetchone()
|
|
# get info about labels of archive
|
|
|
|
labels=self.get_label_info(archid)
|
|
return True, archive, category, labels
|
|
|
|
## Gets all labels and their parents of an archive
|
|
## OUTPUT: list=[…,(ID:int,LABEL:str,LABTYPE:int,LABDESC:str),…]
|
|
def get_label_info(self, archid:int):
|
|
self.cur.execute(f"""SELECT Labs.ID,Labs.LABEL,LabType.ID AS LABTYPE,LabType.DESCRIPTION AS LABDESC FROM ArchLab
|
|
JOIN Archs ON Archs.ID=ArchLab.ARCHID
|
|
JOIN Labs ON Labs.ID=ArchLab.LABID
|
|
JOIN LabType ON Labs.TYPE=LabType.ID
|
|
WHERE ARCHID={archid};""")
|
|
return self.cur.fetchall()
|
|
|
|
## Returns all categories.
|
|
## OUTPUT: array=[…,(ID:int,CATEGORY:str,PARENT:int,DESCRIPTION:str),…]
|
|
def get_categories(self):
|
|
self.cur.execute("SELECT * FROM Cats;")
|
|
return self.cur.fetchall()
|
|
|
|
## Gets and returns all info about one (1) category
|
|
## OUTPUT: tup=(ID:int,CATEGORY:str,PARENT:int,DESCRIPTION:str)
|
|
def get_category_info(self, catid:int):
|
|
self.cur.execute(f"SELECT * FROM Cats WHERE ID={catid}")
|
|
return self.cur.fetchone()
|
|
|
|
## get all labeltypes and their respective labels based on a category parent
|
|
## OUTPUT: res_dict:dict={…,LabType.NAME:[…,(ID:int,NAME:str),…],…}
|
|
def get_label_labeltypes(self, catparentid:int):
|
|
# gets all relevant labtypes: […,(ID.int,NAME:str),…]
|
|
self.cur.execute(f"""SELECT LabType.ID,LabType.NAME FROM CatLabType
|
|
JOIN Cats ON Cats.ID=CatLabType.CATID
|
|
JOIN LabType ON LabType.ID=CatLabType.LABID
|
|
WHERE Cats.ID={catparentid}
|
|
ORDER BY LabType.NAME ASC""")
|
|
labtypes_list=self.cur.fetchall()
|
|
labtypes_ids,labtypes_names=[],[]
|
|
for w,e in labtypes_list:
|
|
labtypes_ids.append(str(w))
|
|
labtypes_names.append(e)
|
|
ltids_string="(" + ",".join(labtypes_ids) + ")"
|
|
# gets all relevant labs: […,(ID:int,NAME:str,LTNAME:str),…]
|
|
self.cur.execute(f"""SELECT Labs.ID,Labs.LABEL,LabType.NAME AS LTNAME FROM Labs
|
|
JOIN LabType ON Labs.TYPE=LabType.ID
|
|
WHERE LabType.ID IN {ltids_string}
|
|
ORDER BY Labs.LABEL ASC""")
|
|
labs_list=self.cur.fetchall()
|
|
res_dict={}
|
|
# creates all labtype entries in dict
|
|
for i in labtypes_names:
|
|
res_dict[i]=[]
|
|
# puts all labs into their respective labtype
|
|
for entry in labs_list:
|
|
res_dict[entry[2]].append(entry[:2])
|
|
return res_dict
|
|
|
|
## get a list of enabled labels and update the DB to reflect that state
|
|
## OUTPUT: (if on_labels empty) bool=False, str
|
|
## (else) bool=True, str=""
|
|
def update_labels(self, archid:int, on_labels:list):
|
|
# fail if no labels passed
|
|
if len(on_labels) == 0:
|
|
return False, "You have to select at least one label!"
|
|
|
|
# get all relevant labels
|
|
self.cur.execute(f"""SELECT ArchLab.LABID FROM ArchLab
|
|
WHERE ArchLab.ARCHID={archid}""")
|
|
existing_labs=[]
|
|
for i in self.cur.fetchall():
|
|
existing_labs.append(i[0])
|
|
to_add=[]
|
|
# get all missing labels to add
|
|
for lab in on_labels:
|
|
if int(lab) not in existing_labs:
|
|
to_add.append(lab)
|
|
|
|
# remove all labels which are not on
|
|
self.cur.execute(f"""DELETE FROM ArchLab WHERE ARCHID={archid} AND LABID NOT IN ({",".join(on_labels)})""")
|
|
to_add_list=[]
|
|
# creates all entries as strings and puts them into a list
|
|
for i in to_add:
|
|
to_add_list.append("(" + str(archid) + "," + str(i) + ")")
|
|
# add all new labels
|
|
self.cur.execute(f"""INSERT INTO ArchLab(ARCHID,LABID) VALUES{",".join(to_add_list)}""")
|
|
return True, ""
|
|
|
|
## Returns n archives, sorted by a column
|
|
## OUTPUT: archives:array=[…,(ID:int,NAME:str,SIZE:str,IMPORTED[UNIX]:int),…], total_count:int
|
|
def get_n_archives(self, sorttype:str="time",category:int=0, keywords:list=[], count:int=20, labels:list=None, page:int=1): # TODO: CLEANN!!!!!
|
|
match sorttype:
|
|
case "size":
|
|
sorttype="Archs.SIZE DESC"
|
|
case "time":
|
|
sorttype="Archs.IMPORTED DESC"
|
|
case "za":
|
|
sorttype="Archs.NAME DESC"
|
|
case _:
|
|
sorttype="Archs.NAME ASC"
|
|
|
|
# create SQL query for keywords
|
|
keyword_string=""
|
|
for w in keywords:
|
|
keyword_string+=f"AND Archs.NAME LIKE '%{w}%' "
|
|
if len(keywords) == 1:
|
|
keyword_string+=f"OR Archs.HASH = '{keywords[0]}' "
|
|
|
|
# get all children of category (if exist) and put into query string
|
|
categories=self.get_categories()
|
|
catlist=[str(category)]
|
|
for i in categories:
|
|
if i[2] == int(category):
|
|
catlist.append(str(i[0]))
|
|
categories="(" + ",".join(catlist) + ")"
|
|
|
|
# select all archives by keywords, categories and labels
|
|
self.cur.execute(f"""SELECT DISTINCT Archs.ID,Archs.NAME,Archs.SIZE,Archs.IMPORTED FROM Archs
|
|
JOIN ArchLab ON Archs.ID=ArchLab.ArchID
|
|
{"WHERE 1=1" if category==0 else "WHERE Archs.CATEGORY IN " + categories}
|
|
{f"AND ArchLab.LabID IN ({','.join(labels)})" if labels else ""}
|
|
{keyword_string}
|
|
GROUP BY Archs.ID
|
|
{f"HAVING COUNT(Archs.ID)={len(labels)}" if labels else ""}
|
|
ORDER BY {sorttype}""")
|
|
archives=self.cur.fetchall()
|
|
res_archives=archives[page*count-count:page*count] # get archives for the selected page
|
|
total_count=len(archives)
|
|
|
|
return res_archives, total_count
|
|
|
|
if __name__ == "__main__":
|
|
#startup()
|
|
db=db(conn_params)
|
|
db.cur.close()
|
|
db.conn.close()
|
|
exit() |