openavmkit.data

SalesUniversePair dataclass

SalesUniversePair(sales, universe)

A container for the sales and universe DataFrames; many functions operate on this data structure. It is necessary because the sales and universe DataFrames are often used together and need to be passed around as a unit. The sales represent transactions and any data known at the time of each transaction, while the universe represents the current state of all parcels. The sales DataFrame specifically allows duplicate primary parcel transaction keys, since an individual parcel may have sold multiple times. The universe DataFrame forbids duplicate primary parcel keys.

Attributes:

    sales : DataFrame
        DataFrame containing sales data.
    universe : DataFrame
        DataFrame containing universe (parcel) data.

copy

copy()

Create a copy of the SalesUniversePair object.

Returns:

    SalesUniversePair
        A new SalesUniversePair object with copied DataFrames.

Source code in openavmkit/data.py (lines 119-127):
def copy(self):
    """Create a copy of the SalesUniversePair object.

    Returns
    -------
    SalesUniversePair
        A new SalesUniversePair object with copied DataFrames.
    """
    return SalesUniversePair(self.sales.copy(), self.universe.copy())
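
The sales/universe pairing can be illustrated with a minimal, hypothetical stand-in (the real class lives in openavmkit.data and carries more behavior):

```python
from dataclasses import dataclass
import pandas as pd

@dataclass
class PairSketch:
    # Minimal stand-in for SalesUniversePair: sales may repeat parcel keys,
    # universe must not.
    sales: pd.DataFrame
    universe: pd.DataFrame

    def copy(self):
        # Deep-copies both DataFrames so mutations don't leak back
        return PairSketch(self.sales.copy(), self.universe.copy())

# Parcel "A" sold twice, so it appears twice in sales but once in universe
sales = pd.DataFrame({"key": ["A", "A", "B"], "sale_price": [100, 120, 90]})
universe = pd.DataFrame({"key": ["A", "B", "C"], "land_area": [1.0, 2.0, 3.0]})
pair = PairSketch(sales, universe)

dup = pair.copy()
dup.sales.loc[0, "sale_price"] = 999  # editing the copy leaves the original intact
```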

limit_sales_to_keys

limit_sales_to_keys(new_sale_keys)

Update the sales DataFrame to keep only those rows that match a key in new_sale_keys.

Parameters:

    new_sale_keys : list[str], required
        List of sale keys to filter to.
Source code in openavmkit/data.py (lines 152-164):
def limit_sales_to_keys(self, new_sale_keys: list[str]):
    """
    Update the sales DataFrame to only those that match a key in `new_sale_keys`

    Parameters
    ----------
    new_sale_keys : list[str]
        List of sale keys to filter to
    """

    s = self.sales.copy()
    s = s[s["key_sale"].isin(new_sale_keys)]
    self.sales = s
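
The filter itself is a plain `isin` mask on `key_sale`; the same operation as a standalone sketch:

```python
import pandas as pd

sales = pd.DataFrame({
    "key_sale": ["A-1", "A-2", "B-1"],
    "sale_price": [100, 120, 90],
})
keep = ["A-2", "B-1"]

# Keep only rows whose sale key appears in the requested list
filtered = sales[sales["key_sale"].isin(keep)]
```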

set

set(key, value)

Set the sales or universe DataFrame.

Parameters:

    key : str, required
        Either "sales" or "universe".
    value : DataFrame, required
        The new DataFrame to set for the specified key.

Raises:

    ValueError
        If an invalid key is provided.

Source code in openavmkit/data.py (lines 129-149):
def set(self, key: str, value: pd.DataFrame):
    """Set the sales or universe DataFrame.

    Parameters
    ----------
    key : str
        Either "sales" or "universe".
    value : pd.DataFrame
        The new DataFrame to set for the specified key.

    Raises
    ------
    ValueError
        If an invalid key is provided
    """
    if key == "sales":
        self.sales = value
    elif key == "universe":
        self.universe = value
    else:
        raise ValueError(f"Invalid key: {key}")

update_sales

update_sales(new_sales, allow_remove_rows)

Update the sales DataFrame with new information as an overlay without redundancy.

This function lets you push updates to "sales" while keeping it as an "overlay" that doesn't contain any redundant information.

  • First we note what fields were in sales last time.
  • Then we note what sales are in universe but were not in sales.
  • Finally, we determine the new fields generated in new_sales that are not in the previous sales or in the universe.
  • A modified version of df_sales is created with only two changes:
      • Reduced to the correct selection of keys.
      • Addition of the newly generated fields.

Parameters:

    new_sales : DataFrame, required
        New sales DataFrame with updates.
    allow_remove_rows : bool, required
        If True, allows the update to remove rows from sales. If False, preserves all original rows.
Source code in openavmkit/data.py (lines 167-217):
def update_sales(self, new_sales: pd.DataFrame, allow_remove_rows: bool):
    """
    Update the sales DataFrame with new information as an overlay without redundancy.

    This function lets you push updates to "sales" while keeping it as an "overlay" that
    doesn't contain any redundant information.

    - First we note what fields were in sales last time.
    - Then we note what sales are in universe but were not in sales.
    - Finally, we determine the new fields generated in new_sales that are not in the
      previous sales or in the universe.
    - A modified version of df_sales is created with only two changes:
      - Reduced to the correct selection of keys.
      - Addition of the newly generated fields.

    Parameters
    ----------
    new_sales : pd.DataFrame
        New sales DataFrame with updates.
    allow_remove_rows : bool
        If True, allows the update to remove rows from sales. If False, preserves all
        original rows.
    """

    old_fields = self.sales.columns.values
    univ_fields = [
        field for field in self.universe.columns.values if field not in old_fields
    ]
    new_fields = [
        field
        for field in new_sales.columns.values
        if field not in old_fields and field not in univ_fields
    ]

    old_sales = self.sales.copy()
    return_keys = new_sales["key_sale"].values
    if not allow_remove_rows and len(return_keys) > len(old_sales):
        raise ValueError(
            "The new sales DataFrame contains more keys than the old sales DataFrame. update_sales() may only be used to shrink the dataframe or keep it the same size. Use set() if you intend to replace the sales dataframe."
        )

    if allow_remove_rows:
        old_sales = old_sales[old_sales["key_sale"].isin(return_keys)].reset_index(
            drop=True
        )
    reconciled = combine_dfs(
        old_sales,
        new_sales[["key_sale"] + new_fields].copy().reset_index(drop=True),
        index="key_sale",
    )
    self.sales = reconciled
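
The overlay idea — shrink to the surviving keys, then attach only the newly generated columns — can be sketched with a plain pandas merge (the real implementation uses combine_dfs; the column names here are hypothetical):

```python
import pandas as pd

old_sales = pd.DataFrame({
    "key_sale": ["A-1", "A-2", "B-1"],
    "sale_price": [100, 120, 90],
})
# new_sales shrinks the key set and introduces one brand-new field
new_sales = pd.DataFrame({
    "key_sale": ["A-2", "B-1"],
    "sale_price": [120, 90],           # redundant: already in old_sales
    "sale_price_time_adj": [125, 95],  # newly generated field to overlay
})

old_fields = old_sales.columns.values
new_fields = [c for c in new_sales.columns if c not in old_fields]

# 1. reduce to the surviving keys (allow_remove_rows=True behavior)
kept = old_sales[old_sales["key_sale"].isin(new_sales["key_sale"])]
# 2. attach only the newly generated fields, keyed on key_sale
updated = kept.merge(new_sales[["key_sale"] + new_fields], on="key_sale")
```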

enrich_df_streets

enrich_df_streets(df_in, settings, spacing=1.0, max_ray_length=25.0, network_buffer=500.0, verbose=False)

Enrich a GeoDataFrame with street network data.

This function enriches the input GeoDataFrame with street network data by calculating frontage, depth, distance to street, and many other related metrics for every parcel against every nearby road, using OpenStreetMap data.

WARNING: This function can be VERY computationally and memory intensive for large datasets and may take a long time to run.

We definitely need to work on its performance or make it easier to split into smaller chunks.

Parameters:

    df_in : GeoDataFrame, required
        Input GeoDataFrame containing parcels.
    settings : dict, required
        Settings dictionary containing configuration for the enrichment.
    spacing : float, default 1.0
        Spacing in meters for ray casting to calculate distances to streets.
    max_ray_length : float, default 25.0
        Maximum length of rays to shoot for distance calculations, in meters.
    network_buffer : float, default 500.0
        Buffer around the street network to consider for distance calculations, in meters.
    verbose : bool, default False
        If True, prints progress information.

Returns:

    GeoDataFrame
        Enriched GeoDataFrame with additional columns for street-related metrics.

Source code in openavmkit/data.py (lines 661-718):
def enrich_df_streets(
    df_in: gpd.GeoDataFrame,
    settings: dict,
    spacing: float = 1.0,  # in meters
    max_ray_length: float = 25.0,  # meters to shoot rays
    network_buffer: float = 500.0,  # buffer for street network
    verbose: bool = False,
) -> gpd.GeoDataFrame:
    """Enrich a GeoDataFrame with street network data.

    This function enriches the input GeoDataFrame with street network data by calculating
    frontage, depth, distance to street, and many other related metrics, for every road vs.
    every parcel in the GeoDataFrame, using OpenStreetMap data.

    WARNING: This function can be VERY computationally and memory intensive for large datasets
    and may take a long time to run.

    We definitely need to work on its performance or make it easier to split into smaller chunks.

    Parameters
    ----------
    df_in : gpd.GeoDataFrame
        Input GeoDataFrame containing parcels.
    settings : dict
        Settings dictionary containing configuration for the enrichment.
    spacing : float, optional
        Spacing in meters for ray casting to calculate distances to streets. Default is 1.0.
    max_ray_length : float, optional
        Maximum length of rays to shoot for distance calculations, in meters. Default is 25.0.
    network_buffer : float, optional
        Buffer around the street network to consider for distance calculations, in meters.
        Default is 500.0.
    verbose : bool, optional
        If True, prints progress information. Default is False.

    Returns
    -------
    gpd.GeoDataFrame
        Enriched GeoDataFrame with additional columns for street-related metrics.
    """
    e_streets = settings.get("data",{}).get("process", {}).get("enrich", {}).get("streets", {})
    do_streets = e_streets.get("enabled", False)

    if do_streets:
        df_out = _enrich_df_streets(
            df_in, settings, spacing, max_ray_length, network_buffer, verbose
        )

        # add somers unit land size normalization using frontage & depth
        df_out["land_area_somers_ft"] = get_size_in_somers_units_ft(
            df_out["frontage_ft_1"], df_out["depth_ft_1"]
        )
    else:
        df_out = df_in
        if verbose:
            print("Street enrichment disabled. To enable it, add `data.process.enrich.streets.enabled = true` to your settings file.")

    return df_out
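
The feature is gated behind a settings flag; assuming the nested dictionary layout the source reads from, enabling it looks like:

```python
# Hypothetical settings fragment enabling street enrichment
settings = {
    "data": {
        "process": {
            "enrich": {
                "streets": {"enabled": True}
            }
        }
    }
}

# mirrors the lookup performed by enrich_df_streets
do_streets = (
    settings.get("data", {})
    .get("process", {})
    .get("enrich", {})
    .get("streets", {})
    .get("enabled", False)
)
```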

enrich_sup_spatial_lag

enrich_sup_spatial_lag(sup, settings, verbose=False)

Enrich the sales and universe DataFrames with spatial lag features.

This function calculates "spatial lag", that is, the spatially weighted average of the sale price and other fields, based on nearest neighbors.

For sales, the spatial lag is calculated based on the training set of sales. For non-sale characteristics, the spatial lag is calculated based on the universe parcels.

Parameters:

    sup : SalesUniversePair, required
        SalesUniversePair containing sales and universe DataFrames.
    settings : dict, required
        Settings dictionary.
    verbose : bool, default False
        If True, prints progress information.

Returns:

    SalesUniversePair
        Enriched SalesUniversePair with spatial lag features.

Source code in openavmkit/data.py (lines 721-777):
def enrich_sup_spatial_lag(
    sup: SalesUniversePair, 
    settings: dict, 
    verbose: bool = False
) -> SalesUniversePair:
    """Enrich the sales and universe DataFrames with spatial lag features.

    This function calculates "spatial lag", that is, the spatially weighted
    average of the sale price and other fields, based on nearest neighbors.

    For sales, the spatial lag is calculated based on the training set of sales.
    For non-sale characteristics, the spatial lag is calculated based on the
    universe parcels.

    Parameters
    ----------
    sup : SalesUniversePair
        SalesUniversePair containing sales and universe DataFrames.
    settings : dict
        Settings dictionary.
    verbose : bool, optional
        If True, prints progress information.

    Returns
    -------
    SalesUniversePair
        Enriched SalesUniversePair with spatial lag features.
    """

    mg_ids = get_model_group_ids(settings)

    df_sales = sup.sales
    df_universe = sup.universe

    # For each model group, calculate its spatial lag surface(s)
    for mg in mg_ids:
        sup_mg = _enrich_sup_spatial_lag_for_model_group(
            sup,
            settings,
            mg,
            verbose
        )
        if sup_mg is None:
            continue
        # For each spatial lag surface, copy it back to the master SalesUniversePair
        sl_cols = [field for field in sup_mg.universe.columns if field.startswith("spatial_lag_")]
        for col in sl_cols:
            # Only fill in values that haven't been set already
            if col in sup_mg.sales:
                df_sales = fill_from_df(df_sales, sup_mg.sales, "key_sale", col)
            if col in sup_mg.universe:
                df_universe = fill_from_df(df_universe, sup_mg.universe, "key", col)

    sup.sales = df_sales
    sup.universe = df_universe

    return sup
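
The internals of _enrich_sup_spatial_lag_for_model_group are not shown here, but the core idea, a distance-weighted average over nearest neighbors, can be sketched with numpy (coordinates, prices, and the inverse-distance weighting scheme below are illustrative assumptions, not the library's exact method):

```python
import numpy as np

# Toy parcels: coordinates and an observed sale price at each
coords = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [5.0, 5.0]])
prices = np.array([100.0, 110.0, 90.0, 300.0])

def spatial_lag(target, coords, values, k=2, eps=1e-9):
    # Distance from the target point to every observation
    d = np.linalg.norm(coords - target, axis=1)
    nearest = np.argsort(d)[:k]
    # Inverse-distance weights over the k nearest neighbors
    w = 1.0 / (d[nearest] + eps)
    return float(np.sum(w * values[nearest]) / np.sum(w))

# The two nearest sales are equidistant, so the lag is their plain average
lag = spatial_lag(np.array([0.5, 0.0]), coords, prices)
```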

enrich_time

enrich_time(df, time_formats, settings)

Enrich the DataFrame by converting specified time fields to datetime and deriving additional fields.

For each key in time_formats, converts the column to datetime. Then, if a field with the prefix "sale" exists, enriches the DataFrame with additional time fields (e.g., "sale_year", "sale_month", "sale_age_days").

Parameters:

    df : DataFrame, required
        Input DataFrame.
    time_formats : dict, required
        Dictionary mapping field names to datetime formats.
    settings : dict, required
        Settings dictionary.

Returns:

    DataFrame
        DataFrame with enriched time fields.

Source code in openavmkit/data.py (lines 262-303):
def enrich_time(df: pd.DataFrame, time_formats: dict, settings: dict) -> pd.DataFrame:
    """
    Enrich the DataFrame by converting specified time fields to datetime and deriving additional fields.

    For each key in time_formats, converts the column to datetime. Then, if a field with
    the prefix "sale" exists, enriches the DataFrame with additional time fields (e.g.,
    "sale_year", "sale_month", "sale_age_days").

    Parameters
    ----------
    df : pandas.DataFrame
        Input DataFrame.
    time_formats : dict
        Dictionary mapping field names to datetime formats.
    settings : dict
        Settings dictionary.

    Returns
    -------
    pandas.DataFrame
        DataFrame with enriched time fields.
    """

    for key in time_formats:
        time_format = time_formats[key]
        if key in df:
            df[key] = pd.to_datetime(df[key], format=time_format, errors="coerce")

    for prefix in ["sale"]:
        do_enrich = False
        for col in df.columns.values:
            if f"{prefix}_" in col:
                do_enrich = True
                break
        if do_enrich:
            df = _enrich_time_field(
                df, prefix, add_year_month=True, add_year_quarter=True
            )
            if prefix == "sale":
                df = _enrich_sale_age_days(df, settings)

    return df
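
A standalone sketch of the conversion and the kind of derived fields described above (the exact set of derived columns is up to the library's helpers):

```python
import pandas as pd

df = pd.DataFrame({"sale_date": ["2023-01-15", "2023-06-30", "not a date"]})

# Convert using an explicit format; unparseable values become NaT
df["sale_date"] = pd.to_datetime(df["sale_date"], format="%Y-%m-%d", errors="coerce")

# Derive fields of the kind enrich_time adds for the "sale" prefix
df["sale_year"] = df["sale_date"].dt.year
df["sale_month"] = df["sale_date"].dt.month
```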

filter_df_by_date_range

filter_df_by_date_range(df, start_date, end_date)

Filter df to rows where 'sale_date' is between start_date and end_date (inclusive).

  • start_date/end_date may be 'YYYY-MM-DD' strings or date/datetime/Timestamp.
  • Time-of-day and time zones are ignored.
  • Rows with missing/unparseable 'sale_date' are dropped.

Source code in openavmkit/data.py (lines 5344-5380):
def filter_df_by_date_range(df, start_date, end_date):
    """
    Filter df to rows where 'sale_date' is between start_date and end_date (inclusive).
    - start_date/end_date may be 'YYYY-MM-DD' strings or date/datetime/Timestamp.
    - Time-of-day and time zones are ignored.
    - Rows with missing/unparseable 'sale_date' are dropped.
    """
    import pandas as pd
    from datetime import date, datetime

    def _as_date(x):
        # If already a date (but not datetime), keep it
        if isinstance(x, date) and not isinstance(x, datetime):
            return x
        # Otherwise parse and take the calendar date
        return pd.to_datetime(x).date()

    start_d = _as_date(start_date)
    end_d   = _as_date(end_date)
    if start_d > end_d:
        raise ValueError("start_date cannot be after end_date.")

    # Coerce to datetime; tolerate bad/missing → NaT
    s = pd.to_datetime(df["sale_date"], errors="coerce")

    # Strip timezone info if present, preserving local wall time
    if isinstance(s.dtype, pd.DatetimeTZDtype):
        s = s.dt.tz_localize(None)

    # Build inclusive range using an exclusive upper bound
    start_ts = pd.Timestamp(start_d)                       # 00:00:00 on start day
    end_excl = pd.Timestamp(end_d) + pd.Timedelta(days=1)  # first moment after end day

    # NaT values compare as False and will be dropped
    mask = s.ge(start_ts) & s.lt(end_excl)
    return df.loc[mask].copy()
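
The inclusive-bound trick (an exclusive upper bound one day past the end date) can be demonstrated in isolation:

```python
import pandas as pd

df = pd.DataFrame({
    "sale_date": ["2022-12-31", "2023-01-01", "2023-03-15", "2023-12-31", None],
    "key_sale": ["a", "b", "c", "d", "e"],
})
s = pd.to_datetime(df["sale_date"], errors="coerce")

# Inclusive range via an exclusive upper bound, as in the source
start = pd.Timestamp("2023-01-01")
end_excl = pd.Timestamp("2023-12-31") + pd.Timedelta(days=1)
mask = s.ge(start) & s.lt(end_excl)  # NaT compares False and is dropped
result = df.loc[mask]
```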

get_dtypes_from_settings

get_dtypes_from_settings(settings)

Generate a dictionary mapping fields to their designated data types based on settings.

Parameters:

    settings : dict, required
        Settings dictionary.

Returns:

    dict
        Dictionary of field names to data type strings.

Source code in openavmkit/data.py (lines 554-579):
def get_dtypes_from_settings(settings: dict) -> dict:
    """
    Generate a dictionary mapping fields to their designated data types based on settings.

    Parameters
    ----------
    settings : dict
        Settings dictionary.

    Returns
    -------
    dict
        Dictionary of field names to data type strings.
    """

    cats = get_fields_categorical(settings, include_boolean=False)
    bools = get_fields_boolean(settings)
    nums = get_fields_numeric(settings, include_boolean=False)
    dtypes = {}
    for c in cats:
        dtypes[c] = "string"
    for b in bools:
        dtypes[b] = "bool"
    for n in nums:
        dtypes[n] = "Float64"
    return dtypes
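
A typical use of the returned mapping is to coerce a freshly loaded DataFrame; the field names below are hypothetical:

```python
import pandas as pd

# Hypothetical result of get_dtypes_from_settings
dtypes = {"neighborhood": "string", "is_vacant": "bool", "land_area": "Float64"}

df = pd.DataFrame({
    "neighborhood": ["north", "south"],
    "is_vacant": [True, False],
    "land_area": [1.5, 2.0],
})
# Apply only the dtypes whose columns are actually present
df = df.astype({k: v for k, v in dtypes.items() if k in df.columns})
```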

get_field_classifications

get_field_classifications(settings)

Retrieve a mapping of field names to their classifications (land, improvement, or other) as well as their types (numeric, categorical, or boolean).

Parameters:

    settings : dict, required
        Settings dictionary.

Returns:

    dict
        Dictionary mapping field names to type and class.

Source code in openavmkit/data.py (lines 520-551):
def get_field_classifications(settings: dict) -> dict:
    """
    Retrieve a mapping of field names to their classifications (land, improvement or other)
    as well as their types (numeric, categorical, or boolean).

    Parameters
    ----------
    settings : dict
        Settings dictionary.

    Returns
    -------
    dict
        Dictionary mapping field names to type and class.
    """

    field_map = {}
    for ftype in ["land", "impr", "other"]:
        nums = get_fields_numeric(
            settings, df=None, include_boolean=False, types=[ftype]
        )
        cats = get_fields_categorical(
            settings, df=None, include_boolean=False, types=[ftype]
        )
        bools = get_fields_boolean(settings, df=None, types=[ftype])
        for field in nums:
            field_map[field] = {"type": ftype, "class": "numeric"}
        for field in cats:
            field_map[field] = {"type": ftype, "class": "categorical"}
        for field in bools:
            field_map[field] = {"type": ftype, "class": "boolean"}
    return field_map

get_hydrated_sales_from_sup

get_hydrated_sales_from_sup(sup)

Merge the sales and universe DataFrames to "hydrate" the sales data.

The sales data represents transactions and any known data at the time of the transaction, while the universe data represents the current state of all parcels. When we merge the two sets, the sales data overrides any existing data in the universe data. This is useful for creating a "hydrated" sales DataFrame that contains all the information available at the time of the sale (it is assumed that any difference between the current state of the parcel and the state at the time of the sale is accounted for in the sales data).

If the merged DataFrame contains a "geometry" column and the original sales did not, the result is converted to a GeoDataFrame.

Parameters:

    sup : SalesUniversePair, required
        SalesUniversePair containing sales and universe DataFrames.

Returns:

    DataFrame or GeoDataFrame
        The merged (hydrated) sales DataFrame.

Source code in openavmkit/data.py (lines 223-259):
def get_hydrated_sales_from_sup(sup: SalesUniversePair):
    """
    Merge the sales and universe DataFrames to "hydrate" the sales data.

    The sales data represents transactions and any known data at the time of the transaction,
    while the universe data represents the current state of all parcels. When we merge the
    two sets, the sales data overrides any existing data in the universe data. This is useful
    for creating a "hydrated" sales DataFrame that contains all the information available at
    the time of the sale (it is assumed that any difference between the current state of the
    parcel and the state at the time of the sale is accounted for in the sales data).

    If the merged DataFrame contains a "geometry" column and the original sales did not,
    the result is converted to a GeoDataFrame.

    Parameters
    ----------
    sup : SalesUniversePair
        SalesUniversePair containing sales and universe DataFrames.

    Returns
    -------
    pd.DataFrame or gpd.GeoDataFrame
        The merged (hydrated) sales DataFrame.
    """

    df_sales = sup["sales"]
    df_univ = sup["universe"].copy()
    df_univ = df_univ[df_univ["key"].isin(df_sales["key"].values)].reset_index(
        drop=True
    )
    df_merged = merge_and_stomp_dfs(df_sales, df_univ, df2_stomps=False)

    if "geometry" in df_merged and "geometry" not in df_sales:
        # convert df_merged to geodataframe:
        df_merged = gpd.GeoDataFrame(df_merged, geometry="geometry")

    return df_merged
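
The "sales override universe" rule can be sketched with a plain merge and fillna (the real merge uses merge_and_stomp_dfs; the columns here are hypothetical):

```python
import pandas as pd

sales = pd.DataFrame({
    "key": ["A", "A"],
    "key_sale": ["A-1", "A-2"],
    "bldg_area": [1200.0, None],  # known at sale time for A-1 only
})
universe = pd.DataFrame({
    "key": ["A", "B"],
    "bldg_area": [1500.0, 800.0],  # current state of each parcel
    "land_area": [0.25, 0.50],
})

# Restrict universe to parcels that actually sold, then let sales values win
univ = universe[universe["key"].isin(sales["key"])]
hydrated = sales.merge(univ, on="key", suffixes=("", "_univ"))
for col in ["bldg_area"]:
    # sale-time value overrides; fall back to current state where missing
    hydrated[col] = hydrated[col].fillna(hydrated[f"{col}_univ"])
hydrated = hydrated.drop(columns=["bldg_area_univ"])
```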

get_important_field

get_important_field(settings, field_name, df=None)

Retrieve the important field name for a given field alias from settings.

Parameters:

    settings : dict, required
        Settings dictionary.
    field_name : str, required
        Identifier for the field.
    df : DataFrame, optional
        Optional DataFrame to check field existence.

Returns:

    str or None
        The mapped field name if found, else None.

Source code in openavmkit/data.py (lines 489-517):
def get_important_field(
    settings: dict, field_name: str, df: pd.DataFrame = None
) -> str | None:
    """
    Retrieve the important field name for a given field alias from settings.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    field_name : str
        Identifier for the field.
    df : pandas.DataFrame, optional
        Optional DataFrame to check field existence.

    Returns
    -------
    str or None
        The mapped field name if found, else None.
    """

    imp = settings.get("field_classification", {}).get("important", {})
    other_name = imp.get("fields", {}).get(field_name, None)
    if df is not None:
        if other_name is not None and other_name in df:
            return other_name
        else:
            return None
    return other_name

get_important_fields

get_important_fields(settings, df=None)

Retrieve important field names from settings.

Parameters:

    settings : dict, required
        Settings dictionary.
    df : DataFrame, optional
        Optional DataFrame to filter fields.

Returns:

    list[str]
        List of important field names.

Source code in openavmkit/data.py (lines 461-486):
def get_important_fields(settings: dict, df: pd.DataFrame = None) -> list[str]:
    """
    Retrieve important field names from settings.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    df : pandas.DataFrame, optional
        Optional DataFrame to filter fields.

    Returns
    -------
    list[str]
        List of important field names.
    """

    imp = settings.get("field_classification", {}).get("important", {})
    fields = imp.get("fields", {})
    list_fields = []
    if df is not None:
        for field in fields:
            other_name = fields[field]
            if other_name in df:
                list_fields.append(other_name)
    return list_fields

get_locations

get_locations(settings, df=None)

Retrieve location fields from settings. These are all the fields that are considered locations.

Parameters:

    settings : dict, required
        Settings dictionary.
    df : DataFrame, optional
        Optional DataFrame to filter available locations.

Returns:

    list[str]
        List of location field names.

Source code in openavmkit/data.py (lines 434-458):
def get_locations(settings: dict, df: pd.DataFrame = None) -> list[str]:
    """
    Retrieve location fields from settings. These are all the fields that are considered locations.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    df : pandas.DataFrame, optional
        Optional DataFrame to filter available locations.

    Returns
    -------
    list[str]
        List of location field names.
    """

    locations = (
        settings.get("field_classification", {})
        .get("important", {})
        .get("locations", [])
    )
    if df is not None:
        locations = [loc for loc in locations if loc in df]
    return locations

get_report_locations

get_report_locations(settings, df=None)

Retrieve report location fields from settings.

These are location fields that will be used in report breakdowns, such as for ratio studies.

Parameters:

    settings : dict, required
        Settings dictionary.
    df : DataFrame, optional
        Optional DataFrame to filter available locations.

Returns:

    list[str]
        List of report location field names.

Source code in openavmkit/data.py (lines 405-431):
def get_report_locations(settings: dict, df: pd.DataFrame = None) -> list[str]:
    """
    Retrieve report location fields from settings.

    These are location fields that will be used in report breakdowns, such as for ratio studies.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    df : pandas.DataFrame, optional
        Optional DataFrame to filter available locations.

    Returns
    -------
    list[str]
        List of report location field names.
    """

    locations = (
        settings.get("field_classification", {})
        .get("important", {})
        .get("report_locations", [])
    )
    if df is not None:
        locations = [loc for loc in locations if loc in df]
    return locations

get_sale_field

get_sale_field(settings, df=None)

Determine the appropriate sale price field ("sale_price" or "sale_price_time_adj") based on time adjustment settings.

Parameters:

    settings : dict, required
        Settings dictionary.
    df : DataFrame, optional
        Optional DataFrame to check field existence.

Returns:

    str
        Field name to be used for sale price.

Source code in openavmkit/data.py (lines 306-333):
def get_sale_field(settings: dict, df: pd.DataFrame = None) -> str:
    """
    Determine the appropriate sale price field ("sale_price" or "sale_price_time_adj")
    based on time adjustment settings.

    Parameters
    ----------
    settings : dict
        Settings dictionary.
    df : pandas.DataFrame, optional
        Optional DataFrame to check field existence.

    Returns
    -------
    str
        Field name to be used for sale price.
    """

    ta = settings.get("data", {}).get("process", {}).get("time_adjustment", {})
    use = ta.get("use", True)
    if use:
        sale_field = "sale_price_time_adj"
    else:
        sale_field = "sale_price"
    if df is not None:
        if sale_field == "sale_price_time_adj" and "sale_price_time_adj" in df:
            return "sale_price_time_adj"
    return sale_field
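
The controlling flag is the nested time_adjustment setting; a sketch of the lookup (the settings fragment is hypothetical):

```python
# Hypothetical settings fragment disabling time adjustment
settings = {"data": {"process": {"time_adjustment": {"use": False}}}}

# mirrors the lookup in get_sale_field: time adjustment defaults to on
ta = settings.get("data", {}).get("process", {}).get("time_adjustment", {})
sale_field = "sale_price_time_adj" if ta.get("use", True) else "sale_price"
```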

get_train_test_keys

get_train_test_keys(df_in, settings)

Get the training and testing keys for the sales DataFrame.

This function gets the train/test keys for each model group defined in the settings, combines them into a single mask for the sales DataFrame, and returns the keys for training and testing as numpy arrays.

Parameters:

    df_in : DataFrame, required
        Input DataFrame containing sales data.
    settings : dict, required
        Settings dictionary.

Returns:

    tuple
        A tuple containing two numpy arrays: keys_train and keys_test.
          • keys_train: keys for training set
          • keys_test: keys for testing set

Source code in openavmkit/data.py (lines 994-1033):
def get_train_test_keys(df_in: pd.DataFrame, settings: dict):
    """Get the training and testing keys for the sales DataFrame.

    This function gets the train/test keys for each model group defined in the settings,
    combines them into a single mask for the sales DataFrame, and returns the keys for
    training and testing as numpy arrays.

    Parameters
    ----------
    df_in : pd.DataFrame
        Input DataFrame containing sales data.
    settings : dict
        Settings dictionary

    Returns
    -------
    tuple
        A tuple containing two numpy arrays: keys_train and keys_test.
        - keys_train: keys for training set
        - keys_test: keys for testing set
    """

    model_group_ids = get_model_group_ids(settings, df_in)

    # an empty mask the same size as the input DataFrame
    mask_train = pd.Series(np.zeros(len(df_in), dtype=bool), index=df_in.index)
    mask_test = pd.Series(np.zeros(len(df_in), dtype=bool), index=df_in.index)

    for model_group in model_group_ids:
        # Read the split keys for the model group
        test_keys, train_keys = _read_split_keys(model_group)

        # Filter the DataFrame based on the keys
        mask_test |= df_in["key_sale"].isin(test_keys)
        mask_train |= df_in["key_sale"].isin(train_keys)

    keys_test = df_in.loc[mask_test, "key_sale"].values
    keys_train = df_in.loc[mask_train, "key_sale"].values

    return keys_train, keys_test
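
The per-model-group masks are OR-ed together; a standalone sketch with hypothetical split keys in place of _read_split_keys:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"key_sale": ["A-1", "A-2", "B-1", "B-2"]})

# Hypothetical per-model-group splits (the real keys come from _read_split_keys)
splits = {
    "res": (["A-1"], ["A-2"]),  # (train_keys, test_keys)
    "com": (["B-2"], ["B-1"]),
}

mask_train = pd.Series(np.zeros(len(df), dtype=bool), index=df.index)
mask_test = pd.Series(np.zeros(len(df), dtype=bool), index=df.index)
for train_keys, test_keys in splits.values():
    # OR each model group's membership into the combined masks
    mask_train |= df["key_sale"].isin(train_keys)
    mask_test |= df["key_sale"].isin(test_keys)

keys_train = df.loc[mask_train, "key_sale"].values
keys_test = df.loc[mask_test, "key_sale"].values
```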

get_train_test_masks

get_train_test_masks(df_in, settings)

Get the training and testing masks for the sales DataFrame.

This function gets the train/test masks for each model group defined in the settings, combines them into a single mask for the sales DataFrame, and returns the masks as pandas Series.

Parameters:

    df_in : DataFrame, required
        Input DataFrame containing sales data.
    settings : dict, required
        Settings dictionary.

Returns:

    tuple
        A tuple containing two pandas Series: mask_train and mask_test.
          • mask_train: boolean mask for training set
          • mask_test: boolean mask for testing set

Source code in openavmkit/data.py
def get_train_test_masks(df_in: pd.DataFrame, settings: dict):
    """Get the training and testing masks for the sales DataFrame.

    This function gets the train/test masks for each model group defined in the settings,
    combines the per-group selections into train and test masks over the sales DataFrame,
    and returns the masks as pandas Series.

    Parameters
    ----------
    df_in : pd.DataFrame
        Input DataFrame containing sales data.
    settings : dict
        Settings dictionary

    Returns
    -------
    tuple
        A tuple containing two pandas Series: mask_train and mask_test.
        - mask_train: boolean mask for training set
        - mask_test: boolean mask for testing set
    """
    model_group_ids = get_model_group_ids(settings, df_in)

    # empty boolean masks the same size as the input DataFrame
    mask_train = pd.Series(np.zeros(len(df_in), dtype=bool), index=df_in.index)
    mask_test = pd.Series(np.zeros(len(df_in), dtype=bool), index=df_in.index)

    for model_group in model_group_ids:
        # Read the split keys for the model group
        test_keys, train_keys = _read_split_keys(model_group)

        # Filter the DataFrame based on the keys
        mask_test |= df_in["key_sale"].isin(test_keys)
        mask_train |= df_in["key_sale"].isin(train_keys)

    return mask_train, mask_test
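The per-group OR-combination used above can be sketched in isolation. This is a minimal, self-contained example; the model group names, sale keys, and split contents are all made up for illustration:

```python
import numpy as np
import pandas as pd

# Hypothetical sales frame and per-group (test_keys, train_keys) splits;
# every name and key here is illustrative, not from openavmkit.
df = pd.DataFrame({"key_sale": ["a1", "a2", "b1", "b2", "c1"]})
splits = {
    "residential": (["a1"], ["a2"]),
    "commercial": (["b1"], ["b2"]),
}

# Start with all-False masks, then OR in each model group's membership.
mask_train = pd.Series(np.zeros(len(df), dtype=bool), index=df.index)
mask_test = pd.Series(np.zeros(len(df), dtype=bool), index=df.index)
for test_keys, train_keys in splits.values():
    mask_test |= df["key_sale"].isin(test_keys)
    mask_train |= df["key_sale"].isin(train_keys)

print(df.loc[mask_train, "key_sale"].tolist())  # ['a2', 'b2']
print(df.loc[mask_test, "key_sale"].tolist())   # ['a1', 'b1']
# 'c1' belongs to no split, so it lands in neither mask.
```

A sale that belongs to no model group ends up in neither mask, which is why the function iterates over every model group id before materializing keys or masks.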

get_vacant

get_vacant(df_in, settings, invert=False)

Filter the DataFrame based on the 'is_vacant' column.

Parameters:

- df_in (DataFrame): Input DataFrame. (required)
- settings (dict): Settings dictionary. (required)
- invert (bool): If True, return non-vacant rows. (default: False)

Returns:

- DataFrame: DataFrame filtered by the is_vacant flag.

Raises:

- ValueError: If the is_vacant column is not boolean.

Source code in openavmkit/data.py
def get_vacant(
    df_in: pd.DataFrame, settings: dict, invert: bool = False
) -> pd.DataFrame:
    """
    Filter the DataFrame based on the 'is_vacant' column.

    Parameters
    ----------
    df_in : pandas.DataFrame
        Input DataFrame.
    settings : dict
        Settings dictionary.
    invert : bool, optional
        If True, return non-vacant rows.

    Returns
    -------
    pandas.DataFrame
        DataFrame filtered by the `is_vacant` flag.

    Raises
    ------
    ValueError
        If the `is_vacant` column is not boolean.
    """

    df = df_in.copy()
    is_vacant_dtype = df["is_vacant"].dtype
    if is_vacant_dtype != bool:
        raise ValueError(
            f"The 'is_vacant' column must be a boolean type (found: {is_vacant_dtype})"
        )
    idx_vacant = df["is_vacant"].eq(True)
    if invert:
        idx_vacant = ~idx_vacant
    df_vacant = df[idx_vacant].copy()
    return df_vacant
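A minimal sketch of the filter-and-invert pattern used above, with a toy frame (the parcel keys are made up):

```python
import pandas as pd

# Toy universe with a strictly-boolean is_vacant column (keys illustrative).
df = pd.DataFrame({
    "key": ["p1", "p2", "p3"],
    "is_vacant": [True, False, True],
})

# get_vacant raises unless the column is a true boolean dtype.
if df["is_vacant"].dtype != bool:
    raise ValueError("'is_vacant' must be boolean")

idx_vacant = df["is_vacant"].eq(True)
vacant = df[idx_vacant].copy()      # what invert=False returns
improved = df[~idx_vacant].copy()   # what invert=True returns
print(vacant["key"].tolist())    # ['p1', 'p3']
print(improved["key"].tolist())  # ['p2']
```

The strict dtype check matters because a nullable "boolean" or object column would let pd.NA slip through the `.eq(True)` comparison silently.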

get_vacant_sales

get_vacant_sales(df_in, settings, invert=False)

Filter the sales DataFrame to return only vacant (unimproved) sales.

Parameters:

- df_in (DataFrame): Input DataFrame. (required)
- settings (dict): Settings dictionary. (required)
- invert (bool): If True, return non-vacant (improved) sales. (default: False)

Returns:

- DataFrame: DataFrame with an added is_vacant column.

Source code in openavmkit/data.py
def get_vacant_sales(
    df_in: pd.DataFrame, settings: dict, invert: bool = False
) -> pd.DataFrame:
    """
    Filter the sales DataFrame to return only vacant (unimproved) sales.

    Parameters
    ----------
    df_in : pandas.DataFrame
        Input DataFrame.
    settings : dict
        Settings dictionary.
    invert : bool, optional
        If True, return non-vacant (improved) sales.

    Returns
    -------
    pandas.DataFrame
        DataFrame with an added `is_vacant` column.
    """

    df = df_in.copy()
    df = _boolify_column_in_df(df, "vacant_sale", "na_false")
    idx_vacant_sale = df["vacant_sale"].eq(True)
    if invert:
        idx_vacant_sale = ~idx_vacant_sale
    df_vacant_sales = df[idx_vacant_sale].copy()
    return df_vacant_sales

load_dataframe

load_dataframe(entry, settings, verbose=False, fields_cat=None, fields_bool=None, fields_num=None)

Load a DataFrame from a file based on instructions and perform calculations and type adjustments.

Source code in openavmkit/data.py
def load_dataframe(
    entry: dict,
    settings: dict,
    verbose: bool = False,
    fields_cat: list = None,
    fields_bool: list = None,
    fields_num: list = None,
) -> pd.DataFrame | None:
    """Load a DataFrame from a file based on instructions and perform calculations and
    type adjustments.
    """
    filename = entry.get("filename", "")
    entry_key = entry.get("key", "")
    if filename == "":
        return None
    filename = f"in/{filename}"
    ext = str(filename).split(".")[-1]

    column_names = _snoop_column_names(filename)

    e_load = entry.get("load", {})

    # Get all calc and tweak operations in order they appear
    operation_order = []
    for key in entry:
        if "calc" in key or "tweak" in key:  # Match any key containing calc or tweak
            op_type = "calc" if "calc" in key else "tweak"
            operation_order.append({"type": op_type, "operations": entry[key]})

    # Get all fields used in aggregation operations
    dupes = get_dupes(entry, None, "geometry" in column_names)

    agg = dupes.get("agg", {})

    agg_fields = []
    for agg_key in agg:
        agg_entry = agg[agg_key]
        agg_field = agg_entry.get("field", "")
        if agg_field != "" and agg_field not in agg_fields:
            agg_fields.append(agg_field)


    if verbose:
        print(f'Loading "{filename}"...')

    rename_map = {}
    dtype_map = {}
    extra_map = {}
    cols_to_load = []

    for rename_key in e_load:
        original = e_load[rename_key]
        original_key = None
        if isinstance(original, list):
            if len(original) > 0:
                original_key = original[0]
                cols_to_load += [original_key]
                rename_map[original_key] = rename_key
            if len(original) > 1:
                dtype_map[original_key] = original[1]
            if len(original) > 2:
                extra_map[rename_key] = original[2]
        elif isinstance(original, str):
            cols_to_load += [original]
            rename_map[original] = rename_key

    # Only include fields from calcs that exist in the source data
    fields_in_calc = []
    for operation in operation_order:
        if operation["type"] == "calc":
            fields_in_calc.extend(_crawl_calc_dict_for_fields(operation["operations"]))
    fields_in_calc = [f for f in fields_in_calc if f in column_names]
    cols_to_load += fields_in_calc

    # Only include fields from aggs that exist in the source data
    fields_in_agg = [f for f in agg_fields if f in column_names]
    cols_to_load += fields_in_agg

    cols_to_load = list(set(cols_to_load))

    is_geometry = False
    if "geometry" in column_names and "geometry" not in cols_to_load:
        cols_to_load.append("geometry")
        is_geometry = True
    if is_geometry:
        is_geometry = entry.get("geometry", is_geometry)

    if ext == "parquet":
        try:
            df = gpd.read_parquet(filename, columns=cols_to_load)
            if "geometry" in df:
                crs, geom_col = detect_crs_from_parquet(filename, "geometry")
                df = ensure_geometries(df, geom_col=geom_col, crs=crs)
        except ValueError:
            df = pd.read_parquet(filename, columns=cols_to_load)
    elif ext == "csv":
        csv_dtype_map = {}
        for key in dtype_map:
            dtype_value = dtype_map[key]
            if dtype_value == "datetime":
                dtype_value = "string"
            csv_dtype_map[key] = dtype_value
        df = pd.read_csv(filename, usecols=cols_to_load, dtype=csv_dtype_map)
    else:
        raise ValueError(f"Unsupported file extension: {ext}")

    # Enforce user's dtypes
    for col in df.columns:
        if col in dtype_map:
            target_dtype = dtype_map[col]
            if target_dtype == "bool" or target_dtype == "boolean":
                rename_key = rename_map.get(col, col)
                if rename_key in extra_map:
                    # if the user has specified a na_handling, we will manually boolify the column
                    na_handling = extra_map[rename_key]
                    df = _boolify_column_in_df(df, col, na_handling)
                else:
                    # otherwise, warn and cast with the exact dtype they specified, defaulting NA/ambiguous values to false
                    warnings.warn(
                        f"Column '{col}' is being converted to boolean, but you didn't specify na_handling. All ambiguous values/NA's will be cast to false."
                    )
                    df[col] = df[col].astype(target_dtype)
                    df = _boolify_column_in_df(df, col, "na_false")
            elif target_dtype == "datetime":
                rename_key = rename_map.get(col, col)
                format_str = extra_map.get(rename_key)
                if rename_key in extra_map:
                    format_str = extra_map[rename_key]
                    try:
                        result = pd.to_datetime(df[col].astype(str), format=format_str)
                    except ValueError:
                        s = df[col].astype(str).replace({None: pd.NA, "None": pd.NA, "": pd.NA})
                        result = pd.to_datetime(s, format=format_str, errors="coerce", exact=True)
                    df[col] = result
                else:
                    warnings.warn(
                        f"Column '{col}' is being converted to datetime, but you didn't specify the format. Will attempt to auto-cast and coerce, which could be wrong!"
                    )
                    df[col] = pd.to_datetime(df[col].astype(str), errors="coerce")
            else:
                try:
                    df[col] = df[col].astype(target_dtype)
                except ValueError as e:
                    if target_dtype == "float":
                        # force lowercase since we're converting to float anyway
                        df[col] = df[col].astype(str).str.lower()

                        # check for and clear various known problematic strings
                        for badvalue in [' ', '<na>', 'none', 'null', 'na']:
                            df.loc[df[col].eq(badvalue), col] = None

                        warnings.warn(f"Column {col} had values that could not be cast to float, suppressed them to null")
                        df[col] = df[col].astype(target_dtype, errors="ignore")
                    else:
                        raise ValueError(f"Error casting column {col} to dtype {dtype_map[col]}: {e}")

    # Rename columns
    df = df.rename(columns=rename_map)

    # Perform operations in order they appear in settings
    for operation in operation_order:
        op_type = operation["type"]
        if op_type == "calc":
            df = perform_calculations(df, operation["operations"], rename_map)
        elif op_type == "tweak":
            df = perform_tweaks(df, operation["operations"], rename_map)

    if fields_cat is None:
        fields_cat = get_fields_categorical(settings, include_boolean=False)
    if fields_bool is None:
        fields_bool = get_fields_boolean(settings)
    if fields_num is None:
        fields_num = get_fields_numeric(settings, include_boolean=False)

    for col in df.columns:
        if col in fields_cat:
            if "date" not in col:
                df[col] = df[col].astype("string")
        elif col in fields_bool or df[col].dtype == "boolean":
            na_handling = None
            if col in extra_map:
                na_handling = extra_map[col]
            df = _boolify_column_in_df(df, col, na_handling)
        elif col in fields_num:
            mask_non_numeric = ~df[col].apply(lambda x: isinstance(x, (int, float)))
            if mask_non_numeric.sum() > 0:
                df.loc[mask_non_numeric, col] = np.nan
            df[col] = df[col].astype("Float64")

    date_fields = get_fields_date(settings, df)
    time_format_map = {}
    for xkey in extra_map:
        if xkey in date_fields:
            time_format_map[xkey] = extra_map[xkey]

    for dkey in date_fields:
        if dkey not in time_format_map:
            example_value = df[~df[dkey].isna()][dkey].iloc[0]
            dtype = df[dkey].dtype

            if not (
                pd.api.types.is_datetime64_any_dtype(df[dkey].dtype) or
                pd.api.types.is_datetime64_dtype(df[dkey].dtype)
            ):
                raise ValueError(
                    f"Date field '{dkey}' does not have a time format specified. Example value from {dkey}: \"{example_value}\""
                )

            s = df[dkey]
            if s.dt.tz is not None:
                s = s.dt.tz_localize(None)  # strips tz, keeps wall time
            # As strings 'YYYY-MM-DD'
            ymd = s.dt.strftime('%Y-%m-%d')
            df[dkey] = pd.to_datetime(ymd, format="%Y-%m-%d", errors="coerce")

    df = enrich_time(df, time_format_map, settings)

    dupes = get_dupes(entry, df, is_geometry)

    # If it's a sales dataframe, and we're not deduplicating on key_sale, something is probably wrong:
    if "key_sale" in df.columns.values:
        subset = dupes.get("subset", [])
        if dupes is not None and "key_sale" not in subset:
            warnings.warn(
                f"df '{entry_key}' contains field 'key_sale', indicating it is likely a sales dataframe. However, its de-dupe subset is {subset}, which does not contain 'key_sale'. This could result in improper de-duplication of sales transactions."
            )

    df = _handle_duplicated_rows(df, dupes)

    if is_geometry:
        gdf: gpd.GeoDataFrame = gpd.GeoDataFrame(df, geometry="geometry", crs=df.crs)

        pre_len = len(gdf)
        gdf = clean_geometry(gdf, ensure_polygon=True)
        post_len = len(gdf)

        perc_len = (pre_len-post_len)/pre_len
        if perc_len >= 0.25:
            warnings.warn(f"Dropped {perc_len:.0%} of rows from dataframe \"{entry_key}\" due to invalid/null geometry. If you don't care about geometry for this dataframe and want to retain all rows, then set '\"geometry\": false' in settings under this dataframe's 'data.load' entry")

        df = gdf

    drop = entry.get("drop", [])
    if len(drop) > 0:
        df = df.drop(columns=drop, errors="ignore")

    if verbose:
        print(f"--> rows = {len(df)}")

    return df
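The `load` mapping that drives the rename/dtype logic above follows a small convention: each canonical field maps either to a source column name, or to a list of `[source_column, dtype, extra]`, where the third slot holds na_handling for booleans or a format string for datetimes. A sketch of how such an entry is unpacked (the source column names are invented for illustration):

```python
# Hypothetical `load` entry; PARCEL_ID, SALEAMT, and SALEDT are made-up
# source columns, mapped to canonical names.
e_load = {
    "key": "PARCEL_ID",
    "sale_price": ["SALEAMT", "float"],
    "sale_date": ["SALEDT", "datetime", "%Y%m%d"],
}

rename_map, dtype_map, extra_map, cols_to_load = {}, {}, {}, []
for rename_key, original in e_load.items():
    if isinstance(original, list):
        original_key = original[0]
        cols_to_load.append(original_key)
        rename_map[original_key] = rename_key
        if len(original) > 1:
            dtype_map[original_key] = original[1]   # target dtype
        if len(original) > 2:
            extra_map[rename_key] = original[2]     # na_handling / time format
    elif isinstance(original, str):
        cols_to_load.append(original)
        rename_map[original] = rename_key

print(rename_map)  # {'PARCEL_ID': 'key', 'SALEAMT': 'sale_price', 'SALEDT': 'sale_date'}
print(dtype_map)   # {'SALEAMT': 'float', 'SALEDT': 'datetime'}
print(extra_map)   # {'sale_date': '%Y%m%d'}
```

Note that dtype_map is keyed by the original column name (dtypes are enforced before the rename), while extra_map is keyed by the canonical name (it is consulted after renaming).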

process_data

process_data(dataframes, settings, verbose=False)

Process raw dataframes according to settings and return a SalesUniversePair.

Parameters:

- dataframes (dict[str, DataFrame]): Dictionary mapping keys to DataFrames. (required)
- settings (dict): Settings dictionary. (required)
- verbose (bool): If True, prints progress information. (default: False)

Returns:

- SalesUniversePair: A SalesUniversePair containing processed sales and universe data.

Raises:

- ValueError: If required merge instructions or columns are missing.

Source code in openavmkit/data.py
def process_data(
    dataframes: dict[str, pd.DataFrame], settings: dict, verbose: bool = False
) -> SalesUniversePair:
    """
    Process raw dataframes according to settings and return a SalesUniversePair.

    Parameters
    ----------
    dataframes : dict[str, pd.DataFrame]
        Dictionary mapping keys to DataFrames.
    settings : dict
        Settings dictionary.
    verbose : bool, optional
        If True, prints progress information.

    Returns
    -------
    SalesUniversePair
        A SalesUniversePair containing processed sales and universe data.

    Raises
    ------
    ValueError
        If required merge instructions or columns are missing.
    """

    s_data = settings.get("data", {})
    s_process = s_data.get("process", {})
    s_merge = s_process.get("merge", {})

    merge_univ: list | None = s_merge.get("universe", None)
    merge_sales: list | None = s_merge.get("sales", None)

    if merge_univ is None:
        raise ValueError(
            'No "universe" merge instructions found. data.process.merge must have exactly two keys: "universe", and "sales"'
        )
    if merge_sales is None:
        raise ValueError(
            'No "sales" merge instructions found. data.process.merge must have exactly two keys: "universe", and "sales"'
        )

    df_univ = _merge_dict_of_dfs(dataframes, merge_univ, settings, required_key="key")
    df_sales = _merge_dict_of_dfs(
        dataframes, merge_sales, settings, required_key="key_sale"
    )

    if "valid_sale" not in df_sales:
        raise ValueError("The 'valid_sale' column is required in the sales data. If you don't have anything to go on, you can just create that column and fill it with an assumption (i.e. all are valid), but ideally you should look for some kind of validation criteria for your sales.")
    if "vacant_sale" not in df_sales:
        raise ValueError("The 'vacant_sale' column is required in the sales data. If you don't have anything to go on, you can just create that column and fill it with an assumption (i.e. match vacant status in the universe), but ideally you should look for some kind of sales metadata on this.")
    # Print number and percentage of valid sales
    valid_count = df_sales["valid_sale"].sum()
    total_count = len(df_sales)
    valid_percent = (valid_count / total_count * 100) if total_count > 0 else 0
    print(f"Valid sales: {valid_count} ({valid_percent:.1f}% of {total_count} total)")
    df_sales = df_sales[df_sales["valid_sale"].eq(True)].copy().reset_index(drop=True)

    sup: SalesUniversePair = SalesUniversePair(universe=df_univ, sales=df_sales)

    sup = _enrich_data(
        sup, s_process.get("enrich", {}), dataframes, settings, verbose=verbose
    )

    dupe_univ: dict | None = s_process.get("dupes", {}).get("universe", None)
    dupe_sales: dict | None = s_process.get("dupes", {}).get("sales", None)
    if dupe_univ:
        sup.set(
            "universe",
            _handle_duplicated_rows(sup.universe, dupe_univ, verbose=verbose),
        )
    if dupe_sales:
        sup.set(
            "sales", _handle_duplicated_rows(sup.sales, dupe_sales, verbose=verbose)
        )

    return sup
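A minimal sketch of the settings shape process_data consumes. Only the data.process.merge structure is taken from the function above; the dataframe names "univ" and "sales_raw" are illustrative placeholders:

```python
# Minimal settings skeleton for process_data; the required keys are
# data.process.merge.universe and data.process.merge.sales.
settings = {
    "data": {
        "process": {
            "merge": {
                "universe": ["univ"],    # merged with required_key="key"
                "sales": ["sales_raw"],  # merged with required_key="key_sale"
            },
            "enrich": {},  # optional enrichment instructions
            "dupes": {},   # optional de-duplication instructions
        }
    }
}

s_merge = settings["data"]["process"]["merge"]
# Both keys must be present, or process_data raises ValueError.
assert s_merge.get("universe") is not None
assert s_merge.get("sales") is not None
```

Remember that the merged sales frame must also carry 'valid_sale' and 'vacant_sale' columns, or process_data raises before any enrichment happens.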

write_gpkg

write_gpkg(df, path)

Write data to a geopackage file.

Parameters:

- df (DataFrame): Data to be written. (required)
- path (str): File path for saving the geopackage. (required)
Source code in openavmkit/data.py
def write_gpkg(df, path):
    """
    Write data to a geopackage file.

    Parameters
    ----------
    df : pd.DataFrame
        Data to be written
    path : str
        File path for saving the geopackage.
    """
    if not path.endswith(".gpkg"):
        raise ValueError("Path must end with .gpkg!")

    # If it has a geometry column, write it as a GeoPackage layer
    if "geometry" in df.columns:
        # Ensure it's a GeoDataFrame
        gdf = df if isinstance(df, gpd.GeoDataFrame) else gpd.GeoDataFrame(df, geometry="geometry", crs=getattr(df, "crs", None))

        # You MUST have a CRS for it to be recorded in metadata
        if gdf.crs is None:
            raise ValueError(f"{path}: geometry has no CRS. Set it (e.g., gdf = gdf.set_crs('EPSG:4326')) before writing.")

        gdf.to_file(path, driver='GPKG', layer='name', mode='w')
    else:
        raise ValueError("cannot write to gpkg without geometry")

write_parquet

write_parquet(df, path)

Write data to a parquet file.

Parameters:

- df (DataFrame): Data to be written. (required)
- path (str): File path for saving the parquet. (required)
Source code in openavmkit/data.py
def write_parquet(df, path):
    """
    Write data to a parquet file.

    Parameters
    ----------
    df : pd.DataFrame
        Data to be written
    path : str
        File path for saving the parquet.
    """

    if not path.endswith(".parquet"):
        raise ValueError("Path must end with .parquet!")

    # If it has a geometry column, write as GeoParquet
    if "geometry" in df.columns:
        # Ensure it's a GeoDataFrame
        gdf = df if isinstance(df, gpd.GeoDataFrame) else gpd.GeoDataFrame(df, geometry="geometry", crs=getattr(df, "crs", None))

        # You MUST have a CRS for it to be recorded in metadata
        if gdf.crs is None:
            raise ValueError(f"{path}: geometry has no CRS. Set it (e.g., gdf = gdf.set_crs('EPSG:4326')) before writing.")

        # GeoPandas writes WKB + GeoParquet metadata (including CRS)
        gdf.to_parquet(path, engine="pyarrow", index=False)
    else:
        # Regular table
        df.to_parquet(path, engine="pyarrow", index=False)

write_shapefile

write_shapefile(df, path)

Write data to a shapefile file.

Parameters:

- df (DataFrame): Data to be written. (required)
- path (str): File path for saving the shapefile. (required)
Source code in openavmkit/data.py
def write_shapefile(df, path):
    """
    Write data to a shapefile file.

    Parameters
    ----------
    df : pd.DataFrame
        Data to be written
    path : str
        File path for saving the shapefile.
    """

    if not path.endswith(".shp"):
        raise ValueError("Path must end with .shp!")

    # If it has a geometry column, write it as a shapefile
    if "geometry" in df.columns:
        # Ensure it's a GeoDataFrame
        gdf = df if isinstance(df, gpd.GeoDataFrame) else gpd.GeoDataFrame(df, geometry="geometry", crs=getattr(df, "crs", None))

        # You MUST have a CRS for it to be recorded in metadata
        if gdf.crs is None:
            raise ValueError(f"{path}: geometry has no CRS. Set it (e.g., gdf = gdf.set_crs('EPSG:4326')) before writing.")

        gdf.to_file(path)
    else:
        raise ValueError("cannot write to shapefile without geometry")

write_zipped_shapefile

write_zipped_shapefile(df, path)

Write a zipped ESRI Shapefile. Produces a single {name}.shp.zip with the shapefile parts (name.shp, .shx, .dbf, .prj, .cpg, etc.) at the ZIP root.

Parameters:

- df (DataFrame or GeoDataFrame): Data to be written (must include a 'geometry' column and a CRS). (required)
- path (str): Destination path ending with '.shp.zip' (e.g., 'out/roads.shp.zip'). (required)

Returns:

- Path: Path to the created .shp.zip

Source code in openavmkit/data.py
def write_zipped_shapefile(df, path: str) -> Path:
    """
    Write a zipped ESRI Shapefile. Produces a single {name}.shp.zip with the
    shapefile parts (name.shp, .shx, .dbf, .prj, .cpg, etc.) at the ZIP root.

    Parameters
    ----------
    df : pd.DataFrame or gpd.GeoDataFrame
        Data to be written (must include a 'geometry' column and a CRS).
    path : str
        Destination path ending with '.shp.zip' (e.g., 'out/roads.shp.zip').

    Returns
    -------
    pathlib.Path
        Path to the created .shp.zip
    """
    p = Path(path)

    # Require ".shp.zip" exactly
    if p.suffixes[-2:] != [".shp", ".zip"]:
        raise ValueError("Path must end with .shp.zip (e.g., 'out/roads.shp.zip').")

    # layer name (strip .zip then .shp)
    layer = Path(p.stem).stem
    if not layer:
        raise ValueError("Could not derive layer name from path.")

    # Make sure parent directory exists
    p.parent.mkdir(parents=True, exist_ok=True)

    # Write shapefile into a temp dir, then zip and move atomically
    with tempfile.TemporaryDirectory() as tmpdir_str:
        tmpdir = Path(tmpdir_str)
        shp_path = tmpdir / f"{layer}.shp"

        # Reuse write_shapefile (validates geometry + CRS)
        write_shapefile(df, str(shp_path))

        # Common shapefile sidecar extensions we may need to include if present
        sidecars = {
            ".shp", ".shx", ".dbf", ".prj", ".cpg",
            ".qix", ".sbn", ".sbx", ".fbn", ".fbx",
            ".ain", ".aih", ".ixs", ".mxs", ".atx",
            ".xml", ".qpj"
        }

        tmp_zip = tmpdir / f"{layer}.shp.zip"
        with zipfile.ZipFile(tmp_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
            for ext in sorted(sidecars):
                f = tmpdir / f"{layer}{ext}"
                if f.exists():
                    # Store with just the filename at the ZIP root
                    zf.write(f, arcname=f.name)

        # Move the finished ZIP to the destination (overwrites if exists)
        shutil.move(str(tmp_zip), str(p))

    return p
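The zip step above can be exercised on its own with stub files. This stdlib-only sketch reproduces the sidecar collection and ZIP-root placement; the file contents and the "roads" layer name are made up:

```python
import tempfile
import zipfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    tmpdir = Path(tmp)
    layer = "roads"  # illustrative layer name

    # Stub out a subset of the shapefile parts a real writer would emit.
    for ext in (".shp", ".shx", ".dbf", ".prj"):
        (tmpdir / f"{layer}{ext}").write_bytes(b"stub")

    # Collect any sidecars that exist and store them at the ZIP root.
    sidecars = {".shp", ".shx", ".dbf", ".prj", ".cpg"}
    zip_path = tmpdir / f"{layer}.shp.zip"
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for ext in sorted(sidecars):
            f = tmpdir / f"{layer}{ext}"
            if f.exists():
                zf.write(f, arcname=f.name)  # filename only -> ZIP root

    with zipfile.ZipFile(zip_path) as zf:
        names = zf.namelist()

print(sorted(names))  # ['roads.dbf', 'roads.prj', 'roads.shp', 'roads.shx']
```

Because `arcname=f.name` strips all directories, GIS tools that expect a "zipped shapefile" find the parts at the archive root rather than nested inside a temp-directory path.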