import re
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from collections import deque
@dataclass
class PickleContext:
"""Maintains context for pickle operation analysis"""
stack: deque # Simulates pickle's stack
memo: dict # Simulates pickle's memo
    last_global: Optional[str] = None  # Tracks last GLOBAL operation
    last_mark: int = 0  # Tracks position of last MARK
    binpersid_context: Optional[List[str]] = None  # Tracks context for BINPERSID ops
    def __post_init__(self):
        if self.binpersid_context is None:
            self.binpersid_context = []
class PickleSecurityScanner:
def __init__(self):
# Define dangerous operations and patterns
self.dangerous_ops = {
'GLOBAL': "Imports modules/objects which could be dangerous",
'REDUCE': "Calls functions/methods which could execute arbitrary code",
'BUILD': "Calls __setstate__ which could execute arbitrary code",
'INST': "Creates instances which could be dangerous",
'OBJ': "Creates arbitrary objects which could be dangerous",
'NEWOBJ': "Creates new objects which could be dangerous"
}
self.dangerous_modules = {
'os', 'sys', 'subprocess', 'commands', 'base64',
'pickle', 'cpickle', 'shelve', 'marshal', 'builtins',
'posix', 'nt', 'shutil', 'importlib', 'pathlib',
'tempfile', 'requests', 'urllib', 'socket'
}
self.dangerous_patterns = [
# Command execution patterns
r'eval\(', r'exec\(', r'execfile\(',
r'system\(', r'popen\(', r'spawn',
r'subprocess\.', r'shell=True',
# File operations
r'open\(', r'file\(', r'unlink\(',
r'remove\(', r'rmdir\(', r'delete\(',
r'write\(', r'chmod\(', r'chown\(',
# Import related
r'__import__', r'importlib', r'load_module',
r'load_source', r'load_compiled',
# Special methods
r'__[\w]+__', # Double underscore methods
r'__reduce__', r'__reduce_ex__',
r'__getattr__', r'__setattr__',
r'__delattr__', r'__getattribute__',
# Network related
r'urllib\.', r'requests\.', r'socket\.',
r'connect\(', r'bind\(', r'listen\(',
# Code compilation
r'compile\(', r'code\.',
# Process manipulation
r'fork\(', r'exec\(', r'execv\(',
r'kill\(', r'pkill', r'terminate\(',
# Shell interaction
r'shell', r'bash', r'sh\s',
r'command', r'getoutput\(',
# Other dangerous operations
r'serializ', r'deserializ',
r'marshal\.', r'base64\.',
r'tempfile\.', r'mktemp',
r'random\.'
]
self.safe_reduces = {
'collections.OrderedDict',
'torch._utils._rebuild_tensor_v2',
'torch.HalfStorage',
'torch.FloatStorage',
'torch.IntStorage',
'numpy.core.multiarray._reconstruct',
'numpy.ndarray',
'pandas.core.series.Series',
'pandas.core.frame.DataFrame'
        }
        # Persistent-ID contexts treated as benign by analyze_binpersid_context().
        # These regexes are a conservative assumption aimed at common ML payloads
        # (PyTorch storages/tensors and NumPy array reconstruction).
        self.safe_persistent_patterns = [
            r'torch\.\w*Storage',
            r'_rebuild_tensor',
            r'numpy\.core\.multiarray',
        ]
def analyze_binpersid_context(self, context: PickleContext) -> Tuple[bool, str]:
"""
Analyze the context around a BINPERSID operation to determine if it's safe.
Returns (is_safe, reason)
"""
stack_contents = ' '.join(str(x) for x in context.stack)
context_string = ' '.join(context.binpersid_context[-5:])
# Safe if dealing with PyTorch storage/tensors
        if (context.last_global and 'torch' in context.last_global
                and 'Storage' in context.last_global):
return True, "PyTorch storage reconstruction"
# Check against safe patterns
for pattern in self.safe_persistent_patterns:
if (re.search(pattern, stack_contents) or
re.search(pattern, context_string)):
return True, f"Matches safe pattern: {pattern}"
# Check for potential memory exhaustion
try:
sizes = re.findall(r'BININT\d*\s+(\d+)', context_string)
if sizes:
size = max(int(s) for s in sizes)
if size > 1_000_000_000:
return False, f"Potential memory exhaustion (size: {size})"
except (ValueError, TypeError):
pass
# Check for dangerous patterns
for pattern in self.dangerous_patterns:
if (re.search(pattern, stack_contents, re.IGNORECASE) or
re.search(pattern, context_string, re.IGNORECASE)):
return False, f"Dangerous pattern in context: {pattern}"
return True, "No suspicious patterns found"
def is_safe_reduce(self, context: PickleContext) -> bool:
"""Analyze if a REDUCE operation is safe based on context."""
if not context.last_global:
return False
# Check if it's a known safe reduction
if any(safe in context.last_global for safe in self.safe_reduces):
return True
# Analyze stack contents for potentially dangerous operations
stack_contents = ' '.join(str(x) for x in context.stack)
dangerous_patterns = [
r'eval', r'exec', r'system', r'popen', r'spawn',
r'importlib', r'__import__', r'subprocess',
r'file', r'open', r'remove', r'unlink', r'rmdir'
]
return not any(re.search(pattern, stack_contents, re.IGNORECASE)
for pattern in dangerous_patterns)
    def scan_line(self, line: str, context: PickleContext) -> Optional[Tuple[str, str, str]]:
"""Scan a single line for dangerous operations with comprehensive pattern matching."""
line = line.strip()
parts = line.split()
if not parts:
return None
# Update context
context.binpersid_context.append(line)
if len(context.binpersid_context) > 20:
context.binpersid_context.pop(0)
# Check for GLOBAL operations with dangerous modules
        if 'GLOBAL' in parts:
            global_name = ' '.join(parts[parts.index('GLOBAL') + 1:])
            context.last_global = global_name
            if any(mod in global_name.lower() for mod in self.dangerous_modules):
                return ('HIGH', f'Dangerous module import: {global_name}', line)
# Special handling for BINPERSID
if 'BINPERSID' in parts:
is_safe, reason = self.analyze_binpersid_context(context)
if not is_safe:
return ('MEDIUM', f'Potentially unsafe BINPERSID: {reason}', line)
return None
# Check for dangerous patterns in the line
for pattern in self.dangerous_patterns:
if re.search(pattern, line, re.IGNORECASE):
return ('HIGH', f'Dangerous pattern found: {pattern}', line)
# Handle other dangerous operations
for op in self.dangerous_ops:
if op in parts:
if op == 'REDUCE' and context.last_global:
# Check if this is a known safe reduction
if self.is_safe_reduce(context):
return None
return ('MEDIUM', self.dangerous_ops[op], line)
return None
def scan_content(self, content: str) -> List[Dict]:
"""Scan pickle disassembly content for security issues with context."""
findings = []
context = PickleContext(stack=deque(), memo={})
for line_num, line in enumerate(content.split('\n'), 1):
result = self.scan_line(line, context)
if result:
severity, message, line_content = result
findings.append({
'line_number': line_num,
'severity': severity,
'message': message,
'content': line_content
})
return findings
def format_findings(self, findings: List[Dict]) -> str:
"""Format the findings into a readable report."""
if not findings:
return "No security issues found."
report = []
report.append("Security Scan Results")
report.append("===================")
# Group by severity
for severity in ['HIGH', 'MEDIUM']:
severity_findings = [f for f in findings if f['severity'] == severity]
if severity_findings:
report.append(f"\n{severity} Severity Issues:")
report.append("-" * 20)
for finding in severity_findings:
report.append(f"Line {finding['line_number']}: {finding['message']}")
report.append(f"Content: {finding['content']}")
report.append("")
return "\n".join(report)
def scan_pickle_disassembly(content: str) -> str:
"""Main function to scan pickle disassembly content."""
scanner = PickleSecurityScanner()
findings = scanner.scan_content(content)
return scanner.format_findings(findings)
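# Convenience sketch, not part of the scanner itself: the scanner expects a
# textual pickle disassembly, and the standard-library pickletools module can
# produce one from a raw pickle file without unpickling it. The function name
# and the in-memory buffer below are illustrative choices.
def disassemble_pickle_file(path: str) -> str:
    """Return the pickletools disassembly of the pickle file at `path`."""
    import io
    import pickletools
    with open(path, "rb") as f:
        buffer = io.StringIO()
        pickletools.dis(f.read(), out=buffer)
    return buffer.getvalue()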
# Example usage
if __name__ == "__main__":
    # Scan a pickle disassembly saved to disk (e.g. produced with pickletools).
    with open("data.dis") as f:
        sample_content = f.read()
    result = scan_pickle_disassembly(sample_content)
    print(result)
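    # Hand-written sample in the spirit of pickletools output for a typical
    # PyTorch tensor reconstruction; offsets and arguments are illustrative,
    # not captured from a real file. The GLOBAL line is surfaced as a MEDIUM
    # finding (every import is reported), while BINPERSID and REDUCE pass the
    # context checks.
    torch_sample = "\n".join([
        "    0: \\x80 PROTO      2",
        "    2: c    GLOBAL     'torch._utils _rebuild_tensor_v2'",
        "   36: q    BINPUT     0",
        "   38: Q    BINPERSID",
        "   39: R    REDUCE",
    ])
    print(scan_pickle_disassembly(torch_sample))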
by Synthbot