Patch Loader

Overview

The PatchLoader class loads and processes patch files in unified diff format, extracts added/removed lines, builds n-gram hash lists, and tracks patches by file type. It is a core component of the PatchTrack pipeline responsible for normalizing and preparing patch data for classification.

This page explains the workflow, key methods, data structures, and tuning parameters. For the complete API reference and docstrings, see the API Reference section below.

Purpose

  • Load patch files from the filesystem (single files or directory trees)
  • Extract added lines (from patch files) and removed lines (from buggy files)
  • Normalize content by removing comments and collapsing whitespace
  • Build n-gram hash lists using three independent hash functions
  • Track file type information for language-aware processing

Key Concepts

Patch Types

  • Buggy patches: files containing the removed lines (prefixed with - in diff format), i.e. the original buggy code
  • Patch files: files containing the added lines (prefixed with + in diff format), i.e. the fix applied to address the bug

N-gram hashing

Sequences of NGRAM_SIZE tokens are hashed using three hash functions:

  • FNV-1a: Fast, non-cryptographic hash
  • DJB2: Daniel J. Bernstein's hash
  • SDBM: SDBM hash function

All three hashes are stored for each n-gram to improve matching robustness (three independent hash functions reduce collision risk).
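The project's implementations live in common (common.fnv1a_hash, common.djb2_hash, common.sdbm_hash). As an illustration only, the standard 32-bit variants of these algorithms look like the sketch below; the project's exact bit widths and seeds may differ:

```python
def fnv1a_hash(text: str) -> int:
    """FNV-1a, 32-bit variant: XOR the byte in first, then multiply by the FNV prime."""
    h = 0x811C9DC5  # FNV-1a 32-bit offset basis
    for byte in text.encode('utf-8'):
        h ^= byte
        h = (h * 0x01000193) & 0xFFFFFFFF  # FNV prime, truncated to 32 bits
    return h

def djb2_hash(text: str) -> int:
    """DJB2: h = h * 33 + c, starting from 5381."""
    h = 5381
    for byte in text.encode('utf-8'):
        h = (h * 33 + byte) & 0xFFFFFFFF
    return h

def sdbm_hash(text: str) -> int:
    """SDBM: h = c + (h << 6) + (h << 16) - h."""
    h = 0
    for byte in text.encode('utf-8'):
        h = (byte + (h << 6) + (h << 16) - h) & 0xFFFFFFFF
    return h
```

Because the three functions mix bits differently, an accidental collision on one hash is very unlikely to repeat on the other two.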

Normalization

Raw patch lines are normalized by:

  1. Converting to lowercase
  2. Removing language-specific comments (using helpers.remove_comment())
  3. Collapsing whitespace and splitting into tokens
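Setting aside comment removal (step 2, which depends on helpers.remove_comment()), the lowercasing and whitespace steps can be sketched as follows; WHITESPACE_REGEX here is an assumed stand-in for common.WHITESPACE_REGEX:

```python
import re

WHITESPACE_REGEX = re.compile(r'\s+')  # assumed equivalent of common.WHITESPACE_REGEX

def normalize_tokens(patch: str) -> list:
    """Lowercase, collapse whitespace, and split into tokens (comment removal omitted)."""
    source = patch.lower()
    source = WHITESPACE_REGEX.sub(' ', source).strip()
    return source.split()
```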

File Type Index

The file_ext parameter is an integer index (2–39 range) that identifies the file type/language. This is used to apply language-specific comment removal and tokenization rules. The index maps to extensions defined in analyzer.constant.EXTENSIONS.
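traverse() silently skips files whose index falls outside the supported range (the MIN_FILE_EXT_TYPE <= file_ext < MAX_FILE_EXT_TYPE guard in the source). Assuming bounds of 2 and 40 to match the documented 2–39 range (the real constants live in analyzer.constant), the check reduces to:

```python
# Assumed values for illustration; the real constants live in analyzer.constant.
MIN_FILE_EXT_TYPE = 2
MAX_FILE_EXT_TYPE = 40

def is_supported_file_type(file_ext: int) -> bool:
    """True if the index falls in the supported half-open range."""
    return MIN_FILE_EXT_TYPE <= file_ext < MAX_FILE_EXT_TYPE
```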

Workflow

PatchLoader.traverse(patch_path, patch_type, file_ext)
    ├─── For each patch file:
    ├─── _process_patch_file() routes to:
    │    ├─ _process_buggy()  [removed lines]
    │    └─ _process_patch()  [added lines]
    ├─── For each diff hunk (@@):
    │    ├─ Extract lines (- or +)
    │    ├─ Format for display (HTML color tags)
    │    └─ Call _add_patch_from_diff()
    ├─── _add_patch_from_diff():
    │    ├─ Normalize lines
    │    ├─ Call _build_hash_list()
    │    ├─ Create PatchInfo object
    │    └─ Append to _patch_list
    └─── Return count of patches loaded

Key Methods

traverse(patch_path, patch_type, file_ext) -> int
    Load and process all patches from path; return count. Routes to _process_buggy() or _process_patch().
_process_buggy(patch_path, file_ext) -> None
    Extract removed lines (prefix -), accumulate diff hunks, and call _add_patch_from_diff().
_process_patch(patch_path, file_ext) -> None
    Extract added lines (prefix +), accumulate diff hunks, and call _add_patch_from_diff().
_add_patch_from_diff(...) -> None
    Normalize diff lines, build hash list, create PatchInfo, append to internal list.
_normalize(patch, file_ext) -> str
    Remove comments, collapse whitespace, lowercase.
_build_hash_list(diff_norm_lines) -> Tuple
    Compute FNV-1a, DJB2, SDBM hashes for each n-gram. Return (hash_list, patch_hashes).
items() -> List[PatchInfo]
    Get all loaded PatchInfo objects.
length() -> int
    Get count of loaded patches.
hashes() -> Dict[int, str]
    Get hash-to-ngram lookup table.
added() -> List[List[str]]
    Get all added (patch) lines as token lists.
removed() -> List[List[str]]
    Get all removed (buggy) lines as token lists.

Data Structures

PatchInfo (from common.py)

Each patch record contains:

PatchInfo(
    path: str,                      # "[filename] file.py #2" (patch id)
    file_type: int,                 # File extension type index (2-39)
    diff_orig_lines: str,           # Raw diff lines (HTML-formatted)
    diff_norm_lines: List[str],     # Normalized tokens
    hash_list: List[int],           # All hashes (fnv1a, djb2, sdbm per n-gram)
    patch_hashes: List[Tuple[...]], # (ngram_str, [hash1, hash2, hash3])
    ngram_size: int                 # Size of n-grams used
)

Hash Storage

Internal _hashes: Dict[int, str] maps hash values to n-gram strings for reverse lookup. Built and populated during _build_hash_list().

Usage Example

Basic usage (processing directories)

from analyzer.patchLoader import PatchLoader

loader = PatchLoader()

# Process buggy patches (removed lines)
buggy_count = loader.traverse(
    patch_path='data/buggy/',
    patch_type='buggy',
    file_ext=2  # Language type index for Python
)
print(f"Loaded {buggy_count} buggy patches")

# Process fixes (added lines)
patch_count = loader.traverse(
    patch_path='data/patches/',
    patch_type='patch',
    file_ext=2
)
print(f"Loaded {patch_count} patch files")

# Inspect results
for patch_info in loader.items():
    print(f"Patch: {patch_info.path}")
    print(f"  File type: {patch_info.file_type}")
    print(f"  Hashes: {len(patch_info.hash_list)}")
    print(f"  N-gram size: {patch_info.ngram_size}")

# Retrieve specific data
hash_map = loader.hashes()  # Dict[hash_value] -> ngram string
removed_lines = loader.removed()  # List of removed token lists
added_lines = loader.added()  # List of added token lists

Processing a single file

loader = PatchLoader()
loader.traverse(
    patch_path='data/buggy/file.patch',
    patch_type='buggy',
    file_ext=2
)

N-gram Size and Performance

  • Default NGRAM_SIZE from analyzer.constant is 1.
  • Larger n-grams (e.g., 3–7) reduce false positives but increase hashing overhead and may decrease recall.
  • If a diff is shorter than ngram_size, _add_patch_from_diff() reduces the n-gram size to the diff's token count. Note that this assigns the shared common.ngram_size, so the reduced value also applies to patches processed afterwards.
  • The ngram_size used is stored in each PatchInfo for later reference.
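The sliding window in _build_hash_list() combined with the dynamic reduction above gives the following behavior (a sketch only; the real code hashes each window rather than returning it):

```python
def ngram_windows(tokens, ngram_size):
    """Return every n-gram window; shrink n to the token count if the diff is short."""
    if not tokens:
        return []
    n = min(ngram_size, len(tokens))  # mirrors the reduction in _add_patch_from_diff()
    return [' '.join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]
```

A diff with t tokens therefore yields t - n + 1 windows, so larger n means fewer (but more specific) n-grams to hash.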

Best Practices

  • Use consistent file_ext indices across buggy and patch processing to ensure language-aware normalization is applied uniformly.
  • Pre-validate patch file format (unified diff) before passing to traverse().
  • Call length() (or check the count returned by traverse()) to confirm patches were loaded successfully.
  • Store loader.hashes() for reverse-lookup if you need to map hash values back to n-gram strings during classification.
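A minimal pre-validation helper (an illustration, not part of the PatchLoader API) could simply check for at least one @@ hunk header before handing a file to traverse():

```python
def looks_like_unified_diff(path: str) -> bool:
    """Heuristic check: a unified diff should contain at least one @@ hunk header."""
    with open(path, 'r', errors='replace') as f:
        return any(line.startswith('@@') for line in f)
```

This is deliberately loose; a file without hunk headers would be silently skipped by the @@-driven loop in _process_buggy()/_process_patch() anyway.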

Notes on Comments Removal

The _normalize() method calls helpers.remove_comment(source, file_ext) to strip language-specific comments before tokenization. The file_ext parameter tells the helper which language syntax to use (e.g., # for Python, // for Java, etc.). See docs/reference/helpers.md for details.
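helpers.remove_comment() is language-aware; purely to illustrate the idea (not the real implementation, which must also handle block comments and markers inside string literals), a naive line-comment stripper looks like:

```python
def strip_line_comments(source: str, marker: str) -> str:
    """Drop everything after the first comment marker on each line (naive sketch)."""
    return '\n'.join(line.split(marker, 1)[0] for line in source.splitlines())
```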

API Reference

Patch loader for analyzing patch files.

The PatchLoader class loads and processes patch files (unified diff format), builds hash lists using n-grams, and tracks added/removed lines.

analyzer.patchLoader.PatchLoader

Loads and processes patch files using diff format and n-gram hashing.

Source code in analyzer/patchLoader.py
class PatchLoader:
    """Loads and processes patch files using diff format and n-gram hashing."""

    def __init__(self) -> None:
        """Initialize the PatchLoader with empty data structures."""
        self._patch_list: List[common.PatchInfo] = []
        self._npatch: int = 0
        self._hashes: Dict[int, str] = {}
        self._only_removed: List[List[str]] = []
        self._only_added: List[List[str]] = []

    def traverse(self, patch_path: str, patch_type: str, file_ext: int) -> int:
        """Traverse patch files and process them.

        Args:
            patch_path: Path to a patch file or directory.
            patch_type: Type of patch ('buggy' or 'patch').
            file_ext: File extension type index.

        Returns:
            The number of patches processed.
        """
        start_time = time.time()

        if os.path.isfile(patch_path):
            common.verbose_print(f'  [-] {patch_path}: {file_ext}')
            if MIN_FILE_EXT_TYPE <= file_ext < MAX_FILE_EXT_TYPE:
                self._process_patch_file(patch_path, patch_type, file_ext)
        elif os.path.isdir(patch_path):
            for root, dirs, files in os.walk(patch_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    common.verbose_print(f'  [-] {file_path}: {file_ext}')
                    if MIN_FILE_EXT_TYPE <= file_ext < MAX_FILE_EXT_TYPE:
                        self._process_patch_file(file_path, patch_type, file_ext)
                        if patch_type == 'buggy':
                            self.important_hashes = []

        self._npatch = len(self._patch_list)
        elapsed_time = time.time() - start_time
        return self._npatch

    def _process_patch_file(self, patch_path: str, patch_type: str, file_type: int) -> None:
        """Route patch processing based on type.

        Args:
            patch_path: Path to the patch file.
            patch_type: 'buggy' or 'patch'.
            file_type: File extension type index.
        """
        if patch_type == 'buggy':
            self._process_buggy(patch_path, file_type)
        elif patch_type == 'patch':
            self._process_patch(patch_path, file_type)

    def _add_patch_from_diff(
        self,
        patch_filename: str,
        diff_file: str,
        diff_cnt: int,
        diff_lines: List[str],
        diff_orig_lines: List[str],
        file_type: int,
    ) -> None:
        """Add a patch record from diff hunks.

        Normalizes diff lines and builds hash list if sufficient length.

        Args:
            patch_filename: Name of the patch file.
            diff_file: Path to the file being diffed.
            diff_cnt: Diff hunk counter.
            diff_lines: Lines from the diff.
            diff_orig_lines: Original formatted lines for display.
            file_type: File extension type index.
        """
        diff_norm_lines = self._normalize(''.join(diff_lines), file_type).split()

        if len(diff_norm_lines) >= common.ngram_size:
            path = f'[{patch_filename}] {diff_file} #{diff_cnt}'
            hash_list, patch_hashes = self._build_hash_list(diff_norm_lines)
            self._patch_list.append(
                common.PatchInfo(
                    path, file_type, ''.join(diff_orig_lines),
                    diff_norm_lines, hash_list, patch_hashes, common.ngram_size
                )
            )
        else:
            # Adjust ngram_size if diff is too short
            common.ngram_size = len(diff_norm_lines)
            path = f'[{patch_filename}] {diff_file} #{diff_cnt}'
            hash_list, patch_hashes = self._build_hash_list(diff_norm_lines)
            self._patch_list.append(
                common.PatchInfo(
                    path, file_type, ''.join(diff_orig_lines),
                    diff_norm_lines, hash_list, patch_hashes, common.ngram_size
                )
            )

    def _process_buggy(self, patch_path: str, file_type: int) -> None:
        """Process a 'buggy' patch file (removed lines).

        Args:
            patch_path: Path to the patch file.
            file_type: File extension type index.
        """
        patch_filename = os.path.basename(patch_path)
        with open(patch_path, 'r') as f:
            patch_lines = f.readlines()

        diff_file = re.sub(r'\.patch$', '', patch_path)
        diff_cnt = 0
        diff_buggy_lines = []
        diff_orig_lines = []
        removed_lines = []

        for line in patch_lines:
            if line.startswith('@@'):
                if diff_buggy_lines:
                    self._add_patch_from_diff(
                        patch_filename, diff_file, diff_cnt,
                        diff_buggy_lines, diff_orig_lines, file_type
                    )
                    diff_buggy_lines.clear()
                    diff_orig_lines.clear()

                if removed_lines:
                    for removed in removed_lines:
                        removed_norm = self._normalize(removed, file_type).split()
                        self._only_removed.append(removed_norm)
                    removed_lines.clear()

                diff_cnt += 1

            elif line.startswith('-'):
                diff_buggy_lines.append(line[1:])
                diff_orig_lines.append('<font color="#AA0000">')
                diff_orig_lines.append(line.replace('<', '&lt;').replace('>', '&gt;'))
                diff_orig_lines.append('</font>')
                removed_lines.append(line[1:])

            elif line.startswith(' '):
                diff_buggy_lines.append(line[1:])
                diff_orig_lines.append(line.replace('<', '&lt;').replace('>', '&gt;'))

        # Process final diff hunk if any
        if diff_buggy_lines:
            self._add_patch_from_diff(
                patch_filename, diff_file, diff_cnt,
                diff_buggy_lines, diff_orig_lines, file_type
            )

            if removed_lines:
                for removed in removed_lines:
                    removed_norm = self._normalize(removed, file_type).split()
                    self._only_removed.append(removed_norm)

    def _process_patch(self, patch_path: str, file_type: int) -> None:
        """Process a 'patch' file (added lines).

        Args:
            patch_path: Path to the patch file.
            file_type: File extension type index.
        """
        patch_filename = os.path.basename(patch_path)
        with open(patch_path, 'r') as f:
            patch_lines = f.readlines()

        diff_file = re.sub(r'\.patch$', '', patch_path)
        diff_cnt = 0
        diff_patch_lines = []
        diff_orig_lines = []
        added_lines = []

        for line in patch_lines:
            if line.startswith('@@'):
                if diff_patch_lines:
                    self._add_patch_from_diff(
                        patch_filename, diff_file, diff_cnt,
                        diff_patch_lines, diff_orig_lines, file_type
                    )
                    diff_patch_lines.clear()
                    diff_orig_lines.clear()

                if added_lines:
                    for added in added_lines:
                        added_norm = self._normalize(added, file_type).split()
                        self._only_added.append(added_norm)
                    added_lines.clear()

                diff_cnt += 1

            elif line.startswith('+'):
                diff_patch_lines.append(line[1:])
                diff_orig_lines.append('<font color="#00AA00">')
                diff_orig_lines.append(line.replace('<', '&lt;').replace('>', '&gt;'))
                diff_orig_lines.append('</font>')
                added_lines.append(line[1:])

            elif line.startswith(' '):
                diff_patch_lines.append(line[1:])
                diff_orig_lines.append(line.replace('<', '&lt;').replace('>', '&gt;'))

        # Process final diff hunk if any
        if diff_patch_lines:
            self._add_patch_from_diff(
                patch_filename, diff_file, diff_cnt,
                diff_patch_lines, diff_orig_lines, file_type
            )

            if added_lines:
                for added in added_lines:
                    added_norm = self._normalize(added, file_type).split()
                    self._only_added.append(added_norm)

    def _normalize(self, patch: str, file_ext: int) -> str:
        """Normalize patch content by removing comments and collapsing whitespace.

        Args:
            patch: Raw patch text.
            file_ext: File extension type index.

        Returns:
            Normalized patch text (lowercased, whitespace-collapsed).
        """
        source = patch.lower()
        source = helpers.remove_comment(source, file_ext)
        source = re.sub(common.WHITESPACE_REGEX, ' ', source).strip()
        return source

    def _build_hash_list(self, diff_norm_lines: List[str]) -> Tuple[List[int], List[Tuple[str, List[int]]]]:
        """Build n-gram hash list from normalized diff lines.

        Args:
            diff_norm_lines: Normalized lines split by whitespace.

        Returns:
            Tuple of (hash_list, patch_hashes) where hash_list contains hashes
            and patch_hashes contains (original_ngram, hash_list) tuples.
        """
        hash_list = []
        patch_hashes = []

        for i in range(len(diff_norm_lines) - common.ngram_size + 1):
            ngram = ' '.join(diff_norm_lines[i:i + common.ngram_size])
            fnv1a = common.fnv1a_hash(ngram)
            djb2 = common.djb2_hash(ngram)
            sdbm = common.sdbm_hash(ngram)

            hash_list.append(fnv1a)
            hash_list.append(djb2)
            hash_list.append(sdbm)
            patch_hashes.append((ngram, [fnv1a, djb2, sdbm]))

            self._hashes[fnv1a] = ngram
            self._hashes[djb2] = ngram
            self._hashes[sdbm] = ngram

        return hash_list, patch_hashes

    def items(self) -> List[common.PatchInfo]:
        """Get all patch items."""
        return self._patch_list

    def length(self) -> int:
        """Get number of patches loaded."""
        return len(self._patch_list)

    def hashes(self) -> Dict[int, str]:
        """Get mapping of hash to ngram."""
        return self._hashes

    def added(self) -> List[List[str]]:
        """Get all added lines."""
        return self._only_added

    def removed(self) -> List[List[str]]:
        """Get all removed lines."""
        return self._only_removed

analyzer.patchLoader.PatchLoader.__init__()

Initialize the PatchLoader with empty data structures.

Source code in analyzer/patchLoader.py
def __init__(self) -> None:
    """Initialize the PatchLoader with empty data structures."""
    self._patch_list: List[common.PatchInfo] = []
    self._npatch: int = 0
    self._hashes: Dict[int, str] = {}
    self._only_removed: List[List[str]] = []
    self._only_added: List[List[str]] = []

analyzer.patchLoader.PatchLoader.traverse(patch_path, patch_type, file_ext)

Traverse patch files and process them.

Parameters:

  • patch_path (str, required): Path to a patch file or directory.
  • patch_type (str, required): Type of patch ('buggy' or 'patch').
  • file_ext (int, required): File extension type index.

Returns:

  • int: The number of patches processed.

Source code in analyzer/patchLoader.py
def traverse(self, patch_path: str, patch_type: str, file_ext: int) -> int:
    """Traverse patch files and process them.

    Args:
        patch_path: Path to a patch file or directory.
        patch_type: Type of patch ('buggy' or 'patch').
        file_ext: File extension type index.

    Returns:
        The number of patches processed.
    """
    start_time = time.time()

    if os.path.isfile(patch_path):
        common.verbose_print(f'  [-] {patch_path}: {file_ext}')
        if MIN_FILE_EXT_TYPE <= file_ext < MAX_FILE_EXT_TYPE:
            self._process_patch_file(patch_path, patch_type, file_ext)
    elif os.path.isdir(patch_path):
        for root, dirs, files in os.walk(patch_path):
            for file in files:
                file_path = os.path.join(root, file)
                common.verbose_print(f'  [-] {file_path}: {file_ext}')
                if MIN_FILE_EXT_TYPE <= file_ext < MAX_FILE_EXT_TYPE:
                    self._process_patch_file(file_path, patch_type, file_ext)
                    if patch_type == 'buggy':
                        self.important_hashes = []

    self._npatch = len(self._patch_list)
    elapsed_time = time.time() - start_time
    return self._npatch

analyzer.patchLoader.PatchLoader.items()

Get all patch items.

Source code in analyzer/patchLoader.py
def items(self) -> List[common.PatchInfo]:
    """Get all patch items."""
    return self._patch_list

analyzer.patchLoader.PatchLoader.length()

Get number of patches loaded.

Source code in analyzer/patchLoader.py
def length(self) -> int:
    """Get number of patches loaded."""
    return len(self._patch_list)

analyzer.patchLoader.PatchLoader.hashes()

Get mapping of hash to ngram.

Source code in analyzer/patchLoader.py
def hashes(self) -> Dict[int, str]:
    """Get mapping of hash to ngram."""
    return self._hashes

analyzer.patchLoader.PatchLoader.added()

Get all added lines.

Source code in analyzer/patchLoader.py
def added(self) -> List[List[str]]:
    """Get all added lines."""
    return self._only_added

analyzer.patchLoader.PatchLoader.removed()

Get all removed lines.

Source code in analyzer/patchLoader.py
def removed(self) -> List[List[str]]:
    """Get all removed lines."""
    return self._only_removed

See Also

  • Main Module — How patches are loaded in the pipeline
  • Classifier — How hash lists are used for matching
  • Constant — File type indices and NGRAM_SIZE
  • Helpers — Comment removal for different languages