Coverage for acspsuedo / source / shpfile.py: 98%
236 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-11 16:02 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-11 16:02 +0000
1"""
2Shapefile handler for user-defined geometries.
4**NOTE**: We try our best to fit as many geometries as possible. However,
5the Census Bureau has not maintained a uniform naming criteria throughout
6the years for either the TIGER shapefile or Cartographic Boundary databases,
7so requests for geometric information may not be satisfied.
9If requests are not satisfied, we encourage users to peruse the map
10documentation here: https://www2.census.gov/geo/tiger/.
12See also the addendum that follows in this module for extra information.
13"""
15# ADDENDUM #1 (version 0.2.1)
16# We add deterministic rules for some geographic components, but not
17# for all. This is for two reasons:
18# 1. The Bureau has not maintained a uniform naming criteria throughout
19# the years (e.g. 2012 bucks the predicted congressional district
20# session, and geometries for the 117th congressional district are not
21# available because of the Census' previous methodological design that
22# steered away from collecting congressional district boundaries for
23# the session that aligns with the Decennial Census; cf.
24# https://www.census.gov/programs-surveys/geography/guidance/geo-areas/congressional-dist.html).
25# 2. For some components, the Bureau has stored shapefiles in such a way
26# as to obfuscate a general URL locator rule. However, this is not
27# generally the case for more recent years which have become much more
28# standardized (post-2010 for TIGER shapefiles, or post-2014/15 Census
29# GeoDatabases, to be sure).
30# It is for these reasons that a general rule is preferred.
31#
32# In any case, we implement a broad "net" (so to speak) and throw a warning
33# for any stragglers and appropriately name these considerations when the
34# user runs queries.
36import os
37import re
38import shutil
39import warnings
40import typing as t
41from pathlib import Path
42from logging import getLogger
44import geopandas as gpd
45import requests
47from acspsuedo.source.low.exceptions import APIException
48from acspsuedo.source.shpfile_fmt import GEO_SPEC_METADATA
49from acspsuedo.fips import STATE_FIPS
52logger = getLogger(__name__)
class ShpfileException(APIException):
    """Raised for errors that occur while extracting a TIGER shapefile."""
class ShpfileFormatterException(APIException):
    """Raised for errors that occur while formatting a TIGER shapefile request."""
class ShpfileWarning(UserWarning):
    """
    Warning category for TIGER shapefile-related problems.

    Emitted in line with the guidance given in the module doc-string.
    """
#: Regex pattern matching runs of digits (used to drop them from a string).
drop_digits_re = re.compile(r"\d+")

#: Regex pattern matching single ASCII alphabetic characters (used to keep
#: only the non-alphabetic part of a string).
drop_alphaletters_re = re.compile(r"[a-zA-Z]")
86class ShpFileHandler:
87 """
88 Class for handling geometric extraction via TIGER shapefiles
89 and/or Census GeoDatabases.
90 """
92 def __init__(
93 self,
94 auto_cache: bool = True,
95 cache_path: t.Optional[t.Union[str, Path]] = None,
96 track_updated_cache: bool = True
97 ) -> None:
98 """
99 Initialization for :py:class:`acspsuedo.source.shpfile.ShpFileHandler`.
101 Parameters
102 ----------
103 auto_cache
104 Boolean; default True.
106 Indicate whether or not to locally cache TIGER shapefiles.
108 cache_path
109 If `auto_cache` is True, indicate the caching folder in which TIGER
110 shapefiles should be stored.
112 Default `~/cache/acspsuedo/TIGER_shapefiles/`.
114 track_updated_cache
115 Boolean; default True.
117 If the cache folder is updated, move all shapefiles from the previous
118 cache folder to the updated cache folder.
119 """
121 self._auto_cache = auto_cache
122 self._track_updated_cache = track_updated_cache
124 if cache_path is None:
125 cache_path = Path.home() / 'cache' / 'acspsuedo' / 'TIGER_shapefiles'
126 self._cache_path = cache_path
127 else:
128 self._cache_path = Path(cache_path)
130 # Internally track cached files
131 self._cached_files = []
134 @property
135 def auto_cache(self):
136 """
137 Boolean indicating the state of locally caching fetched
138 TIGER shapefiles. Default `True`.
139 """
140 return self._auto_cache
142 @auto_cache.setter
143 def auto_cache(self, new_state: bool):
144 self._auto_cache = new_state
146 @property
147 def track_updated_cache(self):
148 """
149 Boolean indicating the whether or not we should move all files
150 from the previous cache folder to the new cache folder. Default
151 `True`.
153 **NOTE**: With this implementation, it is assumed that the previous
154 cached folder may be relevant beyond the immediate use within this
155 overall architecture (i.e. to cache TIGER shapefiles that can be
156 reached with this interface). For instance, users may want to cache
157 TIGER shapefiles that they themselves have manually downloaded.
159 Thus, we would be moving any of the existing cached files *without*
160 deleting the previous cache folder.
161 """
162 return self._track_updated_cache
164 @track_updated_cache.setter
165 def track_updated_cache(self, new_state: bool):
166 self._track_updated_cache = new_state
168 @property
169 def cache_path(self):
170 """
171 If `auto_cache` is True, this attribute specifies the local
172 folder containing cached TIGER shapefiles.
174 Default `~/cache/acspsuedo/TIGER_shapefiles/`.
175 """
176 return self._cache_path
178 @cache_path.setter
179 def cache_path(self, new_cache_path: t.Union[Path, str]):
180 if self._track_updated_cache:
181 self.__move_files_new_cache(new_cache_path)
183 self._cache_path = Path(new_cache_path)
185 @cache_path.deleter
186 def cache_path(self):
187 raise AttributeError("Cannot delete reference to the cache path.")
191 def _tiger_url_fmtter_2008_2009(self, year: int, **geographic_specifiers: t.Any):
192 for_scope, scope, _, outer = ShpFileHandler._tiger_init(year, **geographic_specifiers)
193 base_url = f'https://www2.census.gov/geo/tiger/TIGER{year}/'
195 # For special scopes, which require state identifiers
196 if scope in ['BG', 'COUSUB', 'PLACE', 'TRACT', 'SLDL', 'SLDU', 'UNSD'] \
197 or scope.startswith( ('CD', 'PUMA') ) \
198 or outer == 'state':
200 statefp = str(geographic_specifiers.get('state', ''))
201 if not statefp:
202 raise ShpfileFormatterException(
203 f"Geometries at the '{for_scope}' scope take a 'state' outer-level "
204 f"point of reference for extracting TIGER shapefile geometries at this "
205 f"scope. Missing a state FIPS code."
206 )
207 state = {v: k for k, v in STATE_FIPS.items()}.get(statefp, '')
209 base_url += f'{statefp}_{state.replace(' ', '_')}/'
210 path = f'tl_{year}_{statefp}_{scope.lower()}'
212 if not scope.startswith( ('CD', 'PUMA') ):
213 path += '00'
215 path += '.zip'
217 else:
218 # Hmm...
219 path = f'tl_{year}_us_{scope.lower()}.zip'
221 return base_url, path
224 def _tiger_url_fmtter_pre_2008_and_2010(self, year: int, **geographic_specifiers: t.Any):
225 _, scope, folder, outer = ShpFileHandler._tiger_init(year, **geographic_specifiers)
227 if outer == 'state':
228 outer = geographic_specifiers.get('state', '')
230 nested = year
231 if scope.startswith( ('CD',) ):
232 nested = drop_alphaletters_re.sub('', scope)
234 suffix = str(year)[-2:]
235 if scope.startswith( ('CD', 'PUMA', 'ZCTA', 'UAC') ):
236 suffix = ''
238 base_url = f'https://www2.census.gov/geo/tiger/TIGER2010/{folder}/{nested}/'
239 path = f'tl_2010_{outer}_{scope.lower()}{suffix}.zip'
241 return base_url, path
244 def _tiger_url_fmtter_post_2010(self, year: int, **geographic_specifiers: t.Any):
245 for_scope, scope, folder, outer = ShpFileHandler._tiger_init(year, **geographic_specifiers)
247 if outer == 'state':
248 outer = geographic_specifiers.get('state', '')
249 if not outer:
250 raise ShpfileFormatterException(
251 f"Geometries at the '{for_scope}' scope take a 'state' outer-level "
252 f"point of reference for extracting TIGER shapefile geometries at this "
253 f"scope. Missing a state FIPS code."
254 )
256 base_url = f'https://www2.census.gov/geo/tiger/TIGER{year}/{folder}/'
257 path = f'tl_{year}_{outer}_{scope.lower()}.zip'
259 return base_url, path
261 def _tiger_url_fmtter(self, year: int, **geographic_specifiers) -> t.Tuple[str, str]:
262 if 2008 <= year <= 2009:
263 return self._tiger_url_fmtter_2008_2009(year, **geographic_specifiers)
264 elif year == 2010 or year < 2008:
265 return self._tiger_url_fmtter_pre_2008_and_2010(year, **geographic_specifiers)
266 else:
267 return self._tiger_url_fmtter_post_2010(year, **geographic_specifiers)
270 def fetch_tiger_shpfile(self, year, **geographic_specifiers) -> t.Optional[gpd.GeoDataFrame]:
271 """
272 Fetch the appropriate TIGER shapefile for the supplied geographic specifiers
273 and return a formated :py:class:`geopandas.GeoDataFrame` containing identifier
274 and geometric information.
276 **NOTE**: We try our best to fit as many geometries as possible. However,
277 the Census Bureau has not maintained a uniform naming criteria throughout
278 the years for either the TIGER shapefile or Cartographic Boundary databases,
279 so requests for geometric information may not be satisfied.
281 If requests are not satisfied, we encourage users to peruse the map
282 documentation here: https://www2.census.gov/geo/tiger/.
284 Returns
285 -------
286 If the appropriate TIGER shapefile is found, returns a :py:class:`geopandas.GeoDataFrame`
287 instance.
288 """
290 if (gdf := self._fetch_tiger_shpfile(year, **geographic_specifiers)) is None:
291 return
293 return gdf
296 def _fetch_tiger_shpfile(self, year: int, **geographic_specifiers: t.Any) -> t.Optional[gpd.GeoDataFrame]:
297 """
298 The actual underlying for running an attempt to the TIGER shapefile,
299 as suggested by our approximation of the map data naming convention.
300 """
302 gdf = self._cache_fetch_tiger_shpfile(year, **geographic_specifiers)
304 if gdf is None:
305 return
307 gdf = self._tiger_shpfile_fmtter(gdf, year, **geographic_specifiers)
308 return gdf
310 def _tiger_shpfile_fmtter(self, gdf: gpd.GeoDataFrame, year: int, **geographic_specifiers) -> gpd.GeoDataFrame:
311 """
312 Formatter for fetched TIGER shapefiles.
313 """
314 for_scope, _, _, _ = ShpFileHandler._tiger_init(year, **geographic_specifiers)
316 # Create a year column
317 gdf['YEAR'] = year
319 # Keep identifier and geometric info
320 gdf.columns = [drop_digits_re.sub('', col) for col in gdf.columns]
322 id_cols = GEO_SPEC_METADATA[for_scope][3]
323 if not isinstance(id_cols, list):
324 id_cols = [id_cols]
326 id_cols = [col for col in [*id_cols, 'GEOID', 'YEAR'] if col in list(gdf.columns)]
328 gdf_cols = [*id_cols, 'geometry']
329 gdf = gdf[gdf_cols].copy()
331 # Rename GEOID to cohere with the GEO_ID in the Census queried data
332 gdf.rename(columns = {'GEOID': 'GEO_ID'}, inplace = True)
334 # Sort by
335 if 'GEO_ID' in gdf.columns:
336 gdf.sort_values(by = 'GEO_ID', ignore_index=True, inplace=True)
337 else:
338 gdf.sort_values(by = id_cols, ignore_index=True, inplace = True)
340 return gdf
342 def _cache_fetch_tiger_shpfile(self, year: int, **geographic_specifiers):
343 """
344 The actual underlying for fetching TIGER shapefiles and (if specified)
345 caching them locally within the specified cache folder.
346 """
347 base_url, path = self._tiger_url_fmtter(year, **geographic_specifiers)
349 try:
350 # Check if caching is enabled. Then, check if the shapefile has already been cached.
351 if self._auto_cache:
352 file_path = f"{self._cache_path}/{path.removesuffix('.zip')}.shp"
353 if Path(file_path).exists():
354 gdf = gpd.read_file(file_path)
355 return gdf
357 content = _fetch_shpfile(f'{base_url}{path}')
358 gdf = gpd.read_file(content)
360 if self._auto_cache:
361 self._cache_init()
362 gdf.to_file(
363 filename = file_path,
364 index = False
365 )
366 self._cached_files.append(path.removesuffix('.zip'))
368 return gdf
370 except ShpfileException:
371 for_scope, _, _, _ = ShpFileHandler._tiger_init(year, **geographic_specifiers)
372 msg = \
373 f"\nCould not extract the appropriate TIGER shapefile for the '{for_scope}' scope \n" \
374 f"during the {year} calendar year. This is partially due to several reasons: \n" \
375 " 1. The Census Bureau has not maintained a uniform naming convention for its map file\n" \
376 " throughout the years.\n"\
377 " 2. While attempts have been made to implement custom deterministic rules for generating \n"\
378 " the appropriate URLs for each geographic scope, this may not be successful in virtue \n"\
379 " of the previously stated reason and the fact that some files are stored in a different \n"\
380 " folders across the years, or aren't made available at all. This is particularly the case \n"\
381 " for data years 2008 and 2009.\n"\
382 "See `acspsuedo.shpfile` for more information."\
384 warnings.warn(
385 msg,
386 ShpfileWarning
387 )
388 return
390 def _cache_init(self):
391 """
392 Initialization for caching. Only applies if `auto_cache` is True.
393 """
394 if self._auto_cache:
395 self._cache_path.mkdir(parents = True, exist_ok = True)
397 def __move_files_new_cache(self, new_cache: t.Union[str, Path]):
398 """Internal for moving new files to the updated cache."""
399 file_dict = {}
400 for root, _, files in os.walk(self._cache_path):
401 for file in files:
402 if any(path in file for path in self._cached_files):
403 file_dict[f'{root}/{file}'] = f'{new_cache}/{file}'
405 for old_path, new_path in file_dict.items():
406 shutil.move(old_path, new_path)
408 @classmethod
409 def _tiger_init(cls, year: int, **geographic_specifiers):
410 """
411 Initialization for TIGER shapefile configuration.
413 Used to format case-match oddities in URL destinations.
414 """
415 for_scope, scope, folder = cls.__tiger_scope(year, **geographic_specifiers)
416 outer = cls.__needs_scope(year, **geographic_specifiers)
418 return for_scope, scope, folder, outer
420 @classmethod
421 def __tiger_scope(cls, year: int, **geographic_specifiers: t.Any) -> t.Tuple[str, str, str]:
422 """
423 Get the TIGER scope based off of the indicated `for` clause.
424 For special scopes, we apply any custom deterministic rules
425 via case-matching.
426 """
427 for_scope = list(geographic_specifiers)[-1]
428 shp_scope = GEO_SPEC_METADATA[for_scope][2]
429 shp_scope = shp_scope if shp_scope is not None else ''
431 folder = shp_scope
433 match shp_scope:
434 case 'CD':
435 shp_scope += str(_congressional_district_rule(year))
436 case 'ZCTA':
437 shp_scope, folder = _zipcode_tabulation_area_rule(year)
438 case 'PUMA':
439 shp_scope, folder = _public_use_microdata_area_rule(year)
440 case 'SUBMCD':
441 shp_scope, folder = _subminor_civil_division_rule(year)
442 case 'UAC':
443 shp_scope, folder = _urban_area_rule(year)
445 return for_scope, shp_scope, folder
447 @classmethod
448 def __needs_scope(cls, year: int, **geographic_specifiers: t.Any) -> t.Optional[str]:
449 """
450 For some TIGER scopes, a state specifier must be specified to
451 enforce scopes being taken from the outer-reference point of
452 state-level as opposed to one at the nation-level.
453 """
454 for_scope = list(geographic_specifiers)[-1]
455 shpfile_scope = GEO_SPEC_METADATA[for_scope][2]
456 outer = GEO_SPEC_METADATA[for_scope][4]
458 # For some years, congressional districts only had a 'us' outer reference.
459 # But after 2022, they reverted to the 'state' outer reference...
460 if shpfile_scope == 'CD' and year >= 2022:
461 outer = 'state'
463 return outer
# Module-level handler instance, created at import time with the default
# settings.
shapefile_handler: ShpFileHandler = ShpFileHandler()
471# ------------ Rules for special scopes ------------ #
472# These deterministic rules are implemented here for
473# special scopes from the TIGER shapefile data. Under
474# usual circumstances, most scopes will adhere to a
475# general implementation and only change marginally,
476# depending on whether the outer point of reference
477# is the nation-level or state-level, but the
478# following scopes must be manually handled. There
479# will be a handful of extenuating circumstances that
480# these custom rules fail to handle, but the
481# catch-all warning should suffice for the time being.
482# ------------ ----------- ----------- ------------ #
484def _congressional_district_rule(year: int):
485 """
486 Bureau's guidance on congressional districting:
487 https://www.census.gov/programs-surveys/geography/guidance/geo-areas/congressional-dist.html
489 Note that these congressional sessions specify align with those suggested
490 by Census Bureau data and may/may not cohere with the actual session years.
491 """
493 # Congressional districts by sessions
494 # - 103th (1993, 1994)
495 # - 104th (1995, 1996)
496 # - 105th (1997, 1998)
497 # - 106th (1999, 2000)
498 # - 107th (2001, 2002)
499 # - 108th (2003, 2004)
500 # - 109th (2005, 2006)
501 # - 110th (2007, 2008)
502 # - 111th (2009, 2010)
503 # - 112th (2011, 2012)
504 # - 113th (2013)
505 # - 114th (2014, 2015)
506 # - 115th (2016, 2017)
507 # - 116th (2018, 2019, 2020, 2021)
508 # - 118th (2022, 2023)
509 # - 119th (2024)
511 # For CD sessions 103 to 110 (i.e. 1993 to 2008):
512 # https://www2.census.gov/geo/tiger/PREVGENZ/cd/
513 # Note that each starts with state FIPS, e.g.
514 # cd06_103_ship.zip for CA congressional districts
516 cd_session = 103 + (year - 1993) // 2
518 # Special years that disobey the general rule. Pandemic?
519 if 2020 <= year <= 2021:
520 return 116
522 # There is a weird regime switch that occurs at 2013-14. From preliminary
523 # research (cf. congressional district documentation in docstring above),
524 # it may have seemed justified for the Bureau to remap congressional
525 # districts given House seats were apportioned in the 113th session based
526 # on the 2010 Decennial Census.
527 if year >= 2014:
528 return 114 + (year - 2014) // 2
530 return cd_session
533def _zipcode_tabulation_area_rule(year: int):
534 zcta = f'ZCTA5{str(year)[2]}0'
535 folder = f'ZCTA5{str(year)[2]}0'
537 if (2010 <= year <= 2019) or (year == 2000):
538 folder = 'ZCTA5'
540 return zcta, folder
542def _public_use_microdata_area_rule(year: int):
543 folder = f'PUMA{str(year)[2]}0'
544 if year == 2010:
545 folder = 'PUMA5'
546 if 2011 <= year <= 2023:
547 folder = 'PUMA'
549 puma = f'PUMA{str(year)[2]}0'
550 if 2008 <= year <= 2009:
551 puma = 'PUMA500'
552 if 2020 <= year <= 2021:
553 puma = 'PUMA10'
555 return puma, folder
557def _subminor_civil_division_rule(year: int):
558 """
559 This only applies for Puerto Rico, since they have subbarrios
560 and not subminor civil divisions (semantically different).
562 *Note*: This applies for 2010 and later. For 2008 and 2009, we
563 simply point back to the naming convention error because subbarrios
564 are nested in their county folders as opposed to the state folder
565 for these particular years
566 """
567 folder = 'SUBMCD'
568 scd = 'SUBMCD'
570 if year >= 2013:
571 scd = 'SUBBARRIO'
572 if year >= 2020:
573 folder = 'SUBBARRIO'
575 return scd, folder
577def _urban_area_rule(year: int):
578 folder = f'UAC{str(year)[2]}0'
579 if year == 2010:
580 folder = 'UA'
581 if 2011 <= year <= 2023:
582 folder = 'UAC'
584 uac = f'UAC{str(year)[2]}0'
586 if 2020 <= year <= 2021:
587 uac = 'UAC10'
589 return uac, folder
def _fetch_shpfile(url: str, timeout: float = 60.0) -> bytes:
    """
    Synchronous method to fetch the binary content of a url.

    This is the underlying for fetching TIGER shapefile data.

    Parameters
    ----------
    url
        Address of the zipped shapefile to download.

    timeout
        Seconds passed to :py:func:`requests.get` as its `timeout`, so that
        an unresponsive server cannot block the caller indefinitely.
        Default 60.

    Returns
    -------
    The raw bytes of the response body.

    Raises
    ------
    ShpfileException
        If the server answers with a 404 status code.

    APIException
        If the response's Content-Type is missing or is not
        'application/zip'.
    """
    resp = requests.get(url, timeout=timeout)

    if (status := resp.status_code) == 404:
        raise ShpfileException(
            f"Could not fetch the binary content of '{url}'. HTTPS Status Code: {status}."
        )

    # Use .get so a missing header is reported as an APIException rather
    # than surfacing as a bare KeyError.
    content_type = resp.headers.get('Content-Type', '')
    if content_type != 'application/zip':
        raise APIException(
            f"Expected 'application/zip' content-type. Received '{content_type}'."
        )

    return resp.content