rar-index-py/flask/func.py
Michael Rodin c3bd73a068 Adding archives
Added functionality to add archives.
2023-10-19 13:25:59 +02:00

193 lines
8 KiB
Python

## MAIN FUNCTIONS FILE FOR BACK-BACKEND OF FLASK
import mariadb as sql
from os import environ
import time,re
## Connection settings handed to mariadb.connect().
## Each value is read from its environment variable; when the variable is
## unset or empty the hard-coded fallback is used instead. Change the
## fallbacks here for a permanent setup.
conn_params = {
    "user": environ.get('MARIADB_USER') or "rar_index_app",
    "password": environ.get('MARIADB_PASSWORD') or "password",
    "host": environ.get('MARIADB_HOST') or "marcelsite.com",
    "database": environ.get('MARIADB_DB') or "rar_index",
}
class db:
def __init__(self):
self.conn=sql.connect(**conn_params)
self.conn.autocommit=True
self.cur=self.conn.cursor()
## Creates all archives, if they don't exist already
## Called only on startup, hence the name
def startup(self):
self.cur.execute("""CREATE TABLE IF NOT EXISTS Archs(
ID int PRIMARY KEY AUTO_INCREMENT,
NAME text NOT NULL,
HASH text NOT NULL UNIQUE,
SIZE int NOT NULL,
IMPORTED int,
CATEGORY int,
OWNER int
);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS Users(
ID int PRIMARY KEY AUTO_INCREMENT,
UNAME text NOT NULL,
DNAME text NOT NULL,
CREATED int NOT NULL,
STATE text,
PASSHASH text NOT NULL
);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS Sessions(
ID int PRIMARY KEY AUTO_INCREMENT,
SESSKEY text NOT NULL,
USERID int NOT NULL,
CREATED int NOT NULL,
LIFE int
);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS Cats(
ID int PRIMARY KEY AUTO_INCREMENT,
CATEGORY text NOT NULL,
PARENT int,
DESCRIPTION text
);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS ArchLab(
ID int PRIMARY KEY AUTO_INCREMENT,
ARCHID int NOT NULL,
LABID int NOT NULL
);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS CatLabType(
ID int PRIMARY KEY AUTO_INCREMENT,
CATID int NOT NULL,
LABID int NOT NULL
);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS Labs(
ID int PRIMARY KEY AUTO_INCREMENT,
LABEL text NOT NULL,
TYPE int NOT NULL
);""")
self.cur.execute("""CREATE TABLE IF NOT EXISTS LabType(
ID int PRIMARY KEY AUTO_INCREMENT,
NAME text NOT NULL,
DESCRIPTION text
);""")
## Gets the passhash from a specific user
## OUTPUT: (If user exists) int=200, passhash:str
## (If user does not exist) int=400, Exception:str
def get_passhash(self, username:str):
self.cur.execute(f"SELECT ID,PASSHASH FROM Users WHERE UNAME='{username}'")
try:
resp=self.cur.fetchone()
except Exception as e:
return 400, e, NULL
return 200, resp[0], resp[1]
## Checks if sesskey exists and is not expired
## OUTPUT: (if valiid) bool=True, USERID:str
## (in invalid) bool=False, str=""
def check_sesskey(self, sesskey:str):
self.cur.execute(f"SELECT SESSKEY,USERID FROM Sessions WHERE SESSKEY='{sesskey}'")
entry=self.cur.fetchone()
if sesskey in entry:
return True, entry[1]
else:
return False, ""
## Sets a session key. That's it.
def set_sesskey(self, sesskey:str, userid:int, lifetime:int):
self.cur.execute(f"INSERT INTO Sessions(SESSKEY,USERID,CREATED,LIFE) VALUES('{sesskey}',{userid},{time.time()},{lifetime})")
## Gets and returns all user info about one (1) user
## OUTPUT: tuple=(ID:int,UNAME:str,DNAME:str,CREATED:int,STATE:text,PASSHASH:text)
def get_user_info(self, userid:int):
self.cur.execute(f"SELECT * FROM Users WHERE ID='{userid}'")
return self.cur.fetchone()
## Checks information for errors and adds archive to the DB
## OUTPUT: (if successful) res:bool=True, ID:int
## (if unsuccessful) res:bool=False, str
def add_archive(self, archive:dict):
# Check everything for errors or malicious things
archive["hash"]=archive["hash"].upper()
if not re.match('[A-Z0-9]{40}', archive["hash"]):
return False, "Hash needs to be 40 characters in hexadecimal (SHA-1)."
if re.match('.*[^A-Za-z0-9\. _-].*', archive["name"]):
return False, "The name contains illegal characters. Allowed chars: '[A-Za-z0-9\. _-]'"
print(archive["name"])
curtime=time.time()
try:
self.cur.execute(f"INSERT INTO Archs(NAME,HASH,SIZE,IMPORTED,CATEGORY,OWNER) VALUES('{archive['name']}','{archive['hash']}',{archive['size']},{curtime},{archive['category']},{archive['owner']})")
except Exception as e: # hash needs to be unique
return False, e
self.cur.execute(f"SELECT ID FROM Archs WHERE HASH='{archive['hash']}'")
archid=self.cur.fetchone()
return True,archid[0]
## Returns all relevant information about one (1) archive
## OUTPUT: archive:tuple=(NAME:str,HASH:str,IMPORTED[UNIX]:int,CATEGORY,str,CATEGORY.DESCRIPTION:str,UNAME:str,DNAME:str),
## labels:array=[…,(LABEL:str,CATEGORY:str,CATDESC:str,LABTYPE:str,LABDESC:str),…]
def get_archive_info(self, hash:str):
self.cur.execute(f"""SELECT Archs.NAME,Archs.HASH,Archs.IMPORTED,Cats.CATEGORY,Cats.DESCRIPTION,Users.UNAME,Users.DNAME FROM Archs
JOIN Cats ON Cats.ID=Archs.CATEGORY
JOIN Users ON Users.ID=Archs.OWNER
WHERE hash='{hash}'""")
archive=self.cur.fetchone()
self.cur.execute(f"""SELECT Labs.LABEL,Cats.CATEGORY,Cats.DESCRIPTION AS CATDESC,LabType.NAME AS LABTYPE,LabType.DESCRIPTION AS LABDESC FROM ArchLab
JOIN Archs ON Archs.ID=ArchLab.ARCHID
JOIN Labs ON Labs.ID=ArchLab.LABID
JOIN Cats ON Labs.CATEGORY=Cats.ID
JOIN LabType ON Labs.TYPE=LabType.ID
WHERE ARCHID=1;""")
labels=self.cur.fetchall()
return archive, labels
## Returns all categories.
## OUTPUT: array=[…,(ID:int,CATEGORY:str,PARENT:int,DESCRIPTION:str),…]
def get_all_categories(self):
self.cur.execute("SELECT * FROM Cats;")
return self.cur.fetchall()
## Returns n archives, sorted by (imported )time or size
## OUTPUT: archives:array=[…,(ID:int,NAME:str,SIZE:str,IMPORTED[UNIX]:int),…]
def get_n_archives(self, sorttype:str="time",category:int=0, keywords:list=[], count:int=20):
match sorttype:
case "size":
sorttype="SIZE DESC"
case "time":
sorttype="IMPORTED DESC"
case "za":
sorttype="NAME DESC"
case _:
sorttype="NAME ASC"
# create SQL query for keywords
keyword_string=""
for w in keywords:
keyword_string+=f"AND NAME LIKE '%{w}%' "
if len(keywords) == 1:
for w in keywords:
keyword_string+=f"OR HASH = '{w}' "
# Get all children of category (if exist) and put into query string
categories=self.get_all_categories()
catlist=[str(category)]
for i in categories:
if i[2] == int(category):
catlist.append(str(i[0]))
categories="(" + ",".join(catlist) + ")"
self.cur.execute(f"""SELECT ID,NAME,SIZE,IMPORTED FROM Archs
{"WHERE 1=1" if category==0 else " WHERE CATEGORY IN " + categories}
{keyword_string}
ORDER BY {sorttype} LIMIT {count};""")
archives=self.cur.fetchall()
return archives
if __name__ == "__main__":
    # Manual smoke test: open a connection, then close it again.
    # db() takes no arguments (it reads conn_params itself), and the instance
    # gets its own name so the class is not overwritten.
    # Call database.startup() here to create the tables.
    database = db()
    try:
        database.cur.close()
    finally:
        # Close the connection even if closing the cursor fails.
        database.conn.close()
    raise SystemExit