From: Andrew Vineyard <TechnoMage6@gmail.com>
---
.gitignore | 1 +
depot_archiver.py | 14 ++---
depot_extractor.py | 12 +++-
depot_validator.py | 152 +++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 171 insertions(+), 8 deletions(-)
create mode 100644 depot_validator.py
diff --git a/.gitignore b/.gitignore
index 2e78875..b4f6805 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,7 @@ virtualenv/
extract/
clientmanifests/
clientpackages/
+keys/
*.swp
depot_keys.txt
last_change.txt
diff --git a/depot_archiver.py b/depot_archiver.py
index 994cdf0..c8bdb4e 100644
--- a/depot_archiver.py
+++ b/depot_archiver.py
@@ -104,7 +104,7 @@ def archive_manifest(manifest, c, name="unknown", dry_run=False, server_override
content = await response.content.read()
break
elif 400 <= response.status < 500:
- print(f"error: received status code {response.status} (on chunk {chunk_str}, server {host})")
+ print(f"\033[31merror: received status code {response.status} (on chunk {chunk_str}, server {host})\033[0m")  # \033[0m resets the red colour
return False
except Exception as e:
print("rotating to next server:", e)
@@ -160,8 +160,8 @@ def try_load_manifest(appid, depotid, manifestid):
makedirs("./depots/%s" % depotid, exist_ok=True)
if path.exists(dest):
with open(dest, "rb") as f:
- manifest = CDNDepotManifest(c, appid, f.read())
print("Loaded cached manifest %s from disk" % manifestid)
+ return CDNDepotManifest(c, appid, f.read())
else:
while True:
license_requested = False
@@ -218,18 +218,18 @@ if __name__ == "__main__":
if args.workshop_id:
response = steam_client.send_um_and_wait("PublishedFile.GetDetails#1", {'publishedfileids':[args.workshop_id]})
if response.header.eresult != EResult.OK:
- print("error: couldn't get workshop item info:", response.header.error_message)
+ print("\033[31merror: couldn't get workshop item info:\033[0m", response.header.error_message)
exit(1)
file = response.body.publishedfiledetails[0]
if file.result != EResult.OK:
- print("error: steam returned error", EResult(file.result))
+ print("\033[31merror: steam returned error\033[0m", EResult(file.result))
exit(1)
print("Retrieved data for workshop item", file.title, "for app", file.consumer_appid, "(%s)" % file.app_name)
if not file.hcontent_file:
- print("error: workshop item is not on SteamPipe")
+ print("\033[31merror: workshop item is not on SteamPipe\033[0m")
exit(1)
if file.file_url:
- print("error: workshop item is not on SteamPipe: its download URL is", file.file_url)
+ print("\033[31merror: workshop item is not on SteamPipe: its download URL is\033[0m", file.file_url)
exit(1)
archive_manifest(try_load_manifest(file.consumer_appid, file.consumer_appid, file.hcontent_file), c, file.title, args.dry_run, args.server, args.backup)
exit(0)
@@ -251,7 +251,7 @@ if __name__ == "__main__":
if changenumber > highest_changenumber:
highest_changenumber = changenumber
if highest_changenumber == 0:
- print("error: -l flag specified, but no local appinfo exists for app", appid)
+ print("\033[31merror: -l flag specified, but no local appinfo exists for app\033[0m", appid)
exit(1)
appinfo_path = "./appinfo/%s_%s.vdf" % (appid, highest_changenumber)
else:
diff --git a/depot_extractor.py b/depot_extractor.py
index faecbcc..3ab857f 100644
--- a/depot_extractor.py
+++ b/depot_extractor.py
@@ -31,6 +31,7 @@ from chunkstore import Chunkstore
if __name__ == "__main__":
path = "./depots/%s/" % args.depotid
+ keyfile = "./keys/%s.depotkey" % args.depotid
manifest = None
with open(path + "%s.zip" % args.manifestid, "rb") as f:
manifest = DepotManifest(f.read())
@@ -39,7 +40,16 @@ if __name__ == "__main__":
if manifest.filenames_encrypted:
manifest.decrypt_filenames(args.depotkey)
elif manifest.filenames_encrypted:
- if exists("./depot_keys.txt"):
+ ## Using No-Intro's DepotKey format, which is
+ ## a 32-byte/256-bit binary file.
+ ## Examples require login to No-Intro to view.
+ if exists(keyfile):
+ with open(keyfile, "rb") as f:
+ args.depotkey = f.read()
+ manifest.decrypt_filenames(args.depotkey)
+        ## If no .depotkey file exists for this depot, fall back to
+        ## depot_keys.txt and look the key up there.
+ elif exists("./depot_keys.txt"):
with open("./depot_keys.txt", "r", encoding="utf-8") as f:
for line in f.read().split("\n"):
line = line.split("\t")
diff --git a/depot_validator.py b/depot_validator.py
new file mode 100644
index 0000000..aba45bb
--- /dev/null
+++ b/depot_validator.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env python3
+from argparse import ArgumentParser
+from binascii import hexlify, unhexlify
+from datetime import datetime
+from fnmatch import fnmatch
+from glob import glob
+from hashlib import sha1
+from io import BytesIO
+from os import scandir, makedirs, remove
+from os.path import dirname, exists
+from pathlib import Path
+from struct import unpack
+from sys import argv
+from zipfile import ZipFile
+import lzma
+
+if __name__ == "__main__":  # parse arguments first so bad arguments exit before the steam imports below
+    parser = ArgumentParser(description='Validate downloaded depot chunks.')
+    parser.add_argument('depotid', type=int)  # depot whose chunks live in ./depots/<depotid>/
+    parser.add_argument('depotkey', type=str, nargs='?')  # optional depot key as a hex string
+    parser.add_argument('-b', dest="backup", help="Path to a .csd backup file to validate (chunks are read from the backup instead of the depots folder)", nargs='?')
+    args = parser.parse_args()
+
+from steam.core.manifest import DepotManifest
+from steam.core.crypto import symmetric_decrypt
+from chunkstore import Chunkstore
+
+if __name__ == "__main__":
+    path = "./depots/%s/" % args.depotid
+    keyfile = "./keys/%s.depotkey" % args.depotid
+    # Key lookup order: command-line argument, ./keys/<depotid>.depotkey, then depot_keys.txt.
+    if args.depotkey:  # key given on the command line as a hex string
+        args.depotkey = bytes.fromhex(args.depotkey)
+    elif exists(keyfile):  # raw binary key file for this depot
+        with open(keyfile, "rb") as f:
+            args.depotkey = f.read()
+    elif exists("./depot_keys.txt"):  # tab-separated list: depot id in column 0, hex key in column 2
+        with open("./depot_keys.txt", "r", encoding="utf-8") as f:
+            for line in f.read().split("\n"):
+                line = line.split("\t")
+                try:
+                    if int(line[0]) == args.depotid:
+                        args.depotkey = bytes.fromhex(line[2])
+                        break
+                except (ValueError, IndexError):
+                    continue  # blank, malformed, or truncated line: keep scanning instead of crashing
+        if not args.depotkey:
+            print("\033[31mERROR: files are encrypted, but no depot key was specified and no key for this depot exists in depot_keys.txt\033[0m")
+            exit(1)
+    else:
+        print("\033[31mERROR: files are encrypted, but no depot key was specified and no depot_keys.txt or depotkey file exists\033[0m")
+        exit(1)
+
+    chunks = {}  # chunk key -> chunkstore value (backup mode) or 0 (loose file); only the keys are used later
+    if args.backup:
+        chunkstores = {}  # .csm path -> unpacked Chunkstore
+        chunks_by_store = {}  # chunk key -> .csm path of the store that holds it
+        for csm in glob(args.backup.replace("_1.csm","").replace("_1.csd","") + "_*.csm"):
+            chunkstore = Chunkstore(csm)
+            chunkstore.unpack()
+            for chunk, entry in chunkstore.chunks.items():
+                chunks[chunk] = entry
+                chunks_by_store[chunk] = csm
+            chunkstores[csm] = chunkstore
+    else:
+        # loose mode: every regular file in the depot folder except the .zip files is a candidate chunk
+        for data in scandir(path):
+            if data.is_file() and not data.name.endswith(".zip"):
+                chunks[data.name] = 0
+
+    def is_hex(s):  # report whether unhexlify() accepts s (even length, hex digits only)
+        try:
+            unhexlify(s)
+            return True
+        except (ValueError, TypeError):  # binascii.Error subclasses ValueError; TypeError means s is not str/bytes
+            return False
+
+    from zipfile import BadZipFile  # local import: needed only by the Zip branch below
+    badfiles = []  # hex ids of chunks that failed decryption, decompression, or the SHA-1 check
+    for file in chunks:
+        try:
+            if args.backup:  # backup mode: fetch the chunk from the chunkstore that indexed it
+                chunkhex = hexlify(file).decode()
+                try:
+                    chunkstore = chunkstores[chunks_by_store[file]]
+                    payload = chunkstore.get_chunk(file)
+                    is_encrypted = chunkstore.is_encrypted
+                except Exception as e:
+                    print(f"\033[31mError retrieving chunk\033[0m {chunkhex}: {e}")
+                    continue
+            else:  # loose mode: the file name is the chunk id, optionally suffixed with "_decrypted"
+                name = file.replace("_decrypted", "")
+                if not is_hex(name):  # stray file; unhexlify() below would otherwise abort the whole run
+                    print("skipping non-chunk file", file)
+                    continue
+                chunkhex = hexlify(unhexlify(name)).decode()
+                if exists(path + chunkhex):
+                    is_encrypted = True
+                    with open(path + chunkhex, "rb") as chunkfile:
+                        payload = chunkfile.read()
+                elif exists(path + chunkhex + "_decrypted"):
+                    is_encrypted = False
+                    with open(path + chunkhex + "_decrypted", "rb") as chunkfile:
+                        payload = chunkfile.read()
+                else:
+                    print("missing chunk " + chunkhex)
+                    continue
+            if not is_encrypted:
+                decrypted = payload
+            elif not args.depotkey:
+                print("\033[31mERROR: chunk %s is encrypted, but no depot key was specified\033[0m" % chunkhex)
+                exit(1)
+            else:
+                try:  # a failed decrypt marks the chunk bad in both modes instead of crashing
+                    decrypted = symmetric_decrypt(payload, args.depotkey)
+                except ValueError as e:
+                    print(f"{e}")
+                    print(f"\033[31mError, unable to decrypt file:\033[0m {chunkhex}")
+                    badfiles.append(chunkhex)
+                    continue
+            if decrypted[:2] == b'VZ':  # LZMA container; the size is read from the 4 bytes at [-6:-2]
+                decompressedSize = unpack('<i', decrypted[-6:-2])[0]
+                print("Testing (LZMA) from chunk", chunkhex, "Size:", decompressedSize)
+                try:
+                    decompressed = lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[lzma._decode_filter_properties(lzma.FILTER_LZMA1, decrypted[7:12])]).decompress(decrypted[12:-10])[:decompressedSize]
+                except lzma.LZMAError as e:
+                    print(f"\033[31mFailed to decompress:\033[0m {chunkhex}")
+                    print(f"\033[31mError:\033[0m {e}")
+                    badfiles.append(chunkhex)
+                    continue
+            elif decrypted[:2] == b'PK':  # Zip container; the first member holds the chunk data
+                print("Testing (Zip) from chunk", chunkhex)
+                try:
+                    archive = ZipFile(BytesIO(decrypted))
+                    decompressed = archive.read(archive.filelist[0])
+                except (BadZipFile, IndexError) as e:  # corrupt archive, or an archive with no members
+                    print(f"\033[31mFailed to decompress:\033[0m {chunkhex}")
+                    print(f"\033[31mError:\033[0m {e}")
+                    badfiles.append(chunkhex)
+                    continue
+            else:
+                print("\033[31mERROR: unknown archive type\033[0m", decrypted[:2].decode(errors="replace"))
+                badfiles.append(chunkhex)
+                continue
+            sha = sha1(decompressed)  # a chunk's id is the SHA-1 of its decompressed contents
+            if sha.digest() != unhexlify(chunkhex):
+                print("\033[31mERROR: sha1 checksum mismatch\033[0m (expected %s, got %s)" % (chunkhex, sha.hexdigest()))
+                badfiles.append(chunkhex)
+        except IsADirectoryError:  # open() hit a directory named like a chunk: nothing to validate
+            pass
+    for bad in badfiles:
+        print(f"{bad}")
\ No newline at end of file
--
2.43.4