
openavmkit.utilities.cache

check_cache

check_cache(filename, signature, filetype)

Check if the cached data exists and if the signatures match

Parameters:

    filename : str (required)
        The filename of the cached data
    signature : dict | str (required)
        The signature of the cached data
    filetype : str (required)
        The type of file ("dict", "str", "pickle", or "df")

Returns:

    bool
        True if the file exists AND the signatures match, False otherwise.

Source code in openavmkit/utilities/cache.py
def check_cache(filename: str, signature: dict | str, filetype: str):
    """Check if the cached data exists and if the signatures match

    Parameters
    ----------
    filename : str
        The filename of the cached data
    signature : dict | str
        The signature of the cached data
    filetype : str
        The type of file ("dict", "str", "pickle", or "df")

    Returns
    -------
    bool
        True if the file exists AND the signatures match, False otherwise.
    """
    ext = _get_extension(filetype)
    path = f"cache/{filename}"
    match = _match_signature(path, signature)
    if match:
        path_exists = os.path.exists(f"{path}.{ext}")
        return path_exists
    return False
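
Example: a caller can pair check_cache with read_cache and write_cache to skip an
expensive recomputation when a valid cache entry exists. This is a minimal sketch;
the filename, the signature dict, and compute_expensive_df() are illustrative, not
part of the library.

>>> signature = {"source": "sales_2024", "version": 2}
>>> if check_cache("sales_enriched", signature, "df"):
...     df = read_cache("sales_enriched", "df")
... else:
...     df = compute_expensive_df()  # hypothetical expensive step
...     write_cache("sales_enriched", df, signature, "df")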

clear_cache

clear_cache(filename, filetype)

Clear the specified cache data

Parameters:

    filename : str (required)
        The filename of the cached data to clear
    filetype : str (required)
        The type of file ("dict", "str", "pickle", or "df")

Source code in openavmkit/utilities/cache.py
def clear_cache(filename: str, filetype: str):
    """Clear the specified cache data

    Parameters
    ----------
    filename : str
        The filename of the cached data to clear
    filetype : str
        The type of file ("dict", "str", "pickle", or "df")
    """
    ext = _get_extension(filetype)
    path = f"cache/{filename}"
    if os.path.exists(f"{path}.{ext}"):
        os.remove(f"{path}.{ext}")
    if os.path.exists(f"{path}.cols{ext}"):
        os.remove(f"{path}..cols.{ext}")
    if os.path.exists(f"{path}.rows{ext}"):
        os.remove(f"{path}.rows{ext}")
    if os.path.exists(f"{path}.signature.json"):
        os.remove(f"{path}.signature.json")
    if os.path.exists(f"{path}.cols.signature.json"):
        os.remove(f"{path}.cols.signature.json")
    if os.path.exists(f"{path}.rows.signature.json"):
        os.remove(f"{path}.rows.signature.json")

get_cached_df

get_cached_df(df, filename, key='key', extra_signature=None, only_signature=None)

Reconstruct a DataFrame from cached row and column diffs on disk.

This function looks for cache fragments named <filename>.cols (column-level diffs) and <filename>.rows (new row fragments), validates them against a signature derived from the base DataFrame and optional extra_signature, and merges them to produce an updated DataFrame.

Parameters:

    df : DataFrame (required)
        The base DataFrame to which column diffs will be applied. This should
        correspond to the state of the data when the cache fragments were
        generated.
    filename : str (required)
        Base file name (no extension) for the cache files. The function will
        check for <filename>.cols and <filename>.rows.
    key : str, default 'key'
        Name of the primary-key column to align rows between the base DataFrame
        and cached fragments.
    extra_signature : dict or str, default None
        Additional signature to include when computing the cache signature via
        _get_df_signature.
    only_signature : dict or str, default None
        If provided, this signature is used directly (instead of recomputing
        from df + extra_signature) when validating cache fragments.

Returns:

    DataFrame or GeoDataFrame or None
        • A merged DataFrame incorporating all column updates and appended rows
          from the cache. The returned type matches df (GeoDataFrame if geometry
          diffs were cached).
        • None if neither <filename>.cols nor <filename>.rows exists or passes
          the signature check.

Notes:

    • Column diffs are read first: existing rows in df are filtered to those
      present in the columns fragment, old columns are dropped, and new columns
      are merged in.
    • Row fragments are then appended to the merged DataFrame.
    • The function preserves the data type of the primary-key column.
    • If both column and row fragments are absent or invalid, the function
      returns None.

Source code in openavmkit/utilities/cache.py
def get_cached_df(
    df: pd.DataFrame,
    filename: str,
    key: str = "key",
    extra_signature: dict | str = None,
    only_signature: dict | str = None,
) -> pd.DataFrame | gpd.GeoDataFrame | None:
    """
    Reconstruct a DataFrame from cached row and column diffs on disk.

    This function looks for cache fragments named
    ``<filename>.cols`` (column-level diffs) and
    ``<filename>.rows`` (new row fragments), validates them against a
    signature derived from the base DataFrame and optional ``extra_signature``,
    and merges them to produce an updated DataFrame.

    Parameters
    ----------
    df : pandas.DataFrame
        The base DataFrame to which column diffs will be applied.  This
        should correspond to the state of the data when the cache
        fragments were generated.
    filename : str
        Base file name (no extension) for the cache files.  The function
        will check for ``<filename>.cols`` and ``<filename>.rows``.
    key : str, default 'key'
        Name of the primary-key column to align rows between the base DataFrame
        and cached fragments.
    extra_signature : dict or str, optional
        Additional signature to include when computing the cache signature via
        ``_get_df_signature``.
    only_signature : dict or str, optional
        If provided, this signature is used directly (instead of recomputing
        from ``df`` + ``extra_signature``) when validating cache fragments.

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame or None

        - A merged DataFrame incorporating all column updates and appended rows
          from the cache.  The returned type matches ``df`` (GeoDataFrame if
          geometry diffs were cached).
        - Returns ``None`` if neither ``<filename>.cols`` nor
          ``<filename>.rows`` exist or pass the signature check.

    Notes
    -----
    - Column diffs are read first: existing rows in ``df`` are filtered to
      those present in the columns fragment, old columns dropped, and new
      columns merged in.
    - Row fragments are then appended to the merged DataFrame.
    - The function preserves the data type of the primary key column.
    - If both column and row fragments are absent or invalid, the function
      returns ``None``.
    """

    if only_signature is not None:
        signature = only_signature
    else:
        signature = _get_df_signature(df, extra_signature)

    filename_rows = f"{filename}.rows"
    filename_cols = f"{filename}.cols"

    df_merged = None

    if check_cache(filename_cols, signature, "df"):
        # Merge new columns
        df_diff = read_cache(filename_cols, "df")
        if df_diff is not None and not df_diff.empty:
            df_diff[key] = df_diff[key].astype(df[key].dtype)

            cols_to_replace = [c for c in df_diff.columns if c != key]

            # Drop the columns that are going to be replaced
            df_base = df.drop(columns=cols_to_replace, errors="ignore")

            # Drop the keys that are not in the diff
            df_base = df_base[df_base[key].isin(df_diff[key])].copy()

            df_merged = df_base.merge(df_diff, how="left", on=key)

            if isinstance(df_diff, gpd.GeoDataFrame):
                df_merged = gpd.GeoDataFrame(df_merged, geometry="geometry")
                df_merged = ensure_geometries(df_merged, "geometry", df_diff.crs)

    if check_cache(filename_rows, signature, "df"):
        # Add new rows
        df_diff = read_cache(filename_rows, "df")
        if df_diff is not None and not df_diff.empty:
            df_diff[key] = df_diff[key].astype(df[key].dtype)

            if df_merged is None:
                df_merged = df.copy()

            # add the new rows onto the end of the DataFrame
            df_merged = pd.concat([df_merged, df_diff], ignore_index=True)

    return df_merged
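
Example: a typical read path recomputes the same signature inputs that were used when
the cache was written, and falls back to regeneration when the cache is missing or
stale. This is a sketch; the filename, the extra_signature value, and enrich_parcels()
are illustrative.

>>> df_base = pd.DataFrame({"key": [1, 2], "a": [10, 20]})
>>> df = get_cached_df(df_base, "enriched", key="key", extra_signature={"step": "enrich"})
>>> if df is None:
...     df = enrich_parcels(df_base)  # hypothetical expensive step
...     df = write_cached_df(df_base, df, "enriched", extra_signature={"step": "enrich"})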

read_cache

read_cache(filename, filetype)

Reads cached data from disk

Parameters:

    filename : str (required)
        The filename of the data to load (relative to the cache/ directory)
    filetype : str (required)
        The type of file ("dict", "str", "pickle", or "df")

Returns:

    dict | str | object | DataFrame | GeoDataFrame | None
        The cached data

Source code in openavmkit/utilities/cache.py
def read_cache(filename: str, filetype: str):
    """Reads cached data from disk

    Parameters
    ----------
    filename : str
        The filename of the data to load (relative to the cache/ directory)
    filetype : str
        The type of file ("dict", "str", "pickle", or "df")

    Returns
    -------
    dict | str | object | pd.DataFrame | gpd.GeoDataFrame | None
        The cached data
    """
    extension = _get_extension(filetype)
    path = f"cache/{filename}.{extension}"
    if os.path.exists(path):
        if filetype == "dict":
            with open(path, "r") as file:
                return json.load(file)
        elif filetype == "str":
            with open(path, "r") as file:
                return file.read()
        elif filetype == "pickle":
            with open(path, "rb") as file:
                return pickle.load(file)
        elif filetype == "df":
            try:
                df = gpd.read_parquet(path)
                if "geometry" in df:
                    df = gpd.GeoDataFrame(df, geometry="geometry")
                    df = ensure_geometries(df, "geometry", df.crs)
            except ValueError:
                df = pd.read_parquet(path)
            return df
    return None
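
Example: loading a cached settings dictionary returns the parsed JSON, or None when no
cache file exists (the filename and fallback function are illustrative).

>>> settings = read_cache("model_settings", "dict")
>>> if settings is None:
...     settings = build_default_settings()  # hypothetical fallback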

write_cache

write_cache(filename, payload, signature, filetype)

Caches the data to disk

Parameters:

    filename : str (required)
        The filename to associate with this data
    payload : dict | str | DataFrame | GeoDataFrame | bytes (required)
        The data to cache to disk
    signature : dict | str (required)
        A "signature" value that, if changed in the future, indicates the cache
        has been broken. That is, when you go to load cached data, if the
        signature has changed from the one written to disk, the system knows not
        to trust the cached data and will generate the data from scratch instead.
        If the signatures match, the system will trust the cached data and skip
        the potentially expensive generation step.
    filetype : str (required)
        The type of file ("dict", "str", "pickle", or "df")

Source code in openavmkit/utilities/cache.py
def write_cache(
    filename: str,
    payload: dict | str | pd.DataFrame | gpd.GeoDataFrame | bytes,
    signature: dict | str,
    filetype: str,
):
    """Caches the data to disk

    Parameters
    ----------
    filename : str
        The filename to associate with this data
    payload : dict | str | pd.DataFrame | gpd.GeoDataFrame | bytes
        The data to cache to disk
    signature : dict | str
        A "signature" value that, if changed in the future, indicates the cache has been broken.
        That is, when you go to load cached data, if the signature has changed from the one written to disk, the system
        knows not to trust the cached data and will generate the data from scratch instead. If the signatures match,
        the system will trust the cached data and skip the potentially expensive generation step.
    filetype : str
        The type of file ("dict", "str", "pickle", or "df")
    """
    extension = _get_extension(filetype)
    path = f"cache/{filename}.{extension}"
    base_path = os.path.dirname(path)
    os.makedirs(base_path, exist_ok=True)
    if filetype == "dict":
        with open(path, "w") as file:
            json.dump(payload, file)
    elif filetype == "str":
        with open(path, "w") as file:
            file.write(payload)
    elif filetype == "pickle":
        with open(path, "wb") as file:
            pickle.dump(payload, file)
    elif filetype == "df":
        if isinstance(payload, pd.DataFrame):
            if isinstance(payload, gpd.GeoDataFrame):
                payload.to_parquet(path, engine="pyarrow")
            else:
                payload.to_parquet(path)
        else:
            raise TypeError("Payload must be a DataFrame for df type.")

    if type(signature) is dict:
        sig_ext = "json"
    elif type(signature) is str:
        sig_ext = "txt"
    else:
        raise TypeError(
            f"Unsupported type for signature value: {type(signature)} sig = {signature}"
        )

    signature_path = f"cache/{filename}.signature.{sig_ext}"
    with open(signature_path, "w") as file:
        if sig_ext == "json":
            json.dump(signature, file)
        else:
            file.write(signature)
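
Example: caching an intermediate DataFrame under a dict signature writes the data file
plus a companion signature file under cache/ (the filename and signature contents are
illustrative).

>>> df = pd.DataFrame({"key": [1, 2], "price": [100.0, 250.0]})
>>> write_cache("sales_enriched", df, {"source": "2024-01", "rows": len(df)}, "df")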

write_cached_df

write_cached_df(df_orig, df_new, filename, key='key', extra_signature=None)

Update an on-disk cache with row- or column-level differences between two pandas DataFrames and return the fully reconstructed, cached DataFrame.

The function compares df_new against df_orig using the primary key column key. Any newly added rows, deleted rows, or modified columns are written to cache files <filename>.rows and/or <filename>.cols via write_cache. A deterministic signature (optionally augmented by extra_signature) is stored alongside each cache fragment to protect against cache poisoning. The routine then calls get_cached_df to rebuild the complete DataFrame and verifies that the round-trip result is equal to df_new (allowing for NaN equality and primary-key re-ordering).

Parameters:

    df_orig : DataFrame (required)
        The baseline DataFrame previously loaded from cache (or computed earlier
        in the session). Must contain the key column.
    df_new : DataFrame (required)
        The candidate DataFrame whose content should be cached.
    filename : str (required)
        Base file name (no extension) used when writing cache fragments. Two
        files may be created:

        • <filename>.cols – modified columns for existing rows
        • <filename>.rows – entirely new rows
    key : str, default "key"
        Name of the primary-key column that uniquely identifies each row.
    extra_signature : dict or str, default None
        Additional entropy to include in the cache signature. Use this when the
        same data structure can vary by external configuration (e.g., feature
        flags or environment).

Returns:

    DataFrame or None
        • If no columns/rows differ between df_new and df_orig, the original
          DataFrame df_orig is returned immediately and no cache files are
          written.
        • Otherwise, the function returns the DataFrame reconstructed from the
          cache (identical in content to df_new). The return type is never None
          unless the underlying helpers are altered.

Raises:

    ValueError
        If the DataFrame reconstructed from cache does not match df_new after a
        diff has been written. This guards against cache corruption or
        mismatched signatures.

Notes:

    • Column comparison treats NaN as equal to NaN.
    • Only the minimum required data (changed columns and/or new rows) is
      written, which keeps cache artifacts small even for wide tables.
    • Deleted rows are not written to disk; instead they are omitted when
      df_new is reconstructed.

Examples:

>>> df_baseline = pd.DataFrame({"key": [1, 2], "a": [10, 20]})
>>> df_update   = pd.DataFrame({"key": [1, 2, 3], "a": [10, 99, 30]})
>>> df_cached   = write_cached_df(df_baseline, df_update, "mycache")
>>> df_cached.equals(df_update)
True
Source code in openavmkit/utilities/cache.py
def write_cached_df(
    df_orig: pd.DataFrame,
    df_new: pd.DataFrame,
    filename: str,
    key: str = "key",
    extra_signature: dict | str = None,
) -> pd.DataFrame | None:
    """Update an on-disk cache with row- or column-level differences between two
    ``pandas`` DataFrames and return the fully reconstructed, cached DataFrame.

    The function compares *df_new* against *df_orig* using the primary key
    column *key*.  Any newly added rows, deleted rows, or modified columns are
    written to cache files ``<filename>.rows`` and/or ``<filename>.cols`` via
    :pyfunc:`write_cache`.  A deterministic signature (optionally augmented by
    *extra_signature*) is stored alongside each cache fragment to protect
    against cache poisoning.  The routine then calls
    :pyfunc:`get_cached_df` to rebuild the complete DataFrame and verifies that
    the round-trip result is equal to *df_new* (allowing for NaN equality and
    primary-key re-ordering).

    Parameters
    ----------
    df_orig : pandas.DataFrame
        The **baseline** DataFrame previously loaded from cache (or computed
        earlier in the session).  Must contain the *key* column.
    df_new : pandas.DataFrame
        The **candidate** DataFrame whose content should be cached.
    filename : str
        Base file name (no extension) used when writing cache fragments.
        Two files may be created:

        * ``<filename>.cols`` – modified columns for existing rows
        * ``<filename>.rows`` – entirely new rows
    key : str, default ``"key"``
        Name of the primary-key column that uniquely identifies each row.
    extra_signature : dict or str, optional
        Additional entropy to include in the cache signature.  Use this when
        the same data structure can vary by external configuration (e.g.,
        feature flags or environment).

    Returns
    -------
    pandas.DataFrame or None
        * If **no** columns/rows differ between *df_new* and *df_orig*, the
          original DataFrame *df_orig* is returned immediately and **no** cache
          files are written.
        * Otherwise, the function returns the DataFrame reconstructed from the
          cache (identical in content to *df_new*).  The return type is never
          *None* unless the underlying helpers are altered.

    Raises
    ------
    ValueError
        If the DataFrame reconstructed from cache does **not** match *df_new*
        after a diff has been written.  This guards against cache corruption or
        mismatched signatures.

    Notes
    -----
    * Column comparison treats *NaN* as equal to *NaN*.
    * Only the minimum required data (changed columns and/or new rows) is
      written, which keeps cache artifacts small even for wide tables.
    * Deleted rows are **not** written to disk; instead they are omitted when
      *df_new* is reconstructed.

    Examples
    --------
    >>> df_baseline = pd.DataFrame({"key": [1, 2], "a": [10, 20]})
    >>> df_update   = pd.DataFrame({"key": [1, 2, 3], "a": [10, 99, 30]})
    >>> df_cached   = write_cached_df(df_baseline, df_update, "mycache")
    >>> df_cached.equals(df_update)
    True
    """

    df_new = df_new.copy()

    orig_cols = set(df_orig.columns)
    new_cols = [c for c in df_new.columns if c not in orig_cols]
    common = [c for c in df_new.columns if c in orig_cols]

    orig_rows_by_key = df_orig[key].values
    new_rows_by_key = df_new[key].values
    if len(orig_rows_by_key) > len(new_rows_by_key):
        orig_set = set(orig_rows_by_key)
        new_set = set(new_rows_by_key)
        added_rows = []
        deleted_rows = list(orig_set - new_set)
    elif len(orig_rows_by_key) < len(new_rows_by_key):
        orig_set = set(orig_rows_by_key)
        new_set = set(new_rows_by_key)
        added_rows = list(new_set - orig_set)
        deleted_rows = []
    else:
        added_rows = []
        deleted_rows = []

    modified = []
    for c in common:
        col_new = df_new[c].reset_index(drop=True)
        col_orig = df_orig[c].reset_index(drop=True)

        is_different = False
        if len(col_new) == len(col_orig):
            values_equal = col_new.values == col_orig.values
            na_equal = (col_new.isna() & col_orig.isna()).values

            # a position matches if the values are equal or both are NaN
            equal_mask = values_equal | na_equal
            all_equal = equal_mask.sum() == len(col_new)
            if not all_equal:
                is_different = True
        else:
            is_different = True

        if is_different:
            modified.append(c)
            continue

    changed_cols = new_cols + modified
    if not changed_cols:
        # nothing new or modified → no cache update needed
        return df_orig

    the_cols = changed_cols
    if key not in the_cols:
        the_cols = [key] + changed_cols

    df_diff_cols = df_new[the_cols].copy()
    df_diff_cols = df_diff_cols[~df_diff_cols[key].isin(added_rows)]
    signature = _get_df_signature(df_orig, extra_signature)
    df_type = "df"
    write_cache(f"{filename}.cols", df_diff_cols, signature, df_type)
    if len(deleted_rows) > 0:
        df_new = df_new[~df_new[key].isin(deleted_rows)].copy()

    if len(added_rows) > 0:
        df_diff_rows = df_new[df_new[key].isin(added_rows)].copy()
        if not df_diff_rows.empty:
            write_cache(f"{filename}.rows", df_diff_rows, signature, df_type)

    df_cached = get_cached_df(df_orig, filename, key, extra_signature)

    are_equal = dfs_are_equal(df_new, df_cached, allow_weak=True, primary_key=key)
    if not are_equal:
        raise ValueError("Cached DataFrame does not match the new DataFrame.")

    return df_cached
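
Example: in a later session, passing the same baseline DataFrame and extra_signature
lets get_cached_df rebuild the frame from the stored diffs, while a different signature
invalidates them. This assumes the baseline produces the same signature across calls;
the values continue the example above.

>>> get_cached_df(df_baseline, "mycache") is not None
True
>>> get_cached_df(df_baseline, "mycache", extra_signature="v2") is None
True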