[AB-166] added locking for runtime storage

This commit is contained in:
Sheldan
2020-11-22 14:43:42 +01:00
parent 448d555dba
commit 1d9f2595db
7 changed files with 110 additions and 71 deletions

View File

@@ -2,6 +2,7 @@ package dev.sheldan.abstracto.experience.job;
import dev.sheldan.abstracto.experience.models.ServerExperience; import dev.sheldan.abstracto.experience.models.ServerExperience;
import dev.sheldan.abstracto.experience.service.AUserExperienceService; import dev.sheldan.abstracto.experience.service.AUserExperienceService;
import dev.sheldan.abstracto.experience.service.RunTimeExperienceService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution; import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
@@ -30,17 +31,27 @@ public class ExperiencePersistingJob extends QuartzJobBean {
@Autowired @Autowired
private AUserExperienceService userExperienceService; private AUserExperienceService userExperienceService;
@Autowired
private RunTimeExperienceService runTimeExperienceService;
@Override @Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException { protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
Map<Long, List<ServerExperience>> runtimeExperience = userExperienceService.getRuntimeExperience(); runTimeExperienceService.takeLock();
log.info("Running experience persisting job."); try {
Long pastMinute = (Instant.now().getEpochSecond() / 60) - 1; Map<Long, List<ServerExperience>> runtimeExperience = runTimeExperienceService.getRuntimeExperience();
if(runtimeExperience.containsKey(pastMinute)) { log.info("Running experience persisting job.");
List<ServerExperience> foundServers = runtimeExperience.get(pastMinute); Long pastMinute = (Instant.now().getEpochSecond() / 60) - 1;
log.info("Found experience from {} servers to persist.", foundServers.size()); if(runtimeExperience.containsKey(pastMinute)) {
userExperienceService.handleExperienceGain(foundServers).thenAccept(aVoid -> List<ServerExperience> foundServers = runtimeExperience.get(pastMinute);
runtimeExperience.remove(pastMinute) log.info("Found experience from {} servers to persist.", foundServers.size());
); userExperienceService.handleExperienceGain(foundServers).thenAccept(aVoid -> {
runTimeExperienceService.takeLock();
runTimeExperienceService.getRuntimeExperience().remove(pastMinute);
runTimeExperienceService.releaseLock();
});
}
} finally {
runTimeExperienceService.releaseLock();
} }
} }

View File

@@ -88,35 +88,35 @@ public class AUserExperienceServiceBean implements AUserExperienceService {
*/ */
@Override @Override
public void addExperience(AUserInAServer userInAServer) { public void addExperience(AUserInAServer userInAServer) {
Long minute = Instant.now().getEpochSecond() / 60; runTimeExperienceService.takeLock();
Map<Long, List<ServerExperience>> runtimeExperience = runTimeExperienceService.getRuntimeExperience(); try {
if(runtimeExperience.containsKey(minute)) { Long minute = Instant.now().getEpochSecond() / 60;
log.trace("Minute {} already tracked, adding user {} in server {}.", Map<Long, List<ServerExperience>> runtimeExperience = runTimeExperienceService.getRuntimeExperience();
minute, userInAServer.getUserReference().getId(), userInAServer.getServerReference().getId()); if(runtimeExperience.containsKey(minute)) {
List<ServerExperience> existing = runtimeExperience.get(minute); log.trace("Minute {} already tracked, adding user {} in server {}.",
for (ServerExperience server : existing) { minute, userInAServer.getUserReference().getId(), userInAServer.getServerReference().getId());
if (server.getServerId().equals(userInAServer.getServerReference().getId()) && server.getUserInServerIds().stream().noneMatch(userInAServer1 -> userInAServer.getUserInServerId().equals(userInAServer1))) { List<ServerExperience> existing = runtimeExperience.get(minute);
server.getUserInServerIds().add(userInAServer.getUserInServerId()); for (ServerExperience server : existing) {
break; if (server.getServerId().equals(userInAServer.getServerReference().getId()) && server.getUserInServerIds().stream().noneMatch(userInAServer1 -> userInAServer.getUserInServerId().equals(userInAServer1))) {
server.getUserInServerIds().add(userInAServer.getUserInServerId());
break;
}
} }
} else {
log.trace("Minute {} did not exist yet. Creating new entry for user {} in server {}.", minute, userInAServer.getUserReference().getId(), userInAServer.getServerReference().getId());
ServerExperience serverExperience = ServerExperience
.builder()
.serverId(userInAServer.getServerReference().getId())
.build();
serverExperience.getUserInServerIds().add(userInAServer.getUserInServerId());
runtimeExperience.put(minute, new ArrayList<>(Arrays.asList(serverExperience)));
} }
} finally {
} else { runTimeExperienceService.releaseLock();
log.trace("Minute {} did not exist yet. Creating new entry for user {} in server {}.", minute, userInAServer.getUserReference().getId(), userInAServer.getServerReference().getId());
ServerExperience serverExperience = ServerExperience
.builder()
.serverId(userInAServer.getServerReference().getId())
.build();
serverExperience.getUserInServerIds().add(userInAServer.getUserInServerId());
runtimeExperience.put(minute, new ArrayList<>(Arrays.asList(serverExperience)));
} }
} }
@Override
public Map<Long, List<ServerExperience>> getRuntimeExperience() {
return runTimeExperienceService.getRuntimeExperience();
}
/** /**
* Calculates the level of the given {@link AUserExperience} according to the given {@link AExperienceLevel} list * Calculates the level of the given {@link AUserExperience} according to the given {@link AExperienceLevel} list

View File

@@ -6,13 +6,23 @@ import org.springframework.stereotype.Component;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Component @Component
public class RunTimeExperienceService { public class RunTimeExperienceService {
private Map<Long, List<ServerExperience>> runtimeExperience = new HashMap<>(); private Map<Long, List<ServerExperience>> runtimeExperience = new HashMap<>();
private static final Lock lock = new ReentrantLock();
public Map<Long, List<ServerExperience>> getRuntimeExperience() { public Map<Long, List<ServerExperience>> getRuntimeExperience() {
return runtimeExperience; return runtimeExperience;
} }
public void takeLock() {
lock.lock();
}
public void releaseLock() {
lock.unlock();
}
} }

View File

@@ -12,7 +12,6 @@ import dev.sheldan.abstracto.experience.models.database.AExperienceLevel;
import dev.sheldan.abstracto.experience.models.database.AExperienceRole; import dev.sheldan.abstracto.experience.models.database.AExperienceRole;
import dev.sheldan.abstracto.experience.models.database.AUserExperience; import dev.sheldan.abstracto.experience.models.database.AUserExperience;
import java.util.Map;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer; import java.util.function.Consumer;
@@ -31,13 +30,6 @@ public interface AUserExperienceService {
*/ */
void addExperience(AUserInAServer userInAServer); void addExperience(AUserInAServer userInAServer);
/**
* The current representation of the run time experience. Basically a HashMap of minutes to a list of {@link AServer}
* containing a list of {@link AUserInAServer} which should gain experience in the minute used as key in the HashMap
* @return
*/
Map<Long, List<ServerExperience>> getRuntimeExperience();
/** /**
* Calculates the appropriate level of the given {@link AUserExperience} according to the given {@link AExperienceLevel} * Calculates the appropriate level of the given {@link AUserExperience} according to the given {@link AExperienceLevel}
* configuration. * configuration.

View File

@@ -33,15 +33,20 @@ public class EmotePersistingJob extends QuartzJobBean {
@Override @Override
@Transactional @Transactional
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
Map<Long, Map<Long, List<PersistingEmote>>> runtimeConfig = trackedEmoteRuntimeService.getRuntimeConfig(); trackedEmoteRuntimeService.takeLock();
log.info("Running statistic persisting job."); try {
Long pastMinute = getPastMinute(); Map<Long, Map<Long, List<PersistingEmote>>> runtimeConfig = trackedEmoteRuntimeService.getRuntimeConfig();
if(runtimeConfig.containsKey(pastMinute)) { log.info("Running statistic persisting job.");
Map<Long, List<PersistingEmote>> foundStatistics = runtimeConfig.get(pastMinute); Long pastMinute = getPastMinute();
log.info("Found emote statistics from {} servers to persist.", foundStatistics.size()); if(runtimeConfig.containsKey(pastMinute)) {
trackedEmoteService.storeEmoteStatistics(foundStatistics); Map<Long, List<PersistingEmote>> foundStatistics = runtimeConfig.get(pastMinute);
runtimeConfig.remove(pastMinute); log.info("Found emote statistics from {} servers to persist.", foundStatistics.size());
checkForPastEmoteStats(pastMinute, runtimeConfig); trackedEmoteService.storeEmoteStatistics(foundStatistics);
runtimeConfig.remove(pastMinute);
checkForPastEmoteStats(pastMinute, runtimeConfig);
}
} finally {
trackedEmoteRuntimeService.releaseLock();
} }
} }

View File

@@ -9,6 +9,8 @@ import org.springframework.stereotype.Component;
import java.time.Instant; import java.time.Instant;
import java.util.*; import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Component @Component
@Slf4j @Slf4j
@@ -16,6 +18,7 @@ public class TrackedEmoteRuntimeServiceBean implements TrackedEmoteRuntimeServic
@Autowired @Autowired
private TrackedEmoteRunTimeStorage trackedEmoteRunTimeStorage; private TrackedEmoteRunTimeStorage trackedEmoteRunTimeStorage;
private static final Lock runTimeLock = new ReentrantLock();
@Override @Override
public Map<Long, Map<Long, List<PersistingEmote>>> getRuntimeConfig() { public Map<Long, Map<Long, List<PersistingEmote>>> getRuntimeConfig() {
@@ -29,29 +32,34 @@ public class TrackedEmoteRuntimeServiceBean implements TrackedEmoteRuntimeServic
@Override @Override
public void addEmoteForServer(Emote emote, Guild guild, Long count, boolean external) { public void addEmoteForServer(Emote emote, Guild guild, Long count, boolean external) {
Long key = getKey(); takeLock();
PersistingEmote newPersistentEmote = createFromEmote(guild, emote, count, external); try {
if(trackedEmoteRunTimeStorage.contains(key)) { Long key = getKey();
Map<Long, List<PersistingEmote>> elementsForKey = trackedEmoteRunTimeStorage.get(key); PersistingEmote newPersistentEmote = createFromEmote(guild, emote, count, external);
if(elementsForKey.containsKey(guild.getIdLong())) { if (trackedEmoteRunTimeStorage.contains(key)) {
List<PersistingEmote> persistingEmotes = elementsForKey.get(guild.getIdLong()); Map<Long, List<PersistingEmote>> elementsForKey = trackedEmoteRunTimeStorage.get(key);
Optional<PersistingEmote> existingEmote = persistingEmotes if (elementsForKey.containsKey(guild.getIdLong())) {
.stream() List<PersistingEmote> persistingEmotes = elementsForKey.get(guild.getIdLong());
.filter(persistingEmote -> persistingEmote.getEmoteId().equals(emote.getIdLong())) Optional<PersistingEmote> existingEmote = persistingEmotes
.findFirst(); .stream()
existingEmote.ifPresent(persistingEmote -> persistingEmote.setCount(persistingEmote.getCount() + count)); .filter(persistingEmote -> persistingEmote.getEmoteId().equals(emote.getIdLong()))
if(!existingEmote.isPresent()) { .findFirst();
persistingEmotes.add(newPersistentEmote); existingEmote.ifPresent(persistingEmote -> persistingEmote.setCount(persistingEmote.getCount() + count));
if (!existingEmote.isPresent()) {
persistingEmotes.add(newPersistentEmote);
}
} else {
log.trace("Adding emote {} to list of server {}.", newPersistentEmote.getEmoteId(), guild.getIdLong());
elementsForKey.put(guild.getIdLong(), new ArrayList<>(Arrays.asList(newPersistentEmote)));
} }
} else { } else {
log.trace("Adding emote {} to list of server {}.", newPersistentEmote.getEmoteId(), guild.getIdLong()); HashMap<Long, List<PersistingEmote>> serverEmotes = new HashMap<>();
elementsForKey.put(guild.getIdLong(), new ArrayList<>(Arrays.asList(newPersistentEmote))); serverEmotes.put(guild.getIdLong(), new ArrayList<>(Arrays.asList(newPersistentEmote)));
log.trace("Adding emote map entry for server {}.", guild.getIdLong());
trackedEmoteRunTimeStorage.put(key, serverEmotes);
} }
} else { } finally {
HashMap<Long, List<PersistingEmote>> serverEmotes = new HashMap<>(); releaseLock();
serverEmotes.put(guild.getIdLong(), new ArrayList<>(Arrays.asList(newPersistentEmote)));
log.trace("Adding emote map entry for server {}.", guild.getIdLong());
trackedEmoteRunTimeStorage.put(key, serverEmotes);
} }
} }
@@ -79,4 +87,15 @@ public class TrackedEmoteRuntimeServiceBean implements TrackedEmoteRuntimeServic
.serverId(guild.getIdLong()) .serverId(guild.getIdLong())
.build(); .build();
} }
@Override
public void takeLock() {
runTimeLock.lock();
}
@Override
public void releaseLock() {
runTimeLock.unlock();
}
} }

View File

@@ -14,4 +14,6 @@ public interface TrackedEmoteRuntimeService {
Long getKey(); Long getKey();
PersistingEmote createFromEmote(Guild guild, Emote emote, boolean external); PersistingEmote createFromEmote(Guild guild, Emote emote, boolean external);
PersistingEmote createFromEmote(Guild guild, Emote emote, Long count, boolean external); PersistingEmote createFromEmote(Guild guild, Emote emote, Long count, boolean external);
void takeLock();
void releaseLock();
} }