
openavmkit.data

SalesUniversePair dataclass

SalesUniversePair(sales, universe)

A container for the sales and universe DataFrames; many functions operate on this data structure. It exists because the sales and universe DataFrames are often used together and need to be passed around as a unit. The sales DataFrame represents transactions and any data known at the time of each transaction, while the universe DataFrame represents the current state of all parcels. The sales DataFrame allows duplicate primary parcel transaction keys, since an individual parcel may have sold multiple times; the universe DataFrame forbids duplicate primary parcel keys.

Attributes:

sales : DataFrame
    DataFrame containing sales data.
universe : DataFrame
    DataFrame containing universe (parcel) data.
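
For example, a minimal sketch of constructing a pair. The "land_area_sqft" column and the key values are illustrative; "key", "key_sale", and "sale_price" mirror identifiers used elsewhere on this page:

import pandas as pd

from openavmkit.data import SalesUniversePair

# Universe: one row per parcel, keyed by "key" (duplicate keys are not allowed)
df_universe = pd.DataFrame({"key": ["p1", "p2"], "land_area_sqft": [5000, 7200]})

# Sales: one row per transaction, keyed by "key_sale"; a parcel may appear more than once
df_sales = pd.DataFrame(
    {
        "key_sale": ["p1-2020", "p1-2023"],
        "key": ["p1", "p1"],
        "sale_price": [150000, 195000],
    }
)

sup = SalesUniversePair(sales=df_sales, universe=df_universe)
sup2 = sup.copy()  # independent copies of both DataFrames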

copy

copy()

Create a copy of the SalesUniversePair object.

Returns:

SalesUniversePair
    A new SalesUniversePair object with copied DataFrames.

Source code in openavmkit/data.py, lines 117-125:
def copy(self):
    """Create a copy of the SalesUniversePair object.

    Returns
    -------
    SalesUniversePair
        A new SalesUniversePair object with copied DataFrames.
    """
    return SalesUniversePair(self.sales.copy(), self.universe.copy())

set

set(key, value)

Set the sales or universe DataFrame.

Parameters:

key : str
    Either "sales" or "universe".
value : DataFrame
    The new DataFrame to set for the specified key.

Raises:

ValueError
    If an invalid key is provided.

Source code in openavmkit/data.py, lines 127-147:
def set(self, key: str, value: pd.DataFrame):
    """Set the sales or universe DataFrame.

    Parameters
    ----------
    key : str
        Either "sales" or "universe".
    value : pd.DataFrame
        The new DataFrame to set for the specified key.

    Raises
    ------
    ValueError
        If an invalid key is provided
    """
    if key == "sales":
        self.sales = value
    elif key == "universe":
        self.universe = value
    else:
        raise ValueError(f"Invalid key: {key}")

update_sales

update_sales(new_sales, allow_remove_rows)

Update the sales DataFrame with new information as an overlay without redundancy.

This function lets you push updates to "sales" while keeping it as an "overlay" that doesn't contain any redundant information.

  • First we note what fields were in sales last time.
  • Then we note what sales are in universe but were not in sales.
  • Finally, we determine the new fields generated in new_sales that are not in the previous sales or in the universe.
  • A modified version of df_sales is created with only two changes:
      • Reduced to the correct selection of keys.
      • Addition of the newly generated fields.

Parameters:

new_sales : DataFrame, required
    New sales DataFrame with updates.
allow_remove_rows : bool, required
    If True, allows the update to remove rows from sales. If False, preserves all original rows.
Source code in openavmkit/data.py, lines 149-199:
def update_sales(self, new_sales: pd.DataFrame, allow_remove_rows: bool):
    """
    Update the sales DataFrame with new information as an overlay without redundancy.

    This function lets you push updates to "sales" while keeping it as an "overlay" that
    doesn't contain any redundant information.

    - First we note what fields were in sales last time.
    - Then we note what sales are in universe but were not in sales.
    - Finally, we determine the new fields generated in new_sales that are not in the
      previous sales or in the universe.
    - A modified version of df_sales is created with only two changes:
      - Reduced to the correct selection of keys.
      - Addition of the newly generated fields.

    Parameters
    ----------
    new_sales : pd.DataFrame
        New sales DataFrame with updates.
    allow_remove_rows : bool
        If True, allows the update to remove rows from sales. If False, preserves all
        original rows.
    """

    old_fields = self.sales.columns.values
    univ_fields = [
        field for field in self.universe.columns.values if field not in old_fields
    ]
    new_fields = [
        field
        for field in new_sales.columns.values
        if field not in old_fields and field not in univ_fields
    ]

    old_sales = self.sales.copy()
    return_keys = new_sales["key_sale"].values
    if not allow_remove_rows and len(return_keys) > len(old_sales):
        raise ValueError(
            "The new sales DataFrame contains more keys than the old sales DataFrame. update_sales() may only be used to shrink the dataframe or keep it the same size. Use set() if you intend to replace the sales dataframe."
        )

    if allow_remove_rows:
        old_sales = old_sales[old_sales["key_sale"].isin(return_keys)].reset_index(
            drop=True
        )
    reconciled = combine_dfs(
        old_sales,
        new_sales[["key_sale"] + new_fields].copy().reset_index(drop=True),
        index="key_sale",
    )
    self.sales = reconciled
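
For example, a sketch of the intended overlay workflow, assuming sup is a SalesUniversePair whose hydrated sales carry a "sale_price" column; the derived "log_sale_price" column is purely illustrative (see get_hydrated_sales_from_sup below):

import numpy as np

from openavmkit.data import get_hydrated_sales_from_sup

df_hydrated = get_hydrated_sales_from_sup(sup)

# Drop some rows and derive a new field
df_new = df_hydrated[df_hydrated["sale_price"] > 1000].copy()
df_new["log_sale_price"] = np.log(df_new["sale_price"])

# Only the surviving keys and the newly generated column are written back;
# fields already present in sales or universe are not duplicated.
sup.update_sales(df_new, allow_remove_rows=True)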

enrich_df_streets

enrich_df_streets(df_in, settings, spacing=1.0, max_ray_length=25.0, network_buffer=500.0, verbose=False)

Enrich a GeoDataFrame with street network data.

This function enriches the input GeoDataFrame with street network data by calculating frontage, depth, distance to street, and many other related metrics, for every road vs. every parcel in the GeoDataFrame, using OpenStreetMap data.

WARNING: This function can be VERY computationally and memory intensive for large datasets and may take a long time to run.

We definitely need to work on its performance or make it easier to split into smaller chunks.

Parameters:

df_in : GeoDataFrame, required
    Input GeoDataFrame containing parcels.
settings : dict, required
    Settings dictionary containing configuration for the enrichment.
spacing : float, default 1.0
    Spacing in meters for ray casting to calculate distances to streets.
max_ray_length : float, default 25.0
    Maximum length of rays to shoot for distance calculations, in meters.
network_buffer : float, default 500.0
    Buffer around the street network to consider for distance calculations, in meters.
verbose : bool, default False
    If True, prints progress information.

Returns:

GeoDataFrame
    Enriched GeoDataFrame with additional columns for street-related metrics.

Source code in openavmkit/data.py, lines 643-692:
def enrich_df_streets(
    df_in: gpd.GeoDataFrame,
    settings: dict,
    spacing: float = 1.0,  # in meters
    max_ray_length: float = 25.0,  # meters to shoot rays
    network_buffer: float = 500.0,  # buffer for street network
    verbose: bool = False,
) -> gpd.GeoDataFrame:
    """Enrich a GeoDataFrame with street network data.

    This function enriches the input GeoDataFrame with street network data by calculating
    frontage, depth, distance to street, and many other related metrics, for every road vs.
    every parcel in the GeoDataFrame, using OpenStreetMap data.

    WARNING: This function can be VERY computationally and memory intensive for large datasets
    and may take a long time to run.

    We definitely need to work on its performance or make it easier to split into smaller chunks.

    Parameters
    ----------
    df_in : gpd.GeoDataFrame
        Input GeoDataFrame containing parcels.
    settings : dict
        Settings dictionary containing configuration for the enrichment.
    spacing : float, optional
        Spacing in meters for ray casting to calculate distances to streets. Default is 1.0.
    max_ray_length : float, optional
        Maximum length of rays to shoot for distance calculations, in meters. Default is 25.0.
    network_buffer : float, optional
        Buffer around the street network to consider for distance calculations, in meters.
        Default is 500.0.
    verbose : bool, optional
        If True, prints progress information. Default is False.

    Returns
    -------
    gpd.GeoDataFrame
        Enriched GeoDataFrame with additional columns for street-related metrics.
    """
    df_out = _enrich_df_streets(
        df_in, settings, spacing, max_ray_length, network_buffer, verbose
    )

    # add somers unit land size normalization using frontage & depth
    df_out["land_area_somers_ft"] = get_size_in_somers_units_ft(
        df_out["frontage_ft_1"], df_out["depth_ft_1"]
    )

    return df_out
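
For example, an illustrative call, assuming df_parcels is a parcel GeoDataFrame with a valid CRS and settings is a project settings dictionary; the keyword values shown are simply the documented defaults:

from openavmkit.data import enrich_df_streets

df_enriched = enrich_df_streets(
    df_parcels,
    settings,
    spacing=1.0,
    max_ray_length=25.0,
    network_buffer=500.0,
    verbose=True,
)
# Adds street metrics such as frontage and depth, plus "land_area_somers_ft"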

enrich_sup_spatial_lag

enrich_sup_spatial_lag(sup, settings, verbose=False)

Enrich the sales and universe DataFrames with spatial lag features.

This function calculates "spatial lag", i.e., the spatially weighted average of the sale price and other fields, based on nearest neighbors.

For sales, the spatial lag is calculated based on the training set of sales. For non-sale characteristics, the spatial lag is calculated based on the universe parcels.

Parameters:

sup : SalesUniversePair, required
    SalesUniversePair containing sales and universe DataFrames.
settings : dict, required
    Settings dictionary.
verbose : bool, default False
    If True, prints progress information.

Returns:

SalesUniversePair
    Enriched SalesUniversePair with spatial lag features.

Source code in openavmkit/data.py, lines 695-751:
def enrich_sup_spatial_lag(
    sup: SalesUniversePair, 
    settings: dict, 
    verbose: bool = False
) -> SalesUniversePair:
    """Enrich the sales and universe DataFrames with spatial lag features.

    This function calculates "spatial lag", that is, the spatially-weighted
    average, of the sale price and other fields, based on nearest neighbors.

    For sales, the spatial lag is calculated based on the training set of sales.
    For non-sale characteristics, the spatial lag is calculated based on the
    universe parcels.

    Parameters
    ----------
    sup : SalesUniversePair
        SalesUniversePair containing sales and universe DataFrames.
    settings : dict
        Settings dictionary.
    verbose : bool, optional
        If True, prints progress information.

    Returns
    -------
    SalesUniversePair
        Enriched SalesUniversePair with spatial lag features.
    """

    mg_ids = get_model_group_ids(settings)

    df_sales = sup.sales
    df_universe = sup.universe

    # For each model group, calculate its spatial lag surface(s)
    for mg in mg_ids:
        sup_mg = _enrich_sup_spatial_lag_for_model_group(
            sup,
            settings,
            mg,
            verbose
        )
        if sup_mg is None:
            continue
        # For each spatial lag surface, copy it back to the master SalesUniversePair
        sl_cols = [field for field in sup_mg.universe.columns if field.startswith("spatial_lag_")]
        for col in sl_cols:
            # Only fill in values that haven't been set already
            if col in sup_mg.sales:
                df_sales = fill_from_df(df_sales, sup_mg.sales, "key_sale", col)
            if col in sup_mg.universe:
                df_universe = fill_from_df(df_universe, sup_mg.universe, "key", col)

    sup.sales = df_sales
    sup.universe = df_universe

    return sup

enrich_time

enrich_time(df, time_formats, settings)

Enrich the DataFrame by converting specified time fields to datetime and deriving additional fields.

For each key in time_formats, converts the column to datetime. Then, if a field with the prefix "sale" exists, enriches the DataFrame with additional time fields (e.g., "sale_year", "sale_month", "sale_age_days").

Parameters:

df : DataFrame, required
    Input DataFrame.
time_formats : dict, required
    Dictionary mapping field names to datetime formats.
settings : dict, required
    Settings dictionary.

Returns:

DataFrame
    DataFrame with enriched time fields.

Source code in openavmkit/data.py, lines 244-285:
def enrich_time(df: pd.DataFrame, time_formats: dict, settings: dict) -> pd.DataFrame:
    """
    Enrich the DataFrame by converting specified time fields to datetime and deriving additional fields.

    For each key in time_formats, converts the column to datetime. Then, if a field with
    the prefix "sale" exists, enriches the DataFrame with additional time fields (e.g.,
    "sale_year", "sale_month", "sale_age_days").

    Parameters
    ----------
    df : pandas.DataFrame
        Input DataFrame.
    time_formats : dict
        Dictionary mapping field names to datetime formats.
    settings : dict
        Settings dictionary.

    Returns
    -------
    pandas.DataFrame
        DataFrame with enriched time fields.
    """

    for key in time_formats:
        time_format = time_formats[key]
        if key in df:
            df[key] = pd.to_datetime(df[key], format=time_format, errors="coerce")

    for prefix in ["sale"]:
        do_enrich = False
        for col in df.columns.values:
            if f"{prefix}_" in col:
                do_enrich = True
                break
        if do_enrich:
            df = _enrich_time_field(
                df, prefix, add_year_month=True, add_year_quarter=True
            )
            if prefix == "sale":
                df = _enrich_sale_age_days(df, settings)

    return df
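
For example, a minimal sketch, assuming the sales table stores "sale_date" as "YYYY-MM-DD" strings; the derived column names follow the description above:

from openavmkit.data import enrich_time

df_sales = enrich_time(df_sales, {"sale_date": "%Y-%m-%d"}, settings)
# df_sales now carries derived fields such as "sale_year", "sale_month", and "sale_age_days"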

filter_df_by_date_range

filter_df_by_date_range(df, start_date, end_date)

Filter df to rows where 'sale_date' is between start_date and end_date (inclusive).

  • start_date/end_date may be 'YYYY-MM-DD' strings or date/datetime/Timestamp.
  • Time-of-day and time zones are ignored.
  • Rows with missing/unparseable 'sale_date' are dropped.

Source code in openavmkit/data.py, lines 5004-5040:
def filter_df_by_date_range(df, start_date, end_date):
    """
    Filter df to rows where 'sale_date' is between start_date and end_date (inclusive).
    - start_date/end_date may be 'YYYY-MM-DD' strings or date/datetime/Timestamp.
    - Time-of-day and time zones are ignored.
    - Rows with missing/unparseable 'sale_date' are dropped.
    """
    import pandas as pd
    from datetime import date, datetime, timedelta
    from pandas.api.types import is_datetime64tz_dtype

    def _as_date(x):
        # If already a date (but not datetime), keep it
        if isinstance(x, date) and not isinstance(x, datetime):
            return x
        # Otherwise parse and take the calendar date
        return pd.to_datetime(x).date()

    start_d = _as_date(start_date)
    end_d   = _as_date(end_date)
    if start_d > end_d:
        raise ValueError("start_date cannot be after end_date.")

    # Coerce to datetime; tolerate bad/missing → NaT
    s = pd.to_datetime(df["sale_date"], errors="coerce")

    # Strip timezone info if present, preserving local wall time
    if is_datetime64tz_dtype(s.dtype):
        s = s.dt.tz_localize(None)

    # Build inclusive range using an exclusive upper bound
    start_ts = pd.Timestamp(start_d)                       # 00:00:00 on start day
    end_excl = pd.Timestamp(end_d) + pd.Timedelta(days=1)  # first moment after end day

    # NaT values compare as False and will be dropped
    mask = s.ge(start_ts) & s.lt(end_excl)
    return df.loc[mask].copy()
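
For example, an illustrative call on a sales DataFrame; the bounds may be strings or date-like objects and both endpoints are inclusive:

import datetime

from openavmkit.data import filter_df_by_date_range

df_2021 = filter_df_by_date_range(df_sales, "2021-01-01", "2021-12-31")
df_q1 = filter_df_by_date_range(
    df_sales, datetime.date(2021, 1, 1), datetime.date(2021, 3, 31)
)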

get_dtypes_from_settings

get_dtypes_from_settings(settings)

Generate a dictionary mapping fields to their designated data types based on settings.

Parameters:

settings : dict, required
    Settings dictionary.

Returns:

dict
    Dictionary of field names to data type strings.

Source code in openavmkit/data.py, lines 536-561:
def get_dtypes_from_settings(settings: dict) -> dict:
    """
    Generate a dictionary mapping fields to their designated data types based on settings.

    Parameters
    ----------
    settings : dict
        Settings dictionary.

    Returns
    -------
    dict
        Dictionary of field names to data type strings.
    """

    cats = get_fields_categorical(settings, include_boolean=False)
    bools = get_fields_boolean(settings)
    nums = get_fields_numeric(settings, include_boolean=False)
    dtypes = {}
    for c in cats:
        dtypes[c] = "string"
    for b in bools:
        dtypes[b] = "bool"
    for n in nums:
        dtypes[n] = "Float64"
    return dtypes
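
For example, a sketch of applying the returned mapping with pandas, assuming df is a merged parcel table and settings declares its categorical, boolean, and numeric fields:

from openavmkit.data import get_dtypes_from_settings

dtypes = get_dtypes_from_settings(settings)

# Only cast the columns that are actually present in the DataFrame
present = {col: dtype for col, dtype in dtypes.items() if col in df.columns}
df = df.astype(present)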

get_field_classifications

get_field_classifications(settings)

Retrieve a mapping of field names to their classifications (land, improvement or other) as well as their types (numeric, categorical, or boolean).

Parameters:

settings : dict, required
    Settings dictionary.

Returns:

dict
    Dictionary mapping field names to type and class.

Source code in openavmkit/data.py, lines 502-533:
def get_field_classifications(settings: dict) -> dict:
    """
    Retrieve a mapping of field names to their classifications (land, improvement or other)
    as well as their types (numeric, categorical, or boolean).

    Parameters
    ----------
    settings : dict
        Settings dictionary.

    Returns
    -------
    dict
        Dictionary mapping field names to type and class.
    """

    field_map = {}
    for ftype in ["land", "impr", "other"]:
        nums = get_fields_numeric(
            settings, df=None, include_boolean=False, types=[ftype]
        )
        cats = get_fields_categorical(
            settings, df=None, include_boolean=False, types=[ftype]
        )
        bools = get_fields_boolean(settings, df=None, types=[ftype])
        for field in nums:
            field_map[field] = {"type": ftype, "class": "numeric"}
        for field in cats:
            field_map[field] = {"type": ftype, "class": "categorical"}
        for field in bools:
            field_map[field] = {"type": ftype, "class": "boolean"}
    return field_map

get_hydrated_sales_from_sup

get_hydrated_sales_from_sup(sup)

Merge the sales and universe DataFrames to "hydrate" the sales data.

The sales data represents transactions and any known data at the time of the transaction, while the universe data represents the current state of all parcels. When we merge the two sets, the sales data overrides any existing data in the universe data. This is useful for creating a "hydrated" sales DataFrame that contains all the information available at the time of the sale (it is assumed that any difference between the current state of the parcel and the state at the time of the sale is accounted for in the sales data).

If the merged DataFrame contains a "geometry" column and the original sales did not, the result is converted to a GeoDataFrame.

Parameters:

sup : SalesUniversePair, required
    SalesUniversePair containing sales and universe DataFrames.

Returns:

DataFrame or GeoDataFrame
    The merged (hydrated) sales DataFrame.

Source code in openavmkit/data.py, lines 205-241:
def get_hydrated_sales_from_sup(sup: SalesUniversePair):
    """
    Merge the sales and universe DataFrames to "hydrate" the sales data.

    The sales data represents transactions and any known data at the time of the transaction,
    while the universe data represents the current state of all parcels. When we merge the
    two sets, the sales data overrides any existing data in the universe data. This is useful
    for creating a "hydrated" sales DataFrame that contains all the information available at
    the time of the sale (it is assumed that any difference between the current state of the
    parcel and the state at the time of the sale is accounted for in the sales data).

    If the merged DataFrame contains a "geometry" column and the original sales did not,
    the result is converted to a GeoDataFrame.

    Parameters
    ----------
    sup : SalesUniversePair
        SalesUniversePair containing sales and universe DataFrames.

    Returns
    -------
    pd.DataFrame or gpd.GeoDataFrame
        The merged (hydrated) sales DataFrame.
    """

    df_sales = sup["sales"]
    df_univ = sup["universe"].copy()
    df_univ = df_univ[df_univ["key"].isin(df_sales["key"].values)].reset_index(
        drop=True
    )
    df_merged = merge_and_stomp_dfs(df_sales, df_univ, df2_stomps=False)

    if "geometry" in df_merged and "geometry" not in df_sales:
        # convert df_merged to geodataframe:
        df_merged = gpd.GeoDataFrame(df_merged, geometry="geometry")

    return df_merged
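
For example, a brief sketch, assuming sup is a populated SalesUniversePair:

from openavmkit.data import get_hydrated_sales_from_sup

df_hydrated = get_hydrated_sales_from_sup(sup)
# Universe columns are joined onto each sale; sales values win wherever both supply data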

get_important_field

get_important_field(settings, field_name, df=None)

Retrieve the important field name for a given field alias from settings.

Parameters:

settings : dict, required
    Settings dictionary.
field_name : str, required
    Identifier for the field.
df : DataFrame, default None
    Optional DataFrame to check field existence.

Returns:

str or None
    The mapped field name if found, else None.

Source code in openavmkit/data.py, lines 471-499:
def get_important_field(
    settings: dict, field_name: str, df: pd.DataFrame = None
) -> str | None:
    """
    Retrieve the important field name for a given field alias from settings.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    field_name : str
        Identifier for the field.
    df : pandas.DataFrame, optional
        Optional DataFrame to check field existence.

    Returns
    -------
    str or None
        The mapped field name if found, else None.
    """

    imp = settings.get("field_classification", {}).get("important", {})
    other_name = imp.get("fields", {}).get(field_name, None)
    if df is not None:
        if other_name is not None and other_name in df:
            return other_name
        else:
            return None
    return other_name
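
For example, a sketch of the settings structure this reads from; the field names shown ("bldg_area_finished_sqft", "finished_sqft") are purely illustrative:

import pandas as pd

from openavmkit.data import get_important_field

settings = {
    "field_classification": {
        "important": {
            "fields": {"bldg_area_finished_sqft": "finished_sqft"}
        }
    }
}

get_important_field(settings, "bldg_area_finished_sqft")  # -> "finished_sqft"

df = pd.DataFrame({"finished_sqft": [1200]})
get_important_field(settings, "bldg_area_finished_sqft", df=df)  # -> "finished_sqft" (column present)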

get_important_fields

get_important_fields(settings, df=None)

Retrieve important field names from settings.

Parameters:

settings : dict, required
    Settings dictionary.
df : DataFrame, default None
    Optional DataFrame to filter fields.

Returns:

list[str]
    List of important field names.

Source code in openavmkit/data.py, lines 443-468:
def get_important_fields(settings: dict, df: pd.DataFrame = None) -> list[str]:
    """
    Retrieve important field names from settings.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    df : pandas.DataFrame, optional
        Optional DataFrame to filter fields.

    Returns
    -------
    list[str]
        List of important field names.
    """

    imp = settings.get("field_classification", {}).get("important", {})
    fields = imp.get("fields", {})
    list_fields = []
    if df is not None:
        for field in fields:
            other_name = fields[field]
            if other_name in df:
                list_fields.append(other_name)
    return list_fields

get_locations

get_locations(settings, df=None)

Retrieve location fields from settings. These are all the fields that are considered locations.

Parameters:

settings : dict, required
    Settings dictionary.
df : DataFrame, default None
    Optional DataFrame to filter available locations.

Returns:

list[str]
    List of location field names.

Source code in openavmkit/data.py, lines 416-440:
def get_locations(settings: dict, df: pd.DataFrame = None) -> list[str]:
    """
    Retrieve location fields from settings. These are all the fields that are considered locations.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    df : pandas.DataFrame, optional
        Optional DataFrame to filter available locations.

    Returns
    -------
    list[str]
        List of location field names.
    """

    locations = (
        settings.get("field_classification", {})
        .get("important", {})
        .get("locations", [])
    )
    if df is not None:
        locations = [loc for loc in locations if loc in df]
    return locations

get_report_locations

get_report_locations(settings, df=None)

Retrieve report location fields from settings.

These are location fields that will be used in report breakdowns, such as for ratio studies.

Parameters:

settings : dict, required
    Settings dictionary.
df : DataFrame, default None
    Optional DataFrame to filter available locations.

Returns:

list[str]
    List of report location field names.

Source code in openavmkit/data.py, lines 387-413:
def get_report_locations(settings: dict, df: pd.DataFrame = None) -> list[str]:
    """
    Retrieve report location fields from settings.

    These are location fields that will be used in report breakdowns, such as for ratio studies.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    df : pandas.DataFrame, optional
        Optional DataFrame to filter available locations.

    Returns
    -------
    list[str]
        List of report location field names.
    """

    locations = (
        settings.get("field_classification", {})
        .get("important", {})
        .get("report_locations", [])
    )
    if df is not None:
        locations = [loc for loc in locations if loc in df]
    return locations

get_sale_field

get_sale_field(settings, df=None)

Determine the appropriate sale price field ("sale_price" or "sale_price_time_adj") based on time adjustment settings.

Parameters:

settings : dict, required
    Settings dictionary.
df : DataFrame, default None
    Optional DataFrame to check field existence.

Returns:

str
    Field name to be used for sale price.

Source code in openavmkit/data.py, lines 288-315:
def get_sale_field(settings: dict, df: pd.DataFrame = None) -> str:
    """
    Determine the appropriate sale price field ("sale_price" or "sale_price_time_adj")
    based on time adjustment settings.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    df : pandas.DataFrame, optional
        Optional DataFrame to check field existence.

    Returns
    -------
    str
        Field name to be used for sale price.
    """

    ta = settings.get("data", {}).get("process", {}).get("time_adjustment", {})
    use = ta.get("use", True)
    if use:
        sale_field = "sale_price_time_adj"
    else:
        sale_field = "sale_price"
    if df is not None:
        if sale_field == "sale_price_time_adj" and "sale_price_time_adj" in df:
            return "sale_price_time_adj"
    return sale_field
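
For example, a sketch showing how the time-adjustment setting drives the result; the settings path mirrors the source above:

from openavmkit.data import get_sale_field

settings = {"data": {"process": {"time_adjustment": {"use": True}}}}
get_sale_field(settings)   # -> "sale_price_time_adj"

settings["data"]["process"]["time_adjustment"]["use"] = False
get_sale_field(settings)   # -> "sale_price"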

get_train_test_keys

get_train_test_keys(df_in, settings)

Get the training and testing keys for the sales DataFrame.

This function gets the train/test keys for each model group defined in the settings, combines them into a single mask for the sales DataFrame, and returns the keys for training and testing as numpy arrays.

Parameters:

df_in : DataFrame, required
    Input DataFrame containing sales data.
settings : dict, required
    Settings dictionary.

Returns:

tuple
    A tuple containing two numpy arrays: keys_train and keys_test.
      • keys_train: keys for the training set
      • keys_test: keys for the testing set

Source code in openavmkit/data.py, lines 964-1003:
def get_train_test_keys(df_in: pd.DataFrame, settings: dict):
    """Get the training and testing keys for the sales DataFrame.

    This function gets the train/test keys for each model group defined in the settings,
    combines them into a single mask for the sales DataFrame, and returns the keys for
    training and testing as numpy arrays.

    Parameters
    ----------
    df_in : pd.DataFrame
        Input DataFrame containing sales data.
    settings : dict
        Settings dictionary

    Returns
    -------
    tuple
        A tuple containing two numpy arrays: keys_train and keys_test.
        - keys_train: keys for training set
        - keys_test: keys for testing set
    """

    model_group_ids = get_model_group_ids(settings, df_in)

    # an empty mask the same size as the input DataFrame
    mask_train = pd.Series(np.zeros(len(df_in), dtype=bool), index=df_in.index)
    mask_test = pd.Series(np.zeros(len(df_in), dtype=bool), index=df_in.index)

    for model_group in model_group_ids:
        # Read the split keys for the model group
        test_keys, train_keys = _read_split_keys(model_group)

        # Filter the DataFrame based on the keys
        mask_test |= df_in["key_sale"].isin(test_keys)
        mask_train |= df_in["key_sale"].isin(train_keys)

    keys_test = df_in.loc[mask_test, "key_sale"].values
    keys_train = df_in.loc[mask_train, "key_sale"].values

    return keys_train, keys_test

get_train_test_masks

get_train_test_masks(df_in, settings)

Get the training and testing masks for the sales DataFrame.

This function gets the train/test masks for each model group defined in the settings, combines them into masks covering the sales DataFrame, and returns them as pandas Series.

Parameters:

df_in : DataFrame, required
    Input DataFrame containing sales data.
settings : dict, required
    Settings dictionary.

Returns:

tuple
    A tuple containing two pandas Series: mask_train and mask_test.
      • mask_train: boolean mask for the training set
      • mask_test: boolean mask for the testing set

Source code in openavmkit/data.py, lines 1006-1040:
def get_train_test_masks(df_in: pd.DataFrame, settings: dict):
    """Get the training and testing masks for the sales DataFrame.

    This function gets the train/test masks for each model group defined in the settings,
    combines them into a single mask for the sales DataFrame, and returns the masks as pandas Series

    Parameters
    ----------
    df_in : pd.DataFrame
        Input DataFrame containing sales data.
    settings : dict
        Settings dictionary

    Returns
    -------
    tuple
        A tuple containing two pandas Series: mask_train and mask_test.
        - mask_train: boolean mask for training set
        - mask_test: boolean mask for testing set
    """
    model_group_ids = get_model_group_ids(settings, df_in)

    # an empty mask the same size as the input DataFrame
    mask_train = pd.Series(np.zeros(len(df_in), dtype=bool), index=df_in.index)
    mask_test = pd.Series(np.zeros(len(df_in), dtype=bool), index=df_in.index)

    for model_group in model_group_ids:
        # Read the split keys for the model group
        test_keys, train_keys = _read_split_keys(model_group)

        # Filter the DataFrame based on the keys
        mask_test |= df_in["key_sale"].isin(test_keys)
        mask_train |= df_in["key_sale"].isin(train_keys)

    return mask_train, mask_test
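
For example, a sketch of splitting a sales DataFrame with the returned masks, assuming split keys have already been written for each model group:

from openavmkit.data import get_train_test_masks

mask_train, mask_test = get_train_test_masks(df_sales, settings)
df_train = df_sales[mask_train]
df_test = df_sales[mask_test]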

get_vacant

get_vacant(df_in, settings, invert=False)

Filter the DataFrame based on the 'is_vacant' column.

Parameters:

df_in : DataFrame, required
    Input DataFrame.
settings : dict, required
    Settings dictionary.
invert : bool, default False
    If True, return non-vacant rows.

Returns:

DataFrame
    DataFrame filtered by the is_vacant flag.

Raises:

ValueError
    If the is_vacant column is not boolean.

Source code in openavmkit/data.py, lines 348-384:
def get_vacant(
    df_in: pd.DataFrame, settings: dict, invert: bool = False
) -> pd.DataFrame:
    """
    Filter the DataFrame based on the 'is_vacant' column.

    Parameters
    ----------
    df_in : pandas.DataFrame
        Input DataFrame.
    settings : dict
        Settings dictionary.
    invert : bool, optional
        If True, return non-vacant rows.

    Returns
    -------
    pandas.DataFrame
        DataFrame filtered by the `is_vacant` flag.

    Raises
    ------
    ValueError
        If the `is_vacant` column is not boolean.
    """

    df = df_in.copy()
    is_vacant_dtype = df["is_vacant"].dtype
    if is_vacant_dtype != bool:
        raise ValueError(
            f"The 'is_vacant' column must be a boolean type (found: {is_vacant_dtype})"
        )
    idx_vacant = df["is_vacant"].eq(True)
    if invert:
        idx_vacant = ~idx_vacant
    df_vacant = df[idx_vacant].copy()
    return df_vacant
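
For example, an illustrative split on the boolean is_vacant column, assuming df_universe carries that column and settings is the project settings dictionary:

from openavmkit.data import get_vacant

df_vacant = get_vacant(df_universe, settings)                 # vacant parcels only
df_improved = get_vacant(df_universe, settings, invert=True)  # improved parcels only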

get_vacant_sales

get_vacant_sales(df_in, settings, invert=False)

Filter the sales DataFrame to return only vacant (unimproved) sales.

Parameters:

df_in : DataFrame, required
    Input DataFrame.
settings : dict, required
    Settings dictionary.
invert : bool, default False
    If True, return non-vacant (improved) sales.

Returns:

DataFrame
    DataFrame with an added is_vacant column.

Source code in openavmkit/data.py, lines 318-345:
def get_vacant_sales(
    df_in: pd.DataFrame, settings: dict, invert: bool = False
) -> pd.DataFrame:
    """
    Filter the sales DataFrame to return only vacant (unimproved) sales.

    Parameters
    ----------
    df_in : pandas.DataFrame
        Input DataFrame.
    settings : dict
        Settings dictionary.
    invert : bool, optional
        If True, return non-vacant (improved) sales.

    Returns
    -------
    pandas.DataFrame
        DataFrame with an added `is_vacant` column.
    """

    df = df_in.copy()
    df = _boolify_column_in_df(df, "vacant_sale", "na_false")
    idx_vacant_sale = df["vacant_sale"].eq(True)
    if invert:
        idx_vacant_sale = ~idx_vacant_sale
    df_vacant_sales = df[idx_vacant_sale].copy()
    return df_vacant_sales

process_data

process_data(dataframes, settings, verbose=False)

Process raw dataframes according to settings and return a SalesUniversePair.

Parameters:

dataframes : dict[str, DataFrame], required
    Dictionary mapping keys to DataFrames.
settings : dict, required
    Settings dictionary.
verbose : bool, default False
    If True, prints progress information.

Returns:

SalesUniversePair
    A SalesUniversePair containing processed sales and universe data.

Raises:

ValueError
    If required merge instructions or columns are missing.

Source code in openavmkit/data.py, lines 564-640:
def process_data(
    dataframes: dict[str, pd.DataFrame], settings: dict, verbose: bool = False
) -> SalesUniversePair:
    """
    Process raw dataframes according to settings and return a SalesUniversePair.

    Parameters
    ----------
    dataframes : dict[str, pd.DataFrame]
        Dictionary mapping keys to DataFrames.
    settings : dict
        Settings dictionary.
    verbose : bool, optional
        If True, prints progress information.

    Returns
    -------
    SalesUniversePair
        A SalesUniversePair containing processed sales and universe data.

    Raises
    ------
    ValueError
        If required merge instructions or columns are missing.
    """

    s_data = settings.get("data", {})
    s_process = s_data.get("process", {})
    s_merge = s_process.get("merge", {})

    merge_univ: list | None = s_merge.get("universe", None)
    merge_sales: list | None = s_merge.get("sales", None)

    if merge_univ is None:
        raise ValueError(
            'No "universe" merge instructions found. data.process.merge must have exactly two keys: "universe", and "sales"'
        )
    if merge_sales is None:
        raise ValueError(
            'No "sales" merge instructions found. data.process.merge must have exactly two keys: "universe", and "sales"'
        )

    df_univ = _merge_dict_of_dfs(dataframes, merge_univ, settings, required_key="key")
    df_sales = _merge_dict_of_dfs(
        dataframes, merge_sales, settings, required_key="key_sale"
    )

    if "valid_sale" not in df_sales:
        raise ValueError("The 'valid_sale' column is required in the sales data.")
    if "vacant_sale" not in df_sales:
        raise ValueError("The 'vacant_sale' column is required in the sales data.")
    # Print number and percentage of valid sales
    valid_count = df_sales["valid_sale"].sum()
    total_count = len(df_sales)
    valid_percent = (valid_count / total_count * 100) if total_count > 0 else 0
    print(f"Valid sales: {valid_count} ({valid_percent:.1f}% of {total_count} total)")
    df_sales = df_sales[df_sales["valid_sale"].eq(True)].copy().reset_index(drop=True)

    sup: SalesUniversePair = SalesUniversePair(universe=df_univ, sales=df_sales)

    sup = _enrich_data(
        sup, s_process.get("enrich", {}), dataframes, settings, verbose=verbose
    )

    dupe_univ: dict | None = s_process.get("dupes", {}).get("universe", None)
    dupe_sales: dict | None = s_process.get("dupes", {}).get("sales", None)
    if dupe_univ:
        sup.set(
            "universe",
            _handle_duplicated_rows(sup.universe, dupe_univ, verbose=verbose),
        )
    if dupe_sales:
        sup.set(
            "sales", _handle_duplicated_rows(sup.sales, dupe_sales, verbose=verbose)
        )

    return sup
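
For example, a rough sketch of the expected inputs. The source names ("assessor_roll", "deeds") are hypothetical, and the merge entries are shown only schematically, since their exact format is defined by the internal _merge_dict_of_dfs helper:

from openavmkit.data import process_data

dataframes = {
    "assessor_roll": df_roll,   # must ultimately provide a "key" column for the universe
    "deeds": df_deeds,          # must ultimately provide "key_sale", "valid_sale", and "vacant_sale"
}

settings = {
    "data": {
        "process": {
            "merge": {
                "universe": [...],  # merge instructions for the universe (schematic)
                "sales": [...],     # merge instructions for the sales (schematic)
            }
        }
    }
}

sup = process_data(dataframes, settings, verbose=True)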

write_gpkg

write_gpkg(df, path)

Write data to a geopackage file.

Parameters:

df : DataFrame, required
    Data to be written.
path : str, required
    File path for saving the geopackage.
Source code in openavmkit/data.py, lines 4886-4911:
def write_gpkg(df, path):
    """
    Write data to a geopackage file.

    Parameters
    ----------
    df : pd.DataFrame
        Data to be written
    path : str
        File path for saving the geopackage.
    """
    if not path.endswith(".gpkg"):
        raise ValueError("Path must end with .gpkg!")

    # If it has a geometry column, write it as a GeoPackage layer
    if "geometry" in df.columns:
        # Ensure it's a GeoDataFrame
        gdf = df if isinstance(df, gpd.GeoDataFrame) else gpd.GeoDataFrame(df, geometry="geometry", crs=getattr(df, "crs", None))

        # You MUST have a CRS for it to be recorded in metadata
        if gdf.crs is None:
            raise ValueError(f"{path}: geometry has no CRS. Set it (e.g., gdf = gdf.set_crs('EPSG:4326')) before writing.")

        gdf.to_file(path, driver='GPKG', layer='name', mode='w')
    else:
        raise ValueError("cannot write to gpkg without geometry")

write_parquet

write_parquet(df, path)

Write data to a parquet file.

Parameters:

df : DataFrame, required
    Data to be written.
path : str, required
    File path for saving the parquet.
Source code in openavmkit/data.py, lines 4855-4883:
def write_parquet(df, path):
    """
    Write data to a parquet file.

    Parameters
    ----------
    df : pd.DataFrame
        Data to be written
    path : str
        File path for saving the parquet.
    """

    if not path.endswith(".parquet"):
        raise ValueError("Path must end with .parquet!")

    # If it has a geometry column, write as GeoParquet
    if "geometry" in df.columns:
        # Ensure it's a GeoDataFrame
        gdf = df if isinstance(df, gpd.GeoDataFrame) else gpd.GeoDataFrame(df, geometry="geometry", crs=getattr(df, "crs", None))

        # You MUST have a CRS for it to be recorded in metadata
        if gdf.crs is None:
            raise ValueError(f"{path}: geometry has no CRS. Set it (e.g., gdf = gdf.set_crs('EPSG:4326')) before writing.")

        # GeoPandas writes WKB + GeoParquet metadata (including CRS)
        gdf.to_parquet(path, engine="pyarrow", index=False)
    else:
        # Regular table
        df.to_parquet(path, engine="pyarrow", index=False)

write_shapefile

write_shapefile(df, path)

Write data to a shapefile.

Parameters:

df : DataFrame, required
    Data to be written.
path : str, required
    File path for saving the shapefile.
Source code in openavmkit/data.py, lines 4914-4940:
def write_shapefile(df, path):
    """
    Write data to a shapefile file.

    Parameters
    ----------
    df : pd.DataFrame
        Data to be written
    path : str
        File path for saving the shapefile.
    """

    if not path.endswith(".shp"):
        raise ValueError("Path must end with .shp!")

    # If it has a geometry column, write it as a shapefile
    if "geometry" in df.columns:
        # Ensure it's a GeoDataFrame
        gdf = df if isinstance(df, gpd.GeoDataFrame) else gpd.GeoDataFrame(df, geometry="geometry", crs=getattr(df, "crs", None))

        # You MUST have a CRS for it to be recorded in metadata
        if gdf.crs is None:
            raise ValueError(f"{path}: geometry has no CRS. Set it (e.g., gdf = gdf.set_crs('EPSG:4326')) before writing.")

        gdf.to_file(path)
    else:
        raise ValueError("cannot write to gpkg without geometry")

write_zipped_shapefile

write_zipped_shapefile(df, path)

Write a zipped ESRI Shapefile. Produces a single {name}.shp.zip with the shapefile parts (name.shp, .shx, .dbf, .prj, .cpg, etc.) at the ZIP root.

Parameters:

df : DataFrame or GeoDataFrame, required
    Data to be written (must include a 'geometry' column and a CRS).
path : str, required
    Destination path ending with '.shp.zip' (e.g., 'out/roads.shp.zip').

Returns:

Path
    Path to the created .shp.zip file.

Source code in openavmkit/data.py, lines 4943-5001:
def write_zipped_shapefile(df, path: str) -> Path:
    """
    Write a zipped ESRI Shapefile. Produces a single {name}.shp.zip with the
    shapefile parts (name.shp, .shx, .dbf, .prj, .cpg, etc.) at the ZIP root.

    Parameters
    ----------
    df : pd.DataFrame or gpd.GeoDataFrame
        Data to be written (must include a 'geometry' column and a CRS).
    path : str
        Destination path ending with '.shp.zip' (e.g., 'out/roads.shp.zip').

    Returns
    -------
    pathlib.Path
        Path to the created .shp.zip
    """
    p = Path(path)

    # Require ".shp.zip" exactly, per your spec
    if p.suffixes[-2:] != [".shp", ".zip"]:
        raise ValueError("Path must end with .shp.zip (e.g., 'out/roads.shp.zip').")

    # layer name (strip .zip then .shp)
    layer = Path(p.stem).stem
    if not layer:
        raise ValueError("Could not derive layer name from path.")

    # Make sure parent directory exists
    p.parent.mkdir(parents=True, exist_ok=True)

    # Write shapefile into a temp dir, then zip and move atomically
    with tempfile.TemporaryDirectory() as tmpdir_str:
        tmpdir = Path(tmpdir_str)
        shp_path = tmpdir / f"{layer}.shp"

        # Reuse your existing function (validates geometry + CRS)
        write_shapefile(df, str(shp_path))

        # Common shapefile sidecar extensions we may need to include if present
        sidecars = {
            ".shp", ".shx", ".dbf", ".prj", ".cpg",
            ".qix", ".sbn", ".sbx", ".fbn", ".fbx",
            ".ain", ".aih", ".ixs", ".mxs", ".atx",
            ".xml", ".qpj"
        }

        tmp_zip = tmpdir / f"{layer}.shp.zip"
        with zipfile.ZipFile(tmp_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
            for ext in sorted(sidecars):
                f = tmpdir / f"{layer}{ext}"
                if f.exists():
                    # Store with just the filename at the ZIP root
                    zf.write(f, arcname=f.name)

        # Move the finished ZIP to the destination (overwrites if exists)
        shutil.move(str(tmp_zip), str(p))

    return p