1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
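"""exploit_interrogation_full.py

Complete interaction script: talks to the interrogation service using a fixed
set of 17 non-adaptive queries, automating PoW -> ask -> decode -> submit, and
robustly determining whether each round succeeded or failed.
"""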
import socket import sys import time import re import itertools import hashlib
# The 17 fixed (non-adaptive) queries sent to the service every round.
# With S0..S7 bound to booleans, "( ( A ) == B ) == 0" is true exactly when
# A != B, so each chained expression evaluates to the XOR (parity) of the
# secret bits it mentions; the trailing comments list that subset.
# NOTE(review): the decoder picks the secret at minimum Hamming distance from
# the observed answers, so these parities presumably form an error-correcting
# code that tolerates a few lies -- confirm against the challenge statement.
QUESTIONS = [
    "( ( S1 ) == S4 ) == 0",  # S1 ^ S4
    "( ( ( ( S0 ) == S6 ) == 0 ) == S7 ) == 0",  # S0 ^ S6 ^ S7
    "( ( ( ( ( ( S1 ) == S2 ) == 0 ) == S6 ) == 0 ) == S7 ) == 0",  # S1 ^ S2 ^ S6 ^ S7
    "( ( S2 ) == S5 ) == 0",  # S2 ^ S5
    "( ( ( ( ( ( S2 ) == S3 ) == 0 ) == S4 ) == 0 ) == S7 ) == 0",  # S2 ^ S3 ^ S4 ^ S7
    "( ( ( ( ( ( ( ( ( ( S0 ) == S2 ) == 0 ) == S3 ) == 0 ) == S4 ) == 0 ) == S5 ) == 0 ) == S7 ) == 0",  # S0 ^ S2 ^ S3 ^ S4 ^ S5 ^ S7
    "( ( S5 ) == S7 ) == 0",  # S5 ^ S7
    "( ( S3 ) == S6 ) == 0",  # S3 ^ S6
    "( ( ( ( ( ( ( ( S1 ) == S3 ) == 0 ) == S5 ) == 0 ) == S6 ) == 0 ) == S7 ) == 0",  # S1 ^ S3 ^ S5 ^ S6 ^ S7
    "( ( ( ( ( ( S2 ) == S3 ) == 0 ) == S4 ) == 0 ) == S6 ) == 0",  # S2 ^ S3 ^ S4 ^ S6
    "( ( ( ( ( ( ( ( S0 ) == S2 ) == 0 ) == S3 ) == 0 ) == S5 ) == 0 ) == S7 ) == 0",  # S0 ^ S2 ^ S3 ^ S5 ^ S7
    "( ( ( ( ( ( ( ( ( ( S0 ) == S3 ) == 0 ) == S4 ) == 0 ) == S5 ) == 0 ) == S6 ) == 0 ) == S7 ) == 0",  # S0 ^ S3 ^ S4 ^ S5 ^ S6 ^ S7
    "( ( S0 ) == S4 ) == 0",  # S0 ^ S4
    "( ( S1 ) == S3 ) == 0",  # S1 ^ S3
    "( ( ( ( ( ( S0 ) == S1 ) == 0 ) == S4 ) == 0 ) == S7 ) == 0",  # S0 ^ S1 ^ S4 ^ S7
    "( ( ( ( S0 ) == S1 ) == 0 ) == S3 ) == 0",  # S0 ^ S1 ^ S3
    "( ( ( ( ( ( ( ( S2 ) == S3 ) == 0 ) == S5 ) == 0 ) == S6 ) == 0 ) == S7 ) == 0"  # S2 ^ S3 ^ S5 ^ S6 ^ S7
]

# Alphabet searched when brute-forcing the 4-character proof-of-work prefix.
POW_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
def recv_some(sock, timeout=2.0):
    """Read at most one 4096-byte chunk from *sock* and return it as text.

    The socket timeout is set to *timeout* seconds before reading. Bytes that
    cannot be decoded are dropped. This is a deliberate best-effort helper: a
    timeout, a closed connection (empty read) or any other error all yield
    an empty string instead of raising.
    """
    try:
        sock.settimeout(timeout)
        raw = sock.recv(4096)
        text = raw.decode(errors='ignore') if raw else ""
    except Exception:
        # Covers socket.timeout as well; callers only distinguish
        # "got data" from "got nothing".
        text = ""
    return text
def recv_until_keywords(sock, keywords, total_timeout=12.0, per_recv=1.0):
    """Accumulate socket data until one of *keywords* appears or time runs out.

    Args:
        sock: connected socket to read from (via ``recv_some``).
        keywords: iterable of strings; matching is case-insensitive.
        total_timeout: overall budget in seconds for the whole wait.
        per_recv: maximum time in seconds a single read may block.

    Returns:
        Everything received so far as one string. This includes the matched
        keyword and anything that arrived in the same chunk after it. On
        timeout the partial buffer (possibly empty) is returned.
    """
    # Lower-case the keywords once instead of on every received chunk; this
    # also makes a one-shot iterable of keywords safe to pass.
    needles = [kw.lower() for kw in keywords]
    # A monotonic clock keeps the deadline immune to system clock adjustments.
    deadline = time.monotonic() + total_timeout
    buf = ""
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never let one read block past the overall deadline.
        chunk = recv_some(sock, timeout=min(per_recv, remaining))
        if not chunk:
            # An empty read may return immediately (e.g. closed peer);
            # the short sleep avoids a busy loop in that case.
            time.sleep(0.02)
            continue
        buf += chunk
        low = buf.lower()
        if any(needle in low for needle in needles):
            return buf
    return buf
def send_line(sock, s):
    """Send *s* on *sock* as a single newline-terminated line.

    *s* may be ``str`` (encoded with the default codec) or a bytes-like
    object. A trailing ``\\n`` is appended only when one is not already there.
    """
    payload = s.encode() if isinstance(s, str) else s
    if not payload.endswith(b'\n'):
        payload += b'\n'
    sock.sendall(payload)
def solve_pow_from_banner(banner):
    """Solve the ``sha256(XXXX+<tail>) == <hex digest>`` proof of work.

    The challenge is extracted from *banner* with a regex. Every 4-character
    string over ``POW_CHARS`` is then tried as the ``XXXX`` prefix.

    Returns:
        The matching 4-character prefix, or ``None`` when the banner has no
        recognisable challenge or no prefix produces the digest.
    """
    challenge = re.search(r"sha256\(XXXX\+([A-Za-z0-9]+)\)\s*==\s*([0-9a-f]{64})", banner)
    if challenge is None:
        return None
    suffix, wanted_digest = challenge.group(1), challenge.group(2)
    candidates = map(''.join, itertools.product(POW_CHARS, repeat=4))
    return next(
        (
            candidate
            for candidate in candidates
            if hashlib.sha256((candidate + suffix).encode()).hexdigest() == wanted_digest
        ),
        None,
    )
def eval_expr_on_S(expr, S):
    """Evaluate *expr* (written in the QUESTIONS format) for a given secret.

    *S* is indexed 0..7; each element is coerced to ``bool`` and exposed to
    the expression as ``S0`` .. ``S7``. The expression is run through
    ``eval`` with ``__builtins__`` disabled.

    Returns:
        The truthiness of the result, or ``False`` if evaluation raises.
    """
    namespace = {}
    for idx in range(8):
        namespace["S%d" % idx] = bool(S[idx])
    # Kept for parity with the original mapping. Numeric literals in the
    # expression are parsed as constants, so these look unused by eval.
    namespace["0"] = 0
    namespace["1"] = 1
    try:
        return bool(eval(expr, {"__builtins__": None}, namespace))
    except Exception:
        return False
def parse_bool_from_text(text):
    """Extract a boolean verdict from a server reply.

    The first standalone ``true``/``false`` word (case-insensitive) decides
    the result. Position matters: a reply may be followed in the same buffer
    by unrelated text that also contains such a word, for example
    "Now reveal the true secrets", which must not override an earlier
    "False". When neither word occurs, the first standalone ``0`` or ``1``
    digit is used instead.

    Returns:
        ``True`` or ``False`` when a verdict is found. ``None`` for empty
        input or when nothing recognisable is present.
    """
    if not text:
        return None
    # One alternation lets the regex engine report the earliest occurrence.
    word = re.search(r"\b(true|false)\b", text, re.IGNORECASE)
    if word:
        return word.group(1).lower() == "true"
    digit = re.search(r"\b([01])\b", text)
    if digit:
        return digit.group(1) == '1'
    return None
def decode_S_from_observed(observed_bits):
    """Find the 8-bit secrets whose predicted answers best match the observed ones.

    All 256 candidate values are tried; bit ``i`` of a candidate is the value
    of ``S<i>``. For each candidate the answers to ``QUESTIONS`` are computed
    and compared position by position with *observed_bits*.

    Returns:
        ``(candidates, distance)``: the candidate values (ascending) that
        share the smallest number of mismatches, and that number.
    """
    closest = 999
    winners = []
    for candidate in range(256):
        secret = tuple(bool((candidate >> bit) & 1) for bit in range(8))
        predicted = [int(eval_expr_on_S(question, secret)) for question in QUESTIONS]
        mismatches = 0
        for want, seen in zip(predicted, observed_bits):
            if want != seen:
                mismatches += 1
        if mismatches < closest:
            closest, winners = mismatches, [candidate]
        elif mismatches == closest:
            winners.append(candidate)
    return winners, closest
def wait_round_end_or_next(sock, timeout_total=12.0, per_recv=1.0):
    """Wait for the server's verdict after an answer has been submitted.

    Data is accumulated and checked case-insensitively after every chunk.
    Checks run in this order and the first hit wins:

    * failure -> ``False``: the buffer contains "you fell for" or
      "fell for my deception";
    * success -> ``True``: the buffer contains "The prisoner scowls as you
      expose his lies" and/or "Very well" (the log label differs when the
      complete message including "ask your next round" has arrived);
    * next prompt -> ``True``: the buffer contains "ask your question" or
      "Welcome to the interrogation".

    Args:
        sock: connected socket to read from (via ``recv_some``).
        timeout_total: overall budget in seconds.
        per_recv: maximum time in seconds a single read may block.

    Returns:
        ``True``, ``False``, or ``None`` when no verdict arrived in time.
    """
    failure_markers = ("you fell for", "fell for my deception")
    scowl = "the prisoner scowls as you expose his lies"
    # A monotonic clock keeps the deadline immune to system clock adjustments.
    deadline = time.monotonic() + timeout_total
    buf = ""
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never let one read block past the overall deadline.
        chunk = recv_some(sock, timeout=min(per_recv, remaining))
        if not chunk:
            time.sleep(0.02)
            continue
        buf += chunk
        low = buf.lower()
        if any(marker in low for marker in failure_markers):
            print("[server - failure]:", buf.strip())
            return False
        if scowl in low and "very well" in low and "ask your next round" in low:
            print("[server - success]:", buf.strip())
            return True
        # Either half of the success message is accepted on its own, since
        # the message may be split across reads.
        if scowl in low or "very well" in low:
            print("[server - success fragment]:", buf.strip())
            return True
        if "ask your question" in low or "welcome to the interrogation" in low:
            print("[server - next round prompt]:", buf.strip())
            return True
    print("[warn] wait_round_end_or_next: timed out. buffer:", buf.strip()[:300])
    return None
def _last_line(text):
    """Return the last line of *text* after stripping, or "" if nothing is left."""
    lines = text.strip().splitlines()
    return lines[-1] if lines else ""


def run(host, port):
    """Connect to the service and play up to 25 rounds.

    Flow: read the banner, solve the proof of work if one is announced, then
    for each round send every entry of ``QUESTIONS``, record one bit per
    answer, decode the most likely secret and submit it as eight
    space-separated 0/1 values. The loop stops on a reported failure, on an
    unclear verdict, or after the round limit.

    Args:
        host: server host name or address.
        port: server TCP port.
    """
    max_rounds = 25
    sock = socket.create_connection((host, port), timeout=10)
    try:
        banner = recv_until_keywords(
            sock,
            ["sha256(XXXX+", "Give me XXXX", "Notice:", "Ask your question"],
            total_timeout=6.0,
        )
        if banner:
            print(banner.strip())

        if "sha256(XXXX+" in banner:
            prefix = solve_pow_from_banner(banner)
            if prefix is None:
                print("[!] PoW parse/solve failed.")
                return
            print("[*] PoW solved:", prefix)
            # Drain the input prompt (if it has not arrived yet) before answering.
            recv_until_keywords(sock, ["Give me XXXX", ":"], total_timeout=3.0)
            send_line(sock, prefix)
            time.sleep(0.05)
            more = recv_some(sock, timeout=1.0)
            if more:
                print(more.strip())

        round_count = 0
        # The limit sits in the loop condition so every path, including the
        # "continue after an unclear verdict" path, is bounded by it.
        while round_count < max_rounds:
            round_count += 1
            print("\n=== Round", round_count, "===")
            intro = recv_until_keywords(
                sock,
                ["Ask your question", "Welcome to the interrogation", "Notice:"],
                total_timeout=6.0,
            )
            if intro:
                print(intro.strip())

            observed = []
            for idx, q in enumerate(QUESTIONS, start=1):
                # NOTE(review): if an earlier read already consumed the prompt,
                # this wait can only end by timeout; the question is sent
                # regardless. Confirm against the server's actual pacing.
                prompt = recv_until_keywords(
                    sock,
                    ["Ask your question", "Ask your question:"],
                    total_timeout=8.0,
                )
                if prompt:
                    print(_last_line(prompt))
                print(f"Sending Q{idx:02d}: {q}")
                send_line(sock, q)

                resp = recv_until_keywords(
                    sock,
                    ["Prisoner's response", "The prisoner smirks", "Ask your question", "Now reveal"],
                    total_timeout=4.0,
                )
                if not resp:
                    resp = recv_some(sock, timeout=1.0)
                if resp:
                    # _last_line tolerates a whitespace-only reply, which
                    # would otherwise raise IndexError here.
                    print(" <-", _last_line(resp))
                else:
                    print(" <- (no response captured)")

                val = parse_bool_from_text(resp)
                if val is None:
                    # The verdict may trail the matched keyword in a later chunk.
                    more = recv_some(sock, timeout=1.0)
                    combined = (resp or "") + more
                    val = parse_bool_from_text(combined)
                    if val is None:
                        print("[!] Could not parse boolean response, defaulting to False")
                        val = False
                observed.append(1 if val else 0)

            tail = recv_until_keywords(
                sock,
                ["Now reveal the true secrets", "Now reveal"],
                total_timeout=3.0,
            )
            if tail:
                print(tail.strip())

            print("[*] Observed bits:", observed)
            best_vals, best_dist = decode_S_from_observed(observed)
            print(f"[*] Best distance: {best_dist}, #candidates: {len(best_vals)} (show up to 8): {best_vals[:8]}")

            if best_vals:
                chosen = best_vals[0]
                S_bits = [(chosen >> i) & 1 for i in range(8)]
                if best_dist > 2:
                    print("[warn] best_dist >2, using best candidate as fallback")
            else:
                S_bits = [0] * 8
                print("[warn] no candidates found, submitting all zeros as fallback")

            print("[*] Submitting S0..S7:", S_bits)
            send_line(sock, " ".join(str(b) for b in S_bits))

            res = wait_round_end_or_next(sock, timeout_total=12.0)
            if res is True:
                print("[*] Round succeeded — continue to next round.")
            elif res is False:
                print("[!] Round failed — stopping.")
                break
            else:
                # No verdict yet: try one more read before giving up.
                extra = recv_some(sock, timeout=2.0)
                if extra:
                    print("[server extra]:", extra.strip())
                low = extra.lower()
                if "you fell for" in low or "fell for my deception" in low:
                    print("[!] Server reports failure in extra read.")
                    break
                if not ("ask your question" in low or "very well" in low or "welcome to the interrogation" in low):
                    print("[!] No clear verdict after waiting — exiting.")
                    break
                print("[*] Detected next prompt in extra read; continuing.")
        else:
            # Reached only when the loop ends without a break.
            print(f"[*] reached max configured rounds ({max_rounds}).")
    finally:
        try:
            sock.close()
        except OSError:
            # Closing is best effort; the session is over either way.
            pass
# Script entry point: expects <host> and <port> on the command line.
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    if len(cli_args) < 2:
        print("Usage: python3 exploit_interrogation_full.py <host> <port>")
        sys.exit(1)
    host = cli_args[0]
    port = int(cli_args[1])
    run(host, port)
|