Skip to content

API Reference

Auto-generated code documentation.

civic_lib_core

Civic Interconnect shared utilities package.

cli

__main__

Entry point for Civic Dev CLI.

File: cli.main

build_api

Generate and update application interface documentation using pdoc.

This script:

- Locates the project root
- Discovers local Python packages to document
- Generates standalone HTML API documentation
- Writes HTML files into the docs/api/ folder (or the configured docs_api_dir)

File cli/build_api.py

main
main() -> int

Generate standalone HTML API documentation using pdoc.

Returns:

Name Type Description
int int

exit code (0 if successful, nonzero otherwise)

Source code in src/civic_lib_core/cli/build_api.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def main() -> int:
    """Build the project's standalone HTML API reference with pdoc.

    The project layout is discovered first. The configured API docs directory
    is created if it does not exist, the discovered packages are converted to
    dotted module names, and the pdoc command is validated before it is run.

    Returns:
        int: 1 when no API docs directory is configured, 0 when there are no
            packages to document, otherwise the result of the package
            validation, command validation, or pdoc execution step.
    """
    logger.info("Generating API documentation with pdoc...")

    project = project_layout.discover_project_layout()

    api_docs_dir = project.docs_api_dir
    if not api_docs_dir:
        logger.error("No output directory configured for API documentation.")
        return 1
    api_docs_dir.mkdir(parents=True, exist_ok=True)

    # pdoc is given dotted module names rather than filesystem paths.
    module_names = _packages_as_module_names(project.packages, project.src_dir)
    if not module_names:
        logger.warning("No Python packages found to document.")
        return 0

    logger.info(f"Packages discovered for API docs: {', '.join(module_names)}")

    # A truthy validator result is passed straight back as the exit code.
    package_problem = _validate_packages(module_names)
    if package_problem:
        return package_problem

    pdoc_cmd = _build_pdoc_command(module_names, api_docs_dir)
    command_problem = _validate_command(pdoc_cmd, api_docs_dir)
    if command_problem:
        return command_problem

    return _execute_pdoc_command(pdoc_cmd, api_docs_dir)

bump_version

Command-line tool to update version strings across key project files.

This tool replaces the old version with the new version in: - VERSION - pyproject.toml - README.md

Usage

Run as a module:

python -m cli.bump_version OLD_VERSION NEW_VERSION

or as a subcommand:

civic-dev bump-version OLD_VERSION NEW_VERSION

or with the shorthand:

civic-dev bump OLD_VERSION NEW_VERSION

File: bump_version.py Module: cli.bump_version

bump_version_cmd
bump_version_cmd(old_version: str, new_version: str) -> int

CLI subcommand handler for version bump.

Returns:

Name Type Description
int int

Exit code (0 on success, 1 if no updates).

Source code in src/civic_lib_core/cli/bump_version.py
62
63
64
65
66
67
68
69
70
71
72
73
def bump_version_cmd(old_version: str, new_version: str) -> int:
    """Handle the version-bump subcommand.

    Delegates the replacement to ``_bump_version`` and logs the outcome.

    Returns:
        int: 0 when the helper reports a truthy update count, 1 otherwise.
    """
    files_changed = _bump_version(old_version, new_version)
    if not files_changed:
        logger.info("No files were updated.")
        return 1
    logger.info(f"{files_changed} file(s) updated.")
    return 0
main
main(old_version: str, new_version: str) -> int

Script-style entry point.

Returns:

Name Type Description
int int

Exit code.

Source code in src/civic_lib_core/cli/bump_version.py
76
77
78
79
80
81
82
def main(old_version: str, new_version: str) -> int:
    """Run the version bump when invoked script-style.

    Thin wrapper that forwards both arguments to ``bump_version_cmd``.

    Returns:
        int: The exit code produced by ``bump_version_cmd``.
    """
    exit_code = bump_version_cmd(old_version, new_version)
    return exit_code
update_file
update_file(path: Path, old: str, new: str) -> bool

Replace version string in the specified file if found.

Returns:

Name Type Description
bool bool

True if file was modified, False otherwise.

Source code in src/civic_lib_core/cli/bump_version.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def update_file(path: Path, old: str, new: str) -> bool:
    """Replace every occurrence of ``old`` with ``new`` in a UTF-8 text file.

    Args:
        path: File to update.
        old: Version string to search for.
        new: Version string to substitute.

    Returns:
        bool: True if file was modified, False otherwise (the file is
            missing, ``old`` is empty, or ``old`` does not occur in it).
    """
    if not path.exists():
        logger.info(f"Skipping: {path} (not found)")
        return False

    # str.replace("", new) inserts `new` between every character, which would
    # silently corrupt the file, so an empty search string is a no-op.
    if not old:
        logger.warning(f"Skipping: {path} (empty version string)")
        return False

    content = path.read_text(encoding="utf-8")
    updated = content.replace(old, new)

    if updated == content:
        logger.info(f"No changes needed in: {path}")
        return False

    path.write_text(updated, encoding="utf-8")
    logger.info(f"Updated: {path}")
    return True

check_policy

CLI utility to check Civic Interconnect project policy compliance.

main
main() -> int

Check current repo against Civic Interconnect policy.

Returns:

Name Type Description
int int

exit code (0 = OK, nonzero = errors)

Source code in src/civic_lib_core/cli/check_policy.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def main() -> int:
    """Run the Civic Interconnect policy check against the current repo.

    Returns:
        int: 0 when no issues are reported; 1 when issues are found or the
            check itself raises an exception.
    """
    try:
        project_root = fs_utils.get_project_root()

        # Only Python repos are checked for now.
        problems = policy_utils.check_policy(project_root, "python")

        if not problems:
            print("Policy check passed. All required files/directories exist.")
            return 0

        print("Policy check failed with issues:")
        for problem in problems:
            print(f"  - {problem}")
        return 1

    except Exception as exc:
        logger.error(f"Failed to check project policy: {exc}")
        return 1

cli

Developer command-line interface (CLI) for Civic Interconnect projects.

Provides cross-repo automation commands for: - Installing and verifying the local development environment - Formatting, linting, and testing the codebase - Bumping version numbers for release - Tagging and pushing release commits

Run civic-dev --help for usage across all Civic Interconnect repos.

File: cli.py

build_api_command
build_api_command()

Build the docs.

Source code in src/civic_lib_core/cli/cli.py
21
22
23
24
@app.command("build-api")  # type: ignore
def build_api_command() -> int:
    """Build the API documentation.

    Returns:
        int: The exit code reported by ``build_api.main()``. It is returned
            rather than discarded, as ``bump_version_command`` does with its
            own result.
    """
    # NOTE(review): whether the CLI framework behind `app` turns this return
    # value into the process exit status is not visible here — confirm.
    return build_api.main()
bump_version_command
bump_version_command(old_version: str, new_version: str)

Update version strings across the project.

Source code in src/civic_lib_core/cli/cli.py
27
28
29
30
@app.command("bump-version")  # type: ignore
def bump_version_command(old_version: str, new_version: str):
    """Replace the old version string with the new one across the project.

    Forwards both arguments to ``bump_version.main`` and hands back its result.
    """
    outcome = bump_version.main(old_version, new_version)
    return outcome
check_policy_command
check_policy_command()

Check policies.

Source code in src/civic_lib_core/cli/cli.py
33
34
35
36
@app.command("check-policy")  # type: ignore
def check_policy_command() -> int:
    """Check the current repo against project policy.

    Returns:
        int: The exit code reported by ``check_policy.main()``. It is returned
            rather than discarded, as ``bump_version_command`` does with its
            own result.
    """
    # NOTE(review): whether the CLI framework behind `app` turns this return
    # value into the process exit status is not visible here — confirm.
    return check_policy.main()
layout_command
layout_command()

Show the current project layout.

Source code in src/civic_lib_core/cli/cli.py
39
40
41
42
@app.command("layout")  # type: ignore
def layout_command():
    """Show the current project layout.

    Registered as the ``layout`` subcommand; delegates to ``layout.main()``.
    """
    layout.main()
main
main()

Entry point for the CLI application.

This function serves as the main entry point that initializes and runs the CLI application using the app() function.

Source code in src/civic_lib_core/cli/cli.py
57
58
59
60
61
62
63
def main():
    """Entry point for the CLI application.

    Calls the module-level ``app`` object, on which the subcommands in this
    module are registered via ``@app.command``.
    """
    app()
prepare_code
prepare_code()

Format, lint, and test the codebase.

Source code in src/civic_lib_core/cli/cli.py
45
46
47
48
@app.command("prep-code")  # type: ignore
def prepare_code() -> int:
    """Format, lint, and test the codebase.

    Returns:
        int: The exit code reported by ``prep_code.main()``. It is returned
            rather than discarded, as ``bump_version_command`` does with its
            own result.
    """
    # NOTE(review): whether the CLI framework behind `app` turns this return
    # value into the process exit status is not visible here — confirm.
    return prep_code.main()
release_command
release_command()

Tag and push the current version to GitHub.

Source code in src/civic_lib_core/cli/cli.py
51
52
53
54
@app.command("release")  # type: ignore
def release_command() -> int:
    """Tag and push the current version to GitHub.

    Returns:
        int: The exit code reported by ``release.main()``. It is returned
            rather than discarded, as ``bump_version_command`` does with its
            own result.
    """
    # NOTE(review): whether the CLI framework behind `app` turns this return
    # value into the process exit status is not visible here — confirm.
    return release.main()

layout

CLI utility to discover and print the Civic Interconnect project layout.

File: layout.py

main
main() -> None

Discover and print the project layout.

Prints a formatted summary of: - Project root - Docs directories - Source packages - Organization name - Policy file used

Source code in src/civic_lib_core/cli/layout.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def main() -> None:
    """Print a formatted summary of the discovered project layout.

    The summary covers:
    - Project root
    - Docs directories
    - Source packages
    - Organization name
    - Policy file used

    If discovery or formatting raises, the error is logged and the process
    exits with status 1.
    """
    try:
        discovered = project_layout.discover_project_layout()
        # Blank line first so the summary is visually separated in the terminal.
        print()
        summary = project_layout.format_layout(discovered)
        print(summary)
    except Exception as exc:
        logger.error(f"Failed to discover project layout: {exc}")
        sys.exit(1)

prep_code

Prepare Civic Interconnect code for release or commit.

This script: - Checks whether the virtual environment may be stale (based on dependency file timestamps) - Formats code using Ruff - Lints and fixes issues with Ruff - Runs pre-commit hooks twice (first to fix, then to verify) - Executes unit tests via pytest

If dependency files changed since .venv was created, the script warns the user to rerun their setup script (e.g. setup.ps1) to reinstall the environment.

File: prep_code.py

main
main() -> int

Prepare code using a comprehensive workflow.

This function performs a comprehensive code preparation workflow including: - Virtual environment validation and dependency checking - Code formatting using Ruff - Linting and automatic fixing of issues - Pre-commit hook execution and validation - Unit test execution

The function checks if the virtual environment needs to be reinstalled by comparing dependency file timestamps, then runs a series of code quality tools in sequence. If any step fails, the process is terminated early.

Returns:

Name Type Description
int int

Exit code (0 for success, non-zero for failure). Returns 1 if virtual environment needs reinstalling, or the return code of any failed subprocess.

Raises:

Type Description
CalledProcessError

Raised internally by a failing preparation step; main() catches it and returns the failed command's exit code instead of propagating the exception.

Source code in src/civic_lib_core/cli/prep_code.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def main() -> int:
    """Prepare the codebase: format, lint, run pre-commit hooks, and test.

    Workflow:
    - Stop with exit code 1 when ``should_reinstall()`` reports that the
      virtual environment is missing or older than a dependency file.
    - Log the library version and warn if ``config.yaml`` is not present in
      the current directory.
    - Run ``ruff format`` and ``ruff check --fix``.
    - Run the pre-commit hooks twice. The first pass is allowed to exit
      non-zero (pre-commit does so whenever a hook rewrites a file); the
      second pass must succeed.
    - Run ``pytest tests``.

    A ``subprocess.CalledProcessError`` raised by a failing step is caught
    here and converted into the return value; it does not propagate.

    Returns:
        int: 0 on success, 1 if the virtual environment needs reinstalling,
            otherwise the return code of the step that failed.
    """
    try:
        logger.info("Checking virtual environment...")

        if should_reinstall():
            logger.warning(
                "Dependency files changed since the virtual environment was created.\n\n"
                "Please rerun your setup script (e.g. setup.ps1) to rebuild your environment.\n"
            )
            return 1
        logger.info(
            ".venv is up-to-date. Skipping environment reinstall. "
            "Rerun your setup script manually if needed."
        )

        # Log the library version for reference
        lib_version = get_repo_version()
        logger.info(f"Current library version: {lib_version}")

        config_file = Path("config.yaml")
        if not config_file.exists():
            logger.warning("config.yaml not found — logger may fall back to defaults.")

        run_check(["ruff", "format", "."], "Formatting code with Ruff")
        run_check(["ruff", "check", ".", "--fix"], "Linting and fixing issues with Ruff")

        # The first pre-commit pass exists to let hooks apply fixes. Its
        # non-zero exit must not abort the workflow, otherwise the verify
        # pass and the tests below would never run after an auto-fix.
        try:
            run_check(
                ["pre-commit", "run", "--all-files"],
                "Running pre-commit hooks (allowing fixes)",
            )
        except subprocess.CalledProcessError:
            logger.info("First pre-commit pass exited non-zero; re-running hooks to verify.")

        run_check(["pre-commit", "run", "--all-files"], "Verifying pre-commit hooks (must pass)")
        run_check(["pytest", "tests"], "Running unit tests")

        logger.info("Code formatted, linted, and tested successfully.")
        return 0

    except subprocess.CalledProcessError as e:
        logger.error(f"Process failed: {e}")
        return e.returncode
run_check
run_check(command: list[str], label: str) -> None

Run a shell command and fail fast if it errors.

Source code in src/civic_lib_core/cli/prep_code.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def run_check(command: list[str], label: str) -> None:
    """Run one allow-listed tool without a shell and raise if it fails.

    Args:
        command: Argument vector; its first element must be ``ruff``,
            ``pre-commit``, or ``pytest``.
        label: Human-readable step name used in the log messages.

    Raises:
        ValueError: If ``command`` is empty or its executable is not on the
            allow list.
        subprocess.CalledProcessError: If the tool exits with a non-zero status.
    """
    trusted_tools = {"ruff", "pre-commit", "pytest"}
    if not (command and command[0] in trusted_tools):
        raise ValueError(f"Untrusted command attempted: {command}")

    logger.info(f"{label} ...")
    exit_status = subprocess.run(command, shell=False, check=False).returncode  # noqa: S603
    if exit_status == 0:
        return

    logger.error(
        f"{label} failed.\n\nPlease RE-RUN your setup script to apply and verify all fixes.\n"
    )
    raise subprocess.CalledProcessError(exit_status, command)
should_reinstall
should_reinstall() -> bool

Determine whether the virtual environment should be reinstalled.

Based on timestamps of dependency files.

Source code in src/civic_lib_core/cli/prep_code.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def should_reinstall() -> bool:
    """Report whether the virtual environment looks missing or stale.

    The modification time of ``.venv`` in the current directory is compared
    with that of each dependency file that exists there.

    Returns:
        bool: True if ``.venv`` does not exist or any of ``pyproject.toml``,
            ``requirements.txt``, ``poetry.lock`` is newer than it.
    """
    venv_dir = Path(".venv")
    if not venv_dir.exists():
        logger.info(".venv does not exist. A fresh install will be required.")
        return True

    venv_mtime = venv_dir.stat().st_mtime
    dependency_files = ("pyproject.toml", "requirements.txt", "poetry.lock")

    # Only the first newer file (in the order above) is reported.
    newer_file = next(
        (
            name
            for name in dependency_files
            if Path(name).exists() and Path(name).stat().st_mtime > venv_mtime
        ),
        None,
    )
    if newer_file is None:
        return False

    logger.info(f"Dependency file changed: {newer_file}")
    return True

release

Automate the release process for Civic Interconnect applications.

This script: - Reads the version from the VERSION file - Updates pre-commit hooks - Installs the package in editable mode - Formats and lints the code - Generates up-to-date API documentation - Runs pre-commit hooks twice (fix + verify) - Runs unit tests if present - Commits changes if any are staged - Creates a new Git tag for the release - Pushes the commit and tag to the remote repository

Update the VERSION file before running this script.

File: release.py

Example

civic-dev bump-version 1.0.3 1.0.4

main
main() -> int

Complete the release workflow for the current version.

Source code in src/civic_lib_core/cli/release.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def main() -> int:
    """Complete the release workflow for the current version.

    Reads the version from the ``VERSION`` file in the current directory,
    derives the ``v``-prefixed tag, and runs the validation, setup/format,
    pre-commit/test, and commit/tag helpers in that order.

    Returns:
        int: 0 when every step succeeds; 1 when ``VERSION`` is missing or any
            step raises.
    """
    version_path = Path("VERSION")
    if not version_path.exists():
        logger.error("VERSION file not found.")
        return 1

    # Read as UTF-8 explicitly so the result does not depend on the platform
    # locale; the other readers and writers of VERSION also use UTF-8.
    version = version_path.read_text(encoding="utf-8").strip().removeprefix("v")
    tag = f"v{version}"
    logger.info(f"Releasing version {tag}")

    try:
        _validate_tag(tag)
        git_path = _validate_git_executable()

        _setup_and_format()
        _run_precommit_and_tests()
        _commit_and_tag(tag, git_path)

        logger.info(f"Release {tag} completed successfully")
        return 0

    except Exception as e:
        # Top-level boundary: any failure becomes a logged error and exit code 1.
        logger.error(f"Release process failed: {e}")
        return 1
run
run(cmd: str, check: bool = True) -> None

Run a shell command and log it.

Source code in src/civic_lib_core/cli/release.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def run(cmd: str, check: bool = True) -> None:
    """Execute an allow-listed command line without a shell, logging it first.

    Args:
        cmd: Command line; it is split into arguments with ``shlex.split``.
        check: When true, a non-zero exit status raises ``RuntimeError``.

    Raises:
        RuntimeError: If the command is empty, its executable is not on the
            allow list, any argument contains a character outside the
            permitted set, or (with ``check``) the command fails.
    """
    permitted_tools = {"pre-commit", "uv", "pip", "ruff", "pytest", "git"}
    safe_punctuation = ".-_/:=[]\"'"

    argv = shlex.split(cmd)
    if not argv:
        raise RuntimeError("Empty command")

    program = argv[0]
    if program not in permitted_tools:
        raise RuntimeError(f"Command not allowed: {program}")

    # Every token, the program name included, may contain only alphanumerics
    # and the punctuation listed above.
    for token in argv:
        for ch in token:
            if not (ch.isalnum() or ch in safe_punctuation):
                raise RuntimeError(f"Unsafe argument detected: {token}")

    logger.info(f"$ {cmd}")
    completed = subprocess.run(argv, shell=False, check=False)  # noqa: S603
    if completed.returncode != 0 and check:
        raise RuntimeError(f"Command failed: {cmd}")

config_utils

civic_lib_core/config_utils.py.

Utilities for managing configuration and environment data in Civic Interconnect projects.

Provides: - Loading environment-based API keys - Reading YAML configuration files - Reading project version information - Parsing version strings into numeric tuples

Typical usage:

from civic_lib_core import config_utils

api_key = config_utils.load_api_key("MY_API_KEY", "MyService")
config = config_utils.load_yaml_config()
version = config_utils.load_version()
major, minor, patch = config_utils.parse_version("1.2.3")

load_api_key

load_api_key(env_var: str, service_name: str) -> str

Load an API key from the environment variables.

Parameters:

Name Type Description Default
env_var str

Name of the environment variable.

required
service_name str

Friendly name for error messaging.

required

Returns:

Name Type Description
str str

The API key value.

Exits

If the environment variable is missing or empty.

Source code in src/civic_lib_core/config_utils.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def load_api_key(env_var: str, service_name: str) -> str:
    """Fetch an API key from an environment variable.

    Args:
        env_var (str): Name of the environment variable.
        service_name (str): Friendly name used in the error messages.

    Returns:
        str: The key with surrounding whitespace removed.

    Exits:
        Via ``sys.exit`` with an error message if the variable is unset,
        empty, or only whitespace.
    """
    raw_value = os.getenv(env_var)
    api_key = raw_value.strip() if raw_value else ""
    if api_key:
        return api_key

    logger.error(
        f"Missing API key for {service_name}. "
        f"Fix: add {env_var!r} to your .env file or system environment."
    )
    sys.exit(f"Error: Environment variable {env_var} is required for {service_name}.")

load_version

load_version(
    filename: str = 'VERSION', root_dir: Path | None = None
) -> str

Load the version string from a VERSION file in the project.

Parameters:

Name Type Description Default
filename str

Name of the version file.

'VERSION'
root_dir Optional[Path]

Base directory to search from.

None

Returns:

Name Type Description
str str

Version string like "1.2.3".

Exits

If the file is missing or unreadable.

Source code in src/civic_lib_core/config_utils.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def load_version(filename: str = "VERSION", root_dir: Path | None = None) -> str:
    """Load the version string from a VERSION file in the project.

    Args:
        filename (str): Name of the version file.
        root_dir (Optional[Path]): Base directory to search from.

    Returns:
        str: Version string like "1.2.3".

    Exits:
        If the file is missing or unreadable.
    """
    root = root_dir or fs_utils.get_project_root()
    version_path = root / filename

    try:
        version = version_path.read_text(encoding="utf-8").strip()
        logger.info(f"Loaded version: {version}")
        return version
    except Exception as e:
        logger.error(f"Error reading VERSION file at {version_path}: {e}")
        sys.exit(f"Error: VERSION file missing or unreadable at {version_path}.")

load_yaml_config

load_yaml_config(
    filename: str = 'config.yaml',
    root_dir: Path | None = None,
) -> dict[str, Any]

Load a YAML configuration file from the project root.

Parameters:

Name Type Description Default
filename str

Name of the YAML file.

'config.yaml'
root_dir Optional[Path]

Base directory to search from.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Parsed configuration dictionary.

Raises:

Type Description
FileNotFoundError

If config file is missing.

Source code in src/civic_lib_core/config_utils.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def load_yaml_config(filename: str = "config.yaml", root_dir: Path | None = None) -> dict[str, Any]:
    """Load a YAML configuration file from the project root.

    Args:
        filename (str): Name of the YAML file.
        root_dir (Optional[Path]): Base directory to search from.

    Returns:
        dict[str, Any]: Parsed configuration dictionary.

    Raises:
        FileNotFoundError: If config file is missing.
    """
    root = root_dir or fs_utils.get_project_root()
    config_path = root / filename

    if not config_path.exists():
        msg = f"Config file not found: {config_path}"
        logger.error(msg)
        raise FileNotFoundError(msg)

    with config_path.open(encoding="utf-8") as f:
        config = yaml.safe_load(f) or {}

    logger.debug(f"Loaded config from {config_path}")
    return config

parse_version

parse_version(version: str) -> tuple[int, int, int]

Parse a version string (e.g. "1.2.3") into a tuple of integers.

Parameters:

Name Type Description Default
version str

Version string.

required

Returns:

Type Description
tuple[int, int, int]

tuple[int, int, int]: Tuple of (major, minor, patch).

Raises:

Type Description
ValueError

If the version format is invalid.

Source code in src/civic_lib_core/config_utils.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def parse_version(version: str) -> tuple[int, int, int]:
    """Parse a version string (e.g. "1.2.3") into a tuple of integers.

    The entire string must consist of three dot-separated runs of digits.
    Surrounding whitespace, including a trailing newline, is rejected.

    Args:
        version (str): Version string.

    Returns:
        tuple[int, int, int]: Tuple of (major, minor, patch).

    Raises:
        ValueError: If the version format is invalid.
    """
    # re.fullmatch instead of re.match with "$": "$" also matches just before
    # a trailing newline, which would let "1.2.3\n" through as valid.
    match = re.fullmatch(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"Invalid version format: {version}")
    major, minor, patch = (int(part) for part in match.groups())
    return major, minor, patch

date_utils

civic_lib_core/date_utils.py.

Date and time utilities for Civic Interconnect projects.

Provides helpers to: - Generate date ranges for reports - Retrieve current UTC time or date - Format UTC datetimes into strings

Typical usage:

from civic_lib_core import date_utils

# Get today's UTC date string
today = date_utils.today_utc_str()

# Get current UTC datetime as a string
timestamp = date_utils.now_utc_str()

# Generate list of dates for the past 7 days
dates = date_utils.date_range(7)

date_range

date_range(days_back: int) -> list[str]

Generate a list of date strings from days_back days ago up to today (UTC).

Parameters:

Name Type Description Default
days_back int

Number of days to include, ending with today (inclusive). For example, days_back=7 returns 7 dates.

required

Returns:

Type Description
list[str]

list[str]: List of UTC dates in 'YYYY-MM-DD' format, earliest to latest.

Raises:

Type Description
ValueError

If days_back is negative.

Source code in src/civic_lib_core/date_utils.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def date_range(days_back: int) -> list[str]:
    """List the last `days_back` UTC dates, ending with today.

    Args:
        days_back (int): How many dates to return, today included.
                         For example, days_back=7 returns 7 dates.

    Returns:
        list[str]: UTC dates formatted as 'YYYY-MM-DD', oldest first; empty
            when days_back is 0.

    Raises:
        ValueError: If days_back is negative.
    """
    if days_back < 0:
        raise ValueError("days_back must be non-negative")
    if days_back == 0:
        return []

    latest = now_utc().date()
    # Count down from the oldest offset so the result is oldest-first.
    offsets = range(days_back - 1, -1, -1)
    return [(latest - timedelta(days=offset)).strftime("%Y-%m-%d") for offset in offsets]

now_utc

now_utc() -> datetime

Return the current UTC datetime object.

Returns:

Name Type Description
datetime datetime

Current UTC datetime.

Source code in src/civic_lib_core/date_utils.py
58
59
60
61
62
63
64
def now_utc() -> datetime:
    """Return the current UTC datetime object.

    Returns:
        datetime: Current UTC datetime.
    """
    return datetime.now(UTC)

now_utc_str

now_utc_str(fmt: str = '%Y-%m-%d %H:%M:%S UTC') -> str

Return the current time in UTC as a formatted string.

Parameters:

Name Type Description Default
fmt str

Format string for datetime output. Default includes 'UTC'.

'%Y-%m-%d %H:%M:%S UTC'

Returns:

Name Type Description
str str

Formatted current UTC time.

Source code in src/civic_lib_core/date_utils.py
67
68
69
70
71
72
73
74
75
76
def now_utc_str(fmt: str = "%Y-%m-%d %H:%M:%S UTC") -> str:
    """Format the current UTC time as a string.

    Args:
        fmt (str): ``strftime`` format for the output. The default ends with
            the literal text 'UTC'.

    Returns:
        str: The current UTC time rendered with ``fmt``.
    """
    current = now_utc()
    return current.strftime(fmt)

today_utc_str

today_utc_str() -> str

Return today's date in UTC in 'YYYY-MM-DD' format.

Returns:

Name Type Description
str str

Current UTC date as a string.

Source code in src/civic_lib_core/date_utils.py
79
80
81
82
83
84
85
def today_utc_str() -> str:
    """Format today's UTC date as 'YYYY-MM-DD'.

    Returns:
        str: The current UTC date.
    """
    current = now_utc()
    return current.strftime("%Y-%m-%d")

dev_utils

civic_lib_core/dev_utils.py.

Core development utilities. Part of the Civic Interconnect agent framework.

log_suggested_paths

log_suggested_paths(
    response: Any,
    max_depth: int = 3,
    source_label: str = 'response',
) -> None

Log inferred paths to nested keys in a response object.

Parameters:

Name Type Description Default
response Any

Parsed API response.

required
max_depth int

Maximum depth to explore.

3
source_label str

Label for context in logs.

'response'
Source code in src/civic_lib_core/dev_utils.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def log_suggested_paths(
    response: Any,
    max_depth: int = 3,
    source_label: str = "response",
) -> None:
    """Log inferred paths to nested keys in a response object.

    Args:
        response (Any): Parsed API response.
        max_depth (int): Maximum depth to explore.
        source_label (str): Label for context in logs.
    """
    logger.info(f"Suggested paths in {source_label}:")

    if isinstance(response, Mapping):
        logger.info(f"Top-level keys: {sorted(response.keys())}")
        paths = suggest_paths(response, max_depth=max_depth)
        for path, key, value in paths:
            logger.info(f"Path: {' -> '.join(path)} | Final Key: {key} | Value: {value}")
    elif isinstance(response, Sequence) and not isinstance(response, str | bytes):
        logger.info(f"Top-level object is a list with {len(response)} items.")
        for i, item in enumerate(response[:5]):
            logger.info(f"Index {i}: {type(item).__name__}")
    else:
        logger.warning("Response is neither a dict nor a list; cannot analyze paths.")

suggest_paths

suggest_paths(
    response: Any,
    max_depth: int = 3,
    current_path: list[str] | None = None,
) -> list[tuple[list[str], str, str]]

Suggest possible nested data paths in a response object.

Parameters:

Name Type Description Default
response Any

Parsed API response.

required
max_depth int

Maximum traversal depth.

3
current_path list[str] | None

Used internally for recursion.

None

Returns:

Type Description
list[tuple[list[str], str, str]]

list of (path, key, summary): Potential paths to explore.

Source code in src/civic_lib_core/dev_utils.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def suggest_paths(
    response: Any,
    max_depth: int = 3,
    current_path: list[str] | None = None,
) -> list[tuple[list[str], str, str]]:
    """Suggest possible nested data paths in a response object.

    Args:
        response (Any): Parsed API response.
        max_depth (int): Maximum traversal depth.
        current_path (list[str] | None): Used internally for recursion.

    Returns:
        list of (path, key, summary): Potential paths to explore.
    """
    if current_path is None:
        current_path = []

    suggestions: list[tuple[list[str], str, str]] = []

    if max_depth <= 0:
        return suggestions

    if isinstance(response, Mapping):
        for key, value in response.items():
            path = current_path + [key]
            if isinstance(value, Mapping):
                suggestions.extend(suggest_paths(value, max_depth - 1, path))
            elif isinstance(value, list):
                summary = f"List[{len(value)}]" if value else "List[empty]"
                suggestions.append((path, key, summary))
            else:
                suggestions.append((path, key, str(value)))
    elif isinstance(response, list):
        summary = f"List[{len(response)}]" if response else "List[empty]"
        suggestions.append((current_path, "[list]", summary))

    return suggestions

fs_utils

civic_lib_core/fs_utils.py.

File and path utility functions for root-relative logic. Unified utilities used across Civic Interconnect agents and libraries.

discover_project_layout

discover_project_layout() -> ProjectLayout

Discover and analyze the project layout structure.

Scans the project directory to identify key components including the project root, documentation directories, source code location, valid packages, organization name, and project policy configuration.

Returns:

Name Type Description
ProjectLayout ProjectLayout

A comprehensive object containing all discovered project structure information including: - project_root: The root directory of the project - docs_dir: Main documentation directory - docs_api_dir: API documentation directory - src_dir: Source code directory - packages: List of valid Python packages found - org_name: Organization name associated with the project - policy: Loaded project policy configuration

Note

This function performs automatic discovery and may return empty collections or None values for components that are not found or configured in the project.

Source code in src/civic_lib_core/fs_utils.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def discover_project_layout() -> ProjectLayout:
    """Build a ProjectLayout snapshot describing the current project.

    The project root is resolved first and everything else is derived from it:
    the project policy, the docs and API docs directories, the source
    directory, the packages inside that source directory, and the
    organization name.

    Returns:
        ProjectLayout: The assembled layout, carrying the fields
            ``project_root``, ``docs_dir``, ``docs_api_dir``, ``src_dir``,
            ``packages``, ``org_name`` and ``policy``.

    Note:
        ``src_dir`` is None and ``packages`` is an empty list when no source
        directory is found; ``org_name`` may also be None.
    """
    project_root = get_project_root()
    loaded_policy = load_project_policy(project_root)
    docs_root = get_docs_dir(project_root)
    api_docs_root = get_docs_api_dir(project_root)
    source_dir = get_source_dir(project_root)

    # Packages can only be listed when a source directory was found.
    if source_dir:
        found_packages = get_valid_packages(source_dir)
    else:
        found_packages = []

    organization = get_org_name(project_root)

    discovered = ProjectLayout(
        project_root=project_root,
        docs_dir=docs_root,
        docs_api_dir=api_docs_root,
        src_dir=source_dir,
        packages=found_packages,
        org_name=organization,
        policy=loaded_policy,
    )
    logger.debug(f"Discovered project layout: {discovered}")
    return discovered

ensure_dir

ensure_dir(path: str | Path) -> Path

Ensure a directory exists, creating it and any parent directories if necessary.

Parameters:

Name Type Description Default
path str | Path

The directory path to ensure exists. Can be a string or Path object.

required

Returns:

Name Type Description
Path Path

The resolved Path object of the created/existing directory.

Raises:

Type Description
OSError

If the directory cannot be created due to permissions or other filesystem issues.

Example

>>> ensure_dir("/path/to/new/directory")
PosixPath('/path/to/new/directory')

>>> ensure_dir(Path("relative/path"))
PosixPath('/absolute/path/to/relative/path')

Source code in src/civic_lib_core/fs_utils.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def ensure_dir(path: str | Path) -> Path:
    """Create a directory (and any missing parents) if it is not already there.

    Args:
        path (str | Path): Directory to guarantee. Strings are converted to
            Path objects; the path is resolved before the directory is made.

    Returns:
        Path: The resolved path of the directory.

    Raises:
        OSError: Propagated unchanged, after being logged, when resolving the
            path or creating the directory fails.

    Example:
        >>> ensure_dir("/path/to/new/directory")
        PosixPath('/path/to/new/directory')

        >>> ensure_dir(Path("relative/path"))
        PosixPath('/absolute/path/to/relative/path')
    """
    # Start from the caller's value so the error message still has something
    # to show if resolve() itself is what fails.
    target = path
    try:
        target = Path(path).resolve()
        target.mkdir(parents=True, exist_ok=True)
    except OSError as err:
        logger.error(f"Failed to create directory {target}: {err}")
        raise
    else:
        logger.debug(f"Ensured directory exists: {target}")
        return target

get_data_config_dir

get_data_config_dir(
    project_root: Path | None = None,
) -> Path

Get the data configuration directory path.

Parameters:

Name Type Description Default
project_root Path | None

The project root directory. If None, uses the discovered project root. Defaults to None.

None

Returns:

Name Type Description
Path Path

The path to the data-config directory.

Source code in src/civic_lib_core/fs_utils.py
121
122
123
124
125
126
127
128
129
130
131
132
def get_data_config_dir(project_root: Path | None = None) -> Path:
    """Return the location of the project's ``data-config`` directory.

    Args:
        project_root (Path | None, optional): Root to build the path from.
            When omitted, the root is looked up with ``get_project_root()``.

    Returns:
        Path: ``<root>/data-config``. The path is only computed here; nothing
            is created on disk.
    """
    base = project_root if project_root else get_project_root()
    return base.joinpath("data-config")

get_docs_api_dir

get_docs_api_dir(
    root_dir: Path | None = None, create: bool = False
) -> Path

Determine the project's API docs subdirectory.

Tries:

1. Client repo policy (docs.api_markdown_subdir)
2. Defaults to 'docs/api'

Source code in src/civic_lib_core/fs_utils.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def get_docs_api_dir(root_dir: Path | None = None, create: bool = False) -> Path:
    """Resolve the directory that holds the project's API documentation.

    The subdirectory name is read from the project policy key
    ``docs.api_markdown_subdir`` (default ``"api"``) and joined onto the
    directory returned by ``get_docs_dir``.

    Args:
        root_dir (Path | None, optional): Project root. When omitted, the root
            is looked up with ``get_project_root()``.
        create (bool, optional): When True, the directory is passed through
            ``ensure_dir`` and that result is returned. Defaults to False.

    Returns:
        Path: The API docs directory.
    """
    project_root = root_dir or get_project_root()
    docs_root = get_docs_dir(project_root)
    docs_settings = load_project_policy(project_root).get("docs", {})
    api_dir = docs_root / docs_settings.get("api_markdown_subdir", "api")

    logger.debug(f"API docs dir resolved to: {api_dir}")
    if create:
        return ensure_dir(api_dir)
    return api_dir

get_docs_dir

get_docs_dir(root_dir: Path | None = None) -> Path

Determine the project's main docs directory.

Tries:

1. Client repo policy (docs.site_dir or docs.docs_dir)
2. Defaults to 'docs'

Source code in src/civic_lib_core/fs_utils.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def get_docs_dir(root_dir: Path | None = None) -> Path:
    """Resolve the project's main documentation directory.

    The directory name is taken from the project policy (``docs.site_dir``
    first, then ``docs.docs_dir``) and falls back to ``"docs"``. If the
    configured directory is not an existing directory, ``<root>/docs`` is
    returned instead.

    Args:
        root_dir (Path | None, optional): Project root. When omitted, the root
            is looked up with ``get_project_root()``.

    Returns:
        Path: The docs directory; the fallback is returned whether or not it
            exists.
    """
    project_root = root_dir or get_project_root()
    docs_settings = load_project_policy(project_root).get("docs", {})

    configured_name = docs_settings.get("site_dir") or docs_settings.get("docs_dir") or "docs"
    configured_dir = project_root / configured_name

    if not (configured_dir.exists() and configured_dir.is_dir()):
        default_dir = project_root / "docs"
        logger.debug(f"Defaulting docs dir to: {default_dir}")
        return default_dir

    logger.debug(f"Docs dir found: {configured_dir}")
    return configured_dir

get_org_name

get_org_name(project_root: Path) -> str | None

Get the organization name from the project root's parent directory.

Parameters:

Name Type Description Default
project_root Path

The root directory path of the project.

required

Returns:

Type Description
str | None

str | None: The name of the parent directory if it exists, otherwise None.

Source code in src/civic_lib_core/fs_utils.py
177
178
179
180
181
182
183
184
185
186
def get_org_name(project_root: Path) -> str | None:
    """Derive the organization name from the directory containing the project.

    Args:
        project_root (Path): The root directory path of the project.

    Returns:
        str | None: The name of ``project_root``'s parent directory, or None
            when that name is empty (for example when the parent is a
            filesystem root).
    """
    parent_name = project_root.parent.name
    if not parent_name:
        return None
    return parent_name

get_project_root

get_project_root(start_path: Path | None = None) -> Path

Find the project root directory by searching for common project markers.

Parameters:

Name Type Description Default
start_path Path | None

The starting directory for the search. If None, uses the current working directory. Defaults to None.

None

Returns:

Name Type Description
Path Path

The resolved path to the project root directory.

Raises:

Type Description
RuntimeError

If no project root is found by searching upward from the start path.

Source code in src/civic_lib_core/fs_utils.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
def get_project_root(start_path: Path | None = None) -> Path:
    """Find the project root directory by searching upward for project markers.

    The start path is resolved to an absolute path before the search, so a
    relative start such as ``Path(".")`` or ``Path("src/pkg")`` is searched
    through all of its real ancestors rather than only its own relative
    components.

    Args:
        start_path (Path | None, optional): The starting directory for the search.
            If None, uses the current working directory. Defaults to None.

    Returns:
        Path: The resolved path to the first directory (starting at
            ``start_path`` and moving upward) that contains ``.git`` or
            ``pyproject.toml``.

    Raises:
        RuntimeError: If no project root is found by searching upward from the start path.
    """
    # Resolve first: an unresolved relative path exposes few or no `.parents`,
    # which would silently cut the upward search short.
    current = (Path(start_path) if start_path else Path.cwd()).resolve()
    markers = [".git", "pyproject.toml"]

    for candidate in (current, *current.parents):
        for marker in markers:
            if (candidate / marker).exists():
                logger.debug(f"Project root found at: {candidate} (marker: {marker})")
                return candidate

    raise RuntimeError(
        f"Project root not found. Searched from '{current}' upward for markers: {markers}."
    )

get_repo_package_names

get_repo_package_names(
    root_path: Path | None = None,
) -> list[str]

Discover all Python package names under the repo's src directory.

Returns:

Type Description
list[str]

List[str]: Fully qualified package names, e.g. ['civic_lib_core', 'civic_lib_core.cli']

Source code in src/civic_lib_core/fs_utils.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def get_repo_package_names(root_path: Path | None = None) -> list[str]:
    """List the dotted names of every Python package under the source directory.

    A package is any directory below the source directory that contains an
    ``__init__.py``; its name is the path relative to the source directory
    joined with dots.

    Args:
        root_path (Path | None, optional): Project root. When omitted, the root
            is looked up with ``get_project_root()``.

    Returns:
        List[str]: Sorted, fully qualified package names, e.g.
            ['civic_lib_core', 'civic_lib_core.cli']. An empty list is returned
            when there is no source directory, no package is found, or any
            exception is raised during discovery (the failure is logged as a
            warning).
    """
    try:
        src_dir = get_source_dir(root_path or get_project_root())
        if src_dir is None:
            logger.debug("No source directory found, returning empty package list.")
            return []

        relative_dirs = (
            init_file.parent.relative_to(src_dir) for init_file in src_dir.rglob("__init__.py")
        )
        # An __init__.py sitting directly in src/ has no parts and is skipped.
        names = sorted(".".join(rel.parts) for rel in relative_dirs if rel.parts)

        if not names:
            logger.warning("No packages discovered under src.")
        return names

    except Exception as e:
        logger.warning(f"Failed to get package names: {e}")
        return []

get_runtime_config_path

get_runtime_config_path(
    project_root: Path | None = None,
) -> Path

Get the runtime configuration file path.

Parameters:

Name Type Description Default
project_root Path | None

The project root directory. If None, uses the discovered project root. Defaults to None.

None

Returns:

Name Type Description
Path Path

The path to the runtime_config.yaml file.

Source code in src/civic_lib_core/fs_utils.py
248
249
250
251
252
253
254
255
256
257
258
259
def get_runtime_config_path(project_root: Path | None = None) -> Path:
    """Return the location of the project's ``runtime_config.yaml`` file.

    Args:
        project_root (Path | None, optional): Root to build the path from.
            When omitted, the root is looked up with ``get_project_root()``.

    Returns:
        Path: ``<root>/runtime_config.yaml``. The path is only computed here;
            the file is not checked for existence.
    """
    base = project_root if project_root else get_project_root()
    return base.joinpath("runtime_config.yaml")

get_source_dir

get_source_dir(root_dir: Path) -> Path | None

Get the source directory containing Python packages for the project.

Parameters:

Name Type Description Default
root_dir Path

The root directory path of the project.

required

Returns:

Type Description
Path | None

Path | None: The path to the source directory if found and contains valid Python packages, otherwise None.

Source code in src/civic_lib_core/fs_utils.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
def get_source_dir(root_dir: Path) -> Path | None:
    """Locate the directory that holds the project's Python packages.

    Candidate directories come from the project policy key ``build.src_dirs``
    (a single string or a list, default ``["src"]``). The first candidate that
    is an existing directory and contains at least one Python package
    subdirectory wins.

    Args:
        root_dir (Path): The root directory path of the project.

    Returns:
        Path | None: The first qualifying source directory, or None (with a
            warning logged) when no candidate qualifies.
    """
    configured = load_project_policy(root_dir).get("build", {}).get("src_dirs", ["src"])

    if isinstance(configured, str):
        search_dirs = [root_dir / configured]
    elif isinstance(configured, list):
        search_dirs = [root_dir / entry for entry in configured]
    else:
        search_dirs = []

    # Only fall back to <root>/src when the policy produced no candidates at all.
    default_src = root_dir / "src"
    if not search_dirs and default_src.is_dir():
        search_dirs.append(default_src)

    for directory in search_dirs:
        if not (directory.exists() and directory.is_dir()):
            continue
        package_names = [
            child.name
            for child in directory.iterdir()
            if child.is_dir() and _is_python_package(child)
        ]
        if package_names:
            logger.debug(f"Source directory: {directory} with packages: {package_names}")
            return directory

    logger.warning(
        f"No valid source directory with Python packages found in {root_dir} "
        f"based on policy {configured} or default 'src'."
    )
    return None

get_valid_packages

get_valid_packages(src_dir: Path) -> list[Path]

Get all valid Python packages found in the source directory.

Parameters:

Name Type Description Default
src_dir Path

The source directory to search for Python packages.

required

Returns:

Type Description
list[Path]

list[Path]: A list of Path objects representing valid Python packages. Returns an empty list if no packages are found or if the source directory doesn't exist.

Source code in src/civic_lib_core/fs_utils.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def get_valid_packages(src_dir: Path) -> list[Path]:
    """Collect the direct children of ``src_dir`` that are Python packages.

    Args:
        src_dir (Path): The source directory to search for Python packages.

    Returns:
        list[Path]: The entries of ``src_dir`` accepted by
            ``_is_python_package``. An empty list is returned when nothing
            qualifies, or (with a warning logged) when ``src_dir`` is missing
            or is not a directory.
    """
    if not (src_dir.exists() and src_dir.is_dir()):
        logger.warning(f"Source directory does not exist or is not a directory: {src_dir}")
        return []

    found = [entry for entry in src_dir.iterdir() if _is_python_package(entry)]
    if found:
        logger.debug(f"Found packages: {[pkg.name for pkg in found]}")
    else:
        logger.debug(f"No valid Python packages found in: {src_dir}")
    return found

resolve_path

resolve_path(relative_path: str | Path) -> Path

Resolve a relative path against the project root directory.

Parameters:

Name Type Description Default
relative_path str | Path

The relative path to resolve. Can be a string or Path object.

required

Returns:

Name Type Description
Path Path

The resolved absolute path relative to the project root.

Example

>>> resolve_path("src/package")
PosixPath('/absolute/path/to/project/src/package')

Source code in src/civic_lib_core/fs_utils.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
def resolve_path(relative_path: str | Path) -> Path:
    """Turn a project-relative path into a resolved absolute path.

    Args:
        relative_path (str | Path): Path to interpret relative to the project
            root found by ``get_project_root()``.

    Returns:
        Path: The project root joined with ``relative_path``, then resolved.

    Example:
        >>> resolve_path("src/package")
        PosixPath('/absolute/path/to/project/src/package')
    """
    absolute = get_project_root().joinpath(Path(relative_path)).resolve()
    logger.debug(f"Resolved '{relative_path}' to: {absolute}")
    return absolute

safe_filename

safe_filename(name: str, max_length: int = 255) -> str

Create a safe filename by sanitizing input string.

Parameters:

Name Type Description Default
name str

The original filename or string to sanitize.

required
max_length int

Maximum length of the resulting filename. Defaults to 255.

255

Returns:

Name Type Description
str str

A sanitized filename safe for filesystem use, containing only alphanumeric characters, dots, underscores, and hyphens. Spaces and path separators are converted to underscores.

Example

>>> safe_filename("My File/Name:Test")
'my_file_name_test'

>>> safe_filename("", 10)
'unnamed'

Source code in src/civic_lib_core/fs_utils.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def safe_filename(name: str, max_length: int = 255) -> str:
    """Sanitize an arbitrary string into a filesystem-friendly filename.

    Alphanumeric characters, dots, underscores and hyphens are kept (and
    lowercased); spaces, slashes, backslashes and colons become underscores;
    every other character is dropped.

    Args:
        name (str): The original filename or string to sanitize.
        max_length (int, optional): Maximum length of the resulting filename. Defaults to 255.

    Returns:
        str: The sanitized name. A leading dot is prefixed with an underscore,
            trailing underscores are stripped, ``"file"`` is used when no
            character survives, and ``"unnamed"`` is used for empty input or
            when only underscores remain.

    Example:
        >>> safe_filename("My File/Name:Test")
        'my_file_name_test'

        >>> safe_filename("", 10)
        'unnamed'
    """
    if not name:
        return "unnamed"

    def _translate(char: str) -> str:
        """Map one input character to its sanitized replacement ('' drops it)."""
        if char.isalnum() or char in "._-":
            return char.lower()
        if char in " /\\:":
            return "_"
        return ""

    cleaned = "".join(_translate(char) for char in name) or "file"

    # Avoid producing a hidden file.
    if cleaned.startswith("."):
        cleaned = f"_{cleaned}"

    if len(cleaned) > max_length:
        cleaned = cleaned[:max_length]

    cleaned = cleaned.rstrip("_") or "unnamed"

    logger.debug(f"Sanitized filename '{name}' to: '{cleaned}'")
    return cleaned

graphql_utils

civic_lib_core/graphql_utils.py.

Unified GraphQL utilities for Civic Interconnect projects.

Provides: - Consistent error handling for GraphQL transport errors - Asynchronous and synchronous helpers for paginated GraphQL queries - Utilities to fetch all pages of results from GraphQL APIs

async_paged_query async

async_paged_query(
    url: str,
    api_key: str,
    query: Any,
    data_path: Sequence[str],
    page_info_path: Sequence[str] | None = None,
) -> list[dict[str, Any]]

Execute a paginated GraphQL query asynchronously and fetch all results.

Parameters:

Name Type Description Default
url str

The GraphQL endpoint URL.

required
api_key str

Bearer token for API authentication.

required
query Any

The GraphQL query object to execute.

required
data_path Sequence[str]

Path to the data array in the response.

required
page_info_path Sequence[str] | None

Path to pageInfo in the response. If None, will attempt to infer from data_path. Defaults to None.

None

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: List of all records collected from all pages.

Raises:

Type Description
ValueError

If page_info_path is None and pageInfo cannot be inferred.

Source code in src/civic_lib_core/graphql_utils.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
async def async_paged_query(
    url: str,
    api_key: str,
    query: Any,
    data_path: Sequence[str],
    page_info_path: Sequence[str] | None = None,
) -> list[dict[str, Any]]:
    """Execute a paginated GraphQL query asynchronously and fetch all results.

    Pages are requested 100 records at a time with the variables ``first`` and
    ``after`` until the server reports no further page. Pagination also stops,
    with a warning, when the server claims another page exists but supplies no
    new ``endCursor``; continuing would request the same page forever.

    Args:
        url (str): The GraphQL endpoint URL.
        api_key (str): Bearer token for API authentication.
        query (Any): The GraphQL query object to execute.
        data_path (Sequence[str]): Path to the data array in the response.
        page_info_path (Sequence[str] | None, optional): Path to pageInfo in the response.
            If None, ``pageInfo`` is looked up next to the data array, i.e. at
            ``data_path[:-1] + ["pageInfo"]``. Defaults to None.

    Returns:
        list[dict[str, Any]]: List of all records collected from all pages.

    Raises:
        ValueError: If page_info_path is None and pageInfo cannot be inferred.
    """

    def _dig(payload: Any, keys: Sequence[str]) -> Any:
        """Follow ``keys`` one level at a time into a nested response."""
        node = payload
        for key in keys:
            node = node[key]
        return node

    headers = {"Authorization": f"Bearer {api_key}"}
    transport = AIOHTTPTransport(url=url, headers=headers, ssl=True)

    async with Client(transport=transport, fetch_schema_from_transport=False) as client:
        collected: list[dict[str, Any]] = []
        after = None

        while True:
            variables = {"first": 100, "after": after}
            response = await client.execute(query, variable_values=variables)
            collected.extend(_dig(response, data_path))

            page_info: Any
            if page_info_path is None:
                try:
                    page_info = _dig(response, data_path[:-1])["pageInfo"]
                except (KeyError, TypeError) as e:
                    raise ValueError(
                        "Could not infer page_info path. Please specify page_info_path."
                    ) from e
            else:
                page_info = _dig(response, page_info_path)

            if not page_info.get("hasNextPage"):
                break

            next_cursor = page_info.get("endCursor")
            if next_cursor is None or next_cursor == after:
                # Without a fresh cursor the next request would repeat this page.
                logger.warning(
                    f"Pagination for {url} reported another page without a new cursor; stopping."
                )
                break
            after = next_cursor

        logger.info(f"Fetched {len(collected)} records from {url}.")
        return collected

fetch_paginated async

fetch_paginated(
    client: Any,
    query: Any,
    data_key: str,
    variables: Mapping[str, Any] | None = None,
) -> list[dict[str, Any]]

Fetch all paginated results from a GraphQL query using cursor-based pagination.

This function automatically handles pagination by making multiple requests to fetch all available data, using the standard GraphQL cursor-based pagination pattern with 'first', 'after', 'edges', 'pageInfo', 'hasNextPage', and 'endCursor' fields.

Parameters:

Name Type Description Default
client Any

The GraphQL client instance that supports async execution.

required
query Any

The GraphQL query object to execute.

required
data_key str

The key in the response data that contains the paginated results.

required
variables Mapping[str, Any] | None

Additional variables to pass with the query. Defaults to None.

None

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: A list containing all the 'node' objects from all pages of results combined.

Example
query = gql("query { users(first: $first, after: $after) { ... } }")
users = await fetch_paginated(client, query, "users", {"status": "active"})
Note
  • Uses a fixed page size of 100 items per request
  • Automatically extracts 'node' objects from GraphQL 'edges'
  • Logs the total number of records fetched upon completion
Source code in src/civic_lib_core/graphql_utils.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
async def fetch_paginated(
    client: Any,
    query: Any,
    data_key: str,
    variables: Mapping[str, Any] | None = None,
) -> list[dict[str, Any]]:
    """Fetch all paginated results from a GraphQL query using cursor-based pagination.

    Requests are repeated with the standard cursor-pagination variables
    ('first', 'after') and the 'node' of every entry in 'edges' is collected
    until 'pageInfo.hasNextPage' is falsy. Pagination also stops, with a
    warning, when the server claims another page exists but supplies no new
    'endCursor'; continuing would request the same page forever.

    Args:
        client (Any): The GraphQL client instance that supports async execution.
        query (Any): The GraphQL query object to execute.
        data_key (str): The key in the response data that contains the paginated results.
        variables (Mapping[str, Any] | None, optional): Additional variables to pass
            with the query. The mapping is copied, never modified; 'first' and
            'after' are always overridden. Defaults to None.

    Returns:
        list[dict[str, Any]]: A list containing all the 'node' objects from all pages
            of results combined.

    Example:
        ```python
        query = gql("query { users(first: $first, after: $after) { ... } }")
        users = await fetch_paginated(client, query, "users", {"status": "active"})
        ```

    Note:
        - Uses a fixed page size of 100 items per request
        - Automatically extracts 'node' objects from GraphQL 'edges'
        - A missing or null 'edges' / 'pageInfo' is treated as empty
        - Logs the total number of records fetched upon completion
    """
    all_results: list[dict[str, Any]] = []
    after = None
    base_vars = dict(variables) if variables else {}

    while True:
        page_vars = {**base_vars, "first": 100, "after": after}

        response = await client.execute_async(query, variable_values=page_vars)
        page = response[data_key]

        # `or` also covers an explicit null sent by the server.
        all_results.extend(edge["node"] for edge in page.get("edges") or [])

        page_info = page.get("pageInfo") or {}
        if not page_info.get("hasNextPage"):
            break

        next_cursor = page_info.get("endCursor")
        if next_cursor is None or next_cursor == after:
            # Without a fresh cursor the next request would repeat this page.
            logger.warning(
                f"Pagination for '{data_key}' reported another page without a new cursor; stopping."
            )
            break
        after = next_cursor

    logger.info(f"Fetched {len(all_results)} total records for '{data_key}'.")
    return all_results

handle_transport_errors

handle_transport_errors(
    e: Exception, resource_name: str = 'resource'
) -> str

Handle GraphQL transport errors with appropriate logging and re-raising.

Parameters:

Name Type Description Default
e Exception

The exception to handle, typically a GraphQL transport error.

required
resource_name str

Name of the resource being accessed for logging context. Defaults to "resource".

'resource'

Returns:

Name Type Description
str str

Error message for 403 Forbidden errors, indicating access not granted.

Raises:

Type Description
Exception

Re-raises the original exception after logging the appropriate error message based on the exception type.

Source code in src/civic_lib_core/graphql_utils.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def handle_transport_errors(e: Exception, resource_name: str = "resource") -> str:
    """Log a GraphQL transport failure and re-raise it, tolerating 403 responses.

    Args:
        e (Exception): The exception to handle, typically a GraphQL transport error.
        resource_name (str, optional): Name of the resource being accessed, used
            in the log messages. Defaults to "resource".

    Returns:
        str: A short "access not yet granted" message. Only returned when ``e``
            is a TransportServerError whose text contains "403".

    Raises:
        Exception: The original exception ``e``, in every other case, after an
            error message matching its type has been logged.
    """
    server_error = isinstance(e, TransportServerError)

    # A 403 means the resource is not enabled yet: warn and report, do not raise.
    if server_error and "403" in str(e):
        logger.warning(f"{resource_name} access not yet enabled (403 Forbidden).")
        return f"{resource_name} access not yet granted"

    if server_error:
        detail = f"Server error while accessing {resource_name}: {e}"
    elif isinstance(e, TransportQueryError):
        detail = f"GraphQL query error while accessing {resource_name}: {e}"
    elif isinstance(e, TransportProtocolError):
        detail = f"Transport protocol error during {resource_name} query: {e}"
    else:
        detail = f"Unexpected error during {resource_name} query: {e}"

    logger.error(detail)
    raise e

paged_query

paged_query(
    url: str,
    api_key: str,
    query: Any,
    data_path: Sequence[str],
) -> list[dict[str, Any]]

Execute a paginated GraphQL query synchronously and fetch all results.

This is a synchronous wrapper around async_paged_query that uses asyncio.run to execute the asynchronous query in a blocking manner.

Parameters:

Name Type Description Default
url str

The GraphQL endpoint URL.

required
api_key str

Bearer token for API authentication.

required
query Any

The GraphQL query object to execute.

required
data_path Sequence[str]

Path to the data array in the response.

required

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: List of all records collected from all pages, or an empty list when the request is rejected with 403 Forbidden. Any other error is logged and re-raised.

Source code in src/civic_lib_core/graphql_utils.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def paged_query(
    url: str,
    api_key: str,
    query: Any,
    data_path: Sequence[str],
) -> list[dict[str, Any]]:
    """Run ``async_paged_query`` to completion from synchronous code.

    The coroutine is driven with ``asyncio.run``. Any exception is handed to
    ``handle_transport_errors``, which either absorbs it (403 Forbidden) or
    re-raises it.

    Args:
        url (str): The GraphQL endpoint URL.
        api_key (str): Bearer token for API authentication.
        query (Any): The GraphQL query object to execute.
        data_path (Sequence[str]): Path to the data array in the response.

    Returns:
        list[dict[str, Any]]: All records collected from all pages, or an empty
            list when the failure was absorbed by ``handle_transport_errors``.

    Raises:
        Exception: Whatever ``handle_transport_errors`` re-raises.
    """
    try:
        records = asyncio.run(async_paged_query(url, api_key, query, data_path))
    except Exception as e:
        # Only reaches the return below if the handler chose not to re-raise.
        handle_transport_errors(e, resource_name=url)
        return []
    return records

log_utils

civic_lib_core/log_utils.py.

Centralized logging for Civic Interconnect agents and libraries.

init_logger

init_logger(
    log_level: str | None = None,
    log_to_console: bool = True,
) -> None

Initialize Loguru logging once per session.

Loads config from project_policy.yaml if available.

Parameters:

Name Type Description Default
log_level Optional[str]

Override log level (e.g. "DEBUG").

None
log_to_console bool

Whether to also log to stderr.

True
Source code in src/civic_lib_core/log_utils.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def init_logger(log_level: str | None = None, log_to_console: bool = True) -> None:
    """Initialize Loguru logging once per session.

    The first call removes the existing handlers, then adds a daily-rotated
    file sink under the project's log directory and, optionally, a stderr
    sink. Later calls return immediately because the module-level
    ``_logger_initialized`` flag is already set.

    Settings are read from the project policy with these defaults:
    ``log_subdir`` ("logs"), ``log_file_template`` ("{time:YYYY-MM-DD}.log"),
    ``log_retention_days`` (7) and ``log_level`` ("INFO"). If the policy cannot
    be loaded, all defaults apply.

    The effective level is chosen in this order: the ``log_level`` argument,
    the ``log_level`` key of the runtime config file (if that file exists and
    can be read), the policy ``log_level``, then "INFO".

    Args:
        log_level (Optional[str]): Override log level (e.g. "DEBUG").
        log_to_console (bool): Whether to also log to stderr.
    """
    global _logger_initialized
    # Guard against configuring the sinks more than once.
    if _logger_initialized:
        logger.debug("Logger already initialized.")  # type: ignore[attr-defined]
        return

    # Remove default handlers
    logger.remove()  # type: ignore[attr-defined]

    layout = fs_utils.discover_project_layout()
    project_root = layout.project_root

    try:
        policy = project_policy.load_project_policy(project_root)
    except Exception as e:
        # Fall back to an empty policy so every setting below uses its default.
        policy = {}
        # NOTE(review): handlers were removed above, so this warning may have no
        # sink to reach - confirm.
        logger.warning(f"Failed to load project policy: {e}")  # type: ignore[attr-defined]

    log_subdir = policy.get("log_subdir", "logs")
    log_file_template = policy.get("log_file_template", "{time:YYYY-MM-DD}.log")
    retention_days = policy.get("log_retention_days", 7)

    # The log directory lives under the project root and is created if missing.
    logs_dir = project_root / log_subdir
    fs_utils.ensure_dir(logs_dir)

    log_file_path = logs_dir / log_file_template

    # The runtime config is optional; any problem reading it is logged and
    # treated as "no runtime level".
    try:
        runtime_config_path = fs_utils.get_runtime_config_path(project_root)
        if runtime_config_path.exists():
            import yaml  # type: ignore

            runtime_cfg = yaml.safe_load(runtime_config_path.read_text())
            runtime_log_level = runtime_cfg.get("log_level")
        else:
            runtime_log_level = None
    except Exception as e:
        logger.warning(f"Failed to load runtime config: {e}")
        runtime_log_level = None

    # Precedence: explicit argument, then runtime config, then policy, then "INFO".
    level = (log_level or runtime_log_level or policy.get("log_level", "INFO")).upper().strip()

    # Add file sink with structured format
    logger.add(
        str(log_file_path),
        format="{time} | {level} | {message}",
        rotation="1 day",
        retention=f"{retention_days} days",
        encoding="utf-8",
        level=level,
        backtrace=True,
        diagnose=True,
    )

    # Optional console sink, using the same level as the file sink.
    if log_to_console:
        logger.add(
            sink=sys.stderr,
            format="<green>{time}</green> | <level>{level}</level> | {message}",
            level=level,
            backtrace=True,
            diagnose=True,
        )

    logger.info(f"===== Civic Interconnect logger initialized (level: {level}) =====")
    # Set the flag last, so a failure above leaves the logger marked uninitialized.
    _logger_initialized = True

log_agent_end

log_agent_end(
    agent_name: str, status: str = 'success'
) -> None

Log the end of an agent.

Parameters:

Name Type Description Default
agent_name str

Name of the agent.

required
status str

Status text (e.g. "success" or "error").

'success'
Source code in src/civic_lib_core/log_utils.py
108
109
110
111
112
113
114
115
116
def log_agent_end(agent_name: str, status: str = "success") -> None:
    """Emit an INFO banner marking that an agent has finished.

    The banner contains the agent name, the supplied status text, and the
    value returned by ``now_utc_str()`` at the moment of the call.

    Args:
        agent_name (str): Name of the agent.
        status (str): Status text (e.g. "success" or "error").
    """
    finished_at = now_utc_str()
    banner = f"===== {agent_name} completed with status: {status} at {finished_at} ====="
    logger.info(banner)

log_agent_start

log_agent_start(agent_name: str) -> None

Log the start of an agent.

Parameters:

Name Type Description Default
agent_name str

Name of the agent.

required
Source code in src/civic_lib_core/log_utils.py
 99
100
101
102
103
104
105
def log_agent_start(agent_name: str) -> None:
    """Emit an INFO banner marking that an agent is starting.

    Args:
        agent_name (str): Name of the agent.
    """
    banner = f"===== Starting {agent_name} ====="
    logger.info(banner)

path_utils

Path utilities.

File: civic_lib_core/path_utils.py

policy_utils

Policy enforcement utilities for Civic Interconnect projects.

check_policy

check_policy(repo_root: Path, repo_type: str) -> list[str]

Check the project at repo_root against policy for the given repo_type.

Returns:

Type Description
list[str]

List of issues (empty list = all good)

Source code in src/civic_lib_core/policy_utils.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def check_policy(repo_root: Path, repo_type: str) -> list[str]:
    """Run the policy checks that apply to a repository of the given type.

    The policy is loaded for ``repo_root``; the required-file check and the
    empty-directory check run for every repo, while the Python, Node and PWA
    checks run only when ``repo_type`` is "python", "node" or "pwa".

    Args:
        repo_root (Path): Root directory of the repository to check.
        repo_type (str): Kind of repository ("python", "node" or "pwa").

    Returns:
        List of issues (empty list = all good)
    """
    policy = project_policy.load_project_policy(repo_root)

    # Files every repo must have, regardless of type.
    found: list[str] = []
    found += project_checks.check_required_files(repo_root, policy)

    if repo_type == "python":
        found += project_checks.check_python_project_files(repo_root, policy)
        found += project_checks.check_python_project_dirs(repo_root, policy)

        # Source-tree checks need a discovered src directory to work with.
        layout = _safe_discover_layout(repo_root)
        src_dir = layout.src_dir if layout else None
        if src_dir:
            found += project_checks.check_oversized_py_files(repo_root, src_dir, policy)
            found += project_checks.check_py_files_outside_src(repo_root, src_dir)
        else:
            found.append("No src/ directory found; skipping Python file checks.")
    elif repo_type == "node":
        found += project_checks.check_additional_files(repo_root, policy, "node_project_files")
    elif repo_type == "pwa":
        found += project_checks.check_additional_files(repo_root, policy, "pwa_project_files")

    # Empty directories are reported for every repo type.
    found += project_checks.check_empty_dirs(repo_root)

    return found

project_checks

civic_lib_core/project_checks.py.

Run structural and policy checks on a Civic Interconnect project.

check_additional_files

check_additional_files(
    project_root: Path, policy: dict, key: str
) -> list[str]

Check additional file requirements from project policy.

Parameters:

Name Type Description Default
project_root Path

Project root.

required
policy dict

Project policy.

required
key str

Policy key like 'node_project_files' or 'pwa_project_files'.

required

Returns:

Type Description
list[str]

list[str]: Issues for missing files.

Source code in src/civic_lib_core/project_checks.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def check_additional_files(project_root: Path, policy: dict, key: str) -> list[str]:
    """Check additional file requirements from project policy.

    Each filename listed under ``policy[key]`` is resolved against
    ``project_root``; one issue is produced per path that does not exist.

    Args:
        project_root (Path): Project root.
        policy (dict): Project policy.
        key (str): Policy key like 'node_project_files' or 'pwa_project_files'.

    Returns:
        list[str]: Issues for missing files (empty when nothing is missing or
            the policy lists no files under ``key``).
    """
    # A key that is present but empty in YAML (``node_project_files:``) loads
    # as None, which ``policy.get(key, [])`` would return as-is and crash the
    # loop. Treat it the same as "no files required".
    required = policy.get(key) or []
    label = key.replace("_", " ")
    return [
        f"Missing {label} file: {filename}"
        for filename in required
        if not (project_root / filename).exists()
    ]

check_empty_dirs

check_empty_dirs(project_root: Path) -> list[str]

Find empty directories in the project.

Parameters:

Name Type Description Default
project_root Path

Root of the project.

required

Returns:

Type Description
list[str]

list[str]: Issues for empty directories.

Source code in src/civic_lib_core/project_checks.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def check_empty_dirs(project_root: Path) -> list[str]:
    """Find empty directories in the project.

    Directories that live inside version-control metadata, virtual
    environments, dependency folders or tool caches are ignored: they are
    managed by tools rather than by the project author, and a healthy
    repository routinely contains empty ones (for example ``.git/refs/tags``).

    Args:
        project_root (Path): Root of the project.

    Returns:
        list[str]: Issues for empty directories, with paths shown relative to
            ``project_root``.
    """
    # Directory names whose whole subtree is excluded from the check.
    ignored_names = {
        ".git",
        ".hg",
        ".svn",
        ".venv",
        "venv",
        "node_modules",
        "__pycache__",
        ".mypy_cache",
        ".pytest_cache",
        ".ruff_cache",
        ".tox",
    }

    issues = []
    for path in project_root.rglob("*"):
        if not path.is_dir():
            continue
        relative = path.relative_to(project_root)
        # Skip the ignored directory itself and anything nested below it.
        if not ignored_names.isdisjoint(relative.parts):
            continue
        if not any(path.iterdir()):
            issues.append(f"Empty directory found: {relative}")
    return issues

check_oversized_py_files

check_oversized_py_files(
    project_root: Path, src_dir: Path, policy: dict
) -> list[str]

Check for Python files exceeding allowed line limits.

Parameters:

Name Type Description Default
project_root Path

Project root.

required
src_dir Path

Source directory.

required
policy dict

Project policy.

required

Returns:

Type Description
list[str]

list[str]: Issues for oversized files.

Source code in src/civic_lib_core/project_checks.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def check_oversized_py_files(project_root: Path, src_dir: Path, policy: dict) -> list[str]:
    """Report Python files under ``src_dir`` that exceed the allowed line count.

    The limit is read from ``policy["max_python_file_length"]`` and defaults
    to 1000 lines. Files are decoded as UTF-8 with undecodable bytes ignored.
    Any exception raised while handling a file is turned into a
    "Could not read file" issue instead of being propagated.

    Args:
        project_root (Path): Project root; reported paths are relative to it.
        src_dir (Path): Source directory searched recursively for ``*.py``.
        policy (dict): Project policy.

    Returns:
        list[str]: Issues for oversized files.
    """
    limit = policy.get("max_python_file_length", 1000)
    problems: list[str] = []

    for source_file in src_dir.rglob("*.py"):
        try:
            content = source_file.read_text(encoding="utf-8", errors="ignore")
            line_count = len(content.splitlines())
            if line_count > limit:
                relative = source_file.relative_to(project_root)
                problems.append(f"Python file too long ({line_count} lines): {relative}")
        except Exception as exc:
            problems.append(f"Could not read file {source_file}: {exc}")

    return problems

check_py_files_outside_src

check_py_files_outside_src(
    project_root: Path, src_dir: Path
) -> list[str]

Check for .py files outside src_dir, ignoring top-level scripts.

Parameters:

Name Type Description Default
project_root Path

Project root.

required
src_dir Path

Source directory.

required

Returns:

Type Description
list[str]

list[str]: Issues for files outside src.

Source code in src/civic_lib_core/project_checks.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def check_py_files_outside_src(project_root: Path, src_dir: Path) -> list[str]:
    """Check for .py files outside src_dir, ignoring top-level scripts.

    Files directly in ``project_root`` and files anywhere below ``src_dir``
    are accepted. Files inside version-control metadata, virtual
    environments, dependency folders or tool caches are also skipped: they
    belong to installed tooling, not to the project, and would otherwise
    flood the report (every module in ``.venv`` would be flagged).

    Args:
        project_root (Path): Project root.
        src_dir (Path): Source directory.

    Returns:
        list[str]: Issues for files outside src, with paths shown relative to
            ``project_root``.
    """
    # Directory names whose whole subtree is excluded from the check.
    ignored_names = {
        ".git",
        ".hg",
        ".svn",
        ".venv",
        "venv",
        "node_modules",
        "__pycache__",
        ".mypy_cache",
        ".pytest_cache",
        ".ruff_cache",
        ".tox",
    }

    issues = []
    for py_file in project_root.rglob("*.py"):
        if src_dir in py_file.parents:
            continue
        # Ignore top-level scripts in the repo root
        if py_file.parent == project_root:
            continue
        relative = py_file.relative_to(project_root)
        # Only the directory components matter; the filename itself is never ignored.
        if not ignored_names.isdisjoint(relative.parts[:-1]):
            continue
        issues.append(f"Python file outside src/ directory: {relative}")
    return issues

check_python_project_dirs

check_python_project_dirs(
    project_root: Path, policy: dict
) -> list[str]

Check required directories for Python projects.

Parameters:

Name Type Description Default
project_root Path

Project root.

required
policy dict

Project policy.

required

Returns:

Type Description
list[str]

list[str]: Issues for missing dirs.

Source code in src/civic_lib_core/project_checks.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def check_python_project_dirs(project_root: Path, policy: dict) -> list[str]:
    """Check required directories for Python projects.

    Each name listed under ``policy["python_project_dirs"]`` must resolve to
    a directory below ``project_root``. A regular file with the same name
    does not satisfy the requirement.

    Args:
        project_root (Path): Project root.
        policy (dict): Project policy.

    Returns:
        list[str]: Issues for missing dirs.
    """
    # An empty YAML key loads as None; treat it as "no directories required".
    required_dirs = policy.get("python_project_dirs") or []
    return [
        f"Missing Python project directory: {dirname}/"
        for dirname in required_dirs
        # is_dir() rather than exists(): a file named e.g. "docs" is not a directory.
        if not (project_root / dirname).is_dir()
    ]

check_python_project_files

check_python_project_files(
    project_root: Path, policy: dict
) -> list[str]

Check required files for Python projects.

Parameters:

Name Type Description Default
project_root Path

Project root.

required
policy dict

Project policy.

required

Returns:

Type Description
list[str]

list[str]: Issues for missing files.

Source code in src/civic_lib_core/project_checks.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def check_python_project_files(project_root: Path, policy: dict) -> list[str]:
    """Check required files for Python projects.

    Each name listed under ``policy["python_project_files"]`` is resolved
    against ``project_root``; one issue is produced per path that does not
    exist.

    Args:
        project_root (Path): Project root.
        policy (dict): Project policy.

    Returns:
        list[str]: Issues for missing files.
    """
    # An empty YAML key loads as None; treat it as "no files required"
    # instead of failing with a TypeError when iterating.
    required_files = policy.get("python_project_files") or []
    return [
        f"Missing Python project file: {filename}"
        for filename in required_files
        if not (project_root / filename).exists()
    ]

check_required_files

check_required_files(
    project_root: Path, policy: dict
) -> list[str]

Check files required in all Civic Interconnect repos.

Parameters:

Name Type Description Default
project_root Path

Project root.

required
policy dict

Project policy.

required

Returns:

Type Description
list[str]

list[str]: Issues for missing required files.

Source code in src/civic_lib_core/project_checks.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def check_required_files(project_root: Path, policy: dict) -> list[str]:
    """Check files required in all Civic Interconnect repos.

    Each name listed under ``policy["required_files"]`` is resolved against
    ``project_root``; one issue is produced per path that does not exist.

    Args:
        project_root (Path): Project root.
        policy (dict): Project policy.

    Returns:
        list[str]: Issues for missing required files.
    """
    # An empty YAML key loads as None; treat it as "no files required"
    # instead of failing with a TypeError when iterating.
    required_files = policy.get("required_files") or []
    return [
        f"Missing required file: {filename}"
        for filename in required_files
        if not (project_root / filename).exists()
    ]

main

main() -> None

Run all checks from CLI entry point.

Prints results and exits with appropriate code.

Source code in src/civic_lib_core/project_checks.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def main() -> None:
    """CLI entry point: run every project check and report the outcome.

    Prints each issue as a bullet and exits with status 1 when any issue is
    found; otherwise prints a success message and exits with status 0.
    """
    import sys

    problems = run_all_checks()

    if problems:
        print("Project checks found the following issues:\n")
        for problem in problems:
            print(f"- {problem}")
        exit_code = 1
    else:
        print("All project checks passed successfully.")
        exit_code = 0

    sys.exit(exit_code)

run_all_checks

run_all_checks() -> list[str]

Run all project-level checks.

Returns:

Type Description
list[str]

list[str]: List of issues found.

Source code in src/civic_lib_core/project_checks.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def run_all_checks() -> list[str]:
    """Run every project-level check against the current project.

    The project root, policy and layout are discovered first. Required-file,
    Node, PWA and empty-directory checks always run; the Python checks run
    only when the discovered layout exposes a ``src_dir`` that is a ``Path``.

    Returns:
        list[str]: List of issues found.
    """
    project_root = fs_utils.get_project_root()
    policy = project_policy.load_project_policy(project_root)
    layout = fs_utils.discover_project_layout()
    src_dir = getattr(layout, "src_dir", None)

    found: list[str] = []
    found += check_required_files(project_root, policy)

    # Python-specific checks need a real source directory.
    if not isinstance(src_dir, Path):
        found.append("No source directory found. Skipping Python file checks.")
    else:
        found += check_python_project_files(project_root, policy)
        found += check_python_project_dirs(project_root, policy)
        found += check_oversized_py_files(project_root, src_dir, policy)
        found += check_py_files_outside_src(project_root, src_dir)

    # Node.js files first, then PWA files, each driven by its own policy key.
    for policy_key in ("node_project_files", "pwa_project_files"):
        found += check_additional_files(project_root, policy, key=policy_key)

    found += check_empty_dirs(project_root)

    return found

project_layout

civic_lib_core/project_layout.py.

Discover and verify basic project layout for any Civic Interconnect client repo.

ProjectLayout

Bases: NamedTuple

Represents the layout of a Civic Interconnect project.

Attributes:

Name Type Description
project_root Path

Root directory of the project.

src_dir Path | None

Source directory, or None if not found.

docs_dir Path | None

Documentation directory, or None if not found.

docs_api_dir Path | None

API documentation source directory, or None if not found.

packages list[Path]

List of package directories under src_dir.

org_name str | None

Organization name, if detected.

policy dict

Loaded project policy data.

Source code in src/civic_lib_core/project_layout.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class ProjectLayout(NamedTuple):
    """Immutable record describing the layout of a Civic Interconnect project.

    Instances are produced by ``discover_project_layout()`` and consumed by
    ``format_layout()`` and ``verify_layout()``.

    Attributes:
        project_root (Path): Root directory of the project.
        src_dir (Path | None): Source directory, or None if not found.
        docs_dir (Path | None): Documentation directory, or None if not found.
        docs_api_dir (Path | None): API documentation directory, or None if not found.
        packages (list[Path]): List of package directories under src_dir.
        org_name (str | None): Organization name, if detected.
        policy (dict): Loaded project policy data.
    """

    # Field order is part of the tuple's positional interface; do not reorder.
    project_root: Path
    src_dir: Path | None
    docs_dir: Path | None
    docs_api_dir: Path | None
    packages: list[Path]
    org_name: str | None
    policy: dict

discover_project_layout

discover_project_layout() -> ProjectLayout

Discover and return the layout of the current Civic Interconnect project.

Delegates to fs_utils.discover_project_layout() to perform actual discovery.

Returns:

Name Type Description
ProjectLayout ProjectLayout

Populated project layout info.

Source code in src/civic_lib_core/project_layout.py
41
42
43
44
45
46
47
48
49
def discover_project_layout() -> ProjectLayout:
    """Return the layout of the current Civic Interconnect project.

    This is a thin wrapper: the discovery itself is performed by
    `fs_utils.discover_project_layout()`.

    Returns:
        ProjectLayout: Populated project layout info.
    """
    layout = fs_utils.discover_project_layout()
    return layout

format_layout

format_layout(layout: ProjectLayout) -> str

Format the layout info for display.

Parameters:

Name Type Description Default
layout ProjectLayout

The layout info to format.

required

Returns:

Name Type Description
str str

Formatted layout details.

Source code in src/civic_lib_core/project_layout.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def format_layout(layout: ProjectLayout) -> str:
    """Render a project layout as a multi-line, human-readable summary.

    The summary lists the organization, root, API docs directory, source
    directory and policy path, followed by one line per package (relative to
    the project root) or a placeholder when no packages were found.

    Args:
        layout (ProjectLayout): The layout info to format.

    Returns:
        str: Formatted layout details.
    """
    org = layout.org_name or "unknown"
    api_docs = layout.docs_api_dir or "none"
    source = layout.src_dir or "none"
    policy_path = layout.policy.get("__policy_path__", "unknown")

    summary = [
        f"Org:      {org}",
        f"Root:     {layout.project_root}",
        f"API Docs: {api_docs}",
        f"Source:   {source}",
        f"Policy:   {policy_path}",
        "Packages:",
    ]

    package_lines = [f"  - {pkg.relative_to(layout.project_root)}" for pkg in layout.packages]
    if not package_lines:
        package_lines = ["  (no packages found)"]

    return "\n".join(summary + package_lines)

main

main() -> None

Standalone entry point for manual testing of this layout module.

Prints layout and any issues found.

Source code in src/civic_lib_core/project_layout.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def main() -> None:
    """Standalone entry point for manually exercising the layout module.

    Discovers the layout, prints it, verifies it and prints any problems.
    Exits with status 0 when verification passes and 1 when problems are
    found or an exception is raised along the way.
    """
    import sys

    try:
        layout = discover_project_layout()
        print(f"\n{format_layout(layout)}\n")

        problems = verify_layout(layout)
        if not problems:
            print("\nLayout verified successfully.")
            sys.exit(0)
        else:
            print("\nProblems found:")
            for problem in problems:
                print(f"- {problem}")
            sys.exit(1)
    except Exception as exc:
        # SystemExit is not an Exception subclass, so the exits above pass through.
        print(
            f"\nAn error occurred during project layout discovery/verification: {exc}",
            file=sys.stderr,
        )
        sys.exit(1)

verify_layout

verify_layout(layout: ProjectLayout) -> list[str]

Verify that the discovered layout satisfies expectations.

Parameters:

Name Type Description Default
layout ProjectLayout

The layout to check.

required

Returns:

Type Description
list[str]

list[str]: List of issues found (empty list means all OK).

Source code in src/civic_lib_core/project_layout.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def verify_layout(layout: ProjectLayout) -> list[str]:
    """Check a discovered layout against basic structural expectations.

    The project root must exist and be a directory. When a source directory
    is set it must exist, be a directory and contain at least one package.
    When an API docs directory is set it must exist and be a directory.

    Args:
        layout (ProjectLayout): The layout to check.

    Returns:
        list[str]: List of issues found (empty list means all OK).
    """
    problems: list[str] = []

    root = layout.project_root
    if not root.exists():
        problems.append(f"Project root not found: {root}")
    elif not root.is_dir():
        problems.append(f"Project root is not a directory: {root}")

    src = layout.src_dir
    if src:
        if not src.exists():
            problems.append(f"Missing source directory: {src}")
        elif not src.is_dir():
            problems.append(f"Source directory is not a directory: {src}")
        elif not layout.packages:
            problems.append(f"No Python packages found under: {src}")

    api_docs = layout.docs_api_dir
    if api_docs:
        if not api_docs.exists():
            problems.append(f"Missing API docs source directory: {api_docs}")
        elif not api_docs.is_dir():
            problems.append(f"API docs source directory is not a directory: {api_docs}")

    return problems

project_policy

civic_lib_core/project_policy.py.

Load the project policy for any Civic Interconnect client repo.

load_project_policy

load_project_policy(
    project_root: Path | None = None,
    override_file: Path | None = None,
) -> dict[str, Any]

Load Civic Interconnect project policy.

Behavior: (1) load defaults from civic_lib_core's bundled project_policy.yaml; (2) if a client repo defines its own project_policy.yaml, merge its overrides into the default policy.

Parameters:

Name Type Description Default
project_root Path | None

Optional project root to look for client project_policy.yaml.

None
override_file Path | None

Optional path to explicitly provide a custom policy file.

None

Returns:

Name Type Description
dict dict[str, Any]

Combined policy dictionary.

Source code in src/civic_lib_core/project_policy.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def load_project_policy(
    project_root: Path | None = None,
    override_file: Path | None = None,
) -> dict[str, Any]:
    """Load the Civic Interconnect project policy, applying client overrides.

    Steps:
    1. Read the defaults from ``DEFAULT_POLICY_PATH``. A missing file is
       logged and treated as an empty policy; a YAML parse error is logged
       and re-raised.
    2. Pick an override candidate: ``override_file`` when given, otherwise
       ``project_policy.yaml`` under ``project_root`` when given.
    3. If the candidate exists, deep-merge it over the defaults and record
       its path under ``"__policy_path__"``. A YAML parse error is logged
       and re-raised.
    4. If no override was applied, ``"__policy_path__"`` is set to the
       default policy path (unless the loaded data already defines it).

    Args:
        project_root: Optional project root to look for client `project_policy.yaml`.
        override_file: Optional path to explicitly provide a custom policy file.

    Returns:
        dict: Combined policy dictionary.
    """
    # Bundled defaults.
    try:
        with DEFAULT_POLICY_PATH.open(encoding="utf-8") as default_file:
            merged = yaml.safe_load(default_file) or {}
    except FileNotFoundError:
        logger.warning(f"Default policy file not found at {DEFAULT_POLICY_PATH}")
        merged = {}
    except yaml.YAMLError as exc:
        logger.error(f"Failed to parse default policy file at {DEFAULT_POLICY_PATH}: {exc}")
        raise

    # An explicit override file wins over the project-root convention.
    if override_file:
        client_path = Path(override_file)
    elif project_root:
        client_path = Path(project_root) / "project_policy.yaml"
    else:
        client_path = None

    if client_path and client_path.exists():
        try:
            with client_path.open(encoding="utf-8") as client_file:
                overrides = yaml.safe_load(client_file) or {}
        except yaml.YAMLError as exc:
            logger.error(f"Failed to parse custom policy at {client_path}: {exc}")
            raise
        else:
            merged = _deep_merge_dicts(merged, overrides)
            merged["__policy_path__"] = str(client_path)
            logger.debug(f"Loaded custom policy from {client_path}")

    # Record which policy file was used when no override supplied one.
    if "__policy_path__" not in merged:
        merged["__policy_path__"] = str(DEFAULT_POLICY_PATH)

    return merged

report_archiver

civic_lib_core/report_archiver.py.

Archives old Civic Interconnect agent reports by renaming them with an .archived.json suffix. Used by admin and maintenance tools, not daily agents.

archive_old_reports

archive_old_reports(
    agent_dir: Path, keep_latest: bool = True
) -> list[Path]

Rename old .json reports to .archived.json, optionally keeping the latest.

Parameters:

Name Type Description Default
agent_dir Path

Directory with report files.

required
keep_latest bool

Whether to keep the most recent report unarchived.

True

Returns:

Type Description
list[Path]

list[Path]: List of archived report file paths.

Source code in src/civic_lib_core/report_archiver.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def archive_old_reports(agent_dir: Path, keep_latest: bool = True) -> list[Path]:
    """Rename old .json reports to .archived.json, optionally keeping the latest.

    Only reports that are not already archived are considered. Files whose
    name ends in ``.archived.json`` also match the ``*.json`` pattern, so
    they are filtered out explicitly; otherwise they would be renamed again
    to ``*.archived.archived.json`` and could take the "latest" slot.

    Reports are ordered by path in descending order, so the "latest" report
    is the one that sorts last by name.

    Args:
        agent_dir (Path): Directory with report files.
        keep_latest (bool): Whether to keep the most recent report unarchived.

    Returns:
        list[Path]: List of archived report file paths. Empty when the
            directory is missing or nothing needed archiving. Files that
            fail to rename are logged and left out of the list.
    """
    if not agent_dir.exists() or not agent_dir.is_dir():
        logger.warning(f"Agent report directory does not exist: {agent_dir}")
        return []

    archived_suffix = ".archived.json"
    json_reports = sorted(
        (
            report
            for report in agent_dir.glob("*.json")
            if not report.name.endswith(archived_suffix)
        ),
        reverse=True,
    )

    if keep_latest:
        json_reports = json_reports[1:]  # Skip most recent (no-op on an empty list)

    archived = []

    for path in json_reports:
        # with_suffix swaps only the final ".json", so "a.b.json" -> "a.b.archived.json".
        archived_path = path.with_suffix(archived_suffix)
        try:
            path.rename(archived_path)
            logger.info(f"Archived report: {archived_path}")
            archived.append(archived_path)
        except Exception as e:
            # Best effort: one failed rename must not stop the remaining reports.
            logger.error(f"Failed to archive {path}: {e}")

    return archived

archive_reports_older_than

archive_reports_older_than(
    agent_dir: Path, days_old: int
) -> list[Path]

Archive reports older than a specified number of days.

Parameters:

Name Type Description Default
agent_dir Path

Directory with report files.

required
days_old int

Number of days to retain. Older reports get archived.

required

Returns:

Type Description
list[Path]

list[Path]: List of archived report file paths.

Source code in src/civic_lib_core/report_archiver.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def archive_reports_older_than(agent_dir: Path, days_old: int) -> list[Path]:
    """Archive reports older than a specified number of days.

    Args:
        agent_dir (Path): Directory with report files.
        days_old (int): Number of days to retain. Older reports get archived.

    Returns:
        list[Path]: List of archived report file paths.
    """
    cutoff_date = datetime.now(UTC) - timedelta(days=days_old)
    archived = []

    for path in agent_dir.glob("*.json"):
        try:
            date_str = path.stem
            report_date = datetime.strptime(date_str, DATE_ONLY_FORMAT).replace(tzinfo=UTC)
            if report_date < cutoff_date:
                archived_path = path.with_suffix(".archived.json")
                path.rename(archived_path)
                logger.info(f"Archived report older than {days_old} days: {archived_path}")
                archived.append(archived_path)
        except ValueError:
            logger.warning(f"Skipping non-date report file: {path.name}")
        except Exception as e:
            logger.error(f"Failed to archive {path.name}: {e}")
            # raise e

    return archived

report_constants

civic_lib_core/report_constants.py.

Shared constants for report generation, reading, validation, and indexing. Used across Civic Interconnect agents and admin tools.

report_formatter

civic_lib_core/report_formatter.py.

Format Civic Interconnect agent reports into various human-readable forms. Supports Markdown, plain text, and CSV formats.

format_report_as_csv

format_report_as_csv(report: dict[str, Any]) -> str

Format a report dictionary as a CSV string.

Parameters:

Name Type Description Default
report dict

A dictionary containing report data with a 'results' key that holds a list of dictionaries to be formatted as CSV.

required

Returns:

Name Type Description
str str

A CSV-formatted string with headers and data rows, or a message indicating no results are available if the results list is empty.

Source code in src/civic_lib_core/report_formatter.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def format_report_as_csv(report: dict[str, Any]) -> str:
    """Format a report dictionary as a CSV string.

    The header is the union of the keys of all result rows, in first-seen
    order. Taking the header from the first row only would make
    ``csv.DictWriter`` raise ``ValueError`` as soon as a later row carried an
    extra key. Rows that lack a column get an empty cell.

    Args:
        report (dict): A dictionary containing report data with a 'results' key
                      that holds a list of dictionaries to be formatted as CSV.

    Returns:
        str: A CSV-formatted string with headers and data rows, or a message
             indicating no results are available if the results list is empty
             or missing.
    """
    results = report.get("results", [])
    if not results:
        return "No results to export."

    # dict.fromkeys de-duplicates while preserving insertion order.
    fieldnames = list(dict.fromkeys(key for row in results for key in row))

    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(results)
    return output.getvalue()

format_report_as_markdown

format_report_as_markdown(report: dict[str, Any]) -> str

Format a report dictionary as a markdown string.

Takes a report dictionary containing agent information, metadata, and results, and converts it into a well-formatted markdown document with a summary section and a sample result displayed as JSON.

Parameters:

Name Type Description Default
report dict[str, Any]

A dictionary containing report data with the following optional keys: - 'agent': Name of the agent that generated the report - 'timestamp': When the report was generated - 'agent_version': Version of the agent - 'lib_version': Version of the library used - 'record_count': Number of records processed - 'results': List of result objects

required

Returns:

Name Type Description
str str

A markdown-formatted string containing the report summary and the first result from the results list (if available) displayed as a JSON code block.

Example

report = { ... 'agent': 'DataProcessor', ... 'timestamp': '2023-10-01T12:00:00Z', ... 'record_count': 100, ... 'results': [{'id': 1, 'status': 'success'}], ... } markdown = format_report_as_markdown(report) print(markdown)

Report Summary for DataProcessor

Date: 2023-10-01T12:00:00Z ...

Source code in src/civic_lib_core/report_formatter.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def format_report_as_markdown(report: dict[str, Any]) -> str:
    """Render a report dictionary as a Markdown summary.

    The output starts with a heading naming the agent, followed by bold
    metadata lines and a "Sample Result" section. That section shows the
    first entry of ``results`` as a fenced JSON block, or a placeholder when
    there are no results. Missing metadata keys fall back to placeholder text.

    Args:
        report (dict[str, Any]): Report data. All keys are optional:
            - 'agent': Name of the agent that generated the report
            - 'timestamp': When the report was generated
            - 'agent_version': Version of the agent
            - 'lib_version': Version of the library used
            - 'record_count': Number of records processed
            - 'results': List of result objects

    Returns:
        str: Newline-joined Markdown text containing the summary and the
             first result (if available) as a JSON code block.

    Example:
        >>> report = {
        ...     'agent': 'DataProcessor',
        ...     'timestamp': '2023-10-01T12:00:00Z',
        ...     'record_count': 100,
        ...     'results': [{'id': 1, 'status': 'success'}],
        ... }
        >>> markdown = format_report_as_markdown(report)
        >>> print(markdown)
        # Report Summary for DataProcessor
        **Date:** 2023-10-01T12:00:00Z
        ...
    """
    agent = report.get("agent", "Unknown Agent")
    timestamp = report.get("timestamp", "Unknown")
    agent_version = report.get("agent_version", "N/A")
    lib_version = report.get("lib_version", "N/A")
    record_count = report.get("record_count", "N/A")

    summary = [
        f"# Report Summary for {agent}",
        f"**Date:** {timestamp}",
        f"**Agent Version:** {agent_version}",
        f"**Library Version:** {lib_version}",
        f"**Record Count:** {record_count}",
    ]

    results = report.get("results", [])
    if results:
        sample_block = ["```json", json.dumps(results[0], indent=2), "```"]
    else:
        sample_block = ["_No results to display._"]

    return "\n".join([*summary, "", "## Sample Result", *sample_block])

format_report_as_text

format_report_as_text(report: dict[str, Any]) -> str

Format a report dictionary as a human-readable text string.

Parameters:

Name Type Description Default
report dict

A dictionary containing report data with keys like 'agent', 'timestamp', 'agent_version', 'lib_version', 'record_count', and 'results'.

required

Returns:

Name Type Description
str str

A formatted multi-line string representation of the report including metadata and the first sample result (if available).

Example

report = { ... 'agent': 'DataCollector', ... 'timestamp': '2023-10-15 14:30:00', ... 'agent_version': '1.2.3', ... 'lib_version': '2.1.0', ... 'record_count': 150, ... 'results': [{'id': 1, 'value': 'sample'}], ... } print(format_report_as_text(report)) Report: DataCollector Date: 2023-10-15 14:30:00 Agent Version: 1.2.3 Library Version: 2.1.0 Record Count: 150

Sample Result: { "id": 1, "value": "sample" }

Source code in src/civic_lib_core/report_formatter.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def format_report_as_text(report: dict[str, Any]) -> str:
    """Render a report dictionary as plain, human-readable text.

    Metadata keys that are absent fall back to placeholder values
    ('Unknown Agent', 'Unknown', 'N/A'). Only the first entry of
    ``results`` is shown, pretty-printed as JSON with a two-space indent.

    Args:
        report (dict): Report data. The keys read are 'agent', 'timestamp',
            'agent_version', 'lib_version', 'record_count', and 'results'.

    Returns:
        str: The metadata header, a blank line, and then either the first
            sample result or a "No results to display." placeholder, all
            joined with newlines.

    Example:
        >>> report = {
        ...     'agent': 'DataCollector',
        ...     'timestamp': '2023-10-15 14:30:00',
        ...     'agent_version': '1.2.3',
        ...     'lib_version': '2.1.0',
        ...     'record_count': 150,
        ...     'results': [{'id': 1, 'value': 'sample'}],
        ... }
        >>> print(format_report_as_text(report))
        Report: DataCollector
        Date: 2023-10-15 14:30:00
        Agent Version: 1.2.3
        Library Version: 2.1.0
        Record Count: 150

        Sample Result:
        {
          "id": 1,
          "value": "sample"
        }
    """
    results = report.get("results", [])
    # An empty or missing result list gets a placeholder instead of JSON.
    if results:
        sample_text = json.dumps(results[0], indent=2)
    else:
        sample_text = "No results to display."

    return "\n".join(
        (
            f"Report: {report.get('agent', 'Unknown Agent')}",
            f"Date: {report.get('timestamp', 'Unknown')}",
            f"Agent Version: {report.get('agent_version', 'N/A')}",
            f"Library Version: {report.get('lib_version', 'N/A')}",
            f"Record Count: {report.get('record_count', 'N/A')}",
            "",
            "Sample Result:",
            sample_text,
        )
    )

to_csv

to_csv(data: list[dict[str, Any]], path: Path) -> None

Write a list of dictionaries to a CSV file.

If the data list is empty, writes "No results to export." to the file instead. The CSV header is generated from the keys of the first dictionary in the list.

Parameters:

Name Type Description Default
data list[dict[str, Any]]

A list of dictionaries to write to CSV. All dictionaries should have the same keys for proper CSV formatting.

required
path Path

The file path where the CSV will be written. Will be created if it doesn't exist.

required

Returns:

Type Description
None

None

Note

The file is written with UTF-8 encoding. If data is empty, a plain text message is written instead of CSV format.

Source code in src/civic_lib_core/report_formatter.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def to_csv(data: list[dict[str, Any]], path: Path) -> None:
    """Export a list of row dictionaries to a CSV file.

    The header row comes from the keys of the first dictionary. When there
    is nothing to export, the file receives the plain-text message
    "No results to export." rather than CSV content.

    Args:
        data: Rows to export. All dictionaries should share the same keys,
              because the column set is fixed by the first row.
        path: Destination file. It is opened for writing, so existing
              content is replaced.

    Returns:
        None

    Note:
        Output is UTF-8 encoded. The file is opened with ``newline=""`` so
        the csv module controls line endings itself.
    """
    if data:
        with path.open("w", newline="", encoding="utf-8") as csv_file:
            # The column order follows the first row's key order.
            row_writer = csv.DictWriter(csv_file, fieldnames=data[0].keys())
            row_writer.writeheader()
            row_writer.writerows(data)
    else:
        path.write_text("No results to export.", encoding="utf-8")

to_markdown

to_markdown(data: list[dict[str, Any]], path: Path) -> None

Convert a list of dictionaries to a Markdown table and write it to a file.

Takes a list of dictionaries where each dictionary represents a row of data, and converts it into a Markdown table format. The keys of the first dictionary are used as column headers. If the data list is empty, writes a message indicating no results to display.

Parameters:

Name Type Description Default
data list[dict[str, Any]]

List of dictionaries containing the data to convert. Each dictionary should have the same keys which will be used as table headers.

required
path Path

Path object specifying where to write the Markdown table file.

required

Returns:

Name Type Description
None None

The function writes directly to the specified file path.

Note
  • Pipe characters (|) in data values are automatically escaped to preserve Markdown table formatting.
  • The file is written with UTF-8 encoding.
  • If data is empty, writes "_No results to display._" (including the underscores) to the file.
Source code in src/civic_lib_core/report_formatter.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def to_markdown(data: list[dict[str, Any]], path: Path) -> None:
    """Convert a list of dictionaries to a Markdown table and write it to a file.

    Each dictionary is one table row. The keys of the first dictionary become
    the column headers, in their original order. If the data list is empty,
    a placeholder message is written instead of a table.

    Args:
        data (list[dict[str, Any]]): Rows to render. Rows are expected to share
                                   the keys of the first row; a key missing from
                                   a later row produces an empty cell.
        path (Path): Path object specifying where to write the Markdown table file.

    Returns:
        None: The function writes directly to the specified file path.

    Note:
        - Pipe characters (|) in both header names and data values are escaped
          to preserve Markdown table formatting.
        - Keys that appear only in later rows are not rendered, because the
          column set is fixed by the first row.
        - The file is written with UTF-8 encoding.
        - If data is empty, writes "_No results to display._" to the file.
    """
    if not data:
        path.write_text("_No results to display._", encoding="utf-8")
        return

    def _cell(value: Any) -> str:
        # A raw "|" would be parsed as a column separator, so escape it.
        return str(value).replace("|", "\\|")

    headers = list(data[0].keys())
    lines = [
        "| " + " | ".join(_cell(h) for h in headers) + " |",
        "|" + "|".join(["---"] * len(headers)) + "|",
    ]

    for row in data:
        # .get() keeps a ragged row from aborting the whole export; the
        # missing column is rendered as an empty cell.
        row_values = [_cell(row.get(h, "")) for h in headers]
        lines.append("| " + " | ".join(row_values) + " |")

    path.write_text("\n".join(lines), encoding="utf-8")

report_indexer

Module for generating a Markdown index of agent reports.

This module provides: - generate_index: generates a Markdown index listing the latest report from each agent

generate_index

generate_index(report_dir: Path = REPORTS_DIR) -> None

Generate a Markdown index listing the latest report from each agent.

Parameters:

Name Type Description Default
report_dir Path

The base reports/ directory to scan.

REPORTS_DIR
Source code in src/civic_lib_core/report_indexer.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def generate_index(report_dir: Path = REPORTS_DIR) -> None:
    """Generate a Markdown index listing the latest report from each agent.

    Every sub-directory of ``report_dir`` is treated as one agent's folder.
    For each folder that has a latest report, a bullet linking to that report
    is added. The result is written to ``index.md`` inside ``report_dir``,
    replacing any previous index.

    Args:
        report_dir (Path): The base `reports/` directory to scan.
    """
    logger.info("Generating index of agent reports...")

    index_file = report_dir / "index.md"
    ensure_dir(index_file.parent)

    if index_file.exists():
        logger.debug(f"Removing old index file at {index_file}")
        index_file.unlink()

    logger.debug(f"Creating new index file at {index_file}")
    header = ["# Civic Interconnect Agent Reports", ""]
    entries: list[str] = []

    # Case-insensitive ordering keeps the listing stable across platforms.
    for agent_dir in sorted(report_dir.iterdir(), key=lambda p: p.name.lower()):
        if not agent_dir.is_dir():
            continue
        latest = get_latest_report(agent_dir)
        if not latest:
            continue
        # Markdown link targets need forward slashes; a native Windows path
        # would put backslashes in the link.
        relative_path = latest.relative_to(report_dir).as_posix()
        agent_display = get_agent_name_from_path(latest) or agent_dir.name
        entries.append(f"- **{agent_display}**: [Latest Report]({relative_path})")

    if not entries:
        entries.append("_No reports found._")
        logger.warning("No agent reports found. Generated empty index.")

    index_file.write_text("\n".join(header + entries) + "\n", encoding="utf-8")
    logger.info(f"Index written to {index_file}")

report_reader

civic_lib_core/report_reader.py.

Functions for reading, inspecting, and validating Civic Interconnect agent reports. Used by dashboards, CLI tools, and indexing utilities.

get_latest_report

get_latest_report(agent_dir: Path) -> Path | None

Get the most recent report file from the specified agent directory.

Parameters:

Name Type Description Default
agent_dir Path

Path to the agent's report folder.

required

Returns:

Type Description
Path | None

Path | None: The latest report file, or None if none found.

Source code in src/civic_lib_core/report_reader.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def get_latest_report(agent_dir: Path) -> Path | None:
    """Return the highest-sorting report file in an agent's directory.

    Candidates are the files matching ``*{REPORT_EXTENSION}`` that also pass
    ``is_report_file``. "Latest" means the greatest path in sort order.

    Args:
        agent_dir (Path): Path to the agent's report folder.

    Returns:
        Path | None: The latest report file, or None if none found.
    """
    candidates = [
        f for f in agent_dir.glob(f"*{REPORT_EXTENSION}") if is_report_file(f)
    ]
    # max() picks the same element that a descending sort would put first.
    latest = max(candidates, default=None)

    if latest is None:
        logger.warning(f"No reports found in {agent_dir.name}")
    else:
        logger.debug(f"Latest report for {agent_dir.name}: {latest.name}")

    return latest

read_latest_report

read_latest_report(
    agent_dir: Path, strict: bool = False
) -> dict[str, Any] | None

Read and return the contents of the latest report for a given agent.

Parameters:

Name Type Description Default
agent_dir Path

Path to the agent's report folder.

required
strict bool

If True, raise errors on missing or invalid reports. If False, return None and log a warning.

False

Returns:

Type Description
dict[str, Any] | None

dict | None: Parsed report contents, or None if no report exists or format is invalid (in non-strict mode).

Source code in src/civic_lib_core/report_reader.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def read_latest_report(agent_dir: Path, strict: bool = False) -> dict[str, Any] | None:
    """Read and return the contents of the latest report for a given agent.

    The latest report is located with ``get_latest_report``, decoded as UTF-8
    JSON, and checked with ``validate_report_format``.

    Args:
        agent_dir (Path): Path to the agent's report folder.
        strict (bool): If True, raise errors on missing or invalid reports.
                       If False, return None and log a warning.

    Returns:
        dict | None: Parsed report contents, or None if no report exists or
            the format is invalid (in non-strict mode).

    Raises:
        FileNotFoundError: In strict mode, when the folder has no report.
        ValueError: In strict mode, when the report cannot be read or parsed,
            when its top level is not a JSON object, or when it fails
            validation.
    """
    latest = get_latest_report(agent_dir)
    if not latest:
        msg = f"No report found in {agent_dir}"
        if strict:
            raise FileNotFoundError(msg)
        logger.warning(msg)
        return None

    try:
        data = json.loads(latest.read_text(encoding="utf-8"))
    except Exception as e:
        # Separate the path from the error text so the message is readable.
        msg = f"Failed to read report: {latest}: {e}"
        if strict:
            raise ValueError(msg) from e
        logger.warning(msg)
        return None

    # Valid JSON is not necessarily an object (e.g. a top-level list). Reject
    # that here, because validation calls dict methods on the value.
    if not isinstance(data, dict) or not validate_report_format(data):
        msg = f"Invalid report format in: {latest}"
        if strict:
            raise ValueError(msg)
        logger.warning(msg)
        return None

    return data

validate_report_format

validate_report_format(report: dict[str, Any]) -> bool

Validate that a report contains all expected top-level keys.

Parameters:

Name Type Description Default
report dict

The parsed report to validate.

required

Returns:

Name Type Description
bool bool

True if valid, False otherwise.

Source code in src/civic_lib_core/report_reader.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def validate_report_format(report: dict[str, Any]) -> bool:
    """Check that a report has every key listed in ``EXPECTED_REPORT_KEYS``.

    Only the presence of top-level keys is checked; values are not inspected.
    The missing keys, if any, are logged as a warning.

    Args:
        report (dict): The parsed report to validate.

    Returns:
        bool: True if valid, False otherwise.
    """
    missing = EXPECTED_REPORT_KEYS - set(report)
    if not missing:
        return True

    logger.warning(f"Report missing expected keys: {missing}")
    return False

report_summary

civic_lib_core/report_summary.py.

Generates human-readable Markdown summaries of Civic Interconnect agent reports. Used optionally by agents or admin tools alongside JSON output.

write_markdown_summary

write_markdown_summary(
    report: dict[str, Any], path: Path
) -> None

Write a Markdown summary of a report's key metadata.

Parameters:

Name Type Description Default
report dict

The report data (already parsed).

required
path Path

The output path to write the .md file.

required
Source code in src/civic_lib_core/report_summary.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def write_markdown_summary(report: dict[str, Any], path: Path) -> None:
    """Write a short Markdown summary of a report's metadata to ``path``.

    The summary contains the agent name, timestamp, agent and library
    versions, and the record count, with placeholders for absent keys. It
    does not include the results themselves.

    Args:
        report (dict): The report data (already parsed).
        path (Path): The output path to write the .md file. Missing parent
            directories are created.
    """
    summary_text = "\n".join(
        (
            f"# Report Summary for {report.get('agent', 'Unknown Agent')}",
            f"**Date:** {report.get('timestamp', 'Unknown')}",
            f"**Agent Version:** {report.get('agent_version', 'N/A')}",
            f"**Library Version:** {report.get('lib_version', 'N/A')}",
            f"**Record Count:** {report.get('record_count', 'N/A')}",
            "",
            "Auto-generated summary. Data is available in the JSON report.",
        )
    )

    # Create the destination folder first so the write cannot fail on it.
    output_dir = path.parent
    output_dir.mkdir(parents=True, exist_ok=True)

    path.write_text(summary_text, encoding="utf-8")
    logger.info(f"Markdown summary written to {path}")

report_utils

civic_lib_core/report_utils.py.

Basic helpers for working with Civic Interconnect reports. Part of the Civic Interconnect agent framework.

get_agent_name_from_path

get_agent_name_from_path(path: Path) -> str

Extract and format the agent name from a report file path.

The agent name is derived from the parent folder of the report file, with underscores replaced by spaces and title-cased.

If the path does not have a parent directory, returns 'Unknown Agent'.

Parameters:

Name Type Description Default
path Path

The path to a report file.

required

Returns:

Name Type Description
str str

Formatted agent name or fallback string.

Source code in src/civic_lib_core/report_utils.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def get_agent_name_from_path(path: Path) -> str:
    """Derive a display name for an agent from a report file's location.

    The name of the file's parent folder is used, with underscores turned
    into spaces and the result title-cased. When the parent folder has no
    name (for example, a bare file name), 'Unknown Agent' is returned.

    Args:
        path (Path): The path to a report file.

    Returns:
        str: Formatted agent name or fallback string.
    """
    folder = path.parent.name
    if not folder:
        return "Unknown Agent"
    return folder.replace("_", " ").title()

is_report_file

is_report_file(path: Path) -> bool

Determine whether the given file path is a valid report file.

A valid report file must (1) have a ".json" extension and (2) begin with a date prefix (e.g., "2024-01-01").

Parameters:

Name Type Description Default
path Path

The path to check.

required

Returns:

Name Type Description
bool bool

True if the path matches report file format, False otherwise.

Source code in src/civic_lib_core/report_utils.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def is_report_file(path: Path) -> bool:
    """Tell whether a path looks like a report file.

    A report file must:
    - Have a ".json" extension
    - Have a name whose first ten characters parse as an ISO date
      (e.g., "2024-01-01")

    Only the name is examined; the file is not opened.

    Args:
        path (Path): The path to check.

    Returns:
        bool: True if the path matches report file format, False otherwise.
    """
    if path.suffix == ".json":
        date_prefix = path.stem[:10]
        try:
            datetime.date.fromisoformat(date_prefix)
        except ValueError:
            # The prefix is not a parseable date, so this is not a report.
            return False
        return True
    return False

report_writer

civic_lib_core/report_writer.py.

Functions for writing timestamped agent reports in multiple formats. Used by daily Civic Interconnect agents.

write_report

write_report(
    data: list[dict[str, Any]],
    agent_name: str,
    agent_version: str,
    schema_version: str = '1.0.0',
    report_dir: str | Path = REPORTS_DIR,
    file_format: str = 'json',
) -> Path

Write agent output to a timestamped report file with metadata.

Parameters:

Name Type Description Default
data list[dict[str, Any]]

The results to include in the report.

required
agent_name str

The name of the agent generating the report.

required
agent_version str

The version of the agent code.

required
schema_version str

The version of the report schema (default: "1.0.0").

'1.0.0'
report_dir str | Path

Root directory where reports are saved (default: REPORTS_DIR).

REPORTS_DIR
file_format str

Output format, one of "json" or "csv" (default: "json").

'json'

Returns:

Name Type Description
Path Path

The full path to the saved report file.

Source code in src/civic_lib_core/report_writer.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def write_report(
    data: list[dict[str, Any]],
    agent_name: str,
    agent_version: str,
    schema_version: str = "1.0.0",
    report_dir: str | Path = REPORTS_DIR,
    file_format: str = "json",
) -> Path:
    """Write agent output to a date-named report file.

    The report is saved as ``<report_dir>/<safe agent name>/<date>.<format>``.
    A JSON report wraps ``data`` in a metadata envelope (agent, timestamp,
    record count, versions). A CSV report is produced by
    ``report_formatter.to_csv`` from ``data`` alone.

    Args:
        data (list[dict[str, Any]]): The results to include in the report.
        agent_name (str): The name of the agent generating the report.
        agent_version (str): The version of the agent code.
        schema_version (str): The version of the report schema (default: "1.0.0").
        report_dir (str | Path): Root directory where reports are saved (default: REPORTS_DIR).
        file_format (str): Output format, one of "json" or "csv" (default: "json").

    Returns:
        Path: The full path to the saved report file.

    Raises:
        ValueError: If ``file_format`` is neither "json" nor "csv". This is
            raised before any directory is created.
    """
    # Reject bad input before touching the filesystem, so a typo in the
    # format does not leave an empty agent folder behind.
    if file_format not in ("json", "csv"):
        raise ValueError(f"Unsupported report format: {file_format}")

    timestamp = now_utc_str(TIMESTAMP_FORMAT)
    # Derive the file name's date from the same instant as the timestamp
    # stored in the report.
    date_str = datetime.strptime(timestamp, TIMESTAMP_FORMAT).strftime(DATE_ONLY_FORMAT)

    agent_folder = ensure_dir(Path(report_dir) / safe_filename(agent_name))
    report_path = agent_folder / f"{date_str}.{file_format}"

    if file_format == "json":
        report = {
            "agent": agent_name,
            "timestamp": timestamp,
            "record_count": len(data),
            "agent_version": agent_version,
            "schema_version": schema_version,
            "lib_version": version_utils.get_repo_version(),
            "results": data,
        }
        report_path.write_text(json.dumps(report, indent=2), encoding="utf-8")
    else:
        report_formatter.to_csv(data, report_path)

    logger.info(f"Report written: {report_path}")
    return report_path

schema_utils

civic_lib_core/schema_utils.py.

Centralized schema change detection utilities for Civic Interconnect agents. Part of the Civic Interconnect agent framework.

detect_schema_change

detect_schema_change(
    old_file: Path, new_data: dict[str, Any]
) -> bool

Detect if the schema has changed by comparing the old file's hash with the new data.

Parameters:

Name Type Description Default
old_file Path

The path to the old schema file.

required
new_data dict[str, Any]

The new schema data to compare against.

required

Returns:

Name Type Description
bool bool

True if the schema has changed (i.e., hashes differ), False otherwise.

Source code in src/civic_lib_core/schema_utils.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def detect_schema_change(old_file: Path, new_data: dict[str, Any]) -> bool:
    """Report whether ``new_data`` differs from the schema stored in ``old_file``.

    The stored schema is loaded with ``load_json`` and both sides are compared
    through ``hash_dict``. A missing ``old_file`` counts as a change.

    Args:
        old_file (Path): The path to the old schema file.
        new_data (dict[str, Any]): The new schema data to compare against.

    Returns:
        bool: True if the schema has changed (i.e., hashes differ, or there is
            no old file), False otherwise.
    """
    if old_file.exists():
        previous_hash = hash_dict(load_json(old_file))
        current_hash = hash_dict(new_data)
        changed = previous_hash != current_hash
        logger.info("Schema change detected." if changed else "No schema change detected.")
        return changed

    # With no baseline to compare against, report a change.
    logger.info(f"Old schema file not found: {old_file}. Treating as changed.")
    return True

hash_dict

hash_dict(data: dict[str, Any]) -> str

Hash a JSON-serializable dictionary for change detection.

Parameters:

Name Type Description Default
data dict[str, Any]

The dictionary to hash.

required

Returns:

Name Type Description
str str

The SHA-256 hash of the JSON-encoded dictionary.

Source code in src/civic_lib_core/schema_utils.py
43
44
45
46
47
48
49
50
51
52
53
54
55
def hash_dict(data: dict[str, Any]) -> str:
    """Compute a SHA-256 fingerprint of a JSON-serializable dictionary.

    The dictionary is serialized with sorted keys, so two dictionaries with
    the same content hash identically regardless of key insertion order.

    Args:
        data (dict[str, Any]): The dictionary to hash.

    Returns:
        str: The hexadecimal SHA-256 digest of the UTF-8 encoded JSON text.
    """
    canonical_json = json.dumps(data, sort_keys=True)
    hasher = hashlib.sha256()
    hasher.update(canonical_json.encode("utf-8"))
    digest = hasher.hexdigest()
    logger.debug(f"Computed hash: {digest}")
    return digest

load_json

load_json(path: str | Path) -> dict[str, Any]

Load a JSON file and return its contents as a dictionary.

Parameters:

Name Type Description Default
path str | Path

The path to the JSON file.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The parsed JSON data.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

JSONDecodeError

If the file is not valid JSON.

Source code in src/civic_lib_core/schema_utils.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def load_json(path: str | Path) -> dict[str, Any]:
    """Read a UTF-8 JSON file and return the parsed content.

    Args:
        path (str | Path): The path to the JSON file.

    Returns:
        dict[str, Any]: The parsed JSON data.

    Raises:
        FileNotFoundError: If the file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    path = Path(path)
    raw_text = path.read_text(encoding="utf-8")
    data = json.loads(raw_text)
    logger.debug(f"Loaded JSON from {path}")
    return data

version_utils

civic_lib_core/version_utils.py.

Version discovery utilities for Civic Interconnect projects.

Supports Python projects (via importlib.metadata or pyproject.toml), non-Python projects (via a VERSION file), and JavaScript/NodeJS projects (via package.json).

This allows the Civic CLI and shared tools to work seamlessly across mixed technology stacks, ensuring consistent version handling even in frontend-only repos.

get_repo_version

get_repo_version(
    package_name: str = 'civic-lib-core',
    root_dir: Path | None = None,
) -> str

Retrieve the project version from various sources.

  1. Python metadata (if package installed)
  2. pyproject.toml
  3. VERSION file
  4. package.json.

Returns:

Name Type Description
str str

The discovered version string, or "0.0.0" if none found.

Source code in src/civic_lib_core/version_utils.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def get_repo_version(
    package_name: str = "civic-lib-core",
    root_dir: Path | None = None,
) -> str:
    """Find the project version, trying several sources in order.

    1. Python metadata (if package installed)
    2. pyproject.toml
    3. VERSION file
    4. package.json.

    Sources 2-4 are looked up by ``get_version_from_files`` in the project
    root.

    Args:
        package_name (str): Package name to look up in Python metadata.
        root_dir (Path | None): Directory to search for version files. When
            not given, the root comes from ``fs_utils.get_project_root()``,
            falling back to the current working directory if that fails.

    Returns:
        str: The discovered version string, or "0.0.0" if none found.
    """
    # Metadata of the installed package takes precedence over any file.
    from_metadata = get_version_from_python_metadata(package_name)
    if from_metadata:
        return from_metadata

    try:
        root = root_dir or fs_utils.get_project_root()
    except Exception as e:
        logger.warning(f"Could not detect project root. Defaulting to cwd. Error: {e}")
        root = Path.cwd()

    from_files = get_version_from_files(root)
    if not from_files:
        logger.info("No version found in repo. Defaulting to 0.0.0")
        return "0.0.0"
    return from_files

get_version_from_files

get_version_from_files(root: Path) -> str | None

Check pyproject.toml, VERSION, or package.json for the project version.

Source code in src/civic_lib_core/version_utils.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def get_version_from_files(root: Path) -> str | None:
    """Check pyproject.toml, VERSION, or package.json for the project version."""
    pyproject = root / "pyproject.toml"
    if pyproject.exists():
        try:
            with pyproject.open("rb") as f:
                data = tomllib.load(f)
            version_str = data.get("project", {}).get("version")
            if version_str:
                logger.info(f"Version found in pyproject.toml: {version_str}")
                return version_str
        except Exception as e:
            logger.warning(f"Error parsing pyproject.toml: {e}")

    version_file = root / "VERSION"
    if version_file.exists():
        try:
            version_str = version_file.read_text(encoding="utf-8").strip()
            if version_str:
                logger.info(f"Version found in VERSION file: {version_str}")
                return version_str
        except Exception as e:
            logger.warning(f"Error reading VERSION file: {e}")

    package_json = root / "package.json"
    if package_json.exists():
        try:
            data = json.loads(package_json.read_text(encoding="utf-8"))
            version_str = data.get("version")
            if version_str:
                logger.info(f"Version found in package.json: {version_str}")
                return version_str
        except Exception as e:
            logger.warning(f"Error reading package.json: {e}")

    return None

get_version_from_python_metadata

get_version_from_python_metadata(
    package_name: str,
) -> str | None

Try reading the version from installed Python package metadata.

Source code in src/civic_lib_core/version_utils.py
29
30
31
32
33
34
35
36
37
38
39
def get_version_from_python_metadata(package_name: str) -> str | None:
    """Look up a package's version in installed Python package metadata.

    Args:
        package_name (str): Name of the package to look up.

    Returns:
        str | None: The version reported by ``get_python_version``, or None
            when the package is not installed or the lookup fails.
    """
    try:
        version_str = get_python_version(package_name)
    except PackageNotFoundError:
        # A package that is not installed is a normal case, not an error.
        logger.debug(f"Package {package_name} not installed.")
        return None
    except Exception as e:
        logger.warning(f"Unexpected error reading Python version metadata: {e}")
        return None

    logger.info(f"Version found via Python metadata: {version_str}")
    return version_str

yaml_utils

Lightweight helpers for reading and writing YAML files.

File: yaml_utils.py

read_yaml

read_yaml(path: str | Path) -> dict[str, Any]

Read and parse a YAML file into a dictionary.

Parameters:

Name Type Description Default
path str | Path

YAML file path.

required

Returns:

Name Type Description
dict dict[str, Any]

Parsed YAML data.

Source code in src/civic_lib_core/yaml_utils.py
31
32
33
34
35
36
37
38
39
40
41
42
def read_yaml(path: str | Path) -> dict[str, Any]:
    """Read and parse a YAML file into a dictionary.

    The file is decoded as UTF-8 and parsed with ``yaml.safe_load``.

    Args:
        path (str | Path): YAML file path.

    Returns:
        dict: Parsed YAML data. An empty document yields an empty dictionary.
    """
    path = Path(path)
    with path.open("r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    # safe_load returns None for an empty document. Return {} instead so
    # callers always get the dictionary the signature promises.
    if data is None:
        return {}
    return data

write_yaml

write_yaml(data: dict[str, Any], path: str | Path) -> Path

Write a dictionary to a YAML file.

Parameters:

Name Type Description Default
data dict

Data to write.

required
path str | Path

File path to write to.

required

Returns:

Name Type Description
Path Path

The path the file was written to.

Source code in src/civic_lib_core/yaml_utils.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def write_yaml(data: dict[str, Any], path: str | Path) -> Path:
    """Serialize a dictionary to a YAML file.

    Missing parent directories are created. Keys are written in the
    dictionary's own order rather than sorted.

    Args:
        data (dict): Data to write.
        path (str | Path): File path to write to.

    Returns:
        Path: The path the file was written to.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as stream:
        yaml.dump(data, stream, sort_keys=False)
    return target