# -------------------------------------------------------------
# Copyright (c) Henry Spatial Analysis. All rights reserved.
# Licensed under the MIT License. See LICENSE in project root.
# -------------------------------------------------------------
"""
Taxonomy crosswalk between OSM tags and Overture Maps taxonomy.
Loads four CSV files that map OSM tag key/value pairs and Overture
(L0, L1) categories to a unified ``shared_label``, plus per-label
match radii and top-level OSM-key-to-Overture-L0 mappings.
"""
from __future__ import annotations
from importlib import resources
import numpy as np
import pandas as pd
# Sentinel in the OSM crosswalk's ``osm_value`` column meaning
# "any value of this key".
WILDCARD = "*"

# Bit flags for the Overture L0 categories. Used by
# ``compute_osm_l0_bits`` / ``compute_overture_l0_bits`` for fast
# vectorised broad-match checks in type scoring. Backing dtype is
# uint16, leaving headroom past the 12 bits used here.
L0_BIT: dict[str, int] = {
    category: 1 << position
    for position, category in enumerate(
        [
            "arts_and_entertainment",
            "food_and_drink",
            "health_care",
            "shopping",
            "sports_and_recreation",
            "services_and_business",
            "lifestyle_services",
            "community_and_government",
            "cultural_and_historic",
            "education",
            "travel_and_transportation",
            "lodging",
        ]
    )
}
# -----------------------------------------------------------------
# CSV loaders
# -----------------------------------------------------------------
def _load_csv(filename: str) -> pd.DataFrame:
    """Read a CSV shipped in the package data directory.

    All columns are loaded as strings, and ``keep_default_na=False``
    keeps empty cells as ``""`` instead of NaN.
    """
    data_dir = resources.files("openpois.conflation.data")
    with resources.as_file(data_dir.joinpath(filename)) as local_path:
        return pd.read_csv(local_path, dtype=str, keep_default_na=False)
def load_osm_crosswalk() -> pd.DataFrame:
    """Load the OSM taxonomy crosswalk CSV.

    Columns: ``osm_key, osm_value, shared_label``.
    """
    return _load_csv("taxonomy_crosswalk_openstreetmap.csv")
def load_overture_crosswalk() -> pd.DataFrame:
    """Load the Overture Maps taxonomy crosswalk CSV.

    Columns: ``overture_l0, overture_l1, overture_l2, shared_label``.
    """
    return _load_csv("taxonomy_crosswalk_overture_maps.csv")
def load_match_radii() -> pd.DataFrame:
    """Load the match-radii CSV.

    Columns: ``shared_label, match_radius_m``.
    """
    return _load_csv("match_radii.csv")
def load_top_level_matches() -> pd.DataFrame:
    """Load the top-level OSM-key ↔ Overture-L0 CSV.

    Columns: ``overture_l0, osm_key``.
    """
    return _load_csv("top_level_matches.csv")
# -----------------------------------------------------------------
# Shared-label assignment — OSM
# -----------------------------------------------------------------
def _build_osm_label_lookups(
    osm_crosswalk: pd.DataFrame,
) -> tuple[dict[str, pd.Series], dict[str, str]]:
    """Split the OSM crosswalk into per-key lookups and wildcard fallbacks.

    Returns:
        lookups: ``osm_key -> Series`` mapping ``osm_value`` to
            ``shared_label`` (non-wildcard rows only).
        wildcards: ``osm_key -> shared_label`` for wildcard rows; dict
            insertion order preserves crosswalk row order.
    """
    is_wildcard = osm_crosswalk["osm_value"] == WILDCARD
    lookups: dict[str, pd.Series] = {
        key: group.set_index("osm_value")["shared_label"]
        for key, group in osm_crosswalk[~is_wildcard].groupby("osm_key")
    }
    wildcards: dict[str, str] = dict(
        zip(
            osm_crosswalk.loc[is_wildcard, "osm_key"],
            osm_crosswalk.loc[is_wildcard, "shared_label"],
        )
    )
    return lookups, wildcards
def assign_osm_shared_label(
    gdf: pd.DataFrame,
    osm_crosswalk: pd.DataFrame,
    match_radii: pd.DataFrame,
    filter_keys: list[str],
    default_radius_m: float = 100.0,
    return_all: bool = False,
) -> (
    tuple[np.ndarray, np.ndarray]
    | tuple[list[list[str]], list[list[float]]]
):
    """
    Assign shared taxonomy labels to each OSM POI.

    Two modes, selected by ``return_all``:

    * ``return_all=False`` (default) — produces a single label per row.
      Uses ``filter_keys`` in priority order (first non-null match wins),
      falling back to the per-key wildcard row if the specific value
      is not in the crosswalk. Returns ``(label, radius)`` as object /
      float64 ndarrays of length ``len(gdf)``. Unmatched rows have
      ``label == ""`` and ``radius == default_radius_m``. This is the
      path used by the conflation pipeline and snapshot model
      application.

    * ``return_all=True`` — produces zero or more labels per row,
      used by the model-training pipeline which duplicates
      observations across every applicable taxonomy category.

      Pass 1 (specific matches): for every ``filter_key``, every row
      whose value for that key is in the crosswalk receives that
      label. A row can collect multiple specific labels.

      Pass 2 (wildcard fallback): applied *only* to rows that had
      zero specific matches in pass 1. Within such a row, wildcard
      keys are walked in the order they appear in the crosswalk CSV
      (dict insertion order preserves row order); the first wildcard
      key with a non-null/non-empty value wins and is the only
      wildcard label assigned.

      Returns ``(labels_per_row, radii_per_row)`` as lists of lists;
      each inner list has ``>=0`` entries and is de-duplicated (if
      two keys map to the same label, it appears once).
    """
    # shared_label -> match radius; a later duplicate row overwrites an
    # earlier one, matching the previous iterrows behaviour.
    radii_dict: dict[str, float] = {
        lbl: float(r)
        for lbl, r in zip(
            match_radii["shared_label"], match_radii["match_radius_m"]
        )
    }
    # Split the crosswalk: per-key value->label lookups for specific
    # rows, plus one fallback label per key for wildcard ("*") rows.
    # "*" is the module-level WILDCARD sentinel.
    is_wildcard = osm_crosswalk["osm_value"] == "*"
    lookups: dict[str, pd.Series] = {
        key: grp.set_index("osm_value")["shared_label"]
        for key, grp in osm_crosswalk[~is_wildcard].groupby("osm_key")
    }
    wildcards: dict[str, str] = dict(
        zip(
            osm_crosswalk.loc[is_wildcard, "osm_key"],
            osm_crosswalk.loc[is_wildcard, "shared_label"],
        )
    )
    if return_all:
        return _assign_osm_all_labels(
            gdf, filter_keys, lookups, wildcards, radii_dict,
            default_radius_m,
        )
    return _assign_osm_first_label(
        gdf, filter_keys, lookups, wildcards, radii_dict, default_radius_m,
    )


def _assign_osm_first_label(
    gdf: pd.DataFrame,
    filter_keys: list[str],
    lookups: dict[str, pd.Series],
    wildcards: dict[str, str],
    radii_dict: dict[str, float],
    default_radius_m: float,
) -> tuple[np.ndarray, np.ndarray]:
    """Single-label mode: walk ``filter_keys`` in priority order; the
    first specific match — or, failing that, the key's wildcard row —
    wins and locks the row."""
    n = len(gdf)
    label = np.full(n, "", dtype=object)
    radius = np.full(n, default_radius_m, dtype=np.float64)
    matched = np.zeros(n, dtype=bool)
    for key in filter_keys:
        if key not in gdf.columns:
            continue
        col = gdf[key]
        # Rows that still need a label and carry a usable value.
        candidates = col.notna() & (col != "") & ~matched
        if not candidates.any():
            continue
        cand_pos = np.where(candidates)[0]
        lkp = lookups.get(key)
        if lkp is None:
            missed = cand_pos
        else:
            mapped = pd.Series(
                col.to_numpy()[cand_pos], dtype=str
            ).map(lkp)
            hit = mapped.notna().to_numpy()
            hit_pos = cand_pos[hit]
            hit_labels = mapped.to_numpy()[hit]
            label[hit_pos] = hit_labels
            radius[hit_pos] = [
                radii_dict.get(lb, default_radius_m) for lb in hit_labels
            ]
            matched[hit_pos] = True
            missed = cand_pos[~hit]
        # Per-key wildcard fallback for values absent from the crosswalk.
        fallback = wildcards.get(key)
        if fallback is not None and len(missed) > 0:
            label[missed] = fallback
            radius[missed] = radii_dict.get(fallback, default_radius_m)
            matched[missed] = True
    return label, radius


def _assign_osm_all_labels(
    gdf: pd.DataFrame,
    filter_keys: list[str],
    lookups: dict[str, pd.Series],
    wildcards: dict[str, str],
    radii_dict: dict[str, float],
    default_radius_m: float,
) -> tuple[list[list[str]], list[list[float]]]:
    """Multi-label mode: collect every specific match per row, then at
    most one wildcard label for rows with no specific match."""
    n = len(gdf)
    row_chunks: list[np.ndarray] = []
    label_chunks: list[np.ndarray] = []
    has_specific = np.zeros(n, dtype=bool)

    # Pass 1: every specific match for every filter key.
    for key in filter_keys:
        lkp = lookups.get(key)
        if lkp is None or key not in gdf.columns:
            continue
        col = gdf[key]
        usable = col.notna() & (col != "")
        if not usable.any():
            continue
        pos = np.where(usable)[0]
        mapped = pd.Series(col.to_numpy()[pos], dtype=str).map(lkp)
        hit = mapped.notna().to_numpy()
        if not hit.any():
            continue
        row_chunks.append(pos[hit])
        label_chunks.append(mapped.to_numpy()[hit])
        has_specific[pos[hit]] = True

    # Pass 2: wildcard fallback only for rows with zero specific
    # matches; keys walked in crosswalk CSV order, first hit wins.
    wildcard_done = np.zeros(n, dtype=bool)
    for key, fallback in wildcards.items():
        if key not in gdf.columns:
            continue
        col = gdf[key]
        usable = (
            col.notna() & (col != "") & ~has_specific & ~wildcard_done
        )
        if not usable.any():
            continue
        pos = np.where(usable)[0]
        row_chunks.append(pos)
        label_chunks.append(np.full(len(pos), fallback, dtype=object))
        wildcard_done[pos] = True

    labels_per_row: list[list[str]] = [[] for _ in range(n)]
    radii_per_row: list[list[float]] = [[] for _ in range(n)]
    if not row_chunks:
        return labels_per_row, radii_per_row

    # De-duplicate (row, label) pairs keeping first occurrence, then
    # attach radii (default for labels missing from match_radii).
    long_df = pd.DataFrame(
        {
            "row_idx": np.concatenate(row_chunks),
            "label": np.concatenate(label_chunks),
        }
    ).drop_duplicates()
    long_df["radius"] = (
        long_df["label"]
        .map(radii_dict)
        .fillna(default_radius_m)
        .astype(np.float64)
    )
    for pos, lbl, rad in zip(
        long_df["row_idx"], long_df["label"], long_df["radius"]
    ):
        labels_per_row[pos].append(lbl)
        radii_per_row[pos].append(float(rad))
    return labels_per_row, radii_per_row
# -----------------------------------------------------------------
# Shared-label assignment — Overture
# -----------------------------------------------------------------
def assign_overture_shared_label(
    gdf: pd.DataFrame,
    overture_crosswalk: pd.DataFrame,
    match_radii: pd.DataFrame,
    default_radius_m: float = 100.0,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Assign a ``shared_label`` and ``match_radius_m`` to each Overture
    POI using a 4-tier cascade from most to least specific.

    Tiers (applied in order, each only to unmatched rows):

    1. **(L0, L1, L2)** — crosswalk rows with all three populated.
    2. **(L0, L2)** — L1 empty in crosswalk; matches any L1.
    3. **(L0, L1)** — L2 empty in crosswalk; catch-all for an L1.
    4. **L0-only** — both L1 and L2 empty in crosswalk.

    Backward-compatible: if the GeoDataFrame has no ``taxonomy_l2``
    column, tiers 1-2 produce no matches and behaviour falls back to
    the old (L0, L1) + L0 logic.

    Returns:
        (shared_label ndarray of object, match_radius_m ndarray of
        float); unmatched rows get ``""`` / ``default_radius_m``.
    """
    n = len(gdf)
    label = np.full(n, "", dtype=object)
    radius = np.full(n, default_radius_m, dtype=np.float64)
    matched = np.zeros(n, dtype=bool)

    # shared_label -> match radius; a later duplicate row overwrites an
    # earlier one.
    radii_dict: dict[str, float] = {
        lbl: float(r)
        for lbl, r in zip(
            match_radii["shared_label"], match_radii["match_radius_m"]
        )
    }

    cw = overture_crosswalk
    has_l1 = cw["overture_l1"] != ""
    has_l2 = cw["overture_l2"] != ""

    def _lookup(subset: pd.DataFrame, key: pd.Series) -> pd.Series:
        """key -> shared_label; first crosswalk row wins on duplicates."""
        lkp = pd.Series(
            subset["shared_label"].to_numpy(), index=key.to_numpy()
        )
        return lkp[~lkp.index.duplicated(keep="first")]

    # -- Build lookup tables for each tier ---------------------------
    t1 = cw[has_l1 & has_l2]  # (L0, L1, L2)
    t1_lkp = _lookup(
        t1,
        t1["overture_l0"] + "|" + t1["overture_l1"] + "|"
        + t1["overture_l2"],
    )
    t2 = cw[~has_l1 & has_l2]  # (L0, L2), any L1
    t2_lkp = _lookup(t2, t2["overture_l0"] + "|" + t2["overture_l2"])
    t3 = cw[has_l1 & ~has_l2]  # (L0, L1) catch-all
    t3_lkp = _lookup(t3, t3["overture_l0"] + "|" + t3["overture_l1"])
    t4 = cw[~has_l1 & ~has_l2]  # L0-only
    t4_lkp = _lookup(t4, t4["overture_l0"])

    # -- Extract columns from the data -------------------------------
    def _col(name: str) -> pd.Series:
        """Taxonomy column as strings; "" for NaN or a missing column."""
        if name in gdf.columns:
            return gdf[name].fillna("").astype(str)
        return pd.Series("", index=gdf.index)

    l0 = _col("taxonomy_l0")
    l1 = _col("taxonomy_l1")
    l2 = _col("taxonomy_l2")

    def _apply_tier(keys: pd.Series, lkp: pd.Series, mask) -> None:
        """Label still-unmatched rows whose key is in this tier's table."""
        if not mask.any() or lkp.empty:
            return
        mapped = keys[mask].map(lkp)
        hit = mapped.notna().to_numpy()
        idx = np.where(mask)[0][hit]
        found = mapped.to_numpy()[hit]
        label[idx] = found
        radius[idx] = [
            radii_dict.get(lb, default_radius_m) for lb in found
        ]
        matched[idx] = True

    # -- Apply tiers in order ----------------------------------------
    _apply_tier(l0 + "|" + l1 + "|" + l2, t1_lkp, ~matched & (l0 != ""))
    _apply_tier(l0 + "|" + l2, t2_lkp, ~matched & (l2 != ""))
    _apply_tier(l0 + "|" + l1, t3_lkp, ~matched & (l1 != ""))
    _apply_tier(l0, t4_lkp, ~matched & (l0 != ""))
    return label, radius
# -----------------------------------------------------------------
# L0 bitmask helpers (for type scoring)
# -----------------------------------------------------------------
def compute_osm_l0_bits(
    gdf: pd.DataFrame,
    top_level_matches: pd.DataFrame,
    l0_bit: dict[str, int] | None = None,
) -> np.ndarray:
    """
    For each OSM POI, compute a uint16 bitmask encoding which
    Overture L0 categories it broadly matches.

    A non-null value in an OSM tag key (e.g. ``amenity``) sets the
    bit(s) for every L0 linked to that key via *top_level_matches*.
    For example, if ``amenity`` maps to both ``arts_and_entertainment``
    (bit 1) and ``food_and_drink`` (bit 2), any POI with a non-null
    ``amenity`` value gets ``1 | 2 = 3``.

    Args:
        gdf: POI table with one column per OSM tag key.
        top_level_matches: DataFrame with ``overture_l0, osm_key``.
        l0_bit: optional L0-category -> bit mapping; defaults to the
            module-level ``L0_BIT`` table. Unknown L0s contribute 0.

    Returns:
        uint16 ndarray of length ``len(gdf)``.
    """
    if l0_bit is None:
        l0_bit = L0_BIT
    # osm_key -> OR of all bits for the L0s linked to that key.
    key_bits: dict[str, int] = {}
    for osm_key, l0 in zip(
        top_level_matches["osm_key"], top_level_matches["overture_l0"]
    ):
        key_bits[osm_key] = key_bits.get(osm_key, 0) | l0_bit.get(l0, 0)
    bits = np.zeros(len(gdf), dtype=np.uint16)
    for osm_key, bval in key_bits.items():
        # Keys absent from gdf simply contribute no bits.
        if osm_key in gdf.columns:
            col = gdf[osm_key]
            has_val = (col.notna() & (col != "")).to_numpy()
            bits[has_val] |= bval
    return bits
def compute_overture_l0_bits(
    l0_array: np.ndarray,
    l0_bit: dict[str, int] | None = None,
) -> np.ndarray:
    """
    For each Overture POI, compute a uint16 bitmask from its
    ``taxonomy_l0`` value. Each POI has at most one L0 category, so a
    single bit is set (0 for values not in the mapping).

    Args:
        l0_array: array of L0 category strings.
        l0_bit: optional L0-category -> bit mapping; defaults to the
            module-level ``L0_BIT`` table.

    Returns:
        uint16 ndarray of length ``len(l0_array)``.
    """
    if l0_bit is None:
        l0_bit = L0_BIT
    bits = np.zeros(len(l0_array), dtype=np.uint16)
    for l0, bval in l0_bit.items():
        bits[l0_array == l0] = bval
    return bits