Realtime API
PocketBase Realtime API 基于 WebSocket,允许客户端订阅集合或特定记录的变更事件,实现实时数据同步功能。
客户端 <--WebSocket--> PocketBase Server <--> SQLite Database | +-- 事件推送 (create/update/delete)当记录发生变化时,服务器会主动推送事件给所有订阅的客户端。
订阅整个集合
Section titled “订阅整个集合”// 订阅 posts 集合的所有变更pb.collection("posts").subscribe("*", function (e) { console.log(e.action); // 'create', 'update', 'delete' console.log(e.record); // 变更的记录数据});订阅特定记录
Section titled “订阅特定记录”// 只订阅某篇文章的变更pb.collection("posts").subscribe("record_id_here", function (e) { console.log("Record changed:", e.record);});订阅多条记录
Section titled “订阅多条记录”// 订阅多条特定记录(用逗号分隔)pb.collection("posts").subscribe("id1,id2,id3", function (e) { console.log("One of the watched records changed");});// 保存订阅引用const subscription = pb.collection("posts").subscribe("*", function (e) { console.log(e);});
// 取消订阅subscription.unsubscribe();// 或pb.collection("posts").unsubscribe("*");{ action: 'create' | 'update' | 'delete', record: { id: string; collectionId: string; collectionName: string; // ... 其他字段 }}create 事件
Section titled “create 事件”新记录创建时触发:
pb.collection("comments").subscribe("*", function (e) { if (e.action === "create") { console.log("New comment:", e.record); // 在列表中添加新评论 appendComment(e.record); }});update 事件
Section titled “update 事件”记录更新时触发:
pb.collection("posts").subscribe("*", function (e) { if (e.action === "update") { console.log("Post updated:", e.record); // 更新本地缓存或 UI updatePostInList(e.record); }});delete 事件
Section titled “delete 事件”记录删除时触发:
pb.collection("posts").subscribe("*", function (e) { if (e.action === "delete") { console.log("Post deleted:", e.record); // 从列表中移除 removePostFromList(e.record.id); }});SDK 使用示例
Section titled “SDK 使用示例”JavaScript SDK
Section titled “JavaScript SDK”import PocketBase from "pocketbase";
const pb = new PocketBase("http://127.0.0.1:8090");
// 基本订阅pb.collection("posts").subscribe("*", (e) => { switch (e.action) { case "create": console.log("New post created"); break; case "update": console.log("Post updated"); break; case "delete": console.log("Post deleted"); break; }});
// 订阅并处理错误pb.collection("posts") .subscribe( "*", (e) => { handleEvent(e); }, { // 自定义选项 expand: "author,category", }, ) .catch((err) => { console.error("Subscription failed:", err); });Vue 3 Composition API
Section titled “Vue 3 Composition API”import { PocketBase } from "pocketbase";import { onMounted, onUnmounted, ref } from "vue";
const pb = new PocketBase("http://127.0.0.1:8090");
export function useRealtime(collection) { const records = ref([]); let subscription = null;
onMounted(async () => { // 初始加载 const result = await pb.collection(collection).getList(1, 50); records.value = result.items;
// 订阅变更 subscription = pb.collection(collection).subscribe("*", (e) => { switch (e.action) { case "create": records.value.unshift(e.record); break; case "update": const idx = records.value.findIndex((r) => r.id === e.record.id); if (idx !== -1) records.value[idx] = e.record; break; case "delete": records.value = records.value.filter((r) => r.id !== e.record.id); break; } }); });
onUnmounted(() => { subscription?.unsubscribe(); });
return { records };}React Hooks
Section titled “React Hooks”import { PocketBase } from "pocketbase";import { useEffect, useState } from "react";
const pb = new PocketBase("http://127.0.0.1:8090");
export function useRealtime(collection) { const [records, setRecords] = useState([]);
useEffect(() => { let mounted = true; let subscription = null;
// 初始加载 pb.collection(collection) .getList(1, 50) .then((result) => { if (mounted) setRecords(result.items); });
// 订阅变更 pb.collection(collection) .subscribe("*", (e) => { if (!mounted) return;
setRecords((prev) => { switch (e.action) { case "create": return [e.record, ...prev]; case "update": return prev.map((r) => (r.id === e.record.id ? e.record : r)); case "delete": return prev.filter((r) => r.id !== e.record.id); default: return prev; } }); }) .then((sub) => { subscription = sub; });
return () => { mounted = false; subscription?.unsubscribe(); }; }, [collection]);
return records;}Vue 3 示例(组件内)
Section titled “Vue 3 示例(组件内)”<script setup>import { PocketBase } from "pocketbase";import { onMounted, onUnmounted, ref } from "vue";
const pb = new PocketBase("http://127.0.0.1:8090");const posts = ref([]);let subscription = null;
onMounted(async () => { // 获取初始数据 const result = await pb.collection("posts").getList(1, 20); posts.value = result.items;
// 订阅实时更新 subscription = pb.collection("posts").subscribe("*", (e) => { switch (e.action) { case "create": posts.value.unshift(e.record); break; case "update": const index = posts.value.findIndex((p) => p.id === e.record.id); if (index !== -1) { posts.value[index] = e.record; } break; case "delete": posts.value = posts.value.filter((p) => p.id !== e.record.id); break; } });});
onUnmounted(() => { subscription?.unsubscribe();});</script>
<template> <div> <h1>Posts ({{ posts.length }})</h1> <div v-for="post in posts" :key="post.id"> <h3>{{ post.title }}</h3> <p>{{ post.content }}</p> </div> </div></template>PocketBase JS SDK 内置自动重连机制:
const pb = new PocketBase("http://127.0.0.1:8090");
// SDK 会自动处理断线重连// 可通过以下方式监听连接状态
pb.realtime.subscribe("connect", () => { console.log("WebSocket connected");});
pb.realtime.subscribe("disconnect", () => { console.log("WebSocket disconnected");});自定义重连策略
Section titled “自定义重连策略”class ReconnectingPocketBase extends PocketBase { constructor(url) { super(url); this.reconnectAttempts = 0; this.maxReconnectAttempts = 5; this.reconnectDelay = 1000; }
async subscribeWithRetry(collection, callback) { const subscribeFunc = async () => { try { await this.collection(collection).subscribe("*", callback); this.reconnectAttempts = 0; } catch (err) { this.reconnectAttempts++; if (this.reconnectAttempts <= this.maxReconnectAttempts) { const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); console.log(`Reconnecting in ${delay}ms...`); setTimeout(subscribeFunc, delay); } else { console.error("Max reconnection attempts reached"); throw err; } } };
subscribeFunc(); }}let heartbeatInterval;
function startHeartbeat() { heartbeatInterval = setInterval(() => { if (pb.realtime.isConnected()) { // 发送 ping 检查连接 pb.collection("_health").subscribe("*", () => {}, { timeout: 5000, }); } }, 30000); // 每 30 秒}
function stopHeartbeat() { clearInterval(heartbeatInterval);}生产环境配置
Section titled “生产环境配置”Nginx 反向代理
Section titled “Nginx 反向代理”server { listen 443 ssl http2; server_name your-domain.com;
ssl_certificate /path/to/cert.pem; ssl_certificate_key /path/to/key.pem;
# WebSocket 支持 location / { proxy_pass http://127.0.0.1:8090; proxy_http_version 1.1;
# WebSocket 头部 proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme;
# 超时设置 proxy_connect_timeout 7d; proxy_send_timeout 7d; proxy_read_timeout 7d; }}Caddy 反向代理
Section titled “Caddy 反向代理”your-domain.com { reverse_proxy 127.0.0.1:8090 { # WebSocket 自动支持 }}阿里云 ECS 配置
Section titled “阿里云 ECS 配置”如果使用阿里云 ECS,需要在安全组中开放 WebSocket 端口:
- 登录阿里云控制台
- 进入 ECS 实例 -> 安全组
- 添加入方向规则:
- 端口范围:8090/tcp
- 授权对象:0.0.0.0/0
1. 避免重复订阅
Section titled “1. 避免重复订阅”// 错误:每次组件渲染都创建新订阅useEffect(() => { pb.collection("posts").subscribe("*", handler);}, [posts]); // posts 变化时会重复订阅
// 正确:只在挂载时订阅一次useEffect(() => { const sub = pb.collection("posts").subscribe("*", handler); return () => sub.unsubscribe();}, []);2. 限制订阅范围
Section titled “2. 限制订阅范围”// 只订阅当前用户相关的内容pb.collection("notifications").subscribe(`userId='${currentUserId}'`, handler);
// 订阅特定聊天pb.collection("messages").subscribe(`chatId='${currentChatId}'`, handler);3. 处理订阅失败
Section titled “3. 处理订阅失败”try { await pb.collection("posts").subscribe("*", handler);} catch (err) { if (err.status === 403) { console.error("Permission denied for subscription"); // 降级到轮询 setInterval(() => fetchUpdates(), 5000); } else { console.error("Subscription failed:", err); }}4. 服务端事件限制
Section titled “4. 服务端事件限制”PocketBase 会根据集合规则过滤订阅事件:
// 如果用户只能查看自己的 posts// 则只会收到自己 posts 的变更事件pb.collection("posts").subscribe("*", handler);// 不会收到其他用户的 posts 变更5. 批量处理事件
Section titled “5. 批量处理事件”let eventQueue = [];let processing = false;
async function processQueue() { if (processing) return; processing = true;
while (eventQueue.length > 0) { const batch = eventQueue.splice(0, 10); await processBatch(batch); }
processing = false;}
pb.collection("posts").subscribe("*", (e) => { eventQueue.push(e); processQueue();});Q: WebSocket 连接失败
Section titled “Q: WebSocket 连接失败”检查清单:
- 反向代理是否支持 WebSocket
- SSL 证书是否有效
- 防火墙是否允许 WebSocket 端口
- 服务器是否允许并发连接
调试方法:
pb.realtime.on("connect", () => console.log("Connected"));pb.realtime.on("disconnect", () => console.log("Disconnected"));pb.realtime.on("error", (err) => console.error("Error:", err));Q: 如何实现”在线用户”功能?
Section titled “Q: 如何实现”在线用户”功能?”// 用户上线时创建 presence 记录async function setOnline(userId) { await pb.collection("presence").create({ userId: userId, lastSeen: new Date().toISOString(), });}
// 定期更新setInterval(() => setOnline(currentUserId), 30000);
// 订阅在线状态变化pb.collection("presence").subscribe("*", (e) => { if (e.action === "create" || e.action === "update") { updateUserOnlineStatus(e.record.userId, true); }});Q: 如何处理实时通知?
Section titled “Q: 如何处理实时通知?”// 订阅当前用户的通知pb.collection("notifications").subscribe( `userId='${currentUserId}' && read=false`, (e) => { if (e.action === "create") { showNotification(e.record); playNotificationSound(); } },);Q: 如何实现协作编辑?
Section titled “Q: 如何实现协作编辑?”// 使用操作转换或 CRDT 算法pb.collection("documents").subscribe(docId, (e) => { if (e.action === "update") { const remoteOps = e.record.pendingOps || []; remoteOps.forEach((op) => applyOperation(op)); }});
async function sendLocalOp(op) { await pb.collection("documents").update(docId, { pendingOps: [op], });}- 验证订阅权限:确保集合规则正确限制用户只能订阅有权限访问的数据
- 敏感数据过滤:使用
fields参数限制订阅事件返回的字段 - 限流保护:对订阅频率进行限制,防止滥用
- 监控连接数:监控活跃 WebSocket 连接,防止资源耗尽