Coverage for acspsuedo / source / shpfile.py: 98%

236 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-11 16:02 +0000

1""" 

2Shapefile handler for user-defined geometries. 

3 

4**NOTE**: We try our best to fit as many geometries as possible. However, 

5the Census Bureau has not maintained a uniform naming criteria throughout 

6the years for either the TIGER shapefile or Cartographic Boundary databases, 

7so requests for geometric information may not be satisfied. 

8 

9If requests are not satisfied, we encourage users to peruse the map 

10documentation here: https://www2.census.gov/geo/tiger/. 

11 

12See also the addendum that follows in this module for extra information. 

13""" 

14 

15# ADDENDUM #1 (version 0.2.1) 

16# We add deterministic rules for some geographic components, but not 

17# for all. This is for two reasons: 

18# 1. The Bureau has not maintained a uniform naming criteria throughout 

19# the years (e.g. 2012 bucks the predicted congressional district 

20# session, and geometries for the 117th congressional district are not 

21# available because of the Census' previous methodological design that

22# steered away from collecting congressional district boundaries for 

23# the session that aligns with the Decennial Census; cf. 

24# https://www.census.gov/programs-surveys/geography/guidance/geo-areas/congressional-dist.html). 

25# 2. For some components, the Bureau has stored shapefiles in such a way 

26# as to obfuscate a general URL locator rule. However, this is not 

27# generally the case for more recent years which have become much more 

28# standardized (post-2010 for TIGER shapefiles, or post-2014/15 Census 

29# GeoDatabases, to be sure). 

30# It is for these reasons that a general rule is preferred. 

31# 

32# In any case, we implement a broad "net" (so to speak) and throw a warning 

33# for any stragglers and appropriately name these considerations when the 

34# user runs queries. 

35 

36import os 

37import re 

38import shutil 

39import warnings 

40import typing as t 

41from pathlib import Path 

42from logging import getLogger 

43 

44import geopandas as gpd 

45import requests 

46 

47from acspsuedo.source.low.exceptions import APIException 

48from acspsuedo.source.shpfile_fmt import GEO_SPEC_METADATA 

49from acspsuedo.fips import STATE_FIPS 

50 

51 

52logger = getLogger(__name__) 

53 

54 

55 

class ShpfileException(APIException):
    """
    Raised when a TIGER shapefile cannot be extracted.

    Within this module it is raised by the fetch helper when the remote
    server answers with a 404 status code.
    """

59 

class ShpfileFormatterException(APIException):
    """
    Raised when a TIGER shapefile request cannot be formatted.

    Within this module it is raised by the URL formatters when a scope needs
    a state FIPS code and none was supplied.
    """

63 

64 

class ShpfileWarning(UserWarning):
    """
    Warning category for TIGER Shapefile-related warnings.

    Emitted in line with the guidance given in the module doc-string, i.e.
    when a request for geometric information could not be satisfied.
    """

72 

73 

74 

# Matches every run of one or more digits. This module uses it with
# `.sub('', ...)` to strip digits out of shapefile column names.
drop_digits_re = re.compile(r"\d+")
"""
Regex pattern for dropping digits.
"""

# Matches a single ASCII letter. This module uses it with `.sub('', ...)` so
# that only the non-alphabetic characters of a scope string remain.
drop_alphaletters_re = re.compile(r"[a-zA-Z]")
"""
Regex pattern for non-numeric, alphabetic characters.
"""

84 

85 

86class ShpFileHandler: 

87 """ 

88 Class for handling geometric extraction via TIGER shapefiles 

89 and/or Census GeoDatabases. 

90 """ 

91 

92 def __init__( 

93 self, 

94 auto_cache: bool = True, 

95 cache_path: t.Optional[t.Union[str, Path]] = None, 

96 track_updated_cache: bool = True 

97 ) -> None: 

98 """ 

99 Initialization for :py:class:`acspsuedo.source.shpfile.ShpFileHandler`. 

100 

101 Parameters 

102 ---------- 

103 auto_cache 

104 Boolean; default True. 

105  

106 Indicate whether or not to locally cache TIGER shapefiles. 

107 

108 cache_path 

109 If `auto_cache` is True, indicate the caching folder in which TIGER 

110 shapefiles should be stored. 

111  

112 Default `~/cache/acspsuedo/TIGER_shapefiles/`. 

113 

114 track_updated_cache 

115 Boolean; default True. 

116 

117 If the cache folder is updated, move all shapefiles from the previous 

118 cache folder to the updated cache folder. 

119 """ 

120 

121 self._auto_cache = auto_cache 

122 self._track_updated_cache = track_updated_cache 

123 

124 if cache_path is None: 

125 cache_path = Path.home() / 'cache' / 'acspsuedo' / 'TIGER_shapefiles' 

126 self._cache_path = cache_path 

127 else: 

128 self._cache_path = Path(cache_path) 

129 

130 # Internally track cached files 

131 self._cached_files = [] 

132 

133 

134 @property 

135 def auto_cache(self): 

136 """ 

137 Boolean indicating the state of locally caching fetched 

138 TIGER shapefiles. Default `True`. 

139 """ 

140 return self._auto_cache 

141 

142 @auto_cache.setter 

143 def auto_cache(self, new_state: bool): 

144 self._auto_cache = new_state 

145 

146 @property 

147 def track_updated_cache(self): 

148 """ 

149 Boolean indicating the whether or not we should move all files 

150 from the previous cache folder to the new cache folder. Default 

151 `True`. 

152 

153 **NOTE**: With this implementation, it is assumed that the previous 

154 cached folder may be relevant beyond the immediate use within this 

155 overall architecture (i.e. to cache TIGER shapefiles that can be 

156 reached with this interface). For instance, users may want to cache 

157 TIGER shapefiles that they themselves have manually downloaded. 

158  

159 Thus, we would be moving any of the existing cached files *without* 

160 deleting the previous cache folder. 

161 """ 

162 return self._track_updated_cache 

163 

164 @track_updated_cache.setter 

165 def track_updated_cache(self, new_state: bool): 

166 self._track_updated_cache = new_state 

167 

168 @property 

169 def cache_path(self): 

170 """ 

171 If `auto_cache` is True, this attribute specifies the local 

172 folder containing cached TIGER shapefiles. 

173 

174 Default `~/cache/acspsuedo/TIGER_shapefiles/`. 

175 """ 

176 return self._cache_path 

177 

178 @cache_path.setter 

179 def cache_path(self, new_cache_path: t.Union[Path, str]): 

180 if self._track_updated_cache: 

181 self.__move_files_new_cache(new_cache_path) 

182 

183 self._cache_path = Path(new_cache_path) 

184 

185 @cache_path.deleter 

186 def cache_path(self): 

187 raise AttributeError("Cannot delete reference to the cache path.") 

188 

189 

190 

191 def _tiger_url_fmtter_2008_2009(self, year: int, **geographic_specifiers: t.Any): 

192 for_scope, scope, _, outer = ShpFileHandler._tiger_init(year, **geographic_specifiers) 

193 base_url = f'https://www2.census.gov/geo/tiger/TIGER{year}/' 

194 

195 # For special scopes, which require state identifiers 

196 if scope in ['BG', 'COUSUB', 'PLACE', 'TRACT', 'SLDL', 'SLDU', 'UNSD'] \ 

197 or scope.startswith( ('CD', 'PUMA') ) \ 

198 or outer == 'state': 

199 

200 statefp = str(geographic_specifiers.get('state', '')) 

201 if not statefp: 

202 raise ShpfileFormatterException( 

203 f"Geometries at the '{for_scope}' scope take a 'state' outer-level " 

204 f"point of reference for extracting TIGER shapefile geometries at this " 

205 f"scope. Missing a state FIPS code." 

206 ) 

207 state = {v: k for k, v in STATE_FIPS.items()}.get(statefp, '') 

208 

209 base_url += f'{statefp}_{state.replace(' ', '_')}/' 

210 path = f'tl_{year}_{statefp}_{scope.lower()}' 

211 

212 if not scope.startswith( ('CD', 'PUMA') ): 

213 path += '00' 

214 

215 path += '.zip' 

216 

217 else: 

218 # Hmm... 

219 path = f'tl_{year}_us_{scope.lower()}.zip' 

220 

221 return base_url, path 

222 

223 

224 def _tiger_url_fmtter_pre_2008_and_2010(self, year: int, **geographic_specifiers: t.Any): 

225 _, scope, folder, outer = ShpFileHandler._tiger_init(year, **geographic_specifiers) 

226 

227 if outer == 'state': 

228 outer = geographic_specifiers.get('state', '') 

229 

230 nested = year 

231 if scope.startswith( ('CD',) ): 

232 nested = drop_alphaletters_re.sub('', scope) 

233 

234 suffix = str(year)[-2:] 

235 if scope.startswith( ('CD', 'PUMA', 'ZCTA', 'UAC') ): 

236 suffix = '' 

237 

238 base_url = f'https://www2.census.gov/geo/tiger/TIGER2010/{folder}/{nested}/' 

239 path = f'tl_2010_{outer}_{scope.lower()}{suffix}.zip' 

240 

241 return base_url, path 

242 

243 

244 def _tiger_url_fmtter_post_2010(self, year: int, **geographic_specifiers: t.Any): 

245 for_scope, scope, folder, outer = ShpFileHandler._tiger_init(year, **geographic_specifiers) 

246 

247 if outer == 'state': 

248 outer = geographic_specifiers.get('state', '') 

249 if not outer: 

250 raise ShpfileFormatterException( 

251 f"Geometries at the '{for_scope}' scope take a 'state' outer-level " 

252 f"point of reference for extracting TIGER shapefile geometries at this " 

253 f"scope. Missing a state FIPS code." 

254 ) 

255 

256 base_url = f'https://www2.census.gov/geo/tiger/TIGER{year}/{folder}/' 

257 path = f'tl_{year}_{outer}_{scope.lower()}.zip' 

258 

259 return base_url, path 

260 

261 def _tiger_url_fmtter(self, year: int, **geographic_specifiers) -> t.Tuple[str, str]: 

262 if 2008 <= year <= 2009: 

263 return self._tiger_url_fmtter_2008_2009(year, **geographic_specifiers) 

264 elif year == 2010 or year < 2008: 

265 return self._tiger_url_fmtter_pre_2008_and_2010(year, **geographic_specifiers) 

266 else: 

267 return self._tiger_url_fmtter_post_2010(year, **geographic_specifiers) 

268 

269 

270 def fetch_tiger_shpfile(self, year, **geographic_specifiers) -> t.Optional[gpd.GeoDataFrame]: 

271 """ 

272 Fetch the appropriate TIGER shapefile for the supplied geographic specifiers 

273 and return a formated :py:class:`geopandas.GeoDataFrame` containing identifier 

274 and geometric information. 

275 

276 **NOTE**: We try our best to fit as many geometries as possible. However, 

277 the Census Bureau has not maintained a uniform naming criteria throughout 

278 the years for either the TIGER shapefile or Cartographic Boundary databases, 

279 so requests for geometric information may not be satisfied. 

280 

281 If requests are not satisfied, we encourage users to peruse the map 

282 documentation here: https://www2.census.gov/geo/tiger/. 

283 

284 Returns 

285 ------- 

286 If the appropriate TIGER shapefile is found, returns a :py:class:`geopandas.GeoDataFrame` 

287 instance. 

288 """ 

289 

290 if (gdf := self._fetch_tiger_shpfile(year, **geographic_specifiers)) is None: 

291 return 

292 

293 return gdf 

294 

295 

296 def _fetch_tiger_shpfile(self, year: int, **geographic_specifiers: t.Any) -> t.Optional[gpd.GeoDataFrame]: 

297 """ 

298 The actual underlying for running an attempt to the TIGER shapefile, 

299 as suggested by our approximation of the map data naming convention. 

300 """ 

301 

302 gdf = self._cache_fetch_tiger_shpfile(year, **geographic_specifiers) 

303 

304 if gdf is None: 

305 return 

306 

307 gdf = self._tiger_shpfile_fmtter(gdf, year, **geographic_specifiers) 

308 return gdf 

309 

310 def _tiger_shpfile_fmtter(self, gdf: gpd.GeoDataFrame, year: int, **geographic_specifiers) -> gpd.GeoDataFrame: 

311 """ 

312 Formatter for fetched TIGER shapefiles. 

313 """ 

314 for_scope, _, _, _ = ShpFileHandler._tiger_init(year, **geographic_specifiers) 

315 

316 # Create a year column 

317 gdf['YEAR'] = year 

318 

319 # Keep identifier and geometric info 

320 gdf.columns = [drop_digits_re.sub('', col) for col in gdf.columns] 

321 

322 id_cols = GEO_SPEC_METADATA[for_scope][3] 

323 if not isinstance(id_cols, list): 

324 id_cols = [id_cols] 

325 

326 id_cols = [col for col in [*id_cols, 'GEOID', 'YEAR'] if col in list(gdf.columns)] 

327 

328 gdf_cols = [*id_cols, 'geometry'] 

329 gdf = gdf[gdf_cols].copy() 

330 

331 # Rename GEOID to cohere with the GEO_ID in the Census queried data 

332 gdf.rename(columns = {'GEOID': 'GEO_ID'}, inplace = True) 

333 

334 # Sort by 

335 if 'GEO_ID' in gdf.columns: 

336 gdf.sort_values(by = 'GEO_ID', ignore_index=True, inplace=True) 

337 else: 

338 gdf.sort_values(by = id_cols, ignore_index=True, inplace = True) 

339 

340 return gdf 

341 

342 def _cache_fetch_tiger_shpfile(self, year: int, **geographic_specifiers): 

343 """ 

344 The actual underlying for fetching TIGER shapefiles and (if specified) 

345 caching them locally within the specified cache folder. 

346 """ 

347 base_url, path = self._tiger_url_fmtter(year, **geographic_specifiers) 

348 

349 try: 

350 # Check if caching is enabled. Then, check if the shapefile has already been cached. 

351 if self._auto_cache: 

352 file_path = f"{self._cache_path}/{path.removesuffix('.zip')}.shp" 

353 if Path(file_path).exists(): 

354 gdf = gpd.read_file(file_path) 

355 return gdf 

356 

357 content = _fetch_shpfile(f'{base_url}{path}') 

358 gdf = gpd.read_file(content) 

359 

360 if self._auto_cache: 

361 self._cache_init() 

362 gdf.to_file( 

363 filename = file_path, 

364 index = False 

365 ) 

366 self._cached_files.append(path.removesuffix('.zip')) 

367 

368 return gdf 

369 

370 except ShpfileException: 

371 for_scope, _, _, _ = ShpFileHandler._tiger_init(year, **geographic_specifiers) 

372 msg = \ 

373 f"\nCould not extract the appropriate TIGER shapefile for the '{for_scope}' scope \n" \ 

374 f"during the {year} calendar year. This is partially due to several reasons: \n" \ 

375 " 1. The Census Bureau has not maintained a uniform naming convention for its map file\n" \ 

376 " throughout the years.\n"\ 

377 " 2. While attempts have been made to implement custom deterministic rules for generating \n"\ 

378 " the appropriate URLs for each geographic scope, this may not be successful in virtue \n"\ 

379 " of the previously stated reason and the fact that some files are stored in a different \n"\ 

380 " folders across the years, or aren't made available at all. This is particularly the case \n"\ 

381 " for data years 2008 and 2009.\n"\ 

382 "See `acspsuedo.shpfile` for more information."\ 

383 

384 warnings.warn( 

385 msg, 

386 ShpfileWarning 

387 ) 

388 return 

389 

390 def _cache_init(self): 

391 """ 

392 Initialization for caching. Only applies if `auto_cache` is True. 

393 """ 

394 if self._auto_cache: 

395 self._cache_path.mkdir(parents = True, exist_ok = True) 

396 

397 def __move_files_new_cache(self, new_cache: t.Union[str, Path]): 

398 """Internal for moving new files to the updated cache.""" 

399 file_dict = {} 

400 for root, _, files in os.walk(self._cache_path): 

401 for file in files: 

402 if any(path in file for path in self._cached_files): 

403 file_dict[f'{root}/{file}'] = f'{new_cache}/{file}' 

404 

405 for old_path, new_path in file_dict.items(): 

406 shutil.move(old_path, new_path) 

407 

408 @classmethod 

409 def _tiger_init(cls, year: int, **geographic_specifiers): 

410 """ 

411 Initialization for TIGER shapefile configuration. 

412  

413 Used to format case-match oddities in URL destinations. 

414 """ 

415 for_scope, scope, folder = cls.__tiger_scope(year, **geographic_specifiers) 

416 outer = cls.__needs_scope(year, **geographic_specifiers) 

417 

418 return for_scope, scope, folder, outer 

419 

420 @classmethod 

421 def __tiger_scope(cls, year: int, **geographic_specifiers: t.Any) -> t.Tuple[str, str, str]: 

422 """ 

423 Get the TIGER scope based off of the indicated `for` clause. 

424 For special scopes, we apply any custom deterministic rules 

425 via case-matching. 

426 """ 

427 for_scope = list(geographic_specifiers)[-1] 

428 shp_scope = GEO_SPEC_METADATA[for_scope][2] 

429 shp_scope = shp_scope if shp_scope is not None else '' 

430 

431 folder = shp_scope 

432 

433 match shp_scope: 

434 case 'CD': 

435 shp_scope += str(_congressional_district_rule(year)) 

436 case 'ZCTA': 

437 shp_scope, folder = _zipcode_tabulation_area_rule(year) 

438 case 'PUMA': 

439 shp_scope, folder = _public_use_microdata_area_rule(year) 

440 case 'SUBMCD': 

441 shp_scope, folder = _subminor_civil_division_rule(year) 

442 case 'UAC': 

443 shp_scope, folder = _urban_area_rule(year) 

444 

445 return for_scope, shp_scope, folder 

446 

447 @classmethod 

448 def __needs_scope(cls, year: int, **geographic_specifiers: t.Any) -> t.Optional[str]: 

449 """ 

450 For some TIGER scopes, a state specifier must be specified to 

451 enforce scopes being taken from the outer-reference point of 

452 state-level as opposed to one at the nation-level. 

453 """ 

454 for_scope = list(geographic_specifiers)[-1] 

455 shpfile_scope = GEO_SPEC_METADATA[for_scope][2] 

456 outer = GEO_SPEC_METADATA[for_scope][4] 

457 

458 # For some years, congressional districts only had a 'us' outer reference. 

459 # But after 2022, they reverted to the 'state' outer reference... 

460 if shpfile_scope == 'CD' and year >= 2022: 

461 outer = 'state' 

462 

463 return outer 

464 

465 

466 

# Module-level handler instance, built with ShpFileHandler's default
# arguments when this module is imported.
shapefile_handler: ShpFileHandler = ShpFileHandler()

468 

469 

470 

471# ------------ Rules for special scopes ------------ # 

472# These deterministic rules are implemented here for 

473# special scopes from the TIGER shapefile data. Under

474# usual circumstances, most scopes will adhere to a 

475# general implementation and only change marginally, 

476# depending on whether the outer point of reference 

477# is the nation-level or state-level, but the 

478# following scopes must be manually handled. There 

479# will be a handful of exceptional circumstances that

480# these custom rules fail to handle, but the 

481# catch-all warning should suffice for the time being. 

482# ------------ ----------- ----------- ------------ # 

483 

484def _congressional_district_rule(year: int): 

485 """ 

486 Bureau's guidance on congressional districting: 

487 https://www.census.gov/programs-surveys/geography/guidance/geo-areas/congressional-dist.html 

488 

489 Note that these congressional sessions specify align with those suggested 

490 by Census Bureau data and may/may not cohere with the actual session years. 

491 """ 

492 

493 # Congressional districts by sessions 

494 # - 103th (1993, 1994) 

495 # - 104th (1995, 1996) 

496 # - 105th (1997, 1998) 

497 # - 106th (1999, 2000) 

498 # - 107th (2001, 2002) 

499 # - 108th (2003, 2004) 

500 # - 109th (2005, 2006) 

501 # - 110th (2007, 2008) 

502 # - 111th (2009, 2010) 

503 # - 112th (2011, 2012) 

504 # - 113th (2013) 

505 # - 114th (2014, 2015) 

506 # - 115th (2016, 2017) 

507 # - 116th (2018, 2019, 2020, 2021) 

508 # - 118th (2022, 2023) 

509 # - 119th (2024) 

510 

511 # For CD sessions 103 to 110 (i.e. 1993 to 2008): 

512 # https://www2.census.gov/geo/tiger/PREVGENZ/cd/ 

513 # Note that each starts with state FIPS, e.g. 

514 # cd06_103_ship.zip for CA congressional districts 

515 

516 cd_session = 103 + (year - 1993) // 2 

517 

518 # Special years that disobey the general rule. Pandemic? 

519 if 2020 <= year <= 2021: 

520 return 116 

521 

522 # There is a weird regime switch that occurs at 2013-14. From preliminary 

523 # research (cf. congressional district documentation in docstring above), 

524 # it may have seemed justified for the Bureau to remap congressional 

525 # districts given House seats were apportioned in the 113th session based 

526 # on the 2010 Decennial Census. 

527 if year >= 2014: 

528 return 114 + (year - 2014) // 2 

529 

530 return cd_session 

531 

532 

533def _zipcode_tabulation_area_rule(year: int): 

534 zcta = f'ZCTA5{str(year)[2]}0' 

535 folder = f'ZCTA5{str(year)[2]}0' 

536 

537 if (2010 <= year <= 2019) or (year == 2000): 

538 folder = 'ZCTA5' 

539 

540 return zcta, folder 

541 

542def _public_use_microdata_area_rule(year: int): 

543 folder = f'PUMA{str(year)[2]}0' 

544 if year == 2010: 

545 folder = 'PUMA5' 

546 if 2011 <= year <= 2023: 

547 folder = 'PUMA' 

548 

549 puma = f'PUMA{str(year)[2]}0' 

550 if 2008 <= year <= 2009: 

551 puma = 'PUMA500' 

552 if 2020 <= year <= 2021: 

553 puma = 'PUMA10' 

554 

555 return puma, folder 

556 

557def _subminor_civil_division_rule(year: int): 

558 """ 

559 This only applies for Puerto Rico, since they have subbarrios 

560 and not subminor civil divisions (semantically different). 

561 

562 *Note*: This applies for 2010 and later. For 2008 and 2009, we 

563 simply point back to the naming convention error because subbarrios 

564 are nested in their county folders as opposed to the state folder 

565 for these particular years 

566 """ 

567 folder = 'SUBMCD' 

568 scd = 'SUBMCD' 

569 

570 if year >= 2013: 

571 scd = 'SUBBARRIO' 

572 if year >= 2020: 

573 folder = 'SUBBARRIO' 

574 

575 return scd, folder 

576 

577def _urban_area_rule(year: int): 

578 folder = f'UAC{str(year)[2]}0' 

579 if year == 2010: 

580 folder = 'UA' 

581 if 2011 <= year <= 2023: 

582 folder = 'UAC' 

583 

584 uac = f'UAC{str(year)[2]}0' 

585 

586 if 2020 <= year <= 2021: 

587 uac = 'UAC10' 

588 

589 return uac, folder 

590 

591 

592 

593 

def _fetch_shpfile(url: str, timeout: t.Optional[float] = 60.0) -> t.Optional[bytes]:
    """
    Synchronous method to fetch the binary content of a url.

    This is the underlying for fetching TIGER shapefile data.

    Parameters
    ----------
    url
        Address of the zipped shapefile to download.

    timeout
        Seconds passed to :py:func:`requests.get` as its `timeout`; default 60.
        Pass `None` to wait indefinitely.

    Returns
    -------
    The raw bytes of the response body.

    Raises
    ------
    ShpfileException
        If the server answers with a 404 status code.

    APIException
        If the response's `Content-Type` header is missing or is anything
        other than `application/zip`.
    """
    # Without a timeout, `requests` can block forever on an unresponsive server.
    resp = requests.get(url, timeout=timeout)

    if (status := resp.status_code) == 404:
        raise ShpfileException(
            f"Could not fetch the binary content of '{url}'. HTTPS Status Code: {status}."
        )

    # `.get` turns a missing header into the APIException below instead of
    # an unrelated KeyError.
    if (content_type := resp.headers.get('Content-Type', '')) != 'application/zip':
        raise APIException(
            f"Expected 'application/zip' content-type. Received '{content_type}'."
        )

    return resp.content