Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,12 @@ use std::{
future::{self, Future},
str::FromStr,
sync::LazyLock,
time,
};
use url::Url;
use user_facing_errors::schema_engine::DatabaseSchemaInconsistent;

use super::{SqlConnector, SqlDialect, UsingExternalShadowDb};

const ADVISORY_LOCK_TIMEOUT: time::Duration = time::Duration::from_secs(10);

/// Connection settings applied to every new connection on CockroachDB.
///
/// https://www.cockroachlabs.com/docs/stable/experimental-features.html
Expand Down Expand Up @@ -370,19 +367,42 @@ impl SqlConnector for PostgresConnector {
self.with_connection(|connection, params| async {
// https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS

// We use `pg_try_advisory_lock` rather than `pg_advisory_lock` so that the
// call returns immediately instead of blocking. Blocking on an advisory
// lock can cause deadlocks when another session is running operations such
// as `CREATE INDEX CONCURRENTLY`, which require waiting for any open
// transactions/locks to finish (see issue #5755).
//
// 72707369 is a unique number we chose to identify Migrate. It does not
// have any meaning, but it should not be used by any other tool.
crosstarget_utils::time::timeout(
ADVISORY_LOCK_TIMEOUT,
connection.raw_cmd("SELECT pg_advisory_lock(72707369)"),
)
.await
.map_err(|_| ConnectorError::user_facing(user_facing_errors::common::DatabaseTimeout {
context: format!(
"Timed out trying to acquire a postgres advisory lock (SELECT pg_advisory_lock(72707369)). Timeout: {}ms. See https://pris.ly/d/migrate-advisory-locking for details.", ADVISORY_LOCK_TIMEOUT.as_millis()
),
}))?
.map_err(imp::quaint_error_mapper(params))?;
const MIGRATE_ADVISORY_LOCK_ID: i64 = 72707369;

let result = connection
.query_raw(
&format!("SELECT pg_try_advisory_lock({MIGRATE_ADVISORY_LOCK_ID})"),
&[],
)
.await
.map_err(imp::quaint_error_mapper(params))?;

let acquired = result
.first()
.and_then(|row| row.at(0).and_then(|v| v.as_bool()))
.ok_or_else(|| {
ConnectorError::from_msg(
"Unexpected result from pg_try_advisory_lock(72707369): \
expected a single boolean column."
.to_owned(),
)
})?;
Comment thread
onno-vos-dev marked this conversation as resolved.

if !acquired {
return Err(ConnectorError::from_msg(format!(
"Could not acquire the postgres advisory lock (SELECT pg_try_advisory_lock({MIGRATE_ADVISORY_LOCK_ID})). \
Another instance is likely running migrations against this database. \
See https://pris.ly/d/migrate-advisory-locking for details."
)));
}
Comment thread
onno-vos-dev marked this conversation as resolved.

Ok(())
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
pub mod shadow_db;

use crate::BitFlags;
use crate::flavour::postgres::{ADVISORY_LOCK_TIMEOUT, Circumstances, PostgresProvider};
use crate::flavour::postgres::{Circumstances, PostgresProvider};
use crate::flavour::quaint_error_to_connector_error;
use psl::PreviewFeature;
use quaint::connector::{ExternalConnector, Queryable};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use quaint::prelude::Queryable;
use sql_migration_tests::multi_engine_test_api::*;
use std::sync::Arc;
use test_macros::test_connector;
Expand Down Expand Up @@ -37,45 +38,101 @@ fn advisory_locking_works(mut api: TestApi) {
.apply_migrations(&migrations_directory)
.send()
.await
.unwrap()
.into_output()
.map(|r| r.into_output())
},
async move {
first_me
.apply_migrations(&migrations_directory_2)
.send()
.await
.unwrap()
.into_output()
.map(|r| r.into_output())
},
async move {
third_me
.apply_migrations(&migrations_directory_3)
.send()
.await
.unwrap()
.into_output()
.map(|r| r.into_output())
},
)
});

let results = &[&result_1, &result_2, &result_3];
let results = [&result_1, &result_2, &result_3];

let applied_results_count = results
.iter()
.filter(|result| {
let applied_migration_names = &result.applied_migration_names;

applied_migration_names.len() == 1 && applied_migration_names[0] == migration_name
.filter(|result| match result {
Ok(out) => {
out.applied_migration_names.len() == 1 && out.applied_migration_names[0] == migration_name
}
Err(_) => false,
})
.count();

assert_eq!(applied_results_count, 1);

let empty_results_count = results
// The other two engines either succeeded with nothing to apply (lock acquired
// after the first engine was done) or, on Postgres, failed fast with an
// advisory-lock contention error (`pg_try_advisory_lock` returned false).
let other_results = results
.iter()
.filter(|result| result.applied_migration_names.is_empty())
.count();
.filter(|result| match result {
Ok(out) => {
!(out.applied_migration_names.len() == 1 && out.applied_migration_names[0] == migration_name)
}
Err(_) => true,
})
.collect::<Vec<_>>();

assert_eq!(other_results.len(), 2);

for result in other_results {
match result {
Ok(out) => assert!(out.applied_migration_names.is_empty()),
Err(err) => assert!(
err.to_string().contains("advisory lock"),
"unexpected error: {err}"
),
}
}
}

// Regression test for https://github.com/prisma/prisma-engines/issues/5755.
// We hold a session-level advisory lock on `72707369` from an external
// connection, then try to apply a migration. The schema engine must fail fast
// with an advisory-lock error instead of blocking (and potentially deadlocking
// against concurrent operations such as `CREATE INDEX CONCURRENTLY`).
#[test_connector(tags(Postgres), exclude(CockroachDb))]
fn postgres_advisory_lock_contention_fails_fast(mut api: TestApi) {
let mut me = api.new_engine();
let migrations_directory = api.create_migrations_directory();

let dm = api.datamodel_with_provider(
r#"
model Cat {
id Int @id
inBox Boolean
}
"#,
);

me.create_migration("01initial", &dm, &migrations_directory)
.draft(true)
.send_sync();

// Acquire a dedicated connection (separate from the engine's connection)
// and grab the same advisory lock the schema engine uses.
let lock_holder = tok(quaint::single::Quaint::new(api.connection_string())).unwrap();
tok(lock_holder.raw_cmd("SELECT pg_advisory_lock(72707369)")).unwrap();

assert_eq!(empty_results_count, 2);
let err = tok(async { me.apply_migrations(&migrations_directory).send().await }).unwrap_err();

// Release the lock so the test database teardown is not blocked.
tok(lock_holder.raw_cmd("SELECT pg_advisory_unlock(72707369)")).unwrap();
Comment on lines +128 to +131

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Read-only check for existing async timeout patterns and Tokio time feature availability.
# Expectation: Either existing Rust tests already use tokio::time::timeout/time::timeout,
# or Cargo.toml enables Tokio's `time`/`full` feature for the relevant test workspace.

rg -n -C3 'tokio::time::timeout|time::timeout' --type=rust
rg -n -C3 'tokio.*features|features\s*=\s*\[[^]]*(time|full)' Cargo.toml

Repository: prisma/prisma-engines

Length of output: 2922


🏁 Script executed:

# Check the specific Cargo.toml for the test workspace
find . -path "*/schema-engine/sql-migration-tests/Cargo.toml" -type f

Repository: prisma/prisma-engines

Length of output: 112


🏁 Script executed:

# Look at the test file context around lines 128-131
sed -n '120,140p' schema-engine/sql-migration-tests/tests/migrations/advisory_locking.rs

Repository: prisma/prisma-engines

Length of output: 835


🏁 Script executed:

# Check if crosstarget_utils is a dependency of the test workspace
rg -n 'crosstarget' schema-engine/sql-migration-tests/Cargo.toml

Repository: prisma/prisma-engines

Length of output: 47


🏁 Script executed:

# Check the tokio dependency in the test workspace's Cargo.toml
cat schema-engine/sql-migration-tests/Cargo.toml | grep -A 5 '^\[dependencies\]' | head -50

Repository: prisma/prisma-engines

Length of output: 234


🏁 Script executed:

# Also check for any dev-dependencies that might include timeout utilities
rg -n 'tokio|timeout' schema-engine/sql-migration-tests/Cargo.toml -A 2

Repository: prisma/prisma-engines

Length of output: 155


🏁 Script executed:

# Check the workspace-level tokio definition in the root Cargo.toml
rg -n 'tokio.*features' Cargo.toml | head -5

Repository: prisma/prisma-engines

Length of output: 116


🏁 Script executed:

# Get more context around the tokio dependency in the workspace
rg -n 'tokio\s*=' Cargo.toml -A 3 | head -20

Repository: prisma/prisma-engines

Length of output: 270


🏁 Script executed:

# Check what workspace members exist and if crosstarget_utils is available
fd -t f 'Cargo.toml' | grep -E '(workspace|crosstarget)' | head -10

Repository: prisma/prisma-engines

Length of output: 99


🏁 Script executed:

# Check if crosstarget_utils is available as a workspace dependency
rg -n 'crosstarget' Cargo.toml | head -20

Repository: prisma/prisma-engines

Length of output: 121


🏁 Script executed:

# Look for how other tests handle timeouts or async waits
rg -n 'tokio::time|timeout|Duration' schema-engine/sql-migration-tests/tests/ -t rust | head -20

Repository: prisma/prisma-engines

Length of output: 3082


🏁 Script executed:

# Check the crosstarget_utils timeout implementation
cat libs/crosstarget-utils/src/native/time.rs

Repository: prisma/prisma-engines

Length of output: 858


🏁 Script executed:

# Check if crosstarget_utils is already a dependency of sql-schema-connector or related packages
rg -n 'crosstarget' schema-engine/ -t toml | grep -v test

Repository: prisma/prisma-engines

Length of output: 381


🏁 Script executed:

# Check how the test file imports and uses utilities
head -50 schema-engine/sql-migration-tests/tests/migrations/advisory_locking.rs

Repository: prisma/prisma-engines

Length of output: 1698


Use crosstarget_utils timeout for fail-fast regression detection.

Line 128 holds the conflicting advisory lock and then awaits apply_migrations with no deadline; if this regresses back to a blocking lock, the test hangs until the suite-level timeout instead of failing at the regression point. Wrap only this await in a short async timeout.

To implement this, add crosstarget-utils as a dependency to the test workspace and wrap the await using the repository's established timeout pattern:

🧪 Proposed test hardening
-    let err = tok(async { me.apply_migrations(&migrations_directory).send().await }).unwrap_err();
+    let err = tok(async {
+        crosstarget_utils::time::timeout(
+            std::time::Duration::from_secs(5),
+            me.apply_migrations(&migrations_directory).send(),
+        )
+        .await
+        .expect("apply_migrations did not fail fast while the advisory lock was held")
+    })
+    .unwrap_err();

Also add to schema-engine/sql-migration-tests/Cargo.toml:

crosstarget-utils.workspace = true
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@schema-engine/sql-migration-tests/tests/migrations/advisory_locking.rs`
around lines 128 - 131, Add crosstarget-utils as a workspace dependency to
schema-engine/sql-migration-tests/Cargo.toml to enable timeout functionality.
Then wrap the await on the
me.apply_migrations(&migrations_directory).send().await call with a short async
timeout using the repository's established timeout pattern from
crosstarget-utils. This ensures that if the code regresses to blocking behavior
on the advisory lock, the test will fail quickly at the regression point rather
than hanging until the suite-level timeout.


let msg = err.to_string();
assert!(
msg.contains("pg_try_advisory_lock") && msg.contains("Another instance"),
"unexpected error: {msg}"
);
}
Comment thread
onno-vos-dev marked this conversation as resolved.