Coverage for acspsuedo / source / geog.py: 93%
206 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-11 16:02 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-11 16:02 +0000
1"""
2Handler objects for geographic specifiers and the (optionally
3user supplied) Census Bureau API key.
4"""
5import os
6import typing as t
7import collections.abc as cABC
8from itertools import groupby, combinations
9from collections import namedtuple, defaultdict
10from pathlib import Path
11from warnings import warn
12from logging import getLogger
15from acspsuedo.source.low.protocols import fetch_content
16from acspsuedo.source.low.exceptions import APIException
17from acspsuedo.source.shpfile_fmt import GEO_SPEC_METADATA
18from acspsuedo.datasets import API_METADATA
# Module-level logger, named after this module's import path.
logger = getLogger(__name__)
class GeoScopeException(APIException):
    """Raised when supplied geographic specifiers cannot be resolved to a valid scope."""
class UnsupportedSpecException(APIException):
    """Raised when a dataset offers no supported geographic specifier handling."""
class GeoSpecFmtter:
    """
    Formatter for geographic specifiers.

    Instances hold a set of user-supplied specifiers (e.g. ``state='06'``)
    and, when called with a dataset and year, resolve them against the
    dataset's published geography hierarchy to build the ``for``/``in``
    portion of a Census Bureau query URL.
    """

    def __init__(
        self,
        **geog_specifiers
    ) -> None:
        """
        Formatter for geographic specifiers.

        Note that instances can double as callables, such that the results of the
        callable refer to the fully-specified geographic path (for a dataset of
        interest during a calendar year) inferred from the supplied specifiers.

        Parameters
        ----------
        geog_specifiers
            Geographic specifiers of interest.

        See `GeoSpecFmtter.view_geographic_paths()` to view an exhaustive list
        of fully-specified paths, each containing the component geographic
        specifiers required for each path, or `GeoSpecFmtter.check_path_existence()`,
        to view whether or not a particular specifier (or specified path) is
        supported by a dataset of interest during a particular calendar year.
        """
        self._geog_specifiers = geog_specifiers

    @property
    def geog_specifiers(self):
        """The mapping of specifier name to value supplied by the user."""
        return self._geog_specifiers

    @geog_specifiers.setter
    def geog_specifiers(self, new_geog_specifiers):
        self._geog_specifiers = new_geog_specifiers

    def __len__(self):
        """Return the length of the supplied geographic specifiers."""
        return len(self.geog_specifiers)

    def _kwarg_fmt(self):
        """Render the specifiers as ``name = 'value'`` pairs for display."""
        return ', '.join(f"{k} = '{v}'" for k, v in self.geog_specifiers.items())

    def __str__(self) -> str:
        return f"GeoSpecFmtter({self._kwarg_fmt()})"

    def __repr__(self) -> str:
        return str(self)

    def __call__(self, dataset: str, year: int) -> str:
        """
        Infer the specific path for a given dataset on a supported year
        using the instance's supplied keyword arguments and return the
        formatted geographic path.
        """
        fmt_path, _ = GeoSpecFmtter.get_fmt_path(dataset, year, **self.geog_specifiers)
        return fmt_path

    @classmethod
    def check_path_existence(
        cls,
        dataset: str,
        year: int,
        geographic_specifier: t.Union[cABC.Iterable[str], str]
    ) -> t.Optional[list[list[str]]]:
        """
        Check if the specifier, or collection of specifiers, is supported for the
        dataset of interest during the specified calendar year.

        If supported, returns a list of fully-specified geographic paths that
        contain every one of the supplied specifiers (in any position).

        Parameters
        ----------
        dataset
            The Census Bureau dataset of interest. See `~datasets`
            for a list of supported datasets.

        year
            The calendar year, for which the dataset must be supported
            on.

        geographic_specifier
            One, or multiple, geographic specifiers.

        Returns
        -------
        If found, returns a list containing all fully-specified paths corresponding
        to the specifier or collection of specifiers. Otherwise, a `UserWarning`
        is issued and `None` is returned.
        """
        # Materialise once: the specifiers are re-scanned for every candidate
        # path, which would silently exhaust a one-shot iterable (generator)
        # after the first path and make every later path match vacuously.
        if isinstance(geographic_specifier, str):
            wanted = [geographic_specifier]
        else:
            wanted = list(geographic_specifier)

        paths = cls.view_geographic_paths(dataset, year)
        paths = [p for p in paths if all(q in p for q in wanted)]

        if paths:
            return paths

        msg = \
            f"\nCould not find any fully-specified paths corresponding to " \
            f"{wanted} in the '{dataset}' dataset for the {year} calendar\n" \
            f"year. This may be due to a combination of the following reasons:\n" \
            "  1. Potential misspelling in the geographic specifier(s)\n" \
            "  2. Unavailable and/or unsupported geographic specifier(s) for the dataset and calendar year.\n" \
            "  3. Incompatible combination of geographic specifiers."

        warn(msg, UserWarning)
        return None

    @classmethod
    def view_geographic_paths(
        cls,
        dataset: str,
        year: int
    ) -> list[list[str]]:
        """
        View all fully-specified geographic paths that are
        supported by a dataset of interest during the calendar
        year of interest.

        Parameters
        ----------
        dataset
            The Census Bureau dataset of interest. See `~datasets`
            for a list of supported datasets.

        year
            The calendar year, for which the dataset must be supported
            on.

        Returns
        -------
        A list of fully-specified geographic pathways, each of which
        are a list and whose respective last element represent the
        'for' clause in queries to the Census Bureau.

        For instance, one fully-specified path may be ['state', 'county']
        where 'county' represents the 'for' clause to specify queries to
        a particular geographic scope while 'state' represents the 'in'
        clause, which governs how geographies to the aforementioned scope
        are restricted to.

        Thus, for this particular example, data will be shown for county-level
        geographies based on the restriction in the specified state-level
        geographies: `{'state': '06', 'county': '*'}` specifies the user wishes
        to query all county-level geographies (the wildcard '`*`' operator) for
        the state of California ('06' corresponding to the FIPS code for California).
        """
        return [list(p.kwargs) for p in cls.__get_all_paths(dataset, year)]

    @classmethod
    def get_geo_cols(cls, **geog_specifiers):
        """
        Given a set of geographic specifiers, list the names of their respective
        geographic columns that will be returned from data queries.

        Column names are looked up in `GEO_SPEC_METADATA` (second element of
        each entry) and de-duplicated while preserving first-seen order.
        """
        return list(dict.fromkeys(
            col for k in geog_specifiers for col in GEO_SPEC_METADATA[k][1]
        ))

    @staticmethod
    def _repl(x: str) -> str:
        """
        Turn an API specifier name into a keyword-argument-friendly name:
        parentheses are dropped; hyphens, slashes and spaces become underscores.
        """
        return x.replace('(', '') \
                .replace(')', '') \
                .replace('-', '_') \
                .replace('/', '_') \
                .replace(' ', '_')

    # Internal cache of geographic pathways, keyed by dataset and then year,
    # so the Bureau's API is not queried again for a pair already fetched.
    __CACHE_PATHS_BY_DATASET_YEAR: t.DefaultDict[
        str, dict[int, list['_InferSpec']]
    ] = defaultdict(dict)

    @classmethod
    def get_fmt_path(
        cls,
        dataset: str,
        year: int,
        **kwargs
    ) -> t.Tuple[str, t.Dict[str, str]]:
        """
        Given a set of geographic specifiers, get the fully-specified
        geographic path for a dataset of interest on a supported year.

        Parameters
        ----------
        dataset
            The Census Bureau dataset of interest. See `~datasets`
            for a list of supported datasets.

        year
            The calendar year, for which the dataset must be supported
            on.

        **kwargs
            A set of geographic specifiers. Note that the wildcard
            operator, `*`, indicates that the user wishes to query
            information for the entire set of geographies of the
            scope (e.g. `state = '*'` indicates the users wishes to
            view a dataset's information across all states).

        Returns
        -------
        A 2-tuple: the formatted query-string fragment corresponding to the
        fully specified geographic path, and the mapping of specifier name
        to supplied value ordered as in that path.
        """
        return cls._fmt_path(dataset, year, **kwargs)

    @classmethod
    def _fmt_path(
        cls,
        dataset: str,
        year: int,
        **kwargs
    ):
        """
        Internal for taking the inferred geographic path
        and formatting it into the data query URL.

        Returns the same 2-tuple as `get_fmt_path`.
        """
        path = cls._infer_path(dataset, year, **kwargs)
        logger.debug('Inferred the following path -- %s', path)

        # A single-level path has only a 'for' clause.
        if path.len == 1:
            return '&for={}:{}'.format(*path.spec, *path.kwargs.values()), path.kwargs

        # Otherwise the last specifier is the 'for' clause and everything
        # before it restricts the query through the 'in' clause.
        *in_k, for_k = path.spec
        *in_v, for_v = path.kwargs.values()

        in_clause = ' '.join(f'{k}:{v}' for k, v in zip(in_k, in_v))

        return '&for={}:{}&in={}'.format(for_k, for_v, in_clause), path.kwargs

    @classmethod
    def _infer_path(
        cls,
        dataset: str,
        year: int,
        **kwargs
    ) -> "_InferSpec":
        """
        Given a set of geographic specifiers, infer the path
        specification of interest for a dataset on a given year.

        Parameters
        ----------
        dataset
            The Census Bureau dataset of interest. See `~datasets`
            for a list of supported datasets.

        year
            The calendar year, for which the dataset must be supported
            on.

        **kwargs
            A set of geographic specifiers. Note that the wildcard
            operator, `*`, indicates that the user wishes to query
            information for the entire set of geographies of the
            scope (e.g. `state = '*'` indicates the users wishes to
            view a dataset's information across all states).

        Returns
        -------
        An internal (:py:class:`~_InferSpec`, subclassing
        `collections.namedtuple`) for formatting query URLs.

        Raises
        ------
        GeoScopeException
            If no unique path can be inferred, or a wildcard is supplied for
            a specifier that does not support it.
        """
        all_paths = cls.__get_all_paths(dataset, year)

        path = cls.__infer_path(dataset, year, all_paths, **kwargs)

        cls.__wc_check(path)

        return path

    @classmethod
    def __get_all_paths(
        cls,
        dataset: str,
        year: int
    ) -> list['_InferSpec']:
        """
        Fetch protocol to get all of a dataset's geographic paths for
        a particular year.

        If a dataset for a certain year has already been fetched in some
        way (whether that be perusing, or path inference), it will be
        cached so as to avoid repeated calls to the Bureau.
        """
        cache = cls.__CACHE_PATHS_BY_DATASET_YEAR

        try:
            paths = cache[dataset][year]
        except KeyError:
            # Only a cache miss is expected here; anything else should surface.
            paths = cls._list_geo_specs(dataset, year)
            cache[dataset][year] = paths
            logger.debug(
                "Could not find the '%s' dataset for the calendar year %s in cache. "
                "Fetched and cached for potential re-use later.", dataset, year
            )
        else:
            logger.debug("Found the '%s' dataset for the calendar year %s in cache.", dataset, year)

        return paths

    @classmethod
    def __wc_check(
        cls,
        path: '_InferSpec',
    ) -> None:
        """
        Check if any specifiers are supplied with wildcards that shouldn't be.
        Raises `GeoScopeException` listing the offending specifiers.
        """
        # `path.spec` holds the API names (keys of `supports_wildcard`) while
        # `path.kwargs` holds the user-facing names, in the same order.
        wc_errors = [k for check, (k, v) in zip(path.spec, path.kwargs.items())
                     if v == '*' and not path.supports_wildcard[check]]

        if wc_errors:
            raise GeoScopeException(
                f"The wildcard operator (`*`) is not permitted for the specifier(s) {wc_errors} "
                f"in the supplied path: {path.spec}."
            )

    @classmethod
    def __infer_path(
        cls,
        dataset: str,
        year: int,
        geo_specs: list['_InferSpec'],
        **kwargs
    ):
        """
        Actual implementation of inferring a path specification.

        Returns the uniquely matching path with the user's values filled in,
        or raises `GeoScopeException` when nothing (or nothing unique) matches.
        """
        path_len = len(kwargs)

        # Loose search for matches
        #
        # Note that while we loosely search for paths
        # containing at least one instance of any specifier,
        # we are also priming for irreducibility by testing
        # the length of supplied (and supported) specifiers
        # (cf. strict test below).
        opts = [p for p in geo_specs if any(k in p.kwargs for k in kwargs)]

        if not opts:
            raise GeoScopeException(
                "Invalid/unsupported geographic specifiers were supplied for the "
                f"'{dataset}' dataset for the {year} calendar year."
            )

        full = [p for p in opts if (p.len == path_len) and
                all(k in p.kwargs for k in kwargs)]

        if full:
            return cls._full_match(full, **kwargs)

        # 'for' clause is governed by the last specifier of a path.
        avail_specs = list(dict.fromkeys(list(p.kwargs)[-1] for p in geo_specs))

        return cls._partial_match(dataset, year, opts, avail_specs, **kwargs)

    @classmethod
    def _full_match(
        cls,
        full: list['_InferSpec'],
        **kwargs
    ) -> '_InferSpec':
        """
        Full test: exactly one candidate path must remain.

        Returns a copy of the matched path whose `kwargs` carry the supplied
        values, ordered as in the path. The candidate itself is left
        untouched because it lives in the class-level cache.
        """
        # Note that, by this construction, each path is irreducible. Thus, if a path (e.g.
        # ['state', 'county']) was similar to that containing itself and some additional
        # specifiers (e.g. ['state', 'county', 'tract']), only the former would be matched
        # because it is of length 2 (or the exact amount of specifiers supplied).
        if len(full) > 1:
            raise GeoScopeException(
                "Excessive geographic specifiers; could not infer a(n) unique geographic "
                f"path. Inferred multiple paths: {[list(p.kwargs) for p in full]}."
            )

        match = full[0]
        # Merging into a fresh dict keeps the path's key order and avoids
        # writing user values into the cached specification.
        return match._replace(kwargs={**match.kwargs, **kwargs})

    @classmethod
    def _partial_match(
        cls,
        dataset: str,
        year: int,
        opts: list['_InferSpec'],
        avail_specs: list,
        **kwargs
    ) -> t.NoReturn:
        """
        Partial test. Defaulted to in the case of null full matches.

        Always raises `GeoScopeException`, listing candidate fully-specified
        paths the user may have intended.
        """
        # Note that, in the initial construction, 2^n combinations are generated based on
        # the supported kwargs inputed, where n represents the length of supported kwargs.
        # Thus, we are effectively searching for spec paths such that they correspond to
        # at least one of these 2^n path combinations. From there, if the generated results
        # miss any of the specifiers, we go back and specify it.
        c_test = [c for size in range(len(kwargs) + 1) for c in combinations(kwargs, size)]

        p_opts = [list(p.kwargs) for c in c_test for p in opts if set(p.kwargs).issubset(c)]

        # If we miss paths for certain specifiers, add them.
        for k in kwargs:
            if not any(k in i for i in p_opts):
                p_opts.extend([list(p.kwargs) for p in opts if k in p.kwargs])

        # Remove duplicates (groupby only collapses adjacent equals, hence the sort).
        p_opts.sort()
        p_opts = [unique for unique, _ in groupby(p_opts)]

        # If any scopes are not supported, list them.
        unsupported = [k for k in kwargs if k not in avail_specs]
        usd_msg = f' Additionally, invalid/unsupported specifiers were found: {unsupported}.' if \
            unsupported else ''

        raise GeoScopeException(
            "Could not infer a fully-specified path from the supplied geographic specifiers: "
            f"{list(kwargs)}. Potential fully-specified path matches for the '{dataset}' "
            f"dataset during the {year} calendar year based on the supplied specifiers include "
            f"one of: {p_opts}.{usd_msg}"
        )

    @classmethod
    def _list_geo_specs(cls, dataset: str, year: int) -> list['_InferSpec']:
        """
        Internal for fetching all possible geographic paths for a given
        dataset during a given calendar year.

        Raises
        ------
        APIException
            If the geography listing could not be fetched.
        UnsupportedSpecException
            If the fetched listing has no (or an empty) 'fips' entry.
        """
        url = cls._url_fmt(dataset, year)
        try:
            content = fetch_content(url)
        except Exception:
            # `Exception` rather than a bare `except` so that interrupts and
            # interpreter exits are not reported as an unsupported year.
            raise APIException(
                f"The {year} calendar year was not supported for the '{dataset}' dataset."
            ) from None
        geographies = content.get('fips', None)

        if not geographies:
            # Although American Community Survey datasets have geographic specifiers,
            # this specific implementation is to call out any extraneous handling for
            # multi-year handling (TODO).
            raise UnsupportedSpecException(
                f"The '{dataset}' API does not have supported geographic specifier handling "
                f"for Federal Informating Process Standard (FIPS) codes during the {year} "
                f"calendar year."
            )

        geo_combinations = []

        for g in geographies:
            scope = g.get('name')
            # `or []` also covers keys that are present but null.
            reqs = g.get('requires') or []
            opt_reqs = g.get('optionalWithWCFor', [])
            if not isinstance(opt_reqs, list):
                opt_reqs = [opt_reqs]
            wc_names = g.get('wildcard') or []

            # API-facing names, and their keyword-friendly counterparts.
            scp = [*reqs, scope]
            d_scp = [cls._repl(name) for name in scp]

            # The scope itself (the 'for' clause) always accepts a wildcard.
            wcs = {**{k: k in wc_names for k in reqs}, scope: True}

            geo_combinations.append(
                _InferSpec(scp, len(scp), dict.fromkeys(d_scp), wcs)
            )

            # Accommodate any optional specifiers: each one may be left out,
            # yielding an additional, shorter path.
            for opt in opt_reqs:
                mod_scp = [j for j in scp if j != opt]
                mod_wcs = {k: v for k, v in wcs.items() if k != opt}
                d_mod_scp = map(cls._repl, mod_scp)

                geo_combinations.append(
                    _InferSpec(mod_scp, len(mod_scp), dict.fromkeys(d_mod_scp), mod_wcs)
                )

        return geo_combinations

    @classmethod
    def _url_fmt(cls, dataset: str, year: int) -> str:
        """
        Validate the dataset/year pair, then build the URL of the dataset's
        geography listing.
        """
        _dataset_meta_check(dataset, year)
        geography_url = 'https://api.census.gov/data/{}/{}/geography.json'.format(year, dataset)

        return geography_url
class ApiKeyConfig:
    """
    Formatter for user-defined Census Bureau API keys.

    The key is resolved lazily, in priority order: a value already set on
    `API_KEY`, then the environment variable named by `OS_ENV_LOCATION`,
    then the first line of the text file at `FILE_PATH`.
    """

    def __init__(self) -> None:
        self._FILE_PATH = Path.cwd() / 'api_key.txt'
        self._OS_ENV_LOCATION = 'CENSUS_BUREAU_API_KEY'

        self._API_KEY = None

    @property
    def API_KEY(self):
        """
        The API key. Note that this can be directly set, if
        you prefer.
        """
        return self._API_KEY

    @API_KEY.setter
    def API_KEY(self, new_key: t.Any):
        self._API_KEY = new_key

    @property
    def OS_ENV_LOCATION(self):
        """
        The operation system (OS) environment location to the
        API key. Note that this is prioritized first.
        """
        return self._OS_ENV_LOCATION

    @OS_ENV_LOCATION.setter
    def OS_ENV_LOCATION(self, new_location: str):
        self._OS_ENV_LOCATION = new_location

    @property
    def FILE_PATH(self):
        """
        The textfile path containing the API key. Note that this
        is prioritized second.
        """
        return self._FILE_PATH

    @FILE_PATH.setter
    def FILE_PATH(self, new_file_path: t.Union[str, Path]):
        self._FILE_PATH = new_file_path

    def _get_api_key(self):
        """
        Resolve the key if needed and return it as a ``&key=...`` query
        fragment, or an empty string when no key could be located.
        """
        self._set_api_key()

        if self.API_KEY:
            return f'&key={self.API_KEY}'

        logger.debug('Could not locate a Census Bureau API key.')
        return ''

    def _set_api_key(self):
        """
        Populate `API_KEY` from the environment or the key file, unless a
        key is already set. Lookup is best-effort: a missing or unreadable
        file simply leaves the key unset.
        """
        if self.API_KEY:
            return

        # First, check the operating system environment.
        key = os.environ.get(self.OS_ENV_LOCATION, None)

        # Next, check the file.
        if not key:
            try:
                with open(self.FILE_PATH, 'r') as f:
                    # Only the first line is the key; strip the trailing
                    # newline so it does not end up inside the query URL.
                    key = f.readline().strip() or None
            except (OSError, ValueError, TypeError):
                # Missing/unreadable file, undecodable content, or an
                # unusable FILE_PATH value: treat all as "no key on file".
                key = None

        self._API_KEY = key
def _dataset_meta_check(dataset: str, year: int) -> None:
    """
    Validate a dataset/year pair before any request is built.

    Raises `APIException` for 1-year ACS datasets in 2020 (no standard
    release exists), and `KeyError` when the dataset is not listed in
    `API_METADATA`. Returns `None` when both checks pass.
    """
    # The 2020 one-year check deliberately runs before the membership check.
    if 'acs1' in dataset:
        if year == 2020:
            no_release_msg = (
                f"The Census Bureau did not release 2020 estimates for the '{dataset}' dataset due "
                "to the impact of the COVID-19 pandemic on data collection efforts for 1-year estimate "
                "data. Nonetheless, experimental data for the American Community Survey's 1-year data "
                "estimates can be viewed at https://www.census.gov/programs-surveys/acs/data/experimental-data/1-year.html."
            )
            raise APIException(no_release_msg)

    if dataset in API_METADATA:
        return

    unknown_msg = (
        f"'{dataset}' was not a recognizable/supported dataset. Supported datasets include "
        f"one of: {list(API_METADATA)}"
    )
    raise KeyError(unknown_msg)
# Record describing one fully-specified geographic path.
# NOTE(review): the list/dict defaults below are created once and are shared
# by every instance that falls back on them. The constructions visible in this
# module always pass all four fields explicitly, so the defaults are not hit
# there -- confirm before relying on them elsewhere.
_InferSpec = namedtuple(
    '_InferSpec',
    ['spec', 'len', 'kwargs', 'supports_wildcard'],
    defaults = ([], None, {}, {})
)
"""
Custom tuple for fully-specified geographic paths specifying:
1. The list of geographic specifiers actually imputed into API queries.
2. The length of the fully-specified path, which is advantageous for strict/weak testing.
3. The keyword arguments that the user will actually supply, whose values correspond to
the specifiers that will be imputed into API queries.
4. Geographic specifiers that do and don't support wildcard operators (`'*'`).
"""
637"""