import { EventEmitter } from "events"; import { Emitter } from "@coder/events"; import { logger, field } from "@coder/logger"; import { Ping, NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, ClientMessage, WorkingInitMessage, EvalEventMessage } from "../proto"; import { ReadWriteConnection, InitData, OperatingSystem, SharedProcessData } from "../common/connection"; import { ActiveEvalHelper, EvalHelper, Disposer, ServerActiveEvalHelper } from "../common/helpers"; import { stringify, parse } from "../common/util"; /** * Client accepts an arbitrary connection intended to communicate with the Server. */ export class Client { private evalId = 0; private readonly evalDoneEmitter = new Emitter(); private readonly evalFailedEmitter = new Emitter(); private readonly evalEventEmitter = new Emitter(); private _initData: InitData | undefined; private readonly initDataEmitter = new Emitter(); private readonly initDataPromise: Promise; private readonly sharedProcessActiveEmitter = new Emitter(); public readonly onSharedProcessActive = this.sharedProcessActiveEmitter.event; // The socket timeout is 60s, so we need to send a ping periodically to // prevent it from closing. private pingTimeout: NodeJS.Timer | number | undefined; private readonly pingTimeoutDelay = 30000; /** * @param connection Established connection to the server */ public constructor( private readonly connection: ReadWriteConnection, ) { connection.onMessage((data) => { let message: ServerMessage | undefined; try { message = ServerMessage.deserializeBinary(data); this.handleMessage(message); } catch (error) { logger.error( "Failed to handle server message", field("id", message && message.hasEvalEvent() ? message.getEvalEvent()!.getId() : undefined), field("length", data.byteLength), field("error", error.message), ); } }); connection.onClose(() => { clearTimeout(this.pingTimeout as any); // tslint:disable-line no-any this.pingTimeout = undefined; }); this.initDataPromise = new Promise((resolve): void => { this.initDataEmitter.event(resolve); }); this.startPinging(); } public dispose(): void { this.connection.close(); } public get initData(): Promise { return this.initDataPromise; } public run(func: (helper: ServerActiveEvalHelper) => Disposer): ActiveEvalHelper; public run(func: (helper: ServerActiveEvalHelper, a1: T1) => Disposer, a1: T1): ActiveEvalHelper; public run(func: (helper: ServerActiveEvalHelper, a1: T1, a2: T2) => Disposer, a1: T1, a2: T2): ActiveEvalHelper; public run(func: (helper: ServerActiveEvalHelper, a1: T1, a2: T2, a3: T3) => Disposer, a1: T1, a2: T2, a3: T3): ActiveEvalHelper; public run(func: (helper: ServerActiveEvalHelper, a1: T1, a2: T2, a3: T3, a4: T4) => Disposer, a1: T1, a2: T2, a3: T3, a4: T4): ActiveEvalHelper; public run(func: (helper: ServerActiveEvalHelper, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5) => Disposer, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5): ActiveEvalHelper; public run(func: (helper: ServerActiveEvalHelper, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5, a6: T6) => Disposer, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5, a6: T6): ActiveEvalHelper; /** * Run a function on the server and provide an event emitter which allows * listening and emitting to the emitter provided to that function. The * function should return a disposer for cleaning up when the client * disconnects and for notifying when disposal has happened outside manual * activation. */ public run(func: (helper: ServerActiveEvalHelper, a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6) => Disposer, a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6): ActiveEvalHelper { const doEval = this.doEvaluate(func, a1, a2, a3, a4, a5, a6, true); // This takes server events and emits them to the client's emitter. const eventEmitter = new EventEmitter(); const d1 = this.evalEventEmitter.event((msg) => { if (msg.getId() === doEval.id) { eventEmitter.emit(msg.getEvent(), ...msg.getArgsList().map(parse)); } }); doEval.completed.then(() => { d1.dispose(); }).catch((ex) => { d1.dispose(); // This error event is only received by the client. eventEmitter.emit("error", ex); }); return new ActiveEvalHelper({ // This takes client events and emits them to the server's emitter and // listens to events received from the server (via the event hook above). // tslint:disable no-any on: (event: string, cb: (...args: any[]) => void): EventEmitter => eventEmitter.on(event, cb), emit: (event: string, ...args: any[]): void => { const eventsMsg = new EvalEventMessage(); eventsMsg.setId(doEval.id); eventsMsg.setEvent(event); eventsMsg.setArgsList(args.map((a) => stringify(a))); const clientMsg = new ClientMessage(); clientMsg.setEvalEvent(eventsMsg); this.connection.send(clientMsg.serializeBinary()); }, removeAllListeners: (event: string): EventEmitter => eventEmitter.removeAllListeners(event), // tslint:enable no-any }); } public evaluate(func: (helper: EvalHelper) => R | Promise): Promise; public evaluate(func: (helper: EvalHelper, a1: T1) => R | Promise, a1: T1): Promise; public evaluate(func: (helper: EvalHelper, a1: T1, a2: T2) => R | Promise, a1: T1, a2: T2): Promise; public evaluate(func: (helper: EvalHelper, a1: T1, a2: T2, a3: T3) => R | Promise, a1: T1, a2: T2, a3: T3): Promise; public evaluate(func: (helper: EvalHelper, a1: T1, a2: T2, a3: T3, a4: T4) => R | Promise, a1: T1, a2: T2, a3: T3, a4: T4): Promise; public evaluate(func: (helper: EvalHelper, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5) => R | Promise, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5): Promise; public evaluate(func: (helper: EvalHelper, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5, a6: T6) => R | Promise, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5, a6: T6): Promise; /** * Evaluates a function on the server. * To pass variables, ensure they are serializable and passed through the included function. * @example * const returned = await this.client.evaluate((helper, value) => { * return value; * }, "hi"); * console.log(returned); * // output: "hi" * @param func Function to evaluate * @returns Promise rejected or resolved from the evaluated function */ public evaluate(func: (helper: EvalHelper, a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6) => R | Promise, a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6): Promise { return this.doEvaluate(func, a1, a2, a3, a4, a5, a6, false).completed; } // tslint:disable-next-line no-any private doEvaluate(func: (...args: any[]) => void | Promise | R | Promise, a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6, active: boolean = false): { readonly completed: Promise; readonly id: number; } { const newEval = new NewEvalMessage(); const id = this.evalId++; newEval.setId(id); newEval.setActive(active); newEval.setArgsList([a1, a2, a3, a4, a5, a6].map((a) => stringify(a))); newEval.setFunction(func.toString()); const clientMsg = new ClientMessage(); clientMsg.setNewEval(newEval); this.connection.send(clientMsg.serializeBinary()); const completed = new Promise((resolve, reject): void => { const dispose = (): void => { d1.dispose(); d2.dispose(); }; const d1 = this.evalDoneEmitter.event((doneMsg) => { if (doneMsg.getId() === id) { dispose(); resolve(parse(doneMsg.getResponse())); } }); const d2 = this.evalFailedEmitter.event((failedMsg) => { if (failedMsg.getId() === id) { dispose(); reject(parse(failedMsg.getResponse())); } }); }); return { completed, id }; } /** * Handles a message from the server. All incoming server messages should be * routed through here. */ private handleMessage(message: ServerMessage): void { if (message.hasInit()) { const init = message.getInit()!; let opSys: OperatingSystem; switch (init.getOperatingSystem()) { case WorkingInitMessage.OperatingSystem.WINDOWS: opSys = OperatingSystem.Windows; break; case WorkingInitMessage.OperatingSystem.LINUX: opSys = OperatingSystem.Linux; break; case WorkingInitMessage.OperatingSystem.MAC: opSys = OperatingSystem.Mac; break; default: throw new Error(`unsupported operating system ${init.getOperatingSystem()}`); } this._initData = { dataDirectory: init.getDataDirectory(), homeDirectory: init.getHomeDirectory(), tmpDirectory: init.getTmpDirectory(), workingDirectory: init.getWorkingDirectory(), os: opSys, shell: init.getShell(), builtInExtensionsDirectory: init.getBuiltinExtensionsDir(), }; this.initDataEmitter.emit(this._initData); } else if (message.hasEvalDone()) { this.evalDoneEmitter.emit(message.getEvalDone()!); } else if (message.hasEvalFailed()) { this.evalFailedEmitter.emit(message.getEvalFailed()!); } else if (message.hasEvalEvent()) { this.evalEventEmitter.emit(message.getEvalEvent()!); } else if (message.hasSharedProcessActive()) { const sharedProcessActiveMessage = message.getSharedProcessActive()!; this.sharedProcessActiveEmitter.emit({ socketPath: sharedProcessActiveMessage.getSocketPath(), logPath: sharedProcessActiveMessage.getLogPath(), }); } else if (message.hasPong()) { // Nothing to do since we run the pings on a timer, in case either message // is dropped which would break the ping cycle. } else { throw new Error("unknown message type"); } } private startPinging = (): void => { if (typeof this.pingTimeout !== "undefined") { return; } const schedulePing = (): void => { this.pingTimeout = setTimeout(() => { const clientMsg = new ClientMessage(); clientMsg.setPing(new Ping()); this.connection.send(clientMsg.serializeBinary()); schedulePing(); }, this.pingTimeoutDelay); }; schedulePing(); } }