package fr.titionfire.ffsaf.ws; import com.fasterxml.jackson.core.JsonProcessingException; import fr.titionfire.ffsaf.data.repository.CompetitionRepository; import fr.titionfire.ffsaf.domain.service.CompetPermService; import fr.titionfire.ffsaf.net2.MessageType; import fr.titionfire.ffsaf.utils.SecurityCtx; import fr.titionfire.ffsaf.ws.data.WelcomeInfo; import fr.titionfire.ffsaf.ws.recv.*; import fr.titionfire.ffsaf.ws.send.JsonUni; import io.quarkus.hibernate.reactive.panache.common.WithSession; import io.quarkus.security.Authenticated; import io.quarkus.websockets.next.*; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.unchecked.Unchecked; import jakarta.annotation.PostConstruct; import jakarta.inject.Inject; import jakarta.inject.Named; import jakarta.ws.rs.ForbiddenException; import org.jboss.logging.Logger; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.time.Duration; import java.util.*; import java.util.concurrent.Executor; import static fr.titionfire.ffsaf.net2.Client_Thread.MAPPER; @Authenticated @WebSocket(path = "api/ws/competition/{uuid}") public class CompetitionWS { private static final Logger LOGGER = Logger.getLogger(CompetitionWS.class); private static final HashMap>> waitingResponse = new HashMap<>(); @Inject RMatch rMatch; @Inject RCategorie rCategorie; @Inject RRegister rRegister; @Inject RCardboard rCardboard; @Inject SecurityCtx securityCtx; @Inject CompetPermService competPermService; @SuppressWarnings("CdiInjectionPointsInspection") @Inject OpenConnections connections; @Inject @Named("notify-executor") Executor notifyExecutor; private static Executor executor; @Inject CompetitionRepository competitionRepository; HashMap wsMethods = new HashMap<>(); public void getWSReceiverMethods(Class clazz, Object object) { for (Method method : clazz.getDeclaredMethods()) { if (method.isAnnotationPresent(WSReceiver.class)) { if (method.getParameterCount() <= 1) { LOGGER.warnf("@WSReceiver has no parameters for method %s", method.getName()); continue; } if (!method.getReturnType().equals(Uni.class)) { LOGGER.warnf("@WSReceiver has returned unexpected type %s", method.getReturnType()); continue; } // method.setAccessible(true); wsMethods.put(method, object); } } } @PostConstruct void init() { getWSReceiverMethods(RMatch.class, rMatch); getWSReceiverMethods(RCategorie.class, rCategorie); getWSReceiverMethods(RRegister.class, rRegister); getWSReceiverMethods(RCardboard.class, rCardboard); executor = notifyExecutor; } @OnOpen @WithSession Uni open(WebSocketConnection connection) { LOGGER.infof("Opening CompetitionWS for %s", connection.pathParam("uuid")); LOGGER.debugf("Active connections: %d", connection.getOpenConnections().size()); return competitionRepository.find("uuid", connection.pathParam("uuid")).firstResult() .invoke(Unchecked.consumer(cm -> { if (cm == null) throw new ForbiddenException(); })) .call(cm -> competPermService.hasEditPerm(securityCtx, cm).map(__ -> PermLevel.ADMIN) .onFailure() .recoverWithUni(competPermService.hasTablePerm(securityCtx, cm).map(__ -> PermLevel.TABLE)) .onFailure() .recoverWithUni(competPermService.hasViewPerm(securityCtx, cm).map(__ -> PermLevel.VIEW)) .invoke(prem -> connection.userData().put(UserData.TypedKey.forString("prem"), prem.toString())) .invoke(prem -> LOGGER.infof("Connection permission: %s", prem)) .onFailure().transform(t -> new ForbiddenException())) .invoke(__ -> { connection.userData().put(UserData.TypedKey.forString("subject"), securityCtx.getSubject()); waitingResponse.put(connection, new HashMap<>()); }) .map(cm -> { WelcomeInfo welcomeInfo = new WelcomeInfo(); welcomeInfo.setName(cm.getName()); welcomeInfo.setPerm(connection.userData().get(UserData.TypedKey.forString("prem"))); return new MessageOut(UUID.randomUUID(), "welcomeInfo", MessageType.NOTIFY, welcomeInfo); }); } @OnClose void close(WebSocketConnection connection) { LOGGER.infof("Closing CompetitionWS for %s ", connection.pathParam("uuid")); LOGGER.debugf("Active connections: %d", connection.getOpenConnections().size()); waitingResponse.remove(connection); } private MessageOut makeReply(MessageIn message, Object data) { return new MessageOut(message.uuid(), message.code(), MessageType.REPLY, data); } private MessageOut makeError(MessageIn message, Object data) { return new MessageOut(message.uuid(), message.code(), MessageType.ERROR, data); } @OnTextMessage Multi processAsync(WebSocketConnection connection, MessageIn message) { System.out.println(message); if (message.type() == MessageType.REPLY || message.type() == MessageType.ERROR) { try { JsonUni jsonUni = waitingResponse.get(connection).get(message.uuid()); if (jsonUni == null) { LOGGER.debugf("No JsonUni found for %s", message.uuid()); if (message.type() == MessageType.ERROR) LOGGER.errorf("request %s make error %s", message.uuid(), message.data()); return null; } waitingResponse.get(connection).remove(message.uuid()); if (message.type() == MessageType.ERROR) return jsonUni.castAndError(message.data()).onFailure() .invoke(t -> LOGGER.error(t.getMessage(), t)) .replaceWith((MessageOut) null).toMulti().filter(__ -> false); return jsonUni.castAndChain(message.data()).onFailure() .invoke(t -> LOGGER.error(t.getMessage(), t)) .replaceWith((MessageOut) null).toMulti().filter(__ -> false); } catch (JsonProcessingException e) { LOGGER.warn(e.getMessage(), e); } } try { for (Map.Entry entry : wsMethods.entrySet()) { Method method = entry.getKey(); WSReceiver wsReceiver = method.getAnnotation(WSReceiver.class); PermLevel perm = PermLevel.valueOf(connection.userData().get(UserData.TypedKey.forString("prem"))); if (wsReceiver.code().equalsIgnoreCase(message.code())) { if (wsReceiver.permission().ordinal() > perm.ordinal()) return Uni.createFrom().item(makeError(message, "Permission denied")).toMulti(); return ((Uni) method.invoke(entry.getValue(), connection, MAPPER.treeToValue(message.data(), method.getParameterTypes()[1]))) .ifNoItem().after(Duration.ofSeconds(5)).fail() .map(o -> makeReply(message, o)) .onFailure() .recoverWithItem(t -> { LOGGER.error(t.getMessage(), t); return makeError(message, t.getMessage()); }).toMulti() .filter(__ -> message.type() == MessageType.REQUEST); } } return Uni.createFrom().item(makeError(message, "No receiver method found")).toMulti(); } catch (IllegalAccessException | InvocationTargetException | JsonProcessingException e) { LOGGER.warn(e.getMessage(), e); return Uni.createFrom().item(makeError(message, e.getMessage())).toMulti(); } // return Uni.createFrom().item(new Message<>(message.uuid(), message.code(), MessageType.REPLY, "ko")); } public static void sendNotifyToOtherEditor(WebSocketConnection connection, String code, Object data) { String uuid = connection.pathParam("uuid"); List> queue = new ArrayList<>(); queue.add(Uni.createFrom().voidItem()); // For avoid empty queue connection.getOpenConnections().forEach(c -> { if (uuid.equals(c.pathParam("uuid"))) { queue.add(c.sendText(new MessageOut(UUID.randomUUID(), code, MessageType.NOTIFY, data))); } }); Uni.join().all(queue) .andCollectFailures() .runSubscriptionOn(executor) .subscribeAsCompletionStage() .whenComplete((v, t) -> { if (t != null) { LOGGER.error("Error sending ws_out message", t); } }); } @OnError Uni error(WebSocketConnection connection, ForbiddenException t) { return connection.close(CloseReason.INTERNAL_SERVER_ERROR); //return "forbidden: " + securityCtx.getSubject(); } }