Coverage for acspsuedo / source / low / protocols.py: 74%
70 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-11 16:02 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-11 16:02 +0000
1"""
2HTTPS GET methods/protocols.
3"""
4import asyncio
5import typing as t
6from warnings import warn
7from logging import getLogger
9import aiohttp
10import requests
11import pandas as pd
13from acspsuedo.source.low.exceptions import APIException
16logger = getLogger(__name__)
def fetch_content(url: str):
    """
    Synchronously request a URL and return its decoded JSON body.

    Parameters
    ----------
    url
        The address the HTTPS GET request is sent to.

    Returns
    -------
    The object produced by decoding the response body as JSON.

    Raises
    ------
    APIException
        If the response body cannot be decoded as JSON. The message
        carries the HTTP status code and the raw response text; the
        underlying decode error is suppressed from the traceback.
    """
    response = requests.get(url)
    try:
        payload = response.json()
    except requests.JSONDecodeError:
        details = f'Error; HTTPS Status {response.status_code}. Response Text:\n{response.text}'
        raise APIException(details) from None
    return payload
def fetch_table(urls: t.List[str]) -> pd.DataFrame:
    """
    Synchronously fetch Census Bureau data for several URLs.

    Each URL is fetched and formatted one after another, and the
    resulting tables are joined side by side (column-wise).

    Parameters
    ----------
    urls
        The URLs to query, in the order their columns should appear.

    Returns
    -------
    A single data frame holding the columns of every fetched table.
    """
    frames = [_fetch_table(url) for url in urls]
    return pd.concat(frames, axis=1)
async def batch_fetch_content(
    session: aiohttp.ClientSession,
    urls: t.Union[list[str], str],
    retry_rate: int = 30,
    timeout_rate: t.Union[float, int] = 0.1
) -> pd.DataFrame:
    """
    Asynchronous method to fetch data from the Census Bureau.

    Parameters
    ----------
    session
        The :py:class:`aiohttp.ClientSession` used for the requests.

    urls
        One, or multiple, URLs

    retry_rate
        In case of server-based blocking, how many attempts can be
        made per URL. Default 30.

    timeout_rate
        By how much (in seconds) should each request be delayed by.
        Default 0.1.

    Returns
    -------
    The tables of all successfully queried URLs, joined column-wise.
    An empty data frame is returned when there was nothing to query.

    Raises
    ------
    APIException
        If every query failed, or if the content of a successful query
        is not shaped like a table.

    Warns
    -----
    UserWarning
        If only some of the queries failed; their data is omitted.
    """
    results = await _batch_fetch_content(session, urls, retry_rate, timeout_rate)

    # A query is only usable when it finished AND its content is not one of
    # the plain-string error messages stored for failed requests (rate-limit
    # failures are flagged 'completed' but still carry such a message).
    succeeded = []
    failed = []
    for result in results:
        content = result.get('content', [[], []])
        if result.get('progress') == 'completed' and not isinstance(content, str):
            succeeded.append(result)
        else:
            failed.append(result)

    if failed:
        # Joined outside the f-string: a backslash inside a replacement field
        # is a syntax error before Python 3.12.
        failure_lines = '\n'.join(
            f"'{r.get('url', '')}', Status: {r.get('content', '')}" for r in failed
        )
        if not succeeded:
            raise APIException(f"Queries for all URLs failed:\n{failure_lines}")
        warn(
            "Some data will be omitted since queries for some URLs failed:\n"
            f"{failure_lines}",
            UserWarning
        )

    dfs = [
        _census_df_fmtter(r.get('url', ''), r.get('content', [[], []]))
        for r in succeeded
    ]
    if not dfs:
        # Nothing was requested; pd.concat would raise on an empty list.
        return pd.DataFrame()

    return pd.concat(dfs, axis=1)
async def _batch_fetch_content(
    session: aiohttp.ClientSession,
    urls: t.Union[list[str], str],
    retry_rate: int = 30,
    timeout_rate: t.Union[float, int] = 0.1
) -> list[dict]:
    """
    Fetch JSON objects concurrently via HTTPS GET requests.

    This is a thin wrapper that forwards its arguments to
    :py:func:`_fetch_content`.

    Parameters
    ----------
    session
        The :py:class:`aiohttp.ClientSession` used for the requests.

    urls
        A single URL or a list of URLs.

    retry_rate
        How many additional attempts may be made per URL when the
        server blocks a request. Default 30.

    timeout_rate
        Delay (in seconds) applied around each request. Default 0.1.

    Returns
    -------
    One dictionary per URL with the keys ``'url'``, ``'content'`` and
    ``'progress'``.

    Note: ``'progress'`` is ``'completed'`` when the JSON content was
    decoded, and ``'interrupted'`` when the response was not JSON (the
    content then holds the error message). Requests that end because
    of rate limiting are also marked ``'completed'``, with a plain
    text message as their content.
    """
    return await _fetch_content(
        urls=urls,
        session=session,
        retry_rate=retry_rate,
        timeout_rate=timeout_rate,
    )
async def _fetch_content(
    urls: t.Union[list[str], str],
    session: aiohttp.ClientSession,
    retry_rate: int = 30,
    timeout_rate: t.Union[float, int] = 0.1
) -> list[dict]:
    """
    Run the concurrent JSON requests and collect their outcomes.

    Parameters
    ----------
    urls
        A single URL or a list of URLs. Anything that is not a list
        is treated as one URL.

    session
        An instance of :py:class:`aiohttp.ClientSession` that performs
        the concurrent HTTPS GET requests. It is used as an async
        context manager here, so it is closed once all requests are
        done.

    retry_rate
        How many additional attempts may be made per URL when the
        server blocks a request. Default 30.

    timeout_rate
        Delay (in seconds) applied around each request. Default 0.1.

    Returns
    -------
    One dictionary per URL, in input order, with the keys ``'url'``,
    ``'content'`` and ``'progress'`` as filled in by :py:func:`_get`.
    """
    url_list = urls if isinstance(urls, list) else [urls]

    # Each record is mutated in place by the request coroutine handling it.
    records = [
        {'url': address, 'content': None, 'progress': 'not started'}
        for address in url_list
    ]

    async with session:
        pending = (
            _get(record, session, retry_rate, timeout_rate)
            for record in records
        )
        await asyncio.gather(*pending)

    return records
async def _get(
    task: dict,
    session: aiohttp.ClientSession,
    retry_rate: int = 30,
    timeout_rate: t.Union[int, float] = 0.1
):
    """
    Request the URL of a single task and store the outcome in the task.

    Note that the retry_rate applies for when there are server-backed blocks,
    and the timeout_rate applies for rate-checking request attempts.

    Parameters
    ----------
    task
        A dictionary with the keys ``'url'``, ``'content'`` and
        ``'progress'``. It is updated in place.

    session
        The :py:class:`aiohttp.ClientSession` used for the request.

    retry_rate
        How many additional attempts may be made after an HTTP 429
        response. Default 30.

    timeout_rate
        Delay (in seconds) before each retry and after the final
        attempt. Default 0.1.

    Returns
    -------
    None. The outcome is written to ``task['content']`` and
    ``task['progress']``.
    """
    url = task['url']
    retries_left = retry_rate

    while True:
        # The context manager releases the connection on every path, also
        # when decoding the body fails.
        async with session.request('GET', url) as resp:
            if resp.status != 429:
                try:
                    task['content'] = await resp.json()
                    task['progress'] = 'completed'
                except aiohttp.ContentTypeError as e:
                    task['content'] = e.message
                    task['progress'] = 'interrupted'
                break

            logger.warning("%s must be restarted due to too many server requests. "
                           "Restarting...", url)
            body = await resp.text()

        if 'without a key' in body:
            # Retrying cannot help once the key-less daily quota is used up.
            logger.warning("The content of %s cannot be fetched due to daily query limits without "
                           "a key. Progress status resolved as completed.", url)

            task['content'] = 'Exceed daily limit for queries without an API key.'
            task['progress'] = 'completed'
            break

        if retries_left <= 0:
            task['content'] = 'Too many requests.'
            task['progress'] = 'completed'
            break

        # Retry with the caller's delay; a loop (rather than recursion) keeps
        # timeout_rate in effect for every attempt.
        retries_left -= 1
        await asyncio.sleep(timeout_rate)
        task['progress'] = 'not started'

    await asyncio.sleep(timeout_rate)
def _fetch_table(url: str) -> pd.DataFrame:
    """
    Fetch one URL synchronously and format its JSON content as a table.

    Parameters
    ----------
    url
        The URL to query.

    Returns
    -------
    The data frame built from the content fetched for ``url``.
    """
    return _census_df_fmtter(url, fetch_content(url))
197def _census_df_fmtter(url: str, content: t.Any) -> pd.DataFrame:
198 if (
199 isinstance(content, list) and
200 len(content) > 1 and
201 isinstance(content[0], list)
202 ):
204 upper_repl: t.Callable[[str], str] = lambda x: x.replace('(', '') \
205 .replace(')', '') \
206 .replace('/', '_') \
207 .replace('-', '_') \
208 .replace(' ', '_')
210 df = pd.DataFrame(
211 columns = [upper_repl(col) for col in content[0]],
212 data = content[1:]
213 )
214 return df
217 raise APIException(
218 f"Expected a list of lists from '{url}'. Returned content type: {type(content)}."
219 )