openavmkit.utilities.stats

calc_chds

calc_chds(df_in, field_cluster, field_value)

Calculate the Coefficient of Horizontal Dispersion (CHD) for each cluster in a DataFrame.

CHD is the same statistic as COD, the Coefficient of Dispersion, but calculated for horizontal equity clusters and used to measure horizontal dispersion, on the theory that similar properties in similar locations should have similar valuations. The use of the name "CHD" is chosen to avoid confusion because assessors strongly associate "COD" with sales ratio studies.

This function computes the CHD for each unique cluster in the input DataFrame based on the values in the specified field.

Parameters:

  • df_in (pandas.DataFrame, required): Input DataFrame.
  • field_cluster (str, required): Name of the column representing cluster identifiers.
  • field_value (str, required): Name of the column containing the values used to compute the CHD.

Returns:

  • pandas.Series: CHD value for each row, aligned with df_in.

Source code in openavmkit/utilities/stats.py
def calc_chds(df_in: pd.DataFrame, field_cluster: str, field_value: str):
    """Calculate the Coefficient of Horizontal Dispersion (CHD) for each cluster in a
    DataFrame.

    CHD is the same statistic as COD, the Coefficient of Dispersion, but
    calculated for horizontal equity clusters and used to measure horizontal dispersion, on
    the theory that similar properties in similar locations should have similar valuations.
    The use of the name "CHD" is chosen to avoid confusion because assessors strongly
    associate "COD" with sales ratio studies.

    This function computes the CHD for each unique cluster in the input DataFrame based on
    the values in the specified field.

    Parameters
    ----------
    df_in : pd.DataFrame
        Input DataFrame
    field_cluster : str
        Name of the column representing cluster identifiers.
    field_value : str
        Name of the column containing the values for COD calculation

    Returns
    -------
    pandas.Series
        Series of CHD values for each row, aligned with df_in.
    """
    # Create a dataframe matching the index of df_in with the cluster id and value field.
    df = df_in[[field_cluster, field_value]].copy()
    df["chd"] = 0.0

    clusters = df[field_cluster].unique()

    for cluster in clusters:
        df_cluster = df[df[field_cluster].eq(cluster)]
        # exclude negative and null values:
        df_cluster = df_cluster[
            ~pd.isna(df_cluster[field_value]) & df_cluster[field_value].gt(0)
        ]

        chd = calc_cod(df_cluster[field_value].values)
        df.loc[df[field_cluster].eq(cluster), "chd"] = chd

    return df["chd"]

calc_cod

calc_cod(values)

Calculate the Coefficient of Dispersion (COD) for an array of values.

COD is defined as the average absolute deviation from the median, divided by the median, multiplied by 100. Special cases are handled if the median is zero.

Parameters:

  • values (numpy.ndarray, required): Array of numeric values.

Returns:

  • float: The COD percentage.

Source code in openavmkit/utilities/stats.py
def calc_cod(values: np.ndarray) -> float:
    """
    Calculate the Coefficient of Dispersion (COD) for an array of values.

    COD is defined as the average absolute deviation from the median, divided by the
    median, multiplied by 100. Special cases are handled if the median is zero.

    Parameters
    ----------
    values : numpy.ndarray
        Array of numeric values.

    Returns
    -------
    float
        The COD percentage.
    """
    if len(values) == 0:
        return float("nan")

    median_value = np.median(values)
    abs_delta_values = np.abs(values - median_value)
    avg_abs_deviation = np.sum(abs_delta_values) / len(values)
    if median_value == 0:
        # if every value is zero, the COD is zero:
        if np.all(values == 0):
            return 0.0
        else:
            # if the median is zero but not all values are zero, return infinity
            return float("inf")
    cod = avg_abs_deviation / median_value
    cod *= 100
    return cod
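
A quick worked example: for the values 90, 100, and 110 the median is 100 and the average absolute deviation from the median is (10 + 0 + 10) / 3 ≈ 6.67, so the COD is roughly 6.7.

    import numpy as np
    from openavmkit.utilities.stats import calc_cod

    cod = calc_cod(np.array([90.0, 100.0, 110.0]))
    print(cod)  # ≈ 6.67: (6.67 / 100) * 100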

calc_cod_bootstrap

calc_cod_bootstrap(values, confidence_interval=0.95, iterations=10000, seed=777)

Calculate COD using bootstrapping.

This function bootstraps the input values (resampling with replacement) to generate a distribution of CODs, then returns the median COD along with the lower and upper bounds of the confidence interval.

Parameters:

  • values (numpy.ndarray, required): Array of numeric values.
  • confidence_interval (float, optional): The desired confidence level. Defaults to 0.95.
  • iterations (int, optional): Number of bootstrap iterations. Defaults to 10000.
  • seed (int, optional): Random seed for reproducibility. Defaults to 777.

Returns:

  • tuple[float, float, float]: The median COD, lower bound, and upper bound of the confidence interval.

Source code in openavmkit/utilities/stats.py
def calc_cod_bootstrap(
    values: np.ndarray,
    confidence_interval: float = 0.95,
    iterations: int = 10000,
    seed: int = 777
) -> tuple[float, float, float]:
    """
    Calculate COD using bootstrapping.

    This function bootstraps the input values (resampling with replacement) to
    generate a distribution of CODs, then returns the median COD along with the
    lower and upper bounds of the confidence interval.

    Parameters
    ----------
    values : numpy.ndarray
        Array of numeric values.
    confidence_interval : float, optional
        The desired confidence level. Defaults to 0.95.
    iterations : int, optional
        Number of bootstrap iterations. Defaults to 10000.
    seed : int, optional
        Random seed for reproducibility. Defaults to 777.

    Returns
    -------
    tuple[float, float, float]
        A tuple containing the median COD, lower bound, and upper bound of the confidence interval.
    """

    n = len(values)
    if n == 0:
        return float("nan"), float("nan"), float("nan")
    np.random.seed(seed)

    # Replace negative values with zero:
    values = np.where(values < 0, 0, values)

    median = np.median(values)
    samples = np.random.choice(values, size=(iterations, n), replace=True)
    abs_delta_values = np.abs(samples - median)
    bootstrap_cods = np.mean(abs_delta_values, axis=1) / median * 100
    alpha = (1.0 - confidence_interval) / 2
    lower_bound, upper_bound = np.quantile(bootstrap_cods, [alpha, 1.0 - alpha])
    median_cod = np.median(bootstrap_cods)
    return median_cod, lower_bound, upper_bound
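
Example: a small sketch with made-up sales ratios, showing the shape of the returned tuple.

    import numpy as np
    from openavmkit.utilities.stats import calc_cod_bootstrap

    ratios = np.array([0.92, 1.01, 0.98, 1.10, 0.95, 1.04, 0.99, 1.07])
    median_cod, lower, upper = calc_cod_bootstrap(ratios, confidence_interval=0.95, iterations=2000)
    print(f"COD ≈ {median_cod:.1f}, 95% CI [{lower:.1f}, {upper:.1f}]")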

calc_correlations

calc_correlations(X, threshold=0.1, do_plots=False)

Calculate correlations and iteratively drop variables with low combined scores.

This function computes the correlation matrix of X, then calculates a combined score for each variable based on its correlation strength with the target variable and its average correlation with other variables. Variables whose scores fall below the specified threshold are removed.

Parameters:

  • X (pandas.DataFrame, required): Input DataFrame containing the variables to evaluate; the first column is treated as the target variable.
  • threshold (float, optional): Minimum acceptable combined score. Variables with a score below this value are dropped. Defaults to 0.1.
  • do_plots (bool, optional): If True, plot the initial and final correlation heatmaps. Defaults to False.

Returns:

  • dict: Dictionary with two keys:
      • "initial": pandas.DataFrame of combined scores from the first iteration.
      • "final": pandas.DataFrame of combined scores after dropping low-scoring variables.

Source code in openavmkit/utilities/stats.py
def calc_correlations(
    X: pd.DataFrame,
    threshold: float = 0.1,
    do_plots: bool = False
) -> dict:
    """
    Calculate correlations and iteratively drop variables with low combined scores.

    This function computes the correlation matrix of `X`, then calculates a combined score
    for each variable based on its correlation strength with the target variable and its
    average correlation with other variables. Variables whose scores fall below the
    specified `threshold` are removed.

    Parameters
    ----------
    X : pandas.DataFrame
        Input DataFrame containing the variables to evaluate.
    threshold : float, optional
        Minimum acceptable combined score for variables. Variables with a score below this
        value will be dropped. Defaults to 0.1.
    do_plots : bool, optional
        If True, plot the initial and final correlation heatmaps. Defaults to False.

    Returns
    -------
    dict
        Dictionary with two keys:

        - "initial": pandas.Series of combined scores from the first iteration.
        - "final": pandas.Series of combined scores after dropping low-scoring variables.
    """
    X = X.copy()
    first_run = None

    # Normalize all numerical values prior to computation:
    for col in X.columns:
        if X[col].dtype != "object":
            X[col] = (X[col] - X[col].mean()) / X[col].std()

    while True:
        # Compute the correlation matrix
        naive_corr = X.corr()

        # Identify variables with the highest correlation with the target variable (the first column)
        target_corr = naive_corr.iloc[:, 0].abs().sort_values(ascending=False)

        # Sort naive_corr by the correlation of the target variable
        naive_corr = naive_corr.loc[target_corr.index, target_corr.index]

        naive_corr_sans_target = naive_corr.iloc[1:, 1:]

        # Get the (absolute) strength of the correlation with the target variable
        strength = naive_corr.iloc[:, 0].abs()

        # drop the target variable from strength:
        strength = strength.iloc[1:]

        # Calculate the clarity of the correlation: how correlated it is with all other variables *except* the target variable
        clarity = 1 - (
            (naive_corr_sans_target.abs().sum(axis=1) - 1.0)
            / (len(naive_corr_sans_target.columns) - 1)
        )

        # Combine the strength and clarity into a single score -- bigger is better, and we want high strength and high clarity
        score = strength * clarity * clarity

        # Identify the variable with the lowest score
        min_score_idx = score.idxmin()

        if pd.isna(min_score_idx):
            min_score = score.iloc[0]
        else:
            min_score = score[min_score_idx]

        data = {"corr_strength": strength, "corr_clarity": clarity, "corr_score": score}
        df_score = pd.DataFrame(data)
        df_score = df_score.reset_index().rename(columns={"index": "variable"})

        if first_run is None:
            first_run = df_score
            first_run = first_run.sort_values("corr_score", ascending=False)

        if min_score < threshold:
            X = X.drop(min_score_idx, axis=1)
        else:
            break

    # sort by score:
    df_score = df_score.sort_values("corr_score", ascending=False)

    if do_plots:
        plot_correlation(naive_corr, "Correlation of Variables (initial)")

    # recompute the correlation matrix
    final_corr = X.corr()

    if do_plots:
        plot_correlation(final_corr, "Correlation of Variables (final)")

    return {"initial": first_run, "final": df_score}

calc_cross_validation_score

calc_cross_validation_score(X, y)

Calculate cross-validation score using negative mean squared error.

This function fits a LinearRegression model using 5-fold cross validation and returns the mean cross-validated MSE (positive value).

Parameters:

  • X (array-like or pandas.DataFrame, required): Input features for modeling.
  • y (array-like or pandas.Series, required): Target variable.

Returns:

  • float: The mean cross-validated mean squared error.

Source code in openavmkit/utilities/stats.py
def calc_cross_validation_score(
    X: pd.DataFrame | np.ndarray,
    y: pd.Series | np.ndarray
) -> float:
    """
    Calculate cross-validation score using negative mean squared error.

    This function fits a LinearRegression model using 5-fold cross validation and
    returns the mean cross-validated MSE (positive value).

    Parameters
    ----------
    X : array-like or pandas.DataFrame
        Input features for modeling.
    y : array-like or pandas.Series
        Target variable.

    Returns
    -------
    float
        The mean cross-validated mean squared error.
    """
    model = LinearRegression()
    # Use negative MSE and negate it to return positive MSE
    try:
        scores = cross_val_score(model, X, y, cv=5, scoring="neg_mean_squared_error")
    except ValueError:
        return float("nan")

    return -scores.mean()  # Convert negative MSE to positive

calc_elastic_net_regularization

calc_elastic_net_regularization(X, y, threshold_fraction=0.05)

Calculate Elastic Net regularization coefficients while iteratively dropping variables with low coefficients.

This function standardizes X, fits an Elastic Net model, and iteratively removes variables whose absolute coefficients fall below a fraction of the maximum coefficient.

Parameters:

  • X (pandas.DataFrame, required): Input features DataFrame.
  • y (pandas.Series, required): Target variable series.
  • threshold_fraction (float, optional): Fraction of the maximum coefficient below which variables are dropped. Defaults to 0.05.

Returns:

  • dict: Dictionary with two keys:
      • "initial": pandas.DataFrame of coefficients from the first Elastic Net fit.
      • "final": pandas.DataFrame of coefficients after dropping low-magnitude variables.

Source code in openavmkit/utilities/stats.py
def calc_elastic_net_regularization(
    X: pd.DataFrame,
    y: pd.Series,
    threshold_fraction: float = 0.05
) -> dict:
    """
    Calculate Elastic Net regularization coefficients while iteratively dropping variables with low coefficients.

    This function standardizes `X`, fits an Elastic Net model, and iteratively removes
    variables whose absolute coefficients fall below a fraction of the maximum coefficient.

    Parameters
    ----------
    X : pd.DataFrame
        Input features DataFrame.
    y : pd.Series
        Target variable series.
    threshold_fraction : float, optional
        Fraction of the maximum coefficient below which variables are dropped. Defaults to 0.05.

    Returns
    -------
    dict
        Dictionary with two keys:

        - "initial": pandas.Series of coefficients from the first Elastic Net fit.
        - "final": pandas.Series of coefficients after dropping low-magnitude variables.
    """

    X = X.copy()

    # Standardize the features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    first_run = None

    while True:

        # Apply Elastic Net regularization
        elastic_net = ElasticNet(alpha=1.0, l1_ratio=0.5)
        elastic_net.fit(X_scaled, y)

        # Calculate the absolute values of the coefficients
        abs_coefficients = np.abs(elastic_net.coef_)

        # Determine the threshold as a fraction of the largest coefficient
        max_coef = np.max(abs_coefficients)
        threshold = max_coef * threshold_fraction

        coefficients = elastic_net.coef_
        # align coefficients into a dataframe with variable names:
        coefficients = pd.DataFrame(
            {
                "variable": X.columns,
                "enr_coef": coefficients,
                "enr_coef_sign": np.sign(coefficients),
            }
        )
        coefficients = coefficients.sort_values(
            "enr_coef", ascending=False, key=lambda x: x.abs()
        )

        # identify worst variable:
        min_coef_idx = np.argmin(abs_coefficients)
        min_coef = abs_coefficients[min_coef_idx]

        if first_run is None:
            first_run = coefficients

        if min_coef < threshold:
            # remove the worst variable from X_scaled:
            X_scaled = np.delete(X_scaled, min_coef_idx, axis=1)
            # remove corresponding column from X:
            X = X.drop(X.columns[min_coef_idx], axis=1)
        else:
            break

    return {"initial": first_run, "final": coefficients}

calc_mse

calc_mse(prediction, ground_truth)

Calculate the Mean Squared Error (MSE) between predictions and ground truth.

Parameters:

  • prediction (numpy.ndarray, required): Array of predicted values.
  • ground_truth (numpy.ndarray, required): Array of true values.

Returns:

  • float: The MSE value.

Source code in openavmkit/utilities/stats.py
def calc_mse(
    prediction: np.ndarray,
    ground_truth: np.ndarray
) -> float:
    """
    Calculate the Mean Squared Error (MSE) between predictions and ground truth.

    Parameters
    ----------
    prediction : numpy.ndarray
        Array of predicted values.
    ground_truth : numpy.ndarray
        Array of true values.

    Returns
    -------
    float
        The MSE value.
    """
    return float(np.mean((prediction - ground_truth) ** 2))

calc_mse_r2_adj_r2

calc_mse_r2_adj_r2(predictions, ground_truth, num_vars)

Calculate the Mean Squared Error (MSE), r-squared, and adjusted r-squared.

Parameters:

  • predictions (numpy.ndarray, required): Array of predicted values.
  • ground_truth (numpy.ndarray, required): Array of true values.
  • num_vars (int, required): Number of independent variables used to produce the predictions.

Returns:

  • tuple[float, float, float]: A tuple containing three values:
      • The MSE value
      • The r-squared value
      • The adjusted r-squared value

Source code in openavmkit/utilities/stats.py
def calc_mse_r2_adj_r2(
    predictions: np.ndarray, ground_truth: np.ndarray, num_vars: int
):
    """
    Calculate the Mean Squared Error (MSE), r-squared, and adjusted r-squared

    Parameters
    ----------
    predictions : numpy.ndarray
        Array of predicted values.
    ground_truth : numpy.ndarray
        Array of true values.
    num_vars : int
        Number of independent variables used to produce the predictions

    Returns
    -------
    tuple[float, float, float]

        A tuple containing three values:

        - The MSE value
        - The r-squared value
        - The adjusted r-squared value
    """

    mse = np.mean((ground_truth - predictions) ** 2)
    ss_res = np.sum((ground_truth - predictions) ** 2)
    ss_tot = np.sum((ground_truth - np.mean(ground_truth)) ** 2)

    r2 = 1 - (ss_res / ss_tot) if ss_tot != 0 else float("inf")

    n = len(predictions)
    k = num_vars
    divisor = n - k - 1
    if divisor == 0:
        adj_r2 = float("inf")
    else:
        adj_r2 = 1 - ((1 - r2) * (n - 1) / divisor)
    return mse, r2, adj_r2
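
Adjusted R² here follows the standard formula used in the code above: adj_r2 = 1 - (1 - r2) * (n - 1) / (n - k - 1), with n observations and k = num_vars. A small worked example:

    import numpy as np
    from openavmkit.utilities.stats import calc_mse_r2_adj_r2

    ground_truth = np.array([100.0, 200.0, 300.0, 400.0, 500.0])
    predictions = np.array([110.0, 190.0, 310.0, 390.0, 520.0])

    mse, r2, adj_r2 = calc_mse_r2_adj_r2(predictions, ground_truth, num_vars=2)
    # n = 5 and k = 2, so adj_r2 = 1 - (1 - r2) * 4 / 2
    print(mse, r2, adj_r2)  # 160.0, 0.992, 0.984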

calc_p_values_recursive_drop

calc_p_values_recursive_drop(X, y, sig_threshold=0.05)

Recursively drop variables with p-values above a specified significance threshold.

Fits an OLS model on X and iteratively removes the variable with the highest p-value until all remaining variables have p-values below the threshold.

Parameters:

  • X (pandas.DataFrame, required): Input features DataFrame.
  • y (pandas.Series, required): Target variable series.
  • sig_threshold (float, optional): Significance threshold for p-values. Variables with p-values above this threshold will be dropped. Defaults to 0.05.

Returns:

  • dict: Dictionary with two keys:
      • "initial": pandas.DataFrame of p-values from the first OLS fit.
      • "final": pandas.DataFrame of p-values after recursively dropping high-p-value variables.

Raises:

  • ValueError: If the OLS regression fails or no variables remain.

Source code in openavmkit/utilities/stats.py
def calc_p_values_recursive_drop(
    X: pd.DataFrame,
    y: pd.Series,
    sig_threshold: float = 0.05
) -> dict:
    """
    Recursively drop variables with p-values above a specified significance threshold.

    Fits an OLS model on `X` and iteratively removes the variable with the highest
    p-value until all remaining variables have p-values below the threshold.

    Parameters
    ----------
    X : pd.DataFrame
        Input features DataFrame.
    y : pd.Series
        Target variable series.
    sig_threshold : float, optional
        Significance threshold for p-values. Variables with p-values above this
        threshold will be dropped. Defaults to 0.05.

    Returns
    -------
    dict
        Dictionary with two keys:

        - "initial": pd.Series of p-values from the first OLS fit.
        - "final": pd.Series of p-values after recursively dropping high-p-value variables.

    Raises
    ------
    ValueError
        If the OLS regression fails or no variables remain.
    """

    X = X.copy()
    X = sm.add_constant(X)
    X = X.astype(np.float64)
    model = sm.OLS(y, X).fit()
    first_run = None
    while True:
        max_p_value = model.pvalues.max()
        p_values = model.pvalues
        if first_run is None:
            first_run = p_values
        if max_p_value > sig_threshold:
            var_to_drop = p_values.idxmax()
            if var_to_drop == "const":
                # don't drop the constant; pick the next-worst variable instead:
                try:
                    var_to_drop = p_values.iloc[1:].idxmax()
                except ValueError:
                    break
                # stop if the next-worst variable is already significant
                if p_values[var_to_drop] <= sig_threshold:
                    break
            if pd.isna(var_to_drop):
                break
            X = X.drop(var_to_drop, axis=1)
            try:
                new_model = sm.OLS(y, X).fit()
            except ValueError as e:
                print(f"Error fitting model after dropping {var_to_drop}: {e}")
                break
            model = new_model
        else:
            break

    # align p_values into a dataframe with variable names:
    p_values = (
        pd.DataFrame({"p_value": model.pvalues})
        .sort_values("p_value", ascending=True)
        .reset_index()
        .rename(columns={"index": "variable"})
    )

    # do the same for "first_run":
    first_run = (
        pd.DataFrame({"p_value": first_run})
        .sort_values("p_value", ascending=True)
        .reset_index()
        .rename(columns={"index": "variable"})
    )

    return {
        "initial": first_run,
        "final": p_values,
    }
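
Example: a minimal sketch with synthetic data and hypothetical column names; noise_var has no real relationship to y and should usually be dropped as insignificant.

    import numpy as np
    import pandas as pd
    from openavmkit.utilities.stats import calc_p_values_recursive_drop

    rng = np.random.default_rng(0)
    n = 500
    X = pd.DataFrame({
        "bldg_area": rng.uniform(500, 4_000, n),
        "noise_var": rng.normal(0, 1, n),  # no real relationship to y
    })
    y = pd.Series(50_000 + 120 * X["bldg_area"] + rng.normal(0, 20_000, n))

    results = calc_p_values_recursive_drop(X, y, sig_threshold=0.05)
    print(results["final"])  # columns: variable, p_value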

calc_prb

calc_prb(predictions, ground_truth, confidence_interval=0.95)

Calculate the Price Related Bias (PRB) metric using a regression-based approach.

This function fits an OLS model on the transformed ratios of predictions to ground truth, then returns the PRB value along with its lower and upper confidence bounds.

Parameters:

  • predictions (numpy.ndarray, required): Array of predicted values.
  • ground_truth (numpy.ndarray, required): Array of ground truth values.
  • confidence_interval (float, optional): Desired confidence interval. Defaults to 0.95.

Returns:

  • tuple[float, float, float]: A tuple containing:
      • The PRB value
      • The lower bound of the confidence interval
      • The upper bound of the confidence interval

Raises:

  • ValueError: If predictions and ground_truth have different lengths.

Source code in openavmkit/utilities/stats.py
def calc_prb(
    predictions: np.ndarray,
    ground_truth: np.ndarray,
    confidence_interval: float = 0.95
) -> tuple[float, float, float]:
    """
    Calculate the Price Related Bias (PRB) metric using a regression-based approach.

    This function fits an OLS model on the transformed ratios of predictions to ground
    truth, then returns the PRB value along with its lower and upper confidence bounds.

    Parameters
    ----------
    predictions : np.ndarray
        Array of predicted values.
    ground_truth : np.ndarray
        Array of ground truth values.
    confidence_interval : float, optional
        Desired confidence interval. Defaults to 0.95.

    Returns
    -------
    tuple[float, float, float]
        A tuple containing:

        - PRB value
        - Lower bound of the confidence interval
        - Upper bound of the confidence interval

    Raises
    ------
    ValueError
        If `predictions` and `ground_truth` have different lengths.
    """
    # 1. Basic shape checks --------------------------------------------------
    predictions = np.asarray(predictions, dtype=float)
    ground_truth = np.asarray(ground_truth, dtype=float)
    if predictions.shape != ground_truth.shape:
        raise ValueError("predictions and ground_truth must have the same length/shape")

    # 2. Clean rows that cannot be used --------------------------------------
    mask = (
        ~np.isnan(predictions)
        & ~np.isnan(ground_truth)
        & (predictions > 0)          # cannot take log2 of non‑positive numbers
        & (ground_truth > 0)
    )
    n_ok = int(mask.sum())
    if n_ok < 3:                     # OLS needs at least 3 rows
        warnings.warn(
            f"Only {n_ok} valid observation(s) after cleaning – PRB not computed."
        )
        return np.nan, np.nan, np.nan

    preds = predictions[mask]
    truth = ground_truth[mask]

    # 3. Build transformed variables -----------------------------------------
    ratios = preds / truth
    median_ratio = np.median(ratios)

    left = (ratios - median_ratio) / median_ratio
    right = np.log2(preds / median_ratio + truth)
    right = sm.add_constant(right)   # adds intercept term

    # 4. Fit model + CI -------------------------------------------------------
    with np.errstate(all="ignore"):  # silence harmless internal numpy warnings
        model = sm.OLS(left, right).fit()

    # Guard against degenerate fit (rare but better to be explicit)
    if model.df_resid <= 0 or not np.isfinite(model.params[1]):
        return np.nan, np.nan, np.nan

    # add_constant prepends the intercept, so params[1] is the slope on the
    # log2 value term; that slope is the PRB statistic
    prb = float(model.params[1])
    prb_lower, prb_upper = (
        model.conf_int(alpha=1.0 - confidence_interval)[1].tolist()
    )

    return prb, prb_lower, prb_upper
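
Example: a minimal usage sketch with made-up values. The PRB coefficient is commonly read as the approximate percentage change in the assessment ratio per doubling of value, so values near zero indicate little price-related bias.

    import numpy as np
    from openavmkit.utilities.stats import calc_prb

    ground_truth = np.array([100_000, 150_000, 200_000, 300_000, 450_000, 600_000], dtype=float)
    predictions = ground_truth * np.array([1.05, 1.02, 1.00, 0.98, 0.95, 0.93])  # ratios fall as value rises

    prb, lower, upper = calc_prb(predictions, ground_truth)
    print(prb, lower, upper)  # negative PRB here: higher-value properties are relatively under-predicted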

calc_prd

calc_prd(predictions, ground_truth)

Calculate the Price Related Differential (PRD).

PRD is computed as the ratio of the mean ratio to the weighted mean ratio of predictions to ground truth.

Parameters:

  • predictions (numpy.ndarray, required): Array of predicted values.
  • ground_truth (numpy.ndarray, required): Array of ground truth values.

Returns:

  • float: The PRD value.

Source code in openavmkit/utilities/stats.py
def calc_prd(predictions: np.ndarray, ground_truth: np.ndarray) -> float:
    """
    Calculate the Price Related Differential (PRD).

    PRD is computed as the ratio of the mean ratio to the weighted mean ratio of
    predictions to ground truth.

    Parameters
    ----------
    predictions : numpy.ndarray
        Array of predicted values.
    ground_truth : numpy.ndarray
        Array of ground truth values.

    Returns
    -------
    float
        The PRD value.
    """
    ratios = div_series_z_safe(predictions, ground_truth)
    if len(ratios) == 0:
        return float("nan")
    mean_ratio = np.mean(ratios)
    sum_ground_truth = np.sum(ground_truth)
    if sum_ground_truth == 0:
        return float("inf")
    weighted_mean_ratio = np.sum(predictions) / sum_ground_truth
    if weighted_mean_ratio == 0:
        return float("inf")
    prd = mean_ratio / weighted_mean_ratio
    return prd
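
A small worked example: the mean of the individual ratios is divided by the sale-weighted ratio (total predictions over total ground truth); a PRD above 1 suggests regressivity.

    import numpy as np
    from openavmkit.utilities.stats import calc_prd

    predictions = np.array([110_000.0, 150_000.0, 280_000.0])
    ground_truth = np.array([100_000.0, 150_000.0, 300_000.0])

    # ratios are 1.10, 1.00, 0.93; mean ratio ≈ 1.011
    # weighted mean ratio = 540,000 / 550,000 ≈ 0.982
    print(calc_prd(predictions, ground_truth))  # ≈ 1.03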

calc_prd_bootstrap

calc_prd_bootstrap(predictions, ground_truth, confidence_interval=0.95, iterations=10000, seed=777)

Calculate PRD with bootstrapping.

This function bootstraps the prediction-to-ground_truth ratios to produce a distribution of PRD values and returns the lower bound, median, and upper bound of the confidence interval.

Parameters:

  • predictions (numpy.ndarray, required): Array of predicted values.
  • ground_truth (numpy.ndarray, required): Array of ground truth values.
  • confidence_interval (float, optional): The desired confidence level. Defaults to 0.95.
  • iterations (int, optional): Number of bootstrap iterations. Defaults to 10000.
  • seed (int, optional): Random seed for reproducibility. Defaults to 777.

Returns:

  • tuple[float, float, float]: The lower bound, median PRD, and upper bound of the confidence interval.

Source code in openavmkit/utilities/stats.py
def calc_prd_bootstrap(
    predictions: np.ndarray,
    ground_truth: np.ndarray,
    confidence_interval: float = 0.95,
    iterations: int = 10000,
    seed: int = 777,
) -> tuple[float, float, float]:
    """
    Calculate PRD with bootstrapping.

    This function bootstraps the prediction-to-ground_truth ratios to produce a
    distribution of PRD values and returns the lower bound, median, and upper bound of the
    confidence interval.

    Parameters
    ----------
    predictions : np.ndarray
        Array of predicted values.
    ground_truth : np.ndarray
        Array of ground truth values.
    confidence_interval : float, optional
        The desired confidence level. Defaults to 0.95.
    iterations : int, optional
        Number of bootstrap iterations. Defaults to 10000.
    seed : int, optional
        Random seed for reproducibility. Defaults to 777.

    Returns
    -------
    tuple[float, float, float]
        A tuple containing the lower bound, median PRD, and upper bound of the confidence interval.
    """

    np.random.seed(seed)
    n = len(predictions)
    ratios = predictions / ground_truth
    samples = np.random.choice(ratios, size=(iterations, n), replace=True)
    mean_ratios = np.mean(samples, axis=1)
    weighted_mean_ratios = np.sum(predictions) / np.sum(ground_truth)
    prds = mean_ratios / weighted_mean_ratios
    alpha = (1.0 - confidence_interval) / 2
    lower_bound, upper_bound = np.quantile(prds, [alpha, 1.0 - alpha])
    median_prd = np.median(prds)
    return lower_bound, median_prd, upper_bound

calc_r2

calc_r2(df, variables, y)

Calculate R² and adjusted R² values for each variable.

For each variable in the provided list, an OLS model is fit and the R², adjusted R², and the sign of the coefficient are recorded.

Parameters:

  • df (pandas.DataFrame, required): DataFrame containing the variables.
  • variables (list[str], required): List of variable names to evaluate.
  • y (pandas.Series, required): Target variable series.

Returns:

  • pandas.DataFrame: DataFrame with columns for variable, R², adjusted R², and coefficient sign.

Source code in openavmkit/utilities/stats.py
def calc_r2(
    df: pd.DataFrame,
    variables: list[str],
    y: pd.Series
) -> pd.DataFrame:
    """
    Calculate R² and adjusted R² values for each variable.

    For each variable in the provided list, an OLS model is fit and the R², adjusted R²,
    and the sign of the coefficient are recorded.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame containing the variables.
    variables : list[str]
        List of variable names to evaluate.
    y : pandas.Series
        Target variable series.

    Returns
    -------
    pandas.DataFrame
        DataFrame with columns for variable, R², adjusted R², and coefficient sign.
    """
    results = {"variable": [], "r2": [], "adj_r2": [], "coef_sign": []}
    for var in variables:
        data = pd.concat([df[var], y], axis=1).dropna()
        if len(data) < 3 or data[var].nunique() < 2:
            results["variable"].append(var)
            results["r2"].append(float("nan"))
            results["adj_r2"].append(float("nan"))
            results["coef_sign"].append(float("nan"))
            continue                          # skip ill-posed models

        X = sm.add_constant(data[var].astype(float))
        y_clean = data.iloc[:, -1].astype(float)  # target values aligned with the rows kept by dropna()
        try:
            model = sm.OLS(y_clean, X).fit()
        except MissingDataError as e:
            print(f"Error fitting model for variable {var}: {e}")
            for check_var in variables:
                # check for missing/null/nan values:
                if df[check_var].isna().any():
                    n = df[check_var].isna().sum()
                    print(f'Variable "{check_var}" has {n} missing values.')
            raise e

        results["variable"].append(var)
        results["r2"].append(model.rsquared)
        results["adj_r2"].append(model.rsquared_adj)
        results["coef_sign"].append(1 if model.params[var] >= 0 else -1)
    df_results = pd.DataFrame(data=results)
    return df_results

calc_t_values

calc_t_values(X, y)

Calculate t-values for an OLS model.

Fits an ordinary least squares regression of y on X and returns the t-values of the estimated coefficients.

Parameters:

  • X (pandas.DataFrame, required): Input features DataFrame (should include a constant term column).
  • y (pandas.Series, required): Target variable series.

Returns:

  • pandas.Series: Series of t-values corresponding to each coefficient in X.

Source code in openavmkit/utilities/stats.py
def calc_t_values(X: pd.DataFrame, y: pd.Series) -> pd.Series:
    """
    Calculate t-values for an OLS model.

    Fits an ordinary least squares regression of `y` on `X` and returns the t-values
    of the estimated coefficients.

    Parameters
    ----------
    X : pandas.DataFrame
        Input features DataFrame (should include a constant term column).
    y : pandas.Series
        Target variable series.

    Returns
    -------
    pandas.Series
        Series of t-values corresponding to each coefficient in `X`.
    """
    linear_model = sm.OLS(y, X)
    fitted_model = linear_model.fit()
    return fitted_model.tvalues

calc_t_values_recursive_drop

calc_t_values_recursive_drop(X, y, threshold=2)

Recursively drop variables with t-values below a given threshold.

Fits an OLS model on X and iteratively removes the variable with the smallest absolute t-value until all remaining variables have |t-value| above the threshold.

Parameters:

  • X (pandas.DataFrame, required): Input features DataFrame.
  • y (pandas.Series, required): Target variable series.
  • threshold (float, optional): Minimum acceptable absolute t-value. Variables with |t-value| below this threshold will be dropped. Defaults to 2.

Returns:

  • dict: Dictionary with two keys:
      • "initial": pandas.DataFrame of t-values and their signs from the first OLS fit.
      • "final": pandas.DataFrame of t-values and their signs after recursive dropping.

Source code in openavmkit/utilities/stats.py
def calc_t_values_recursive_drop(
    X: pd.DataFrame,
    y: pd.Series,
    threshold: float = 2
) -> dict:
    """
    Recursively drop variables with t-values below a given threshold.

    Fits an OLS model on `X` and iteratively removes the variable with the smallest
    absolute t-value until all remaining variables have |t-value| above the threshold.

    Parameters
    ----------
    X : pandas.DataFrame
        Input features DataFrame.
    y : pandas.Series
        Target variable series.
    threshold : float, optional
        Minimum acceptable absolute t-value. Variables with |t-value| below this threshold
        will be dropped. Defaults to 2.

    Returns
    -------
    dict
        Dictionary with two keys:

        - "initial": pandas.Series of t-values and their signs from the first OLS fit.
        - "final": pandas.Series of t-values and their signs after recursive dropping.
    """

    X = X.copy()
    X = sm.add_constant(X)
    X = X.astype(np.float64)

    first_run = None
    i = 0
    while True:
        i += 1
        try:
            t_values = calc_t_values(X, y)
        except ValueError as e:
            t_values = pd.Series([float("nan")] * X.shape[1], index=X.columns)
        if first_run is None:
            first_run = t_values

        if len(t_values) == 0 or t_values.isna().all():
            break

        min_t_var = t_values.abs().idxmin()
        min_t_val = t_values[min_t_var]

        if abs(min_t_val) < threshold:
            X = X.drop(min_t_var, axis=1)
        else:
            break

    # align t_values into a dataframe with variable names:
    t_values = (
        pd.DataFrame({"t_value": t_values, "t_value_sign": np.sign(t_values)})
        .sort_values("t_value", ascending=False, key=lambda x: x.abs())
        .reset_index()
        .rename(columns={"index": "variable"})
    )

    # do the same for "first_run":
    first_run = (
        pd.DataFrame({"t_value": first_run, "t_value_sign": np.sign(first_run)})
        .sort_values("t_value", ascending=False, key=lambda x: x.abs())
        .reset_index()
        .rename(columns={"index": "variable"})
    )

    return {"initial": first_run, "final": t_values}

calc_vif

calc_vif(X)

Calculate the Variance Inflation Factor (VIF) for each variable in a DataFrame.

Parameters:

  • X (pandas.DataFrame, required): Input features DataFrame.

Returns:

  • pandas.DataFrame: DataFrame with columns:
      • "variable": Name of each feature in X.
      • "vif": Variance Inflation Factor value for that feature.

Source code in openavmkit/utilities/stats.py
def calc_vif(X: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate the Variance Inflation Factor (VIF) for each variable in a DataFrame.

    Parameters
    ----------
    X : pandas.DataFrame
        Input features DataFrame.

    Returns
    -------
    pandas.DataFrame
        DataFrame with columns:

        - "variable": Name of each feature in `X`.
        - "VIF": Variance Inflation Factor value for that feature.
    """
    vif_data = pd.DataFrame()
    vif_data["variable"] = X.columns

    if len(X.values) < 5:
        warnings.warn("Can't calculate VIF for less than 5 samples")
        vif_data["vif"] = [float("nan")] * len(X.columns)
        return vif_data

    # Calculate VIF for each column
    vif_data["vif"] = [
        variance_inflation_factor(X.values, i) for i in range(X.shape[1])
    ]

    return vif_data

calc_vif_recursive_drop

calc_vif_recursive_drop(X, threshold=10.0, settings=None)

Recursively drop variables with a Variance Inflation Factor (VIF) exceeding the threshold.

Parameters:

  • X (pandas.DataFrame, required): Input features DataFrame.
  • threshold (float, optional): Maximum acceptable VIF. Variables with VIF above this threshold will be removed. Defaults to 10.0.
  • settings (dict, optional): Settings dictionary containing field classifications, if needed for VIF computation. Defaults to None.

Returns:

  • dict: Dictionary with two keys:
      • "initial": pandas.DataFrame of VIF values before dropping variables.
      • "final": pandas.DataFrame of VIF values after recursively dropping high-VIF variables.

Raises:

  • ValueError: If no columns remain for VIF calculation.

Source code in openavmkit/utilities/stats.py
def calc_vif_recursive_drop(
    X: pd.DataFrame,
    threshold: float = 10.0,
    settings: dict = None
) -> dict:
    """
    Recursively drop variables with a Variance Inflation Factor (VIF) exceeding the threshold.

    Parameters
    ----------
    X : pandas.DataFrame
        Input features DataFrame.
    threshold : float, optional
        Maximum acceptable VIF. Variables with VIF above this threshold will be removed.
        Defaults to 10.0.
    settings : dict, optional
        Settings dictionary containing field classifications, if needed for VIF computation.
        Defaults to None.

    Returns
    -------
    dict
        Dictionary with two keys:

        - "initial": pandas.DataFrame of VIF values before dropping variables.
        - "final": pandas.DataFrame of VIF values after recursively dropping high-VIF variables.

    Raises
    ------
    ValueError
        If no columns remain for VIF calculation.
    """
    X = X.copy()
    X = X.astype(np.float64)

    # Get boolean and categorical variables from settings if provided
    bool_fields = get_fields_boolean(settings, X)
    cat_fields = get_fields_categorical(settings, X, include_boolean=False)
    exclude_vars = bool_fields + cat_fields

    # Remove boolean and categorical variables
    X = X.drop(
        columns=[col for col in X.columns if col in exclude_vars], errors="ignore"
    )

    # Drop constant columns (VIF cannot be calculated for constant columns)
    X = X.loc[:, X.nunique() > 1]  # Keep only columns with more than one unique value

    # If no columns are left after removing constant columns or dropping NaN values, raise an error
    if X.shape[1] == 0:
        raise ValueError(
            "All columns are constant or have missing values; VIF cannot be computed."
        )
    first_run = None
    while True:
        vif_data = calc_vif(X)
        if first_run is None:
            first_run = vif_data
        if vif_data["vif"].max() > threshold:
            max_vif_idx = vif_data["vif"].idxmax()
            X = X.drop(X.columns[max_vif_idx], axis=1)
        else:
            break
    return {"initial": first_run, "final": vif_data}

plot_correlation

plot_correlation(corr, title='Correlation of Variables')

Plot a heatmap of a correlation matrix.

Parameters:

  • corr (pandas.DataFrame, required): Correlation matrix as a DataFrame.
  • title (str, optional): Title of the plot. Defaults to "Correlation of Variables".

Source code in openavmkit/utilities/stats.py
def plot_correlation(corr: pd.DataFrame, title: str = "Correlation of Variables") -> None:
    """
    Plot a heatmap of a correlation matrix.

    Parameters
    ----------
    corr : pandas.DataFrame
        Correlation matrix as a DataFrame.
    title : str, optional
        Title of the plot. Defaults to "Correlation of Variables".
    """

    cmap = sns.diverging_palette(220, 10, as_cmap=True)
    cmap = cmap.reversed()

    plt.figure(figsize=(10, 8))

    # Create the heatmap with the correct labels
    sns.heatmap(
        corr,
        annot=True,
        fmt=".1f",
        cbar=True,
        cmap=cmap,
        vmax=1.0,
        vmin=-1.0,
        xticklabels=corr.columns.tolist(),  # explicitly set the xticklabels
        yticklabels=corr.index.tolist(),  # explicitly set the yticklabels
        annot_kws={"size": 8},  # adjust font size if needed
    )

    plt.title(title)
    plt.xticks(rotation=45, ha="right")  # rotate x labels if needed
    plt.yticks(rotation=0)  # keep y labels horizontal
    plt.tight_layout(pad=2)
    plt.show()

quick_median_chd_pl

quick_median_chd_pl(df, field_value, field_cluster)

Calculate the median CHD for groups in a Polars DataFrame.

This function filters out missing values for the given value field, groups the data by the specified cluster field, computes COD for each group, and returns the median COD value.

Parameters:

  • df (polars.DataFrame, required): Input Polars DataFrame.
  • field_value (str, required): Name of the field containing values for the COD calculation.
  • field_cluster (str, required): Name of the field to group by for computing COD.

Returns:

  • float: The median COD value across all groups.

Source code in openavmkit/utilities/stats.py
def quick_median_chd_pl(
    df: pl.DataFrame, field_value: str, field_cluster: str
) -> float:
    """
    Calculate the median CHD for groups in a Polars DataFrame.

    This function filters out missing values for the given value field, groups the data
    by the specified cluster field, computes COD for each group, and returns the median
    COD value.

    Parameters
    ----------
    df : pl.DataFrame
        Input Polars DataFrame.
    field_value : str
        Name of the field containing values for COD calculation.
    field_cluster : str
        Name of the field to group by for computing COD.

    Returns
    -------
    float
        The median COD value across all groups.
    """
    # Filter out rows where field_value is missing or non-positive.
    df = df.filter(pl.col(field_value).is_not_null() & (pl.col(field_value) > 0))

    chds = df.group_by(field_cluster).agg(pl.col(field_value).alias("values"))

    # Apply the calc_cod function to each group (the list of values)
    chd_values = np.array([calc_cod(group.to_numpy()) for group in chds["values"]])

    # Calculate the median of the CHD values
    median_chd = float(np.median(chd_values))
    return median_chd

trim_outliers

trim_outliers(values, lower_quantile=0.25, upper_quantile=0.75)

Trim outliers from an array of values based on quantile thresholds.

Parameters:

  • values (numpy.ndarray, required): Input array of numeric values.
  • lower_quantile (float, optional): Lower quantile bound. Values below this quantile will be removed. Defaults to 0.25.
  • upper_quantile (float, optional): Upper quantile bound. Values above this quantile will be removed. Defaults to 0.75.

Returns:

  • numpy.ndarray: Array with values outside the specified quantile bounds removed.

Source code in openavmkit/utilities/stats.py
def trim_outliers(
    values: np.ndarray,
    lower_quantile: float = 0.25,
    upper_quantile: float = 0.75
) -> np.ndarray:
    """
    Trim outliers from an array of values based on quantile thresholds.

    Parameters
    ----------
    values : np.ndarray
        Input array of numeric values.
    lower_quantile : float, optional
        Lower quantile bound. Values below this quantile will be removed. Defaults to 0.25.
    upper_quantile : float, optional
        Upper quantile bound. Values above this quantile will be removed. Defaults to 0.75.

    Returns
    -------
    np.ndarray
        Array with values outside the specified quantile bounds removed.
    """
    if len(values) == 0:
        return values
    lower_bound = np.quantile(values, lower_quantile)
    upper_bound = np.quantile(values, upper_quantile)
    return values[(values >= lower_bound) & (values <= upper_bound)]
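
A quick worked example with the default quartile bounds: the 25th and 75th percentiles become the cutoffs, so everything outside the interquartile range is dropped.

    import numpy as np
    from openavmkit.utilities.stats import trim_outliers

    values = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 100.0])
    print(trim_outliers(values))  # [3. 4. 5. 6. 7.]: the 25th/75th percentiles are 3.0 and 7.0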

trim_outliers_mask

trim_outliers_mask(values, lower_quantile=0.25, upper_quantile=0.75)

Generate a boolean mask for values within specified quantile bounds.

Parameters:

  • values (numpy.ndarray, required): Input array of numeric values.
  • lower_quantile (float, optional): Lower quantile bound. Values below this quantile are masked out. Defaults to 0.25.
  • upper_quantile (float, optional): Upper quantile bound. Values above this quantile are masked out. Defaults to 0.75.

Returns:

  • numpy.ndarray: Boolean array where True indicates values within the quantile bounds.

Source code in openavmkit/utilities/stats.py
def trim_outliers_mask(
    values: np.ndarray,
    lower_quantile: float = 0.25,
    upper_quantile: float = 0.75
) -> np.ndarray:
    """
    Generate a boolean mask for values within specified quantile bounds.

    Parameters
    ----------
    values : np.ndarray
        Input array of numeric values.
    lower_quantile : float, optional
        Lower quantile bound. Values below this quantile are masked out. Defaults to 0.25.
    upper_quantile : float, optional
        Upper quantile bound. Values above this quantile are masked out. Defaults to 0.75.

    Returns
    -------
    np.ndarray
        Boolean array where `True` indicates values within the quantile bounds.
    """
    if len(values) == 0:
        return values
    lower_bound = np.quantile(values, lower_quantile)
    upper_bound = np.quantile(values, upper_quantile)
    return (values >= lower_bound) & (values <= upper_bound)