跳转到内容

Realtime API

PocketBase Realtime API 基于 WebSocket,允许客户端订阅集合或特定记录的变更事件,实现实时数据同步功能。

客户端 <--WebSocket--> PocketBase Server <--> SQLite Database
|
+-- 事件推送 (create/update/delete)

当记录发生变化时,服务器会主动推送事件给所有订阅的客户端。

// 订阅 posts 集合的所有变更
pb.collection("posts").subscribe("*", function (e) {
console.log(e.action); // 'create', 'update', 'delete'
console.log(e.record); // 变更的记录数据
});
// 只订阅某篇文章的变更
pb.collection("posts").subscribe("record_id_here", function (e) {
console.log("Record changed:", e.record);
});
// 订阅多条特定记录(用逗号分隔)
pb.collection("posts").subscribe("id1,id2,id3", function (e) {
console.log("One of the watched records changed");
});
// 保存订阅引用
const subscription = pb.collection("posts").subscribe("*", function (e) {
console.log(e);
});
// 取消订阅
subscription.unsubscribe();
// 或
pb.collection("posts").unsubscribe("*");
{
action: 'create' | 'update' | 'delete',
record: {
id: string;
collectionId: string;
collectionName: string;
// ... 其他字段
}
}

新记录创建时触发:

pb.collection("comments").subscribe("*", function (e) {
if (e.action === "create") {
console.log("New comment:", e.record);
// 在列表中添加新评论
appendComment(e.record);
}
});

记录更新时触发:

pb.collection("posts").subscribe("*", function (e) {
if (e.action === "update") {
console.log("Post updated:", e.record);
// 更新本地缓存或 UI
updatePostInList(e.record);
}
});

记录删除时触发:

pb.collection("posts").subscribe("*", function (e) {
if (e.action === "delete") {
console.log("Post deleted:", e.record);
// 从列表中移除
removePostFromList(e.record.id);
}
});
import PocketBase from "pocketbase";
const pb = new PocketBase("http://127.0.0.1:8090");
// 基本订阅
pb.collection("posts").subscribe("*", (e) => {
switch (e.action) {
case "create":
console.log("New post created");
break;
case "update":
console.log("Post updated");
break;
case "delete":
console.log("Post deleted");
break;
}
});
// 订阅并处理错误
pb.collection("posts")
.subscribe(
"*",
(e) => {
handleEvent(e);
},
{
// 自定义选项
expand: "author,category",
},
)
.catch((err) => {
console.error("Subscription failed:", err);
});
import { PocketBase } from "pocketbase";
import { onMounted, onUnmounted, ref } from "vue";
const pb = new PocketBase("http://127.0.0.1:8090");
export function useRealtime(collection) {
const records = ref([]);
let subscription = null;
onMounted(async () => {
// 初始加载
const result = await pb.collection(collection).getList(1, 50);
records.value = result.items;
// 订阅变更
subscription = pb.collection(collection).subscribe("*", (e) => {
switch (e.action) {
case "create":
records.value.unshift(e.record);
break;
case "update":
const idx = records.value.findIndex((r) => r.id === e.record.id);
if (idx !== -1) records.value[idx] = e.record;
break;
case "delete":
records.value = records.value.filter((r) => r.id !== e.record.id);
break;
}
});
});
onUnmounted(() => {
subscription?.unsubscribe();
});
return { records };
}
import { PocketBase } from "pocketbase";
import { useEffect, useState } from "react";
const pb = new PocketBase("http://127.0.0.1:8090");
export function useRealtime(collection) {
const [records, setRecords] = useState([]);
useEffect(() => {
let mounted = true;
let subscription = null;
// 初始加载
pb.collection(collection)
.getList(1, 50)
.then((result) => {
if (mounted) setRecords(result.items);
});
// 订阅变更
pb.collection(collection)
.subscribe("*", (e) => {
if (!mounted) return;
setRecords((prev) => {
switch (e.action) {
case "create":
return [e.record, ...prev];
case "update":
return prev.map((r) => (r.id === e.record.id ? e.record : r));
case "delete":
return prev.filter((r) => r.id !== e.record.id);
default:
return prev;
}
});
})
.then((sub) => {
subscription = sub;
});
return () => {
mounted = false;
subscription?.unsubscribe();
};
}, [collection]);
return records;
}
<script setup>
import { PocketBase } from "pocketbase";
import { onMounted, onUnmounted, ref } from "vue";
const pb = new PocketBase("http://127.0.0.1:8090");
const posts = ref([]);
let subscription = null;
onMounted(async () => {
// 获取初始数据
const result = await pb.collection("posts").getList(1, 20);
posts.value = result.items;
// 订阅实时更新
subscription = pb.collection("posts").subscribe("*", (e) => {
switch (e.action) {
case "create":
posts.value.unshift(e.record);
break;
case "update":
const index = posts.value.findIndex((p) => p.id === e.record.id);
if (index !== -1) {
posts.value[index] = e.record;
}
break;
case "delete":
posts.value = posts.value.filter((p) => p.id !== e.record.id);
break;
}
});
});
onUnmounted(() => {
subscription?.unsubscribe();
});
</script>
<template>
<div>
<h1>Posts ({{ posts.length }})</h1>
<div v-for="post in posts" :key="post.id">
<h3>{{ post.title }}</h3>
<p>{{ post.content }}</p>
</div>
</div>
</template>

PocketBase JS SDK 内置自动重连机制:

const pb = new PocketBase("http://127.0.0.1:8090");
// SDK 会自动处理断线重连
// 可通过以下方式监听连接状态
pb.realtime.subscribe("connect", () => {
console.log("WebSocket connected");
});
pb.realtime.subscribe("disconnect", () => {
console.log("WebSocket disconnected");
});
class ReconnectingPocketBase extends PocketBase {
constructor(url) {
super(url);
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectDelay = 1000;
}
async subscribeWithRetry(collection, callback) {
const subscribeFunc = async () => {
try {
await this.collection(collection).subscribe("*", callback);
this.reconnectAttempts = 0;
} catch (err) {
this.reconnectAttempts++;
if (this.reconnectAttempts <= this.maxReconnectAttempts) {
const delay =
this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
console.log(`Reconnecting in ${delay}ms...`);
setTimeout(subscribeFunc, delay);
} else {
console.error("Max reconnection attempts reached");
throw err;
}
}
};
subscribeFunc();
}
}
let heartbeatInterval;
function startHeartbeat() {
heartbeatInterval = setInterval(() => {
if (pb.realtime.isConnected()) {
// 发送 ping 检查连接
pb.collection("_health").subscribe("*", () => {}, {
timeout: 5000,
});
}
}, 30000); // 每 30 秒
}
function stopHeartbeat() {
clearInterval(heartbeatInterval);
}
server {
listen 443 ssl http2;
server_name your-domain.com;
ssl_certificate /path/to/cert.pem;
ssl_certificate_key /path/to/key.pem;
# WebSocket 支持
location / {
proxy_pass http://127.0.0.1:8090;
proxy_http_version 1.1;
# WebSocket 头部
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 超时设置
proxy_connect_timeout 7d;
proxy_send_timeout 7d;
proxy_read_timeout 7d;
}
}
your-domain.com {
reverse_proxy 127.0.0.1:8090 {
# WebSocket 自动支持
}
}

如果使用阿里云 ECS,需要在安全组中开放 WebSocket 端口:

  1. 登录阿里云控制台
  2. 进入 ECS 实例 -> 安全组
  3. 添加入方向规则:
    • 端口范围:8090/tcp
    • 授权对象:0.0.0.0/0
// 错误:每次组件渲染都创建新订阅
useEffect(() => {
pb.collection("posts").subscribe("*", handler);
}, [posts]); // posts 变化时会重复订阅
// 正确:只在挂载时订阅一次
useEffect(() => {
const sub = pb.collection("posts").subscribe("*", handler);
return () => sub.unsubscribe();
}, []);
// 只订阅当前用户相关的内容
pb.collection("notifications").subscribe(`userId='${currentUserId}'`, handler);
// 订阅特定聊天
pb.collection("messages").subscribe(`chatId='${currentChatId}'`, handler);
try {
await pb.collection("posts").subscribe("*", handler);
} catch (err) {
if (err.status === 403) {
console.error("Permission denied for subscription");
// 降级到轮询
setInterval(() => fetchUpdates(), 5000);
} else {
console.error("Subscription failed:", err);
}
}

PocketBase 会根据集合规则过滤订阅事件:

// 如果用户只能查看自己的 posts
// 则只会收到自己 posts 的变更事件
pb.collection("posts").subscribe("*", handler);
// 不会收到其他用户的 posts 变更
let eventQueue = [];
let processing = false;
async function processQueue() {
if (processing) return;
processing = true;
while (eventQueue.length > 0) {
const batch = eventQueue.splice(0, 10);
await processBatch(batch);
}
processing = false;
}
pb.collection("posts").subscribe("*", (e) => {
eventQueue.push(e);
processQueue();
});

检查清单:

  1. 反向代理是否支持 WebSocket
  2. SSL 证书是否有效
  3. 防火墙是否允许 WebSocket 端口
  4. 服务器是否允许并发连接

调试方法:

pb.realtime.on("connect", () => console.log("Connected"));
pb.realtime.on("disconnect", () => console.log("Disconnected"));
pb.realtime.on("error", (err) => console.error("Error:", err));

Q: 如何实现”在线用户”功能?

Section titled “Q: 如何实现”在线用户”功能?”
// 用户上线时创建 presence 记录
async function setOnline(userId) {
await pb.collection("presence").create({
userId: userId,
lastSeen: new Date().toISOString(),
});
}
// 定期更新
setInterval(() => setOnline(currentUserId), 30000);
// 订阅在线状态变化
pb.collection("presence").subscribe("*", (e) => {
if (e.action === "create" || e.action === "update") {
updateUserOnlineStatus(e.record.userId, true);
}
});
// 订阅当前用户的通知
pb.collection("notifications").subscribe(
`userId='${currentUserId}' && read=false`,
(e) => {
if (e.action === "create") {
showNotification(e.record);
playNotificationSound();
}
},
);
// 使用操作转换或 CRDT 算法
pb.collection("documents").subscribe(docId, (e) => {
if (e.action === "update") {
const remoteOps = e.record.pendingOps || [];
remoteOps.forEach((op) => applyOperation(op));
}
});
async function sendLocalOp(op) {
await pb.collection("documents").update(docId, {
pendingOps: [op],
});
}
  1. 验证订阅权限:确保集合规则正确限制用户只能订阅有权限访问的数据
  2. 敏感数据过滤:使用 fields 参数限制订阅事件返回的字段
  3. 限流保护:对订阅频率进行限制,防止滥用
  4. 监控连接数:监控活跃 WebSocket 连接,防止资源耗尽