LiveManager.java
/*
* The coLAB project
* Copyright (C) 2021-2023 AlbaSim, MEI, HEIG-VD, HES-SO
*
* Licensed under the MIT License
*/
package ch.colabproject.colab.api.microchanges.live;
import ch.colabproject.colab.api.controller.EntityGatheringBagForPropagation;
import ch.colabproject.colab.api.controller.RequestManager;
import ch.colabproject.colab.api.controller.document.BlockManager;
import ch.colabproject.colab.api.exceptions.ColabMergeException;
import ch.colabproject.colab.api.microchanges.live.monitoring.BlockMonitoring;
import ch.colabproject.colab.api.microchanges.model.Change;
import ch.colabproject.colab.api.microchanges.tools.CancelDebounce;
import ch.colabproject.colab.api.microchanges.tools.Debouncer;
import ch.colabproject.colab.api.model.card.Card;
import ch.colabproject.colab.api.model.card.CardContent;
import ch.colabproject.colab.api.model.card.CardType;
import ch.colabproject.colab.api.model.card.CardTypeRef;
import ch.colabproject.colab.api.model.document.Resource;
import ch.colabproject.colab.api.model.document.Resourceable;
import ch.colabproject.colab.api.model.document.TextDataBlock;
import ch.colabproject.colab.api.model.project.Project;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.Singleton;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hazelcast.cluster.Member;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.map.IMap;
/**
* Micro Changes Management.
*
* @author maxence
*/
@Singleton
@Lock(LockType.READ)
public class LiveManager implements Serializable {
private static final long serialVersionUID = 1L;
/** logger */
private static final Logger logger = LoggerFactory.getLogger(LiveManager.class);
/** delayed process call */
private final Map<Long, Future<Void>> debounces = new HashMap<>();
/** Block specific logic management */
@Inject
private BlockManager blockManager;
/** Hazelcast instance. */
@Inject
private HazelcastInstance hzInstance;
/** The request manager */
@Inject
private RequestManager requestManager;
/**
* To register changes as updated object
*/
@Inject
private EntityGatheringBagForPropagation transactionManager;
/** Get shared cache of microchanges */
private IMap<Long, LiveUpdates> getCache() {
return hzInstance.getMap("MICROCHANGES_CAHCE");
}
/**
* Get the lock for the given block id
*
* @param id id of the block
*
* @return the lock
*/
private FencedLock getLock(Long id) {
return hzInstance.getCPSubsystem().getLock("Block" + id);
}
/**
* Lock the block
*
* @param id id of the block to lock
*/
private void lock(Long id) {
logger.trace("Lock block #{}", id);
getLock(id).lock();
}
/**
* Unlock the block
*
* @param id id of the block to unlock
*/
private void unlock(Long id) {
logger.trace("Unlock block #{}", id);
getLock(id).unlock();
}
/**
* Get pending updates for the given block
*
* @param id of the block
*
* @return LiveUpdate object
*/
private LiveUpdates get(Long id) {
logger.debug("Get LiveUpdates for block #{}", id);
try {
this.lock(id);
LiveUpdates get = getCache().get(id);
if (get != null) {
logger.trace("Get existing {}", get);
return get;
} else {
LiveUpdates l = new LiveUpdates();
TextDataBlock block = blockManager.findBlock(id);
l.setRevision(block.getRevision());
l.setContent(block.getTextData());
l.setTargetClass(block.getJsonDiscriminator());
l.setTargetId(block.getId());
logger.trace("new empty LiveUpdates {}", l);
return l;
}
} finally {
this.unlock(id);
}
}
/**
* Patch a block. Add the given changes to the list of pending changes and schedule changes
* processing.
*
* @param id id of the block to patch
* @param patch the patch to apply
*/
public void patchBlock(Long id, Change patch) {
logger.debug("Patch block #{} with {}", id, patch);
TextDataBlock block = blockManager.findBlock(id);
try {
this.lock(id);
LiveUpdates get = get(id);
List<Change> changes = get.getPendingChanges();
Set<String> basedOn = patch.getBasedOn();
boolean parentExists = basedOn.contains(block.getRevision())
|| changes.stream()
.filter(change -> basedOn.stream()
.filter(rev -> change.getRevision().equals(rev))
.findFirst().isPresent()
)
.findFirst().isPresent();
if (!parentExists) {
logger.warn("Change is based on non-existing parent");
logger.trace("TODO: keep it in a temp bag the time his parent is known");
// patch.setBasedOn("0");
}
// Project project = block.getProject();
patch.setBlockId(block.getId());
changes.add(patch);
getCache().put(id, get);
this.scheduleSaveMicroChanges(id);
logger.trace("Registered change is {}", patch);
transactionManager.registerUpdate(patch);
// WsUpdateChangeMessage message = WsUpdateChangeMessage.build(List.of(patch));
// try {
// PrecomputedWsMessages msg = WebsocketHelper.prepareWsMessage(userDao,
// block.getChannels(), message);
// websocketManager.propagate(msg);
// } catch (EncodeException ex) {
// logger.error("Live update error: precompute failed");
// }
} finally {
this.unlock(id);
}
}
/**
* Get all pending changes for the given block
*
* @param id id of the block
*
* @return list of changes
*/
public List<Change> getPendingChanges(Long id) {
LiveUpdates get = getCache().get(id);
if (get != null) {
return get.getPendingChanges();
} else {
return new ArrayList<>();
}
}
/**
* Process all pending changes and save new value to database.
*
* @param blockId id of the block to process
*/
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public void process(Long blockId) {
requestManager.sudo(() -> {
logger.debug("Process changes for #{}", blockId);
try {
this.lock(blockId);
TextDataBlock block = blockManager.findBlock(blockId);
if (block != null) {
LiveUpdates get = this.getCache().get(blockId);
if (get != null) {
try {
LiveResult result = get.process(false);
block.setTextData(result.getContent());
block.setRevision(result.getRevision());
blockManager.updateBlock(block);
this.deletePendingChangesAndPropagate(blockId);
} catch (RuntimeException ex) {
logger.error("Process failed", ex);
block.setHealthy(false);
} catch (ColabMergeException ex) {
logger.error("Fails to save block", ex);
block.setHealthy(false);
} catch (StackOverflowError error) {
logger.error("StackOverflowError");
block.setHealthy(false);
}
}
} else {
// block has been deleted
this.deletePendingChangesAndPropagate(blockId);
}
} finally {
this.unlock(blockId);
}
}
);
}
/**
* Clear pending changes
*
* @param id id of the block
*/
public void deletePendingChangesAndPropagate(Long id) {
logger.debug("Delete pending changes");
TextDataBlock block = blockManager.findBlock(id);
if (block != null) {
block.setHealthy(true);
try {
List<Change> changes = getPendingChanges(id);
transactionManager.registerDeletion(changes);
} catch (Throwable t) {
logger.warn("Propagate deleted changes failed", t);
}
// WsDeleteChangeMessage message = WsDeleteChangeMessage.build(changes);
//
// try {
// PrecomputedWsMessages msg = WebsocketHelper.prepareWsMessage(userDao, block.getChannels(), message);
// websocketManager.propagate(msg);
// } catch (EncodeException ex) {
// logger.error("Live update error: precompute failed");
// }
}
try {
getCache().remove(id);
} catch (Throwable t) {
logger.warn("Drop changes", t);
}
}
/**
* Cancel any debounce call related to the given blockId
*
* @param blockId id of the block
*
* @return true if there was something to cancel
*/
public boolean cancelDebounce(Long blockId) {
logger.debug("Cancel debounce #{}", blockId);
Future<Void> remove = this.debounces.remove(blockId);
if (remove != null) {
return remove.cancel(true);
}
return false;
}
/**
* Schedule changes processing.
* <p>
* Ask all hazelcast instances to cancel any pending process call and reschedule a new one
*
* @param blockId id of the block
*/
public void scheduleSaveMicroChanges(Long blockId) {
IExecutorService executorService = hzInstance.getExecutorService("COLAB_LIVE");
Map<Member, Future<Boolean>> cancelCalls = executorService
.submitToAllMembers(new CancelDebounce(blockId));
logger.debug("Schedule processing #{}", blockId);
// cancel all pending debounce calls
cancelCalls.values().forEach(cancelCall -> {
try {
cancelCall.get();
} catch (InterruptedException | ExecutionException ex) {
logger.error("Fails to cancel", ex);
}
});
// schedule a new one
Future<Void> call = executorService.submit(new Debouncer(blockId));
this.debounces.put(blockId, call);
}
/**
* get data to monitor block with pending changes
*
* @return monitoring data
*/
public List<BlockMonitoring> getMonitoringData() {
IMap<Long, LiveUpdates> cache = getCache();
Set<Long> keys = cache.keySet();
return keys.stream().map(key -> {
var bm = new BlockMonitoring();
bm.setBlockId(key);
TextDataBlock block = blockManager.findBlock(key);
if (block != null) {
StringBuilder title = new StringBuilder();
if (block.getProject() != null) {
Project p = block.getProject();
title.append("Project \"")
.append(p.getName())
.append("\" #")
.append(p.getId())
.append(" / ");
}
try {
if (block.getOwningResource() != null) {
// block belongs to a resource
Resource r = block.getOwningResource();
Resourceable owner = r.getOwner();
if (owner instanceof CardType) {
CardType ct = (CardType) owner;
title.append("Type: ").append(ct.getTitle()).append(" #")
.append(ct.getId());
} else if (owner instanceof CardTypeRef) {
CardTypeRef ref = (CardTypeRef) owner;
title.append("TypeRef: ").append(ref.resolve().getTitle()).append(" #")
.append(ref.getId());
} else if (owner instanceof CardContent) {
CardContent cc = (CardContent) owner;
Card c = cc.getCard();
title.append("Card: ").append(c.getTitle()).append(" #")
.append(c.getId());
title.append(" / CardContent: ").append(cc.getTitle()).append(" #")
.append(cc.getId());
} else if (owner instanceof Card) {
Card c = (Card) owner;
title.append("Card: ").append(c.getTitle()).append(" #")
.append(c.getId());
}
title.append(" / Resource ").append(r.getTitle()).append(" # ")
.append(r.getId());
} else if (block.getOwningCardContent() != null) {
CardContent cc = block.getOwningCardContent();
Card c = cc.getCard();
title.append("Card: ").append(c.getTitle()).append(" #").append(c.getId()
).append(" / CardContent: ").append(cc.getTitle()).append(" #")
.append(cc.getId());
}
} catch (Throwable error) {
/** no-op */
logger.warn("Fails to build block title {}", block);
}
bm.setTitle(title.toString());
try {
var data = cache.get(key);
if (data != null) {
if (block.isHealthy()) {
bm.setStatus(BlockMonitoring.BlockStatus.HEALTHY);
} else {
bm.setStatus(BlockMonitoring.BlockStatus.UNHEALTHY);
}
} else {
bm.setStatus(BlockMonitoring.BlockStatus.PROCESSED);
}
} catch (Throwable e) {
/** Catch everything ! */
bm.setStatus(BlockMonitoring.BlockStatus.DATA_ERROR);
}
} else {
bm.setTitle("Ghost block");
bm.setStatus(BlockMonitoring.BlockStatus.DELETED);
}
return bm;
}).collect(Collectors.toList());
}
}