Skip to content

feat(environments): add DatabaseExecutableEnvironment#2441

Closed
aniruddh-alt wants to merge 27 commits into
mainfrom
aniruddh-alt/db-executable-env
Closed

feat(environments): add DatabaseExecutableEnvironment#2441
aniruddh-alt wants to merge 27 commits into
mainfrom
aniruddh-alt/db-executable-env

Conversation

@aniruddh-alt

@aniruddh-alt aniruddh-alt commented May 8, 2026

Copy link
Copy Markdown
Contributor

Description

Adds DatabaseExecutableEnvironment — a real-database-backed environment for agentic tool execution. Users supply a SQLAlchemy connection config and dotted-path Python executors that take a live sqlalchemy.Connection. The env handles the connection pool, dialect-aware safety knobs, and SQL-error → structured-ToolResult auto-wrapping so agents can self-correct on bad queries.

This is the first concrete consumer of a new abstract ExecutableEnvironment middle layer that future tool-execution backends (HTTP, shell, filesystem) will subclass — the same shape Oumi uses for inference engines (BaseInferenceEngine → RemoteInferenceEngine → AnthropicInferenceEngine).

StatefulEnvironment is not touched.

Class hierarchy (new code in bold)

BaseEnvironment (existing, unchanged)
├── DeterministicEnvironment        (unchanged)
├── SyntheticEnvironment            (unchanged)
└── ExecutableEnvironment           ← NEW abstract — owns dotted-path executor
    │                                 resolution, ToolResult validation,
    │                                 output_schema validation, close() lifecycle,
    │                                 _build_execution_context hook
    └── DatabaseExecutableEnvironment   ← NEW concrete — SQLAlchemy Engine,
                                         dialect guards, autocommit checkout,
                                         DBAPIError auto-wrap, audit logging

User-facing surface

YAML (env_type: database):

env_kwargs:
  connection:
    driver: postgresql+psycopg
    host: ${oc.env:DB_HOST}
    database: ehr
    username: oumi
    password_env_var: DB_PASSWORD     # secret pulled from env at connect-time, never inlined in YAML
    pool_size: 10
    pool_max_overflow: 20
  read_only: false                    # connection-level enforcement (Postgres default_transaction_read_only,
                                      # MySQL SESSION TRANSACTION READ ONLY, SQLite PRAGMA query_only)
  statement_timeout_ms: 30000         # Postgres / MySQL session vars
  audit: true                         # opt-in per-tool-call INFO log

Or with a full DSN from env (mutually exclusive with structured fields):

env_kwargs:
  connection:
    dsn_env_var: DATABASE_URL

Executor signature:

from sqlalchemy import text
from sqlalchemy.engine import Connection
from oumi.core.types.tool_call import ToolResult

def list_patients(arguments: dict, db: Connection) -> ToolResult:
    rows = db.execute(text(
        "SELECT patient_id, name FROM patients ORDER BY name"
    )).mappings().all()
    return ToolResult(output={"patients": [dict(r) for r in rows]})

The DB is the state — executors read/write it directly. Each db.execute(...) runs in autocommit mode, so one tool call = one atomic transaction. updated_state on ToolResult is rejected (raises ToolError).

Failure semantics

When a SQL error escapes the executor (any sqlalchemy.exc.DBAPIErrorIntegrityError, OperationalError, ProgrammingError, DataError), the env auto-wraps it as a structured ToolResult so the agent can see the DB's own message:

{
  "status": "error",
  "error": "IntegrityError",
  "message": "duplicate key value violates unique constraint ...",
  "sql_state": "23505"
}

Tools that want to handle specific constraint violations themselves (e.g. translating UNIQUE to already_prescribed) catch IntegrityError in their executor and return their own structured shape. The auto-wrap is the fallback. Schema validation is skipped on auto-wrapped results so a strict output_schema doesn't conflict with the wrap shape.

Pool exhaustion (sqlalchemy.exc.TimeoutError — not a DBAPIError), config errors, and bugs propagate as exceptions.

from_params runs a fail-fast SELECT 1 after engine creation so misconfig surfaces immediately, not on the first tool call.

Per-tool override + checkin reset

Tools can tighten env-level statement_timeout_ms via a per-tool field. The override is applied as a session-level SET at connection checkout. To prevent the override from leaking into the next checkout from the pool, a SQLAlchemy checkin event handler resets the timeout back to env-level on connection return. Validated such that per-tool can only tighten, never loosen, and only if env-level is set.

Sandboxing scope (intentionally outside)

Open-source ships connection management + executors. Sandboxing/isolation is the user's responsibility — they point the connection at a sandbox replica, a Docker DB, a per-conversation snapshot, or whatever fits their setup. We don't parse SQL, don't enforce read-only via SQL inspection, and don't isolate parallel conversations. What we do ship: connection-level read-only flags (DB enforces), statement timeouts (DB enforces), and per-tool-call audit logging.

EHR DB example

src/oumi/examples/ehr_db/ ships:

  • schema.sql — patients / allergies / medications / diagnoses / vitals tables
  • seed.sql — 6-patient fixture
  • executors.py — 6 SQL-backed tool executors (list_patients, get_patient, record_vitals, add_diagnosis (catches duplicate), prescribe_medication (catches allergy conflict + duplicate), update_allergies)

configs/examples/synthesis/ehr_db_synth.yaml is a runnable synthesis config wired to the example.

tests/e2e/synthesis/test_ehr_db_e2e.py is a SQLite-backed e2e that loads the YAML, builds the env, seeds a fresh DB from the bundled SQL, and walks 5 clinical flows end-to-end. No LLM required.

Testing

  • Unit (default CI, ~93 new tests): tests/unit/environments/test_executable_*.py, tests/unit/environments/test_database_executable_*.py, tests/unit/environments/test_ehr_db_executors.py, tests/unit/configs/params/test_database_connection_params.py. SQLite-backed — fast, no external deps.
  • E2E (default CI): tests/e2e/synthesis/test_ehr_db_e2e.py. SQLite-backed.
  • Postgres integration (opt-in): tests/integration/environments/test_database_executable_environment_postgres.py behind @pytest.mark.requires_postgres. Verifies dialect-specific behavior SQLite can't (real default_transaction_read_only, real statement_timeout, IntegrityError.sqlstate=23505). Run with pytest -m requires_postgres (Docker required for the testcontainers fixture).

Dependency changes

  • sqlalchemy>=2.0,<3.0 added as a direct runtime dep.
  • testcontainers>=4.0,<5.0 added to [project.optional-dependencies] dev.
  • requires_postgres pytest marker registered in pyproject.toml.

What's deferred (not blocking this PR)

  • Async SQLAlchemy paths.
  • Grounding query support (no sample_grounding from a real DB yet).
  • Per-statement audit logging (only per-tool-call now).
  • max_rows result-set capping (no clean implementation path against a raw sqlalchemy.Connection without breaking executor ergonomics; executors use LIMIT N themselves for now).
  • SQL parsing / statement allow-listing (security theater — DB role is the right enforcement layer).
  • Refactoring StatefulEnvironment under ExecutableEnvironment (mechanical cleanup, follow-on PR).

Related issues

Fixes # (no associated issue)

Before submitting

  • This PR only changes documentation. (You can ignore the following checks in that case)
  • Did you read the contributor guideline Pull Request guidelines?
  • Did you link the issue(s) related to this PR in the section above?
  • Did you add / update tests where needed?

Reviewers

At least one review from a member of oumi-ai/oumi-staff is required.

Note

Liberate
Risk: low

@gitar-bot

gitar-bot Bot commented May 8, 2026

Copy link
Copy Markdown

Gitar is working

Gitar

raise ValueError("boom")


_NOT_CALLABLE = 42
Comment thread src/oumi/environments/database_executable_environment.py Fixed
finally:
try:
cursor.close()
except Exception:
Comment thread src/oumi/environments/database_executable_environment.py Fixed
f"Available tools: {[tool.id for tool in self._params.tools]}"
)

def step(self, tool_id: str, arguments: dict[str, Any]) -> ToolResult:
testcontainers' PostgresContainer defaults to the psycopg2 driver and
emits ``postgresql+psycopg2://`` DSNs. The previous DSN rewrite used
``.replace("postgresql://", ...)`` which didn't match, so the integration
test still ran on psycopg2 — whose ``Error.pgcode`` carries SQLSTATE,
not ``sqlstate``. The auto-wrap therefore returned ``sql_state=None`` and
``test_integrity_error_carries_sqlstate`` failed on the unique-violation
case.

Fall back to ``pgcode`` when ``sqlstate`` is unset so the wrap works for
either driver, and drop the dead URL rewrite from the test fixture.
@aniruddh-alt aniruddh-alt marked this pull request as ready for review May 15, 2026 07:08
f"DB-backed envs hold state in the database, not in ToolResult."
)

def step(self, tool_id: str, arguments: dict[str, Any]) -> ToolResult:

@liberate-bot liberate-bot Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few correctness concerns worth a look — all left as inline comments.

assert kwargs.connection is not None # validated above

url = kwargs.connection.resolve_url()
engine_kwargs: dict[str, Any] = {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DatabaseConnectionConfig.connect_timeout_s is defined, validated (must be > 0), and tested, but it's never threaded into create_engine here (no connect_args={"connect_timeout": ...}, no pool_timeout). So the field is dead config — users who set it will think they've tightened the connect-time timeout but the underlying driver still uses its default. Either wire it into engine_kwargs["connect_args"] (driver-specific key — connect_timeout for psycopg/pymysql) or drop the field.

assert isinstance(tool, DatabaseExecutableTool)
env._executors[tool.id] = _import_executor(tool.executor, tool.id)
return env

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The > check lets statement_timeout_ms = 0 (or any negative) slip through when env-level is set. In Postgres, SET statement_timeout = 0 disables the timeout — so a per-tool override of 0 silently loosens the env-level guard rather than tightening it (the opposite of what the docstring promises). Suggest validating 0 < tool.statement_timeout_ms <= env_timeout, or rejecting non-positive values in DatabaseExecutableTool.__post_init__ alongside the existing executor check.

sqlalchemy.text(
"INSERT INTO allergies (patient_id, substance) "
"VALUES (:pid, :substance)"
),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the env runs in autocommit (one transaction per conn.execute), the DELETE commits before any INSERT runs. If the loop fails partway through (e.g. driver disconnect, statement timeout, executor crash), the patient's allergy list ends up empty or partial with no rollback. For a replace operation on PHI this is risky. Either run this executor under an explicit with db.begin(): block to make the swap atomic, or do it as a single statement (DELETE ... ; INSERT ... VALUES (..), (..), ...) inside one transaction.

{"pid": patient_id, "name": name},
).first()
if allergy_hit is not None:
return ToolResult(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The allergy lookup uses LOWER(substance) = LOWER(:name) (case-insensitive), but the medications table PK is (patient_id, name) — case-sensitive. So prescribe_medication("Tylenol") and prescribe_medication("tylenol") both succeed and produce two rows for the same drug, while the allergy refusal path treats them as the same drug. Recommend normalizing the medication name (e.g. name.lower() or name.strip().lower()) before the duplicate check / insert so both paths agree on identity.

"SQLite does not support statement_timeout; ignoring "
"statement_timeout_ms=%s.",
env_timeout_ms,
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The checkin reset only runs for postgresql / mysql, and that's fine — but note this whole mechanism relies on the per-tool override having been applied via _set_session_timeout on a checked-out connection. If a future subclass adds a path that mutates connection state outside _build_execution_context (e.g. a different transport that touches the same engine), reset coverage drifts. Worth a one-line comment here pointing back to _build_execution_context so the invariant ("anything we SET on checkout must be RESET on checkin") is documented in both places.

def _exec_sql_file(conn, path: Path) -> None:
"""Execute statements in a `.sql` file. Strips line comments before splitting."""
raw = path.read_text()
lines = [line for line in raw.splitlines() if not line.lstrip().startswith("--")]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment-stripping splits on ;, which breaks if any future schema/seed file contains a string literal with a semicolon (e.g. INSERT ... VALUES ('Smith; Jr.')). Fine for the current fixtures, but flagging because tests/unit/environments/test_ehr_db_executors.py ships an identical helper — if either file's SQL grows, both helpers will silently mis-split. Consider using sqlalchemy's executescript (SQLite) or pulling the helper into a shared test util.

@wizeng23

Copy link
Copy Markdown
Collaborator

Is it possible to break this PR down? It's quite large which makes a thorough review tough.
Also, are the AI-suggested reviews helpful? Could you address and resolve them all (either adopt the fix or note that it's not helpful)?

@aniruddh-alt

Copy link
Copy Markdown
Contributor Author

Is it possible to break this PR down? It's quite large which makes a thorough review tough. Also, are the AI-suggested reviews helpful? Could you address and resolve them all (either adopt the fix or note that it's not helpful)?

Yes! I shall break this PR down and stack them instead! I meant to just keep it as a draft PR.

@aniruddh-alt

Copy link
Copy Markdown
Contributor Author

Closing this in favor of a phase-merge chain against a long-lived feature trunk aniruddh-alt/db-executable-env-trunk. Smaller reviewable units, no permanent multi-PR stack open at once, and each phase leaves the trunk buildable.

Open now (both draft, targeting the trunk):

What follows after these land (each its own phase PR against the trunk, then a final integration PR trunk → main):

  • Phase 3+: ExecutableEnvironment._step_one implementation (executor resolution, ToolResult + output_schema validation, _absorb_result invocation).
  • Phase 4+: DatabaseExecutableEnvironment.from_params (SQLAlchemy engine, fail-fast SELECT 1, dialect guards for read_only + statement_timeout_ms, per-tool tightening with checkin reset).
  • Phase 5+: DBAPIError auto-wrap (SQLSTATE from psycopg3 / psycopg2) and opt-in audit: true per-call logging.
  • Phase 6+: Postgres integration tests behind requires_postgres marker (testcontainers).
  • Phase 7+: EHR DB example (schema, seed, six executors, runnable synth config, e2e).

Liberate-bot's HIGH-priority findings on this PR are pre-addressed in the new chain:

  • connect_timeout_s will be threaded into engine_kwargs["connect_args"] in the engine phase (Phase 4+) or dropped — won't ship as dead config.
  • statement_timeout_ms = 0 is rejected at DatabaseExecutableEnvironmentKwargs.finalize_and_validate (already on [db-executable-env] Phase 2: DatabaseExecutableEnvironment + tool + connection-config skeleton #2468).
  • update_allergies atomicity: the EHR example phase will wrap DELETE+INSERT in with db.begin():.
  • prescribe_medication case-mismatch: medication name normalized before duplicate check.

Thanks for the review nudge @wizeng23 — the chain shape is much easier to review one phase at a time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants