ffsaf-site/src/main/java/fr/titionfire/ffsaf/net2/Client_Thread.java

230 lines
8.6 KiB
Java

package fr.titionfire.ffsaf.net2;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import fr.titionfire.ffsaf.utils.JsonConsumer;
import fr.titionfire.ffsaf.utils.Pair;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import org.jboss.logging.Logger;
import org.quartz.*;
import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.PublicKey;
import java.util.Base64;
import java.util.HashMap;
import java.util.UUID;
/**
 * Handles one client socket of a {@link ServerCustom}: reads line-delimited JSON messages from the
 * socket, dispatches requests/notifications to the receivers registered in {@code serv.iMap}
 * (through a Quartz job), and routes other message types to the callbacks registered by
 * {@link #sendReq(Object, String, JsonConsumer)}.
 *
 * <p>Wire format, as implemented by {@link #sendMessage(Message)} and {@link #run()}: one JSON
 * document per line; line feeds inside the document are transported as {@code '¤'} and turned
 * back into {@code '\n'} on reception.
 *
 * <p>Threading: incoming messages are processed on this thread ({@link #run()}); the
 * {@code send*} methods may be called from other threads. Writes are serialized on the output
 * writer and the pending-callback map is guarded by its own monitor.
 */
public class Client_Thread extends Thread {
    private static final Logger LOGGER = Logger.getLogger(Client_Thread.class);
    public static final ObjectMapper MAPPER = new ObjectMapper();

    private final Socket s;
    private final OutputStreamWriter dos;
    private final BufferedReader br;
    public final ServerCustom serv;
    // Only written by the auth callback and read by process_message, both of which run on the
    // reader thread.
    private boolean isAuth;
    // Pending requests keyed by message uuid. Values may be null (request without callback), which
    // is why this stays a HashMap; every access must hold the map's monitor.
    private final HashMap<UUID, JsonConsumer<Object>> waitResult = new HashMap<>();

    /**
     * Wraps the socket streams (UTF-8) and immediately sends the authentication challenge: a random
     * code encrypted with {@code publicKey} and Base64-encoded, sent as an {@code "auth"} request.
     * The client is marked authenticated once it answers with the clear-text code; any other
     * answer closes the socket.
     *
     * @param serv      owning server
     * @param s         connected client socket
     * @param publicKey key used to encrypt the authentication code
     * @throws IOException if the socket streams cannot be obtained
     */
    public Client_Thread(ServerCustom serv, Socket s, PublicKey publicKey) throws IOException {
        this.serv = serv;
        this.s = s;
        this.isAuth = false;

        dos = new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8); // to send data to the client
        br = new BufferedReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8)); // to read data
        // coming from the client
        LOGGER.infof("Connection established to %s", s.getInetAddress().getHostAddress());

        String auth_code = UUID.randomUUID() + "-" + UUID.randomUUID();
        JsonConsumer<Object> consumer = new JsonConsumer<>(Object.class, t -> {
            // Constant-first comparison: a reply without payload (null) is a failed auth, not an NPE.
            if (auth_code.equals(t)) {
                isAuth = true;
                LOGGER.infof("Client %s is auth", s.getInetAddress().getHostAddress());
            } else {
                LOGGER.warnf("Client %s is not auth", s.getInetAddress().getHostAddress());
                stopSoket();
            }
        });

        try {
            // NOTE(review): the transformation omits mode/padding, so the provider default applies;
            // confirm the client decrypts with the same transformation before making it explicit.
            Cipher cipher = Cipher.getInstance("RSA");
            cipher.init(Cipher.ENCRYPT_MODE, publicKey);
            byte[] bytes = cipher.doFinal(auth_code.getBytes(StandardCharsets.UTF_8));
            sendReq(Base64.getEncoder().encodeToString(bytes), "auth", consumer);
        } catch (NoSuchAlgorithmException | NoSuchPaddingException | IllegalBlockSizeException | BadPaddingException | InvalidKeyException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * Reader loop: processes one message per line until the socket is closed or the peer reaches
     * end of stream, then notifies the server and releases the streams and the socket.
     */
    @Override
    public void run() {
        while (!s.isClosed()) {
            // read from client
            try {
                String jsonmessage = br.readLine();
                if (jsonmessage == null) {
                    // End of stream: the peer closed the connection.
                    s.close();
                } else {
                    process_message(jsonmessage.replace('¤', '\n'));
                }
            } catch (IOException e) {
                String reason = e.getMessage(); // may be null
                if (s.isClosed() || "Stream closed".equals(reason) || "Connection reset".equals(reason)) {
                    break;
                }
                LOGGER.error(reason, e);
            }
        }

        serv.onCloseClient(this);

        // close connection; each resource is closed independently so one failure cannot leak the others
        try {
            dos.close();
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
        try {
            br.close();
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
        stopSoket();
        LOGGER.infof("Connection closed for %s", s.getInetAddress().getHostAddress());
    }

    /**
     * Parses one JSON message and routes it. Messages lacking {@code uuid}, {@code code} or
     * {@code type} are ignored. REQUEST/NOTIFY messages go to {@link #scheduleRequest(Message)};
     * every other type is delivered to the callback registered for its uuid. Any failure is logged
     * and never propagates to the reader loop.
     *
     * @param jsonmessage the decoded JSON text of one message
     */
    private void process_message(String jsonmessage) {
        try {
            JsonNode node = MAPPER.readTree(jsonmessage);
            if (node == null || !(node.has("uuid") && node.has("code") && node.has("type"))) {
                LOGGER.debugf("Message format incorrect: %s", jsonmessage);
                return;
            }

            Message<JsonNode> message = new Message<>(UUID.fromString(node.get("uuid").asText()),
                    node.get("code").asText(), MessageType.valueOf(node.get("type").asText()),
                    (node.has("data")) ? node.get("data") : null);
            LOGGER.debugf("Message Received of %s: %s", s.getInetAddress().getHostAddress(), message);

            if (message.type() == MessageType.REQUEST || message.type() == MessageType.NOTIFY) {
                scheduleRequest(message);
                return;
            }

            // Take the entry out of the map first so it is released even if the callback throws,
            // and run the callback outside the lock.
            boolean expected;
            JsonConsumer<Object> waiter;
            synchronized (waitResult) {
                expected = waitResult.containsKey(message.uuid());
                waiter = waitResult.remove(message.uuid());
            }
            if (!expected) {
                LOGGER.warnf("No waiting message found for %s", message.uuid());
            } else if (waiter != null) {
                waiter.castAndAccept(message.data());
            }
        } catch (Exception e) {
            LOGGER.debug(jsonmessage);
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * Hands an incoming REQUEST/NOTIFY to a {@link CallSyncJob}. Replies with an error when no
     * receiver is registered for the message code; drops the message (with a warning) when the
     * client has not authenticated yet.
     *
     * @param message the incoming message
     * @throws SchedulerException if the job cannot be stored or triggered
     */
    private void scheduleRequest(Message<JsonNode> message) throws SchedulerException {
        if (!serv.iMap.containsKey(message.code())) {
            this.sendErr("No receiver found", message.code(), message.uuid());
            LOGGER.warnf("No receiver found for %s", message.code());
            return;
        }
        if (!isAuth) {
            LOGGER.warnf("Message %s ignored: client %s is not auth", message.code(),
                    s.getInetAddress().getHostAddress());
            return;
        }

        UUID uuid = UUID.randomUUID();
        serv.jobData.put(uuid, new Pair<>(this, message));
        boolean scheduled = false;
        try {
            JobKey jobKey = JobKey.jobKey(uuid.toString(), "client-request");
            JobDetail job = JobBuilder.newJob(CallSyncJob.class)
                    .usingJobData("uuid", uuid.toString())
                    .storeDurably(true)
                    .withIdentity(jobKey)
                    .build();
            serv.quartz.addJob(job, true);
            serv.quartz.triggerJob(jobKey);
            scheduled = true;
        } finally {
            if (!scheduled) {
                // Nothing will ever consume the entry: do not leave it behind.
                serv.jobData.remove(uuid);
            }
        }
    }

    /**
     * Sends a NOTIFY message (no answer expected).
     *
     * @param object payload
     * @param type   message code
     */
    public void sendNotify(Object object, String type) {
        sendMessage(new Message<>(UUID.randomUUID(), type, MessageType.NOTIFY, object));
    }

    /**
     * Sends a REQUEST message without registering a callback for the answer.
     *
     * @param object payload
     * @param type   message code
     */
    public void sendReq(Object object, String type) {
        sendReq(object, type, null);
    }

    /**
     * Sends a REQUEST message and registers {@code consumer} to receive the answer carrying the
     * same uuid.
     *
     * @param object   payload
     * @param code     message code
     * @param consumer callback for the answer, or {@code null} to discard it
     */
    public void sendReq(Object object, String code, JsonConsumer<Object> consumer) {
        UUID uuid;
        synchronized (waitResult) {
            do {
                uuid = UUID.randomUUID();
            } while (waitResult.containsKey(uuid));
            waitResult.put(uuid, consumer);
        }
        sendMessage(new Message<>(uuid, code, MessageType.REQUEST, object));
    }

    /**
     * Sends a REPLY to {@code in}, reusing its uuid and code.
     *
     * @param object payload
     * @param in     message being answered
     */
    public void sendRepTo(Object object, Message<?> in) {
        sendMessage(new Message<>(in.uuid(), in.code(), MessageType.REPLY, object));
    }

    /**
     * Sends an ERROR message.
     *
     * @param msg  error text
     * @param type message code
     * @param uuid uuid of the message the error relates to
     */
    public void sendErr(String msg, String type, UUID uuid) {
        sendMessage(new Message<>(uuid, type, MessageType.ERROR, msg));
    }

    /**
     * Sends an ERROR in answer to {@code in}, reusing its uuid and code.
     *
     * @param msg error text
     * @param in  message being answered
     */
    public void sendErrTo(String msg, Message<?> in) {
        sendMessage(new Message<>(in.uuid(), in.code(), MessageType.ERROR, msg));
    }

    /**
     * Serializes {@code message} to JSON and writes it as a single line. Existing {@code '¤'}
     * characters are replaced by spaces and line feeds by {@code '¤'} so the document stays on one
     * line. I/O and serialization failures are logged, not thrown.
     *
     * @param message message to send
     */
    public void sendMessage(Message<?> message) {
        try {
            // Serialize before taking the lock so writers only contend on the actual socket write.
            String line = MAPPER.writeValueAsString(message).replace('¤', ' ').replace('\n', '¤') + '\n';
            synchronized (dos) {
                dos.write(line);
                dos.flush();
            }
            LOGGER.tracef("Send message type: %s, code: %s, uuid: %s, to: %s", message.type(), message.code(),
                    message.uuid(), s.getInetAddress().getHostAddress());
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * Closes the client socket; the reader loop then terminates. Errors while closing are ignored
     * because the connection is being discarded anyway.
     */
    public void stopSoket() {
        try {
            s.close();
        } catch (IOException ignored) {
            // best effort: nothing useful can be done if close fails
        }
    }

    /**
     * Quartz job that invokes the receiver registered for one queued client message. The message
     * is looked up in {@code ServerCustom.jobData} using the {@code uuid} stored in the job data.
     */
    public static class CallSyncJob implements Job {
        @Inject
        ServerCustom socket;

        /**
         * Runs the receiver for the queued message, then releases the queue entry and deletes this
         * (durable, single-use) job from the scheduler.
         *
         * @param context Quartz execution context carrying the {@code uuid} job data
         */
        @Override
        @Transactional
        public void execute(JobExecutionContext context) {
            UUID uuid = UUID.fromString(context.getMergedJobDataMap().getString("uuid"));
            try {
                Pair<Client_Thread, Message<JsonNode>> pair = socket.jobData.get(uuid);
                if (pair == null) {
                    LOGGER.errorf("no get found %d", socket.jobData.size());
                    return;
                }
                try {
                    socket.iMap.get(pair.getValue().code()).castAndCall(pair.getKey(), pair.getValue());
                } catch (JsonProcessingException e) {
                    LOGGER.error(e.getMessage(), e);
                } finally {
                    // Release the entry even when the receiver throws an unchecked exception.
                    socket.jobData.remove(uuid);
                }
            } finally {
                // The job was stored durably with a unique key and is never triggered again:
                // delete it so jobs do not pile up in the scheduler.
                try {
                    context.getScheduler().deleteJob(context.getJobDetail().getKey());
                } catch (SchedulerException e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
        }
    }
}