Aggregator Module

Overview

The Aggregator module is responsible for converting per-file patch classifications into PR-level final decisions. It aggregates individual file classification results and produces summary statistics across all patches in a pull request.

Purpose

After classifying individual patches against ChatGPT code snippets, this module:

  • Aggregates file-level classifications into PR-level decisions
  • Counts the classification distribution (PA, PN, NE, CC, ERROR)
  • Determines the final PR status based on priority heuristics
  • Persists results to disk for analysis

Classification Hierarchy

Per-File Classification (from classifier.py): PA, PN, NE, CC, ERROR
        ↓
Aggregator (this module)
        ↓
PR-Level Classification (final decision)

Key Functions

Final Classification Determination

The module uses the following logic to determine PR-level classification:

  • PA (Patch Applied): if any file is classified as PA
  • PN (Patch Not Applied): if no file is PA but at least one file is PN
  • Otherwise: the most frequent of NE (Not Existing), CC (Cannot Classify), and ERROR
  • Note: per-file OTHER EXT results are folded into CC during aggregation
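
The priority scheme above can be sketched as a small helper. This is a hedged reconstruction: the module's own helper, `_determine_ultimate_class`, appears in the source below but its body is not shown on this page, so the name and signature here are illustrative.

```python
from typing import Dict

def determine_ultimate_class(counts: Dict[str, int]) -> str:
    """Illustrative reconstruction of the documented priority logic."""
    if counts.get('PA', 0) > 0:          # any applied patch wins
        return 'PA'
    if counts.get('PN', 0) > 0:          # otherwise, any not-applied patch
        return 'PN'
    # Otherwise the most frequent of NE / CC / ERROR (ties favor NE).
    rest = {k: counts.get(k, 0) for k in ('NE', 'CC', 'ERROR')}
    return max(rest, key=rest.get)

determine_ultimate_class({'PA': 1, 'PN': 3})   # → 'PA'
determine_ultimate_class({'NE': 1, 'CC': 2})   # → 'CC'
```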

Usage Example

from analyzer import aggregator

# Aggregate file-level classifications into PR decisions.
# final_class() expects a list with one {pr_id: {...}} dict per PR,
# and each result item carries 'patchClass' and 'project' keys.
pr_results = [
    {'PR-123': {
        'file1.py': {'result': [
            {'patchClass': 'PA', 'project': 'owner/repo'},
            {'patchClass': 'PN', 'project': 'owner/repo'},
        ]},
        'file2.py': {'result': [
            {'patchClass': 'PA', 'project': 'owner/repo'},
        ]},
    }}
]

# Get the final classification per PR
final_classes = aggregator.final_class(pr_results)
# Output: [{'PR-123': {'totals': {'total_PA': 2, 'total_NE': 0, 'total_CC': 0,
#                                 'total_PN': 1, 'total_ERROR': 0},
#                      'class': 'PA', 'project': 'owner/repo'}}]

# Get statistics across all PRs
stats = aggregator.count_all_classifications(final_classes)
# Output: {'PA': 1, 'CC': 0, 'PN': 0, 'NE': 0, 'ERROR': 0}

Classification Constants

Constant                  Value      Meaning
CLASS_PATCH_APPLIED       PA         Patch was successfully applied
CLASS_PATCH_NOT_APPLIED   PN         Patch was not applied
CLASS_NOT_EXISTING        NE         File doesn't exist in PR
CLASS_CANNOT_CLASSIFY     CC         Unable to classify
CLASS_ERROR               ERROR      Processing error occurred
CLASS_OTHER_EXT           OTHER EXT  Unsupported file extension

Data Structures

Input Format

Per-file classification results from the classifier, as a list with one entry per PR (each result item also carries a 'project' key, which the aggregator reads):

[
    {'PR-123': {
        'file1.py': {
            'result': [
                {
                    'patchClass': 'PA',
                    'similarityRatio': 0.95,
                    'hunkMatches': {...},
                    'PrLink': 'https://github.com/...',
                    'project': 'owner/repo'
                },
                ...
            ]
        }
    }},
    ...
]

Output Format

PR-level aggregated results from final_class(), as a list with one entry per PR:

[
    {'PR-123': {
        'totals': {
            'total_PA': 1,
            'total_NE': 0,
            'total_CC': 0,
            'total_PN': 1,
            'total_ERROR': 0
        },
        'class': 'PA',
        'project': 'owner/repo'
    }},
    ...
]


API Reference

Analysis totals module for PatchTrack.

Provides functions to load, aggregate, and classify patch analysis results into final decision categories based on per-file classifications.

analyzer.aggregator.read_totals(repo_file, mainline)

Load aggregated analysis results for a repository.

Parameters:

Name       Type  Description                                   Default
repo_file  str   Repository identifier.                        required
mainline   str   Branch specification in 'owner/repo' format.  required

Returns:

Type            Description
Dict[str, Any]  Dictionary containing aggregated analysis totals.

Source code in analyzer/aggregator.py
def read_totals(repo_file: str, mainline: str) -> Dict[str, Any]:
    """Load aggregated analysis results for a repository.

    Args:
        repo_file: Repository identifier.
        mainline: Branch specification in 'owner/repo' format.

    Returns:
        Dictionary containing aggregated analysis totals.
    """
    owner, repo = mainline.split('/')
    file_path = os.path.join(TOTALS_DIR, f"{repo_file}_{owner}_{repo}_totals.pkl")
    with open(file_path, 'rb') as f:
        return pickle.load(f)
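
A usage sketch of the on-disk naming convention, "<repo_file>_<owner>_<repo>_totals.pkl". This is hedged: TOTALS_DIR here is a temporary stand-in for the module-level constant, and the function is re-declared inline so the snippet runs without the package installed.

```python
import os
import pickle
import tempfile

# Stand-in for the module's TOTALS_DIR constant (assumption for illustration).
TOTALS_DIR = tempfile.mkdtemp()

def read_totals(repo_file, mainline):
    # Mirrors analyzer.aggregator.read_totals: files are named
    # "<repo_file>_<owner>_<repo>_totals.pkl" under TOTALS_DIR.
    owner, repo = mainline.split('/')
    path = os.path.join(TOTALS_DIR, f"{repo_file}_{owner}_{repo}_totals.pkl")
    with open(path, 'rb') as f:
        return pickle.load(f)

# Write a sample totals file, then load it back.
sample = {'PA': 4, 'PN': 1, 'NE': 0, 'CC': 2, 'ERROR': 0}
with open(os.path.join(TOTALS_DIR, "results_octocat_demo_totals.pkl"), 'wb') as f:
    pickle.dump(sample, f)

totals = read_totals("results", "octocat/demo")
```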

analyzer.aggregator.final_class(result_dict)

Compute final classification for each pull request.

Aggregates per-file classifications into a single PR-level classification using priority-based logic:

  • PA (Patch Applied) takes precedence if present
  • PN (Patch Not Applied) takes precedence if no PA but PN present
  • Otherwise, the most frequent of CC/NE/ERROR

Parameters:

Name         Type                  Description                                          Default
result_dict  List[Dict[str, Any]]  Per-file analysis results, one dict per PR. Format:  required
                                   [{pr_id: {file: {result: [item, ...]}, ...}}, ...]

Returns:

Type                  Description
List[Dict[str, Any]]  List of dictionaries with aggregated totals and final
                      classification.

Source code in analyzer/aggregator.py
def final_class(result_dict: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Compute final classification for each pull request.

    Aggregates per-file classifications into a single PR-level classification
    using priority-based logic:
    - PA (Patch Applied) takes precedence if present
    - PN (Patch Not Applied) takes precedence if no PA but PN present
    - Otherwise, most frequent of CC/NE/ERROR

    Args:
        result_dict: List of dictionaries containing per-file analysis results.
                    Format: [{pr_id: {file: {result: [item, ...]}, ...}}, ...]

    Returns:
        List of dictionaries with aggregated totals and final classification.
    """
    pr_classes = []

    for pr_data in result_dict:
        pr_id, files_data = next(iter(pr_data.items()))
        pr_result = {pr_id: {}}

        counts = _initialize_classification_counts()
        project = ''

        for file_name, file_result in files_data.items():
            for item in file_result['result']:
                project = item['project']
                try:
                    patch_class = item['patchClass']
                    if patch_class == CLASS_OTHER_EXT or patch_class == CLASS_CANNOT_CLASSIFY:
                        counts[CLASS_CANNOT_CLASSIFY] += 1
                    elif patch_class == CLASS_NOT_EXISTING:
                        counts[CLASS_NOT_EXISTING] += 1
                    elif patch_class == CLASS_PATCH_APPLIED:
                        counts[CLASS_PATCH_APPLIED] += 1
                    elif patch_class == CLASS_PATCH_NOT_APPLIED:
                        counts[CLASS_PATCH_NOT_APPLIED] += 1
                    elif patch_class == CLASS_ERROR:
                        counts[CLASS_ERROR] += 1
                except (KeyError, ValueError):
                    counts[CLASS_ERROR] += 1

        ultimate_class = _determine_ultimate_class(counts)

        pr_result[pr_id] = {
            'totals': {
                'total_PA': counts[CLASS_PATCH_APPLIED],
                'total_NE': counts[CLASS_NOT_EXISTING],
                'total_CC': counts[CLASS_CANNOT_CLASSIFY],
                'total_PN': counts[CLASS_PATCH_NOT_APPLIED],
                'total_ERROR': counts[CLASS_ERROR]
            },
            'class': ultimate_class,
            'project': project
        }
        pr_classes.append(pr_result)

    return pr_classes
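
A short worked example of the aggregation. This is hedged: the function is re-declared inline in simplified form so the snippet runs standalone; it preserves the documented behavior (OTHER EXT folded into CC, PA/PN priority, most-frequent fallback) but is not the module's actual implementation.

```python
from typing import Any, Dict, List

def final_class(result_dict: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Simplified inline re-declaration for illustration only."""
    pr_classes = []
    for pr_data in result_dict:
        pr_id, files_data = next(iter(pr_data.items()))
        counts = {'PA': 0, 'PN': 0, 'NE': 0, 'CC': 0, 'ERROR': 0}
        project = ''
        for file_result in files_data.values():
            for item in file_result['result']:
                project = item.get('project', '')
                cls = item.get('patchClass', 'ERROR')
                if cls == 'OTHER EXT':   # folded into CC, as in the module
                    cls = 'CC'
                counts[cls if cls in counts else 'ERROR'] += 1
        # Priority: PA first, then PN, then the most frequent of NE/CC/ERROR.
        if counts['PA'] > 0:
            ultimate = 'PA'
        elif counts['PN'] > 0:
            ultimate = 'PN'
        else:
            ultimate = max(('NE', 'CC', 'ERROR'), key=lambda k: counts[k])
        pr_classes.append({pr_id: {
            'totals': {f'total_{k}': v for k, v in counts.items()},
            'class': ultimate,
            'project': project,
        }})
    return pr_classes

sample = [{'PR-42': {
    'a.py': {'result': [{'patchClass': 'PN', 'project': 'demo'}]},
    'b.py': {'result': [{'patchClass': 'NE', 'project': 'demo'}]},
}}]
out = final_class(sample)
# out[0]['PR-42']['class'] == 'PN' (no PA present, but a PN is)
```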

analyzer.aggregator.count_all_classifications(pr_classes)

Count final classification distribution across all pull requests.

Parameters:

Name        Type                  Description                             Default
pr_classes  List[Dict[str, Any]]  List of PR classification results from  required
                                  final_class().

Returns:

Type            Description
Dict[str, int]  Dictionary with counts for each classification type.

Source code in analyzer/aggregator.py
def count_all_classifications(pr_classes: List[Dict[str, Any]]) -> Dict[str, int]:
    """Count final classification distribution across all pull requests.

    Args:
        pr_classes: List of PR classification results from final_class().

    Returns:
        Dictionary with counts for each classification type.
    """
    class_counts = {
        CLASS_PATCH_APPLIED: 0,
        CLASS_CANNOT_CLASSIFY: 0,
        CLASS_PATCH_NOT_APPLIED: 0,
        CLASS_NOT_EXISTING: 0,
        CLASS_ERROR: 0
    }

    for pr_result in pr_classes:
        pr_id, pr_data = next(iter(pr_result.items()))
        final_classification = pr_data.get('class')

        if final_classification in class_counts:
            class_counts[final_classification] += 1

    return class_counts
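
A usage sketch, hedged: the function is re-declared inline so the snippet runs without the package; the input shape matches the list returned by final_class().

```python
# Inline re-declaration of count_all_classifications for illustration;
# the real implementation lives in analyzer/aggregator.py.
def count_all_classifications(pr_classes):
    class_counts = {'PA': 0, 'CC': 0, 'PN': 0, 'NE': 0, 'ERROR': 0}
    for pr_result in pr_classes:
        _pr_id, pr_data = next(iter(pr_result.items()))
        cls = pr_data.get('class')
        if cls in class_counts:
            class_counts[cls] += 1
    return class_counts

# One entry per PR, as produced by final_class().
pr_classes = [
    {'PR-1': {'class': 'PA', 'project': 'demo'}},
    {'PR-2': {'class': 'PN', 'project': 'demo'}},
    {'PR-3': {'class': 'PA', 'project': 'demo'}},
]
stats = count_all_classifications(pr_classes)
# stats == {'PA': 2, 'CC': 0, 'PN': 1, 'NE': 0, 'ERROR': 0}
```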

See Also

  • Classifier - Classifies individual patches
  • Main - Orchestrates the analysis pipeline
  • Analysis - Visualizes aggregated results