PASTEDOWN   58   0
   1012 9.6 KB    242

hacky pkl scanner

By Synthbot
Created: 2024-10-27 22:57:38
Expiry: Never

import re
from typing import List, Dict, Set, Tuple
from dataclasses import dataclass
from collections import deque

@dataclass
class PickleContext:
    """Maintains context for pickle operation analysis"""
    stack: deque  # Simulates pickle's stack
    memo: dict    # Simulates pickle's memo
    last_global: str = None  # Tracks last GLOBAL operation
    last_mark: int = 0  # Tracks position of last MARK
    binpersid_context: List[str] = None  # Tracks context for BINPERSID ops

    def __post_init__(self):
        self.binpersid_context = []

class PickleSecurityScanner:
    def __init__(self):
        # Define dangerous operations and patterns
        self.dangerous_ops = {
            'GLOBAL': "Imports modules/objects which could be dangerous",
            'REDUCE': "Calls functions/methods which could execute arbitrary code",
            'BUILD': "Calls __setstate__ which could execute arbitrary code",
            'INST': "Creates instances which could be dangerous",
            'OBJ': "Creates arbitrary objects which could be dangerous",
            'NEWOBJ': "Creates new objects which could be dangerous"
        }

        self.dangerous_modules = {
            'os', 'sys', 'subprocess', 'commands', 'base64', 
            'pickle', 'cpickle', 'shelve', 'marshal', 'builtins',
            'posix', 'nt', 'shutil', 'importlib', 'pathlib',
            'tempfile', 'requests', 'urllib', 'socket'
        }

        self.dangerous_patterns = [
            # Command execution patterns
            r'eval\(', r'exec\(', r'execfile\(',
            r'system\(', r'popen\(', r'spawn',
            r'subprocess\.', r'shell=True',
            # File operations
            r'open\(', r'file\(', r'unlink\(',
            r'remove\(', r'rmdir\(', r'delete\(',
            r'write\(', r'chmod\(', r'chown\(',
            # Import related
            r'__import__', r'importlib', r'load_module',
            r'load_source', r'load_compiled',
            # Special methods
            r'__[\w]+__',  # Double underscore methods
            r'__reduce__', r'__reduce_ex__',
            r'__getattr__', r'__setattr__',
            r'__delattr__', r'__getattribute__',
            # Network related
            r'urllib\.', r'requests\.', r'socket\.',
            r'connect\(', r'bind\(', r'listen\(',
            # Code compilation
            r'compile\(', r'code\.',
            # Process manipulation
            r'fork\(', r'exec\(', r'execv\(',
            r'kill\(', r'pkill', r'terminate\(',
            # Shell interaction
            r'shell', r'bash', r'sh\s',
            r'command', r'getoutput\(',
            # Other dangerous operations
            r'serializ', r'deserializ',
            r'marshal\.', r'base64\.',
            r'tempfile\.', r'mktemp',
            r'random\.'
        ]

        self.safe_reduces = {
            'collections.OrderedDict',
            'torch._utils._rebuild_tensor_v2',
            'torch.HalfStorage',
            'torch.FloatStorage',
            'torch.IntStorage',
            'numpy.core.multiarray._reconstruct',
            'numpy.ndarray',
            'pandas.core.series.Series',
            'pandas.core.frame.DataFrame'
        }

    def analyze_binpersid_context(self, context: PickleContext) -> Tuple[bool, str]:
        """
        Analyze the context around a BINPERSID operation to determine if it's safe.
        Returns (is_safe, reason)
        """
        stack_contents = ' '.join(str(x) for x in context.stack)
        context_string = ' '.join(context.binpersid_context[-5:])

        # Safe if dealing with PyTorch storage/tensors
        if 'torch' in context.last_global and 'Storage' in context.last_global:
            return True, "PyTorch storage reconstruction"

        # Check against safe patterns
        for pattern in self.safe_persistent_patterns:
            if (re.search(pattern, stack_contents) or 
                re.search(pattern, context_string)):
                return True, f"Matches safe pattern: {pattern}"

        # Check for potential memory exhaustion
        try:
            sizes = re.findall(r'BININT\d*\s+(\d+)', context_string)
            if sizes:
                size = max(int(s) for s in sizes)
                if size > 1_000_000_000:
                    return False, f"Potential memory exhaustion (size: {size})"
        except (ValueError, TypeError):
            pass

        # Check for dangerous patterns
        for pattern in self.dangerous_patterns:
            if (re.search(pattern, stack_contents, re.IGNORECASE) or 
                re.search(pattern, context_string, re.IGNORECASE)):
                return False, f"Dangerous pattern in context: {pattern}"

        return True, "No suspicious patterns found"

    def is_safe_reduce(self, context: PickleContext) -> bool:
        """Analyze if a REDUCE operation is safe based on context."""
        if not context.last_global:
            return False

        # Check if it's a known safe reduction
        if any(safe in context.last_global for safe in self.safe_reduces):
            return True

        # Analyze stack contents for potentially dangerous operations
        stack_contents = ' '.join(str(x) for x in context.stack)
        dangerous_patterns = [
            r'eval', r'exec', r'system', r'popen', r'spawn',
            r'importlib', r'__import__', r'subprocess',
            r'file', r'open', r'remove', r'unlink', r'rmdir'
        ]

        return not any(re.search(pattern, stack_contents, re.IGNORECASE) 
                      for pattern in dangerous_patterns)

    def scan_line(self, line: str, context: PickleContext) -> Tuple[str, str, str]:
        """Scan a single line for dangerous operations with comprehensive pattern matching."""
        line = line.strip()
        parts = line.split()

        if not parts:
            return None

        # Update context
        context.binpersid_context.append(line)
        if len(context.binpersid_context) > 20:
            context.binpersid_context.pop(0)

        # Check for GLOBAL operations with dangerous modules
        if 'GLOBAL' in parts:
            try:
                global_name = ' '.join(parts[parts.index('GLOBAL')+1:])
                context.last_global = global_name
                if any(mod in global_name.lower() for mod in self.dangerous_modules):
                    return ('HIGH', f'Dangerous module import: {global_name}', line)
            except IndexError:
                pass

        # Special handling for BINPERSID
        if 'BINPERSID' in parts:
            is_safe, reason = self.analyze_binpersid_context(context)
            if not is_safe:
                return ('MEDIUM', f'Potentially unsafe BINPERSID: {reason}', line)
            return None

        # Check for dangerous patterns in the line
        for pattern in self.dangerous_patterns:
            if re.search(pattern, line, re.IGNORECASE):
                return ('HIGH', f'Dangerous pattern found: {pattern}', line)

        # Handle other dangerous operations
        for op in self.dangerous_ops:
            if op in parts:
                if op == 'REDUCE' and context.last_global:
                    # Check if this is a known safe reduction
                    if self.is_safe_reduce(context):
                        return None
                return ('MEDIUM', self.dangerous_ops[op], line)

        return None

    def scan_content(self, content: str) -> List[Dict]:
        """Scan pickle disassembly content for security issues with context."""
        findings = []
        context = PickleContext(stack=deque(), memo={})

        for line_num, line in enumerate(content.split('\n'), 1):
            result = self.scan_line(line, context)
            if result:
                severity, message, line_content = result
                findings.append({
                    'line_number': line_num,
                    'severity': severity,
                    'message': message,
                    'content': line_content
                })

        return findings

    def format_findings(self, findings: List[Dict]) -> str:
        """Format the findings into a readable report."""
        if not findings:
            return "No security issues found."

        report = []
        report.append("Security Scan Results")
        report.append("===================")

        # Group by severity
        for severity in ['HIGH', 'MEDIUM']:
            severity_findings = [f for f in findings if f['severity'] == severity]
            if severity_findings:
                report.append(f"\n{severity} Severity Issues:")
                report.append("-" * 20)
                for finding in severity_findings:
                    report.append(f"Line {finding['line_number']}: {finding['message']}")
                    report.append(f"Content: {finding['content']}")
                    report.append("")

        return "\n".join(report)

def scan_pickle_disassembly(content: str) -> str:
    """Main function to scan pickle disassembly content."""
    scanner = PickleSecurityScanner()
    findings = scanner.scan_content(content)
    return scanner.format_findings(findings)

# Example usage
if __name__ == "__main__":
    # Add example showing typical PyTorch tensor reconstruction
    sample_content = open("data.dis").read()

    result = scan_pickle_disassembly(sample_content)
    print(result)

Pony Preservation Project - /mlp/con 2021

by Synthbot

Pony Preservation Project - /mlp/con 2020

by Synthbot

Preservation Project History - 2020 to 2021

by Synthbot

Missing music

by Synthbot

Animation format

by Synthbot