Coverage for acspsuedo / source / geog.py: 93%

206 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-11 16:02 +0000

1""" 

2Handler objects for geographic specifiers and the (optionally 

3user supplied) Census Bureau API key. 

4""" 

5import os 

6import typing as t 

7import collections.abc as cABC 

8from itertools import groupby, combinations 

9from collections import namedtuple, defaultdict 

10from pathlib import Path 

11from warnings import warn 

12from logging import getLogger 

13 

14 

15from acspsuedo.source.low.protocols import fetch_content 

16from acspsuedo.source.low.exceptions import APIException 

17from acspsuedo.source.shpfile_fmt import GEO_SPEC_METADATA 

18from acspsuedo.datasets import API_METADATA 

19 

20 

21logger = getLogger(__name__) 

22 

23 

class GeoScopeException(APIException):
    """
    Raised for errors related to geographic scope, e.g. when a set of
    geographic specifiers cannot be resolved into one supported path.
    """

27 

class UnsupportedSpecException(APIException):
    """
    Raised when geographic specifier handling is not supported for the
    requested dataset and calendar year.
    """

31 

32 

33 

class GeoSpecFmtter:
    """
    Formatter for geographic specifiers.
    """

    def __init__(
        self,
        **geog_specifiers
    ) -> None:
        """
        Formatter for geographic specifiers.

        Note that instances can double as callables, such that the results of the
        callable refer to the fully-specified geographic path (for a dataset of
        interest during a calendar year) inferred from the supplied specifiers.

        Parameters
        ----------
        geog_specifiers
            Geographic specifiers of interest.

            See `GeoSpecFmtter.view_geographic_paths()` to view an exhaustive list
            of fully-specified paths, each containing the component geographic
            specifiers required for each path, or `GeoSpecFmtter.check_path_existence()`,
            to view whether or not a particular specifier (or specified path) is
            supported by a dataset of interest during a particular calendar year.
        """
        self._geog_specifiers = geog_specifiers

    @property
    def geog_specifiers(self):
        """The keyword specifiers supplied to (or later set on) this instance."""
        return self._geog_specifiers

    @geog_specifiers.setter
    def geog_specifiers(self, new_geog_specifiers):
        self._geog_specifiers = new_geog_specifiers

    def __len__(self):
        """Return the length of the supplied geographic specifiers."""
        return len(self.geog_specifiers)

    def _kwarg_fmt(self):
        """Render the specifiers as ``key = 'value'`` pairs joined by commas."""
        return ', '.join(f"{k} = '{v}'" for k, v in self.geog_specifiers.items())

    def __str__(self) -> str:
        return f"GeoSpecFmtter({self._kwarg_fmt()})"

    def __repr__(self) -> str:
        return str(self)

    def __call__(self, dataset: str, year: int) -> str:
        """
        Infer the specific path for a given dataset on a supported year
        using the instance's supplied keyword arguments and return the
        formatted geographic path.
        """
        fmt_path, _ = GeoSpecFmtter.get_fmt_path(dataset, year, **self.geog_specifiers)
        return fmt_path

    @classmethod
    def check_path_existence(
        cls,
        dataset: str,
        year: int,
        geographic_specifier: t.Union[cABC.Iterable[str], str]
    ) -> t.Optional[list[list[str]]]:
        """
        Check if the specifier, or collection of specifiers, is supported for the
        dataset of interest during the specified calendar year.

        If supported, returns a list of the fully-specified geographic paths that
        contain every one of the supplied specifiers (in any position).

        Parameters
        ----------
        dataset
            The Census Bureau dataset of interest. See `~datasets`
            for a list of supported datasets.

        year
            The calendar year, for which the dataset must be supported
            on.

        geographic_specifier
            One, or multiple, geographic specifiers.

        Returns
        -------
        If found, returns a list containing all fully-specified paths that include
        all of the supplied specifiers. Otherwise, a `UserWarning` is issued and
        `None` is returned.
        """
        # Materialize once: the specifiers are re-scanned for every candidate
        # path, which would silently exhaust a one-shot iterator.
        if isinstance(geographic_specifier, str):
            wanted = [geographic_specifier]
        else:
            wanted = list(geographic_specifier)

        paths = [
            p for p in cls.view_geographic_paths(dataset, year)
            if all(q in p for q in wanted)
        ]
        if paths:
            return paths

        msg = (
            f"\nCould not find any fully-specified paths corresponding to "
            f"{wanted} in the '{dataset}' dataset for the {year} calendar\n"
            f"year. This may be due to a combination of the following reasons:\n"
            " 1. Potential misspelling in the geographic specifier(s)\n"
            " 2. Unavailable and/or unsupported geographic specifier(s) for the dataset and calendar year.\n"
            " 3. Incompatible combination of geographic specifiers."
        )
        warn(msg, UserWarning)
        return None

    @classmethod
    def view_geographic_paths(
        cls,
        dataset: str,
        year: int
    ) -> list[list[str]]:
        """
        View all fully-specified geographic paths that are
        supported by a dataset of interest during the calendar
        year of interest.

        Parameters
        ----------
        dataset
            The Census Bureau dataset of interest. See `~datasets`
            for a list of supported datasets.

        year
            The calendar year, for which the dataset must be supported
            on.

        Returns
        -------
        A list of fully-specified geographic pathways, each of which
        is a list whose last element represents the 'for' clause in
        queries to the Census Bureau.

        For instance, one fully-specified path may be ['state', 'county']
        where 'county' represents the 'for' clause to specify queries to
        a particular geographic scope while 'state' represents the 'in'
        clause, which governs how geographies of the aforementioned scope
        are restricted.

        Thus, for this particular example, data will be shown for county-level
        geographies based on the restriction in the specified state-level
        geographies: `{'state': '06', 'county': '*'}` specifies the user wishes
        to query all county-level geographies (the wildcard '`*`' operator) for
        the state of California ('06' corresponding to the FIPS code for California).
        """
        return [list(p.kwargs) for p in cls.__get_all_paths(dataset, year)]

    @classmethod
    def get_geo_cols(cls, **geog_specifiers):
        """
        Given a set of geographic specifiers, list the names of their respective
        geographic columns that will be returned from data queries.

        Column names are looked up in `GEO_SPEC_METADATA` and de-duplicated
        while keeping first-seen order.
        """
        cols = [x for k in geog_specifiers for x in GEO_SPEC_METADATA[k][1]]
        return list(dict.fromkeys(cols))

    @staticmethod
    def _repl(x: str) -> str:
        """
        Turn a raw geography name into a keyword-friendly name: parentheses
        are dropped while '-', '/' and ' ' each become '_'.
        """
        return x.replace('(', '') \
                .replace(')', '') \
                .replace('-', '_') \
                .replace('/', '_') \
                .replace(' ', '_')

    # Internal for caching information on geographic pathways w/o having
    # to query the Bureau's API each time. Keyed by dataset, then by year.
    __CACHE_PATHS_BY_DATASET_YEAR: t.DefaultDict[
        str, dict[int, list['_InferSpec']]
    ] = defaultdict(dict)

    @classmethod
    def get_fmt_path(
        cls,
        dataset: str,
        year: int,
        **kwargs
    ) -> t.Tuple[str, t.Dict[str, str]]:
        """
        Given a set of geographic specifiers, get the fully-specified
        geographic path for a dataset of interest on a supported year.

        Parameters
        ----------
        dataset
            The Census Bureau dataset of interest. See `~datasets`
            for a list of supported datasets.

        year
            The calendar year, for which the dataset must be supported
            on.

        **kwargs
            A set of geographic specifiers. Note that the wildcard
            operator, `*`, indicates that the user wishes to query
            information for the entire set of geographies of the
            scope (e.g. `state = '*'` indicates the users wishes to
            view a dataset's information across all states).

        Returns
        -------
        A 2-tuple: the formatted string corresponding to the fully
        specified geographic path, and the mapping of specifier
        keyword to supplied value for that path.
        """
        return cls._fmt_path(dataset, year, **kwargs)

    @classmethod
    def _fmt_path(
        cls,
        dataset: str,
        year: int,
        **kwargs
    ):
        """
        Internal for taking the inferred geographic path
        and formatting it into the data query URL.
        """
        path = cls._infer_path(dataset, year, **kwargs)
        logger.debug('Inferred the following path -- %s', path)

        # The last specifier feeds the 'for' clause; any preceding ones
        # feed the 'in' clause.
        *in_k, for_k = path.spec
        *in_v, for_v = path.kwargs.values()
        for_clause = f'&for={for_k}:{for_v}'

        if path.len == 1:
            return for_clause, path.kwargs

        in_clause = ' '.join(f'{k}:{v}' for k, v in zip(in_k, in_v))
        return f'{for_clause}&in={in_clause}', path.kwargs

    @classmethod
    def _infer_path(
        cls,
        dataset: str,
        year: int,
        **kwargs
    ) -> "_InferSpec":
        """
        Given a set of geographic specifiers, infer the path
        specification of interest for a dataset on a given year.

        Parameters
        ----------
        dataset
            The Census Bureau dataset of interest. See `~datasets`
            for a list of supported datasets.

        year
            The calendar year, for which the dataset must be supported
            on.

        **kwargs
            A set of geographic specifiers. Note that the wildcard
            operator, `*`, indicates that the user wishes to query
            information for the entire set of geographies of the
            scope (e.g. `state = '*'` indicates the users wishes to
            view a dataset's information across all states).

        Returns
        -------
        An internal (:py:class:`~_InferSpec`, subclassing
        `collections.namedtuple`) for formatting query URLs.

        Raises
        ------
        GeoScopeException
            If no unique path can be inferred, or a wildcard is supplied
            for a specifier that does not support it.
        """
        all_paths = cls.__get_all_paths(dataset, year)
        path = cls.__infer_path(dataset, year, all_paths, **kwargs)
        cls.__wc_check(path)
        return path

    @classmethod
    def __get_all_paths(
        cls,
        dataset: str,
        year: int
    ) -> list['_InferSpec']:
        """
        Fetch protocol to get all of a dataset's geographic paths for
        a particular year.

        If a dataset for a certain year has already been fetched in some
        way (whether that be perusing, or path inference), it will be
        cached so as to avoid repeated calls to the Bureau.
        """
        by_year = cls.__CACHE_PATHS_BY_DATASET_YEAR[dataset]
        try:
            paths = by_year[year]
        except KeyError:
            # Only a genuine cache miss triggers a fetch; any other error
            # (including interrupts) propagates untouched.
            paths = cls._list_geo_specs(dataset, year)
            by_year[year] = paths
            logger.debug(
                "Could not find the '%s' dataset for the calendar year %s in cache. "
                "Fetched and cached for potential re-use later.", dataset, year
            )
        else:
            logger.debug("Found the '%s' dataset for the calendar year %s in cache.", dataset, year)

        return paths

    @classmethod
    def __wc_check(
        cls,
        path: '_InferSpec',
    ) -> None:
        """
        Check if any specifiers are supplied with wildcards that shouldn't be.
        Raises a `GeoScopeException` listing the offending specifiers.
        """
        # `spec` holds the raw names keying `supports_wildcard`; `kwargs`
        # holds the keyword names and values, in the same order.
        wc_errors = [
            k for raw_name, (k, v) in zip(path.spec, path.kwargs.items())
            if v == '*' and not path.supports_wildcard[raw_name]
        ]

        if wc_errors:
            raise GeoScopeException(
                f"The wildcard operator (`*`) is not permitted for the specifier(s) {wc_errors} "
                f"in the supplied path: {path.spec}."
            )

    @classmethod
    def __infer_path(
        cls,
        dataset: str,
        year: int,
        geo_specs: list['_InferSpec'],
        **kwargs
    ):
        """
        Actual implementation of inferring a path specification.
        """
        path_len = len(kwargs)

        # Loose search for matches
        #
        # Note that while we loosely search for paths
        # containing at least one instance of any specifier,
        # we are also priming for irreducibility by testing
        # the length of supplied (and supported) specifiers
        # (cf. strict test below).
        opts = [p for p in geo_specs if any(k in p.kwargs for k in kwargs)]

        if not opts:
            raise GeoScopeException(
                "Invalid/unsupported geographic specifiers were supplied for the "
                f"'{dataset}' dataset for the {year} calendar year."
            )

        full = [
            p for p in opts
            if p.len == path_len and all(k in p.kwargs for k in kwargs)
        ]
        if full:
            return cls._full_match(full, **kwargs)

        # 'for' clause is governed by the last specifier of a path.
        avail_specs = list(dict.fromkeys(list(p.kwargs)[-1] for p in geo_specs))
        return cls._partial_match(dataset, year, opts, avail_specs, **kwargs)

    @classmethod
    def _full_match(
        cls,
        full: list['_InferSpec'],
        **kwargs
    ) -> '_InferSpec':
        """
        Full test: resolve the candidates whose keys exactly match the
        supplied specifiers into a single path populated with the
        supplied values.

        Raises
        ------
        GeoScopeException
            If more than one candidate path matched.
        """
        # Note that, by this construction, each path is irreducible. Thus, if a path (e.g.
        # ['state', 'county']) was similar to that containing itself and some additional
        # specifiers (e.g. ['state', 'county', 'tract']), only the former would be matched
        # because it is of length 2 (or the exact amount of specifiers supplied).
        if len(full) > 1:
            raise GeoScopeException(
                "Excessive geographic specifiers; could not infer a(n) unique geographic "
                f"path. Inferred multiple paths: {[list(p.kwargs) for p in full]}."
            )

        match = full[0]
        # Build a fresh mapping rather than updating `match.kwargs` in place:
        # `match` may be a shared, cached template, and mutating it would
        # change results handed out by earlier calls. Key order follows the
        # template so values stay aligned with `spec`.
        return match._replace(kwargs={**match.kwargs, **kwargs})

    @classmethod
    def _partial_match(
        cls,
        dataset: str,
        year: int,
        opts: list['_InferSpec'],
        avail_specs: list,
        **kwargs
    ) -> "t.NoReturn":
        """
        Partial test. Defaulted to in the case of null full matches.

        Always raises a `GeoScopeException` whose message lists candidate
        fully-specified paths (and any unsupported specifiers).
        """
        # Note that, in the initial construction, 2^n combinations are generated based on
        # the supported kwargs inputed, where n represents the length of supported kwargs.
        # Thus, we are effectively searching for spec paths such that they correspond to
        # at least one of these 2^n path combinations. From there, if the generated results
        # miss any of the specifiers, we go back and specify it.
        c_test = [c for L in range(len(kwargs) + 1) for c in combinations(kwargs, L)]

        p_opts = [list(p.kwargs) for c in c_test for p in opts if set(p.kwargs).issubset(c)]

        # If we miss paths for certain specifiers, add them.
        for k in kwargs:
            if not any(k in i for i in p_opts):
                p_opts.extend(list(p.kwargs) for p in opts if k in p.kwargs)

        # Remove duplicates (groupby needs sorted input).
        p_opts = [path for path, _ in groupby(sorted(p_opts))]

        # If any scopes are not supported, list them.
        unsupported = [k for k in kwargs if k not in avail_specs]
        usd_msg = (
            f' Additionally, invalid/unsupported specifiers were found: {unsupported}.'
            if unsupported else ''
        )

        raise GeoScopeException(
            "Could not infer a fully-specified path from the supplied geographic specifiers: "
            f"{list(kwargs)}. Potential fully-specified path matches for the '{dataset}' "
            f"dataset during the {year} calendar year based on the supplied specifiers include "
            f"one of: {p_opts}.{usd_msg}"
        )

    @classmethod
    def _list_geo_specs(cls, dataset: str, year: int) -> list['_InferSpec']:
        """
        Internal for fetching all possible geographic paths for a given
        dataset during a given calendar year.

        Raises
        ------
        APIException
            If the geography listing could not be fetched.
        UnsupportedSpecException
            If the fetched content has no (or an empty) 'fips' entry.
        """
        url = cls._url_fmt(dataset, year)
        try:
            content = fetch_content(url)
        except Exception as err:
            # `Exception` (not a bare except) so interrupts are not rewritten
            # as API errors; the cause is chained for debugging.
            raise APIException(
                f"The {year} calendar year was not supported for the '{dataset}' dataset."
            ) from err

        geographies = content.get('fips', None)
        if not geographies:
            # Although American Community Survey datasets have geographic specifiers,
            # this specific implementation is to call out any extraneous handling for
            # multi-year handling (TODO).
            raise UnsupportedSpecException(
                f"The '{dataset}' API does not have supported geographic specifier handling "
                f"for Federal Information Processing Standard (FIPS) codes during the {year} "
                f"calendar year."
            )

        geo_combinations = []
        for g in geographies:
            scope = g.get('name')
            reqs = g.get('requires', [])
            opt_reqs = g.get('optionalWithWCFor', [])
            if not isinstance(opt_reqs, list):
                opt_reqs = [opt_reqs]
            wildcards = g.get('wildcard', [])

            # Raw names go into the query; sanitized names become kwargs keys.
            scp = [*reqs, scope]
            d_scp = [cls._repl(s) for s in scp]

            # The scope itself is always marked as wildcard-capable.
            wc_support = {**{k: k in wildcards for k in reqs}, scope: True}

            geo_combinations.append(
                _InferSpec(scp, len(scp), dict.fromkeys(d_scp), wc_support)
            )

            # Accommodate any optional specifiers: each one also yields a
            # variant of the path with that specifier left out.
            for opt in opt_reqs:
                mod_scp = [j for j in scp if j != opt]
                mod_wcs = {k: v for k, v in wc_support.items() if k != opt}
                mod_kwargs = dict.fromkeys(map(cls._repl, mod_scp))

                geo_combinations.append(
                    _InferSpec(mod_scp, len(mod_scp), mod_kwargs, mod_wcs)
                )

        return geo_combinations

    @classmethod
    def _url_fmt(cls, dataset: str, year: int) -> str:
        """
        Validate the dataset/year pair, then build the URL of its
        `geography.json` listing.
        """
        _dataset_meta_check(dataset, year)
        return 'https://api.census.gov/data/{}/{}/geography.json'.format(year, dataset)

534 

535 

class ApiKeyConfig:
    """
    Formatter for user-defined Census Bureau API keys.

    The key is resolved lazily, in this order: a key set directly via
    `API_KEY`, then the environment variable named by `OS_ENV_LOCATION`,
    then the first line of the text file at `FILE_PATH`.
    """

    def __init__(self) -> None:
        self._FILE_PATH = Path.cwd() / 'api_key.txt'
        self._OS_ENV_LOCATION = 'CENSUS_BUREAU_API_KEY'

        self._API_KEY = None

    @property
    def API_KEY(self):
        """
        The API key. Note that this can be directly set, if
        you prefer.
        """
        return self._API_KEY

    @API_KEY.setter
    def API_KEY(self, new_key: t.Any):
        self._API_KEY = new_key

    @property
    def OS_ENV_LOCATION(self):
        """
        The operating system (OS) environment variable name holding
        the API key. Note that this is prioritized first.
        """
        return self._OS_ENV_LOCATION

    @OS_ENV_LOCATION.setter
    def OS_ENV_LOCATION(self, new_location: str):
        self._OS_ENV_LOCATION = new_location

    @property
    def FILE_PATH(self):
        """
        The textfile path containing the API key. Note that this
        is prioritized second.
        """
        return self._FILE_PATH

    @FILE_PATH.setter
    def FILE_PATH(self, new_file_path: t.Union[str, Path]):
        self._FILE_PATH = new_file_path

    def _get_api_key(self):
        """
        Resolve the key (if not already set) and return it as a
        `&key=...` query fragment, or an empty string when no key
        could be located.
        """
        self._set_api_key()

        if self.API_KEY:
            return f'&key={self.API_KEY}'

        logger.debug('Could not locate a Census Bureau API key.')
        return ''

    def _set_api_key(self):
        """
        Populate `API_KEY` from the environment or the key file, unless
        a key is already set. Leaves it as `None` when neither source
        yields a key.
        """
        if self.API_KEY:
            return

        # First, check the operating system environment.
        key = os.environ.get(self.OS_ENV_LOCATION, None)

        # Next, check the file.
        if not key:
            try:
                with open(self.FILE_PATH, 'r', encoding='utf-8') as f:
                    # Only the first line is the key; strip the line
                    # terminator so it does not leak into the query URL.
                    key = f.readline().strip() or None
            except (OSError, ValueError, TypeError):
                # Best-effort lookup: a missing, unreadable, undecodable
                # or invalid path simply means "no key from file".
                key = None

        self._API_KEY = key

607 

def _dataset_meta_check(dataset: str, year: int) -> None:
    """
    Validate a dataset/year pair before a geography URL is built for it.

    Raises
    ------
    APIException
        If the dataset name contains 'acs1' and the year is 2020.
    KeyError
        If the dataset is not a key of `API_METADATA`.
    """
    is_one_year_acs = 'acs1' in dataset
    if is_one_year_acs and year == 2020:
        raise APIException(
            f"The Census Bureau did not release 2020 estimates for the '{dataset}' dataset due "
            "to the impact of the COVID-19 pandemic on data collection efforts for 1-year estimate "
            "data. Nonetheless, experimental data for the American Community Survey's 1-year data "
            "estimates can be viewed at https://www.census.gov/programs-surveys/acs/data/experimental-data/1-year.html."
        )

    if dataset in API_METADATA:
        return

    supported = list(API_METADATA)
    raise KeyError(
        f"'{dataset}' was not a recognizable/supported dataset. Supported datasets include "
        f"one of: {supported}"
    )

622 

623 

624 

class _InferSpec(
    namedtuple('_InferSpec', ['spec', 'len', 'kwargs', 'supports_wildcard'])
):
    """
    Custom tuple for fully-specified geographic paths specifying:

    1. `spec`: the list of geographic specifiers actually input into API queries.
    2. `len`: the length of the fully-specified path, which is advantageous
       for strict/weak testing.
    3. `kwargs`: the keyword arguments that the user will actually supply, whose
       values correspond to the specifiers that will be input into API queries.
    4. `supports_wildcard`: geographic specifiers that do and don't support
       wildcard operators (`'*'`).

    Every field is optional. Omitted container fields get a fresh empty
    list/dict per instance, so instances never share mutable defaults.
    """

    __slots__ = ()

    def __new__(cls, spec=None, len=None, kwargs=None, supports_wildcard=None):
        # `None` stands in for "not supplied" because a namedtuple's
        # `defaults=` would hand the same list/dict object to every instance.
        return super().__new__(
            cls,
            [] if spec is None else spec,
            len,
            {} if kwargs is None else kwargs,
            {} if supports_wildcard is None else supports_wildcard,
        )