Rust bindings for Postgres

Over the libpq C library

We provide a new set of bindings over Postgresql libpq C library, that focuses to be Rusty but stick to the original library as much as possible.

The main features that we want to support are:

  • tracing statements,
  • collecting notice messages,
  • query execution,
  • pretty-printing results,
  • notify and listen facilities,
  • error messages in both connections and query results.

The main repo is on Github, which hosts the corresponding crate doc too.

Test: catch_notices

Verifies that server NOTICE messages are captured via PgConn::set_notice_processor, and that query results can be printed and round-tripped back into a string representation.

  • Connects via PgConn::connect_db_env_vars() and asserts ConnStatusType_CONNECTION_OK.
  • Enables libpq tracing to ./test-out/trace.log.
  • Installs a notice processor callback that pushes notice strings into a local Vec<String>.

Runs a query that:

  • emits two server notices (raise notice 'Hello,'; and raise notice 'world!';), and
  • returns a small result set (select 1 as one, 2 as two;).

Output, formatting checks:

  • Prints the result to ./test-out/res.out using res.print(...).
  • Reads the file back and asserts res.to_string() matches the printed output.

Confirms two notices were captured, in order:

  • "NOTICE: Hello,\n"
  • "NOTICE: world!\n"

On the one hand, the output for results looks like this:

+-----+-----+
| one | two |
+-----+-----+
|   1 |   2 |
+-----+-----+
(1 row)

On the other hand, the output for tracing the given query looks like this:

2026-01-27 09:34:36.826258	F	97	Query	 "do $$ begin raise notice 'Hello,'; raise notice 'world!'; end $$; select 1 as one, 2 as two;"
2026-01-27 09:34:36.826952	B	123	NoticeResponse	 S "NOTICE" V "NOTICE" C "00000" M "Hello," W "PL/pgSQL function inline_code_block line 1 at RAISE" F "pl_exec.c" L "3923" R "exec_stmt_raise" \x00
2026-01-27 09:34:36.826959	B	123	NoticeResponse	 S "NOTICE" V "NOTICE" C "00000" M "world!" W "PL/pgSQL function inline_code_block line 1 at RAISE" F "pl_exec.c" L "3923" R "exec_stmt_raise" \x00
2026-01-27 09:34:36.826961	B	7	CommandComplete	 "DO"
2026-01-27 09:34:36.826964	B	50	RowDescription	 2 "one" 0 0 23 4 -1 0 "two" 0 0 23 4 -1 0
2026-01-27 09:34:36.826967	B	16	DataRow	 2 1 '1' 1 '2'
2026-01-27 09:34:36.826968	B	13	CommandComplete	 "SELECT 1"
2026-01-27 09:34:36.826969	B	5	ReadyForQuery	 I
2026-01-27 09:34:36.827770	F	4	Terminate

#[test]
fn catch_notices() {
    let mut conn =
        PgConn::connect_db_env_vars().expect("Failed to create PGconn from connection string.");

    assert_eq!(conn.status(), ConnStatusType_CONNECTION_OK);

    conn.trace("./test-out/trace.log");

    let mut w = Vec::new();

    let _w_pusher = conn.set_notice_processor(|s| w.push(s));

    let query = "do $$ begin raise notice 'Hello,'; raise notice 'world!'; end $$; select 1 as one, 2 as two;";

    let mut res = conn.exec(query).expect("Failed to execute query.");

    res.print(
        "./test-out/res.out",
        true,
        true,
        "|",
        true,
        false,
        false,
        false,
    );

    let s =
        fs::read_to_string("./test-out/res.out").expect("Should have been able to read the file");

    assert_eq!(res.to_string(), s);

    assert_eq!(res.status(), ExecStatusType_PGRES_TUPLES_OK);
    assert_eq!(res.error_message(), "");
    assert!(res.error_field(PG_DIAG_SEVERITY).is_none());
    assert_eq!(res.cmd_status(), "SELECT 1");

    assert_eq!(w.len(), 2);
    assert_eq!(w[0], "NOTICE:  Hello,\n");
    assert_eq!(w[1], "NOTICE:  world!\n");
}

Test: listen_notify

Verifies that PostgreSQL LISTEN/NOTIFY notifications are delivered and can be consumed through libpq-rs. Based on Example 32.2.

  • Listener thread

    • Connects via PgConn::connect_db_env_vars().
    • Asserts the connection is OK: ConnStatusType_CONNECTION_OK.
    • Executes LISTEN TBL2 and asserts ExecStatusType_PGRES_COMMAND_OK.
  • Main thread (sender)

    • Sleeps 100ms to give the listener time to subscribe.
    • Connects via PgConn::connect_db_env_vars() and checks status.
    • Executes NOTIFY TBL2 five times, asserting PGRES_COMMAND_OK each time.

In the listener thread:

  • Loops up to 5 times.
  • Each iteration:
    1. Waits for the socket to become readable with conn.socket().poll(true, false, Some(10.0)) (10s timeout).
    2. On readiness:
      • Calls conn.consume_input() to read data into libpq.
      • Drains queued notifications via while let Some(notify) = conn.notifies() { ... }.
      • Asserts for each notification:
        • notify.relname() == "tbl2"
        • notify.extra() == "" (no payload)
      • Pushes notify.relname() into recvs.

After joining the listener thread:

  • recvs.len() == 5
  • recvs == vec!["tbl2", "tbl2", "tbl2", "tbl2", "tbl2"]

PostgreSQL folds unquoted identifiers to lowercase, so TBL2 is received as "tbl2".


#[test]
fn listen_notify() {
    let handle = thread::spawn(|| {
        let mut conn =
            PgConn::connect_db_env_vars().expect("Failed to create PGconn from connection string.");

        assert_eq!(conn.status(), ConnStatusType_CONNECTION_OK);

        {
            let res = conn.exec("LISTEN TBL2").expect("Failed to execute LISTEN.");
            assert_eq!(res.status(), ExecStatusType_PGRES_COMMAND_OK);
        }

        let mut recvs = Vec::new();

        for _ in 0..5 {
            match conn.socket().poll(true, false, Some(10.0)) {
                Ok(()) => {
                    conn.consume_input().expect("Failed to consume input.");

                    while let Some(notify) = conn.notifies() {
                        assert_eq!(notify.relname(), "tbl2");
                        assert_eq!(notify.extra(), "");

                        recvs.push(notify.relname());

                        conn.consume_input().expect("Failed to consume input.");
                    }
                }
                Err(_e) => break,
            }
        }

        recvs
    });

    // Give the listener a moment to set up.
    thread::sleep(std::time::Duration::from_millis(100));

    // Now send some NOTIFY messages.

    let conn =
        PgConn::connect_db_env_vars().expect("Failed to create PGconn from connection string.");

    assert_eq!(conn.status(), ConnStatusType_CONNECTION_OK);

    for _ in 0..5 {
        let res = conn.exec("NOTIFY TBL2").expect("Failed to execute NOTIFY.");
        assert_eq!(res.status(), ExecStatusType_PGRES_COMMAND_OK);
    }

    let recvs = handle.join().expect("Thread panicked.");

    assert_eq!(recvs.len(), 5);
    assert_eq!(recvs, vec!["tbl2", "tbl2", "tbl2", "tbl2", "tbl2"]);
}

Test: listen_notify_api

Verifies the higher-level PgConn::listen API by exercising PostgreSQL LISTEN/NOTIFY end-to-end.

Unlike listen_notify (which manually polls the socket and drains conn.notifies()), this test delegates the waiting/consumption loop to PgConn::listen.

What it sets up:

  • Listener thread

    • Connects via PgConn::connect_db_env_vars().
    • Asserts the connection is OK: ConnStatusType_CONNECTION_OK.
    • Executes LISTEN TBL3 and asserts ExecStatusType_PGRES_COMMAND_OK.
    • Calls conn.listen(Some(1.0), callback) to collect notifications.
  • Main thread (sender)

    • Sleeps 100ms to give the listener time to subscribe.
    • Connects via PgConn::connect_db_env_vars() and checks status.
    • Executes NOTIFY TBL3 five times, asserting PGRES_COMMAND_OK each time.

The listener thread calls:

  • conn.listen(Some(1.0), |_i, notify| ControlFlow::Continue(Some(notify.relname())))

Where:

  • Some(1.0) is the poll/timeout interval (seconds) used by the listening loop.
  • The callback returns ControlFlow::Continue(Some(value)) to:
    • keep listening, and
    • append value (here, notify.relname()) to the collected results.

After joining the listener thread:

  • recvs.len() == 5
  • recvs == vec!["tbl3", "tbl3", "tbl3", "tbl3", "tbl3"]

PostgreSQL folds unquoted identifiers to lowercase, so TBL3 is received as "tbl3".


#[test]
fn listen_notify_api() {
    let handle = thread::spawn(|| {
        let mut conn =
            PgConn::connect_db_env_vars().expect("Failed to create PGconn from connection string.");

        assert_eq!(conn.status(), ConnStatusType_CONNECTION_OK);

        let res = conn.listen("TBL3").expect("Failed to execute LISTEN.");
        assert_eq!(res.status(), ExecStatusType_PGRES_COMMAND_OK);

        conn.listen_loop(Some(1.0), |_i, notify| {
            ControlFlow::Continue(Some(notify.relname()))
        })
    });

    // Give the listener a moment to set up.
    thread::sleep(std::time::Duration::from_millis(100));

    // Now send some NOTIFY messages.

    let mut conn =
        PgConn::connect_db_env_vars().expect("Failed to create PGconn from connection string.");

    assert_eq!(conn.status(), ConnStatusType_CONNECTION_OK);

    for _ in 0..5 {
        let res = conn
            .notify("TBL3", None)
            .expect("Failed to execute NOTIFY.");
        assert_eq!(res.status(), ExecStatusType_PGRES_COMMAND_OK);
    }

    let recvs = handle.join().expect("Thread panicked.");

    assert_eq!(recvs.len(), 5);
    assert_eq!(recvs, vec!["tbl3", "tbl3", "tbl3", "tbl3", "tbl3"]);
}

Test: select_from_non_existing_table

Verifies error handling when executing a query against a non-existent relation.

What it does:

  • Connects via PgConn::connect_db_env_vars() and asserts ConnStatusType_CONNECTION_OK.
  • Executes:
    • select * from this_table_does_not_exist;

Assertions:

  • The PgResult error message matches the expected PostgreSQL error text (including caret line).
  • The connection-level conn.error_message() matches the same text.
  • The result status is ExecStatusType_PGRES_FATAL_ERROR.

#[test]
fn select_from_non_existing_table() {
    let conn =
        PgConn::connect_db_env_vars().expect("Failed to create PGconn from connection string.");

    assert_eq!(conn.status(), ConnStatusType_CONNECTION_OK);

    let query = "select * from this_table_does_not_exist;";
    let res = conn.exec(query).expect("Failed to execute query.");

    assert_eq!(
        "ERROR:  relation \"this_table_does_not_exist\" does not exist
LINE 1: select * from this_table_does_not_exist;
                      ^
",
        res.error_message()
    );

    assert_eq!(
        "ERROR:  relation \"this_table_does_not_exist\" does not exist
LINE 1: select * from this_table_does_not_exist;
                      ^
",
        conn.error_message()
    );

    assert_eq!(res.status(), ExecStatusType_PGRES_FATAL_ERROR);
}