From 5290597b2b091d7a9decf75dca6ff69e083832c8 Mon Sep 17 00:00:00 2001 From: Azalea <22280294+hykilpikonna@users.noreply.github.com> Date: Mon, 13 Jan 2025 21:14:44 -0500 Subject: [PATCH] [O] Futari serailization --- .../samnyan/aqua/sega/maimai2/MaimaiFutari.kt | 99 +++++++++++-------- 1 file changed, 56 insertions(+), 43 deletions(-) diff --git a/src/main/java/icu/samnyan/aqua/sega/maimai2/MaimaiFutari.kt b/src/main/java/icu/samnyan/aqua/sega/maimai2/MaimaiFutari.kt index 59c3aa2f..1cadf125 100644 --- a/src/main/java/icu/samnyan/aqua/sega/maimai2/MaimaiFutari.kt +++ b/src/main/java/icu/samnyan/aqua/sega/maimai2/MaimaiFutari.kt @@ -20,38 +20,54 @@ const val SO_TIMEOUT = 10000 private object Command { // Control plane - const val CTL_START = 1 - const val CTL_BIND = 2 - const val CTL_HEARTBEAT = 3 - const val CTL_TCP_CONNECT = 4 // Accept a new multiplexed TCP stream - const val CTL_TCP_ACCEPT = 5 - const val CTL_TCP_ACCEPT_ACK = 6 - const val CTL_TCP_CLOSE = 7 + const val CTL_START = 1u + const val CTL_BIND = 2u + const val CTL_HEARTBEAT = 3u + const val CTL_TCP_CONNECT = 4u // Accept a new multiplexed TCP stream + const val CTL_TCP_ACCEPT = 5u + const val CTL_TCP_ACCEPT_ACK = 6u + const val CTL_TCP_CLOSE = 7u // Data plane - const val DATA_SEND = 21 - const val DATA_BROADCAST = 22 + const val DATA_SEND = 21u + const val DATA_BROADCAST = 22u } private object Proto { - const val TCP = 6 - const val UDP = 17 + const val TCP = 6u + const val UDP = 17u } -data class Message( - val cmd: Int, - val proto: Int? = null, // Protocol, TCP or UDP - val sid: Int? = null, // Stream ID, only applicable in TCP streams, 0 in UDP +data class Msg( + var cmd: UInt, + var proto: UInt? = null, + var sid: UInt? = null, + var src: UInt? = null, + var sPort: UInt? = null, + var dst: UInt? = null, + var dPort: UInt? = null, + var data: String? = null +) { + override fun toString() = ls( + 1, cmd, proto, sid, src, sPort, dst, dPort, + null, null, null, null, null, null, null, null, // reserved for future use + data + ).joinToString(",") { it?.str ?: "" }.trimEnd(',') - // Src and dst should be simulated ipv4 addresses and 0-65535 ports - val src: UInt? = null, - val sPort: Int? = null, - val dst: UInt? = null, - val dPort: Int? = null, + companion object { + val fields = arr(Msg::proto, Msg::sid, Msg::src, Msg::sPort, Msg::dst, Msg::dPort) - val data: Any? = null -) -fun ctlMsg(cmd: Int, data: Any? = null) = Message(cmd, data = data) + fun fromString(str: String): Msg { + val sp = str.split(',') + return Msg(0u).apply { + cmd = sp[1].toUInt() + fields.forEachIndexed { i, f -> f.set(this, sp.getOrNull(i + 2)?.some?.toUIntOrNull()) } + data = sp.drop(16).joinToString(",") + } + } + } +} +fun ctlMsg(cmd: UInt, data: String? = null) = Msg(cmd, data = data) data class ActiveClient( val clientKey: String, @@ -59,8 +75,8 @@ data class ActiveClient( val reader: BufferedReader, val writer: BufferedWriter, // - val tcpStreams: MutableMap = mutableMapOf(), - val pendingStreams: MutableSet = mutableSetOf(), + val tcpStreams: MutableMap = mutableMapOf(), + val pendingStreams: MutableSet = mutableSetOf(), ) { val log = logger() val stubIp = keychipToStubIp(clientKey) @@ -68,16 +84,16 @@ data class ActiveClient( var lastHeartbeat = millis() - fun send(msg: Message) { + fun send(msg: Msg) { writeMutex.withLock { - writer.write(msg.toJson()) + writer.write(msg.toString()) writer.newLine() writer.flush() } } } -fun ActiveClient.handle(msg: Message) { +fun ActiveClient.handle(msg: Msg) { // Find target by dst IP address or TCP stream ID val target = (msg.dst ?: msg.sid?.let { tcpStreams[it] } )?.let { clients[it] } @@ -95,19 +111,16 @@ fun ActiveClient.handle(msg: Message) { return log.warn("Stream ID not found: ${msg.sid}") target.send(msg.copy(src = stubIp, dst = target.stubIp)) - - // 1020844165 - // 2245580860 } Command.CTL_TCP_CONNECT -> { target ?: return log.warn("Connect: Target not found: ${msg.dst}") - msg.sid ?: return log.warn("Connect: Stream ID not found") + val sid = msg.sid ?: return log.warn("Connect: Stream ID not found") - if (msg.sid in tcpStreams || msg.sid in pendingStreams) - return log.warn("Stream ID already in use: ${msg.sid}") + if (sid in tcpStreams || sid in pendingStreams) + return log.warn("Stream ID already in use: $sid") // Add the stream to the pending list - pendingStreams.add(msg.sid) + pendingStreams.add(sid) if (pendingStreams.size > MAX_STREAMS) { log.warn("Too many pending streams, closing connection") return socket.close() @@ -117,15 +130,15 @@ fun ActiveClient.handle(msg: Message) { } Command.CTL_TCP_ACCEPT -> { target ?: return log.warn("Accept: Target not found: ${msg.dst}") - msg.sid ?: return log.warn("Accept: Stream ID not found") + val sid = msg.sid ?: return log.warn("Accept: Stream ID not found") - if (msg.sid !in target.pendingStreams) - return log.warn("Stream ID not found in pending list: ${msg.sid}") + if (sid !in target.pendingStreams) + return log.warn("Stream ID not found in pending list: ${sid}") // Add the stream to the active list - target.pendingStreams.remove(msg.sid) - target.tcpStreams[msg.sid] = stubIp - tcpStreams[msg.sid] = target.stubIp + target.pendingStreams.remove(sid) + target.tcpStreams[sid] = stubIp + tcpStreams[sid] = target.stubIp target.send(msg.copy(src = stubIp, dst = target.stubIp)) } @@ -165,7 +178,7 @@ class MaimaiFutari(private val port: Int = 20101) { while (true) { val input = (reader.readLine() ?: continue).trim('\uFEFF') log.debug("Received: $input") - val message = input.json() + val message = Msg.fromString(input) when (message.cmd) { // Start: Register the client. Payload is the keychip @@ -178,7 +191,7 @@ class MaimaiFutari(private val port: Int = 20101) { log.info("[+] Client registered: ${socket.remoteSocketAddress} -> $id") // Send back the version - handler?.send(ctlMsg(Command.CTL_START, mapOf("version" to PROTO_VERSION))) + handler?.send(ctlMsg(Command.CTL_START, "version=$PROTO_VERSION")) } // Handle any other command using the handler