Source Loader

Overview

The SourceLoader class traverses source files in a repository, normalizes them (removes comments, collapses whitespace), and uses Bloom filters to efficiently detect matches against known patches. It is a critical component of the PatchTrack pipeline for identifying where patches may have been applied in source code.

This page explains the workflow, Bloom filter approach, key methods, data structures, and matching logic. For the complete API reference and docstrings, see the API Reference section below.

Purpose

  • Load and traverse source files from the filesystem (single files or trees)
  • Normalize source code by removing language-specific comments
  • Build Bloom filters for fast probabilistic n-gram matching
  • Detect patches in source files using hash-based queries
  • Record matches and provide detailed results

Key Concepts

Bloom Filters

A Bloom filter is a space-efficient probabilistic data structure used to test whether an element is a member of a set. In SourceLoader:

  • A bit vector of size BLOOMFILTER_SIZE (2,097,152 bits ≈ 256 KB) stores presence information
  • Each n-gram is hashed three ways (FNV-1a, DJB2, SDBM) to set 3 bits
  • A query checks if all 3 hash bits are set; if not, the element is definitely not in the set
  • Trade-off: False positives possible, false negatives impossible
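The triple-hash scheme above can be sketched in a few lines. The hash implementations below follow the standard FNV-1a, DJB2, and SDBM algorithms; the filter size is deliberately small for illustration (the real BLOOMFILTER_SIZE is 2,097,152 bits):

```python
FILTER_SIZE = 1 << 16  # power of two, so `& (size - 1)` works as modulo

def fnv1a(s: str) -> int:
    h = 0x811C9DC5                      # FNV-1a 32-bit offset basis
    for b in s.encode():
        h = ((h ^ b) * 0x01000193) & 0xFFFFFFFF
    return h

def djb2(s: str) -> int:
    h = 5381
    for b in s.encode():
        h = (h * 33 + b) & 0xFFFFFFFF
    return h

def sdbm(s: str) -> int:
    h = 0
    for b in s.encode():
        h = (b + (h << 6) + (h << 16) - h) & 0xFFFFFFFF
    return h

def positions(item: str) -> list:
    # Three bit positions per item, one per hash function
    return [f(item) & (FILTER_SIZE - 1) for f in (fnv1a, djb2, sdbm)]

bits = [0] * FILTER_SIZE

def add(item: str) -> None:
    for p in positions(item):
        bits[p] = 1

def might_contain(item: str) -> bool:
    # False means definitely absent; True may be a false positive
    return all(bits[p] for p in positions(item))

add("intx=1;")
assert might_contain("intx=1;")        # present
assert not might_contain("inty=2;")    # almost certainly absent
```

Because a query can only err toward "maybe present", a negative answer lets the caller skip a patch with no further work, which is what makes the filter useful as a prefilter.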

Normalization

Source code is normalized by:

  1. Removing language-specific comments (using helpers.remove_comments())
  2. Removing whitespace other than newlines (each whitespace run is replaced with the empty string, so newlines survive to delimit tokens)
  3. Converting to lowercase
  4. Splitting into tokens

This aggressive normalization ensures consistent matching regardless of formatting or comments.
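A minimal sketch of these four steps, assuming C-style comments for illustration (the real implementation delegates comment removal to the language-aware helpers.remove_comments()):

```python
import re

def normalize(source: str) -> list:
    # 1. Strip /* ... */ and // ... comments (C-style only, as an example)
    no_comments = re.sub(r"/\*.*?\*/|//[^\n]*", "", source, flags=re.DOTALL)
    # 2. Remove whitespace within each line (newlines delimit tokens),
    # 3. lowercase, 4. split into one token per non-empty line
    return [re.sub(r"\s+", "", line).lower()
            for line in no_comments.splitlines()
            if line.strip()]

src = "int X = 1;  // set X\nreturn X;\n"
print(normalize(src))  # ['intx=1;', 'returnx;']
```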

Matching Strategy

  • Source code is split into sliding n-grams
  • Each source n-gram is hashed and stored in the Bloom filter
  • For each patch, all patch hashes are queried against the Bloom filter
  • Matches are recorded with patch ID and position information
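A toy version of this strategy, with a single stand-in hash function (hash_ngram below is hypothetical; the real code sets three bits per n-gram):

```python
NGRAM, SIZE = 4, 1 << 12

def hash_ngram(ngram: str) -> int:
    # Stand-in for the FNV-1a/DJB2/SDBM triple used by SourceLoader
    return hash(ngram) & (SIZE - 1)

tokens = ["a=1;", "b=2;", "c=3;", "d=4;", "e=5;"]
bits = [0] * SIZE
for i in range(len(tokens) - NGRAM + 1):            # sliding window
    bits[hash_ngram("".join(tokens[i:i + NGRAM]))] = 1

patch_ngrams = ["a=1;b=2;c=3;d=4;", "x=9;y=8;z=7;w=6;"]
matches = [ng for ng in patch_ngrams if bits[hash_ngram(ng)]]
print(matches)  # the first patch n-gram matches window 0 exactly
```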

Batch Processing

To avoid memory overflow with very large source files:

  • Bloom filter is rebuilt periodically (when num_ngram_processed exceeds BLOOMFILTER_SIZE / MIN_MN_RATIO)
  • Old patch hashes are checked against the current filter before reset
  • This creates "batches" of hashes within the file
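The batch boundary logic can be illustrated with stand-in values; the constants match the documented defaults, but the source size and loop body are hypothetical:

```python
BLOOMFILTER_SIZE, MIN_MN_RATIO = 2_097_152, 32
threshold = BLOOMFILTER_SIZE // MIN_MN_RATIO   # 65,536 n-grams per batch

num_ngram = 200_000          # hypothetical very large source file
processed, batches = 0, 1
for _ in range(num_ngram):
    if processed > threshold:
        # real code: _check_bloom_match(patch_id), then bit_vector.setall(0)
        processed = 0
        batches += 1
    processed += 1
print(batches)  # 4 batches for 200,000 n-grams
```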

Workflow

SourceLoader.traverse(source_path, patch, file_ext)
    ├─── Store patch_list and count from patch object
    ├─── For each source file:
    ├─── _process(source_path, file_ext):
    │    ├─ Read file
    │    ├─ Call _normalize()
    │    └─ Call _query_bloomfilter()
    ├─── _query_bloomfilter():
    │    ├─ Split source into tokens
    │    │
    │    ├─ For each patch:
    │    │  │
    │    │  ├─ Build Bloom filter from source n-grams
    │    │  ├─ For each batch:
    │    │  │  ├─ _check_bloom_match() [against old hashes]
    │    │  │  └─ Reset Bloom filter
    │    │  │
    │    │  └─ _check_patch_hashes() [final check]
    │    │
    │    └─ Record matches in _match_dict
    └─── Return total match count

Key Methods

  • traverse(source_path, patch, file_ext) → int: Load and analyze source files; query against patches. Returns the total match count.
  • _process(source_path, magic_ext) → None: Read a source file, normalize it, and call _query_bloomfilter().
  • _normalize(source, file_ext) → str: Remove comments, collapse whitespace, lowercase. Returns the normalized string.
  • _query_bloomfilter(source_norm_lines, magic_ext) → None: Build a Bloom filter from source n-grams and query it against all patches; handles batch processing and resets.
  • _check_bloom_match(patch_id) → None: Query old patch hashes against the current Bloom filter; record matches.
  • _check_patch_hashes(patch_id) → None: Check all hashes from the current patch against the Bloom filter; populate _match_dict and _results.
  • items() → List: Get the source file list.
  • length() → int: Get the count of source files processed.
  • match_items() → Dict: Get the match dictionary mapping patch ID → matches.
  • results() → Dict: Get the results dictionary with per-hash match status.
  • source_hashes() → List[Tuple]: Get source n-gram hashes [(ngram_str, [hash1, hash2, hash3]), ...].

Data Structures

Match Dictionary

_match_dict: Dict[int, Dict[int, Dict[int, bool]]] maps:

{
    patch_id: {
        seq_num: {
            hash_value: is_match_bool,
            ...
        },
        ...
    },
    ...
}

Where:

  • patch_id: Identifier from the patch list
  • seq_num: Sequence number of the n-gram within the patch (increments every 3 hashes)
  • hash_value: One of the three hash values (FNV-1a, DJB2, SDBM)
  • is_match_bool: Whether this hash was found in the Bloom filter
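For illustration, here is a walk over a _match_dict-shaped structure that counts how many of each n-gram's three hashes were found (the patch IDs, sequence numbers, and hash values below are made up):

```python
match_dict = {
    0: {                                       # patch_id
        0: {11: True, 22: True, 33: True},     # seq 0: all 3 hashes hit
        1: {44: True, 55: False, 66: True},    # seq 1: one hash missed
    },
}

for patch_id, seqs in match_dict.items():
    for seq, hashes in seqs.items():
        hits = sum(hashes.values())
        print(f"patch {patch_id} ngram {seq}: {hits}/3 hashes matched")
# patch 0 ngram 0: 3/3 hashes matched
# patch 0 ngram 1: 2/3 hashes matched
```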

Results Dictionary

_results: Dict[int, Dict[str, Any]] maps hash values to match results:

{
    hash_value: {'Match': is_match_bool},
    ...
}

Source Hashes List

_source_hashes: List[Tuple[str, List[int]]] stores n-grams and their hashes:

[
    (ngram_str, [fnv1a, djb2, sdbm]),
    (ngram_str, [fnv1a, djb2, sdbm]),
    ...
]

Usage Example

Basic workflow

from analyzer.patchLoader import PatchLoader
from analyzer.sourceLoader import SourceLoader

# Load patches first
loader = PatchLoader()
loader.traverse('data/buggy/', patch_type='buggy', file_ext=2)

# Now query source against patches
source_loader = SourceLoader()
match_count = source_loader.traverse(
    source_path='data/src/',
    patch=loader,
    file_ext=2  # Same file type index
)
print(f"Found {match_count} possible matches")

# Inspect results
matches = source_loader.match_items()
for patch_id, match_info in matches.items():
    print(f"Patch {patch_id}: {match_info}")

Checking specific hashes

source_loader = SourceLoader()
source_loader.traverse('data/src/', loader, file_ext=2)

results = source_loader.results()
for hash_val, result in results.items():
    if result['Match']:
        print(f"Hash {hash_val} found in source")

# Reverse lookup
source_ngrams = source_loader.source_hashes()
for ngram, hashes in source_ngrams:
    print(f"N-gram: {ngram} -> Hashes: {hashes}")

Bloom Filter Tuning

Memory and False Positive Trade-offs

  • BLOOMFILTER_SIZE (default 2,097,152): Larger size = fewer false positives but more memory used.
  • MIN_MN_RATIO (default 32): Controls batch boundary. Smaller ratio = more frequent resets and checks, reducing false positives but increasing computation.
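Under the standard Bloom filter model, the expected false-positive rate with k hash functions, m bits, and n inserted items is (1 − e^(−kn/m))^k. A quick calculation with the defaults above, at the moment a batch fills up:

```python
import math

def fp_rate(m_bits: int, n_items: int, k: int = 3) -> float:
    # Standard Bloom filter false-positive estimate: (1 - e^(-k*n/m))^k
    return (1 - math.exp(-k * n_items / m_bits)) ** k

m = 2_097_152                  # BLOOMFILTER_SIZE
n = m // 32                    # one full batch (BLOOMFILTER_SIZE / MIN_MN_RATIO)
print(f"{fp_rate(m, n):.2e}")  # roughly 7e-4 per query at a full batch
```

This is why smaller MIN_MN_RATIO values (more frequent resets, fewer items per filter) reduce false positives at the cost of extra checking work.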

Batch Size Calculation

When num_ngram_processed exceeds BLOOMFILTER_SIZE / MIN_MN_RATIO:

Batch size threshold = 2,097,152 / 32 = 65,536 n-grams per batch

A typical file with 10 KB of normalized tokens yields far fewer than 65,536 n-grams, so it fits in a single batch; only very large files trigger multiple batches. Adjust MIN_MN_RATIO or BLOOMFILTER_SIZE to tune performance vs. accuracy.

Best Practices

  • Use consistent file_ext indices across both PatchLoader and SourceLoader to ensure language-aware normalization is uniform.
  • Verify source files are readable (handle encoding errors gracefully).
  • Consider pre-filtering source files by size (skip very large files if memory-constrained).
  • Store match_items() and results() for post-processing and analysis.
  • Use source_hashes() for debugging mismatches or reverse n-gram lookup.

Notes on Bloom Filter Implementation

The code uses bitarray.bitarray from the bitarray package for efficient bit manipulation. The Bloom filter is reset between batches to manage memory usage on large files. This means the final _check_patch_hashes() checks only the last batch; earlier batches are checked via _check_bloom_match() during the resets.

Integration with PatchLoader and Classifier

  • Input: PatchLoader provides normalized patches and their hash lists
  • Output: Match information passed to classifier for further analysis
  • See: docs/reference/classifier.md for how matches are scored and classified as PA/PN/NE

API Reference

Source file loader for patch analysis.

The SourceLoader class traverses source files, normalizes them (removes comments, collapses whitespace), and uses Bloom filters to efficiently query for matches against patches.

analyzer.sourceLoader.SourceLoader

Loads and analyzes source files using Bloom filter-based patch matching.

This class normalizes source files (removing comments and excess whitespace), builds Bloom filters, and detects matches against known patches.

Source code in analyzer/sourceLoader.py
class SourceLoader:
    """Loads and analyzes source files using Bloom filter-based patch matching.

    This class normalizes source files (removing comments and excess whitespace),
    builds Bloom filters, and detects matches against known patches.
    """

    def __init__(self) -> None:
        """Initialize the SourceLoader with empty data structures."""
        self._patch_list: List[Any] = []
        self._npatch: int = 0
        self._source_list: List[Any] = []
        self._nsource: int = 0
        self._match_dict: Dict[int, Any] = {}
        self._nmatch: int = 0
        self._bit_vector: bitarray.bitarray = bitarray.bitarray(common.bloomfilter_size)
        self._results: Dict[int, Dict[str, Any]] = {}
        self._source_hashes: List[Tuple[str, List[int]]] = []
        self._patch_hashes: List[Any] = []

    def traverse(self, source_path: str, patch: Any, file_ext: int) -> int:
        """Traverse source files and query against patches.

        Args:
            source_path: Path to a file or directory to analyze.
            patch: Patch object with items() and length() methods.
            file_ext: File extension type index (from FileExt class).

        Returns:
            The number of matches found.
        """
        common.verbose_print('[+] traversing source files')
        start_time = time.time()
        self._patch_list = patch.items()
        self._npatch = patch.length()

        if os.path.isfile(source_path):
            common.verbose_print(f'  [-] {source_path}: {file_ext}')
            if MIN_FILE_EXT_TYPE <= file_ext < MAX_FILE_EXT_TYPE:
                self._process(source_path, file_ext)
        elif os.path.isdir(source_path):
            for root, dirs, files in os.walk(source_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    common.verbose_print(f'  [-] {file_path}: {file_ext}')
                    if MIN_FILE_EXT_TYPE <= file_ext < MAX_FILE_EXT_TYPE:
                        self._process(file_path, file_ext)

        elapsed_time = time.time() - start_time
        common.verbose_print(f'[+] {self._nmatch} possible matches ... {elapsed_time:.1f}s\n')
        return self._nmatch

    def _process(self, source_path: str, magic_ext: int) -> None:
        """Normalize a source file and build a Bloom filter for queries.

        Args:
            source_path: Path to the source file.
            magic_ext: File extension type index.
        """
        with open(source_path, 'r') as source_file:
            source_orig_lines = source_file.read()

        source_norm_lines = self._normalize(source_orig_lines, magic_ext)
        self._query_bloomfilter(source_norm_lines, magic_ext)

    def _normalize(self, source: str, file_ext: int) -> str:
        """Normalize a source file by removing comments and whitespace.

        Args:
            source: The source code as a string.
            file_ext: File extension type index.

        Returns:
            Normalized source (lowercase, no comments, minimal whitespace).
        """
        source_no_comments = helpers.remove_comments(source, file_ext)
        # Remove whitespaces except newlines
        source_compact = common.WHITESPACE_REGEX.sub("", source_no_comments)
        # Convert to lowercase
        return source_compact.lower()

    def _query_bloomfilter(self, source_norm_lines: str, magic_ext: int) -> None:
        """Query Bloom filter against source to find patch matches.

        Uses n-gram hashing and Bloom filters to efficiently detect matches.

        Args:
            source_norm_lines: Normalized source code.
            magic_ext: File extension type index.
        """
        tokens = source_norm_lines.split()

        for patch_id in range(0, self._npatch):
            if len(tokens) < common.ngram_size:
                common.verbose_print('Warning: source too short for n-gram analysis')
                return

            common.ngram_size = self._patch_list[patch_id][6]
            self._bit_vector.setall(0)
            num_ngram = len(tokens) - common.ngram_size + 1
            num_ngram_processed = 0

            # Build Bloom filter from n-grams
            for i in range(0, num_ngram):
                if num_ngram_processed > common.bloomfilter_size / common.min_mn_ratio:
                    # Reset and re-check against old hashes
                    self._check_bloom_match(patch_id)
                    num_ngram_processed = 0
                    self._bit_vector.setall(0)

                ngram = ''.join(tokens[i : i + common.ngram_size])
                hash1 = common.fnv1a_hash(ngram) & (common.bloomfilter_size - 1)
                hash2 = common.djb2_hash(ngram) & (common.bloomfilter_size - 1)
                hash3 = common.sdbm_hash(ngram) & (common.bloomfilter_size - 1)
                self._bit_vector[hash1] = 1
                self._bit_vector[hash2] = 1
                self._bit_vector[hash3] = 1
                num_ngram_processed += 1
                self._source_hashes.append([ngram, [hash1, hash2, hash3]])

            # Final check against patch hashes
            self._check_patch_hashes(patch_id)

    def _check_bloom_match(self, patch_id: int) -> None:
        """Check if old patch hashes match current Bloom filter.

        Args:
            patch_id: The patch identifier.
        """
        hash_list_old = self._patch_list[patch_id].get('old_norm_lines', [])
        is_match = True
        for h in hash_list_old:
            if not self._bit_vector[h]:
                is_match = False
                break
        if is_match:
            if patch_id not in self._match_dict:
                self._match_dict[patch_id] = []
            self._match_dict[patch_id].append(self._nsource)
            self._nmatch += 1

    def _check_patch_hashes(self, patch_id: int) -> None:
        """Check and record patch hash matches against Bloom filter.

        Args:
            patch_id: The patch identifier.
        """
        hash_list = self._patch_list[patch_id].hash_list
        i = 0
        seq = 0
        for h in hash_list:
            if i == 3:
                i = 0
                seq += 1

            if patch_id not in self._match_dict:
                self._match_dict[patch_id] = {}

            if seq not in self._match_dict[patch_id]:
                self._match_dict[patch_id][seq] = {}

            is_match = bool(self._bit_vector[h])
            self._results[h] = {'Match': is_match}
            self._match_dict[patch_id][seq][h] = is_match
            i += 1

    def items(self) -> List[Any]:
        """Return the source list."""
        return self._source_list

    def length(self) -> int:
        """Return the number of sources."""
        return self._nsource

    def match_items(self) -> Dict[int, Any]:
        """Return the match dictionary."""
        return self._match_dict

    def results(self) -> Dict[int, Dict[str, Any]]:
        """Return the results dictionary."""
        return self._results

    def source_hashes(self) -> List[Tuple[str, List[int]]]:
        """Return the source hashes list."""
        return self._source_hashes

analyzer.sourceLoader.SourceLoader.__init__()

Initialize the SourceLoader with empty data structures.

Source code in analyzer/sourceLoader.py
def __init__(self) -> None:
    """Initialize the SourceLoader with empty data structures."""
    self._patch_list: List[Any] = []
    self._npatch: int = 0
    self._source_list: List[Any] = []
    self._nsource: int = 0
    self._match_dict: Dict[int, Any] = {}
    self._nmatch: int = 0
    self._bit_vector: bitarray.bitarray = bitarray.bitarray(common.bloomfilter_size)
    self._results: Dict[int, Dict[str, Any]] = {}
    self._source_hashes: List[Tuple[str, List[int]]] = []
    self._patch_hashes: List[Any] = []

analyzer.sourceLoader.SourceLoader.traverse(source_path, patch, file_ext)

Traverse source files and query against patches.

Parameters:

  • source_path (str): Path to a file or directory to analyze. Required.
  • patch (Any): Patch object with items() and length() methods. Required.
  • file_ext (int): File extension type index (from FileExt class). Required.

Returns:

  • int: The number of matches found.

Source code in analyzer/sourceLoader.py
def traverse(self, source_path: str, patch: Any, file_ext: int) -> int:
    """Traverse source files and query against patches.

    Args:
        source_path: Path to a file or directory to analyze.
        patch: Patch object with items() and length() methods.
        file_ext: File extension type index (from FileExt class).

    Returns:
        The number of matches found.
    """
    common.verbose_print('[+] traversing source files')
    start_time = time.time()
    self._patch_list = patch.items()
    self._npatch = patch.length()

    if os.path.isfile(source_path):
        common.verbose_print(f'  [-] {source_path}: {file_ext}')
        if MIN_FILE_EXT_TYPE <= file_ext < MAX_FILE_EXT_TYPE:
            self._process(source_path, file_ext)
    elif os.path.isdir(source_path):
        for root, dirs, files in os.walk(source_path):
            for file in files:
                file_path = os.path.join(root, file)
                common.verbose_print(f'  [-] {file_path}: {file_ext}')
                if MIN_FILE_EXT_TYPE <= file_ext < MAX_FILE_EXT_TYPE:
                    self._process(file_path, file_ext)

    elapsed_time = time.time() - start_time
    common.verbose_print(f'[+] {self._nmatch} possible matches ... {elapsed_time:.1f}s\n')
    return self._nmatch

analyzer.sourceLoader.SourceLoader.items()

Return the source list.

Source code in analyzer/sourceLoader.py
def items(self) -> List[Any]:
    """Return the source list."""
    return self._source_list

analyzer.sourceLoader.SourceLoader.length()

Return the number of sources.

Source code in analyzer/sourceLoader.py
def length(self) -> int:
    """Return the number of sources."""
    return self._nsource

analyzer.sourceLoader.SourceLoader.match_items()

Return the match dictionary.

Source code in analyzer/sourceLoader.py
def match_items(self) -> Dict[int, Any]:
    """Return the match dictionary."""
    return self._match_dict

analyzer.sourceLoader.SourceLoader.results()

Return the results dictionary.

Source code in analyzer/sourceLoader.py
def results(self) -> Dict[int, Dict[str, Any]]:
    """Return the results dictionary."""
    return self._results

analyzer.sourceLoader.SourceLoader.source_hashes()

Return the source hashes list.

Source code in analyzer/sourceLoader.py
def source_hashes(self) -> List[Tuple[str, List[int]]]:
    """Return the source hashes list."""
    return self._source_hashes

See Also

  • Main Module — How source loading fits in the pipeline
  • Patch Loader — How patches are prepared
  • Classifier — How matches are classified
  • Constants — BLOOMFILTER_SIZE and MIN_MN_RATIO
  • Helpers — Comment removal for different languages