Bienvenidos a Abandonsocios: El Portal de los Juegos Antiguos
0 Usuarios y 1 Visitante están viendo este tema.
#!/usr/bin/env python3import sys, structfrom pathlib import Path# --- LZW Access decompress (LSB-first, 9->12 bits) ---def lzw_access_decompress(comp: bytes, max_out: int | None = None) -> bytes: CLEAR, EOF, FIRST, MAX_CODE = 256, 257, 258, 4095 class BR: # bitreader LSB-first def __init__(self, b: bytes): self.b = b self.i = 0 # byte index self.bit = 0 # bit offset within current byte [0..7] def read(self, n: int): v = 0 s = 0 while n > 0: if self.i >= len(self.b): return None cur = self.b[self.i] take = min(8 - self.bit, n) v |= ((cur >> self.bit) & ((1 << take) - 1)) << s self.bit += take if self.bit == 8: self.bit = 0 self.i += 1 n -= take s += take return v br = BR(comp) # diccionario como en Access/ScummVM: códigos 0..255 = bytes # gestionado como "tabla de cadenas" simple table = {i: bytes((i,)) for i in range(256)} next_code = FIRST code_len = 9 max_val = 1 << code_len out = bytearray() prev = b"" while True: code = br.read(code_len) if code is None: break # fin de payload if code == CLEAR: # reset completo table = {i: bytes((i,)) for i in range(256)} next_code, code_len, max_val, prev = FIRST, 9, 1 << 9, b"" continue if code == EOF: break # Access a veces ni lo emite; si está, lo respetamos # regla LZW "KwKwK" if code in table: entry = table[code] elif code == next_code and prev: entry = prev + prev[:1] else: # stream extraño: salimos de forma segura break # volcado, con corte duro por max_out si se pide if max_out is not None: rem = max_out - len(out) if rem <= 0: break if len(entry) > rem: out.extend(entry[:rem]) break out.extend(entry) # aprendizaje if prev and next_code <= MAX_CODE: table[next_code] = prev + entry[:1] next_code += 1 if next_code >= max_val and code_len < 12: code_len += 1 max_val <<= 1 prev = entry return bytes(out if max_out is None else out[:max_out])# --- DBE -> DEC CLI ---def dbe2dec(in_path: str, out_path: str): data = Path(in_path).read_bytes() if not data.startswith(b"DBE\x01"): raise ValueError("Cabecera DBE inválida (no 
empieza por DBE\\x01)") dest = struct.unpack_from("<I", data, 4)[0] comp = data[16:] # payload real dec = lzw_access_decompress(comp, max_out=dest) Path(out_path).write_bytes(dec) ok = (len(dec) == dest) print(f"[dbe2dec] {Path(in_path).name} → {Path(out_path).name} dec={len(dec)} dest={dest} ok={ok}")if __name__ == "__main__": if len(sys.argv) != 3: print("Uso: dbe2dec_only.py <entrada.dbe> <salida.dec>") sys.exit(1) dbe2dec(sys.argv[1], sys.argv[2])
#!/usr/bin/env python3
# dbe_compresor: build .dbe files from raw .dec data (LZW, LSB-first, 9->12
# bit codes) and verify existing .dbe files by decompressing them.
import struct
import sys
from pathlib import Path
from typing import List, Optional, Tuple

# ---- LZW Access constants ----
CLEAR, EOF, FIRST, MAX_CODE = 256, 257, 258, 4095


# -----------------------------
# Bit writer LSB-first
# -----------------------------
class BitWriter:
    """Accumulate variable-width codes into bytes, least-significant-bit first."""

    def __init__(self):
        self.buf = bytearray()  # completed output bytes
        self.acc = 0  # pending bits not yet flushed to ``buf``
        self.bits = 0  # number of valid bits in ``acc``

    def put(self, code: int, nbits: int):
        """Append the low ``nbits`` bits of ``code`` to the stream."""
        self.acc |= (code & ((1 << nbits) - 1)) << self.bits
        self.bits += nbits
        while self.bits >= 8:
            self.buf.append(self.acc & 0xFF)
            self.acc >>= 8
            self.bits -= 8

    def finish(self) -> bytes:
        """Flush any partial byte (zero-padded high bits) and return the stream."""
        if self.bits:
            self.buf.append(self.acc & 0xFF)
            self.acc = 0
            self.bits = 0
        return bytes(self.buf)


class _BitReader:
    """Read variable-width codes from bytes, least-significant-bit first."""

    def __init__(self, b: bytes):
        self.b = b
        self.i = 0  # byte index
        self.bit = 0  # bit offset within the current byte [0..7]

    def read(self, n: int):
        """Return the next ``n`` bits as an int, or None if input runs out."""
        v = 0
        s = 0  # number of bits already placed into ``v``
        while n > 0:
            if self.i >= len(self.b):
                return None
            cur = self.b[self.i]
            take = min(8 - self.bit, n)
            v |= ((cur >> self.bit) & ((1 << take) - 1)) << s
            self.bit += take
            if self.bit == 8:
                self.bit = 0
                self.i += 1
            n -= take
            s += take
        return v


# -----------------------------
# LZW Access decompressor (LSB, 9->12)
# -----------------------------
def lzw_access_decompress(comp: bytes, *, max_out: Optional[int]=None, require_end: bool=False) -> bytes:
    """Decode an LSB-first LZW stream whose codes grow from 9 to 12 bits.

    Args:
        comp: Compressed payload (without any container header).
        max_out: Optional hard cap on the number of bytes produced.
        require_end: When true, fail unless an explicit EOF code was read.

    Returns:
        The decoded bytes. Decoding stops at the EOF code, at the end of the
        input, on an invalid code, or when ``max_out`` is reached.

    Raises:
        ValueError: If ``require_end`` is set and no EOF code was seen.
    """
    reader = _BitReader(comp)
    table = {i: bytes((i,)) for i in range(256)}
    next_code, code_len, maxv = FIRST, 9, 1 << 9
    out, prev = bytearray(), b""
    saw_eof = False

    while True:
        code = reader.read(code_len)
        if code is None:
            break
        if code == CLEAR:
            # Full reset of the table and of the code width.
            table = {i: bytes((i,)) for i in range(256)}
            next_code, code_len, maxv, prev = FIRST, 9, 1 << 9, b""
            continue
        if code == EOF:
            saw_eof = True
            break

        if code in table:
            entry = table[code]
        elif code == next_code and prev:
            # "KwKwK": the code refers to the entry about to be created.
            entry = prev + prev[:1]
        else:
            break  # invalid stream

        if max_out is not None:
            rem = max_out - len(out)
            if rem <= 0:
                break
            if len(entry) > rem:
                out.extend(entry[:rem])
                break
        out.extend(entry)

        # Learn a new entry: previous string + first byte of the current one.
        if prev and next_code <= MAX_CODE:
            table[next_code] = prev + entry[:1]
            next_code += 1
            if next_code >= maxv and code_len < 12:
                code_len += 1
                maxv <<= 1
        prev = entry

    if require_end and not saw_eof:
        raise ValueError("EOF no encontrado")
    return bytes(out if max_out is None else out[:max_out])


# -----------------------------
# LZW Access compressor with variants
# -----------------------------
class Variant:
    """One combination of encoder policies tried by ``try_all_variants``.

    Attributes are plain strings/bools:
      bump: 'early' widens the code size when the next code to assign equals
          the current limit; any other value ('late') widens once the next
          code exceeds the limit.
      clear_when_full: 'before_add' or 'after_emit'; recorded in ``name``.
      emit_first_after_clear: whether the byte that triggered a dictionary
          reset is written as a literal right after the CLEAR code.
      name: human-readable label built from the three settings.
    """

    __slots__ = ("bump", "clear_when_full", "emit_first_after_clear", "name")

    def __init__(self, bump:str, clear_when_full:str, emit_first_after_clear:bool):
        self.bump = bump
        self.clear_when_full = clear_when_full
        self.emit_first_after_clear = emit_first_after_clear
        self.name = f"{bump}/{clear_when_full}/first={'on' if emit_first_after_clear else 'off'}"


def lzw_access_compress_variant(raw: bytes, v: Variant) -> bytes:
    """Compress ``raw`` with LZW using the policies described by ``v``.

    The stream always starts with a CLEAR code and ends with an EOF code, both
    written at the code size in effect at that point.

    Args:
        raw: Data to compress.
        v: Encoder policy selection.

    Returns:
        The compressed payload (no container header).
    """
    codes = {bytes([i]): i for i in range(256)}  # bytes -> code
    next_code = FIRST
    code_size = 9
    limit = 1 << code_size

    bw = BitWriter()
    bw.put(CLEAR, code_size)
    if not raw:
        bw.put(EOF, code_size)
        return bw.finish()

    early = v.bump == "early"
    w = raw[:1]  # seed with the first byte
    for i in range(1, len(raw)):
        c = raw[i:i + 1]
        wc = w + c
        if wc in codes:
            w = wc
            continue

        # Longest known prefix found: emit it.
        bw.put(codes[w], code_size)

        if next_code <= MAX_CODE:
            codes[wc] = next_code
            next_code += 1
            crossed = (next_code == limit) if early else (next_code > limit)
            if crossed and code_size < 12:
                code_size += 1
                limit <<= 1
            w = c
            continue

        # Dictionary full -> CLEAR and restart at 9 bits. Both
        # ``clear_when_full`` policies produce this same sequence here, so
        # the setting does not change the emitted stream.
        bw.put(CLEAR, code_size)
        codes = {bytes([j]): j for j in range(256)}
        next_code, code_size, limit = FIRST, 9, 1 << 9
        if v.emit_first_after_clear:
            bw.put(c[0], code_size)  # literal
            w = b""  # the next byte becomes the new prefix
        else:
            w = c

    # End: flush the pending prefix, then EOF.
    if w:
        bw.put(codes[w], code_size)
    bw.put(EOF, code_size)
    return bw.finish()


def try_all_variants(raw: bytes) -> Tuple[bytes,str]:
    """Return the first variant's output that decompresses back to ``raw``.

    Variants are tried in a fixed order (bump, then clear policy, then the
    first-literal flag, with ``False`` before ``True``).

    Returns:
        ``(compressed_payload, variant_name)``.

    Raises:
        RuntimeError: If no variant round-trips exactly.
    """
    for bump in ("early", "late"):
        for clear in ("before_add", "after_emit"):
            for first_on in (False, True):
                v = Variant(bump, clear, first_on)
                comp = lzw_access_compress_variant(raw, v)
                back = lzw_access_decompress(comp, max_out=len(raw), require_end=False)
                if back == raw:
                    return comp, v.name
    raise RuntimeError("No hubo variante que haga roundtrip exacto con este .dec")


# -----------------------------
# DBE helpers
# -----------------------------
def build_dbe(payload: bytes, dest_size: int) -> bytes:
    """Wrap ``payload`` in a 16-byte DBE header.

    Layout: magic ``DBE\\x01``, little-endian uint32 ``dest_size``, eight zero
    bytes, then the payload.
    """
    return b"DBE\x01" + struct.pack("<I", dest_size) + b"\x00"*8 + payload


def read_dbe(path: str) -> Tuple[int, bytes]:
    """Read a DBE file and return ``(dest_size, payload)``.

    Raises:
        ValueError: If the magic is wrong or the file is too short to hold the
            size field.
    """
    b = Path(path).read_bytes()
    if not b.startswith(b"DBE\x01"):
        raise ValueError("Cabecera DBE inválida")
    if len(b) < 8:
        # Without this check struct.unpack_from would raise a cryptic struct.error.
        raise ValueError("Cabecera DBE truncada")
    dest = struct.unpack_from("<I", b, 4)[0]
    return dest, b[16:]


# -----------------------------
# CLI
# -----------------------------
def cmd_dec2dbe(in_dec: str, out_dbe: str):
    """Compress the raw file ``in_dec`` and write it as a DBE file to ``out_dbe``.

    Raises:
        RuntimeError: If no encoder variant round-trips the input.
    """
    raw = Path(in_dec).read_bytes()
    comp, chosen = try_all_variants(raw)
    back = lzw_access_decompress(comp, max_out=len(raw), require_end=False)
    if back != raw:
        # Should not happen; kept as a safety net before writing the file.
        raise RuntimeError("Roundtrip inesperadamente falló tras elegir variante")
    Path(out_dbe).write_bytes(build_dbe(comp, len(raw)))
    print(f"[OK] dec2dbe: {Path(in_dec).name} -> {Path(out_dbe).name} payload={len(comp)} variant={chosen}")


def cmd_verify(in_dbe: str):
    """Decompress ``in_dbe`` and report whether the size matches its header."""
    dest, comp = read_dbe(in_dbe)
    dec = lzw_access_decompress(comp, max_out=dest, require_end=False)
    ok = (len(dec) == dest)
    print(f"[verify] {Path(in_dbe).name}: comp={len(comp)} dec={len(dec)} dest={dest} ok={ok}")


def main():
    """Dispatch the ``dec2dbe`` and ``verify`` sub-commands from ``sys.argv``."""
    if len(sys.argv) < 3:
        print("Uso:")
        print(" python dbe_compresor.py dec2dbe <entrada.dec> <salida.dbe>")
        print(" python dbe_compresor.py verify <entrada.dbe>")
        sys.exit(1)
    cmd = sys.argv[1].lower()
    if cmd == "dec2dbe" and len(sys.argv) == 4:
        cmd_dec2dbe(sys.argv[2], sys.argv[3])
    elif cmd == "verify" and len(sys.argv) == 3:
        cmd_verify(sys.argv[2])
    else:
        print("Parámetros inválidos")
        sys.exit(1)


if __name__ == "__main__":
    main()