Coverage for acspsuedo / source / low / protocols.py: 74%

70 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-11 16:02 +0000

1""" 

2HTTPS GET methods/protocols. 

3""" 

4import asyncio 

5import typing as t 

6from warnings import warn 

7from logging import getLogger 

8 

9import aiohttp 

10import requests 

11import pandas as pd 

12 

13from acspsuedo.source.low.exceptions import APIException 

14 

15 

16logger = getLogger(__name__) 

17 

18 

def fetch_content(url: str):
    """
    Fetch a URL synchronously and return its decoded JSON body.

    Parameters
    ----------
    url
        The address queried with a GET request.

    Returns
    -------
    Whatever object results from decoding the response body as JSON.

    Raises
    ------
    APIException
        If the response body cannot be decoded as JSON. The message
        carries the response status code and the raw response text.
    """
    response = requests.get(url)

    try:
        payload = response.json()
    except requests.JSONDecodeError:
        # `from None` drops the decoder's traceback so that only the
        # status code and response text are reported.
        raise APIException(
            f'Error; HTTPS Status {response.status_code}. Response Text:\n{response.text}'
        ) from None

    return payload

31 

32 

def fetch_table(urls: t.List[str]) -> pd.DataFrame:
    """
    Fetch several URLs synchronously and combine them into one table.

    Parameters
    ----------
    urls
        The URLs to query, one after another.

    Returns
    -------
    The per-URL data frames produced by ``_fetch_table``, concatenated
    side by side (``axis=1``) in the order the URLs were given.
    """
    frames = [_fetch_table(url) for url in urls]
    return pd.concat(frames, axis=1)

41 

42 

43 

44 

async def batch_fetch_content(
    session: aiohttp.ClientSession,
    urls: t.Union[list[str], str],
    retry_rate: int = 30,
    timeout_rate: t.Union[float, int] = 0.1
) -> pd.DataFrame:
    """
    Asynchronous method to fetch data from the Census Bureau.

    Parameters
    ----------
    session
        Session handed to ``_batch_fetch_content`` to perform the
        requests.

    urls
        One, or multiple, URLs.

    retry_rate
        Forwarded to ``_batch_fetch_content``. Default 30.

    timeout_rate
        Forwarded to ``_batch_fetch_content``. Default 0.1.

    Returns
    -------
    The data frames built from every result whose progress is
    'completed', concatenated side by side (``axis=1``).

    Raises
    ------
    APIException
        If at least one query was made and none of them completed, or
        (via ``_census_df_fmtter``) if a completed result does not hold
        tabular content.

    Warns
    -----
    UserWarning
        If some, but not all, queries did not complete. Those URLs are
        listed in the warning and left out of the returned frame.
    """
    results = await _batch_fetch_content(session, urls, retry_rate, timeout_rate)

    # Split on the recorded progress instead of comparing list lengths:
    # every URL yields a result entry, so a length comparison cannot
    # detect failures (and is meaningless when `urls` is a single str).
    completed = [r for r in results if r.get('progress') == 'completed']
    failed = [r for r in results if r.get('progress') != 'completed']

    if failed:
        # Joined outside the f-string: a backslash inside an f-string
        # expression is a syntax error before Python 3.12.
        failed_urls = '\n'.join(
            f"'{r.get('url', '')}', Status: {r.get('content', '')}" for r in failed
        )
        if not completed:
            raise APIException(
                "Queries for all URLs failed:\n" + failed_urls
            )
        warn(
            "Some data will be omitted since queries for some URLs failed:\n"
            + failed_urls,
            UserWarning
        )

    # Only completed results are formatted; failed ones carry an error
    # message as content, which the formatter would reject.
    dfs = [
        _census_df_fmtter(r.get('url', ''), r.get('content', [[], []]))
        for r in completed
    ]
    return pd.concat(dfs, axis=1)

72 

73 

async def _batch_fetch_content(
    session: aiohttp.ClientSession,
    urls: t.Union[list[str], str],
    retry_rate: int = 30,
    timeout_rate: t.Union[float, int] = 0.1
) -> list[dict]:
    """
    Fetch JSON objects for one or more URLs concurrently.

    This is a thin wrapper that forwards its arguments to
    ``_fetch_content``.

    Parameters
    ----------
    session
        Session used to issue the GET requests.

    urls
        One, or multiple, URLs.

    retry_rate
        How many additional attempts may be made per URL when the
        server answers with a rate-limit response. Default 30.

    timeout_rate
        Delay (in seconds) applied around request attempts.
        Default 0.1.

    Returns
    -------
    One dictionary per URL with the keys 'url', 'content' and
    'progress', exactly as returned by ``_fetch_content``.
    """
    return await _fetch_content(
        urls=urls,
        session=session,
        retry_rate=retry_rate,
        timeout_rate=timeout_rate,
    )

110 

111 

112 

async def _fetch_content(
    urls: t.Union[list[str], str],
    session: aiohttp.ClientSession,
    retry_rate: int = 30,
    timeout_rate: t.Union[float, int] = 0.1
) -> list[dict]:
    """
    Run one ``_get`` call per URL concurrently and collect the outcomes.

    Parameters
    ----------
    urls
        One, or multiple, URLs. Anything that is not a list is treated
        as a single URL.

    session
        An instance of :py:class:`aiohttp.ClientSession` to handle
        concurrent HTTPS GET requests. It is entered with
        ``async with`` here, so it is closed once all requests finish.

    retry_rate
        In case of server-based blocking, how many attempts can be
        made per URL. Default 30.

    timeout_rate
        By how much (in seconds) should each request be delayed by.
        Default 0.1.

    Returns
    -------
    One dictionary per URL, in input order, holding 'url', 'content'
    and 'progress'. Each starts with content ``None`` and progress
    'not started' and is filled in place by ``_get``.
    """
    if isinstance(urls, list):
        targets = urls
    else:
        targets = [urls]

    tasks = []
    for target in targets:
        tasks.append({'url': target, 'content': None, 'progress': 'not started'})

    async with session:
        pending = [_get(entry, session, retry_rate, timeout_rate) for entry in tasks]
        await asyncio.gather(*pending)

    return tasks

148 

149 

async def _get(
    task: dict,
    session: aiohttp.ClientSession,
    retry_rate: int = 30,
    timeout_rate: t.Union[int, float] = 0.1
):
    """
    Request ``task['url']`` and record the outcome on ``task`` in place.

    Note that the retry_rate applies for when there are server-backed blocks,
    and the timeout_rate applies for rate-checking request attempts.

    Parameters
    ----------
    task
        Dictionary with at least the key 'url'; its 'content' and
        'progress' entries are overwritten with the outcome.

    session
        Session used to issue the GET request.

    retry_rate
        How many additional attempts are made after a 429 response.
        Default 30.

    timeout_rate
        Delay (in seconds) before each retry and once more after the
        final attempt. Default 0.1.

    Notes
    -----
    Outcomes written to ``task``:

    * JSON decoded: content is the decoded object, progress 'completed'.
    * Unexpected content type: content is the error message, progress
      'interrupted'.
    * 429 whose body mentions 'without a key': content is a daily-limit
      message, progress 'completed'; no retry is attempted.
    * 429 with retries exhausted: content is 'Too many requests.',
      progress 'completed'.
    """
    # Retries run in a loop rather than by recursion so the caller's
    # `timeout_rate` applies to every attempt (the recursive form dropped
    # it and fell back to the default) and the closing delay below runs
    # once instead of once per retry.
    retries_left = retry_rate

    while True:
        resp = await session.request('GET', task['url'])

        if resp.status != 429:
            try:
                task['content'] = await resp.json()
                task['progress'] = 'completed'
            except aiohttp.ContentTypeError as e:
                task['content'] = e.message
                task['progress'] = 'interrupted'
            break

        logger.warning("%s must be restarted due to too many server requests. "
                       "Restarting...", task['url'])

        if 'without a key' in await resp.text():
            # Retrying cannot help once the key-less daily quota is spent.
            logger.warning("The content of %s cannot be fetched due to daily query limits without "
                           "a key. Progress status resolved as completed.", task['url'])

            task['content'] = 'Exceed daily limit for queries without an API key.'
            task['progress'] = 'completed'
            break

        if retries_left <= 0:
            task['content'] = 'Too many requests.'
            task['progress'] = 'completed'
            break

        retries_left -= 1
        await asyncio.sleep(timeout_rate)
        task['progress'] = 'not started'

    await asyncio.sleep(timeout_rate)

189 

190 

def _fetch_table(url: str) -> pd.DataFrame:
    """
    Fetch one URL synchronously and format its content as a data frame.

    The JSON content comes from ``fetch_content`` and is converted by
    ``_census_df_fmtter``; exceptions raised by either propagate.
    """
    return _census_df_fmtter(url, fetch_content(url))

195 

196 

def _census_df_fmtter(url: str, content: t.Any) -> pd.DataFrame:
    """
    Build a data frame from a header row followed by data rows.

    Parameters
    ----------
    url
        The URL the content came from; only used in the error message.

    content
        Expected to be a list with more than one element whose first
        element is a list. The first element supplies the column names
        and the remaining elements supply the rows.

    Returns
    -------
    A data frame whose column names are the header entries with '('
    and ')' removed and '/', '-' and ' ' replaced by '_'.

    Raises
    ------
    APIException
        If ``content`` does not have the expected shape.
    """
    looks_tabular = (
        isinstance(content, list)
        and len(content) > 1
        and isinstance(content[0], list)
    )
    if not looks_tabular:
        raise APIException(
            f"Expected a list of lists from '{url}'. Returned content type: {type(content)}."
        )

    # One translation pass: parentheses are dropped, separators become '_'.
    cleanup = str.maketrans({'(': None, ')': None, '/': '_', '-': '_', ' ': '_'})

    header, *rows = content
    return pd.DataFrame(
        data=rows,
        columns=[name.translate(cleanup) for name in header],
    )