[AB-209] adding cleanup job for old embedded messages, this job deletes the entries from the database and removes reactions

reducing thread count for listener executor and scheduling
fixing channel deletion listener not being part of a transaction
This commit is contained in:
Sheldan
2021-04-06 20:23:26 +02:00
parent 61d16d5985
commit e7b1fbe9d1
21 changed files with 280 additions and 10 deletions

View File

@@ -0,0 +1,31 @@
package dev.sheldan.abstracto.linkembed.job;
import dev.sheldan.abstracto.linkembed.service.MessageEmbedService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.PersistJobDataAfterExecution;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;
@Slf4j
@DisallowConcurrentExecution
@Component
@PersistJobDataAfterExecution
public class LinkEmbedCleanupJob extends QuartzJobBean {
@Autowired
private MessageEmbedService messageEmbedService;
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
log.info("Executing link embed clean up job.");
messageEmbedService.cleanUpOldMessageEmbeds();
} catch (Exception exception) {
log.error("Link embed cleanup job failed.", exception);
}
}
}

View File

@@ -4,7 +4,12 @@ package dev.sheldan.abstracto.linkembed.repository;
import dev.sheldan.abstracto.linkembed.model.database.EmbeddedMessage;
import org.springframework.data.jpa.repository.JpaRepository;
import java.time.Instant;
import java.util.List;
public interface EmbeddedMessageRepository extends JpaRepository<EmbeddedMessage, Long> {
EmbeddedMessage findByEmbeddingMessageId(Long messageId);
List<EmbeddedMessage> findByCreatedLessThan(Instant date);
void deleteByEmbeddingMessageIdIn(List<Long> embeddedMessageId);
}

View File

@@ -1,5 +1,6 @@
package dev.sheldan.abstracto.linkembed.service;
import dev.sheldan.abstracto.core.models.ServerChannelMessage;
import dev.sheldan.abstracto.core.models.cache.CachedMessage;
import dev.sheldan.abstracto.core.models.database.AUserInAServer;
import dev.sheldan.abstracto.core.models.template.listener.MessageEmbeddedModel;
@@ -9,22 +10,31 @@ import dev.sheldan.abstracto.core.service.management.ServerManagementService;
import dev.sheldan.abstracto.core.service.management.UserInServerManagementService;
import dev.sheldan.abstracto.core.templating.model.MessageToSend;
import dev.sheldan.abstracto.core.templating.service.TemplateService;
import dev.sheldan.abstracto.core.utils.CompletableFutureList;
import dev.sheldan.abstracto.core.utils.FutureUtils;
import dev.sheldan.abstracto.linkembed.model.MessageEmbedLink;
import dev.sheldan.abstracto.linkembed.model.database.EmbeddedMessage;
import dev.sheldan.abstracto.linkembed.service.management.MessageEmbedPostManagementService;
import lombok.extern.slf4j.Slf4j;
import net.dv8tion.jda.api.entities.Message;
import net.dv8tion.jda.api.entities.TextChannel;
import net.dv8tion.jda.api.entities.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Component
@Slf4j
@@ -68,6 +78,15 @@ public class MessageEmbedServiceBean implements MessageEmbedService {
@Autowired
private ReactionService reactionService;
@Autowired
private MessageService messageService;
@Autowired
private EmoteService emoteService;
@Value("${abstracto.feature.linkEmbed.removalDays}")
private Long embedRemovalDays;
@Override
public List<MessageEmbedLink> getLinksInMessage(String message) {
List<MessageEmbedLink> links = new ArrayList<>();
@@ -97,7 +116,6 @@ public class MessageEmbedServiceBean implements MessageEmbedService {
linksToEmbed.forEach(messageEmbedLink ->
messageCache.getMessageFromCache(messageEmbedLink.getServerId(), messageEmbedLink.getChannelId(), messageEmbedLink.getMessageId())
.thenAccept(cachedMessage -> self.embedLink(cachedMessage, target, userEmbeddingUserInServerId, embeddingMessage)
).exceptionally(throwable -> {
log.error("Message embedding from cache failed for message {}.", messageEmbedLink.getMessageId(), throwable);
return null;
@@ -113,6 +131,60 @@ public class MessageEmbedServiceBean implements MessageEmbedService {
);
}
@Override
public CompletableFuture<Void> cleanUpOldMessageEmbeds() {
Instant oldestDate = Instant.now().truncatedTo(ChronoUnit.DAYS).minus(embedRemovalDays, ChronoUnit.DAYS);
List<EmbeddedMessage> embeddedMessages = messageEmbedPostManagementService.getEmbeddedMessagesOlderThan(oldestDate);
if(embeddedMessages.isEmpty()) {
log.info("No embedded messages to clean up.");
return CompletableFuture.completedFuture(null);
}
log.info("Cleaning up {} embedded embeddedMessages", embeddedMessages.size());
List<ServerChannelMessage> serverChannelMessages = embeddedMessages.stream().map(embeddedMessage ->
ServerChannelMessage
.builder()
.serverId(embeddedMessage.getEmbeddedServer().getId())
.channelId(embeddedMessage.getEmbeddedChannel().getId())
.messageId(embeddedMessage.getEmbeddingMessageId())
.build()
)
.collect(Collectors.toList());
List<Long> embeddedMessagesHandled = embeddedMessages
.stream()
.map(EmbeddedMessage::getEmbeddingMessageId)
.collect(Collectors.toList());
List<CompletableFuture<Message>> messageFutures = messageService.retrieveMessages(serverChannelMessages);
CompletableFutureList<Message> future = new CompletableFutureList<>(messageFutures);
return future.getMainFuture()
.handle((unused, throwable) -> self.removeReactions(future.getObjects()))
.thenCompose(Function.identity())
// deleting the messages from db regardless of exceptions, at most the reaction remains
.whenComplete((unused, throwable) -> self.deleteEmbeddedMessages(embeddedMessagesHandled))
.exceptionally(throwable -> {
log.error("Failed to clean up embedded messages.", throwable);
return null;
});
}
@Transactional
public CompletableFuture<Void> removeReactions(List<Message> allMessages) {
List<CompletableFuture<Void>> removalFutures = new ArrayList<>();
Map<Long, List<Message>> groupedPerServer = allMessages
.stream()
.collect(Collectors.groupingBy(message -> message.getGuild().getIdLong()));
groupedPerServer.forEach((serverId, serverMessages) -> {
// we assume the emote remained the same
CompletableFutureList<Void> removalFuture = reactionService.removeReactionFromMessagesWithFutureWithFutureList(serverMessages, REMOVAL_EMOTE);
removalFutures.add(removalFuture.getMainFuture());
});
return FutureUtils.toSingleFutureGeneric(removalFutures);
}
@Transactional
public void deleteEmbeddedMessages(List<Long> embeddedMessagesToDelete) {
messageEmbedPostManagementService.deleteEmbeddedMessagesViaId(embeddedMessagesToDelete);
}
@Transactional
public CompletableFuture<Void> sendEmbeddingMessage(CachedMessage cachedMessage, TextChannel target, Long userEmbeddingUserInServerId, MessageEmbeddedModel messageEmbeddedModel) {
MessageToSend embed = templateService.renderEmbedTemplate(MESSAGE_EMBED_TEMPLATE, messageEmbeddedModel, target.getGuild().getIdLong());

View File

@@ -16,6 +16,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
@Component
@@ -76,4 +78,15 @@ public class MessageEmbedPostManagementServiceBean implements MessageEmbedPostMa
embeddedMessageRepository.delete(embeddedMessage);
}
@Override
public List<EmbeddedMessage> getEmbeddedMessagesOlderThan(Instant date) {
return embeddedMessageRepository.findByCreatedLessThan(date);
}
@Override
public void deleteEmbeddedMessagesViaId(List<Long> embeddingMessageId) {
log.info("Deleting {} embedded messages from db.", embeddingMessageId.size());
embeddedMessageRepository.deleteByEmbeddingMessageIdIn(embeddingMessageId);
}
}

View File

@@ -1,2 +1,5 @@
abstracto.featureFlags.linkEmbeds.featureName=linkEmbeds
abstracto.featureFlags.linkEmbeds.enabled=false
abstracto.feature.linkEmbed.removalDays=1

View File

@@ -0,0 +1,10 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xmlns:pro="http://www.liquibase.org/xml/ns/pro"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog ../dbchangelog-3.8.xsd
http://www.liquibase.org/xml/ns/dbchangelog-ext ../dbchangelog-3.8.xsd
http://www.liquibase.org/xml/ns/pro ../dbchangelog-3.8.xsd" >
<include file="link-embed-seedData/data.xml" relativeToChangelogFile="true"/>
</databaseChangeLog>

View File

@@ -0,0 +1,10 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xmlns:pro="http://www.liquibase.org/xml/ns/pro"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog ../../dbchangelog-3.8.xsd
http://www.liquibase.org/xml/ns/dbchangelog-ext ../../dbchangelog-3.8.xsd
http://www.liquibase.org/xml/ns/pro ../../dbchangelog-3.8.xsd" >
<include file="linkEmbedCleanupJob.xml" relativeToChangelogFile="true"/>
</databaseChangeLog>

View File

@@ -0,0 +1,19 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xmlns:pro="http://www.liquibase.org/xml/ns/pro"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog ../../dbchangelog-3.8.xsd
http://www.liquibase.org/xml/ns/dbchangelog-ext ../../dbchangelog-3.8.xsd
http://www.liquibase.org/xml/ns/pro ../../dbchangelog-3.8.xsd" >
<changeSet author="Sheldan" id="linkEmbed-cleanup-insert">
<insert tableName="scheduler_job">
<column name="name" value="linkEmbedCleanupJob"/>
<column name="group_name" value="linkEmbed"/>
<column name="clazz" value="dev.sheldan.abstracto.linkembed.job.LinkEmbedCleanupJob"/>
<column name="active" value="true"/>
<column name="cron_expression" value="0 0 0 * * ?"/>
<column name="recovery" value="false"/>
</insert>
</changeSet>
</databaseChangeLog>

View File

@@ -7,4 +7,5 @@
http://www.liquibase.org/xml/ns/dbchangelog-ext dbchangelog-3.8.xsd
http://www.liquibase.org/xml/ns/pro dbchangelog-3.8.xsd" >
<include file="1.0-link-embed/collection.xml" relativeToChangelogFile="true"/>
<include file="1.2.8-link-embed/collection.xml" relativeToChangelogFile="true"/>
</databaseChangeLog>