Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
*.idx

# build directory
build/
bin/

# vim temporary files
*.swp
Expand Down
107 changes: 107 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## What this is

`mgconsole` is a C++20 command-line client for the [Memgraph](https://memgraph.com) graph
database. It talks the Bolt protocol via the bundled `mgclient` library, reads Cypher from
either an interactive prompt (replxx) or stdin, and prints results as tabular / CSV / cypherl.

## Build

Dependencies are fetched and built from source via CMake `ExternalProject` (`gflags` pinned to
`70c01a6`, `mgclient` pinned to `v1.5.0`); `OpenSSL` and a C++20 compiler must be present. The
first configure/build is slow because of these external builds.

```bash
cmake -B build -G Ninja -DCMAKE_BUILD_TYPE=Release . # macOS: add -DOPENSSL_ROOT_DIR="$(brew --prefix openssl)"
cmake --build build
cmake --install build # installs to /usr (Linux) or /usr/local (macOS) by default
```

The binary lands at `build/src/mgconsole`. `compile_commands.json` is emitted into `build/`.
`-Wall -Wextra -pedantic -Werror` is enabled — warnings break the build.

**Static / release build** (matches what ships): `./build-generic-linux.sh` builds inside the
`memgraph/mgbuild` Docker image with the Memgraph toolchain and `-DMGCONSOLE_STATIC_SSL=ON`,
producing `build/generic/mgconsole`.

## Test

Tests are wired into CTest and require a Memgraph instance (binary or Docker image) to run
against — they spin one up, exercise the client, then tear it down.

```bash
# Configure tests against a Docker Memgraph (no local binary needed):
cmake -B build -G Ninja -DMEMGRAPH_USE_DOCKER=ON -DMEMGRAPH_DOCKER_IMAGE=memgraph/memgraph:latest
cmake --build build
ctest --verbose --test-dir build # runs all tests

ctest --test-dir build -R parameters-unit-test # single unit test (no DB needed)
ctest --test-dir build -R mgconsole-test # end-to-end I/O tests (plaintext)
ctest --test-dir build -R mgconsole-secure-test # same, over SSL
```

To point at a local Memgraph binary instead of Docker, configure with
`-DMEMGRAPH_PATH=/path/to/memgraph` and leave `MEMGRAPH_USE_DOCKER=OFF` (default).

Two test kinds:
- **Unit** (`tests/unit/`): `parameters_test.cpp` links the standalone `params` library and runs
without a database.
- **End-to-end** (`tests/input_output/`): driven by `run-tests.sh`. For every file in `input/`,
it runs the client once per output format and diffs stdout against the matching golden file in
`output_tabular/`, `output_csv/`, etc. **When you change output formatting or add an input
case, regenerate/add the corresponding golden file in every `output_*` directory** — the test
matrix is the cross-product of inputs × formats.

## Architecture

`main.cpp` parses gflags, sets up signal handlers, and dispatches to exactly one of four "modes"
based on whether stdin is a TTY and the `--import-mode` flag. Each mode lives in its own
translation unit under the `mode::` namespace and implements a `Run(...)` entry point:

- **`mode::interactive`** (`interactive.cpp`) — chosen when stdin is a TTY. The replxx REPL loop:
reads a query, handles `:param`/`:params`/`:help`/`:quit` commands, executes via `mgclient`,
prints results, manages history, and reconnects (3 retries) on fatal connection errors.
- **`mode::serial_import`** (`serial_import.cpp`) — default non-interactive (piped) path. Reads
queries one at a time and executes them in order. This is the `DUMP DATABASE | mgconsole` path.
- **`mode::batch_import`** (`batch_import.cpp`) — `--import-mode=batched-parallel`. EXPERIMENTAL.
Classifies each query (via `QueryInfo`) as pre/vertex/edge/post, groups vertex and edge queries
into batches, and executes batches concurrently across a thread pool of `--workers-number`
Bolt sessions, with exponential backoff + retry on failure. Vertices are flushed before edges
because edges depend on existing vertices. Query classification is heuristic — that's why this
mode is experimental.
- **`mode::parsing`** (`parsing.cpp`) — `--import-mode=parser`. Parses queries and prints
`QueryInfo` stats without touching the database.

All four modes funnel through shared primitives in `src/utils/`, organized by namespace within
`utils.hpp`/`utils.cpp` (a large ~1300-line file):

- **`query::`** — `GetQuery()` reads and accumulates a complete (`;`-terminated, possibly
multi-line) query from the input source, optionally producing `QueryInfo` (the has_create /
has_match / has_merge / ... flags that drive batch classification). `ExecuteQuery()` /
`ExecuteBatch()` run against an `mg_session`. `QueryResult` carries records, header, timing,
notifications, and execution stats.
- **`console::`** — TTY detection, line reading, `Echo*` output helpers (failure/info/stats).
- **`format::`** — `CsvOptions` and `OutputOptions`; `Output()` renders a result set as tabular,
CSV, or cypherl.
- **`utils::bolt`** (`bolt.hpp`/`bolt.cpp`) — `Config` struct and `MakeBoltSession()`, the single
place sessions are created (direct or routing connection).

`src/parameters.{hpp,cpp}` is deliberately a **separate static library (`params`)** depending
only on `mgclient`, so the `:param` parsing/storage logic can be unit-tested in isolation. Don't
add heavier dependencies to it.

Concurrency support for batch mode is custom and lives in `utils/`: `thread_pool`, `future`
(promise/future with notification hooks), `notifier`, and `synchronized`. `mg_memory.hpp` wraps
raw `mgclient` C pointers in RAII unique-ptr types (`MgSessionPtr`, `MgValuePtr`, etc.) — use
these rather than managing `mg_*` lifetimes by hand.

## Conventions

- Every source file carries the GPLv3 license header — copy it onto new files.
- Formatting is enforced by `.clang-format` (Google base, 120 col). Run `clang-format` before
committing.
- `MG_ASSERT` / `MG_FAIL` (`utils/assert.hpp`) are the assertion/abort macros.
- `date.hpp` is a large vendored third-party header (Howard Hinnant's date lib) — don't edit it.
41 changes: 36 additions & 5 deletions src/batch_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "batch_import.hpp"

#include <chrono>
#include <random>
#include <thread>
#include <unordered_map>
Expand Down Expand Up @@ -156,10 +157,19 @@ struct BatchExecutionContext {
: batch_size(batch_size),
max_batches(max_batches),
max_concurrent_executions(max_concurrent_executions),
thread_pool(max_concurrent_executions) {
thread_pool(max_concurrent_executions),
config(bolt_config) {
sessions.reserve(max_concurrent_executions);
for (uint64_t thread_i = 0; thread_i < max_concurrent_executions; ++thread_i) {
sessions[thread_i] = MakeBoltSession(bolt_config);
// Each worker connects to the resolved main. In routed mode every worker
// re-routes independently at creation, so the coordinator may receive up
// to `workers_number` (e.g. 32) ROUTE calls at startup. The TTL expiry of
// the last route determines when sessions are refreshed (see Run).
if (config.routed_connection) {
sessions.push_back(utils::bolt::MakeRoutedBoltSession(config, &expiry));
} else {
sessions.push_back(MakeBoltSession(config));
}
if (!sessions[thread_i].get()) {
MG_FAIL("a session uninitialized");
}
Expand All @@ -175,6 +185,10 @@ struct BatchExecutionContext {
utils::ThreadPool thread_pool{max_concurrent_executions};
utils::Notifier notifier;
std::vector<mg_memory::MgSessionPtr> sessions;
/// Connection config; kept so workers can be reconnected / re-routed.
utils::bolt::Config config;
/// When the current routing table's TTL expires (routed mode only).
std::chrono::steady_clock::time_point expiry{};
};

Batches FetchBatches(BatchExecutionContext &execution_context) {
Expand All @@ -201,7 +215,7 @@ Batches FetchBatches(BatchExecutionContext &execution_context) {
void ExecuteSerial(const std::vector<query::Query> &queries, BatchExecutionContext &context) {
for (const auto &query : queries) {
try {
query::ExecuteQuery(context.sessions[0].get(), query.query);
query::ExecuteQuery(context.sessions[0].get(), query.query, nullptr, context.config.db);
} catch (const utils::ClientQueryException &e) {
console::EchoFailure("Client received query exception", e.what());
MG_FAIL("Unable to ExecuteSerial");
Expand Down Expand Up @@ -248,7 +262,7 @@ uint64_t ExecuteBatchesParallel(std::vector<query::Batch> &batches, BatchExecuti
if (batch.backoff > 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(batch.backoff));
}
auto ret = query::ExecuteBatch(execution_context.sessions[thread_i].get(), batch);
auto ret = query::ExecuteBatch(execution_context.sessions[thread_i].get(), batch, bolt_config.db);
if (ret.is_executed) {
batch.is_executed = true;
executed_batches++;
Expand All @@ -265,7 +279,11 @@ uint64_t ExecuteBatchesParallel(std::vector<query::Batch> &batches, BatchExecuti
promise->Fill(false);
}
if (mg_session_status(execution_context.sessions[thread_i].get()) == MG_SESSION_BAD) {
execution_context.sessions[thread_i] = MakeBoltSession(bolt_config);
if (bolt_config.routed_connection) {
execution_context.sessions[thread_i] = utils::bolt::MakeRoutedBoltSession(bolt_config, nullptr);
} else {
execution_context.sessions[thread_i] = MakeBoltSession(bolt_config);
}
}
});
f_execs.insert_or_assign(thread_i, std::move(future));
Expand All @@ -287,6 +305,19 @@ int Run(const utils::bolt::Config &bolt_config, int batch_size, int workers_numb
// (workers_number).
BatchExecutionContext execution_context(batch_size, workers_number, workers_number, bolt_config);
while (true) {
// Round boundary checkpoint (single-threaded): in routed mode, if the
// routing table's TTL has expired, re-route once to find the (possibly new)
// main and reconnect every worker session to it. This keeps proactive TTL
// refresh out of the parallel rounds where it would be race-prone.
if (bolt_config.routed_connection && std::chrono::steady_clock::now() >= execution_context.expiry) {
for (uint64_t thread_i = 0; thread_i < execution_context.max_concurrent_executions; ++thread_i) {
execution_context.sessions[thread_i] =
utils::bolt::MakeRoutedBoltSession(bolt_config, &execution_context.expiry);
if (!execution_context.sessions[thread_i].get()) {
MG_FAIL("failed to re-route a worker session after TTL expiry");
}
}
Comment thread
as51340 marked this conversation as resolved.
}
auto batches = FetchBatches(execution_context);
if (batches.Empty()) {
break;
Expand Down
32 changes: 18 additions & 14 deletions src/interactive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ namespace params = query::params;
// Evaluates a Cypher expression server-side and returns a copy of the resulting
// value. Existing parameters are made available to the expression.
mg_memory::MgValuePtr EvaluateParamExpression(mg_session *session, const std::string &expression,
const params::ParamStore &store) {
auto result = query::ExecuteQuery(session, "RETURN " + expression, store.AsMap().get());
const params::ParamStore &store, const std::string &db) {
auto result = query::ExecuteQuery(session, "RETURN " + expression, store.AsMap().get(), db);
if (result.records.empty() || mg_list_size(result.records.front().get()) == 0) {
throw utils::ClientQueryException("expression did not produce a value");
}
Expand All @@ -59,7 +59,8 @@ void ListParams(const params::ParamStore &store) {
// Handles a `:param`/`:params` command line. Query-level failures (e.g. a bad
// expression) are reported without aborting the shell; fatal connection
// failures propagate to the reconnect logic in Run.
void HandleParamCommand(mg_session *session, params::ParamStore &store, const std::string &line) {
void HandleParamCommand(mg_session *session, params::ParamStore &store, const std::string &line,
const std::string &db) {
const auto parsed = params::ParseParamCommand(line);
if (!parsed.command) {
console::EchoFailure("Invalid parameter command", parsed.error);
Expand All @@ -68,7 +69,7 @@ void HandleParamCommand(mg_session *session, params::ParamStore &store, const st
switch (parsed.command->kind) {
case params::ParamCommand::Kind::kSet:
try {
auto value = EvaluateParamExpression(session, parsed.command->expression, store);
auto value = EvaluateParamExpression(session, parsed.command->expression, store, db);
store.Set(parsed.command->name, value.get());
console::EchoInfo("Set parameter '" + parsed.command->name + "'");
} catch (const utils::ClientQueryException &e) {
Expand Down Expand Up @@ -153,8 +154,8 @@ int Run(utils::bolt::Config &bolt_config, const std::string &history, bool no_hi
};

int num_retries = 3;
auto session = MakeBoltSession(bolt_config);
if (session.get() == nullptr) {
utils::bolt::RoutedSession session(bolt_config);
if (!session.Connected()) {
cleanup_resources();
return 1;
}
Expand All @@ -179,15 +180,20 @@ int Run(utils::bolt::Config &bolt_config, const std::string &history, bool no_hi

try {
if (query->is_param_command) {
HandleParamCommand(session.get(), param_store, query->query);
HandleParamCommand(session.Get(), param_store, query->query, bolt_config.db);
auto history_ret = save_history();
if (history_ret != 0) {
cleanup_resources();
return history_ret;
}
continue;
}
auto ret = query::ExecuteQuery(session.get(), query->query, param_store.AsMap().get());
// Resolve the session (may proactively re-route) before observing this query: the re-route decision must
// use the transaction state as it was *before* this query, so a COMMIT/ROLLBACK still runs on the session
// that holds the open transaction.
auto *session_ptr = session.Get();
session.ObserveQuery(query->query);
auto ret = query::ExecuteQuery(session_ptr, query->query, param_store.AsMap().get(), bolt_config.db);
if (ret.records.size() > 0) {
Output(ret.header, ret.records, output_opts, csv_opts);
}
Expand Down Expand Up @@ -220,14 +226,12 @@ int Run(utils::bolt::Config &bolt_config, const std::string &history, bool no_hi
console::EchoFailure("Client received connection exception", e.what());
console::EchoInfo("Trying to reconnect...");
bool is_connected = false;
session.reset(nullptr);
while (num_retries > 0) {
--num_retries;
session = utils::bolt::MakeBoltSession(bolt_config);
if (session.get() == nullptr) {
console::EchoFailure("Connection failure", mg_session_error(session.get()));
session.reset(nullptr);
} else {
// In routed mode this re-fetches the routing table (failover); in
// direct mode it reconnects to the same instance.
session.Reconnect();
if (session.Connected()) {
is_connected = true;
break;
}
Expand Down
26 changes: 19 additions & 7 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ DEFINE_int32(port, 7687, "Server port.");
DEFINE_string(username, "", "Database username.");
DEFINE_string(password, "", "Database password.");
DEFINE_bool(use_ssl, false, "Use SSL when connecting to the server.");
DEFINE_string(connection_type, "direct",
"(direct|routing) If routing, uses client-side routing protocol to connect to the main data instance.");
// Selects the target database (sent in the ROUTE extra and in every RUN/BEGIN extra). When empty, the
// server's default database is used. Multi-database selection requires Memgraph enterprise; community ignores it.
DEFINE_string(db, "", "Database to use. When set, queries run against this database (enterprise multi-tenancy); "
"a non-existent database fails with 'Unknown database name'. Empty uses the default database.");

// output
DEFINE_bool(fit_to_screen, false, "Fit output width to screen width.");
Expand Down Expand Up @@ -180,13 +186,19 @@ int main(int argc, char **argv) {

#endif /* _WIN32 */

utils::bolt::Config bolt_config{
.host = FLAGS_host,
.port = FLAGS_port,
.username = FLAGS_username,
.password = FLAGS_password,
.use_ssl = FLAGS_use_ssl,
};
auto const connection_type = utils::ToLowerCase(FLAGS_connection_type);
if (connection_type != "direct" && connection_type != "routing") {
console::EchoFailure("Unsupported connection type!", "Connection type can be 'direct' or 'routing'.");
return 1;
}

utils::bolt::Config bolt_config{.db = FLAGS_db,
.host = FLAGS_host,
.username = FLAGS_username,
.password = FLAGS_password,
.port = FLAGS_port,
.use_ssl = FLAGS_use_ssl,
.routed_connection = connection_type == "routing"};

if (console::is_a_tty(STDIN_FILENO)) { // INTERACTIVE
auto const history_file = std::invoke([&]() -> std::string {
Expand Down
10 changes: 7 additions & 3 deletions src/serial_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ using namespace std::string_literals;

int Run(const utils::bolt::Config &bolt_config, const format::CsvOptions &csv_opts,
const format::OutputOptions &output_opts) {
auto session = MakeBoltSession(bolt_config);
if (session.get() == nullptr) {
utils::bolt::RoutedSession session(bolt_config);
if (!session.Connected()) {
return 1;
}

Expand All @@ -36,7 +36,11 @@ int Run(const utils::bolt::Config &bolt_config, const format::CsvOptions &csv_op
}

try {
auto ret = query::ExecuteQuery(session.get(), query->query);
// Resolve the session (may proactively re-route) before observing this query so transaction-control
// statements in a dump stream are not interrupted by a re-route mid-transaction.
auto *session_ptr = session.Get();
session.ObserveQuery(query->query);
auto ret = query::ExecuteQuery(session_ptr, query->query, nullptr, bolt_config.db);
if (ret.records.size() > 0) {
Output(ret.header, ret.records, output_opts, csv_opts);
}
Expand Down
Loading
Loading