Combinando RocksDB con PostgreSQL para construir un sistema de mensajería en tiempo real (rápido y “liviano”)

Cuando diseñé mi servicio de mensajería entendí rápido que el cuello de botella iba a ser el almacenamiento. Un chat activo implica ráfagas de escrituras, lecturas frecuentes, contadores de no leídos, listados de conversaciones por usuario y reglas relacionales (bloqueos, pertenencia). Si ponía todo en SQL, cada mensaje sería un INSERT
compitiendo por índices y locks; si ponía todo en un KV embebido, modelar relaciones y consultas declarativas se volvería engorroso. La solución fue separar el problema en dos planos: mensajes y metadatos volátiles en RocksDB (ruta caliente), y relaciones e integridad en PostgreSQL (ruta de razonamiento). Esta frontera me permite mantener el código comprensible, la latencia baja y el consumo de recursos contenido.
Elegí RocksDB porque escribir en secuencia y leer por prefijo es exactamente el patrón de un timeline. Defino claves con localidad por prefijo para iterar rápido los últimos mensajes de una conversación y guardo metadatos pequeños (no leídos por usuario y last_ts
) en un documento aparte. PostgreSQL queda para lo que necesita consistencia fuerte: la pertenencia usuario↔conversación y los bloqueos. Así, listar “qué conversaciones tiene este usuario” es una consulta barata en SQL, y recuperar “qué contienen” esas conversaciones es un acceso directo en KV.
Claves KV y esquema SQL que me funcionan
En el KV uso prefijos deterministas; en SQL mantengo tablas pequeñas y estables. Un detalle importante: fecho las claves con RFC3339Nano
en UTC porque ordena lexicográficamente por tiempo. Para evitar empates, añado un tie-breaker (por ejemplo, un ULID):
// Prefijos con localidad por conversación
convMsgsPrefix := fmt.Sprintf("conv/%s/msg/", convID) // conv/<uuid>/msg/
convMetaKey := fmt.Sprintf("conv/%s/meta", convID) // conv/<uuid>/meta
type KVMeta struct {
Unread map[string]int `json:"unread_by_user"`
LastTs string `json:"last_ts"`
}
type WireMessage struct {
ID string `json:"id"` // ULID o UUID
ConvID string `json:"conv_id"`
Sender string `json:"sender"`
Receiver string `json:"receiver"`
Content string `json:"content"`
TsRFC3339 string `json:"ts"`
}
// Clave de mensaje: conv/<uuid>/msg/<tsRFC3339Nano>_<ulid>
ts := time.Now().UTC().Format(time.RFC3339Nano)
id := newULID() // usa tu lib favorita
key := []byte(fmt.Sprintf("conv/%s/msg/%s_%s", convID, ts, id))
En PostgreSQL solo guardo lo relacional:
-- Pertenencia usuario<->conversación
CREATE TABLE messaging.conversations (
user_uuid UUID NOT NULL,
conversation_id UUID NOT NULL,
PRIMARY KEY (user_uuid, conversation_id)
);
-- Bloqueos
CREATE TABLE messaging.blocked_users (
user_uuid UUID NOT NULL,
blocked_user_uuid UUID NOT NULL,
PRIMARY KEY (user_uuid, blocked_user_uuid)
);
SQL resuelve qué conversaciones tiene un usuario; KV resuelve qué hay dentro sin castigar al RDBMS con la ruta caliente.
Latencia baja y huella de recursos mínima
Desde el principio lo diseñé para latencia baja y bajo consumo. No quiero round-trips de red al hot path: RocksDB vive embebido en el mismo proceso y sirve escrituras secuenciales y lecturas por prefijo en disco local. Eso elimina hops de red, reduce la varianza de latencia y evita cargar a PostgreSQL con operaciones que no son su fuerte.
En la práctica, puedo atender miles de WebSockets por proceso sin que el almacenamiento sea el cuello de botella. Go gestiona sockets y lógica; RocksDB maneja el caudal de mensajes con una LSM-tree (niveles + compaction) optimizada para append; PostgreSQL se mantiene pequeño y barato de escalar porque solo guarda relaciones.
Tuning minimalista en RocksDB (caché, bloom y prefijos)
Ajusto la caché (LRU), activo Bloom filters y, como mis claves tienen prefijo fijo (conv/<uuid>/msg/
), uso prefix extractor para que las lecturas por prefijo sean O(1) en la práctica. El prefijo fijo mide 46 bytes: "conv/"
(5) + <uuid>
(36) + "/msg/"
(5) = 46.
bbto := grocksdb.NewDefaultBlockBasedTableOptions()
bbto.SetBlockCache(grocksdb.NewLRUCache(128 << 20)) // ~128MB de caché LRU
bbto.SetFilterPolicy(grocksdb.NewBloomFilter(10)) // bloom por bloque
opts := grocksdb.NewDefaultOptions()
opts.SetCreateIfMissing(true)
opts.SetBlockBasedTableFactory(bbto)
// prefix extractor para conv/<uuid>/msg/
opts.SetPrefixExtractor(grocksdb.NewFixedPrefixTransform(46))
// Compaction por niveles (LSM) optimizada para append
opts.OptimizeLevelStyleCompaction(0)
db, err := grocksdb.OpenDb(opts, dataPath)
if err != nil { log.Fatal(err) }
defer db.Close()
Escritura: primero integridad, después velocidad (y el trade-off de Sync)
En la escritura fallo rápido en SQL (bloqueos) y aseguro pertenencia usuario↔conversación (idempotente). Luego hago un WriteBatch en KV para guardar mensaje + metadatos de un tirón. Sobre durabilidad: por defecto dejo Sync=false
para latencia mínima; si necesito más durabilidad por operación, activo wo.SetSync(true)
sabiendo que sube la latencia.
func Store(ctx context.Context, db *grocksdb.DB, pg *sql.DB, m WireMessage) error {
// 1) Reglas relacionales (fail fast)
var blocked bool
err := pg.QueryRowContext(ctx, `
SELECT EXISTS(
SELECT 1 FROM messaging.blocked_users
WHERE (user_uuid=$1 AND blocked_user_uuid=$2)
OR (user_uuid=$2 AND blocked_user_uuid=$1)
)`, m.Sender, m.Receiver).Scan(&blocked)
if err != nil || blocked { return fmt.Errorf("blocked or error: %w", err) }
// 2) Pertenencia idempotente
_, _ = pg.ExecContext(ctx, `
INSERT INTO messaging.conversations(user_uuid, conversation_id)
VALUES ($1,$2) ON CONFLICT DO NOTHING`, m.Sender, m.ConvID)
_, _ = pg.ExecContext(ctx, `
INSERT INTO messaging.conversations(user_uuid, conversation_id)
VALUES ($1,$2) ON CONFLICT DO NOTHING`, m.Receiver, m.ConvID)
// 3) Append en KV: mensaje + metadata en un batch
wb := grocksdb.NewWriteBatch(); defer wb.Destroy()
wo := grocksdb.NewDefaultWriteOptions(); defer wo.Destroy()
// wo.SetSync(true) // ↑durabilidad, ↑latencia (elige según tu SLO)
ts := time.Now().UTC().Format(time.RFC3339Nano)
id := newULID()
msgKey := []byte(fmt.Sprintf("conv/%s/msg/%s_%s", m.ConvID, ts, id))
val, _ := json.Marshal(m)
wb.Put(msgKey, val)
ro := grocksdb.NewDefaultReadOptions(); defer ro.Destroy()
ro.SetPrefixSameAsStart(true)
metaKey := []byte(fmt.Sprintf("conv/%s/meta", m.ConvID))
slice, _ := db.Get(ro, metaKey)
var meta KVMeta
if slice.Exists() { _ = json.Unmarshal(slice.Data(), &meta) }
if meta.Unread == nil { meta.Unread = map[string]int{} }
meta.Unread[m.Receiver]++
meta.LastTs = ts
b, _ := json.Marshal(meta)
wb.Put(metaKey, b)
return db.Write(wo, wb)
}
Lectura: SQL decide “qué”; KV entrega “cuánto” y “qué último”
Para la bandeja de entrada, primero pregunto a PostgreSQL qué conversaciones tiene el usuario. Con esa lista en mano, en KV recupero el último mensaje y el documento de metadatos. Uso iteración por prefijo y, para evitar recorrer todo el rango si la conversación es larga, fijo un upper bound exclusivo:
func LastMessageAndMeta(db *grocksdb.DB, convID string) (WireMessage, KVMeta, error) {
ro := grocksdb.NewDefaultReadOptions()
defer ro.Destroy()
ro.SetPrefixSameAsStart(true)
prefix := []byte(fmt.Sprintf("conv/%s/msg/", convID))
// upper bound = prefix + 0xFF (límite exclusivo inmediato)
upper := append(append([]byte{}, prefix...), 0xFF)
ro.SetIterateUpperBound(upper)
it := db.NewIterator(ro); defer it.Close()
var last WireMessage
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
var tmp WireMessage
_ = json.Unmarshal(it.Value().Data(), &tmp)
last = tmp
}
metaKey := []byte(fmt.Sprintf("conv/%s/meta", convID))
slice, _ := db.Get(ro, metaKey)
var meta KVMeta
if slice.Exists() { _ = json.Unmarshal(slice.Data(), &meta) }
return last, meta, nil
}
La consulta previa en SQL es pequeña y estable (crece por conversación, no por mensaje):
SELECT conversation_id
FROM messaging.conversations
WHERE user_uuid = $1;
Marcar como leído: un RMW barato
Para marcar una conversación como leída, hago un read-modify-write sobre el documento de metadatos; no toco el cuerpo de mensajes:
func MarkAsRead(db *grocksdb.DB, convID, user string) error {
ro := grocksdb.NewDefaultReadOptions()
wo := grocksdb.NewDefaultWriteOptions()
defer ro.Destroy(); defer wo.Destroy()
k := []byte(fmt.Sprintf("conv/%s/meta", convID))
s, err := db.Get(ro, k); if err != nil || !s.Exists() { return nil }
var meta KVMeta
if err := json.Unmarshal(s.Data(), &meta); err != nil { return err }
if meta.Unread[user] > 0 { meta.Unread[user]-- }
b, _ := json.Marshal(&meta)
return db.Put(wo, k, b)
}
Consistencia entre SQL y KV: resiliencia práctica
¿Qué pasa si SQL “sucede” y KV falla (o al revés)? Trato las escrituras como at-least-once: si falla KV tras confirmar SQL, retorno error y no muestro la conversación hasta reintento. Un job de reconciliación periódico limpia vínculos huérfanos o reintenta puts fallidos (incluso puedes usar un outbox sencillo). Si esperas mucha concurrencia en una misma conversación, un lock por convID
alrededor del RMW de metadatos evita condiciones de carrera con coste mínimo.
Dónde gana cada motor (y por qué juntos son mejores)
RocksDB me da lo que necesito en el camino caliente: escritura secuencial, lectura por prefijo, caché y bloom ajustables y metadatos locales con coste predecible. PostgreSQL me da consistencia fuerte, constraints y consultas expresivas (EXISTS
, INTERSECT
) para razonar sobre el grafo usuario↔conversación y las reglas de negocio. Si usara solo KV, listar conversaciones por usuario sería incómodo; si usara solo SQL, las ráfagas de mensajes y los índices me pasarían factura. Separando responsabilidades, cada motor hace únicamente lo que mejor sabe hacer y ninguno estorba al otro.
Para cerrar este tema
Diseñé el sistema para que SQL decida y KV ejecute. PostgreSQL mantiene la verdad sobre quién puede hablar con quién y a qué conversaciones pertenece cada usuario; RocksDB guarda el caudal de mensajes y los contadores con la mínima latencia posible. Ese equilibrio me permite correr rápido y “liviano” (sin una “BD grande” en el hot path), escalar horizontalmente sin convertir la base relacional en cuello de botella y mantener una ruta caliente simple, predecible y muy rápida.
Estoy seguro que todavía existen algunos retos que resolver y que no he llegado a ellos o ellos a mi, pero creo que este es un buen punto de partida para mi aplicación.
Happy coding! :D
Written with StackEdit.