This commit is contained in:
Rizky 2023-10-23 15:46:39 +07:00
parent 030abd91b0
commit 2f9000033e
25 changed files with 258 additions and 159 deletions

View File

@ -19,7 +19,8 @@ export const SyncActionDefinition = {
},
"yjs": {
"sv_local": "12",
"diff_local": "13"
"diff_local": "13",
"sv_remote": "14"
}
};
export const SyncActionPaths = {
@ -36,5 +37,6 @@ export const SyncActionPaths = {
"10": "page.list",
"11": "page.load",
"12": "yjs.sv_local",
"13": "yjs.diff_local"
"13": "yjs.diff_local",
"14": "yjs.sv_remote"
};

View File

@ -43,5 +43,11 @@ export const SyncActions = {
id: string,
bin: Uint8Array
) => {},
sv_remote: async (
mode: "page" | "comp",
id: string,
sv: Uint8Array,
diff: Uint8Array
) => ({}) as { diff: Uint8Array } | void,
},
};

View File

@ -2,9 +2,9 @@ import { syncronize } from "y-pojo";
import { docs } from "../entity/docs";
import { snapshot } from "../entity/snapshot";
import { gzipAsync } from "../entity/zlib";
import { ActionCtx } from "../type";
import { SyncConnection } from "../type";
export const comp_load = async function (this: ActionCtx, id: string) {
export const comp_load = async function (this: SyncConnection, id: string) {
let snap = snapshot.get("comp", id);
let ydoc = docs.comp[id];

View File

@ -1,3 +1,3 @@
import { ActionCtx } from "../type";
import { SyncConnection } from "../type";
export const comp_redo = async function (this: ActionCtx, id: string) {};
export const comp_redo = async function (this: SyncConnection, id: string) {};

View File

@ -1,3 +1,3 @@
import { ActionCtx } from "../type";
import { SyncConnection } from "../type";
export const comp_undo = async function (this: ActionCtx, id: string) {};
export const comp_undo = async function (this: SyncConnection, id: string) {};

View File

@ -8,3 +8,4 @@ export * from "./comp_undo";
export * from "./comp_redo";
export * from "./yjs_sv_local";
export * from "./yjs_diff_local";
export * from "./yjs_sv_remote";

View File

@ -2,16 +2,49 @@ import { syncronize } from "y-pojo";
import { SAction } from "../actions";
import { Y, docs } from "../entity/docs";
import { snapshot } from "../entity/snapshot";
import { user } from "../entity/user";
import { gzipAsync } from "../entity/zlib";
import { ActionCtx } from "../type";
import { SyncConnection, SyncType } from "../type";
import { conns } from "../entity/conn";
import { sendWS } from "../sync-handler";
export const page_load: SAction["page"]["load"] = async function (
this: ActionCtx,
this: SyncConnection,
id: string
) {
let snap = snapshot.get("page", id);
let ydoc = docs.page[id];
if (this.conf) this.conf.page_id = id;
const createUndoManager = (root: Y.Map<any>) => {
const um = new Y.UndoManager(root, { ignoreRemoteMapChanges: false });
return um;
};
const attachOnUpdate = (doc: Y.Doc, um: Y.UndoManager) => {
doc.on("update", async (update: Uint8Array, origin: any) => {
if (origin === um) {
} else {
const sv_local = await gzipAsync(update);
user.active.findAll({ page_id: id }).map((e) => {
if (e.client_id === origin) return;
const ws = conns.get(e.client_id)?.ws;
if (ws)
sendWS(ws, {
type: SyncType.Event,
event: "remote_svlocal",
data: { type: "page", sv_local, id },
});
});
}
});
};
const defaultActive = {
select: "" as "" | "comp" | "item" | "section" | "text",
};
if (!snap && !ydoc) {
const page = await db.page.findFirst({ where: { id } });
if (page) {
@ -19,7 +52,7 @@ export const page_load: SAction["page"]["load"] = async function (
let root = doc.getMap("map");
syncronize(root, { id, root: page.content_tree });
const um = new Y.UndoManager(root, { ignoreRemoteMapChanges: true });
const um = createUndoManager(root);
docs.page[id] = {
doc: doc as any,
id,
@ -27,6 +60,8 @@ export const page_load: SAction["page"]["load"] = async function (
};
const bin = Y.encodeStateAsUpdate(doc);
attachOnUpdate(doc, um);
snapshot.update({
bin,
id,
@ -34,6 +69,15 @@ export const page_load: SAction["page"]["load"] = async function (
name: page.name,
ts: Date.now(),
url: page.url,
id_site: page.id_site,
});
user.active.add({
...defaultActive,
client_id: this.client_id,
user_id: this.user_id,
site_id: page.id_site,
page_id: page.id,
});
return {
@ -48,13 +92,23 @@ export const page_load: SAction["page"]["load"] = async function (
Y.applyUpdate(doc, snap.bin);
let root = doc.getMap("map");
const um = new Y.UndoManager(root, { ignoreRemoteMapChanges: true });
const um = createUndoManager(root);
attachOnUpdate(doc, um);
docs.page[id] = {
doc: doc as any,
id,
um,
};
user.active.add({
...defaultActive,
client_id: this.client_id,
user_id: this.user_id,
site_id: snap.id_site,
page_id: snap.id,
});
return {
id: id,
url: snap.url,
@ -62,6 +116,14 @@ export const page_load: SAction["page"]["load"] = async function (
snapshot: await gzipAsync(snap.bin),
};
} else if (snap && ydoc) {
user.active.add({
...defaultActive,
client_id: this.client_id,
user_id: this.user_id,
site_id: snap.id_site,
page_id: snap.id,
});
return {
id: snap.id,
url: snap.url,

View File

@ -1,13 +1,14 @@
import { docs } from "../entity/docs";
import { ActionCtx } from "../type";
import { SyncConnection } from "../type";
export const page_redo = async function (this: ActionCtx, id: string) {
export const page_redo = async function (this: SyncConnection, id: string) {
if (!docs.page[id]) {
return;
}
const um = docs.page[id].um;
if (um.canRedo()) {
console.log("redoing");
um.redo();
}
};

View File

@ -1,13 +1,15 @@
import { docs } from "../entity/docs";
import { ActionCtx } from "../type";
import { SyncConnection } from "../type";
export const page_undo = async function (this: ActionCtx, id: string) {
export const page_undo = async function (this: SyncConnection, id: string) {
if (!docs.page[id]) {
return;
}
const um = docs.page[id].um;
if (um.canUndo()) {
console.log("undoing");
um.undo();
}
};

View File

@ -1,6 +1,6 @@
import { ActionCtx } from "../type";
import { SyncConnection } from "../type";
export const site_group = async function (this: ActionCtx) {
export const site_group = async function (this: SyncConnection) {
console.log(this.user);
return "gruop";
};

View File

@ -1,15 +1,17 @@
import { validate } from "uuid";
import { ActionCtx } from "../type";
import { ESite } from "../../../../web/src/render/ed/logic/ed-global";
import { SAction } from "../actions";
import { SyncConnection } from "../type";
export const site_load: SAction["site"]["load"] = async function (
this: ActionCtx,
this: SyncConnection,
id: string
) {
if (validate(id)) {
const site = await db.site.findFirst({ where: { id } });
if (site) {
if (this.conf) this.conf.site_id = site.id;
return {
id: site.id,
config: site.config as ESite["config"],

View File

@ -2,10 +2,10 @@ import { SAction } from "../actions";
import { Y, docs } from "../entity/docs";
import { snapshot } from "../entity/snapshot";
import { gunzipAsync } from "../entity/zlib";
import { ActionCtx } from "../type";
import { SyncConnection } from "../type";
export const yjs_diff_local: SAction["yjs"]["diff_local"] = async function (
this: ActionCtx,
this: SyncConnection,
mode,
id,
bin
@ -15,7 +15,10 @@ export const yjs_diff_local: SAction["yjs"]["diff_local"] = async function (
}
const doc = docs[mode][id].doc as Y.Doc;
const diff = await gunzipAsync(bin);
Y.applyUpdate(doc, diff);
const um = docs[mode][id].um;
um.addTrackedOrigin(this.client_id);
Y.applyUpdate(doc, diff, this.client_id);
const save = Y.encodeStateAsUpdate(doc);
snapshot.set(mode, id, "bin", save);

View File

@ -1,10 +1,10 @@
import { SAction } from "../actions";
import { Y, docs } from "../entity/docs";
import { gunzipAsync, gzipAsync } from "../entity/zlib";
import { ActionCtx } from "../type";
import { SyncConnection } from "../type";
export const yjs_sv_local: SAction["yjs"]["sv_local"] = async function (
this: ActionCtx,
this: SyncConnection,
mode,
id,
bin

View File

@ -0,0 +1,20 @@
import { SAction } from "../actions";
import { Y, docs } from "../entity/docs";
import { gunzipAsync, gzipAsync } from "../entity/zlib";
import { SyncConnection } from "../type";
export const yjs_sv_remote: SAction["yjs"]["sv_remote"] = async function (
this: SyncConnection,
mode,
id,
sv,
diff
) {
if (!docs[mode][id]) {
return;
}
const doc = docs[mode][id].doc;
const diff_local = Y.encodeStateAsUpdate(doc as any, await gunzipAsync(sv));
Y.applyUpdate(doc as any, await gunzipAsync(diff), "local");
return { diff: await gzipAsync(diff_local) };
};

View File

@ -0,0 +1,6 @@
import { ServerWebSocket } from "bun";
import { SyncConnection } from "../type";
import { WSData } from "../../../../../pkgs/core/server/create";
export const conns = new Map<string, SyncConnection>();
export const wconns = new WeakMap<ServerWebSocket<WSData>, string>();

View File

@ -6,11 +6,14 @@ const emptySnapshot = {
type: "" as "" | "comp" | "page",
id: "",
bin: new Uint8Array(),
url: "",
name: "",
ts: Date.now(),
};
export type DocSnapshot = typeof emptySnapshot;
export type DocSnapshot = typeof emptySnapshot & {
type: "page";
url: string;
id_site: string;
};
export const snapshot = {
_db: null as null | RootDatabase<DocSnapshot>,
@ -32,7 +35,7 @@ export const snapshot = {
const id = `${data.type}-${data.id}`;
let res = this.db.get(id);
if (!res) {
await this.db.put(id, structuredClone(emptySnapshot));
await this.db.put(id, structuredClone(emptySnapshot as DocSnapshot));
res = this.db.get(id);
}
return res as DocSnapshot;

View File

@ -1,7 +1,7 @@
import { dir } from "dir";
import { RootDatabase, open } from "lmdb";
import { g } from "utils/global";
import { KeyMap } from "../../../../web/src/utils/sync/keymap";
import { IndexedMap } from "../../../../web/src/utils/sync/idx-map";
const defaultConf = {
site_id: "",
@ -10,11 +10,15 @@ const defaultConf = {
export type UserConf = typeof defaultConf;
export const user = {
active: KeyMap.create<{
user_id: string;
site_id: string;
page_id: string;
}>("user_id"),
active: IndexedMap.create<
{
user_id: string;
site_id: string;
page_id: string;
select: "" | "comp" | "item" | "section" | "text";
},
"client_id"
>("client_id"),
conf: {
_db: null as null | RootDatabase<UserConf>,
init() {

View File

@ -3,27 +3,15 @@ import { ServerWebSocket, WebSocketHandler } from "bun";
import { Packr } from "msgpackr";
import { WSData } from "../../../../pkgs/core/server/create";
import { ClientEvent } from "../../../web/src/utils/sync/ws-client";
import { loadDefaultSite } from "./editor/load";
import { ActionCtx, SyncType } from "./type";
import { SyncActionPaths } from "./actions-def";
import * as actions from "./actions/index";
import { loadDefaultSite } from "./editor/load";
import { UserConf, user } from "./entity/user";
import { SyncType } from "./type";
import { conns, wconns } from "./entity/conn";
const packr = new Packr({ structuredClone: true });
const conns = new Map<
string,
{
user_id: string;
conf?: UserConf & { toJSON: () => UserConf };
ws: ServerWebSocket<WSData>;
msg: {
pending: Record<string, Promise<any>>;
resolve: Record<string, (result: any) => void>;
};
}
>();
const wconns = new WeakMap<ServerWebSocket<WSData>, string>();
const send = (ws: ServerWebSocket<WSData>, msg: any) => {
export const sendWS = (ws: ServerWebSocket<WSData>, msg: any) => {
ws.sendBinary(packr.pack(msg));
};
export const syncHandler: WebSocketHandler<WSData> = {
@ -31,11 +19,12 @@ export const syncHandler: WebSocketHandler<WSData> = {
const client_id = createId();
conns.set(client_id, {
user_id: "",
client_id,
ws,
msg: { pending: {}, resolve: {} },
});
wconns.set(ws, client_id);
send(ws, { type: SyncType.ClientID, client_id });
sendWS(ws, { type: SyncType.ClientID, client_id });
},
close(ws, code, reason) {
const conn_id = wconns.get(ws);
@ -69,7 +58,7 @@ export const syncHandler: WebSocketHandler<WSData> = {
return true;
},
}) as UserConf & { toJSON: () => UserConf };
send(ws, {
sendWS(ws, {
type: SyncType.Event,
event: "editor_start" as ClientEvent,
data: conn.conf.toJSON(),
@ -84,9 +73,7 @@ export const syncHandler: WebSocketHandler<WSData> = {
console.log(`app/ws/edit/sync/${actionName}.ts not found`);
}
if (baseAction) {
const action = baseAction.bind({
user: { id: conn.user_id, conf: conn.conf },
} as ActionCtx);
const action = baseAction.bind(conn);
ws.sendBinary(
packr.pack({

View File

@ -1,4 +1,6 @@
import { ServerWebSocket } from "bun";
import { UserConf } from "./entity/user";
import { WSData } from "../../../../pkgs/core/server/create";
export enum SyncType {
ClientID,
@ -7,6 +9,14 @@ export enum SyncType {
Action,
ActionResult,
}
export type ActionCtx = {
user: { id: string; conf: UserConf & { toJSON: () => UserConf } };
export type SyncConnection = {
user_id: string;
client_id: string;
conf?: UserConf & { toJSON: () => UserConf };
ws: ServerWebSocket<WSData>;
msg: {
pending: Record<string, Promise<any>>;
resolve: Record<string, (result: any) => void>;
};
};

View File

@ -31,6 +31,8 @@ export const edRoute = async (p: PG) => {
const doc = new Y.Doc();
Y.applyUpdate(doc, decompress(page.snapshot));
doc.on("update", async (bin: Uint8Array, origin: any) => {
if (origin === "sv_remote") return;
const res = await p.sync.yjs.sv_local(
"page",
p.page.cur.id,

View File

@ -1,6 +1,9 @@
import { compress, decompress } from "wasm-gzip";
import { clientStartSync } from "../../../utils/sync/ws-client";
import { Loading } from "../../../utils/ui/loading";
import { PG } from "./ed-global";
import { Y } from "../../../../../srv/ws/sync/entity/docs";
import { treeRebuild } from "./tree/build";
export const edInitSync = (p: PG) => {
const session = JSON.parse(
@ -24,12 +27,12 @@ export const edInitSync = (p: PG) => {
if (!paramsOK) {
if (e.site_id && e.page_id) {
p.site.id = e.site_id;
p.page.id = e.page_id;
p.page.cur.id = e.page_id;
navigate(`/ed/${e.site_id}/${e.page_id}`);
}
} else {
p.site.id = params.site_id;
p.page.id = params.page_id;
p.page.cur.id = params.page_id;
p.render();
}
},
@ -37,6 +40,32 @@ export const edInitSync = (p: PG) => {
p.site = site;
p.render();
},
async remote_svlocal(data) {
if (p[data.type].cur.id === data.id) {
const doc = p[data.type].doc as Y.Doc;
if (doc) {
const diff_remote = Y.encodeStateAsUpdate(
doc,
decompress(data.sv_local)
);
const sv_remote = Y.encodeStateVector(doc);
const sv = Buffer.from(compress(sv_remote));
const diff = Buffer.from(compress(diff_remote));
const res = await p.sync.yjs.sv_remote(
data.type,
data.id,
sv,
diff
);
if (res) {
Y.applyUpdate(doc, decompress(res.diff), "sv_remote");
await treeRebuild(p, { note: "sv_remote" });
}
}
}
},
},
}).then((e) => (p.sync = e));

View File

@ -18,7 +18,11 @@ export const edUndoManager = async (p: PG) => {
(evt.ctrlKey || evt.metaKey) &&
!evt.shiftKey
) {
console.log("redo");
if (p.comp.cur.id) {
p.sync.comp.redo(p.comp.cur.id);
} else {
p.sync.page.redo(p.page.cur.id);
}
return;
}
@ -27,7 +31,11 @@ export const edUndoManager = async (p: PG) => {
(evt.ctrlKey || evt.metaKey) &&
evt.shiftKey
) {
console.log("redo");
if (p.comp.cur.id) {
p.sync.comp.redo(p.comp.cur.id);
} else {
p.sync.page.redo(p.page.cur.id);
}
return;
}

View File

@ -0,0 +1,34 @@
export const IndexedMap = {
create: <OBJ extends Record<string, any>, KEY extends string>(id: KEY) => {
const all = {} as Record<string, OBJ & Record<KEY, string>>;
return {
add(item: OBJ & Record<KEY, string>) {
const _id = item[id];
all[_id] = item as any;
return _id;
},
find(id: KEY) {
return all[id];
},
del(id: KEY) {
delete all[id];
},
findAll(where: Partial<OBJ>) {
return Object.values(all).filter((item) => {
for (const key in where) {
if (item[key] !== where[key]) {
return false;
}
}
return true;
});
},
delAll(where: Partial<OBJ>) {
for (const item of this.findAll(where)) {
delete all[item[id]];
}
},
};
},
};

View File

@ -1,86 +0,0 @@
export const KeyMap = {
create: <K extends Record<string, string>>(main: keyof K) => {
const childs = {} as Record<string, ReturnType<typeof createChild>>;
const all = {} as Record<string, Record<string, string>>;
const createChild = (key: string) => {
return {
remove(id: string) {
if (all[id]) {
map.delete(all[key][id]);
}
},
};
};
const map = new EventedMap({
onSet({ key, value }) {
for (const [k, v] of Object.entries(value)) {
if (!childs[k]) {
childs[k] = createChild(k);
all[k] = {};
}
all[k][v] = key;
}
},
onDelete(key) {
const val = map.get(key);
for (const [k, v] of Object.entries(val)) {
delete all[k][v as string];
}
return true;
},
});
return new Proxy(
{},
{
get(_, key: string, receiver) {
if (key === "get") {
return map.get;
}
if (key === "set") {
return (val: K) => {
const key = val[main];
map.set(key, val);
};
}
if (!childs[key]) {
childs[key] = createChild(key);
}
return childs[key];
},
}
) as { get: (key: typeof main) => K; set: (val: K) => void } & Record<
keyof K,
EventedMap
>;
},
};
type MapEvents = Partial<{
onSet: (arg: { key: string; value: Record<string, string> }) => void;
onDelete: (key: string) => boolean;
}>;
class EventedMap extends Map {
_events: MapEvents;
constructor(events: MapEvents) {
super();
this._events = events;
}
set(key: string, value: Record<string, any>): this {
super.set(key, value);
this._events.onSet?.({ key, value });
return this;
}
delete(key: string): boolean {
const value = this.get(key);
if (value) {
this._events.onDelete?.(key);
}
return super.delete(key);
}
}

View File

@ -8,6 +8,7 @@ import {
SyncActionDefinition,
SyncActionPaths,
} from "../../../../srv/ws/sync/actions-def";
import { UserConf } from "../../../../srv/ws/sync/entity/user";
import { SyncType } from "../../../../srv/ws/sync/type";
import { ESite } from "../../render/ed/logic/ed-global";
import { w } from "../types/general";
@ -48,12 +49,13 @@ const sendWs = (ws: WebSocket, msg: any) => {
export const clientStartSync = async (arg: {
user_id: string;
events: {
editor_start: (arg: {
user_id: string;
site_id?: string;
page_id?: string;
}) => void;
editor_start: (arg: UserConf) => void;
site_loaded: (arg: { site: ESite }) => void;
remote_svlocal: (arg: {
type: "page" | "comp";
id: string;
sv_local: Uint8Array;
}) => void;
};
}) => {
const { user_id, events } = arg;
@ -115,6 +117,7 @@ const connect = (user_id: string, event: ClientEventObject) => {
conf.ws = ws;
};
ws.onclose = async () => {
console.log("disconnected..");
w.offline = true;
if (!conf.ws) {
await connect(user_id, event);