feat(fetcher): refresh access_token automatically
This commit is contained in:
@@ -38,6 +38,22 @@ class BaseFetcher:
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async def request_api(self, url: str, method: str = "GET", **kwargs) -> dict:
|
||||
if self.is_token_expired():
|
||||
await self.refresh_access_token()
|
||||
header = kwargs.pop("headers", {})
|
||||
header = self.header
|
||||
|
||||
async with AsyncClient() as client:
|
||||
response = await client.request(
|
||||
method,
|
||||
url,
|
||||
headers=header,
|
||||
**kwargs,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def is_token_expired(self) -> bool:
|
||||
return self.token_expiry <= int(time.time())
|
||||
|
||||
|
||||
Reference in New Issue
Block a user