[AB-10] attempt to fix the race condition when creating a starboard post

fixes #10
This commit is contained in:
Sheldan
2022-06-20 23:32:33 +02:00
parent c84a76b6a3
commit c02bf4aa8e
3 changed files with 49 additions and 20 deletions

View File

@@ -1,6 +1,7 @@
package dev.sheldan.abstracto.starboard.listener; package dev.sheldan.abstracto.starboard.listener;
import dev.sheldan.abstracto.core.config.FeatureDefinition; import dev.sheldan.abstracto.core.config.FeatureDefinition;
import dev.sheldan.abstracto.core.exception.AbstractoRunTimeException;
import dev.sheldan.abstracto.core.listener.DefaultListenerResult; import dev.sheldan.abstracto.core.listener.DefaultListenerResult;
import dev.sheldan.abstracto.core.listener.async.jda.AsyncReactionAddedListener; import dev.sheldan.abstracto.core.listener.async.jda.AsyncReactionAddedListener;
import dev.sheldan.abstracto.core.metric.service.CounterMetric; import dev.sheldan.abstracto.core.metric.service.CounterMetric;
@@ -42,7 +43,12 @@ public class StarAddedListener extends StarboardListener implements AsyncReactio
log.info("User {} in server {} reacted with star to put a message {} from channel {} on starboard.", log.info("User {} in server {} reacted with star to put a message {} from channel {} on starboard.",
model.getUserReacting().getUserId(), model.getServerId(), model.getMessage().getMessageId(), model.getMessage().getChannelId()); model.getUserReacting().getUserId(), model.getServerId(), model.getMessage().getMessageId(), model.getMessage().getChannelId());
Optional<CachedReactions> reactionOptional = emoteService.getReactionFromMessageByEmote(model.getMessage(), aEmote); Optional<CachedReactions> reactionOptional = emoteService.getReactionFromMessageByEmote(model.getMessage(), aEmote);
handleStarboardPostChange(model.getMessage(), reactionOptional.orElse(null), model.getUserReacting(), true); try {
handleStarboardPostChange(model.getMessage(), reactionOptional.orElse(null), model.getUserReacting(), true);
} catch (InterruptedException e) {
log.error("Starboard post change failed in guild {} for message {} in channel {}", serverId, model.getMessage().getChannelId(), model.getMessage().getMessageId(), e);
throw new AbstractoRunTimeException(e);
}
return DefaultListenerResult.PROCESSED; return DefaultListenerResult.PROCESSED;
} else { } else {
return DefaultListenerResult.IGNORED; return DefaultListenerResult.IGNORED;

View File

@@ -1,6 +1,7 @@
package dev.sheldan.abstracto.starboard.listener; package dev.sheldan.abstracto.starboard.listener;
import dev.sheldan.abstracto.core.config.FeatureDefinition; import dev.sheldan.abstracto.core.config.FeatureDefinition;
import dev.sheldan.abstracto.core.exception.AbstractoRunTimeException;
import dev.sheldan.abstracto.core.listener.DefaultListenerResult; import dev.sheldan.abstracto.core.listener.DefaultListenerResult;
import dev.sheldan.abstracto.core.listener.async.jda.AsyncReactionRemovedListener; import dev.sheldan.abstracto.core.listener.async.jda.AsyncReactionRemovedListener;
import dev.sheldan.abstracto.core.metric.service.CounterMetric; import dev.sheldan.abstracto.core.metric.service.CounterMetric;
@@ -43,7 +44,12 @@ public class StarRemovedListener extends StarboardListener implements AsyncReact
log.info("User {} in server {} removed star reaction from message {} on starboard.", log.info("User {} in server {} removed star reaction from message {} on starboard.",
userRemoving.getUserId(), model.getServerId(), model.getMessage().getMessageId()); userRemoving.getUserId(), model.getServerId(), model.getMessage().getMessageId());
Optional<CachedReactions> reactionOptional = emoteService.getReactionFromMessageByEmote(model.getMessage(), aEmote); Optional<CachedReactions> reactionOptional = emoteService.getReactionFromMessageByEmote(model.getMessage(), aEmote);
handleStarboardPostChange(model.getMessage(), reactionOptional.orElse(null), userRemoving, false); try {
handleStarboardPostChange(model.getMessage(), reactionOptional.orElse(null), userRemoving, false);
} catch (InterruptedException e) {
log.error("Starboard post change failed in guild {} for message {} in channel {}", guildId, model.getMessage().getChannelId(), model.getMessage().getMessageId(), e);
throw new AbstractoRunTimeException(e);
}
return DefaultListenerResult.PROCESSED; return DefaultListenerResult.PROCESSED;
} else { } else {
return DefaultListenerResult.IGNORED; return DefaultListenerResult.IGNORED;

View File

@@ -9,6 +9,7 @@ import dev.sheldan.abstracto.core.models.cache.CachedReactions;
import dev.sheldan.abstracto.core.models.database.AUserInAServer; import dev.sheldan.abstracto.core.models.database.AUserInAServer;
import dev.sheldan.abstracto.core.service.ConfigService; import dev.sheldan.abstracto.core.service.ConfigService;
import dev.sheldan.abstracto.core.service.EmoteService; import dev.sheldan.abstracto.core.service.EmoteService;
import dev.sheldan.abstracto.core.service.LockService;
import dev.sheldan.abstracto.core.service.management.ConfigManagementService; import dev.sheldan.abstracto.core.service.management.ConfigManagementService;
import dev.sheldan.abstracto.core.service.management.UserInServerManagementService; import dev.sheldan.abstracto.core.service.management.UserInServerManagementService;
import dev.sheldan.abstracto.starboard.config.StarboardFeatureConfig; import dev.sheldan.abstracto.starboard.config.StarboardFeatureConfig;
@@ -27,6 +28,7 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Component @Component
@@ -63,6 +65,8 @@ public abstract class StarboardListener {
public static final String STARBOARD_POSTS = "starboard.posts"; public static final String STARBOARD_POSTS = "starboard.posts";
public static final String STAR_ACTION = "action"; public static final String STAR_ACTION = "action";
public static final Semaphore STARBOARD_SEMAPHORE = new Semaphore(1, true);
protected static final CounterMetric STARBOARD_STARS_THRESHOLD_REACHED = CounterMetric protected static final CounterMetric STARBOARD_STARS_THRESHOLD_REACHED = CounterMetric
.builder() .builder()
.name(STARBOARD_POSTS) .name(STARBOARD_POSTS)
@@ -75,16 +79,16 @@ public abstract class StarboardListener {
.tagList(Arrays.asList(MetricTag.getTag(STAR_ACTION, "threshold.below"))) .tagList(Arrays.asList(MetricTag.getTag(STAR_ACTION, "threshold.below")))
.build(); .build();
protected void handleStarboardPostChange(CachedMessage message, CachedReactions reaction, ServerUser userReacting, boolean adding) { protected void handleStarboardPostChange(CachedMessage message, CachedReactions reaction, ServerUser userReacting, boolean adding) throws InterruptedException {
Long starMaxDays = configService.getLongValueOrConfigDefault(StarboardFeatureConfig.STAR_MAX_DAYS_CONFIG_KEY, message.getServerId()); Long starMaxDays = configService.getLongValueOrConfigDefault(StarboardFeatureConfig.STAR_MAX_DAYS_CONFIG_KEY, message.getServerId());
if(message.getTimeCreated().isBefore(Instant.now().minus(starMaxDays, ChronoUnit.DAYS))) { if(message.getTimeCreated().isBefore(Instant.now().minus(starMaxDays, ChronoUnit.DAYS))) {
log.info("Post {} in channel {} in guild {} is beyond the configured max day amount of {} - ignoring.", message.getMessageId(), message.getChannelId(), message.getServerId(), starMaxDays); log.info("Post {} in channel {} in guild {} is beyond the configured max day amount of {} - ignoring.", message.getMessageId(), message.getChannelId(), message.getServerId(), starMaxDays);
return; return;
} }
Optional<StarboardPost> starboardPostOptional = starboardPostManagementService.findByMessageId(message.getMessageId()); Optional<StarboardPost> firstOptional = starboardPostManagementService.findByMessageId(message.getMessageId());
boolean starboardPostExists = starboardPostOptional.isPresent(); boolean starboardPostExists = firstOptional.isPresent();
if(starboardPostExists && starboardPostOptional.get().isIgnored()) { if(starboardPostExists && firstOptional.get().isIgnored()) {
log.info("Starboard post {} for message {} in server {} is ignored. Doing nothing.", starboardPostOptional.get().getId(), message.getMessageId(), message.getServerId()); log.info("Starboard post {} for message {} in server {} is ignored. Doing nothing.", firstOptional.get().getId(), message.getMessageId(), message.getServerId());
return; return;
} }
if(reaction != null) { if(reaction != null) {
@@ -93,29 +97,42 @@ public abstract class StarboardListener {
Long starMinimum = getFromConfig(FIRST_LEVEL_THRESHOLD_KEY, message.getServerId()); Long starMinimum = getFromConfig(FIRST_LEVEL_THRESHOLD_KEY, message.getServerId());
AUserInAServer userAddingReaction = userInServerManagementService.loadOrCreateUser(userReacting); AUserInAServer userAddingReaction = userInServerManagementService.loadOrCreateUser(userReacting);
if (userExceptAuthor.size() >= starMinimum) { if (userExceptAuthor.size() >= starMinimum) {
log.info("Post reached starboard minimum. Message {} in channel {} in server {} will be starred/updated.", try {
message.getMessageId(), message.getChannelId(), message.getServerId()); STARBOARD_SEMAPHORE.acquire();
if(starboardPostExists) { Optional<StarboardPost> secondOptional = starboardPostManagementService.findByMessageId(message.getMessageId());
updateStarboardPost(message, userAddingReaction, adding, starboardPostOptional.get(), userExceptAuthor); log.info("Post reached starboard minimum. Message {} in channel {} in server {} will be starred/updated.",
} else { message.getMessageId(), message.getChannelId(), message.getServerId());
metricService.incrementCounter(STARBOARD_STARS_THRESHOLD_REACHED); if(secondOptional.isPresent()) {
log.info("Creating starboard post for message {} in channel {} in server {}", message.getMessageId(), message.getChannelId(), message.getServerId()); updateStarboardPost(message, userAddingReaction, adding, secondOptional.get(), userExceptAuthor);
starboardService.createStarboardPost(message, userExceptAuthor, userAddingReaction, author).exceptionally(throwable -> { STARBOARD_SEMAPHORE.release();
log.error("Failed to persist starboard post for message {}.", message.getMessageId(), throwable); } else {
return null; metricService.incrementCounter(STARBOARD_STARS_THRESHOLD_REACHED);
}); log.info("Creating starboard post for message {} in channel {} in server {}", message.getMessageId(), message.getChannelId(), message.getServerId());
starboardService.createStarboardPost(message, userExceptAuthor, userAddingReaction, author).exceptionally(throwable -> {
log.error("Failed to persist starboard post for message {}.", message.getMessageId(), throwable);
STARBOARD_SEMAPHORE.release();
return null;
}).thenAccept(unused -> {
log.info("Releasing starboard post lock for message {}.", message.getMessageId());
STARBOARD_SEMAPHORE.release();
});
}
} catch (Throwable throwable) {
log.error("Failed to create starboard post for message {}.", message.getMessageId(), throwable);
STARBOARD_SEMAPHORE.release();
throw throwable;
} }
} else { } else {
if(starboardPostExists) { if(starboardPostExists) {
metricService.incrementCounter(STARBOARD_STARS_THRESHOLD_FELL); metricService.incrementCounter(STARBOARD_STARS_THRESHOLD_FELL);
log.info("Removing starboard post for message {} in channel {} in server {}. It fell under the threshold {}", message.getMessageId(), message.getChannelId(), message.getServerId(), starMinimum); log.info("Removing starboard post for message {} in channel {} in server {}. It fell under the threshold {}", message.getMessageId(), message.getChannelId(), message.getServerId(), starMinimum);
starboardPostOptional.ifPresent(starboardPost -> completelyRemoveStarboardPost(starboardPost, userReacting)); firstOptional.ifPresent(starboardPost -> completelyRemoveStarboardPost(starboardPost, userReacting));
} }
} }
} else { } else {
if(starboardPostExists) { if(starboardPostExists) {
log.info("Removing starboard post for message {} in channel {} in server {}", message.getMessageId(), message.getChannelId(), message.getServerId()); log.info("Removing starboard post for message {} in channel {} in server {}", message.getMessageId(), message.getChannelId(), message.getServerId());
starboardPostOptional.ifPresent(starboardPost -> completelyRemoveStarboardPost(starboardPost, userReacting)); firstOptional.ifPresent(starboardPost -> completelyRemoveStarboardPost(starboardPost, userReacting));
} }
} }
} }