import json import logging import os from typing import Optional import httpx log = logging.getLogger("docker_logs") def _docker_transport() -> httpx.AsyncHTTPTransport: sock_path = os.getenv("DOCKER_SOCK", "/var/run/docker.sock") return httpx.AsyncHTTPTransport(uds=sock_path) async def _docker_get(path: str, params: Optional[dict] = None) -> httpx.Response: timeout = httpx.Timeout(10.0, read=10.0) async with httpx.AsyncClient(transport=_docker_transport(), base_url="http://docker", timeout=timeout) as client: resp = await client.get(path, params=params) resp.raise_for_status() return resp def _decode_docker_stream(data: bytes) -> str: if not data: return "" out = bytearray() idx = 0 while idx + 8 <= len(data): stream_type = data[idx] size = int.from_bytes(data[idx + 4: idx + 8], "big") idx += 8 if idx + size > len(data): break chunk = data[idx: idx + size] idx += size if stream_type in (1, 2): out.extend(chunk) else: out.extend(chunk) if out: return out.decode("utf-8", errors="replace") return data.decode("utf-8", errors="replace") async def docker_container_logs(container_name: str, tail_lines: int = 200) -> str: filters = json.dumps({"name": [container_name]}) resp = await _docker_get("/containers/json", params={"filters": filters}) containers = resp.json() or [] if not containers: log.info("No docker container found for name=%s", container_name) return "" container_id = containers[0].get("Id") if not container_id: return "" resp = await _docker_get( f"/containers/{container_id}/logs", params={"stdout": 1, "stderr": 1, "tail": tail_lines}, ) return _decode_docker_stream(resp.content)