#!/usr/bin/env python3
import sys, struct
from pathlib import Path
# --- LZW Access decompress (LSB-first, 9->12 bits) ---
def lzw_access_decompress(comp: bytes, max_out: int | None = None) -> bytes:
CLEAR, EOF, FIRST, MAX_CODE = 256, 257, 258, 4095
class BR:
# bitreader LSB-first
def __init__(self, b: bytes):
self.b = b
self.i = 0 # byte index
self.bit = 0 # bit offset within current byte [0..7]
def read(self, n: int):
v = 0
s = 0
while n > 0:
if self.i >= len(self.b):
return None
cur = self.b[self.i]
take = min(8 - self.bit, n)
v |= ((cur >> self.bit) & ((1 << take) - 1)) << s
self.bit += take
if self.bit == 8:
self.bit = 0
self.i += 1
n -= take
s += take
return v
br = BR(comp)
# diccionario como en Access/ScummVM: códigos 0..255 = bytes
# gestionado como "tabla de cadenas" simple
table = {i: bytes((i,)) for i in range(256)}
next_code = FIRST
code_len = 9
max_val = 1 << code_len
out = bytearray()
prev = b""
while True:
code = br.read(code_len)
if code is None:
break # fin de payload
if code == CLEAR:
# reset completo
table = {i: bytes((i,)) for i in range(256)}
next_code, code_len, max_val, prev = FIRST, 9, 1 << 9, b""
continue
if code == EOF:
break # Access a veces ni lo emite; si está, lo respetamos
# regla LZW "KwKwK"
if code in table:
entry = table[code]
elif code == next_code and prev:
entry = prev + prev[:1]
else:
# stream extraño: salimos de forma segura
break
# volcado, con corte duro por max_out si se pide
if max_out is not None:
rem = max_out - len(out)
if rem <= 0:
break
if len(entry) > rem:
out.extend(entry[:rem])
break
out.extend(entry)
# aprendizaje
if prev and next_code <= MAX_CODE:
table[next_code] = prev + entry[:1]
next_code += 1
if next_code >= max_val and code_len < 12:
code_len += 1
max_val <<= 1
prev = entry
return bytes(out if max_out is None else out[:max_out])
# --- DBE -> DEC CLI ---
def dbe2dec(in_path: str, out_path: str):
    """Decompress a DBE file into its raw form and report the result.

    The file is read as: 4-byte magic ``DBE\\x01``, a little-endian uint32
    with the decompressed size at offset 4, and the LZW payload starting at
    offset 16 (bytes 8..15 are skipped).

    Args:
        in_path: Path of the DBE file to read.
        out_path: Path where the decompressed bytes are written.

    Raises:
        ValueError: If the magic is wrong or the file is shorter than the
            16-byte header.
    """
    header_size = 16
    src = Path(in_path)
    data = src.read_bytes()
    if not data.startswith(b"DBE\x01"):
        raise ValueError("Cabecera DBE inválida (no empieza por DBE\\x01)")
    # Reject truncated headers explicitly instead of failing inside struct
    # (fewer than 8 bytes) or silently treating the payload as empty.
    if len(data) < header_size:
        raise ValueError(
            f"Cabecera DBE truncada: {len(data)} bytes, se esperaban al menos {header_size}"
        )
    dest = struct.unpack_from("<I", data, 4)[0]
    comp = data[header_size:]  # actual payload
    dec = lzw_access_decompress(comp, max_out=dest)
    dst = Path(out_path)
    dst.write_bytes(dec)
    ok = (len(dec) == dest)
    print(f"[dbe2dec] {src.name} → {dst.name} dec={len(dec)} dest={dest} ok={ok}")
if __name__ == "__main__":
    # Script entry point: exactly two arguments (input .dbe, output .dec).
    if len(sys.argv) == 3:
        dbe2dec(sys.argv[1], sys.argv[2])
    else:
        print("Uso: dbe2dec_only.py <entrada.dbe> <salida.dec>")
        sys.exit(1)
#!/usr/bin/env python3
import sys, struct
from pathlib import Path
from typing import Optional, Tuple, List
# ---- Constantes LZW Access ----
CLEAR = 256      # control code: reset dictionary and code width
EOF = 257        # control code: end of stream
FIRST = 258      # first code assigned to a learned string
MAX_CODE = 4095  # highest assignable code (12-bit ceiling)
# -----------------------------
# Bit writer LSB-first
# -----------------------------
class BitWriter:
    """Pack variable-width codes into bytes, least-significant bit first."""

    def __init__(self):
        self.buf = bytearray()  # completed output bytes
        self.acc = 0            # bits not yet flushed, oldest in the low end
        self.bits = 0           # number of valid bits held in acc

    def put(self, code: int, nbits: int):
        """Append the low *nbits* bits of *code* to the stream."""
        mask = (1 << nbits) - 1
        pending = self.acc | ((code & mask) << self.bits)
        count = self.bits + nbits
        # Move every complete byte out of the accumulator.
        while count >= 8:
            self.buf.append(pending & 0xFF)
            pending >>= 8
            count -= 8
        self.acc, self.bits = pending, count

    def finish(self) -> bytes:
        """Flush a trailing partial byte, if any, and return all bytes written."""
        if self.bits:
            self.buf.append(self.acc & 0xFF)
            self.acc = 0
            self.bits = 0
        return bytes(self.buf)
# -----------------------------
# Descompresor LZW Access (LSB, 9→12)
# -----------------------------
def lzw_access_decompress(comp: bytes, *, max_out: Optional[int]=None, require_end: bool=False) -> bytes:
    """Decode an LZW stream with LSB-first codes that grow from 9 to 12 bits.

    Decoding ends when the input runs out before a full code can be read,
    when the EOF code is seen, when a code is neither known nor the next one
    to be assigned, or when *max_out* bytes have been produced.

    Args:
        comp: Compressed payload.
        max_out: Optional hard cap on the number of bytes returned.
        require_end: When true, finishing without having read the EOF code
            is an error.

    Returns:
        The decoded bytes, truncated to *max_out* when it is given.

    Raises:
        ValueError: If *require_end* is set and no EOF code was read.
    """
    # LSB-first bit reader: whole bytes are pulled into an integer
    # accumulator and codes are taken from its low end.
    pending = 0
    pending_bits = 0
    cursor = 0
    size = len(comp)

    def fetch(width):
        nonlocal pending, pending_bits, cursor
        while pending_bits < width:
            if cursor >= size:
                return None  # not enough input left for a complete code
            pending |= comp[cursor] << pending_bits
            pending_bits += 8
            cursor += 1
        value = pending & ((1 << width) - 1)
        pending >>= width
        pending_bits -= width
        return value

    strings = {byte: bytes((byte,)) for byte in range(256)}
    free_code, width, threshold = FIRST, 9, 1 << 9
    result = bytearray()
    last = b""
    ended = False

    while True:
        code = fetch(width)
        if code is None:
            break
        if code == EOF:
            ended = True
            break
        if code == CLEAR:
            # Full reset of dictionary, code width and previous string.
            strings = {byte: bytes((byte,)) for byte in range(256)}
            free_code, width, threshold, last = FIRST, 9, 1 << 9, b""
            continue

        entry = strings.get(code)
        if entry is None:
            # "KwKwK" case: the code refers to the string about to be added.
            if code != free_code or not last:
                break  # invalid stream
            entry = last + last[:1]

        if max_out is not None:
            room = max_out - len(result)
            if room < len(entry):
                # Hard cap: keep only the part that still fits, then stop.
                if room > 0:
                    result.extend(entry[:room])
                break
        result.extend(entry)

        # Learn previous string + first byte of the current one.
        if last and free_code <= MAX_CODE:
            strings[free_code] = last + entry[:1]
            free_code += 1
            if free_code >= threshold and width < 12:
                width += 1
                threshold <<= 1
        last = entry

    if require_end and not ended:
        raise ValueError("EOF no encontrado")
    if max_out is None:
        return bytes(result)
    return bytes(result[:max_out])
# -----------------------------
# Compresor LZW Access con variantes
# -----------------------------
class Variant:
    """One combination of compressor options, plus a printable name for it."""

    __slots__ = ("bump", "clear_when_full", "emit_first_after_clear", "name")

    def __init__(self, bump:str, clear_when_full:str, emit_first_after_clear:bool):
        # bump: 'early' widens the code before the threshold is crossed,
        #       'late' widens it after.
        # clear_when_full: 'before_add' or 'after_emit' -- when to emit CLEAR
        #       once the dictionary is full.
        # emit_first_after_clear: write the pending byte as a literal right
        #       after a CLEAR.
        first_label = "on" if emit_first_after_clear else "off"
        self.bump = bump
        self.clear_when_full = clear_when_full
        self.emit_first_after_clear = emit_first_after_clear
        self.name = f"{bump}/{clear_when_full}/first={first_label}"
def lzw_access_compress_variant(raw: bytes, v: Variant) -> bytes:
    """Compress *raw* with LZW (LSB-first codes, 9 to 12 bits) using variant *v*.

    The stream always opens with CLEAR and closes with EOF. When the
    dictionary is full, the current string is emitted, then CLEAR, and the
    encoder restarts at 9 bits.

    Variant options as used here:
      * ``v.bump == "early"`` widens the code as soon as the next free code
        equals the current limit; any other value widens it once the next
        free code has gone past the limit.
      * ``v.emit_first_after_clear`` writes the pending byte as a literal
        immediately after a mid-stream CLEAR instead of carrying it over as
        the start of the next string.
      * ``v.clear_when_full`` does not affect the output: both of its
        settings take the same path.

    Args:
        raw: Bytes to compress.
        v: Option set to apply.

    Returns:
        The compressed payload (no container header).
    """
    def fresh_codes():
        return {bytes([value]): value for value in range(256)}

    codes = fresh_codes()  # bytes -> code
    free_code = FIRST
    width = 9
    limit = 1 << width
    writer = BitWriter()
    writer.put(CLEAR, width)

    if not raw:
        writer.put(EOF, width)
        return writer.finish()

    widen_early = v.bump == "early"
    current = bytes([raw[0]])  # seed with the first byte

    for pos in range(1, len(raw)):
        sym = bytes([raw[pos]])
        candidate = current + sym
        if candidate in codes:
            current = candidate
            continue

        # Longest known prefix found: emit it.
        writer.put(codes[current], width)

        if free_code > MAX_CODE:
            # Dictionary full: emit CLEAR and start over at 9 bits.
            writer.put(CLEAR, width)
            codes = fresh_codes()
            free_code, width, limit = FIRST, 9, 1 << 9
            if v.emit_first_after_clear:
                writer.put(sym[0], width)  # literal
                current = b""  # the next byte becomes the new string
            else:
                current = sym
            continue

        codes[candidate] = free_code
        free_code += 1
        crossed = (free_code == limit) if widen_early else (free_code > limit)
        if crossed and width < 12:
            width += 1
            limit <<= 1
        current = sym

    # Flush the pending string, then terminate the stream.
    if current:
        writer.put(codes[current], width)
    writer.put(EOF, width)
    return writer.finish()
def try_all_variants(raw: bytes) -> Tuple[bytes,str]:
    """Return the first variant's output that decompresses back to *raw*.

    Variants are tried in a fixed order: bump ("early", "late") x
    clear_when_full ("before_add", "after_emit") x emit_first_after_clear
    (False, True), with the last option varying fastest.

    Returns:
        ``(compressed_payload, variant_name)`` for the first variant whose
        output round-trips.

    Raises:
        RuntimeError: If no variant round-trips.
    """
    candidates: List[Variant] = [
        Variant(bump, clear, first_on)
        for bump in ("early", "late")
        for clear in ("before_add", "after_emit")
        for first_on in (False, True)
    ]
    for variant in candidates:
        packed = lzw_access_compress_variant(raw, variant)
        restored = lzw_access_decompress(packed, max_out=len(raw), require_end=False)
        if restored == raw:
            return packed, variant.name
    raise RuntimeError("No hubo variante que haga roundtrip exacto con este .dec")
# -----------------------------
# DBE helpers
# -----------------------------
def build_dbe(payload: bytes, dest_size: int) -> bytes:
    """Prefix *payload* with a 16-byte DBE header.

    Header layout: 4-byte magic ``DBE\\x01``, little-endian uint32
    *dest_size*, then 8 zero bytes.
    """
    # "4s" = magic, "I" = size, "8x" = eight NUL pad bytes.
    header = struct.pack("<4sI8x", b"DBE\x01", dest_size)
    return header + payload
def read_dbe(path: str) -> Tuple[int, bytes]:
    """Read a DBE file and split it into declared size and payload.

    The file is read as: 4-byte magic ``DBE\\x01``, a little-endian uint32
    with the decompressed size at offset 4, and the payload from offset 16.

    Args:
        path: Path of the DBE file.

    Returns:
        ``(dest_size, payload)``.

    Raises:
        ValueError: If the magic is wrong or the file is shorter than the
            16-byte header.
    """
    header_size = 16
    blob = Path(path).read_bytes()
    if not blob.startswith(b"DBE\x01"):
        raise ValueError("Cabecera DBE inválida")
    # Reject truncated headers explicitly instead of failing inside struct
    # (fewer than 8 bytes) or silently returning an empty payload.
    if len(blob) < header_size:
        raise ValueError(
            f"Cabecera DBE truncada: {len(blob)} bytes, se esperaban al menos {header_size}"
        )
    dest = struct.unpack_from("<I", blob, 4)[0]
    return dest, blob[header_size:]
# -----------------------------
# CLI
# -----------------------------
def cmd_dec2dbe(in_dec: str, out_dbe: str):
    """Compress a raw file into a DBE file and print a one-line summary.

    The payload comes from the first compressor variant that round-trips;
    the round trip is checked once more here before anything is written.

    Raises:
        RuntimeError: If no variant round-trips, or the re-check fails.
    """
    src = Path(in_dec)
    raw = src.read_bytes()
    comp, chosen = try_all_variants(raw)
    # Should never trigger; kept as a safety net.
    if lzw_access_decompress(comp, max_out=len(raw), require_end=False) != raw:
        raise RuntimeError("Roundtrip inesperadamente falló tras elegir variante")
    dst = Path(out_dbe)
    dst.write_bytes(build_dbe(comp, len(raw)))
    print(f"[OK] dec2dbe: {src.name} -> {dst.name} payload={len(comp)} variant={chosen}")
def cmd_verify(in_dbe: str):
    """Decompress a DBE file in memory and print whether the size matches.

    ``ok`` in the printed line is true when the number of decoded bytes
    equals the size declared in the header.
    """
    dest, comp = read_dbe(in_dbe)
    decoded_len = len(lzw_access_decompress(comp, max_out=dest, require_end=False))
    ok = decoded_len == dest
    print(f"[verify] {Path(in_dbe).name}: comp={len(comp)} dec={decoded_len} dest={dest} ok={ok}")
def main():
    """Dispatch the command line to ``dec2dbe`` or ``verify``.

    Prints usage and exits with status 1 when fewer than two arguments are
    given; prints an error and exits with status 1 when the command name or
    its argument count is not recognised.
    """
    argv = sys.argv
    if len(argv) < 3:
        for line in (
            "Uso:",
            " python dbe_compresor.py dec2dbe <entrada.dec> <salida.dbe>",
            " python dbe_compresor.py verify <entrada.dbe>",
        ):
            print(line)
        sys.exit(1)

    request = (argv[1].lower(), len(argv))
    if request == ("dec2dbe", 4):
        cmd_dec2dbe(argv[2], argv[3])
    elif request == ("verify", 3):
        cmd_verify(argv[2])
    else:
        print("Parámetros inválidos")
        sys.exit(1)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()