为了实现远端设备可访问,这里选用了 WebRTC 作为 P2P 数据传输方案。避免繁琐的内网穿透处理。Peer使用方式可以直接跳转
什么是 NAT
NAT(Network Address Translation,网络地址转换)是一种在计算机网络中广泛使用的技术,主要用于将私有网络中的IP地址转换为公共网络中的IP地址。NAT的主要目的是解决IPv4地址短缺的问题,并提供一定程度的网络安全。
NAT在一定程度上隐藏了内部网络的IP地址,使得外部网络无法直接访问内部网络中的设备。相当于寄快递只填写到了合租的地址(公共地址),里面具体房间的地址是私有的,别人没法直接填写(访问)。
所以我们需要一些方法(内网穿透),让外部网络能够访问到内部网络中的设备。
什么是 STUN/TURN、ICE
点对点(P2P)通信的关键协议和机制。它们主要用于解决 NAT(网络地址转换)和防火墙等网络环境下的通信问题。
STUN(Session Traversal Utilities for NAT):
用于帮助客户端发现自己的公共 IP 地址和端口,以及了解 NAT 设备的类型。
客户端向 STUN 服务器发送请求。STUN 服务器返回客户端的公共 IP 地址和端口。客户端使用这些信息来建立与其他客户端的直接连接。
只能处理简单的 NAT 类型,复杂、对称NAT 类型可能无法穿透。
TURN(Traversal Using Relays around NAT):
用于在无法直接建立 P2P 连接的情况下,通过中继服务器转发数据。
数据都通过 TURN 服务器转发,从而绕过 NAT 和防火墙的限制。
相应的,数据经过服务器,会增加一定的延迟和带宽消耗。
ICE(Interactive Connectivity Establishment,交互式连接建立):
是一种框架,用于在复杂的网络环境中建立 P2P 连接。它结合了 STUN 和 TURN 的功能,并尝试使用多种候选地址(包括本地地址、STUN 提供的公共地址、TURN 中继地址)来建立最优的连接。
什么是 WebRTC
简单了解了 NAT、STUN/TURN、ICE 这些内容,可以大致了解 WebRTC 的工作原理。不管是想要以什么方式传输(UDP, TCP)、传输什么数据(MediaStream, RTCDataChannel),只要能建立连接,其他的都有很多的方式。
这里就不过多介绍了,更多可以去 Mdn Web docs 查阅。概要说连接流程就是,我把本地对应的端口(不管以什么方式,正常以 WS 的方式连接信令服务,交互双方信息)告诉给对方,对方收到之后,和我沟通建立连接。
Peer 使用
peerJs 默认连接 peer serve(免费的信令服务)
// client A
<script type="module">
import {Peer} from "https://esm.sh/peerjs@1.5.4?bundle-deps"
var peer = new Peer();
peer.on('open', function(id) {
console.log('My peer ID is: ' + id);
});
peer.on('connection', function(conn) {
// 接收到别的连接邀请
conn.on('data', function(data) {
console.log('Received', data);
// 发送数据
conn.send('hello');
});
});
</script>
// client B
<script type="module">
import {Peer} from "https://esm.sh/peerjs@1.5.4?bundle-deps"
var peer = new Peer();
// 与信令服务建立连接,可以通过 peer 属性获取连接状态。主动 disconnect 断开(建议退出时候销毁和断连)
peer.on('open', function(id) {
console.log('My peer ID is: ' + id);
// 如果需要主动发起邀请,需要在这个阶段之后再发起
const conn = peer.connect('peer A id');
conn.on('open', function() {
// 发送数据
conn.send('hello im B client');
});
conn.on('data', function(data) {
// 接收到数据
console.log('Received', data);
});
});
peer.on('connection', function(conn) {
// ...
});
</script>
再封装
基本使用也没太多的内容了,可以封装一下数据传输与初始化上,减少重复连接和管理 Connection,peer Connection 使用 Map 和 Object管理,但是属性废除了,获取不稳定(如有新的内容望大佬告知下),所以需要自己管理。
这里给出监听和请求两侧的封装。希望对大家有帮助。
import Peer, { type DataConnection } from "peerjs";
import {
ActionType,
type WebRTCContextType,
type ConnectionFunctionType,
} from './type';
import {
getRequest,
setRequest,
deleteRequest,
generateWebRTCContext,
} from './requestManager';
import {
on,
emit,
off,
} from './listeners';
const peerConfig = {
// debug: 3,
config: {
iceServers: [
{ urls: 'stun:stun.l.google.com:19302' }
]
}
}
type InitPeerConfigType = {
deviceId: string;
onPeerOpen?: (p: Peer) => void;
onPeerClosed?: (p: Peer) => void;
onPeerError?: (p: Peer, err: Error) => void;
onPeerReceivedConn?: (conn: DataConnection) => void;
}
type InitUniConnConfigType = {
connDeviceId: string;
onConnOpen?: (conn: DataConnection) => void;
onConnClose?: (conn: DataConnection) => void;
onConnError?: (conn: DataConnection, err: Error) => void;
onConnData?: (conn: DataConnection, d: unknown) => void;
}
export class PeerInstance {
// 静态属性,用于存储单例实例
private static instance: PeerInstance;
private peer: Peer | undefined;
private uniConn: DataConnection | undefined; // 为使用侧保留,连接到存储侧的conn
private uniConnInitBackup?: () => void;
private connectionsMap = new Map<string, DataConnection>();
private chunkMap = new Map<string, Uint8Array[]>();
constructor() {
console.warn('Dont new instance, use PeerInstance.getInstance()')
}
public static getInstance(): PeerInstance {
if (!PeerInstance.instance) {
PeerInstance.instance = new PeerInstance();
}
return PeerInstance.instance;
}
public init(config: InitPeerConfigType) {
if (this.peer) {
console.warn('peer already init');
return;
}
this.peer = new Peer(config.deviceId, peerConfig);
// @ts-ignore
window.peer = this.peer;
this.handlePeerListeners(config);
}
public destroy() {
if (this.peer) {
if (this.connections.length > 0) {
// todo: 改 Promise,全部处理了再destroy
this.connections.forEach((conn) => {
conn.close();
conn.once('close', () => {
console.log('connection close');
})
})
}
this.peer.disconnect();
this.peer.once('disconnected', () => {
console.log('peer disconnected');
this.peer?.destroy();
this.peer = undefined;
})
}
}
private handlePeerListeners(config: InitPeerConfigType) {
const p = this.peer;
if (p !== undefined) {
p.on('open', () => {
config.onPeerOpen && config.onPeerOpen(p);
if (this.uniConnInitBackup) {
this.uniConnInitBackup();
}
})
p.on('close', () => {
config.onPeerClosed && config.onPeerClosed(p);
})
p.on('error', (err) => {
config.onPeerError && config.onPeerError(p, err);
})
// 存储侧接受到的连接邀请
p.on('connection', (conn) => {
conn.on('open', () => {
config.onPeerReceivedConn && config.onPeerReceivedConn(conn);
this.connectionsMap.set(conn.peer, conn); // 不用删除
})
conn.on('data', (d: unknown) => {
console.log('存储端 route 接收到的数据:', d)
// 这里不用处理响应数据吧,只需要当成router来处理就好了
// this.handleConnResponse(d)
this.handleReceived(conn, d)
})
})
}
}
// ---- 存储侧注册的类似router事件,有handleReceived处理 ----
public register(action: ActionType, fn: ConnectionFunctionType) {
on(action, fn);
}
public unregister(action: ActionType) {
off(action);
}
private async handleReceived(conn: DataConnection, d: unknown) {
try {
const data = d as WebRTCContextType;
await emit(data.action, data, conn);
conn.send(data);
} catch (error) {
console.warn(error);
}
}
// ----
// 本侧发送数据 - 与接收返回数据的处理
public connect(config: InitUniConnConfigType) {
if (this.peer) {
if (this.uniConn) {
console.warn('uniConn already init');
return;
}
if (this.peer.open) {
this.handleUniConnListeners(config);
return;
}
this.uniConnInitBackup = () => {
this.handleUniConnListeners(config);
}
}
}
public handleUniConnListeners(config: InitUniConnConfigType) {
const p = this.peer;
if (!p) return;
if (this.uniConn) return;
const uc = p.connect(config.connDeviceId)
this.uniConn = uc;
this.uniConnInitBackup = undefined;
uc.on('open', () => {
config.onConnOpen && config.onConnOpen(uc);
})
uc.on('close', () => {
config.onConnClose && config.onConnClose(uc);
})
uc.on('error', (err) => {
config.onConnError && config.onConnError(uc, err);
})
uc.on('data', (d) => {
config.onConnData && config.onConnData(uc, d);
// 处理返回数据
console.log('使用侧 接收到 存储侧 的数据:', d)
this.handleConnResponse(d);
})
// @ts-ignore
window.uniConn = uc;
}
public request<T>(action: ActionType, body: unknown, config: {
target?: DataConnection;
request?: {}
}): Promise<T> {
return new Promise((resolve, reject) => {
const context = generateWebRTCContext(action, body);
context.request = {
...context.request,
...config.request,
};
setRequest(context.request.id, {
resolve,
reject,
context,
})
const conn = config.target || this.getUniConn();
if (conn) {
conn.send(context);
} else {
reject('conn not init');
}
})
}
private handleConnResponse(d: unknown) {
try {
const ctx = d as WebRTCContextType;
// todo: reject handle?
if (ctx.response.headers?.contentType === 'application/octet-stream') {
const body = ctx.response.body.data as {
type: 'start' | 'chunk' | 'end';
chunk: Uint8Array;
};
if (body.type === 'start') {
this.chunkMap.set(ctx.request.id, [body.chunk]);
return;
}
if (body.type === 'chunk') {
const chunks = this.chunkMap.get(ctx.request.id);
chunks?.push(body.chunk);
return;
}
if (body.type === 'end') {
const chunks = this.chunkMap.get(ctx.request.id);
// chunks 转成reader
const reader = new ReadableStream({
start(controller) {
chunks?.forEach((chunk) => {
controller.enqueue(chunk);
})
controller.close();
},
}).getReader();
ctx.response.body = {
...ctx.response.body,
data: reader
};
this.chunkMap.delete(ctx.request.id);
}
}
const target = getRequest(ctx.request.id);
if (target) {
target.resolve(ctx);
deleteRequest(ctx.request.id);
}
} catch (error) {
console.warn(error);
}
}
// ---- peer ----
public getPeer() {
return this.peer;
}
// ---- connections ----
public getConnectionsMap() {
return this.connectionsMap;
}
public getUniConn() {
return this.uniConn;
}
}
// requestManager.ts
// 被peer模块依赖
import { v4 as uuidv4 } from "uuid";
import { type ActionType, type WebRTCContextType, type RequestManagerType } from './type'
const requestMap = new Map();
export const setRequest = (id: string, ctx: RequestManagerType) => {
requestMap.set(id, ctx);
}
export const getRequest = (id: string) => {
return requestMap.get(id);
}
export const deleteRequest = (id: string) => {
requestMap.delete(id);
}
export function generateWebRTCContext(action: ActionType, body: unknown): WebRTCContextType {
return {
action,
request: {
id: uuidv4(),
body,
},
response: {
body: {
code: 200,
data: undefined,
message: ''
},
}
}
}
// listeners.ts
import type { DataConnection } from 'peerjs';
import {
type ActionType,
type WebRTCContextType,
type ConnectionFunctionType,
} from './type';
const functionMap = new Map<ActionType, ConnectionFunctionType>();
export function on(action: ActionType, fn: ConnectionFunctionType) {
// 使用set可以重复注册
functionMap.set(action, fn);
}
export async function emit(action: ActionType, ctx: WebRTCContextType, conn: DataConnection) {
const fn = functionMap.get(action);
if (fn) {
await fn(ctx, conn);
}
}
export function off(action: ActionType) {
functionMap.delete(action);
}