"""Heuristic security scanner for textual pickle disassembly.

The scanner walks a disassembly listing line by line, keeps a small amount
of context (the most recent GLOBAL operand and a sliding window of recent
lines) and reports opcodes or text patterns that commonly appear in
malicious pickles.  It is a pattern matcher, not a pickle interpreter.
"""

import re
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple


@dataclass
class PickleContext:
    """Mutable state carried from one scanned line to the next.

    ``stack``, ``memo`` and ``last_mark`` are only ever read by the scanner
    in this module; nothing here populates them, so they stay at whatever
    the caller supplied.
    """

    stack: deque  # Stand-in for pickle's stack (read, never written, here)
    memo: dict  # Stand-in for pickle's memo (unused by the scanner)
    last_global: Optional[str] = None  # Dotted name of the last GLOBAL seen
    last_mark: int = 0  # Position of the last MARK (unused by the scanner)
    # Sliding window of the most recently scanned lines.  A default factory
    # gives every context its own list while still honouring a list that the
    # caller passes in explicitly.
    binpersid_context: List[str] = field(default_factory=list)


class PickleSecurityScanner:
    """Flag dangerous opcodes, modules and text patterns in a disassembly.

    All rule tables are plain public attributes so callers can tune them
    after construction.
    """

    # Number of recent lines retained in ``PickleContext.binpersid_context``.
    _CONTEXT_WINDOW = 20
    # Number of trailing context lines inspected for a BINPERSID.
    _PERSID_LOOKBACK = 5
    # Integer operands above this are reported as possible memory exhaustion.
    _MAX_PERSID_SIZE = 1_000_000_000
    # Names that make a REDUCE target unsafe when they occur in the callable
    # name or in the simulated stack.
    _REDUCE_PATTERNS = (
        r'eval', r'exec', r'system', r'popen', r'spawn',
        r'importlib', r'__import__', r'subprocess',
        r'file', r'open', r'remove', r'unlink', r'rmdir',
    )

    def __init__(self):
        # Opcodes that are reported (MEDIUM) whenever they appear.
        self.dangerous_ops = {
            'GLOBAL': "Imports modules/objects which could be dangerous",
            'REDUCE': "Calls functions/methods which could execute arbitrary code",
            'BUILD': "Calls __setstate__ which could execute arbitrary code",
            'INST': "Creates instances which could be dangerous",
            'OBJ': "Creates arbitrary objects which could be dangerous",
            'NEWOBJ': "Creates new objects which could be dangerous"
        }

        # Module names that turn a GLOBAL into a HIGH finding.  Compared
        # against whole dotted-name components, in lower case.
        self.dangerous_modules = {
            'os', 'sys', 'subprocess', 'commands', 'base64', 'pickle',
            'cpickle', 'shelve', 'marshal', 'builtins', 'posix', 'nt',
            'shutil', 'importlib', 'pathlib', 'tempfile', 'requests',
            'urllib', 'socket'
        }

        # Regular expressions searched (case-insensitively) in every line.
        # The first matching entry is the one named in the finding.
        self.dangerous_patterns = [
            # Command execution patterns
            r'eval\(', r'exec\(', r'execfile\(', r'system\(', r'popen\(',
            r'spawn', r'subprocess\.', r'shell=True',
            # File operations
            r'open\(', r'file\(', r'unlink\(', r'remove\(', r'rmdir\(',
            r'delete\(', r'write\(', r'chmod\(', r'chown\(',
            # Import related
            r'__import__', r'importlib', r'load_module', r'load_source',
            r'load_compiled',
            # Special methods
            r'__[\w]+__',  # Double underscore methods
            r'__reduce__', r'__reduce_ex__', r'__getattr__', r'__setattr__',
            r'__delattr__', r'__getattribute__',
            # Network related
            r'urllib\.', r'requests\.', r'socket\.', r'connect\(',
            r'bind\(', r'listen\(',
            # Code compilation
            r'compile\(', r'code\.',
            # Process manipulation
            r'fork\(', r'exec\(', r'execv\(', r'kill\(', r'pkill',
            r'terminate\(',
            # Shell interaction
            r'shell', r'bash', r'sh\s', r'command', r'getoutput\(',
            # Other dangerous operations
            r'serializ', r'deserializ', r'marshal\.', r'base64\.',
            r'tempfile\.', r'mktemp', r'random\.'
        ]

        # Dotted callables whose REDUCE is accepted without a finding.
        self.safe_reduces = {
            'collections.OrderedDict',
            'torch._utils._rebuild_tensor_v2',
            'torch.HalfStorage',
            'torch.FloatStorage',
            'torch.IntStorage',
            'numpy.core.multiarray._reconstruct',
            'numpy.ndarray',
            'pandas.core.series.Series',
            'pandas.core.frame.DataFrame'
        }

        # Regexes that mark a BINPERSID context as trusted.  Deliberately
        # empty: every entry bypasses the later size and danger checks, so
        # trust has to be granted explicitly by the caller.
        self.safe_persistent_patterns = []

    @staticmethod
    def _normalize_global(raw: str) -> str:
        """Return a GLOBAL operand as a dotted name.

        A disassembled operand such as ``'os system'`` (quoted, module and
        attribute separated by a space) becomes ``os.system``.  Names that
        are already dotted are returned unchanged.
        """
        unquoted = raw.strip().strip('\'"')
        return '.'.join(unquoted.split())

    def _references_dangerous_module(self, global_name: str) -> bool:
        """Tell whether any component of *global_name* is a listed module.

        Whole components are compared instead of substrings so that, for
        example, ``torch.nn.modules.loss`` is not mistaken for ``os``.
        """
        components = {
            part for part in re.split(r'[.\s]+', global_name.lower()) if part
        }
        return not components.isdisjoint(self.dangerous_modules)

    def analyze_binpersid_context(self, context: PickleContext) -> Tuple[bool, str]:
        """Decide whether the lines leading up to a BINPERSID look safe.

        Checks run in this order and the first decisive one wins: PyTorch
        storage global, ``safe_persistent_patterns``, oversized BININT
        operand, ``dangerous_patterns``.

        Returns:
            ``(is_safe, reason)``.
        """
        stack_contents = ' '.join(str(item) for item in context.stack)
        recent_lines = context.binpersid_context[-self._PERSID_LOOKBACK:]
        context_string = ' '.join(recent_lines)
        # BINPERSID may be reached before any GLOBAL has been seen.
        last_global = context.last_global or ''

        # Safe if dealing with PyTorch storage/tensors
        if 'torch' in last_global and 'Storage' in last_global:
            return True, "PyTorch storage reconstruction"

        # Check against the caller-supplied allow-list
        for pattern in self.safe_persistent_patterns:
            if (re.search(pattern, stack_contents)
                    or re.search(pattern, context_string)):
                return True, f"Matches safe pattern: {pattern}"

        # Check for potential memory exhaustion
        sizes = re.findall(r'BININT\d*\s+(\d+)', context_string)
        if sizes:
            size = max(int(value) for value in sizes)
            if size > self._MAX_PERSID_SIZE:
                return False, f"Potential memory exhaustion (size: {size})"

        # Check for dangerous patterns
        for pattern in self.dangerous_patterns:
            if (re.search(pattern, stack_contents, re.IGNORECASE)
                    or re.search(pattern, context_string, re.IGNORECASE)):
                return False, f"Dangerous pattern in context: {pattern}"

        return True, "No suspicious patterns found"

    def is_safe_reduce(self, context: PickleContext) -> bool:
        """Tell whether a REDUCE may be accepted without a finding.

        The callable is taken to be ``context.last_global``.  It is safe
        when it exactly equals an entry of ``safe_reduces``; it is unsafe
        when no global is known, when it names a dangerous module, or when
        the callable name or the simulated stack matches one of the REDUCE
        danger patterns.
        """
        if not context.last_global:
            return False

        # Accept raw disassembly operands as well as dotted names.
        callable_name = self._normalize_global(context.last_global)

        # Exact match: a substring test would let ``xnumpy.ndarrayfoo``
        # pass for ``numpy.ndarray``.
        if callable_name in self.safe_reduces:
            return True

        if self._references_dangerous_module(callable_name):
            return False

        # The callable name is searched too because the stack is not
        # populated by this scanner and would otherwise hide everything.
        searched = ' '.join(
            [callable_name, *(str(item) for item in context.stack)]
        )
        return not any(
            re.search(pattern, searched, re.IGNORECASE)
            for pattern in self._REDUCE_PATTERNS
        )

    def scan_line(self, line: str,
                  context: PickleContext) -> Optional[Tuple[str, str, str]]:
        """Scan one disassembly line and update *context*.

        Returns:
            ``(severity, message, stripped_line)`` for the first rule that
            fires, or ``None`` when the line is blank or nothing fires.

        NOTE(review): only the ``GLOBAL`` opcode updates ``last_global``;
        ``STACK_GLOBAL`` is not recognised by any rule here.
        """
        line = line.strip()
        parts = line.split()

        if not parts:
            return None

        # Update the sliding window of recent lines
        context.binpersid_context.append(line)
        del context.binpersid_context[:-self._CONTEXT_WINDOW]

        # Check for GLOBAL operations with dangerous modules
        if 'GLOBAL' in parts:
            operand = ' '.join(parts[parts.index('GLOBAL') + 1:])
            global_name = self._normalize_global(operand)
            context.last_global = global_name
            if self._references_dangerous_module(global_name):
                return ('HIGH', f'Dangerous module import: {global_name}', line)

        # Special handling for BINPERSID
        if 'BINPERSID' in parts:
            is_safe, reason = self.analyze_binpersid_context(context)
            if not is_safe:
                return ('MEDIUM', f'Potentially unsafe BINPERSID: {reason}', line)
            return None

        # Check for dangerous patterns in the line
        for pattern in self.dangerous_patterns:
            if re.search(pattern, line, re.IGNORECASE):
                return ('HIGH', f'Dangerous pattern found: {pattern}', line)

        # Handle other dangerous operations
        for op in self.dangerous_ops:
            if op in parts:
                if op == 'REDUCE' and context.last_global:
                    # Known-safe reductions are not reported
                    if self.is_safe_reduce(context):
                        return None
                return ('MEDIUM', self.dangerous_ops[op], line)

        return None

    def scan_content(self, content: str) -> List[Dict]:
        """Scan a whole disassembly and collect the findings.

        Returns:
            One dict per finding with the keys ``line_number`` (1-based),
            ``severity``, ``message`` and ``content``.
        """
        findings = []
        context = PickleContext(stack=deque(), memo={})

        for line_num, line in enumerate(content.split('\n'), 1):
            result = self.scan_line(line, context)
            if result:
                severity, message, line_content = result
                findings.append({
                    'line_number': line_num,
                    'severity': severity,
                    'message': message,
                    'content': line_content
                })

        return findings

    def format_findings(self, findings: List[Dict]) -> str:
        """Render findings as a plain-text report grouped by severity."""
        if not findings:
            return "No security issues found."

        report = ["Security Scan Results", "==================="]

        # Group by severity, HIGH first
        for severity in ['HIGH', 'MEDIUM']:
            severity_findings = [f for f in findings if f['severity'] == severity]
            if not severity_findings:
                continue
            report.append(f"\n{severity} Severity Issues:")
            report.append("-" * 20)
            for finding in severity_findings:
                report.append(f"Line {finding['line_number']}: {finding['message']}")
                report.append(f"Content: {finding['content']}")
                report.append("")

        return "\n".join(report)


def scan_pickle_disassembly(content: str) -> str:
    """Scan pickle disassembly text and return the formatted report."""
    scanner = PickleSecurityScanner()
    findings = scanner.scan_content(content)
    return scanner.format_findings(findings)


# Example usage
if __name__ == "__main__":
    # Scan a disassembly listing stored next to the script.
    with open("data.dis") as handle:
        sample_content = handle.read()
    print(scan_pickle_disassembly(sample_content))