Source code for litegram.client.session.httpx

from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Any, Self, cast

import httpx

from litegram.__meta__ import __version__
from litegram.exceptions import TelegramNetworkError
from litegram.methods.base import Request

# Removed TelegramType import as we'll use PEP 695 type parameters
from .base import BaseSession

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator

    from litegram.client.bot import Bot
    from litegram.methods import TelegramMethod
    from litegram.types import InputFile


[docs] class HttpxSession(BaseSession): def __init__(self, proxy: str | None = None, **kwargs: Any) -> None: super().__init__(**kwargs) self._proxy = proxy self._client: httpx.AsyncClient | None = None async def create_client(self) -> httpx.AsyncClient: if self._client is None or self._client.is_closed: self._client = httpx.AsyncClient( proxy=self._proxy, headers={"User-Agent": f"litegram/{__version__}"}, timeout=self.timeout, ) return self._client async def close(self) -> None: if self._client is not None and not self._client.is_closed: await self._client.aclose() def build_request_data[TelegramType]( self, bot: Bot, method: TelegramMethod[TelegramType] ) -> tuple[dict[str, Any], dict[str, Any]]: data = {} files = {} input_files: dict[str, InputFile] = {} for key, value in method.model_dump(warnings=False).items(): value = self.prepare_value(value, bot=bot, files=input_files) if value is not None: data[key] = value for key, value in input_files.items(): files[key] = (value.filename or key, value.read(bot)) return data, files async def make_request[TelegramType]( self, bot: Bot, method: TelegramMethod[TelegramType], timeout: int | None = None, ) -> TelegramType: client = await self.create_client() url = self.api.api_url(token=bot.token, method=method.__api_method__) data, files = self.build_request_data(bot=bot, method=method) try: response = await client.post( url, data=data, files=files, timeout=self.timeout if timeout is None else timeout, ) raw_result = response.text except httpx.TimeoutException as e: raise TelegramNetworkError(method=method, message="Request timeout error") from e except httpx.RequestError as e: raise TelegramNetworkError(method=method, message=f"{type(e).__name__}: {e}") from e checked_response = self.check_response( bot=bot, method=method, status_code=response.status_code, content=raw_result, ) return cast("TelegramType", checked_response.result) async def stream_content( self, url: str, headers: dict[str, Any] | None = None, timeout: int = 30, chunk_size: int = 65536, raise_for_status: bool = True, ) -> AsyncGenerator[bytes]: client = await self.create_client() async with client.stream("GET", url, headers=headers, timeout=timeout, follow_redirects=True) as response: if raise_for_status: response.raise_for_status() async for chunk in response.aiter_bytes(chunk_size): yield chunk async def __aenter__(self) -> Self: await self.create_client() return self async def __aexit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: Any) -> None: await self.close()