fix(fetcher): handle token timeout gracefully
This commit is contained in:
@@ -3,10 +3,13 @@ from typing import Annotated
|
|||||||
from app.config import settings
|
from app.config import settings
|
||||||
from app.dependencies.database import get_redis
|
from app.dependencies.database import get_redis
|
||||||
from app.fetcher import Fetcher as OriginFetcher
|
from app.fetcher import Fetcher as OriginFetcher
|
||||||
|
from app.fetcher._base import TokenAuthError
|
||||||
|
from app.log import fetcher_logger
|
||||||
|
|
||||||
from fastapi import Depends
|
from fastapi import Depends
|
||||||
|
|
||||||
fetcher: OriginFetcher | None = None
|
fetcher: OriginFetcher | None = None
|
||||||
|
logger = fetcher_logger("FetcherDependency")
|
||||||
|
|
||||||
|
|
||||||
async def get_fetcher() -> OriginFetcher:
|
async def get_fetcher() -> OriginFetcher:
|
||||||
@@ -24,7 +27,14 @@ async def get_fetcher() -> OriginFetcher:
|
|||||||
if access_token:
|
if access_token:
|
||||||
fetcher.access_token = str(access_token)
|
fetcher.access_token = str(access_token)
|
||||||
# Always ensure the access token is valid, regardless of initial state
|
# Always ensure the access token is valid, regardless of initial state
|
||||||
await fetcher.ensure_valid_access_token()
|
try:
|
||||||
|
await fetcher.ensure_valid_access_token()
|
||||||
|
except TokenAuthError as exc:
|
||||||
|
logger.warning(
|
||||||
|
f"Failed to refresh fetcher access token during startup: {exc}. Will retry on demand."
|
||||||
|
)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
logger.exception("Unexpected error while initializing fetcher access token", exc_info=exc)
|
||||||
return fetcher
|
return fetcher
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import time
|
|||||||
from app.dependencies.database import get_redis
|
from app.dependencies.database import get_redis
|
||||||
from app.log import fetcher_logger
|
from app.log import fetcher_logger
|
||||||
|
|
||||||
from httpx import AsyncClient, HTTPStatusError
|
from httpx import AsyncClient, HTTPStatusError, TimeoutException
|
||||||
|
|
||||||
|
|
||||||
class TokenAuthError(Exception):
|
class TokenAuthError(Exception):
|
||||||
@@ -159,35 +159,62 @@ class BaseFetcher:
|
|||||||
return True
|
return True
|
||||||
return self.token_expiry <= int(time.time()) or not self.access_token
|
return self.token_expiry <= int(time.time()) or not self.access_token
|
||||||
|
|
||||||
async def grant_access_token(self) -> None:
|
async def grant_access_token(self, retries: int = 3, backoff: float = 1.0) -> None:
|
||||||
async with AsyncClient() as client:
|
last_error: Exception | None = None
|
||||||
response = await client.post(
|
async with AsyncClient(timeout=30.0) as client:
|
||||||
"https://osu.ppy.sh/oauth/token",
|
for attempt in range(1, retries + 1):
|
||||||
data={
|
try:
|
||||||
"client_id": self.client_id,
|
response = await client.post(
|
||||||
"client_secret": self.client_secret,
|
"https://osu.ppy.sh/oauth/token",
|
||||||
"grant_type": "client_credentials",
|
data={
|
||||||
"scope": "public",
|
"client_id": self.client_id,
|
||||||
},
|
"client_secret": self.client_secret,
|
||||||
)
|
"grant_type": "client_credentials",
|
||||||
response.raise_for_status()
|
"scope": "public",
|
||||||
token_data = response.json()
|
},
|
||||||
self.access_token = token_data["access_token"]
|
)
|
||||||
self.token_expiry = int(time.time()) + token_data["expires_in"]
|
response.raise_for_status()
|
||||||
redis = get_redis()
|
token_data = response.json()
|
||||||
await redis.set(
|
self.access_token = token_data["access_token"]
|
||||||
f"fetcher:access_token:{self.client_id}",
|
self.token_expiry = int(time.time()) + token_data["expires_in"]
|
||||||
self.access_token,
|
redis = get_redis()
|
||||||
ex=token_data["expires_in"],
|
await redis.set(
|
||||||
)
|
f"fetcher:access_token:{self.client_id}",
|
||||||
await redis.set(
|
self.access_token,
|
||||||
f"fetcher:expire_at:{self.client_id}",
|
ex=token_data["expires_in"],
|
||||||
self.token_expiry,
|
)
|
||||||
ex=token_data["expires_in"],
|
await redis.set(
|
||||||
)
|
f"fetcher:expire_at:{self.client_id}",
|
||||||
logger.success(
|
self.token_expiry,
|
||||||
f"Granted new access token for client {self.client_id}, expires in {token_data['expires_in']} seconds"
|
ex=token_data["expires_in"],
|
||||||
)
|
)
|
||||||
|
logger.success(
|
||||||
|
f"Granted new access token for client {self.client_id}, expires in {token_data['expires_in']} seconds"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
except TimeoutException as exc:
|
||||||
|
last_error = exc
|
||||||
|
logger.warning(
|
||||||
|
f"Timed out while requesting access token for client {self.client_id} (attempt {attempt}/{retries})"
|
||||||
|
)
|
||||||
|
except HTTPStatusError as exc:
|
||||||
|
last_error = exc
|
||||||
|
logger.warning(
|
||||||
|
f"HTTP error while requesting access token for client {self.client_id}"
|
||||||
|
f" (status: {exc.response.status_code}, attempt {attempt}/{retries})"
|
||||||
|
)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
last_error = exc
|
||||||
|
logger.exception(
|
||||||
|
f"Unexpected error while requesting access token for client {self.client_id}"
|
||||||
|
f" (attempt {attempt}/{retries})"
|
||||||
|
)
|
||||||
|
|
||||||
|
if attempt < retries:
|
||||||
|
await asyncio.sleep(backoff * attempt)
|
||||||
|
|
||||||
|
raise TokenAuthError("Failed to grant access token after retries") from last_error
|
||||||
|
|
||||||
async def ensure_valid_access_token(self) -> None:
|
async def ensure_valid_access_token(self) -> None:
|
||||||
if self.is_token_expired():
|
if self.is_token_expired():
|
||||||
|
|||||||
Reference in New Issue
Block a user