[SIS-xxx] adding rendering of current Debra donation information

updating for Debra 2023 campaign
adding internal rest api for debra information
adding a debra button to receive information
This commit is contained in:
Sheldan
2023-11-09 01:01:49 +01:00
parent 7449c05462
commit c6f20d617d
73 changed files with 1078 additions and 37 deletions

View File

@@ -0,0 +1,229 @@
from flask import Flask, send_file, request
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
import requests
import os
import json
import logging
FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(encoding='utf-8', level=logging.INFO, format=FORMAT)
app = Flask(__name__)
sissi_host = os.getenv('SISSI_HOST')
sissi_port = os.getenv('SISSI_PORT')
latest_donations_url = f'http://{sissi_host}:{sissi_port}/debra/latestDonations'
highest_donations_url = f'http://{sissi_host}:{sissi_port}/debra/highestDonations'
campaign_info_url = f'http://{sissi_host}:{sissi_port}/debra/campaignInfo'
class ValidationException(Exception):
def __init__(self, provided_value, message):
self.provided_value = provided_value
self.message = message
super().__init__(f'{self.message}: provided value: {provided_value}')
class DonationImageGenerationConstants:
allowed_fonts = ['Andale_Mono', 'Courier_New', 'Impact', 'Trebuchet_MS_Italic',
'Arial', 'Courier_New_Bold', 'Times_New_Roman', 'Verdana',
'Arial_Black', 'Courier_New_Bold_Italic', 'Times_New_Roman_Bold', 'Verdana_Bold',
'Arial_Bold', 'Courier_New_Italic', 'Times_New_Roman_Bold_Italic', 'Verdana_Bold_Italic',
'Arial_Bold_Italic', 'Georgia', 'Times_New_Roman_Italic', 'Verdana_Italic',
'Arial_Italic', 'Georgia_Bold', 'Trebuchet_MS',
'Comic_Sans_MS', 'Georgia_Bold_Italic', 'Trebuchet_MS_Bold',
'Comic_Sans_MS_Bold', 'Georgia_Italic', 'Trebuchet_MS_Bold_Italic']
font_range = (2, 150)
canvas_width_range = (1, 2500)
canvas_height_range = (1, 2500)
donation_count_range = (1, 25)
color_range = (0, 255)
font_default = 'Arial'
font_size_default = 40
canvas_width_default = 1024
canvas_height_default = 300
donation_count_default = 5
r_default = 0
g_default = 0
b_default = 0
class DonationImageGenerationParameters:
validation_message = ''
validation_value = ''
validated = None
def __init__(self, font_name, font_size, canvas_width, canvas_height, donation_count, r, g, b):
self.font_name = font_name
self.font_size = font_size
self.canvas_width = canvas_width
self.canvas_height = canvas_height
self.donation_count = donation_count
self.color = (r, g, b)
def validate(self):
self.validated = True
if self.font_name not in DonationImageGenerationConstants.allowed_fonts:
self.validated = False
self.validation_message = f'Font must be one of ' + ','.join(DonationImageGenerationConstants.allowed_fonts)
self.validation_value = self.font_name
if self.font_size > DonationImageGenerationConstants.font_range[1] or self.font_size < DonationImageGenerationConstants.font_range[0]:
self.validated = False
self.validation_message = f'Font size must be between {DonationImageGenerationConstants.font_range[0]} and {DonationImageGenerationConstants.font_range[1]}'
self.validation_value = self.font_size
if self.canvas_height > DonationImageGenerationConstants.canvas_height_range[1] or self.canvas_height < DonationImageGenerationConstants.canvas_height_range[0]:
self.validated = False
self.validation_message = f'Canvas height must be between {DonationImageGenerationConstants.canvas_height_range[0]} and {DonationImageGenerationConstants.canvas_height_range[1]}'
self.validation_value = self.canvas_height
if self.canvas_width > DonationImageGenerationConstants.canvas_width_range[1] or self.canvas_width < DonationImageGenerationConstants.canvas_width_range[0]:
self.validated = False
self.validation_message = f'Canvas width must be between {DonationImageGenerationConstants.canvas_width_range[0]} and {DonationImageGenerationConstants.color_range[1]}'
self.validation_value = self.canvas_width
if self.donation_count > DonationImageGenerationConstants.donation_count_range[1] or self.donation_count < DonationImageGenerationConstants.donation_count_range[0]:
self.validated = False
self.validation_message = f'Donation count must be between {DonationImageGenerationConstants.donation_count_range[0]} and {DonationImageGenerationConstants.donation_count_range[1]}'
self.validation_value = self.donation_count
if self.color[0] > DonationImageGenerationConstants.color_range[1] or self.color[0] < DonationImageGenerationConstants.color_range[0]:
self.validated = False
self.validation_message = f'Red must be between {DonationImageGenerationConstants.color_range[0]} and {DonationImageGenerationConstants.color_range[1]} inclusively'
self.validation_value = self.color[0]
if self.color[1] > DonationImageGenerationConstants.color_range[1] or self.color[1] < DonationImageGenerationConstants.color_range[0]:
self.validated = False
self.validation_message = f'Green must be between {DonationImageGenerationConstants.color_range[0]} and {DonationImageGenerationConstants.color_range[1]} inclusively'
self.validation_value = self.color[1]
if self.color[2] > DonationImageGenerationConstants.color_range[1] or self.color[2] < DonationImageGenerationConstants.color_range[0]:
self.validated = False
self.validation_message = f'Blue must be between {DonationImageGenerationConstants.color_range[0]} and {DonationImageGenerationConstants.color_range[1]} inclusively'
self.validation_value = self.color[2]
@app.route('/')
def hello():
return 'Hello, World?'
@app.route('/debra/latestDonations')
def latest_donations():
donation_stats = requests.get(latest_donations_url)
logging.info(f'returning latest donations')
return donation_stats.text
@app.route('/debra/highestDonations')
def highest_donations():
donation_stats = requests.get(highest_donations_url)
logging.info(f'returning highest donations')
return donation_stats.text
@app.route('/debra/campaignInfo')
def campaign_info_route():
donation_stats = requests.get(campaign_info_url)
logging.info(f'returning campaign info')
return donation_stats.text
@app.route('/debra/image/help')
def show_image_generation_help():
def make_param(parameters_obj, name, default, values):
parameters_obj[name] = {
'default': default,
'values': values
}
parameters = {}
make_param(parameters, 'font', DonationImageGenerationConstants.font_default, DonationImageGenerationConstants.allowed_fonts)
make_param(parameters, 'fontSize', DonationImageGenerationConstants.font_size_default, DonationImageGenerationConstants.font_range)
make_param(parameters, 'canvasWidth', DonationImageGenerationConstants.canvas_width_default, DonationImageGenerationConstants.canvas_width_range)
make_param(parameters, 'canvasHeight', DonationImageGenerationConstants.canvas_height_default, DonationImageGenerationConstants.canvas_height_range)
make_param(parameters, 'donationCount', DonationImageGenerationConstants.donation_count_default, DonationImageGenerationConstants.donation_count_range)
make_param(parameters, 'r', DonationImageGenerationConstants.r_default, DonationImageGenerationConstants.color_range)
make_param(parameters, 'g', DonationImageGenerationConstants.g_default, DonationImageGenerationConstants.color_range)
make_param(parameters, 'b', DonationImageGenerationConstants.b_default, DonationImageGenerationConstants.color_range)
info_object = {
'parameters': parameters
}
return json.dumps(info_object)
@app.route('/debra/image/info')
def total_donations_image():
campaign_info = json.loads(requests.get(campaign_info_url).text)
logging.info(f'rendering campaign info')
parameters = parse_image_parameters()
if not parameters.validated:
return parameters.validation_message, 400
img = Image.new('RGBA', (parameters.canvas_width, parameters.canvas_height), (255, 0, 0, 0))
d1 = ImageDraw.Draw(img)
font = ImageFont.truetype(f'{parameters.font_name}.ttf', parameters.font_size)
d1.text((0, 0), f"Aktuell {campaign_info['collected']}/{campaign_info['target']}", fill=parameters.color, font=font)
return serve_pil_image(img)
@app.route('/debra/image/latestDonations')
def latest_donation_image():
donation_stats = json.loads(requests.get(latest_donations_url).text)
logging.info(f'rendering latest donations')
parameters = parse_image_parameters()
if not parameters.validated:
return parameters.validation_message, 400
return rendering_donation_image(donation_stats, parameters)
@app.route('/debra/image/highestDonations')
def highest_donation_image():
donation_stats = json.loads(requests.get(highest_donations_url).text)
logging.info(f'rendering highest donations')
parameters = parse_image_parameters()
if not parameters.validated:
return parameters.validation_message, 400
return rendering_donation_image(donation_stats, parameters)
def rendering_donation_image(donation_stats, parameters):
img = Image.new('RGBA', (parameters.canvas_width, parameters.canvas_height), (255, 0, 0, 0))
d1 = ImageDraw.Draw(img)
font = ImageFont.truetype(f'{parameters.font_name}.ttf', parameters.font_size)
donations_to_draw = donation_stats['donations'][:parameters.donation_count]
height = parameters.font_size
it = 0
for donation in donations_to_draw:
name = donation['firstName'] if not donation['anonymous'] else 'anonym'
d1.text((0, height * it), f"{donation['donationAmount']}€ von {name}", fill=parameters.color, font=font)
it += 1
return serve_pil_image(img)
def parse_image_parameters() -> DonationImageGenerationParameters:
font = request.args.get('font', DonationImageGenerationConstants.font_default)
font_size = int(request.args.get('fontSize', DonationImageGenerationConstants.font_size_default))
canvas_width = int(request.args.get('canvasWidth', DonationImageGenerationConstants.canvas_width_default))
canvas_height = int(request.args.get('canvasHeight', DonationImageGenerationConstants.canvas_height_default))
donation_count = int(request.args.get('donationCount', DonationImageGenerationConstants.donation_count_default))
r = int(request.args.get('r', DonationImageGenerationConstants.r_default))
g = int(request.args.get('g', DonationImageGenerationConstants.g_default))
b = int(request.args.get('b', DonationImageGenerationConstants.b_default))
parameters = DonationImageGenerationParameters(font, font_size, canvas_width, canvas_height, donation_count, r, g, b)
parameters.validate()
return parameters
def serve_pil_image(pil_img):
img_io = BytesIO()
pil_img.save(img_io, 'PNG')
img_io.seek(0)
return send_file(img_io, mimetype='image/png')
if __name__ == "__main__":
from waitress import serve
serve(app, host="0.0.0.0", port=8080)

View File

@@ -0,0 +1,42 @@
from sqlalchemy.sql import text
def import_commands(server_id, commands, connection):
user_ids = [command.author_id for command in commands]
create_users(server_id, user_ids, connection)
command_id = 2
for command in commands:
statement = text("""INSERT INTO custom_command(server_id, id, creator_user_in_server_id, additional_message, name)
VALUES(:server_id, :id,
(select user_in_server_id from user_in_server where user_id = :author_id and server_id = :server_id),
:additional_message, :name)""")
connection.execute(statement,
{'server_id': server_id, 'author_id': command.author_id,
'name': command.command, 'additional_message': command.response,
'id': command_id})
command_id += 1
print(f'Creating command for {command.command}')
def create_users(server_id: int, user_ids, con):
created_users = {}
for user_id in user_ids:
if not does_user_exist(user_id, con) and user_id not in created_users:
create_user(user_id, con)
create_user_in_server(user_id, server_id, con)
created_users[user_id] = 1
def does_user_exist(user_id, con):
statement = text("""SELECT count(1) FROM auser where id = :id""")
return con.execute(statement, {'id': user_id}).fetchone()[0] == 1
def create_user(user_id, con):
statement = text("""INSERT INTO auser(id) VALUES(:id)""")
print(f'Creating user {user_id}')
con.execute(statement, {'id': user_id})
def create_user_in_server(user_id, server_id, con):
statement = text("""INSERT INTO user_in_server(server_id, user_id) VALUES(:server_id, :user_id) returning user_in_server_id""")
return con.execute(statement, {'user_id': user_id, 'server_id': server_id}).fetchone()[0]

View File

@@ -0,0 +1,5 @@
class LegacyCommand:
command = ''
response = ''
author_id = 0

View File

@@ -0,0 +1,24 @@
import json
from dto import LegacyCommand
def load_all_commands():
with open('settings.json') as warnings:
lines = warnings.read()
command_obj = json.loads(lines)
custom_commands = command_obj['414589031223512']['GUILD']['297910194841583616']['commands']
all_command_dtos = []
for command in custom_commands:
custom_command = custom_commands[command]
if custom_command is not None and 'response' in custom_command:
if len(custom_command['response']) > 2048 or isinstance(custom_command['response'], list):
continue
command_dto = LegacyCommand()
command_dto.command = command
command_dto.author_id = custom_command['author']['id']
command_dto.response = custom_command['response']
all_command_dtos.append(command_dto)
print(f'loaded {len(all_command_dtos)} commands.')
return all_command_dtos

View File

@@ -0,0 +1,20 @@
import os
import sqlalchemy as db
from legacy_loader import load_all_commands
from command_importer import import_commands
server_id = os.getenv('SERVER_ID')
all_warnings = load_all_commands()
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')
db_database = os.getenv('DB_NAME')
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASS')
engine = db.create_engine('postgresql://%s:%s@%s:%s/%s' % (db_user, db_password, db_host, db_port, db_database))
with engine.connect() as con:
with con.begin():
import_commands(server_id, all_warnings, con)

View File

@@ -0,0 +1,37 @@
from sqlalchemy.sql import text
def import_credits(server_id, credits, connection):
user_ids = [credit.user_id for credit in credits]
create_users(server_id, user_ids, connection)
for credit in credits:
statement = text("""INSERT INTO economy_user(server_id, last_pay_day, last_slots,
id, credits)
VALUES(:server_id, now(), now(),
(select user_in_server_id from user_in_server where user_id = :user_id and server_id = :server_id), :credits)""")
connection.execute(statement, {'server_id': server_id, 'user_id': credit.user_id, 'credits': credit.credits})
print(f'Creating economy user for {credit.user_id}')
def create_users(server_id: int, user_ids, con):
created_users = {}
for user_id in user_ids:
if not does_user_exist(user_id, con) and user_id not in created_users:
create_user(user_id, con)
create_user_in_server(user_id, server_id, con)
created_users[user_id] = 1
def does_user_exist(user_id, con):
statement = text("""SELECT count(1) FROM auser where id = :id""")
return con.execute(statement, {'id': user_id}).fetchone()[0] == 1
def create_user(user_id, con):
statement = text("""INSERT INTO auser(id) VALUES(:id)""")
print(f'Creating user {user_id}')
con.execute(statement, {'id': user_id})
def create_user_in_server(user_id, server_id, con):
statement = text("""INSERT INTO user_in_server(server_id, user_id) VALUES(:server_id, :user_id) returning user_in_server_id""")
return con.execute(statement, {'user_id': user_id, 'server_id': server_id}).fetchone()[0]

View File

@@ -0,0 +1,3 @@
class LegacyCredits:
user_id = 0
credits = 0

View File

@@ -0,0 +1,20 @@
import json
from dto import LegacyCredits
def load_all_credits():
with open('settings.json') as warnings:
lines = warnings.read()
credit_obj = json.loads(lines)
user_list = credit_obj['384734293238749']['MEMBER']['297910194841583616']
all_credit_dtos = []
for user in user_list:
user_obj = user_list[user]
dto = LegacyCredits()
dto.credits = user_obj['balance']
dto.user_id = user
all_credit_dtos.append(dto)
print(f'loaded {len(all_credit_dtos)} credit entries.')
return all_credit_dtos

View File

@@ -0,0 +1,20 @@
import os
import sqlalchemy as db
from legacy_loader import load_all_credits
from credit_importer import import_credits
server_id = os.getenv('SERVER_ID')
all_warnings = load_all_credits()
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')
db_database = os.getenv('DB_NAME')
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASS')
engine = db.create_engine('postgresql://%s:%s@%s:%s/%s' % (db_user, db_password, db_host, db_port, db_database))
with engine.connect() as con:
with con.begin():
import_credits(server_id, all_warnings, con)

View File

@@ -0,0 +1 @@
*.db

View File

@@ -0,0 +1,17 @@
class LegacyQuoteAttachment:
message_id = 0
file_name = ''
url = ''
is_image: bool = False
class LegacyQuote:
id = 0
channel_id = 0
author_id = 0
adder_id = 0
creation_time_stamp = None
content = ''
message_id = 0
attachments = None

View File

@@ -0,0 +1,54 @@
import sqlite3
from sqlite3 import Error
import datetime
from dto import LegacyQuote, LegacyQuoteAttachment
def create_connection(file: str):
conn = None
try:
conn = sqlite3.connect(file)
except Error as e:
print(e)
return conn
def load_all_quotes_with_attachments(conn):
cur = conn.cursor()
cur.execute("SELECT q.id, q.chan_id, q.author_id, "
"q.adder_id, q.jump_url, q.'timestamp', q.content,"
"a.msg_id, a.filename, a.url, a.is_image "
"FROM quotes q left outer join attachments a ON q.jump_url like '%' || a.msg_id")
rows = cur.fetchall()
quotes = {}
for row in rows:
quote_id = row[0]
if quote_id not in quotes:
quotes[quote_id] = LegacyQuote()
current_quote = quotes[quote_id]
current_quote.id = quote_id
current_quote.channel_id = row[1]
current_quote.author_id = row[2]
current_quote.adder_id = row[3]
if len(row[5]) > 25:
current_quote.creation_time_stamp = datetime.datetime.strptime(row[5], '%Y-%m-%d %H:%M:%S.%f')
elif len(row[5]) != 19:
current_quote.creation_time_stamp = datetime.datetime.strptime(row[5], '%Y-%m-%d %H:%M:%S.%f')
else:
current_quote.creation_time_stamp = datetime.datetime.strptime(row[5], '%Y-%m-%d %H:%M:%S')
current_quote.content = row[6]
current_quote.message_id = row[4][row[4].rindex('/')+1:]
if row[7] is not None:
if current_quote.attachments is None:
current_quote.attachments = []
attachment = LegacyQuoteAttachment()
attachment.message_id = row[7]
attachment.file_name = row[8]
attachment.url = row[9]
attachment.is_image = row[10] == 1
current_quote.attachments.append(attachment)
return quotes

View File

@@ -0,0 +1,22 @@
from legacy_loader import create_connection, load_all_quotes_with_attachments
from quote_importer import insert_quotes, create_users, create_channels
import sqlalchemy as db
import os
server_id = os.getenv('SERVER_ID')
conn = create_connection('new_quotes.db')
all_quotes = load_all_quotes_with_attachments(conn)
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')
db_database = os.getenv('DB_NAME')
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASS')
engine = db.create_engine('postgresql://%s:%s@%s:%s/%s' % (db_user, db_password, db_host, db_port, db_database))
1
with engine.connect() as con:
with con.begin():
create_users(server_id, all_quotes, con)
create_channels(server_id, all_quotes, con)
with con.begin():
insert_quotes(server_id, all_quotes, con)

View File

@@ -0,0 +1,70 @@
from sqlalchemy.sql import text
def insert_quotes(server_id: int, quotes, con):
for quote_id in quotes:
quote = quotes[quote_id]
statement = text("""INSERT INTO quote(id, server_id, author_user_in_server_id, adder_user_in_server_id, source_channel_id, message_id, text, created)
VALUES(:id, :server_id,
(select user_in_server_id from user_in_server where user_id = :author_id and server_id = :server_id),
(select user_in_server_id from user_in_server where user_id = :adder_id and server_id = :server_id),
:channel_id, :message_id, :text, :created)""")
con.execute(statement, {'id': quote.id, 'server_id': server_id, 'author_id': quote.author_id, 'adder_id': quote.adder_id,
'channel_id': quote.channel_id, 'message_id': quote.message_id,
'text': quote.content, 'created': quote.creation_time_stamp})
if quote.attachments:
attachment_statement = text("""INSERT INTO quote_attachment(quote_id, server_id, url, is_image)
VALUES(:quote_id, :server_id, :url, :is_image)""")
for attachment in quote.attachments:
con.execute(attachment_statement, {'quote_id': quote_id, 'server_id': server_id, 'url': attachment.url, 'is_image': attachment.is_image})
def create_channels(server_id: int, quotes, con):
channel_ids = {}
for quote_id in quotes:
quote = quotes[quote_id]
if not does_channel_exist(quote.channel_id, con) and quote.channel_id not in channel_ids:
channel_ids[quote.channel_id] = 1
for channel_id in channel_ids:
create_channel(channel_id, server_id, con)
def create_users(server_id: int, quotes, con):
created_users = {}
for quote_id in quotes:
quote = quotes[quote_id]
if not does_user_exist(quote.adder_id, con) and quote.adder_id not in created_users:
create_user(quote.adder_id, con)
create_user_in_server(quote.adder_id, server_id, con)
created_users[quote.adder_id] = 1
if not does_user_exist(quote.author_id, con) and quote.author_id not in created_users:
create_user(quote.author_id, con)
create_user_in_server(quote.author_id, server_id, con)
created_users[quote.author_id] = 1
def does_user_exist(user_id, con):
statement = text("""SELECT count(1) FROM auser where id = :id""")
return con.execute(statement, {'id': user_id}).fetchone()[0] == 1
def does_channel_exist(channel_id, con):
statement = text("""SELECT count(1) FROM channel where id = :id""")
return con.execute(statement, {'id': channel_id}).fetchone()[0] == 1
def create_user(user_id, con):
statement = text("""INSERT INTO auser(id) VALUES(:id)""")
print(f'Creating user {user_id}')
con.execute(statement, {'id': user_id})
def create_channel(channel_id, server_id, con):
statement = text("""INSERT INTO channel(id, server_id, type, deleted) VALUES(:id, :server_id, 'TEXT', false)""")
print(f'Creating channel {channel_id}')
con.execute(statement, {'id': channel_id, 'server_id': server_id})
def create_user_in_server(user_id, server_id, con):
statement = text("""INSERT INTO user_in_server(server_id, user_id) VALUES(:server_id, :user_id) returning user_in_server_id""")
return con.execute(statement, {'user_id': user_id, 'server_id': server_id}).fetchone()[0]

View File

@@ -0,0 +1,11 @@
class LegacyWarning:
level = 0
reason = ''
author_id = 0
user_id = 0
date = 0
duration = 0
until = 0
mod_log_message_id = 0
mod_log_channel_id = 0

View File

@@ -0,0 +1,32 @@
import json
from datetime import datetime
from dto import LegacyWarning
def load_all_warnings():
with open('settings.json') as warnings:
lines = warnings.read()
warnings_obj = json.loads(lines)
warned_users = warnings_obj['260']['MODLOGS']['297910194841583616']
all_warning_dtos = []
for user in warned_users:
warnings = warned_users[user]['x']
for warning in warnings:
warning_dto = LegacyWarning()
warning_dto.level = warning['level']
warning_dto.author_id = warning['author']
warning_dto.reason = warning['reason']
if 'duration' in warning and warning['duration'] is not None:
warning_dto.duration = int(float(warning['duration']))
if 'until' in warning:
warning_dto.until = warning['until']
warning_dto.date = datetime.fromtimestamp(int(warning['time']))
warning_dto.user_id = user
if 'modlog_message' in warning:
warning_dto.mod_log_channel_id = warning['modlog_message']['channel_id']
warning_dto.mod_log_message_id = warning['modlog_message']['message_id']
all_warning_dtos.append(warning_dto)
print(f'loaded {len(all_warning_dtos)} warnings.')
return all_warning_dtos

View File

@@ -0,0 +1,20 @@
import os
import sqlalchemy as db
from legacy_loader import load_all_warnings
from warning_importer import import_warnings
server_id = os.getenv('SERVER_ID')
all_warnings = load_all_warnings()
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')
db_database = os.getenv('DB_NAME')
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASS')
engine = db.create_engine('postgresql://%s:%s@%s:%s/%s' % (db_user, db_password, db_host, db_port, db_database))
with engine.connect() as con:
with con.begin():
import_warnings(server_id, all_warnings, con)

View File

@@ -0,0 +1,147 @@
from datetime import timedelta
from sqlalchemy.sql import text
from dto import LegacyWarning
def import_warnings(server_id, warnings, connection):
user_ids = [warn.user_id for warn in warnings]
user_ids.extend([warn.author_id for warn in warnings])
create_users(server_id, user_ids, connection)
channel_ids = [warn.mod_log_channel_id for warn in warnings if warn.mod_log_channel_id is not None]
create_channels(server_id, channel_ids, connection)
warning_id = 0
mute_id = 0
for warning in warnings:
if warning.level == 5 or warning.level == 4:
import_ban(server_id, warning, connection)
if warning.level == 1:
warning_id += 1
import_warning(server_id, warning, connection, warning_id)
if warning.level == 2:
mute_id += 1
import_mute(server_id, warning, connection, mute_id)
if warning.level == 3:
import_kick(server_id, warning, connection)
connection.execute(f"INSERT INTO counter(counter, server_id, counter_key) VALUES ({warning_id}, {server_id}, 'WARNINGS')")
connection.execute(f"INSERT INTO counter(counter, server_id, counter_key) VALUES ({mute_id}, {server_id}, 'MUTES')")
def import_mute(server_id, warning: LegacyWarning, connection, mute_id):
statement = text("""INSERT INTO infraction(server_id, infraction_user_in_server_id, infraction_creator_user_in_server_id,
description, type, created, points, decayed)
VALUES(:server_id,
(select user_in_server_id from user_in_server where user_id = :muted_user_id and server_id = :server_id),
(select user_in_server_id from user_in_server where user_id = :author_id and server_id = :server_id),
:text, 'mute', :created, :points, false) returning id""")
infraction_id = connection.execute(statement, {'server_id': server_id, 'author_id': warning.author_id, 'muted_user_id': warning.user_id,
'text': warning.reason if warning.reason is not None else 'No reason provided', 'created': warning.date, 'points': 20}).fetchone()[0]
statement = text("""INSERT INTO infraction_parameter(key, value, infraction_id, created)
VALUES(:key, :value, :infraction_id, :created)""")
if warning.duration is not None:
duration = str(int(warning.duration / 60)) + 'm' if warning.duration < 60 * 60 else str(int(warning.duration / 60 / 60)) + 'h'
duration = duration if warning.duration < 60 * 60 * 24 else str(int(warning.duration / 60 / 60 / 24)) + 'd'
else:
duration = '0s'
connection.execute(statement, {'key': 'DURATION', 'value': duration, 'infraction_id': infraction_id, 'created': warning.date})
print(f'Creating infraction for mute for {warning.user_id}')
statement = text("""INSERT INTO mute(id, server_id, muted_user_in_server_id, muting_user_in_server_id,
reason, mute_date, target_date, mute_ended, created, infraction_id)
VALUES(:mute_id, :server_id,
(select user_in_server_id from user_in_server where user_id = :warned_user_id and server_id = :server_id),
(select user_in_server_id from user_in_server where user_id = :author_id and server_id = :server_id),
:text, :created, :target_date, true, :created, :infraction_id)""")
connection.execute(statement, {'mute_id': mute_id, 'server_id': server_id, 'author_id': warning.author_id, 'warned_user_id': warning.user_id,
'text': warning.reason if warning.reason is not None else 'No reason provided', 'created': warning.date, 'infraction_id': infraction_id,
'target_date': warning.date + timedelta(seconds=warning.duration)})
print(f'Creating warning for {warning.user_id}')
def import_warning(server_id, warning: LegacyWarning, connection, warning_id):
statement = text("""INSERT INTO infraction(server_id, infraction_user_in_server_id, infraction_creator_user_in_server_id,
description, type, created, points, decayed)
VALUES(:server_id,
(select user_in_server_id from user_in_server where user_id = :warned_user_id and server_id = :server_id),
(select user_in_server_id from user_in_server where user_id = :author_id and server_id = :server_id),
:text, 'warn', :created, :points, false) returning id""")
infraction_id = connection.execute(statement, {'server_id': server_id, 'author_id': warning.author_id, 'warned_user_id': warning.user_id,
'text': warning.reason if warning.reason is not None else 'No reason provided', 'created': warning.date, 'points': 10}).fetchone()[0]
print(f'Creating infraction for warning for {warning.user_id}')
statement = text("""INSERT INTO warning(id, server_id, warned_user_in_server_id, warning_user_in_server_id,
reason, warn_date, created, decayed, infraction_id)
VALUES(:warn_id, :server_id,
(select user_in_server_id from user_in_server where user_id = :warned_user_id and server_id = :server_id),
(select user_in_server_id from user_in_server where user_id = :author_id and server_id = :server_id),
:text, :created, :created, false, :infraction_id)""")
connection.execute(statement, {'warn_id': warning_id, 'server_id': server_id, 'author_id': warning.author_id, 'warned_user_id': warning.user_id,
'text': warning.reason if warning.reason is not None else 'No reason provided', 'created': warning.date, 'infraction_id': infraction_id})
print(f'Creating warning for {warning.user_id}')
def import_ban(server_id, warning: LegacyWarning, connection):
statement = text("""INSERT INTO infraction(server_id, infraction_user_in_server_id, infraction_creator_user_in_server_id,
description, type, created, points, decayed)
VALUES(:server_id,
(select user_in_server_id from user_in_server where user_id = :banned_user_id and server_id = :server_id),
(select user_in_server_id from user_in_server where user_id = :author_id and server_id = :server_id),
:text, 'ban', :created, :points, false)""")
connection.execute(statement, {'server_id': server_id, 'author_id': warning.author_id, 'banned_user_id': warning.user_id,
'text': warning.reason if warning.reason is not None else 'No reason provided', 'created': warning.date, 'points': 150})
print(f'Creating ban for {warning.user_id}')
def import_kick(server_id, warning: LegacyWarning, connection):
statement = text("""INSERT INTO infraction(server_id, infraction_user_in_server_id, infraction_creator_user_in_server_id,
description, type, created, points, decayed)
VALUES(:server_id,
(select user_in_server_id from user_in_server where user_id = :kicked_user_id and server_id = :server_id),
(select user_in_server_id from user_in_server where user_id = :author_id and server_id = :server_id),
:text, 'kick', :created, :points, false)""")
connection.execute(statement, {'server_id': server_id, 'author_id': warning.author_id, 'kicked_user_id': warning.user_id,
'text': warning.reason if warning.reason is not None else 'No reason provided', 'created': warning.date, 'points': 50})
print(f'Creating kick for {warning.user_id}')
def create_users(server_id: int, user_ids, con):
created_users = {}
for user_id in user_ids:
if not does_user_exist(user_id, con) and user_id not in created_users:
create_user(user_id, con)
create_user_in_server(user_id, server_id, con)
created_users[user_id] = 1
def does_user_exist(user_id, con):
statement = text("""SELECT count(1) FROM auser where id = :id""")
return con.execute(statement, {'id': user_id}).fetchone()[0] == 1
def create_user(user_id, con):
statement = text("""INSERT INTO auser(id) VALUES(:id)""")
print(f'Creating user {user_id}')
con.execute(statement, {'id': user_id})
def create_channels(server_id: int, channel_ids, con):
channels_to_create = {}
for channel_id in channel_ids:
if not does_channel_exist(channel_id, con):
channels_to_create[channel_id] = 1
for channel_id in channels_to_create:
create_channel(channel_id, server_id, con)
def does_channel_exist(channel_id, con):
statement = text("""SELECT count(1) FROM channel where id = :id""")
return con.execute(statement, {'id': channel_id}).fetchone()[0] == 1
def create_channel(channel_id, server_id, con):
statement = text("""INSERT INTO channel(id, server_id, type, deleted) VALUES(:id, :server_id, 'TEXT', false)""")
print(f'Creating channel {channel_id}')
con.execute(statement, {'id': channel_id, 'server_id': server_id})
def create_user_in_server(user_id, server_id, con):
statement = text("""INSERT INTO user_in_server(server_id, user_id) VALUES(:server_id, :user_id) returning user_in_server_id""")
return con.execute(statement, {'user_id': user_id, 'server_id': server_id}).fetchone()[0]