mirror of
https://gitea.tendokyu.moe/Hay1tsme/artemis.git
synced 2026-02-15 04:07:29 +08:00
reformat with black in preperation for merge to master
This commit is contained in:
@@ -19,6 +19,7 @@ from .handlers import IDZHandlerBase
|
||||
|
||||
HANDLER_MAP: List[Dict]
|
||||
|
||||
|
||||
class IDZKey:
|
||||
def __init__(self, n, d, e, hashN: int) -> None:
|
||||
self.N = n
|
||||
@@ -26,9 +27,16 @@ class IDZKey:
|
||||
self.e = e
|
||||
self.hashN = hashN
|
||||
|
||||
|
||||
class IDZUserDBProtocol(Protocol):
|
||||
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, keys: List[IDZKey], handlers: List[Dict]) -> None:
|
||||
self.logger = logging.getLogger('idz')
|
||||
def __init__(
|
||||
self,
|
||||
core_cfg: CoreConfig,
|
||||
game_cfg: IDZConfig,
|
||||
keys: List[IDZKey],
|
||||
handlers: List[Dict],
|
||||
) -> None:
|
||||
self.logger = logging.getLogger("idz")
|
||||
self.core_config = core_cfg
|
||||
self.game_config = game_cfg
|
||||
self.rsa_keys = keys
|
||||
@@ -37,14 +45,14 @@ class IDZUserDBProtocol(Protocol):
|
||||
self.version = None
|
||||
self.version_internal = None
|
||||
self.skip_next = False
|
||||
|
||||
|
||||
def append_padding(self, data: bytes):
|
||||
"""Appends 0s to the end of the data until it's at the correct size"""
|
||||
length = struct.unpack_from("<H", data, 6)
|
||||
padding_size = length[0] - len(data)
|
||||
data += bytes(padding_size)
|
||||
return data
|
||||
|
||||
|
||||
def connectionMade(self) -> None:
|
||||
self.logger.debug(f"{self.transport.getPeer().host} Connected")
|
||||
base = 0
|
||||
@@ -57,13 +65,17 @@ class IDZUserDBProtocol(Protocol):
|
||||
|
||||
rsa_key = random.choice(self.rsa_keys)
|
||||
key_enc: int = pow(base, rsa_key.e, rsa_key.N)
|
||||
result = key_enc.to_bytes(0x40, "little") + struct.pack("<I", 0x01020304) + rsa_key.hashN.to_bytes(4, "little")
|
||||
result = (
|
||||
key_enc.to_bytes(0x40, "little")
|
||||
+ struct.pack("<I", 0x01020304)
|
||||
+ rsa_key.hashN.to_bytes(4, "little")
|
||||
)
|
||||
|
||||
self.logger.debug(f"Send handshake {result.hex()}")
|
||||
|
||||
self.transport.write(result)
|
||||
|
||||
def connectionLost(self, reason) -> None:
|
||||
def connectionLost(self, reason) -> None:
|
||||
self.logger.debug(
|
||||
f"{self.transport.getPeer().host} Disconnected - {reason.value}"
|
||||
)
|
||||
@@ -84,7 +96,7 @@ class IDZUserDBProtocol(Protocol):
|
||||
self.transport.write(b"\x00")
|
||||
return
|
||||
|
||||
elif magic == 0x01020304:
|
||||
elif magic == 0x01020304:
|
||||
self.version = int(data_dec[16:19].decode())
|
||||
|
||||
if self.version == 110:
|
||||
@@ -99,10 +111,12 @@ class IDZUserDBProtocol(Protocol):
|
||||
self.logger.warn(f"Bad version v{self.version}")
|
||||
self.version = None
|
||||
self.version_internal = None
|
||||
|
||||
self.logger.debug(f"Userdb v{self.version} handshake response from {self.transport.getPeer().host}")
|
||||
|
||||
self.logger.debug(
|
||||
f"Userdb v{self.version} handshake response from {self.transport.getPeer().host}"
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
elif self.skip_next:
|
||||
self.skip_next = False
|
||||
self.transport.write(b"\x00")
|
||||
@@ -110,19 +124,25 @@ class IDZUserDBProtocol(Protocol):
|
||||
|
||||
elif self.version is None:
|
||||
# We didn't get a handshake before, and this isn't one now, so we're up the creek
|
||||
self.logger.info(f"Bad UserDB request from from {self.transport.getPeer().host}")
|
||||
self.logger.info(
|
||||
f"Bad UserDB request from from {self.transport.getPeer().host}"
|
||||
)
|
||||
self.transport.write(b"\x00")
|
||||
return
|
||||
|
||||
cmd = struct.unpack_from("<H", data_dec, 0)[0]
|
||||
|
||||
handler_cls: Optional[Type[IDZHandlerBase]] = self.handlers[self.version_internal].get(cmd, None)
|
||||
|
||||
handler_cls: Optional[Type[IDZHandlerBase]] = self.handlers[
|
||||
self.version_internal
|
||||
].get(cmd, None)
|
||||
if handler_cls is None:
|
||||
self.logger.warn(f"No handler for v{self.version} {hex(cmd)} cmd")
|
||||
handler_cls = IDZHandlerBase
|
||||
|
||||
handler = handler_cls(self.core_config, self.game_config, self.version_internal)
|
||||
self.logger.info(f"Userdb v{self.version} {handler.name} request from {self.transport.getPeer().host}")
|
||||
|
||||
handler = handler_cls(self.core_config, self.game_config, self.version_internal)
|
||||
self.logger.info(
|
||||
f"Userdb v{self.version} {handler.name} request from {self.transport.getPeer().host}"
|
||||
)
|
||||
response = handler.handle(data_dec)
|
||||
|
||||
self.logger.debug(f"Response: {response.hex()}")
|
||||
@@ -136,14 +156,23 @@ class IDZUserDBProtocol(Protocol):
|
||||
class IDZUserDBFactory(Factory):
|
||||
protocol = IDZUserDBProtocol
|
||||
|
||||
def __init__(self, cfg: CoreConfig, game_cfg: IDZConfig, keys: List[IDZKey], handlers: List[Dict]) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
cfg: CoreConfig,
|
||||
game_cfg: IDZConfig,
|
||||
keys: List[IDZKey],
|
||||
handlers: List[Dict],
|
||||
) -> None:
|
||||
self.core_config = cfg
|
||||
self.game_config = game_cfg
|
||||
self.keys = keys
|
||||
self.handlers = handlers
|
||||
|
||||
def buildProtocol(self, addr):
|
||||
return IDZUserDBProtocol(self.core_config, self.game_config, self.keys, self.handlers)
|
||||
return IDZUserDBProtocol(
|
||||
self.core_config, self.game_config, self.keys, self.handlers
|
||||
)
|
||||
|
||||
|
||||
class IDZUserDBWeb(resource.Resource):
|
||||
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig):
|
||||
@@ -151,12 +180,16 @@ class IDZUserDBWeb(resource.Resource):
|
||||
self.isLeaf = True
|
||||
self.core_config = core_cfg
|
||||
self.game_config = game_cfg
|
||||
self.logger = logging.getLogger('idz')
|
||||
|
||||
self.logger = logging.getLogger("idz")
|
||||
|
||||
def render_POST(self, request: Request) -> bytes:
|
||||
self.logger.info(f"IDZUserDBWeb POST from {request.getClientAddress().host} to {request.uri} with data {request.content.getvalue()}")
|
||||
self.logger.info(
|
||||
f"IDZUserDBWeb POST from {request.getClientAddress().host} to {request.uri} with data {request.content.getvalue()}"
|
||||
)
|
||||
return b""
|
||||
|
||||
def render_GET(self, request: Request) -> bytes:
|
||||
self.logger.info(f"IDZUserDBWeb GET from {request.getClientAddress().host} to {request.uri}")
|
||||
return b""
|
||||
self.logger.info(
|
||||
f"IDZUserDBWeb GET from {request.getClientAddress().host} to {request.uri}"
|
||||
)
|
||||
return b""
|
||||
|
||||
Reference in New Issue
Block a user