openavmkit.data

SalesUniversePair dataclass

SalesUniversePair(sales, universe)

A container for the sales and universe DataFrames; many functions operate on this data structure. It exists because the sales and universe DataFrames are often used together and need to be passed around as a unit. The sales DataFrame represents transactions and any data known at the time of each transaction, while the universe DataFrame represents the current state of all parcels. The sales DataFrame specifically allows duplicate primary parcel keys, since an individual parcel may have sold multiple times; the universe DataFrame forbids duplicate primary parcel keys.
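
A minimal sketch of constructing and copying the pair. The column names other than "key" and "key_sale" are illustrative assumptions, not a required schema:

    import pandas as pd

    from openavmkit.data import SalesUniversePair

    df_universe = pd.DataFrame({
        "key": ["p1", "p2", "p3"],             # one row per parcel, no duplicate keys
        "land_area_sqft": [5000, 7500, 10000],
    })
    df_sales = pd.DataFrame({
        "key_sale": ["p1-2021", "p1-2023", "p2-2022"],
        "key": ["p1", "p1", "p2"],             # a parcel may appear more than once
        "sale_price": [100_000, 120_000, 250_000],
    })

    sup = SalesUniversePair(sales=df_sales, universe=df_universe)
    sup_backup = sup.copy()                    # copies both DataFrames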

Attributes:

    sales : DataFrame
        DataFrame containing sales data.
    universe : DataFrame
        DataFrame containing universe (parcel) data.

copy

copy()

Create a copy of the SalesUniversePair object.

Returns:

    SalesUniversePair
        A new SalesUniversePair object with copied DataFrames.

Source code in openavmkit/data.py
def copy(self):
    """Create a copy of the SalesUniversePair object.

    Returns
    -------
    SalesUniversePair
        A new SalesUniversePair object with copied DataFrames.
    """
    return SalesUniversePair(self.sales.copy(), self.universe.copy())

set

set(key, value)

Set the sales or universe DataFrame.

Parameters:

    key : str
        Either "sales" or "universe".
    value : DataFrame
        The new DataFrame to set for the specified key.

Raises:

    ValueError
        If an invalid key is provided.

Source code in openavmkit/data.py
def set(self, key: str, value: pd.DataFrame):
    """Set the sales or universe DataFrame.

    Parameters
    ----------
    key : str
        Either "sales" or "universe".
    value : pd.DataFrame
        The new DataFrame to set for the specified key.

    Raises
    ------
    ValueError
        If an invalid key is provided.
    """
    if key == "sales":
        self.sales = value
    elif key == "universe":
        self.universe = value
    else:
        raise ValueError(f"Invalid key: {key}")

update_sales

update_sales(new_sales, allow_remove_rows)

Update the sales DataFrame with new information as an overlay without redundancy.

This function lets you push updates to "sales" while keeping it as an "overlay" that doesn't contain any redundant information.

  • First we note what fields were in sales last time.
  • Then we note what fields are in universe but were not in sales.
  • Finally, we determine the new fields generated in new_sales that are not in the previous sales or in the universe.
  • A modified version of the sales DataFrame is created with only two changes:
      • Reduced to the correct selection of keys.
      • Addition of the newly generated fields.

Parameters:

    new_sales : DataFrame
        New sales DataFrame with updates.
    allow_remove_rows : bool
        If True, allows the update to remove rows from sales. If False, preserves all original rows.
Source code in openavmkit/data.py
def update_sales(self, new_sales: pd.DataFrame, allow_remove_rows: bool):
    """
    Update the sales DataFrame with new information as an overlay without redundancy.

    This function lets you push updates to "sales" while keeping it as an "overlay" that
    doesn't contain any redundant information.

    - First we note what fields were in sales last time.
    - Then we note what fields are in universe but were not in sales.
    - Finally, we determine the new fields generated in new_sales that are not in the
      previous sales or in the universe.
    - A modified version of df_sales is created with only two changes:
      - Reduced to the correct selection of keys.
      - Addition of the newly generated fields.

    Parameters
    ----------
    new_sales : pd.DataFrame
        New sales DataFrame with updates.
    allow_remove_rows : bool
        If True, allows the update to remove rows from sales. If False, preserves all
        original rows.
    """

    old_fields = self.sales.columns.values
    univ_fields = [
        field for field in self.universe.columns.values if field not in old_fields
    ]
    new_fields = [
        field
        for field in new_sales.columns.values
        if field not in old_fields and field not in univ_fields
    ]

    old_sales = self.sales.copy()
    return_keys = new_sales["key_sale"].values
    if not allow_remove_rows and len(return_keys) > len(old_sales):
        raise ValueError(
            "The new sales DataFrame contains more keys than the old sales DataFrame. update_sales() may only be used to shrink the dataframe or keep it the same size. Use set() if you intend to replace the sales dataframe."
        )

    if allow_remove_rows:
        old_sales = old_sales[old_sales["key_sale"].isin(return_keys)].reset_index(
            drop=True
        )
    reconciled = combine_dfs(
        old_sales,
        new_sales[["key_sale"] + new_fields].copy().reset_index(drop=True),
        index="key_sale",
    )
    self.sales = reconciled
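
A short usage sketch of the overlay behavior (toy data; the derived column name is a hypothetical placeholder):

    import pandas as pd

    from openavmkit.data import SalesUniversePair

    sup = SalesUniversePair(
        sales=pd.DataFrame({
            "key_sale": ["p1-2021", "p1-2023", "p2-2022"],
            "key": ["p1", "p1", "p2"],
            "sale_price": [100_000, 120_000, 250_000],
        }),
        universe=pd.DataFrame({
            "key": ["p1", "p2", "p3"],
            "land_area_sqft": [5000, 7500, 10000],
        }),
    )

    # A downstream step drops one sale and derives a brand-new column
    new_sales = sup.sales[sup.sales["sale_price"] > 110_000].copy()
    new_sales["sale_price_rank"] = new_sales["sale_price"].rank()  # hypothetical field

    # Rows are being removed, so allow_remove_rows must be True
    sup.update_sales(new_sales, allow_remove_rows=True)
    # sup.sales now holds only the surviving keys plus the newly derived column;
    # fields already present in sales or in universe are not duplicated.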

enrich_df_streets

enrich_df_streets(df_in, settings, spacing=1.0, max_ray_length=25.0, network_buffer=500.0, verbose=False)

Enrich a GeoDataFrame with street network data.

This function enriches the input GeoDataFrame with street network data by calculating frontage, depth, distance to street, and many other related metrics, for every road vs. every parcel in the GeoDataFrame, using OpenStreetMap data.

WARNING: This function can be VERY computationally and memory intensive for large datasets and may take a long time to run.

We definitely need to work on its performance or make it easier to split into smaller chunks.

Parameters:

    df_in : GeoDataFrame
        Input GeoDataFrame containing parcels.
    settings : dict
        Settings dictionary containing configuration for the enrichment.
    spacing : float, default 1.0
        Spacing in meters for ray casting to calculate distances to streets.
    max_ray_length : float, default 25.0
        Maximum length of rays to shoot for distance calculations, in meters.
    network_buffer : float, default 500.0
        Buffer around the street network to consider for distance calculations, in meters.
    verbose : bool, default False
        If True, prints progress information.

Returns:

    GeoDataFrame
        Enriched GeoDataFrame with additional columns for street-related metrics.

Source code in openavmkit/data.py
def enrich_df_streets(
    df_in: gpd.GeoDataFrame,
    settings: dict,
    spacing: float = 1.0,  # in meters
    max_ray_length: float = 25.0,  # meters to shoot rays
    network_buffer: float = 500.0,  # buffer for street network
    verbose: bool = False,
) -> gpd.GeoDataFrame:
    """Enrich a GeoDataFrame with street network data.

    This function enriches the input GeoDataFrame with street network data by calculating
    frontage, depth, distance to street, and many other related metrics, for every road vs.
    every parcel in the GeoDataFrame, using OpenStreetMap data.

    WARNING: This function can be VERY computationally and memory intensive for large datasets
    and may take a long time to run.

    We definitely need to work on its performance or make it easier to split into smaller chunks.

    Parameters
    ----------
    df_in : gpd.GeoDataFrame
        Input GeoDataFrame containing parcels.
    settings : dict
        Settings dictionary containing configuration for the enrichment.
    spacing : float, optional
        Spacing in meters for ray casting to calculate distances to streets. Default is 1.0.
    max_ray_length : float, optional
        Maximum length of rays to shoot for distance calculations, in meters. Default is 25.0.
    network_buffer : float, optional
        Buffer around the street network to consider for distance calculations, in meters.
        Default is 500.0.
    verbose : bool, optional
        If True, prints progress information. Default is False.

    Returns
    -------
    gpd.GeoDataFrame
        Enriched GeoDataFrame with additional columns for street-related metrics.
    """
    df_out = _enrich_df_streets(
        df_in, settings, spacing, max_ray_length, network_buffer, verbose
    )

    # add somers unit land size normalization using frontage & depth
    df_out["land_area_somers_ft"] = get_size_in_somers_units_ft(
        df_out["frontage_ft_1"], df_out["depth_ft_1"]
    )

    return df_out
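
A usage sketch, not a recipe to run as-is: the parcel file path is hypothetical, the empty settings dict stands in for your real project configuration, and the call fetches OpenStreetMap data and can be slow.

    import geopandas as gpd

    from openavmkit.data import enrich_df_streets

    df_parcels = gpd.read_file("parcels.gpkg")   # hypothetical parcel layer with geometry
    settings = {}                                # assumption: your real settings dict goes here

    df_enriched = enrich_df_streets(
        df_parcels,
        settings,
        spacing=1.0,           # meters between cast rays
        max_ray_length=25.0,   # meters
        network_buffer=500.0,  # meters around the street network
        verbose=True,
    )
    # Adds street metrics such as frontage/depth columns, plus "land_area_somers_ft"
    # derived from the first frontage and depth measurements.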

enrich_sup_spatial_lag

enrich_sup_spatial_lag(sup, settings, verbose=False)

Enrich the sales and universe DataFrames with spatial lag features.

This function calculates the "spatial lag", that is, the spatially weighted average of the sale price and other fields, based on nearest neighbors.

For sales, the spatial lag is calculated based on the training set of sales. For non-sale characteristics, the spatial lag is calculated based on the universe parcels.

Parameters:

    sup : SalesUniversePair
        SalesUniversePair containing sales and universe DataFrames.
    settings : dict
        Settings dictionary.
    verbose : bool, default False
        If True, prints progress information.

Returns:

    SalesUniversePair
        Enriched SalesUniversePair with spatial lag features.

Source code in openavmkit/data.py
def enrich_sup_spatial_lag(
    sup: SalesUniversePair, settings: dict, verbose: bool = False
) -> SalesUniversePair:
    """Enrich the sales and universe DataFrames with spatial lag features.

    This function calculates the "spatial lag", that is, the spatially weighted
    average of the sale price and other fields, based on nearest neighbors.

    For sales, the spatial lag is calculated based on the training set of sales.
    For non-sale characteristics, the spatial lag is calculated based on the
    universe parcels.

    Parameters
    ----------
    sup : SalesUniversePair
        SalesUniversePair containing sales and universe DataFrames.
    settings : dict
        Settings dictionary.
    verbose : bool, optional
        If True, prints progress information.

    Returns
    -------
    SalesUniversePair
        Enriched SalesUniversePair with spatial lag features.
    """

    BANDWIDTH_MILES = 0.5  # distance at which confidence → 0
    METRES_PER_MILE = 1609.344
    D_SCALE = BANDWIDTH_MILES * METRES_PER_MILE

    df_sales = sup.sales.copy()
    df_universe = sup.universe.copy()

    s_sl = (
        settings.get("data", {})
        .get("process", {})
        .get("enrich", {})
        .get("universe", {})
        .get("spatial_lag", {})
    )
    ex_model_groups = s_sl.get("exclude_model_groups", [])

    df_hydrated = get_hydrated_sales_from_sup(sup)
    train_keys, test_keys = get_train_test_keys(df_hydrated, settings)

    for mg in ex_model_groups:
        df_hydrated = df_hydrated[df_hydrated["model_group"].ne(mg)]

    sale_field = get_sale_field(settings)
    sale_field_vacant = f"{sale_field}_vacant"

    per_land_field = f"{sale_field}_land_sqft"
    per_impr_field = f"{sale_field}_impr_sqft"

    if per_land_field not in df_hydrated:
        df_hydrated[per_land_field] = div_series_z_safe(
            df_hydrated[sale_field], df_hydrated["land_area_sqft"]
        )
    if per_impr_field not in df_hydrated:
        df_hydrated[per_impr_field] = div_series_z_safe(
            df_hydrated[sale_field], df_hydrated["bldg_area_finished_sqft"]
        )
    if sale_field_vacant not in df_hydrated:
        df_hydrated[sale_field_vacant] = None
        df_hydrated[sale_field_vacant] = df_hydrated[sale_field].where(
            df_hydrated["bldg_area_finished_sqft"].le(0)
            & df_hydrated["land_area_sqft"].gt(0)
        )

    value_fields = [sale_field, sale_field_vacant, per_land_field, per_impr_field]

    for value_field in value_fields:

        if value_field == sale_field:
            df_sub = df_hydrated.loc[df_hydrated["valid_sale"].eq(True)].copy()
        elif (value_field == sale_field_vacant) or (value_field == per_land_field):
            df_sub = df_hydrated.loc[
                df_hydrated["valid_sale"].eq(True)
                & df_hydrated["vacant_sale"].eq(True)
                & df_hydrated["land_area_sqft"].gt(0)
            ].copy()
        elif value_field == per_impr_field:
            df_sub = df_hydrated.loc[
                df_hydrated["valid_sale"].eq(True)
                & df_hydrated["bldg_area_finished_sqft"].gt(0)
            ].copy()
        else:
            raise ValueError(f"Unknown value field: {value_field}")

        if df_sub.empty:
            df_universe[f"spatial_lag_{value_field}"] = 0
            df_sales[f"spatial_lag_{value_field}"] = 0
            continue

        df_sub = df_sub[~pd.isna(df_sub["latitude"]) & ~pd.isna(df_sub["longitude"])]

        # Choose the number of nearest neighbors to use
        k = 5  # adjust this number as needed

        df_sub_train = df_sub.loc[df_sub["key_sale"].isin(train_keys)].copy()

        # Get the coordinates for the universe parcels
        crs_equal_distance = get_crs(df_universe, "equal_distance")
        df_proj = df_universe.to_crs(crs_equal_distance)

        # Use the projected coordinates for the universe parcels
        universe_coords = np.vstack(
            [df_proj.geometry.centroid.x.values, df_proj.geometry.centroid.y.values]
        ).T

        # Get the coordinates for the sales training parcels
        df_sub_train_proj = df_sub_train.to_crs(crs_equal_distance)

        sales_coords_train = np.vstack(
            [
                df_sub_train_proj.centroid.geometry.x.values,
                df_sub_train_proj.centroid.geometry.y.values,
            ]
        ).T

        # Build a cKDTree from df_sales coordinates -- but ONLY from the training set
        sales_tree = cKDTree(sales_coords_train)

        # count any NA coordinates in the universe
        n_na_coords = universe_coords.shape[0] - np.count_nonzero(
            pd.isna(universe_coords).any(axis=1)
        )

        # Query the tree: for each parcel in df_universe, find the k nearest sales
        # distances: shape (n_universe, k); indices: corresponding indices in df_sales
        distances, indices = sales_tree.query(universe_coords, k=k)

        # Ensure that distances and indices are 2D arrays (if k==1, reshape them)
        if k == 1:
            distances = distances[:, None]
            indices = indices[:, None]

        # For each universe parcel, compute sigma as the mean distance to its k neighbors.
        sigma = distances.mean(axis=1, keepdims=True)

        # Handle zeros in sigma
        sigma[sigma == 0] = np.finfo(float).eps  # Avoid division by zero

        # Compute Gaussian kernel weights for all neighbors
        weights = np.exp(-(distances**2) / (2 * sigma**2))

        # Normalize the weights so that they sum to 1 for each parcel
        weights_norm = weights / weights.sum(axis=1, keepdims=True)

        # Get the sales prices corresponding to the neighbor indices
        sales_prices = df_sub_train[value_field].values
        neighbor_prices = sales_prices[indices]  # shape (n_universe, k)

        # Compute the weighted average (spatial lag) for each parcel in the universe
        spatial_lag = (np.asarray(weights_norm) * np.asarray(neighbor_prices)).sum(
            axis=1
        )

        # Add the spatial lag as a new column
        df_universe[f"spatial_lag_{value_field}"] = spatial_lag

        # Fill NaN values in the spatial lag with the median value of the original field
        median_value = df_sub_train[value_field].median()
        df_universe[f"spatial_lag_{value_field}"] = df_universe[
            f"spatial_lag_{value_field}"
        ].fillna(median_value)

        # Add the new field to sales:
        df_sales = df_sales.merge(
            df_universe[["key", f"spatial_lag_{value_field}"]], on="key", how="left"
        )

        # ------------------------------------------------
        # Calculate confidence:

        # Raw inverse-square information mass
        distances_safe = distances.copy()
        distances_safe[distances_safe == 0] = np.finfo(float).eps  # protect ÷ 0

        inv_sq = 1.0 / distances_safe**2  # shape (n_parcel, 5)
        info_mass = inv_sq.sum(axis=1)  # Σ 1/d²

        # Fixed-bandwidth confidence
        conf = 1.0 - (k / D_SCALE**2) / info_mass
        spatial_lag_confidence = np.clip(conf, 0.0, 1.0)  # keep in [0, 1]

        # store
        df_universe[f"spatial_lag_{value_field}_confidence"] = spatial_lag_confidence
        df_sales = df_sales.merge(
            df_universe[["key", f"spatial_lag_{value_field}_confidence"]],
            on="key",
            how="left",
        )
        # ------------------------------------------------

    df_test = df_sales.loc[df_sales["key_sale"].isin(test_keys)].copy()
    df_universe = _enrich_universe_spatial_lag(df_universe, df_test)

    sup.set("sales", df_sales)
    sup.set("universe", df_universe)
    return sup
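
The heart of the calculation above is a k-nearest-neighbor query followed by Gaussian kernel weighting of the neighbors' values. A standalone sketch of that weighting step, using made-up coordinates and prices and the same k = 5, adaptive-sigma scheme as the source:

    import numpy as np
    from scipy.spatial import cKDTree

    rng = np.random.default_rng(0)

    # Made-up projected coordinates (meters) and prices for the "training" sales
    sales_coords = rng.uniform(0, 10_000, size=(200, 2))
    sales_prices = rng.uniform(50_000, 500_000, size=200)

    # Target parcels that need a spatially lagged price
    parcel_coords = rng.uniform(0, 10_000, size=(50, 2))

    k = 5
    tree = cKDTree(sales_coords)
    distances, indices = tree.query(parcel_coords, k=k)

    # Adaptive bandwidth: sigma is each parcel's mean distance to its k neighbors
    sigma = distances.mean(axis=1, keepdims=True)
    sigma[sigma == 0] = np.finfo(float).eps

    # Gaussian kernel weights, normalized to sum to 1 per parcel
    weights = np.exp(-(distances**2) / (2 * sigma**2))
    weights /= weights.sum(axis=1, keepdims=True)

    # Weighted average of the k neighbors' prices = the spatial lag
    spatial_lag = (weights * sales_prices[indices]).sum(axis=1)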

enrich_time

enrich_time(df, time_formats, settings)

Enrich the DataFrame by converting specified time fields to datetime and deriving additional fields.

For each key in time_formats, converts the column to datetime. Then, if a field with the prefix "sale" exists, enriches the DataFrame with additional time fields (e.g., "sale_year", "sale_month", "sale_age_days").

Parameters:

    df : DataFrame
        Input DataFrame.
    time_formats : dict
        Dictionary mapping field names to datetime formats.
    settings : dict
        Settings dictionary.

Returns:

    DataFrame
        DataFrame with enriched time fields.

Source code in openavmkit/data.py
def enrich_time(df: pd.DataFrame, time_formats: dict, settings: dict) -> pd.DataFrame:
    """
    Enrich the DataFrame by converting specified time fields to datetime and deriving additional fields.

    For each key in time_formats, converts the column to datetime. Then, if a field with
    the prefix "sale" exists, enriches the DataFrame with additional time fields (e.g.,
    "sale_year", "sale_month", "sale_age_days").

    Parameters
    ----------
    df : pandas.DataFrame
        Input DataFrame.
    time_formats : dict
        Dictionary mapping field names to datetime formats.
    settings : dict
        Settings dictionary.

    Returns
    -------
    pandas.DataFrame
        DataFrame with enriched time fields.
    """

    for key in time_formats:
        time_format = time_formats[key]
        if key in df:
            df[key] = pd.to_datetime(df[key], format=time_format, errors="coerce")

    for prefix in ["sale"]:
        do_enrich = False
        for col in df.columns.values:
            if f"{prefix}_" in col:
                do_enrich = True
                break
        if do_enrich:
            df = _enrich_time_field(
                df, prefix, add_year_month=True, add_year_quarter=True
            )
            if prefix == "sale":
                df = _enrich_sale_age_days(df, settings)

    return df
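
A small sketch. The empty settings dict is an assumption made only for illustration; in a real project the settings control, for example, the valuation date behind "sale_age_days".

    import pandas as pd

    from openavmkit.data import enrich_time

    df = pd.DataFrame({
        "key_sale": ["p1-2021", "p2-2022"],
        "sale_date": ["2021-06-15", "2022-01-03"],
    })
    time_formats = {"sale_date": "%Y-%m-%d"}
    settings = {}  # assumption: minimal settings for illustration

    df = enrich_time(df, time_formats, settings)
    # "sale_date" is now a datetime column; because a "sale_"-prefixed field exists,
    # derived fields such as sale_year and sale_age_days are added as well.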

get_dtypes_from_settings

get_dtypes_from_settings(settings)

Generate a dictionary mapping fields to their designated data types based on settings.

Parameters:

    settings : dict
        Settings dictionary.

Returns:

    dict
        Dictionary of field names to data type strings.

Source code in openavmkit/data.py
def get_dtypes_from_settings(settings: dict) -> dict:
    """
    Generate a dictionary mapping fields to their designated data types based on settings.

    Parameters
    ----------
    settings : dict
        Settings dictionary.

    Returns
    -------
    dict
        Dictionary of field names to data type strings.
    """

    cats = get_fields_categorical(settings, include_boolean=False)
    bools = get_fields_boolean(settings)
    nums = get_fields_numeric(settings, include_boolean=False)
    dtypes = {}
    for c in cats:
        dtypes[c] = "string"
    for b in bools:
        dtypes[b] = "bool"
    for n in nums:
        dtypes[n] = "Float64"
    return dtypes
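
The returned mapping looks like the hand-written stub below (the field names come entirely from your settings, so these are placeholders) and can be passed straight to pandas readers:

    import pandas as pd

    # Stub of the kind of mapping get_dtypes_from_settings returns
    dtypes = {
        "neighborhood": "string",      # categorical field
        "is_waterfront": "bool",       # boolean field
        "land_area_sqft": "Float64",   # numeric field
    }

    df = pd.read_csv("universe.csv", dtype=dtypes)  # hypothetical file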

get_field_classifications

get_field_classifications(settings)

Retrieve a mapping of field names to their classifications (land, improvement or other) as well as their types (numeric, categorical, or boolean).

Parameters:

    settings : dict
        Settings dictionary.

Returns:

    dict
        Dictionary mapping field names to type and class.

Source code in openavmkit/data.py
def get_field_classifications(settings: dict) -> dict:
    """
    Retrieve a mapping of field names to their classifications (land, improvement or other)
    as well as their types (numeric, categorical, or boolean).

    Parameters
    ----------
    settings : dict
        Settings dictionary.

    Returns
    -------
    dict
        Dictionary mapping field names to type and class.
    """

    field_map = {}
    for ftype in ["land", "impr", "other"]:
        nums = get_fields_numeric(
            settings, df=None, include_boolean=False, types=[ftype]
        )
        cats = get_fields_categorical(
            settings, df=None, include_boolean=False, types=[ftype]
        )
        bools = get_fields_boolean(settings, df=None, types=[ftype])
        for field in nums:
            field_map[field] = {"type": ftype, "class": "numeric"}
        for field in cats:
            field_map[field] = {"type": ftype, "class": "categorical"}
        for field in bools:
            field_map[field] = {"type": ftype, "class": "boolean"}
    return field_map

get_hydrated_sales_from_sup

get_hydrated_sales_from_sup(sup)

Merge the sales and universe DataFrames to "hydrate" the sales data.

The sales data represents transactions and any known data at the time of the transaction, while the universe data represents the current state of all parcels. When we merge the two sets, the sales data overrides any existing data in the universe data. This is useful for creating a "hydrated" sales DataFrame that contains all the information available at the time of the sale (it is assumed that any difference between the current state of the parcel and the state at the time of the sale is accounted for in the sales data).

If the merged DataFrame contains a "geometry" column and the original sales did not, the result is converted to a GeoDataFrame.

Parameters:

    sup : SalesUniversePair
        SalesUniversePair containing sales and universe DataFrames.

Returns:

    DataFrame or GeoDataFrame
        The merged (hydrated) sales DataFrame.

Source code in openavmkit/data.py
def get_hydrated_sales_from_sup(sup: SalesUniversePair):
    """
    Merge the sales and universe DataFrames to "hydrate" the sales data.

    The sales data represents transactions and any known data at the time of the transaction,
    while the universe data represents the current state of all parcels. When we merge the
    two sets, the sales data overrides any existing data in the universe data. This is useful
    for creating a "hydrated" sales DataFrame that contains all the information available at
    the time of the sale (it is assumed that any difference between the current state of the
    parcel and the state at the time of the sale is accounted for in the sales data).

    If the merged DataFrame contains a "geometry" column and the original sales did not,
    the result is converted to a GeoDataFrame.

    Parameters
    ----------
    sup : SalesUniversePair
        SalesUniversePair containing sales and universe DataFrames.

    Returns
    -------
    pd.DataFrame or gpd.GeoDataFrame
        The merged (hydrated) sales DataFrame.
    """

    df_sales = sup["sales"]
    df_univ = sup["universe"].copy()
    df_univ = df_univ[df_univ["key"].isin(df_sales["key"].values)].reset_index(
        drop=True
    )
    df_merged = merge_and_stomp_dfs(df_sales, df_univ, df2_stomps=False)

    if "geometry" in df_merged and "geometry" not in df_sales:
        # convert df_merged to geodataframe:
        df_merged = gpd.GeoDataFrame(df_merged, geometry="geometry")

    return df_merged
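
A toy illustration of the override behavior (the building-area column is used only to show that the sales-time value wins over the current universe value):

    import pandas as pd

    from openavmkit.data import SalesUniversePair, get_hydrated_sales_from_sup

    df_universe = pd.DataFrame({
        "key": ["p1", "p2"],
        "bldg_area_finished_sqft": [2400, 0],   # current state of each parcel
    })
    df_sales = pd.DataFrame({
        "key_sale": ["p1-2015"],
        "key": ["p1"],
        "sale_price": [150_000],
        "bldg_area_finished_sqft": [1800],      # state at the time of the sale
    })

    sup = SalesUniversePair(sales=df_sales, universe=df_universe)
    df_hydrated = get_hydrated_sales_from_sup(sup)
    # One row per sale; bldg_area_finished_sqft is 1800, because the sales value
    # overrides the universe value during hydration.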

get_important_field

get_important_field(settings, field_name, df=None)

Retrieve the important field name for a given field alias from settings.

Parameters:

    settings : dict
        Settings dictionary.
    field_name : str
        Identifier for the field.
    df : DataFrame, optional
        Optional DataFrame to check field existence.

Returns:

    str or None
        The mapped field name if found, else None.

Source code in openavmkit/data.py
def get_important_field(
    settings: dict, field_name: str, df: pd.DataFrame = None
) -> str | None:
    """
    Retrieve the important field name for a given field alias from settings.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    field_name : str
        Identifier for the field.
    df : pandas.DataFrame, optional
        Optional DataFrame to check field existence.

    Returns
    -------
    str or None
        The mapped field name if found, else None.
    """

    imp = settings.get("field_classification", {}).get("important", {})
    other_name = imp.get("fields", {}).get(field_name, None)
    if df is not None:
        if other_name is not None and other_name in df:
            return other_name
        else:
            return None
    return other_name

get_important_fields

get_important_fields(settings, df=None)

Retrieve important field names from settings.

Parameters:

    settings : dict
        Settings dictionary.
    df : DataFrame, optional
        Optional DataFrame to filter fields.

Returns:

    list[str]
        List of important field names.

Source code in openavmkit/data.py
def get_important_fields(settings: dict, df: pd.DataFrame = None) -> list[str]:
    """
    Retrieve important field names from settings.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    df : pandas.DataFrame, optional
        Optional DataFrame to filter fields.

    Returns
    -------
    list[str]
        List of important field names.
    """

    imp = settings.get("field_classification", {}).get("important", {})
    fields = imp.get("fields", {})
    list_fields = []
    if df is not None:
        for field in fields:
            other_name = fields[field]
            if other_name in df:
                list_fields.append(other_name)
    return list_fields

get_locations

get_locations(settings, df=None)

Retrieve location fields from settings. These are all the fields that are considered locations.

Parameters:

    settings : dict
        Settings dictionary.
    df : DataFrame, optional
        Optional DataFrame to filter available locations.

Returns:

    list[str]
        List of location field names.

Source code in openavmkit/data.py
def get_locations(settings: dict, df: pd.DataFrame = None) -> list[str]:
    """
    Retrieve location fields from settings. These are all the fields that are considered locations.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    df : pandas.DataFrame, optional
        Optional DataFrame to filter available locations.

    Returns
    -------
    list[str]
        List of location field names.
    """

    locations = (
        settings.get("field_classification", {})
        .get("important", {})
        .get("locations", [])
    )
    if df is not None:
        locations = [loc for loc in locations if loc in df]
    return locations

get_report_locations

get_report_locations(settings, df=None)

Retrieve report location fields from settings.

These are location fields that will be used in report breakdowns, such as for ratio studies.

Parameters:

    settings : dict
        Settings dictionary.
    df : DataFrame, optional
        Optional DataFrame to filter available locations.

Returns:

    list[str]
        List of report location field names.

Source code in openavmkit/data.py
def get_report_locations(settings: dict, df: pd.DataFrame = None) -> list[str]:
    """
    Retrieve report location fields from settings.

    These are location fields that will be used in report breakdowns, such as for ratio studies.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    df : pandas.DataFrame, optional
        Optional DataFrame to filter available locations.

    Returns
    -------
    list[str]
        List of report location field names.
    """

    locations = (
        settings.get("field_classification", {})
        .get("important", {})
        .get("report_locations", [])
    )
    if df is not None:
        locations = [loc for loc in locations if loc in df]
    return locations

get_sale_field

get_sale_field(settings, df=None)

Determine the appropriate sale price field ("sale_price" or "sale_price_time_adj") based on time adjustment settings.

Parameters:

    settings : dict
        Settings dictionary.
    df : DataFrame, optional
        Optional DataFrame to check field existence.

Returns:

    str
        Field name to be used for sale price.

Source code in openavmkit/data.py
def get_sale_field(settings: dict, df: pd.DataFrame = None) -> str:
    """
    Determine the appropriate sale price field ("sale_price" or "sale_price_time_adj")
    based on time adjustment settings.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    df : pandas.DataFrame, optional
        Optional DataFrame to check field existence.

    Returns
    -------
    str
        Field name to be used for sale price.
    """

    ta = settings.get("data", {}).get("process", {}).get("time_adjustment", {})
    use = ta.get("use", True)
    if use:
        sale_field = "sale_price_time_adj"
    else:
        sale_field = "sale_price"
    if df is not None:
        if sale_field == "sale_price_time_adj" and "sale_price_time_adj" in df:
            return "sale_price_time_adj"
    return sale_field
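
The choice is driven by the data.process.time_adjustment.use flag:

    from openavmkit.data import get_sale_field

    settings = {"data": {"process": {"time_adjustment": {"use": True}}}}
    print(get_sale_field(settings))   # -> "sale_price_time_adj"

    settings["data"]["process"]["time_adjustment"]["use"] = False
    print(get_sale_field(settings))   # -> "sale_price"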

get_train_test_keys

get_train_test_keys(df_in, settings)

Get the training and testing keys for the sales DataFrame.

This function gets the train/test keys for each model group defined in the settings, combines them into a single mask for the sales DataFrame, and returns the keys for training and testing as numpy arrays.

Parameters:

    df_in : DataFrame
        Input DataFrame containing sales data.
    settings : dict
        Settings dictionary.

Returns:

    tuple
        A tuple containing two numpy arrays: keys_train and keys_test.
          • keys_train: keys for the training set
          • keys_test: keys for the testing set

Source code in openavmkit/data.py
def get_train_test_keys(df_in: pd.DataFrame, settings: dict):
    """Get the training and testing keys for the sales DataFrame.

    This function gets the train/test keys for each model group defined in the settings,
    combines them into a single mask for the sales DataFrame, and returns the keys for
    training and testing as numpy arrays.

    Parameters
    ----------
    df_in : pd.DataFrame
        Input DataFrame containing sales data.
    settings : dict
        Settings dictionary

    Returns
    -------
    tuple
        A tuple containing two numpy arrays: keys_train and keys_test.
        - keys_train: keys for training set
        - keys_test: keys for testing set
    """

    model_group_ids = get_model_group_ids(settings, df_in)

    # an empty mask the same size as the input DataFrame
    mask_train = pd.Series(np.zeros(len(df_in), dtype=bool), index=df_in.index)
    mask_test = pd.Series(np.zeros(len(df_in), dtype=bool), index=df_in.index)

    for model_group in model_group_ids:
        # Read the split keys for the model group
        test_keys, train_keys = _read_split_keys(model_group)

        # Filter the DataFrame based on the keys
        mask_test |= df_in["key_sale"].isin(test_keys)
        mask_train |= df_in["key_sale"].isin(train_keys)

    keys_test = df_in.loc[mask_test, "key_sale"].values
    keys_train = df_in.loc[mask_train, "key_sale"].values

    return keys_train, keys_test
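
A usage sketch. It assumes that sup and settings come from your own pipeline and that train/test split key files have already been written for each model group, since this function reads them from disk:

    from openavmkit.data import get_hydrated_sales_from_sup, get_train_test_keys

    df_hydrated = get_hydrated_sales_from_sup(sup)
    keys_train, keys_test = get_train_test_keys(df_hydrated, settings)

    df_train = df_hydrated[df_hydrated["key_sale"].isin(keys_train)]
    df_test = df_hydrated[df_hydrated["key_sale"].isin(keys_test)]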

get_train_test_masks

get_train_test_masks(df_in, settings)

Get the training and testing masks for the sales DataFrame.

This function gets the train/test masks for each model group defined in the settings, combines them into masks over the sales DataFrame, and returns them as pandas Series.

Parameters:

    df_in : DataFrame
        Input DataFrame containing sales data.
    settings : dict
        Settings dictionary.

Returns:

    tuple
        A tuple containing two pandas Series: mask_train and mask_test.
          • mask_train: boolean mask for the training set
          • mask_test: boolean mask for the testing set

Source code in openavmkit/data.py
def get_train_test_masks(df_in: pd.DataFrame, settings: dict):
    """Get the training and testing masks for the sales DataFrame.

    This function gets the train/test masks for each model group defined in the settings,
    combines them into a single mask for the sales DataFrame, and returns the masks as pandas Series.

    Parameters
    ----------
    df_in : pd.DataFrame
        Input DataFrame containing sales data.
    settings : dict
        Settings dictionary

    Returns
    -------
    tuple
        A tuple containing two pandas Series: mask_train and mask_test.
        - mask_train: boolean mask for training set
        - mask_test: boolean mask for testing set
    """
    model_group_ids = get_model_group_ids(settings, df_in)

    # an empty mask the same size as the input DataFrame
    mask_train = pd.Series(np.zeros(len(df_in), dtype=bool), index=df_in.index)
    mask_test = pd.Series(np.zeros(len(df_in), dtype=bool), index=df_in.index)

    for model_group in model_group_ids:
        # Read the split keys for the model group
        test_keys, train_keys = _read_split_keys(model_group)

        # Filter the DataFrame based on the keys
        mask_test |= df_in["key_sale"].isin(test_keys)
        mask_train |= df_in["key_sale"].isin(train_keys)

    return mask_train, mask_test

get_vacant

get_vacant(df_in, settings, invert=False)

Filter the DataFrame based on the 'is_vacant' column.

Parameters:

    df_in : DataFrame
        Input DataFrame.
    settings : dict
        Settings dictionary.
    invert : bool, default False
        If True, return non-vacant rows.

Returns:

    DataFrame
        DataFrame filtered by the is_vacant flag.

Raises:

    ValueError
        If the is_vacant column is not boolean.

Source code in openavmkit/data.py
def get_vacant(
    df_in: pd.DataFrame, settings: dict, invert: bool = False
) -> pd.DataFrame:
    """
    Filter the DataFrame based on the 'is_vacant' column.

    Parameters
    ----------
    df_in : pandas.DataFrame
        Input DataFrame.
    settings : dict
        Settings dictionary.
    invert : bool, optional
        If True, return non-vacant rows.

    Returns
    -------
    pandas.DataFrame
        DataFrame filtered by the `is_vacant` flag.

    Raises
    ------
    ValueError
        If the `is_vacant` column is not boolean.
    """

    df = df_in.copy()
    is_vacant_dtype = df["is_vacant"].dtype
    if is_vacant_dtype != bool:
        raise ValueError(
            f"The 'is_vacant' column must be a boolean type (found: {is_vacant_dtype})"
        )
    idx_vacant = df["is_vacant"].eq(True)
    if invert:
        idx_vacant = ~idx_vacant
    df_vacant = df[idx_vacant].copy()
    return df_vacant

get_vacant_sales

get_vacant_sales(df_in, settings, invert=False)

Filter the sales DataFrame to return only vacant (unimproved) sales.

Parameters:

    df_in : DataFrame
        Input DataFrame.
    settings : dict
        Settings dictionary.
    invert : bool, default False
        If True, return non-vacant (improved) sales.

Returns:

    DataFrame
        DataFrame filtered to vacant sales, with the vacant_sale column coerced to boolean.

Source code in openavmkit/data.py
def get_vacant_sales(
    df_in: pd.DataFrame, settings: dict, invert: bool = False
) -> pd.DataFrame:
    """
    Filter the sales DataFrame to return only vacant (unimproved) sales.

    Parameters
    ----------
    df_in : pandas.DataFrame
        Input DataFrame.
    settings : dict
        Settings dictionary.
    invert : bool, optional
        If True, return non-vacant (improved) sales.

    Returns
    -------
    pandas.DataFrame
        DataFrame filtered to vacant sales, with the `vacant_sale` column coerced to boolean.
    """

    df = df_in.copy()
    df = _boolify_column_in_df(df, "vacant_sale", "na_false")
    idx_vacant_sale = df["vacant_sale"].eq(True)
    if invert:
        idx_vacant_sale = ~idx_vacant_sale
    df_vacant_sales = df[idx_vacant_sale].copy()
    return df_vacant_sales

process_data

process_data(dataframes, settings, verbose=False)

Process raw dataframes according to settings and return a SalesUniversePair.

Parameters:

    dataframes : dict[str, DataFrame]
        Dictionary mapping keys to DataFrames.
    settings : dict
        Settings dictionary.
    verbose : bool, default False
        If True, prints progress information.

Returns:

    SalesUniversePair
        A SalesUniversePair containing processed sales and universe data.

Raises:

    ValueError
        If required merge instructions or columns are missing.

Source code in openavmkit/data.py
def process_data(
    dataframes: dict[str, pd.DataFrame], settings: dict, verbose: bool = False
) -> SalesUniversePair:
    """
    Process raw dataframes according to settings and return a SalesUniversePair.

    Parameters
    ----------
    dataframes : dict[str, pd.DataFrame]
        Dictionary mapping keys to DataFrames.
    settings : dict
        Settings dictionary.
    verbose : bool, optional
        If True, prints progress information.

    Returns
    -------
    SalesUniversePair
        A SalesUniversePair containing processed sales and universe data.

    Raises
    ------
    ValueError
        If required merge instructions or columns are missing.
    """

    s_data = settings.get("data", {})
    s_process = s_data.get("process", {})
    s_merge = s_process.get("merge", {})

    merge_univ: list | None = s_merge.get("universe", None)
    merge_sales: list | None = s_merge.get("sales", None)

    if merge_univ is None:
        raise ValueError(
            'No "universe" merge instructions found. data.process.merge must have exactly two keys: "universe", and "sales"'
        )
    if merge_sales is None:
        raise ValueError(
            'No "sales" merge instructions found. data.process.merge must have exactly two keys: "universe", and "sales"'
        )

    df_univ = _merge_dict_of_dfs(dataframes, merge_univ, settings, required_key="key")
    df_sales = _merge_dict_of_dfs(
        dataframes, merge_sales, settings, required_key="key_sale"
    )

    if "valid_sale" not in df_sales:
        raise ValueError("The 'valid_sale' column is required in the sales data.")
    if "vacant_sale" not in df_sales:
        raise ValueError("The 'vacant_sale' column is required in the sales data.")
    # Print number and percentage of valid sales
    valid_count = df_sales["valid_sale"].sum()
    total_count = len(df_sales)
    valid_percent = (valid_count / total_count * 100) if total_count > 0 else 0
    print(f"Valid sales: {valid_count} ({valid_percent:.1f}% of {total_count} total)")
    df_sales = df_sales[df_sales["valid_sale"].eq(True)].copy().reset_index(drop=True)

    sup: SalesUniversePair = SalesUniversePair(universe=df_univ, sales=df_sales)

    sup = _enrich_data(
        sup, s_process.get("enrich", {}), dataframes, settings, verbose=verbose
    )

    dupe_univ: dict | None = s_process.get("dupes", {}).get("universe", None)
    dupe_sales: dict | None = s_process.get("dupes", {}).get("sales", None)
    if dupe_univ:
        sup.set(
            "universe",
            _handle_duplicated_rows(sup.universe, dupe_univ, verbose=verbose),
        )
    if dupe_sales:
        sup.set(
            "sales", _handle_duplicated_rows(sup.sales, dupe_sales, verbose=verbose)
        )

    return sup
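
An end-to-end usage sketch. The file paths are hypothetical and the merge-instruction lists below are placeholders only; the real schema of data.process.merge is defined by the settings documentation, not here. What the source above does guarantee is that both "universe" and "sales" merge instructions must be present, that the merged sales table must contain "valid_sale" and "vacant_sale", and that only valid sales are kept.

    import pandas as pd

    from openavmkit.data import process_data

    dataframes = {
        "parcels": pd.read_parquet("parcels.parquet"),  # hypothetical files
        "sales": pd.read_parquet("sales.parquet"),
    }
    settings = {
        "data": {
            "process": {
                "merge": {
                    "universe": ["parcels"],  # placeholder merge instructions
                    "sales": ["sales"],
                },
            },
        },
    }

    sup = process_data(dataframes, settings, verbose=True)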