2023-10-17 18:25:38 +02:00
## MAIN FUNCTIONS FILE FOR BACK-BACKEND OF FLASK
import mariadb as sql
from os import environ
2023-10-19 13:25:59 +02:00
import time , re
2023-10-17 18:25:38 +02:00
## params populated with environment variables, defaults can be changed for a permanent solution
# Permanent fallbacks, used when the matching environment variable is unset or
# empty. Edit these for a fixed deployment. A value of None means "no
# fallback": the parameter is then left out of conn_params altogether instead
# of referencing an undefined name and failing with NameError at import time.
_CONN_DEFAULTS = {
    "user": None,
    "password": None,
    "host": None,
    "database": None,
}

# Connection parameter -> environment variable that overrides its fallback.
_CONN_ENV_VARS = {
    "user": "MARIADB_USER",
    "password": "MARIADB_PASSWORD",
    "host": "MARIADB_HOST",
    "database": "MARIADB_DB",
}

# Keyword arguments later unpacked into sql.connect(**conn_params).
conn_params = {}
for _param, _env_var in _CONN_ENV_VARS.items():
    # `or` (not a default argument to get) so that an empty variable also
    # falls through to the configured fallback, as the original did.
    _value = environ.get(_env_var) or _CONN_DEFAULTS[_param]
    if _value is not None:
        conn_params[_param] = _value
class db :
def __init__ ( self ) :
self . conn = sql . connect ( * * conn_params )
2023-10-18 12:55:20 +02:00
self . conn . autocommit = True
2023-10-17 18:25:38 +02:00
self . cur = self . conn . cursor ( )
## Creates all archives, if they don't exist already
## Called only on startup, hence the name
def startup ( self ) :
2023-10-18 12:55:20 +02:00
self . cur . execute ( """ CREATE TABLE IF NOT EXISTS Archs(
2023-10-17 18:25:38 +02:00
ID int PRIMARY KEY AUTO_INCREMENT ,
NAME text NOT NULL ,
2023-10-19 13:25:59 +02:00
HASH text NOT NULL UNIQUE ,
2023-10-17 18:25:38 +02:00
SIZE int NOT NULL ,
IMPORTED int ,
CATEGORY int ,
OWNER int
2023-10-18 12:55:20 +02:00
) ; """ )
self . cur . execute ( """ CREATE TABLE IF NOT EXISTS Users(
2023-10-17 18:25:38 +02:00
ID int PRIMARY KEY AUTO_INCREMENT ,
2023-10-19 19:14:18 +02:00
UNAME text NOT NULL UNIQUE ,
2023-10-17 18:25:38 +02:00
DNAME text NOT NULL ,
CREATED int NOT NULL ,
STATE text ,
PASSHASH text NOT NULL
2023-10-18 12:55:20 +02:00
) ; """ )
self . cur . execute ( """ CREATE TABLE IF NOT EXISTS Sessions(
2023-10-17 18:25:38 +02:00
ID int PRIMARY KEY AUTO_INCREMENT ,
2023-10-19 19:14:18 +02:00
SESSKEY text NOT NULL UNIQUE ,
2023-10-18 12:55:20 +02:00
USERID int NOT NULL ,
2023-10-17 18:25:38 +02:00
CREATED int NOT NULL ,
LIFE int
2023-10-18 12:55:20 +02:00
) ; """ )
self . cur . execute ( """ CREATE TABLE IF NOT EXISTS Cats(
2023-10-17 18:25:38 +02:00
ID int PRIMARY KEY AUTO_INCREMENT ,
CATEGORY text NOT NULL ,
PARENT int ,
DESCRIPTION text
2023-10-18 12:55:20 +02:00
) ; """ )
self . cur . execute ( """ CREATE TABLE IF NOT EXISTS ArchLab(
2023-10-17 18:25:38 +02:00
ID int PRIMARY KEY AUTO_INCREMENT ,
ARCHID int NOT NULL ,
LABID int NOT NULL
2023-10-18 12:55:20 +02:00
) ; """ )
2023-10-19 13:25:59 +02:00
self . cur . execute ( """ CREATE TABLE IF NOT EXISTS CatLabType(
ID int PRIMARY KEY AUTO_INCREMENT ,
CATID int NOT NULL ,
LABID int NOT NULL
) ; """ )
2023-10-18 12:55:20 +02:00
self . cur . execute ( """ CREATE TABLE IF NOT EXISTS Labs(
2023-10-17 18:25:38 +02:00
ID int PRIMARY KEY AUTO_INCREMENT ,
LABEL text NOT NULL ,
TYPE int NOT NULL
2023-10-18 12:55:20 +02:00
) ; """ )
self . cur . execute ( """ CREATE TABLE IF NOT EXISTS LabType(
2023-10-17 18:25:38 +02:00
ID int PRIMARY KEY AUTO_INCREMENT ,
NAME text NOT NULL ,
DESCRIPTION text
2023-10-18 12:55:20 +02:00
) ; """ )
## Gets the passhash from a specific user
## OUTPUT: (If user exists) int=200, passhash:str
## (If user does not exist) int=400, Exception:str
def get_passhash ( self , username : str ) :
self . cur . execute ( f " SELECT ID,PASSHASH FROM Users WHERE UNAME= ' { username } ' " )
try :
resp = self . cur . fetchone ( )
except Exception as e :
return 400 , e , NULL
return 200 , resp [ 0 ] , resp [ 1 ]
## Checks if sesskey exists and is not expired
## OUTPUT: (if valiid) bool=True, USERID:str
## (in invalid) bool=False, str=""
def check_sesskey ( self , sesskey : str ) :
self . cur . execute ( f " SELECT SESSKEY,USERID FROM Sessions WHERE SESSKEY= ' { sesskey } ' " )
entry = self . cur . fetchone ( )
if sesskey in entry :
return True , entry [ 1 ]
else :
return False , " "
## Sets a session key. That's it.
def set_sesskey ( self , sesskey : str , userid : int , lifetime : int ) :
self . cur . execute ( f " INSERT INTO Sessions(SESSKEY,USERID,CREATED,LIFE) VALUES( ' { sesskey } ' , { userid } , { time . time ( ) } , { lifetime } ) " )
## Gets and returns all user info about one (1) user
## OUTPUT: tuple=(ID:int,UNAME:str,DNAME:str,CREATED:int,STATE:text,PASSHASH:text)
def get_user_info ( self , userid : int ) :
self . cur . execute ( f " SELECT * FROM Users WHERE ID= ' { userid } ' " )
return self . cur . fetchone ( )
2023-10-17 18:25:38 +02:00
2023-10-19 13:25:59 +02:00
## Checks information for errors and adds archive to the DB
## OUTPUT: (if successful) res:bool=True, ID:int
## (if unsuccessful) res:bool=False, str
def add_archive ( self , archive : dict ) :
# Check everything for errors or malicious things
archive [ " hash " ] = archive [ " hash " ] . upper ( )
if not re . match ( ' [A-Z0-9] {40} ' , archive [ " hash " ] ) :
return False , " Hash needs to be 40 characters in hexadecimal (SHA-1). "
2023-10-19 18:24:36 +02:00
if re . match ( ' .*[^A-Za-z0-9 \ . +_-].* ' , archive [ " name " ] ) :
2023-10-19 13:25:59 +02:00
return False , " The name contains illegal characters. Allowed chars: ' [A-Za-z0-9 \ . _-] ' "
print ( archive [ " name " ] )
curtime = time . time ( )
try :
self . cur . execute ( f " INSERT INTO Archs(NAME,HASH,SIZE,IMPORTED,CATEGORY,OWNER) VALUES( ' { archive [ ' name ' ] } ' , ' { archive [ ' hash ' ] } ' , { archive [ ' size ' ] } , { curtime } , { archive [ ' category ' ] } , { archive [ ' owner ' ] } ) " )
except Exception as e : # hash needs to be unique
return False , e
self . cur . execute ( f " SELECT ID FROM Archs WHERE HASH= ' { archive [ ' hash ' ] } ' " )
archid = self . cur . fetchone ( )
return True , archid [ 0 ]
2023-10-19 19:14:18 +02:00
def delete_archive ( self , archid : int ) :
self . cur . execute ( f """ DELETE FROM Archs WHERE ID= { archid } """ )
self . cur . execute ( f """ DELETE FROM ArchLab WHERE ARCHID= { archid } """ )
2023-10-19 13:25:59 +02:00
2023-10-17 18:25:38 +02:00
## Returns all relevant information about one (1) archive
2023-10-19 18:24:36 +02:00
## OUTPUT: archive:tuple=(ID:int,NAME:str,HASH:str,SIZE:int,IMPORTED[UNIX]:int,CATEGORY.ID:int,CATEGORY,str,CATEGORY.DESCRIPTION:str,USER.ID:int,DNAME:str),
## category:tuple=(ID:int,CATEGORY:str,DESCRIPTION:str,PID:int,PCAT:str,PDESC:str)
## labels:array=[…,(LABEL:str,LABTYPE:int,LABDESC:str),…]
def get_archive_info ( self , archid : int ) :
# get info about archive itself
self . cur . execute ( f """ SELECT Archs.ID,Archs.NAME,Archs.HASH,Archs.SIZE,Archs.IMPORTED,Cats.ID,Cats.CATEGORY,Cats.DESCRIPTION,Users.ID,Users.DNAME FROM Archs
2023-10-17 18:25:38 +02:00
JOIN Cats ON Cats . ID = Archs . CATEGORY
JOIN Users ON Users . ID = Archs . OWNER
2023-10-19 18:24:36 +02:00
WHERE Archs . ID = ' {archid} ' """ )
2023-10-17 18:25:38 +02:00
archive = self . cur . fetchone ( )
2023-10-19 18:24:36 +02:00
# get info about category and it's parent
self . cur . execute ( f """ SELECT c.ID,c.CATEGORY,c.DESCRIPTION,p.ID AS PID,p.CATEGORY as PCAT,p.DESCRIPTION AS PDESC FROM Cats c, Cats p
WHERE c . ID = { archive [ 5 ] } AND c . PARENT = p . ID """ )
category = self . cur . fetchone ( )
# get info about labels of archive
self . cur . execute ( f """ SELECT Labs.LABEL,LabType.ID AS LABTYPE,LabType.DESCRIPTION AS LABDESC FROM ArchLab
2023-10-17 18:25:38 +02:00
JOIN Archs ON Archs . ID = ArchLab . ARCHID
JOIN Labs ON Labs . ID = ArchLab . LABID
JOIN LabType ON Labs . TYPE = LabType . ID
2023-10-19 18:24:36 +02:00
WHERE ARCHID = { archid } ; """ )
2023-10-17 18:25:38 +02:00
labels = self . cur . fetchall ( )
2023-10-19 18:24:36 +02:00
return archive , category , labels
2023-10-17 18:25:38 +02:00
2023-10-18 17:24:59 +02:00
## Returns all categories.
## OUTPUT: array=[…,(ID:int,CATEGORY:str,PARENT:int,DESCRIPTION:str),…]
2023-10-19 18:24:36 +02:00
def get_categories ( self ) :
2023-10-18 17:24:59 +02:00
self . cur . execute ( " SELECT * FROM Cats; " )
return self . cur . fetchall ( )
2023-10-19 18:24:36 +02:00
## get all labeltypes and their respective labels based on a category parent
## OUTPUT: res_dict:dict={…,LabType.NAME:[…,(ID:int,NAME:str),…],…}
def get_label_labeltypes ( self , catparentid : int ) :
# gets all relevant labtypes: […,(ID.int,NAME:str),…]
self . cur . execute ( f """ SELECT LabType.ID,LabType.NAME FROM CatLabType
JOIN Cats ON Cats . ID = CatLabType . CATID
JOIN LabType ON LabType . ID = CatLabType . LABID
WHERE Cats . ID = { catparentid }
ORDER BY LabType . NAME ASC """ )
labtypes_list = self . cur . fetchall ( )
labtypes_ids , labtypes_names = [ ] , [ ]
for w , e in labtypes_list :
labtypes_ids . append ( str ( w ) )
labtypes_names . append ( e )
ltid_string = " ( " + " , " . join ( labtypes_ids ) + " ) "
# gets all relevant labs: […,(ID:int,NAME:str,LTNAME:str),…]
self . cur . execute ( f """ SELECT Labs.ID,Labs.LABEL,LabType.NAME AS LTNAME FROM Labs
JOIN LabType ON Labs . TYPE = LabType . ID
WHERE LabType . ID IN { ltid_string }
ORDER BY Labs . LABEL ASC """ )
labs_list = self . cur . fetchall ( )
res_dict = { }
# creates all labtype entries in dict
for i in labtypes_names :
res_dict [ i ] = [ ]
# puts all labs into their respective labtype
for entry in labs_list :
res_dict [ entry [ 2 ] ] . append ( entry [ : 2 ] )
return res_dict
## get a list of enabled labels and update the DB to reflect that state
## OUTPUT: (if on_labels empty) bool=False, str
## (else)
2023-10-19 19:14:18 +02:00
def update_labels ( self , archid : int , on_labels : list ) :
2023-10-19 18:24:36 +02:00
# fail if no labels passed
if len ( on_labels ) == 0 :
return False , " You have to select at least one label! "
# get all relevant labels
self . cur . execute ( f """ SELECT ArchLab.LABID FROM ArchLab
WHERE ArchLab . ARCHID = { archid } """ )
existing_labs = [ ]
for i in self . cur . fetchall ( ) :
existing_labs . append ( i [ 0 ] )
to_add = [ ]
# get all missing labels to add
for lab in on_labels :
if int ( lab ) not in existing_labs :
to_add . append ( lab )
# remove all labels which are not on
self . cur . execute ( f """ DELETE FROM ArchLab WHERE ARCHID= { archid } AND LABID NOT IN ( { " , " . join ( on_labels ) } ) """ )
to_add_list = [ ]
for i in to_add :
to_add_list . append ( " ( " + str ( archid ) + " , " + str ( i ) + " ) " )
# add all new labels
self . cur . execute ( f """ INSERT INTO ArchLab(ARCHID,LABID) VALUES { " , " . join ( to_add_list ) } """ )
return True , " "
2023-10-18 17:24:59 +02:00
2023-10-17 18:25:38 +02:00
## Returns n archives, sorted by (imported )time or size
2023-10-18 12:55:20 +02:00
## OUTPUT: archives:array=[…,(ID:int,NAME:str,SIZE:str,IMPORTED[UNIX]:int),…]
2023-10-18 17:24:59 +02:00
def get_n_archives ( self , sorttype : str = " time " , category : int = 0 , keywords : list = [ ] , count : int = 20 ) :
2023-10-17 18:25:38 +02:00
match sorttype :
case " size " :
2023-10-18 17:24:59 +02:00
sorttype = " SIZE DESC "
case " time " :
sorttype = " IMPORTED DESC "
case " za " :
sorttype = " NAME DESC "
2023-10-17 18:25:38 +02:00
case _ :
2023-10-18 17:24:59 +02:00
sorttype = " NAME ASC "
# create SQL query for keywords
keyword_string = " "
for w in keywords :
keyword_string + = f " AND NAME LIKE ' % { w } % ' "
if len ( keywords ) == 1 :
2023-10-19 18:24:36 +02:00
keyword_string + = f " OR HASH = ' { keywords [ 0 ] } ' "
2023-10-18 17:24:59 +02:00
# Get all children of category (if exist) and put into query string
2023-10-19 18:24:36 +02:00
categories = self . get_categories ( )
2023-10-19 13:25:59 +02:00
catlist = [ str ( category ) ]
2023-10-18 17:24:59 +02:00
for i in categories :
if i [ 2 ] == int ( category ) :
catlist . append ( str ( i [ 0 ] ) )
2023-10-19 13:25:59 +02:00
categories = " ( " + " , " . join ( catlist ) + " ) "
2023-10-18 17:24:59 +02:00
self . cur . execute ( f """ SELECT ID,NAME,SIZE,IMPORTED FROM Archs
2023-10-19 13:25:59 +02:00
{ " WHERE 1=1 " if category == 0 else " WHERE CATEGORY IN " + categories }
2023-10-18 17:24:59 +02:00
{ keyword_string }
ORDER BY { sorttype } LIMIT { count } ; """ )
2023-10-17 18:25:38 +02:00
archives = self . cur . fetchall ( )
return archives
if __name__ == "__main__":
    # Manual smoke test: open a connection and close it again.
    # db.__init__ takes no arguments (it reads the module-level conn_params),
    # and the instance gets its own name so the class `db` is not rebound.
    database = db()
    try:
        # database.startup()
        pass
    finally:
        # Release the cursor and connection even if the step above is
        # re-enabled and fails.
        database.cur.close()
        database.conn.close()
    raise SystemExit(0)