Push database events to applications in real time. This page covers LISTEN/NOTIFY, WebSocket live queries (29.2M+ notifications/sec), CDC streaming, continuous streaming views, and Natural Language SQL.
LISTEN/NOTIFY is the standard PostgreSQL-wire pub/sub mechanism. Sessions subscribe to named channels; any session can publish a message to a channel. All subscribers receive the message asynchronously. Absolute DB is wire-compatible with the PostgreSQL NotificationResponse message format, so any library that handles PG notifications works without modification.
| Property | Value |
|---|---|
| Max payload size | 8,000 bytes |
| Per-session notification queue | 256 entries |
| Channel name max length | 63 characters |
| Wire format | PostgreSQL NotificationResponse ('A' message) |
| Fan-out | All sessions subscribed to a channel receive the message |
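The limits in the table can be checked on the client before a NOTIFY is sent. The following is a minimal sketch, not part of Absolute DB: the helper name `check_notify_args` is hypothetical, and it assumes both limits are inclusive and that payload size is counted in UTF-8 encoded bytes.

```python
MAX_PAYLOAD_BYTES = 8000   # "Max payload size" from the table above
MAX_CHANNEL_CHARS = 63     # "Channel name max length" from the table above


def check_notify_args(channel: str, payload: str) -> list[str]:
    """Return a list of problems; an empty list means the arguments fit the limits."""
    problems = []
    if not channel:
        problems.append("channel name is empty")
    elif len(channel) > MAX_CHANNEL_CHARS:
        problems.append(
            f"channel name is {len(channel)} characters (max {MAX_CHANNEL_CHARS})"
        )
    # Assumption: the payload limit is measured in UTF-8 bytes, not characters.
    size = len(payload.encode("utf-8"))
    if size > MAX_PAYLOAD_BYTES:
        problems.append(f"payload is {size} bytes (max {MAX_PAYLOAD_BYTES})")
    return problems
```

Calling the helper before publishing lets an application fall back to sending only a row key when the full payload would be too large.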
-- Session A: subscribe to a channel
LISTEN order_updates;
-- Session B: publish a notification to all listeners
NOTIFY order_updates, '{"order_id": 12345, "status": "shipped"}';
-- Session A receives asynchronously:
-- Notification: channel "order_updates"
-- payload: {"order_id": 12345, "status": "shipped"}
-- Trigger-based notifications (auto-notify on table change)
CREATE OR REPLACE FUNCTION notify_order_change()
RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify('order_updates',
    json_build_object(
      'op', TG_OP,
      'order_id', NEW.id,
      'status', NEW.status
    )::text
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_order_notify
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW EXECUTE FUNCTION notify_order_change();
-- Unsubscribe
UNLISTEN order_updates;
UNLISTEN *; -- unsubscribe from all channels
import select

import psycopg2
import psycopg2.extensions

conn = psycopg2.connect("postgresql://absdb:pw@localhost:5433/mydb")
# Autocommit so LISTEN takes effect immediately
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
cur.execute("LISTEN order_updates;")
print("Waiting for notifications...")
while True:
    select.select([conn], [], [])  # block until the socket is readable
    conn.poll()                    # read pending messages into conn.notifies
    while conn.notifies:
        n = conn.notifies.pop(0)
        print(f"Got notification: channel={n.channel} payload={n.payload}")
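Notification payloads are plain text, so a listener usually decodes and validates them before acting. The sketch below assumes the payload shape produced by the `notify_order_change()` trigger shown earlier (`op`, `order_id`, `status`); the function names `decode_order_event` and `describe` are hypothetical.

```python
import json


def decode_order_event(payload: str):
    """Parse a payload shaped like the trigger's json_build_object output.

    Returns a dict with 'op', 'order_id' and 'status', or None when the
    payload is not valid JSON or lacks one of the expected keys.
    """
    try:
        event = json.loads(payload)
    except json.JSONDecodeError:
        return None
    if not isinstance(event, dict):
        return None
    if not {"op", "order_id", "status"} <= event.keys():
        return None
    return event


def describe(event: dict) -> str:
    """Turn a decoded event into a short log line."""
    verbs = {"INSERT": "created", "UPDATE": "updated"}
    verb = verbs.get(event["op"], event["op"].lower())
    return f"order {event['order_id']} {verb} (status={event['status']})"
```

In the listener loop, `decode_order_event(n.payload)` would replace the bare `print`. Payloads published by hand without an `op` key are rejected by this particular check.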
const { Client } = require('pg');

async function main() {
  const client = new Client({ host: 'localhost', port: 5433, database: 'mydb' });
  await client.connect();
  // Register the handler before issuing LISTEN so no notification is missed
  client.on('notification', (msg) => {
    console.log('Channel:', msg.channel);
    console.log('Payload:', JSON.parse(msg.payload));
  });
  await client.query('LISTEN order_updates');
}

main().catch(console.error);
Absolute DB exposes a WebSocket endpoint at ws://host:8080/ws (or wss:// with TLS). Clients send SQL queries and receive result-set change events in real time as the underlying data changes. Throughput is 29.2 million+ notifications/sec on a single server via a lock-free write ring.
const ws = new WebSocket('wss://mydb.example.com:8080/ws');
ws.onopen = () => {
  // Subscribe to live results of a query
  ws.send(JSON.stringify({
    type: 'subscribe',
    query: "SELECT id, status, total FROM orders WHERE status = 'pending'",
    id: 'pending-orders'
  }));
};
ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  if (msg.type === 'data') {
    console.log('Initial results:', msg.rows);
  } else if (msg.type === 'change') {
    // op: 'insert' | 'update' | 'delete'
    console.log(`Row ${msg.op}:`, msg.row);
    updateUI(msg.op, msg.row); // application-defined
  }
};
// Unsubscribe later, once the socket is open and live results are no longer needed
ws.send(JSON.stringify({ type: 'unsubscribe', id: 'pending-orders' }));
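On the client side, the `data` and `change` messages can be folded into a local copy of the result set. This is a minimal sketch of that bookkeeping, independent of any WebSocket library. It assumes the message shapes used in the JavaScript example above, that every row carries an `id` column, and that a `delete` message includes at least that `id`; the class name `LiveResult` is hypothetical.

```python
import json


class LiveResult:
    """Local mirror of one live query's result set, keyed by row id."""

    def __init__(self):
        self.rows = {}

    def apply(self, raw: str) -> None:
        """Apply one JSON message received from the live-query socket."""
        msg = json.loads(raw)
        if msg["type"] == "data":
            # The initial snapshot replaces whatever was cached.
            self.rows = {row["id"]: row for row in msg["rows"]}
        elif msg["type"] == "change":
            row = msg["row"]
            if msg["op"] == "delete":
                self.rows.pop(row["id"], None)
            else:  # 'insert' or 'update'
                self.rows[row["id"]] = row
```

Keeping the mirror keyed by `id` makes updates and deletes constant-time and makes a replayed `insert` harmless.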
Change Data Capture (CDC) taps the Write-Ahead Log and delivers row-level change events (INSERT, UPDATE, DELETE) in Debezium-compatible JSON or Protobuf binary format. Delivery is via WebSocket or gRPC bidirectional streaming. A 100 MB ring buffer decouples producer and consumer speeds.
-- Subscribe to all changes on a table
SUBSCRIBE TO TABLE orders;
-- Subscribe with a predicate filter
SUBSCRIBE TO TABLE orders
WHERE status IN ('pending', 'processing');
-- Subscribe to all changes in a database from a specific LSN
SUBSCRIBE TO DATABASE mydb
STARTING AT LSN 12345;
-- CDC cursor management
SELECT * FROM _cdc_cursors;
-- Acknowledge processed events (advance cursor)
SELECT absdb_cdc_ack('cursor_id', lsn => 99999);
{
  "op": "u",
  "ts_ms": 1743897600000,
  "source": {
    "db": "mydb",
    "table": "orders",
    "lsn": 98765
  },
  "before": { "id": 42, "status": "pending", "total": 199.99 },
  "after": { "id": 42, "status": "shipped", "total": 199.99 }
}
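A consumer often only cares which columns changed. The sketch below compares the `before` and `after` images of an event shaped like the one above. The `OPS` mapping follows Debezium's usual operation codes; only `"u"` appears in this page, so the other codes are an assumption, and the function name `changed_fields` is hypothetical.

```python
import json

# Debezium-style operation codes (assumed to carry over; only "u" is shown above).
OPS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def changed_fields(event: dict) -> dict:
    """Map each column whose value differs to a (before, after) pair."""
    before = event.get("before") or {}   # null on inserts
    after = event.get("after") or {}     # null on deletes
    return {
        col: (before.get(col), after.get(col))
        for col in sorted(before.keys() | after.keys())
        if before.get(col) != after.get(col)
    }


sample = json.loads("""
{
  "op": "u",
  "source": {"db": "mydb", "table": "orders", "lsn": 98765},
  "before": {"id": 42, "status": "pending", "total": 199.99},
  "after":  {"id": 42, "status": "shipped", "total": 199.99}
}
""")
print(OPS[sample["op"]], changed_fields(sample))
```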
| Property | Value |
|---|---|
| Ring buffer size | 100 MB per subscription |
| Delivery semantics | At-least-once with ACK cursor |
| Output formats | Debezium JSON, Protobuf binary |
| Delivery channels | WebSocket (ws://host:8080/cdc), gRPC bidirectional |
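At-least-once delivery means a consumer can see the same event again after a reconnect, so processing should be idempotent. One common approach, sketched here, is to remember the highest LSN already handled and skip anything at or below it. This assumes events within a subscription arrive in increasing LSN order and that each event has a distinct LSN, which this page does not state; the class name `IdempotentConsumer` is hypothetical.

```python
class IdempotentConsumer:
    """Skips redelivered events by remembering the highest LSN already handled."""

    def __init__(self, handler, last_lsn: int = 0):
        self.handler = handler
        self.last_lsn = last_lsn  # highest LSN fully processed so far

    def receive(self, event: dict) -> bool:
        """Process one event; return False if it was a duplicate."""
        lsn = event["source"]["lsn"]
        if lsn <= self.last_lsn:
            return False      # already handled, safe to drop
        self.handler(event)   # may raise; last_lsn then stays unchanged
        self.last_lsn = lsn   # candidate value for absdb_cdc_ack(...)
        return True
```

After a batch has been processed, `last_lsn` is the value a client would pass to `absdb_cdc_ack` to advance the cursor. Acknowledging only after the handler succeeds is what preserves the at-least-once guarantee.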
Streaming views continuously evaluate a SELECT query against an event stream. Each view maintains a 65,536-event ring buffer and a background processor thread. Results are delivered to subscribers via NOTIFY, a result table, or WebSocket — within milliseconds of new events arriving.
Streaming views support sliding windows, tumbling windows, and watermark-based out-of-order event handling. Late-arriving events within the watermark tolerance are included in the correct window.
-- Create a streaming view: 1-minute sliding window on order totals
CREATE STREAMING VIEW order_totals_1m AS
SELECT
date_trunc('minute', created_at) AS minute,
count(*) AS order_count,
sum(total) AS total_revenue
FROM orders
WHERE created_at >= now() - INTERVAL '1 minute'
GROUP BY 1;
-- Create a streaming view with out-of-order watermark (5 seconds)
CREATE STREAMING VIEW sensor_avg AS
SELECT
device_id,
avg(value) AS avg_value,
window_start,
window_end
FROM sensor_readings
TUMBLING WINDOW '30 seconds'
WATERMARK '5 seconds'
GROUP BY device_id, window_start, window_end;
-- List all active streaming views
SHOW STREAMING VIEWS;
-- Drop a streaming view
DROP STREAMING VIEW order_totals_1m;
-- Subscribe to a streaming view via NOTIFY
LISTEN streaming_view_order_totals_1m;
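To make the tumbling window and watermark behaviour concrete, here is a small in-memory simulation of the `sensor_avg` idea. It is an illustrative sketch, not Absolute DB's implementation. It assumes the watermark is the largest event time seen minus the tolerance, and that a window closes once the watermark reaches its end; the class name `TumblingAverager` is hypothetical.

```python
from collections import defaultdict


class TumblingAverager:
    """Per-key averages over fixed windows, with a watermark for late events."""

    def __init__(self, window_s: float = 30.0, watermark_s: float = 5.0):
        self.window_s = window_s
        self.watermark_s = watermark_s
        self.max_event_time = float("-inf")
        self.open = defaultdict(list)  # (window_start, key) -> values

    def add(self, key, event_time: float, value: float) -> list:
        """Ingest one event; return results for windows that just closed."""
        self.max_event_time = max(self.max_event_time, event_time)
        watermark = self.max_event_time - self.watermark_s
        start = (event_time // self.window_s) * self.window_s
        if start + self.window_s > watermark:
            # Window still open: late events within tolerance land here too.
            self.open[(start, key)].append(value)
        # Otherwise the window has already closed and the event is dropped.
        closed = []
        for w_start, k in sorted(self.open):
            if w_start + self.window_s <= watermark:
                values = self.open.pop((w_start, k))
                avg = sum(values) / len(values)
                closed.append((k, w_start, w_start + self.window_s, avg))
        return closed
```

With a 30-second window and 5-second tolerance, an event stamped 28 that arrives after one stamped 33 is still counted in the first window, while an event stamped 27 that arrives after one stamped 36 is too late.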
| Property | Value |
|---|---|
| Ring buffer per view | 65,536 events |
| Processor | Background thread, one per view |
| Window types | Sliding, tumbling (configurable interval) |
| Out-of-order handling | Watermark (configurable tolerance) |
| Delivery modes | NOTIFY channel, result table, WebSocket |
| Max concurrent views | 65,536 per server |
The NL2SQL() SQL function translates a plain English question into a SQL query using a schema-aware rule engine. It runs entirely in-database — no external LLM API call required. The rule engine recognises common patterns: aggregation queries, top-N ranking, JOIN conditions, date phrases, comparison operators, ORDER BY, and more.
-- Convert a natural language question to SQL
SELECT NL2SQL(
question => 'What are the top 10 customers by total order value this month?',
schema_context => 'orders(id, customer_id, total, created_at), customers(id, name, email)'
);
-- Result: generated SQL query as text
-- SELECT c.name, sum(o.total) AS total_value
-- FROM orders o JOIN customers c ON c.id = o.customer_id
-- WHERE o.created_at >= date_trunc('month', now())
-- GROUP BY c.id, c.name
-- ORDER BY total_value DESC
-- LIMIT 10
-- Execute the NL2SQL result directly
EXECUTE IMMEDIATE NL2SQL(
question => 'How many orders were placed yesterday?',
schema_context => 'orders(id, created_at, status)'
);
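Because generated SQL is executed directly, an application may want to confirm that it is a single read-only statement first. The following is a deliberately conservative heuristic sketch, not a SQL parser and not a feature of Absolute DB; the function name `looks_read_only` and the keyword list are assumptions. It can reject safe queries, for example ones that contain a keyword inside a string literal or that start with a CTE.

```python
import re

# Keywords suggesting a statement does more than read (illustrative, not exhaustive).
_WRITE_KEYWORDS = re.compile(
    r"\b(insert|update|delete|drop|alter|create|truncate|grant|revoke"
    r"|execute|notify|into)\b",
    re.IGNORECASE,
)


def looks_read_only(sql: str) -> bool:
    """Heuristic: True for a single SELECT statement with no write keywords."""
    text = sql.strip().rstrip(";").strip()
    if ";" in text:                          # more than one statement
        return False
    if not re.match(r"(?i)select\b", text):  # must start with SELECT
        return False
    return _WRITE_KEYWORDS.search(text) is None
```

A client would fetch the text with `SELECT NL2SQL(...)`, pass it through this check, and only then run it, ideally under a role that has read-only privileges as well.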
For production AI applications, NL2SQL can be combined with an ONNX model backend via the ML serving layer for more sophisticated natural language understanding.
Rails ActionCable uses PostgreSQL LISTEN/NOTIFY as its pub/sub adapter. Point ActionCable at Absolute DB's PostgreSQL wire port and it works without modification.
production:
  adapter: postgresql
  url: postgresql://absdb:password@localhost:5433/myapp_production
Django Channels supports PostgreSQL as a channel layer backend. Use the channels_postgres package with Absolute DB's PostgreSQL wire port.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_postgres.core.PostgresChannelLayer",
        "CONFIG": {
            "ENGINE": "django.db.backends.postgresql",
            "NAME": "myapp",
            "USER": "absdb",
            "PASSWORD": "password",
            "HOST": "localhost",
            "PORT": "5433",
        },
    },
}
| Statement / Function | Description |
|---|---|
| LISTEN channel | Subscribe to a notification channel |
| UNLISTEN channel | Unsubscribe from a channel (UNLISTEN * = all) |
| NOTIFY channel [, payload] | Send a notification to all channel subscribers |
| pg_notify(channel, payload) | Function form of NOTIFY (usable in triggers) |
| SUBSCRIBE TO TABLE t [WHERE ...] | Start a CDC subscription on a table |
| SUBSCRIBE TO DATABASE d [STARTING AT LSN n] | Start a CDC subscription on a whole database |
| CREATE STREAMING VIEW name AS SELECT ... | Create a continuously evaluated streaming view |
| SHOW STREAMING VIEWS | List all active streaming views with status |
| DROP STREAMING VIEW name | Stop and remove a streaming view |
| NL2SQL(question, schema_context) | Convert a natural language question to SQL |
| EXECUTE IMMEDIATE sql_text | Parse and execute a dynamically generated SQL string |