Skip to content

fpg_observational_model

FPG Observational Model - Genomic surveillance sampling for EMOD simulations.

This package provides tools for converting EMOD's Full Parasite Genetics (FPG) simulation output into recapitulative sampling for genomic surveillance analysis.

comprehensive_group_summary(group)

Calculate comprehensive summary statistics for infection data.

Returns mean, median, std, min, max for continuous variables.

Source code in fpg_observational_model/unified_metric_calculations.py
def comprehensive_group_summary(group):
    """
    Calculate comprehensive summary statistics for infection data.

    Returns mean, median, std, min, max for continuous variables.
    """
    if len(group) == 0:
        return _empty_comprehensive_summary()

    # Monogenomics and polygenomics counts and COI stats
    true_coi_stats = _comprehensive_stats(group['true_coi'], 'true_coi')
    effective_coi_stats = _comprehensive_stats(group['effective_coi'], 'effective_coi')

    # Genome ID analysis
    all_genome_stats = _analyze_genome_ids(group['recursive_nid'], "all_genomes")
    mono_genome_stats = _analyze_genome_ids(group[group['effective_coi'] == 1]['recursive_nid'], "mono_genomes")

    # Cotransmission and superinfection analysis
    poly_series =  group[group['true_coi'] > 1]['cotx']
    cotxn_counts = _analyze_binary_in_subset(poly_series, 'cotransmission')

    # Combine all stats
    result = pd.Series({
        'n_infections': len(group),
        **true_coi_stats,
        **effective_coi_stats,
        **all_genome_stats,
        **mono_genome_stats,
        **cotxn_counts
    })

    if 'genotype_coi' in group.columns:
        genotype_coi_stats = _comprehensive_stats(group['genotype_coi'], 'genotype_coi')

        result = pd.concat([result, genotype_coi_stats])

    return result

extract_sampled_infections(sample_df)

Remove rows where all sampling columns ('rep' columns) are NaN.

Parameters:

Name Type Description Default
sampled_df DataFrame

DataFrame with sampling columns

required

Returns:

Type Description

pd.DataFrame: DataFrame with rows removed where all sampling columns are NaN

Source code in fpg_observational_model/run_observational_model.py
def extract_sampled_infections(sample_df):
    """
    Remove rows where all sampling columns ('rep' columns) are NaN.

    Parameters:
        sampled_df (pd.DataFrame): DataFrame with sampling columns

    Returns:
        pd.DataFrame: DataFrame with rows removed where all sampling columns are NaN
    """
    # String to match in column names
    match_string = 'rep'

    # Identify columns matching the string
    sample_cols = sample_df.filter(regex=match_string)

    if sample_cols.empty:
        print("No 'rep' columns found - returning original DataFrame")
        return sample_df

    # Find rows where ALL sampling columns are NaN
    rows_all_nan = sample_cols.isna().all(axis=1)
    rows_to_drop = rows_all_nan.sum()

    print(f"Found {len(sample_cols.columns)} sampling columns: {sample_cols.columns.tolist()}")
    print(f"Dropping {rows_to_drop} rows where all sampling columns are NaN")

    # Keep rows where at least one sampling column has a value
    df_cleaned = sample_df[~rows_all_nan]

    return df_cleaned

get_default_config()

Return the default observational model configuration.

Source code in fpg_observational_model/run_observational_model.py
def get_default_config():
    """Return the default observational model configuration."""
    observational_model_config = {
        'hard_filters': {
            'symptomatics_only': True, 
            'monogenomic_infections_only': False,
            'day_snapshot': False
        },
        'intervention_start_month': 29, # Provide month where an intervention is applied. Currently any sampling pre/post intervention for a single intervention is supported. 
        'sampling_configs': {
            'random': {
                'method': 'random',
                'n_samples_year': 100,
                'replicates': 2,
                'method_params': {
                    'population_proportions': [1, 0], # Use to sample from the source or sink only, equally, etc. Within population comparisons of genetic metrics can be specified below - just make sure to total number of samples per year * proportion reflects the numbers you want per population.
                    'monogenomic_proportion': False, # Set to False if sampling randomly 
                    'equal_monthly': False}
            },
            'seasonal': {
                'method': 'seasonal',
                'n_samples_year': 100,
                'replicates': 2,
                'method_params': {
                    'season': 'full', # Options: full or peak; currently hardcoded to match Senegal's seasonality; update for other scenarios in unified_sampling.py
                }
            },
            # 'age': { # Example of how to set-up a sampling scheme based on age, to mirror biased sampling such as school surveys and health facility comparisons. 
            #     'method': 'age',
            #     'n_samples_year': 15,
            #     'replicates': 1
            # }

        },
        'metrics': {
            'cotransmission_proportion': True,
            'complexity_of_infection': True, # Will calculate both true COI and effective COI from the unique strains identified in the sample.
            'heterozygosity': True,
            'identity_by_descent': False,
            'identity_by_state': True,
            'individual_ibx': True,
            'fws': True,
            'monogenomic_proportion': True,
            'rh': True,
            'unique_genome_proportion': True # Will calculate both the proportion of unique genomes in the sampled infections to replicate phasing and from monogenomic samples with an effective COI of 1  only to match barcode limits.
        },
        'subpopulation_comparisons': { # Supported for yearly and seasonal temporal sampling schemes, not age-based sampling. 
            'add_monthly': False,  # Whether to add monthly comparisons within each year
            'populations': False,  # Defined by the population node in EMOD
            'polygenomic': True,  # Is polygenomic = 1, else monogenomic = 0
            'symptomatic': False,  # Is symptomatic = 1, else asymptomatic = 0
            'age_bins': False     # Default age bins: 0-5, 5-15, 15+
        }
    }

    return observational_model_config

get_matrix(name)

Get a registered matrix

Source code in fpg_observational_model/unified_metric_calculations.py
def get_matrix(name):
    """Get a registered matrix"""
    if name in _matrix_registry:
        return _matrix_registry[name]
    else:
        available = list(_matrix_registry.keys())
        raise KeyError(f"Matrix '{name}' not found. Available: {available}")

register_matrix(name, matrix)

Register a matrix for IBx calculations

Source code in fpg_observational_model/unified_metric_calculations.py
def register_matrix(name, matrix):
    """Register a matrix for IBx calculations"""
    _matrix_registry[name] = matrix
    print(f"Registered matrix: {name}")

run_sampling_model(input_df, config, intervention_start_month=None, verbose=True)

Main function to run the observational model sampling pipeline.

Parameters:

Name Type Description Default
input_df DataFrame

Input FPG DataFrame (required)

required
config dict

Configuration dictionary (required)

required
intervention_start_month int

Continuous month when intervention starts

None
verbose bool

Whether to print progress messages

True

Returns:

Type Description

pd.DataFrame: Final dataframe with all sampling columns added

Raises:

Type Description
ValueError

If input_df or config is None

Source code in fpg_observational_model/unified_sampling.py
def run_sampling_model(input_df, config, intervention_start_month=None, verbose=True):
    """
    Main function to run the observational model sampling pipeline.

    Parameters:
        input_df (pd.DataFrame): Input FPG DataFrame (required)
        config (dict): Configuration dictionary (required)
        intervention_start_month (int, optional): Continuous month when intervention starts
        verbose (bool): Whether to print progress messages

    Returns:
        pd.DataFrame: Final dataframe with all sampling columns added

    Raises:
        ValueError: If input_df or config is None
    """
    if verbose:
        print("=== Running Observational Model Sampling Pipeline ===")

    # Validate required inputs
    if input_df is None:
        raise ValueError("input_df is required and cannot be None")

    if config is None:
        raise ValueError("config is required and cannot be None")

    df = input_df.copy()

    # Add simulation_year column if it doesn't exist
    if 'simulation_year' not in df.columns and 'year' in df.columns:
        df['simulation_year'] = df['year'].copy()    


    try:
        # Step 1: Apply hard filters
        if verbose:
            print("\n=== Step 1: Apply hard filters ===")
        filter_params = process_config_filters(config)
        df_filtered = apply_emod_filters(df, **filter_params)

        intervention_start_month = config.get('intervention_start_month', intervention_start_month)
        if intervention_start_month is not None or intervention_start_month > 0:
            df_filtered = adjust_time_columns(df_filtered, intervention_start_month=intervention_start_month)

        df_filtered['group_year'] = df_filtered['intervention_year'].copy() if 'intervention_year' in df_filtered.columns else df['simulation_year'].copy()
        if config['subpopulation_comparisons'].get('add_monthly'):
            df_filtered['group_month'] = df_filtered['intervention_month'].copy() if 'intervention_month' in df_filtered.columns else df['continuous_month'].copy() 

        # Step 2: Calculate infection metrics
        if verbose:
            print("\n=== Step 2: Calculate individual infection metrics ===")
        df_metrics = calculate_infection_metrics(df_filtered.copy())

        # Step 3: Run sampling for each method
        if verbose:
            print("\n=== Step 3: Run sampling methods ===")

        # Start with the base dataframe
        final_df = df_metrics.copy()


        # Apply each sampling method
        for sampling_name, sampling_config in config['sampling_configs'].items():
            if verbose:
                print(f"\n--- Processing {sampling_name} sampling ---")

            # Run the sampling method
            sampled_df = run_sampling_functions(df_metrics, sampling_config)

            if config['subpopulation_comparisons'].get('add_monthly'):
                sampled_df['month_rep0'] = 1 

            # Merge the sampling columns back to the main dataframe
            sampling_columns = [col for col in sampled_df.columns 
                              if sampling_name in col and 'rep' in col]

            for col in sampling_columns:
                final_df[col] = sampled_df[col]

        if config['subpopulation_comparisons'].get('add_monthly'):
            final_df['month_rep0'] = 1  # All infections included for monthly analysis
            print("Added 'month_rep0' column for monthly subpopulation comparisons")


        # Final summary
        if verbose:
            print(f"\n=== Final Results ===")
            print(f"Final dataframe shape: {final_df.shape}")

            # Show sampling column summary
            sampling_cols = [col for col in final_df.columns 
                            if any(method in col for method in ['random', 'seasonal', 'age', 'month']) and 'rep' in col]
            print(f"Sampling columns created: {sampling_cols}")

            for col in sampling_cols:
                sampled_count = final_df[col].notna().sum()
                print(f"  {col}: {sampled_count} samples")

        return final_df

    except Exception as e:
        print(f"Error in observational model pipeline: {str(e)}")
        raise

subset_by_age(infection_df, n_samples_year, replicates, scheme='ageBins', age_bins=None, age_bin_weights=None, base_seed=418)

Subset samples based on age bins and add columns to the original dataframe.

Parameters:

Name Type Description Default
infection_df DataFrame

Input DataFrame

required
n_samples_year int

Number of samples per year (or per age bin if weights provided)

required
replicates int

Number of replicates

required
scheme str

Base name for columns (e.g., 'ageBins')

'ageBins'
age_bins list

Age bin boundaries in days [0, bin1, bin2, ..., max]

None
age_bin_weights list

Optional weights for age bins (must sum to 1)

None
base_seed int

Base random seed

418

Returns:

Type Description

pd.DataFrame: Original dataframe with new age bin sampling columns added

Source code in fpg_observational_model/unified_sampling.py
def subset_by_age(infection_df, n_samples_year, replicates,
                  scheme="ageBins",
                  age_bins=None, 
                  age_bin_weights=None,
                  base_seed=418):
    """
    Subset samples based on age bins and add columns to the original dataframe.

    Parameters:
      infection_df (pd.DataFrame): Input DataFrame
      n_samples_year (int): Number of samples per year (or per age bin if weights provided)
      replicates (int): Number of replicates
      scheme (str): Base name for columns (e.g., 'ageBins')
      age_bins (list): Age bin boundaries in days [0, bin1, bin2, ..., max]
      age_bin_weights (list): Optional weights for age bins (must sum to 1)
      base_seed (int): Base random seed

    Returns:
      pd.DataFrame: Original dataframe with new age bin sampling columns added
    """
    # Validate inputs
    validate_subset_inputs(infection_df, n_samples_year, replicates, "subset_by_age")

    # Start with copy of original dataframe
    result_df = infection_df.copy()

    # Set up age bins
    if age_bins is None:
        days_per_year = 365.25
        age_bins = [0, int(days_per_year * 5), int(days_per_year * 15), int(result_df['age_day'].max() + 1)]
        age_bin_labels = ['0-5yrs', '5-15yrs', '15+yrs']
    else:
        age_bin_labels = [f"age_{age_bins[i]//365}-{age_bins[i+1]//365}yrs" for i in range(len(age_bins)-1)]

    # Create age bin column
    result_df['age_bin'] = pd.cut(result_df['age_day'], bins=age_bins, labels=age_bin_labels, include_lowest=True)
    print(f"Created age bins: {result_df['age_bin'].value_counts().to_dict()}")

    # Calculate samples per age bin
    if age_bin_weights is not None:
        if len(age_bin_weights) != len(age_bin_labels):
            raise ValueError(f"Age bin weights length doesn't match number of age bins")
        if abs(sum(age_bin_weights) - 1.0) > 0.01:
            raise ValueError("Age bin weights must sum to 1.0")

        samples_per_age_bin = {label: int(n_samples_year * weight) 
                              for label, weight in zip(age_bin_labels, age_bin_weights)}
    else:
        # Equal distribution across age bins
        samples_per_age_bin = {label: n_samples_year // len(age_bin_labels) 
                              for label in age_bin_labels}
        # Distribute remainder
        remainder = n_samples_year % len(age_bin_labels)
        for i, label in enumerate(age_bin_labels[:remainder]):
            samples_per_age_bin[label] += 1

    # Process each replicate
    for rep in range(replicates):
        # Create column name for this replicate
        col_name = f"{scheme}_{n_samples_year}_rep{rep+1}"

        # Initialize column with NA
        result_df[col_name] = pd.NA

        # Track sampled indices for this replicate
        sampled_indices = []

        # Sample from each age bin
        for age_bin_label in age_bin_labels:
            age_df = result_df[result_df['age_bin'] == age_bin_label]

            # Get target samples for this age bin
            target_samples = samples_per_age_bin[age_bin_label]
            if age_df.empty or target_samples == 0:
                continue

            available = len(age_df)
            take = min(target_samples, available)

            if take > 0:
                seed = create_robust_random_seed(base_seed, rep, extra=hash(age_bin_label))
                sampled = age_df.sample(take, random_state=seed)

                # Add sampled indices to list
                sampled_indices.extend(sampled.index.tolist())

        # Mark sampled infections with 1
        result_df.loc[sampled_indices, col_name] = 1

    return result_df

subset_by_seasons(infection_df, n_samples_year, replicates, scheme='seasonal', season='full', base_seed=418)

Subset samples based on seasonal groupings.

Source code in fpg_observational_model/unified_sampling.py
def subset_by_seasons(infection_df, n_samples_year, replicates,
                      scheme="seasonal", season='full', base_seed=418):
    """
    Subset samples based on seasonal groupings.
    """
    result_df = infection_df.copy()

    # Apply seasonal groupings
    if season == "full":
        result_df['season_group'] = result_df.apply(assign_season_group, axis=1)
    elif season == "peak":
        result_df['season_group'] = result_df.apply(assign_peak_group, axis=1)
    else:
        raise ValueError("Only season_groups \"full\" or \"peak\" allowed as options.")

    validate_subset_inputs(result_df, n_samples_year, replicates, "subset_by_seasons")   

    # Process each replicate
    for rep in range(replicates):
        # Create column name for this replicate
        sample_col_name = f"{scheme}{season.capitalize()}_{n_samples_year}_rep{rep+1}"

        # Initialize column with NA
        result_df[sample_col_name] = pd.NA

        # Track sampled data for this replicate
        sampled_data = {}  # {index: season_label}

        season_groups = result_df['season_group'].unique()
        for season_name in season_groups:
            season_df = result_df[result_df['season_group'] == season_name]

            # Each season gets exactly n_samples_year samples
            available_samples = len(season_df)
            target_samples = handle_insufficient_samples(
                available_samples, n_samples_year, 
                f"Season {season_name}"
            )
            if season_df.empty or target_samples == 0:
                continue

            take = min(target_samples, available_samples)
            if take > 0:
                seed = create_robust_random_seed(base_seed, rep, extra=hash(season_name))
                sampled = season_df.sample(take, random_state=seed)

                # Store the season label for each sampled index
                for idx in sampled.index:
                    sampled_data[idx] = season_name

        # Set the season labels for sampled infections
        for idx, season_label in sampled_data.items():
            result_df.loc[idx, sample_col_name] = season_label

    return result_df

subset_randomly(infection_df, n_samples_year, replicates, scheme='random', monogenomic_proportion=False, equal_monthly=False, population_proportions=False, base_seed=418)

Randomly subset samples and add columns to the original dataframe.

Parameters:

Name Type Description Default
infection_df DataFrame

Input DataFrame

required
n_samples_year int

Number of samples per year per population

required
replicates int

Number of replicates

required
scheme str

Base name for columns (e.g., 'random')

'random'
equal_monthly bool

Whether to sample equally across months

False
population_proportions list

Optional population fractions (must sum to 1)

False
base_seed int

Base random seed

418

Returns:

Type Description

pd.DataFrame: Original dataframe with new sampling columns added

Source code in fpg_observational_model/unified_sampling.py
def subset_randomly(infection_df, 
                   n_samples_year, 
                   replicates,
                   scheme='random',
                   monogenomic_proportion=False,
                   equal_monthly=False, 
                   population_proportions=False, 
                   base_seed=418):
    """
    Randomly subset samples and add columns to the original dataframe.

    Parameters:
      infection_df (pd.DataFrame): Input DataFrame
      n_samples_year (int): Number of samples per year per population
      replicates (int): Number of replicates
      scheme (str): Base name for columns (e.g., 'random')
      equal_monthly (bool): Whether to sample equally across months
      population_proportions (list): Optional population fractions (must sum to 1)
      base_seed (int): Base random seed

    Returns:
      pd.DataFrame: Original dataframe with new sampling columns added
    """
    # Validate inputs
    validate_subset_inputs(infection_df, n_samples_year, replicates, "subset_randomly")

    # Start with copy of original dataframe
    result_df = infection_df.copy()

    # Get sampling numbers per population
    n_per_pop = n_samples_by_pop(infection_df, n_samples_year, population_proportions)
    populations = sorted(infection_df['population'].unique())

    # Process each replicate
    for rep in range(replicates):
        # Create column name for this replicate
        sample_col_name = f"{scheme}_{n_samples_year}_rep{rep+1}"
        # Initialize column with NA
        result_df[sample_col_name] = pd.NA
        # Track sampled indices for this replicate
        sampled_indices = []

        for pop_id, pop_n_samp in zip(populations, n_per_pop):
            pop_df = infection_df[infection_df['population'] == pop_id]

            for year in sorted(infection_df['group_year'].unique()):
                year_df = pop_df[pop_df['group_year'] == year]

                if year_df.empty:
                    continue

                available_samples = len(year_df)
                target_samples = handle_insufficient_samples(
                    available_samples, pop_n_samp, 
                    f"Pop {pop_id}, Year {year}"
                )

                if target_samples == 0:
                    continue

                seed = create_robust_random_seed(base_seed, rep, year, pop_id)

                if monogenomic_proportion:
                    if not isinstance(monogenomic_proportion, float):
                        monogenomic_proportion = float(monogenomic_proportion)

                    if 0 < monogenomic_proportion < 1:
                        # print("Subsetting samples based on a targeted ",  monogenomic_proportion,  "monogenomic proportion",)
                        polygenomic_proportion = 1-monogenomic_proportion

                        mono_take = min(len(year_df[year_df['effective_coi'] == 1]), int(target_samples * monogenomic_proportion))
                        poly_take = min(len(year_df[year_df['effective_coi'] > 1]), int(target_samples * polygenomic_proportion))

                        year_sampled = pd.concat(
                            [year_df[year_df['effective_coi'] == 1].sample(mono_take, random_state=seed),
                            year_df[year_df['effective_coi'] > 1].sample(poly_take, random_state=seed)]
                        )
                        sampled_indices.extend(year_sampled.index.tolist())

                elif equal_monthly and 'continuous_month' in year_df.columns:
                    # Equal monthly sampling
                    unique_months = sorted(year_df['continuous_month'].unique())
                    samples_per_month = n_samples_year // 12

                    for month in unique_months:
                        month_df = year_df[year_df['continuous_month'] == month]
                        if month_df.empty:
                            continue

                        month_available = len(month_df)
                        month_take = min(month_available, samples_per_month)
                        if month_take > 0:
                            month_seed = seed + month  # Simple seed modification
                            month_sampled = month_df.sample(month_take, random_state=month_seed)
                            sampled_indices.extend(month_sampled.index.tolist())
                else:
                    # Simple random sampling
                    year_sampled = year_df.sample(target_samples, random_state=seed)
                    sampled_indices.extend(year_sampled.index.tolist())

        # Mark sampled infections with 1
        result_df.loc[sampled_indices, sample_col_name] = 1

    return result_df

update_matrix_indices(sample_df)

Update the sample_df DataFrame to include a new column 'original_nid' that stores the original maps the 'recursive_nid' values to a global order based on their unique values.

Source code in fpg_observational_model/run_observational_model.py
def update_matrix_indices(sample_df):
    """ 
    Update the sample_df DataFrame to include a new column 'original_nid' that stores the original maps the 'recursive_nid' values to a global order based on their unique values.
    """
    df = sample_df.copy()  # Fixed: use sample_df instead of undefined df

    # Parse the recursive_nid column
    df['original_nid'] = df['recursive_nid'].copy()
    df['original_nid'] = df['original_nid'].apply(
    lambda x: ast.literal_eval(x) if isinstance(x, str) else x)

    # Step 1: Get all unique recursive_nid values across all rows
    all_nids = []
    for nid_list in df['original_nid']:
        all_nids.extend(nid_list)

    # Get unique values and sort them
    unique_nids = sorted(set(all_nids))

    # Step 2: Create a mapping from nid to its global order
    nid_to_order = {nid: i for i, nid in enumerate(unique_nids)}

    # Step 3: Apply the mapping to create the order column
    def map_to_global_order(nid_list):
        return [nid_to_order[nid] for nid in nid_list]

    df['recursive_nid'] = df['original_nid'].apply(map_to_global_order)

    return df