EntityGatheringBagForPropagation.java

/*
 * The coLAB project
 * Copyright (C) 2021-2023 AlbaSim, MEI, HEIG-VD, HES-SO
 *
 * Licensed under the MIT License
 */
package ch.colabproject.colab.api.controller;

import ch.colabproject.colab.api.model.WithWebsocketChannels;
import ch.colabproject.colab.api.persistence.jpa.card.CardTypeDao;
import ch.colabproject.colab.api.persistence.jpa.project.ProjectDao;
import ch.colabproject.colab.api.persistence.jpa.team.TeamMemberDao;
import ch.colabproject.colab.api.persistence.jpa.user.UserDao;
import ch.colabproject.colab.api.ws.WebsocketMessagePreparer;
import ch.colabproject.colab.api.ws.message.IndexEntry;
import ch.colabproject.colab.api.ws.message.PrecomputedWsMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;
import javax.transaction.Status;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Transaction sidekick used to collect updated and deleted entities. Once the transaction is
 * completed, precompute messages to propagate through websocket channels.
 *
 * @author maxence
 */
@RequestScoped
//@Stateful
public class EntityGatheringBagForPropagation implements Serializable {

    private static final long serialVersionUID = 1L;

    /** logger */
    private static final Logger logger = LoggerFactory
        .getLogger(EntityGatheringBagForPropagation.class);

    /**
     * Synchronizer
     */
    private transient WebsocketTxSync synchronizer;

    /**
     * To send message to clients
     */
    @Inject
    private WebsocketManager websocketManager;

    /**
     * To resolve nested channels
     */
    @Inject
    private UserDao userDao;

    /**
     * To resolve nested channels
     */
    @Inject
    private TeamMemberDao teamDao;

    /**
     * To resolve nested channels
     */
    @Inject
    private CardTypeDao cardTypeDao;

    /**
     * To resolve nested channels
     */
    @Inject
    private ProjectDao projectDao;

    /**
     * TO sudo
     */
    @Inject
    private RequestManager requestManager;

    /**
     * To sync with JTA transaction(s)
     */
    @Inject
    private WebsocketTxManager txManager;

    /**
     * set of updated entities to be propagated
     */
    private Set<WithWebsocketChannels> updated = new HashSet<>();

    /**
     * set of entities which have been deleted during the transaction
     */
    private Set<IndexEntry> deleted = new HashSet<>();

    /**
     * store the prepared messages
     */
    private PrecomputedWsMessages message;

    /**
     * Is the message has been precomputed ?
     */
    private boolean precomputed = false;

    /**
     * Get the synchronizer
     *
     * @return the synchronizer
     */
    public WebsocketTxSync getSynchronizer() {
        return synchronizer;
    }

    /**
     * Set the synchronizer
     *
     * @param sync the synchronizer
     */
    public void setSynchronizer(WebsocketTxSync sync) {
        this.synchronizer = sync;
    }

//    /**
//     * As soon as this bean is construct, make sure there is a synchronizer bound to the current
//     * transaction
//     */
//    @AfterBegin
//    //@PostConstruct
//    public void construct() {
//        logger.trace("NEW TRANSACTION BEANLIFE CYCLE");
//        if (jtaSyncRegistry != null) {
//            if (synchronizer == null) {
//                logger.trace("Create Sync");
//                synchronizer = new WebsocketTxSync(this);
//                jtaSyncRegistry.registerInterposedSynchronization(synchronizer);
//            } else {
//                logger.trace("Synchronizer already registered");
//            }
//        } else {
//            logger.error(" * NULL -> NO-CONTEXT");
//        }
//    }
    /**
     * Register updated object within the WS JTA synchronizer.
     *
     * @param o object to register
     */
    public void registerUpdate(WithWebsocketChannels o) {
        // make sure txManager exists by just touching it
        txManager.touch();
        updated.add(o);
        logger.trace("UpdatedSet: {}", updated);
    }

    /**
     * Register updated objects within the WS JTA synchronizer.
     *
     * @param c collection of objects to register
     */
    public void registerUpdates(Collection<WithWebsocketChannels> c) {
        // make sure txManager exists by just touching it
        txManager.touch();
        updated.addAll(c);
        logger.trace("UpdatedSet: {}", updated);
    }

    /**
     * Register deleted objects within the WS JTA synchronizer
     *
     * @param c just deleted objects
     */
    public void registerDeletion(Collection<? extends WithWebsocketChannels> c) {
        // make sure txManager exists by just touching it
        txManager.touch();
        c.stream()
            .map(IndexEntry::build)
            .forEach(deleted::add);
        logger.trace("Deleted set: {}", deleted);
    }

    /**
     * Register deleted object within the WS JTA synchronizer
     *
     * @param o just deleted object
     */
    public void registerDeletion(WithWebsocketChannels o) {
        // make sure txManager exists by just touching it
        txManager.touch();
        deleted.add(IndexEntry.build(o));
        logger.trace("Deleted set: {}", deleted);
    }

    /**
     * Pre compute the message.
     */
    private void precomputeMessage() {
        try {
            logger.debug("Precompute messages; Update:{}, Deletes:{}", updated, deleted);
            // filter updates to keep only those who haven't been deleted
            Set<WithWebsocketChannels> filtered = updated.stream()
                .filter(
                    (u) -> !deleted.stream()
                        .anyMatch((d) -> d.equals(u))
                ).collect(Collectors.toSet());

            this.precomputed = true;
            requestManager.sudo(() -> {
                return this.message = WebsocketMessagePreparer.prepareWsMessage(userDao, teamDao,
                    cardTypeDao, projectDao, filtered, deleted);
            });
            logger.debug("Precomputed: {}", message);
        } catch (Exception ex) {
            logger.error("Failed to precompute websocket messages", ex);
        }
    }

    /**
     * Prepare websocket messages
     */
    public void prepare() {
        // make sure to flush everything to database: EDIT 20211118: seems useless
        // em.flush();
        requestManager.setTxDone(true);
        // logger.info(
        // "Before transactionCompletion: This method is not called for each transaction, why ???");
        this.precomputeMessage();
    }

    /**
     * After completion callback
     *
     * @param status transaction {@link Status status}
     */
    public void afterCompletion(int status) {
        switch (status) {
            case Status.STATUS_COMMITTED:
                logger.trace("Commit TX");
                this.commit();
                break;
            case Status.STATUS_ROLLEDBACK:
                logger.trace("Rollback TX");
                this.rollback();
                break;
            default:
                logger.warn("Unknonwn status {}", status);
                break;
        }
        // clear the synchronizer, so any new transaction will recreate one
        this.synchronizer = null;
    }

    /**
     * On transaction rollback
     */
    private void rollback() {
        /* no-op */
    }

    /**
     * After commit, send pre-computed messages.
     */
    private void commit() {
        logger.debug("After transaction completion: {}", message);
        requestManager.setTxDone(true);
        if (!precomputed) {
            logger.warn("Messages were not precomputed @BeforeCompletion!!!");
            // message shall be precomputed during the "before completion" phase, but the
            // dedicated method is never called, and I do not understand the reason...
            this.precomputeMessage();
        }
        if (message != null && !message.getMessages().isEmpty()) {
            logger.debug("Send messages: {}", message);
            websocketManager.propagate(message);
        }
    }
}