banner
Shemol

Shemol

我好想伸出手,拥抱这个世界
pleroma
x
telegram
github

KubeEdge云边通信框架

参考资料#

Beehive#

Beehive 是 KubeEdge 中的核心消息通讯框架,用于不同模块的注册和模块之间的通信,KubeEdge 中的 CloudCore 和 EdgeCore 组件都依赖于 beehive 框架。因此,我们需要首先了解 Beehive 的工作机制,才能更进一步理解 KubeEdge 的设计理念和工作原理。
Beehive 是基于 goland channel 实现的消息通信框架,核心能力包含两部分:module 注册管理和 module 间通信管理,分别对应 beehive 中定义的接口 ModuleContext 和 MessageContext,其架构图如下所示:
Pasted image 20240312203517

message 通信格式#

在分析 beehive 的具体功能之前,我们先看一下通信的消息格式,message 是 beehive 中不同 module 之间通信的信息载体,message 包含三部分内容,如下所示:

  • Header:
    • ID:消息 ID,UUID 字符串
    • ParentID:如果是对同步消息的响应,则说明 parentID 存在 (只会在同步消息的响应中存在)
    • TimeStamp:生成消息的时间 (时间戳
    • Sync:消息是否为同步类型消息的标志,为true则说明是同步消息
  • Route:
    • Source:消息的来源
    • Group:消息所属的 group
    • Operation:资源的操作
    • Resource:操作的资源
  • Content:消息的内容

context 数据结构#

ModuleContext 和 MessageContext 定义的接口均由 Context 来实现,其数据结构如下所示:

//Context is object for Context channel
type Context struct{
	//ConfigFactory goarchaius.ConfigurationFactory
	channels map[string]chan model.Message
	chsLock  sync.RwMutex
	typeChannels map[string]map[string]chan model.Message
	typeChsLock  sync.RWMutex
	anonChannels map[string]chan model.Message
	anonChsLock  sync.RWMutex
}

完整代码链接:https://github.com/kubeedge/kubeedge/blob/master/staging/src/github.com/kubeedge/beehive/pkg/core/channel/context_channel.go

  • channels - channels 是模块的名称和对应的消息 channel 映射,用于将消息发送到相应的模块
  • chsLock - channels map 的锁
  • typeChannels - typeChannels 是一个两级 map,第一级 key 是 group 名字,第二级 key 是 module 名字,value 是 module 对应的消息 channel。
  • typeChsLock - typeChannels map 的锁
  • annoChannels - annoChannels 是消息 parentID 到 channel 的映射,将用于发送同步消息的响应。
  • annoChsLock - annoChannels map 的锁

beehive module 管理#

在 beehive 中,module 定义是一个接口,只要实现了此接口,便可以称为 module,在 KubeEdge 中常见的模块,如 cloudhub、edgehub、edgeController 等,都已经实现了这个接口。

//Module Interface 
type Moudule interface{
	Name() string
	Group() string
	Start() 
	Enable() bool
}

完整代码:https://github.com/kubeedge/kubeedge/blob/master/staging/src/github.com/kubeedge/beehive/pkg/core/module.go

Beehive 支持的 module 操作如下:

	type ModuleContext interface {
	AddModule(info *common.ModuleInfo)
	AddModuleGroup(module.group string)
	Cleanup(module string)
}

完整代码:https://github.com/kubeedge/kubeedge/blob/master/staging/src/github.com/kubeedge/beehive/pkg/core/context/context.go

接口功能实现
AddModule添加 module首先创建一个 message 类型的 channel,然后保存到 Context 中的 channels map 里
AddModuleGroup添加 module 到所属 group首先会从 Context 中的 channels map 里查询对应的 channel,然后将对应的 group 以及 module 和 channel 保存到 typeChannels 里面
Cleanup清理 module将 module 信息从 channels 和 typeChannels 清除

beehive 消息通信管理#

beehive 中注册的模块之间,可以相互通信,beehive 支持多种通信方式,如下所示:

//MessageContext is interface for messaging syncing
type MessageContext interface {
	//async mode
	Send(module string, message model.Message)
	Receive(module string)(model.Message error)
	//sync mode
	SendSync(module string,message model.Message,timeout time.Duration)(model.Message error)
	SendResp(message model.Message)
	//group broadcast
	SendToGroup(group string,message model.Message)
	SendToGroupSync(group string,message model.Message,timeout time.Duration) error
}

https://github.com/kubeedge/kubeedge/blob/master/staging/src/github.com/kubeedge/beehive/pkg/core/context/context.go

接口功能实现
Send发送异步消息到指定 module从 Context 中的 channels map 里查询对应 module 的 channel,然后将消息放入到 channel 里面
Receive接收发送到指定 module 的消息从 Context 中的 Channels map 查询对应 module 的 channel,然后从 channel 里面取出消息,如果没有消息到达,则会阻塞,直至有消息到达
SendSync发送同步消息到指定 moduleSendSync 从 channels map 中获取模块的 channel,将消息放入 channel,然后创建一个新的 channel,并将其添加到 annoChannels 映射中,其中 key 是 messageID,然后在这个 channel 上等待接收消息(响应),直到超时,如果在超时之前收到响应消息,则返回响应消息,否则返回空的消息和超时错误
SendResp发送对同步消息的响应根据 message 中 parentID 在 annoChannels 查找对应的 channel,然后将消息放入 channel 中,如果不存在,则记录错误
SendToGroup发送异步消息到指定 group 下的所有 moduleSendToGroup 从 typeChannels map 中获取指定 group 下的所有 module,然后遍历 module,依次发送消息到 module
SendToGroupSync发送同步消息到指定 group 下的所有 moduleSendToGroup 从 typeChannels map 中获取指定 group 下的所有 module,创建了一个 size 和 module 数量一样的匿名通道,然后遍历 module,调用 send 发送消息,然后等待匿名消息通道收到消息的数量等于 size

beehive 模块注册启动#

在 cloudcore 或者 edgecore 启动的时候,会将所有的 module 注册到 beehive 内核,beehive 中维护了 module 名字到 module 的映射。

//registerModules register all the modules started in cloudcore
func registerModules(c *v1alpha1.CloudCoreConfig){
	cloudhub.Register(c.Modules.Cloudhub)
	edgecontroller.Register(c.Modules.EdgeController)
	devicecontroller.Register(c.Modules.DeviceController)
	nodeupgradejobcontroller.Register(c.Modules.NodeUpgradeJobController)
	synccontroller.Register(c.Modules.SyncController)
	cloudstream.Register(c.Modules.CloudStream,c.CommonConfig)
	router.Register(c.Modules.Router)
	dynamiccontroller.Register(c.Modules.DynamicController)
	policycontroller.Register(client.CrdConfig)
}

https://github.com/kubeedge/kubeedge/blob/master/cloud/cmd/cloudcore/app/server.go#L155-L166
在 beehive 启动的时候,会获取所有注册的 module,然后遍历所有的 module,依次执行如下操作:

  1. 根据 module 的类型,初始化 moduleInfo 信息
  2. 执行 beehiveContext.AddModule
  3. 执行 beehiveContext.AddModuleGroup
  4. 调用每个 module 的 start 方法启动 module
//StartModules starts modules that are registered 
func StartModules(){
	//only register channel mode,if want to use socket mode,we should also pass in common.MsgCtxTypeUSparameter
	beehiveContext.InitContext([]string{common.MsgCtxTypeChannel})

	modules := GetModules()

	for name, module := range modules{
		var m common.ModuleInfo
		switch module.contextType{
		case common.MsgCtxTypeChannel:
			m = common.ModuleInfo{
				ModuleName: name,
				ModuleType: module.contextType,
			}
		......

		default:
			klog.Exitf("unsupported context type: %s",module.contextType)
		}

		beehiveContext.AddModule(&m)
		beehiveContext.AddModuleGroup(name,module.module.Group())

		go moduleKeeper(name, module, m)
		klog.Infof("starting module %s",name)
	}
}

https://github.com/kubeedge/kubeedge/blob/master/staging/src/github.com/kubeedge/beehive/pkg/core/core.go

viaduct#

viaduct 是 KubeEdge 云边通信的中间件,基于统一的抽象接口,提供了不同协议的服务端和客户端实现,用于边缘节点和云端管理面的连接管理和数据传输管理。viaduct 屏蔽了不同网络协议之间的差异,使用统一的接口对上层提供服务,支持用户可以通过配置云边通信的网络协议灵活选择接入协议。目前内置了多种网络协议的实现。如 websocket 和 quic。后续根据不同的边缘接入场景和业务场景,通过 viaduct 可以快速对接新的网络协议,满足用户的需求
viaduct 中主要分为两部分:服务端和客户端接口以及不同协议的实现。服务端被云端组件 CloudCore 所使用,用来启动不同协议的 server,用来边缘节点的接入和数据传输,客户端被边缘组件 edgeCore 所使用,是用来发起接入的 client,用于连接云端 CloudCore 组件。其主要架构如下图所示:
Pasted image 20240312221514
接下来以 WebSocket 协议为例子,来介绍服务端和客户端接口以及实现。

Connection 接口定义#

Connection 是 viaduct 中核心的接口,viaduct 支持双向通信协议,云端和边缘节点可以双向消息传输,在边缘节点接入的时候,server 端和 client 端均需要初始化 Connection,并通过此 Connection 进行全双工通信。Connection 接口定义如下:

//the operation set of connection
type Connection interface{
	//process message from the connection
	ServeConn() //服务端从通道中持续的读取消息

	//SetReadDeadline sets the deadline for future Read calls
	//and any currently-blocked Read call.
	//A zero value for t means Read will not time out.
	SetReadDeadline(t time.Time) error

	//SetWriteDeadline sets the deadline for future write calls
	//and any currently-blocked write call.
	//Even if write times out.It may return n > 0, indicating that 
	//some of the data was successfully written.
	//A zero value for t means write will not time out.
	SetWriteDeadline(t time.Time) error

	//write write raw data to the connection 
	//it will open a stream for raw data 
	write(raw []byte)(int,error)

	//writeMessageAsync writes data to the connection and don't care about the response 
	WriteMessageAsync(msg *model.Message) error //异步

	//writeMessageSync writes data to the connection and care about the response
	WriteMessageSync(msg *model.Message)(*model.Message,error) //同步

	//ReadMessage reads message from the connection
	//it will be blocked when no message received 
	//if you want to use this api for message reading
	//make sure AutoRoute be false 
	ReadMessage(msg *model.Message ) error

	//RemoteAddr returns the remote network address
	RemoteAddr() net.Addr

	//LocalAddr returns the local network address
	LocalAddr() net.Addr

	//connectState return the current connection state
	ConnectionState() connectionState

	//Close closes the connection
	//Any blocked Read or Write operations will be unblocked and return errors
	Close() error
}

https://github.com/kubeedge/kubeedge/blob/master/staging/src/github.com/kubeedge/viaduct/pkg/conn/conn.go

其核心接口如下:

接口功能
ServeConn云端 server 使用,持续的从 connection 中读取数据并转换成 message,然后调用回调函数执行分发操作
Read从 connection 中读取原始 byte 数据
Write向 connection 中写入原始 byte 数据
WriteMessageAsync向 connection 中写入 message 数据,不需要对端响应
WriteMessageSync向 connection 中写入 message 数据,同时等待对端的响应消息
ReadMessage从 connection 中读取数据并转化为 message 格式

server 接口定义及 websocket 实现#

server 的接口定义比较简单

//protocol server 
type ProtocolServer interface {
	ListenAndServerTLS() error
	close() error
}

websocket 协议的实现也比较简单

func (srv *WSServer) ListenAndServeTLS() error{
		return srv.server.ListenAndServeTLS("","")
}
func (srv *WSServer)Close() error{
	if srv.server != nil{
		return srv.server.Close()
	}
	return nil
}

https://github.com/kubeedge/kubeedge/blob/master/staging/src/github.com/kubeedge/viaduct/pkg/server/ws.go
WSServer 的核心逻辑在于 ServeHTTP 方法里面,用于处理边缘节点的接入,流程如下所示:
Pasted image 20240312223542

client 接口定义及 websocket 实现#

client 接口定义也是比较简单,如下所示,Connect 方法用于连接云端的 server,并返回 Connection 对象,然后边缘侧使用此 connection 进行消息的读取和写入

//each protocol(websocket/quic) provides Connect
type Protocolclient interface{
	Connect() (conn.Connection,error)
}

websocket Client 通过 dial 发起对云端的连接,连接成功之后,如果对应的 Callback 不为空,则发起回调函数,然后初始化 connection 对象并返回

//Connect try to connect remote server
func(c *WSClient)Connect()(conn.Connection,error){
	header := c.exOpts.Header
	header.Add("ConnectionUse",string(c.options.ConnUse))
	wsConn,resp,err := c.dialer.Dial(c.options.Addr,header)
	if err ==nil{
		klog.Infof("dial %s successfully",c.options.Addr)

		//do user's processing on connection or response
		if c.exOpts.callback != nil{
			c.exOpts.Callback(wsConn,resp)
		}
		return conn.NewConnection(&conn.ConnectionOptions{
			ConnType: api.ProtocolTypeWS,
			ConnUse:  c.options.ConnUse,
			Base:     wsConn,
			Consumer: c.options.Consumer,
			Handler:  c.options.Handler,
			CtrlLane: lane.NewLane(api.ProtocolTypeWS,wsConn),
			State:    &conn.ConnectionState{
				State:  api.StatConnected,
				Headers:c.exOpts.Header.Clone(),
			},
			AutoRoute: c.options.AutoRoute,
		}),nil
	}

	//something wrong!
	var respMsg string
	if resp != nil{
		body, errRead := io.ReadAll(io.LimitReader(resp.Body,comm.MaxReadLength))
		if errRead ==nil{
			respMsg = fmt.Sprintf("response code: %d,response body: %s",resp.StatusCode,string(body))
		}else{
			respMsg = fmt.Sprintf("response code: %d",resp.StatusCode)
		}
		resp.Body.Close()
	}
	klog.Errorf("dial websocket error(%+v), response message: %s",err, respMsg)

	return nil,err
}

https://github.com/kubeedge/kubeedge/blob/master/staging/src/github.com/kubeedge/viaduct/pkg/client/ws.go

cloudhub#

Cloudhub 是云端组件 Cloudcore 的一个模块,负责边缘节点的接入和云边数据传输,是 Controllers 和边缘 Edgecore 之间的中介,它负责分发下行消息(其内封装了 k8s 资源事件,如 pod update 等)到边缘节点,也负责接收边缘节点发送的状态消息并转发至对应的 controllers。Cloudhub 在 KubeEdge 中的位置如下所示:
Pasted image 20240312192822
Cloudhub 内部有几个重要的代码模块,如下所示:
Pasted image 20240312230218

  • HTTP server:为边缘节点提供证书服务入口,如获取 CA 证书、证书签发与证书轮转
  • WebSocket server:可配置是否开启,为边缘节点提供 WebSocket 协议接入服务
  • QUIC server:可配置是否开启,为边缘节点提供 QUIC 协议接入服务
  • CSI socket server:在云端用来和 csi driver 通信
  • Token manager:边缘节点接入 token 凭据管理,token 默认 12h 轮转
  • Certificate manager:边缘节点证书签发和轮转的实现模块
  • message handler:边缘节点接入管理和边缘消息处理分发
  • node session manager:边缘节点会话生命周期管理
  • message dispatcher:上行和下行消息分发管理

Cloudhub 启动流程#

Cloudhub 在 Cloudcore 启动时注册,通过 beehive 消息通信框架调用 Start () 函数启动 Cloudhub 模块

cloudhub.Register(c.modules.Cloudhub)

Cloudhub 启动的时候,首先会启动 dispatcher.DispatchDownstream 协程,用来异步分发下行消息,其次进行证书的初始化,如果没有配置证书,则会自动生成 CA 和服务证书,用于后续 WebSocket、Quic、HTTP 服务的安全通讯。然后启动 token manager 模块,生成边缘节点接入使用的 token 凭据以及开启自动轮转服务。StartHTTPServer () 启动服务器监听,主要用于 EdgeCore 申请证书,它将等待 edgecore 发来请求,获取证书。然后,启动 cloudhub 服务,具体的操作是使用 viaduct 中间件启动一个服务器,等待 edgecore 发来连接的请求,协议可以是基于 tcp 的 WebSocket 或基于 udp 的 QUIC。如果用户需要使用 CSI 相关功能,则会启动 CSI socket server。

func (ch *cloudHub) Start(){
	if !cache.WaitForCacheSync(beehiveContext.Done(),ch.informersSyncedFuncs...
	{
		klog.Errorf("unable to sync caches for objectSyncController")
		os.Exit(1)
	}

	//start dispatch message from the cloud to edge node
	go ch.dispatcher.DispatchDownstream()

	//check whether the certificates exists in the local directory.
	//and then check whether certificates exists in the secret.
	//generate if they don't exist
	if err := httpserver.PrepareAllCerts(): err!=nil{
		klog.Exit(err)
	}

	DoneTLSTunnelCerts <- true
	close(DoneTLSTunnelCerts)

	//generate Token
	if err:=httpserver.GenerateToken():err!=nil{
		klog.Exit(err)
	}

	//HttpServer mainly used to issue certificates for the edge
	go httpserver.StartHTTPServer()

	servers.StartCloudHub(ch.messageHandler)

	if hubconfig.Config.UnixSocket.Enable{
	//The uds server is only used to communicate with csi driver from kubeedge on cloud
	//It is not used to communicate  between cloud and edge
	go udsserver.StartServer(hubconfig.Config.unixSocket.Address)
	}
}

https://github.com/kubeedge/kubeedge/blob/master/cloud/pkg/cloudhub/cloudhub.go
接下来,我们看一下 cloudhub 的核心功能,边缘节点接入管理和消息分发管理,下图是 CloudHub 的内部实现架构图:
Pasted image 20240313082138

下行消息发送模式#

发送到边缘节点的下行消息,有两种发送模式,这两种发送模式,直接关系到下行消息的分发和节点 session 的消息处理,如下所示:

ACK 模式:在这种模式下,边缘节点收到下行消息并将消息正确保存到本地数据存储之后,需要给云端发送 ACK 响应消息以通知云端消息在边缘侧被正确处理,如果云端没有收到 ACK 消息,则认为消息没有在边缘节点正确处理,则会重试,直到收到 ACK 响应消息

NO-ACK 模式:在这种模式下,边缘节点收到下行消息后,不需要给云端发送 ACK 响应消息,云端认为边缘侧已经收到消息并正确处理,在这种模式下,消息有可能丢失。这种模式,通常用于给边缘节点同步消息发送响应,如果边缘侧没有收到响应,则会触发重试操作。

边缘节点接入#

边缘节点接入的主要逻辑在 messageHandler 里面,handler 接口如下所示:

type Handler interface{
	//HandleConnection is invoked when a new connection arrives
	HandleConnection(connection conn.Connection)

	//HandleMessage is invoked when a new message arrives
	HandleMessage(container *mux.MessageContainer,writer mux.ResponseWriter)

	//OnEdgeNodeConnect is invoked when a new connection is established
	OnEdgeNodeConnect(info *model.HubInfo,connection conn.Connection) error
	//OnEdgeNodeDisconnect is invoked when a connection is lost
	OnEdgeNodeDisconnect(info *model.HubInfo,connection conn.Connection)

	//OnReadTransportErr is invoked when the connection read message err
	
}

https://github.com/kubeedge/kubeedge/blob/master/cloud/pkg/cloudhub/handler/message_handler.go
HandleConnection 用来处理边缘节点接入,以 WebSocket 协议接入为例,WebSocket server 通过 viaduct 启动之后,当有边缘节点接上来时,viaduct 中 serverHTTP 将 http 协议 upgrade 成为 websocket 协议,然后初始化 Connection 对象,HandleConnection 根据传入的 connection 对象进行一系列初始化操作:

  1. 执行初始化前的校验工作,如是否超过配置的 node 数量限制。
nodeID := connection.ConnectionState().Headers.Get("node_id")
projectID := connection.ConnectionState().Headers.Get("project_id")

if mh.SessionManager.ReachLimit(){
	klog.Errorf("Fail to serve node %s,reach node limit",nodeID)
	return 
}
  1. 初始化 nodeMessagePool,并加入到 MessageDispatcher 的哈希表中,用于存储分发的下行消息。
//init node message pool and add to the dispatcher
nodeMessagePool := common.InitNodeMessagePool(nodeID)
mh.MessageDispatcher.AddNodeMessagePool(nodeID,nodeMessagePool)

nodeMessagePool 是用来存储下行消息的队列,每个边缘节点在接入时,都会初始化一个对应的 nodeMessagePool,和之前的下行消息发送模式对应,nodeMessagePool 包含两个队列,分别用来存储 ACK 和 NO-ACK 模式的下行消息。

//NodeMessagePool is a collection of all downstream message sent to an
//edge node.There are two types of messages,one that requires an ack 
//and another that does not.For each type of message.we use the 'queue' 
//to mark the order of sending, and use the 'store' to store specific messages
type NodeMessagePool struct{
	//AckMessageStore store message that will send to edge node 
	//and require acknowledgement from edge node
	AckMessageStore cache.Store
	//AckMessageQueue store message key that will send to edge node
	//and require acknowledgement from edge node 
	AckMessageQueue workqueue.RateLimitingInterface
	//NoAckMessageStore store message that will send to edge node
	//and do not require acknowledgement from edge node
	NoAckMessageStore cache.Store
	//NoAckMessageQueue store message key that will send to edge node
	//and do not require acknowledgement from edge node
	NoAckMessageQueue workqueue.RateLimitingInterface
}

https://github.com/kubeedge/kubeedge/blob/master/cloud/pkg/cloudhub/common/message_pool.go
3. 初始化 nodeSession 对象,加入到 SessionManager 哈希表中,并启动 nodeSession

//create a node session for each edge node 
nodeSession := session.NewNodeSession(nodeID,projectID,connection,
	keepaliveInterval,nodeMessagePool,mh.reliableClient)
//add node session  to the session manager
mh.SessionManager.AddSession(nodeSession)

//Start session for each edge node and it will keep running until 
//it encounters some Transport Error from underlying connection.
nodeSession.Start()

每个边缘节点对应一个 nodeSession,nodeSession 是对每个边缘节点连接会话的抽象,SessionManager 存储并管理连接到当前 cloudHub 的所有边缘节点的 session,nodeSession 启动时,会启动该节点所需要的所有处理协程,包括:KeepAliveCheck 心跳检测,SendAckMessage 发送 ACK 模式的下行消息,SendNoAckMessage 发送 NO-ACK 模式的下行消息。

//Start the main goroutine responsible for serving node session
func (ns *NodeSession)Start(){
	klog.Infof("Start session for edge node %s",ns.nodeID)

	go ns.KeepAliveCheck()
	go ns.SendAckMessage()
	go ns.SendNoAckMessage()

	<-ns.ctx.Done()
}

上下行消息分发#

在 CloudHub 中,上下行消息的处理比较简单,主要逻辑在 messageHandler 的 HandleMessage 方法中,底层的 viaduct 库进行数据的解析转换成 MessageContainer 对象,里面包含了 message 信息,HandleMessage 收到 message 后,进行简单的校验,然后调用 MessageDispatcher DispatchUpstream 方法,转发到不同的模块,如 edgeController、deviceController 等。

//HandleMessage handle all the request from node 
func (mh *messageHandler)HandleMessage(coantainer *mux.MessageContainer,writer mux.ResponseWriter){
	nodeID := container.Header.Get("node_id")
	projectID := container.Header.Get("project_id")

	//validate message
	if container.Message == nil{
		klog.Errorf("The message is nil for node: %s",nodeID)
		return 
	}

	klog.v(4).Infof("[messageHandler]get msg from node(%s): %+v",nodeID,container.Message)

	//dispatch upstream message 
	mh.MessageDispatcher.DispatchUpstream(container.Message,&model.HubInfo{ProjectID: projectID,NodeID:nodeID})
}

下行消息的分发流程,以发送 ACK 消息为例,主要包括以下流程:

  1. KubeEdge 使用 K8s objectSync CRD 存储已成功发送到 Edge 的资源的最新的 resourceVersion。当 Cloudhub 重新启动或正常启动时,它将检查待发送的资源 resourceVersion 和已发送成功的 resourceVersion,以避免发送旧消息。
  2. EdgeController 和 devicecontroller 等将消息发送到 Cloudhub,MessageDispatcher 将根据消息中的节点名称,将消息发送到对应的 NodeMessagePool,同时会根据消息的 resource 等信息来选择发送模式。在加入队列的过程中,会查询资源对应的 objectSync CR,获取发送成功的最新资源 resourceVersion,并和待加入队列的消息比较,避免重复发送。
  3. 节点对应的 nodeSession SendAckMessage 协程将顺序地将数据从 NodeMessagePool 取出发送到相应的边缘节点,同时并将消息 ID 存储在 ACK channel 中。当收到来自边缘节点的 ACK 消息时,ACK channel 将收到通知,并将当前消息的 resourceVersion 保存到 objectSync CR 中,并发送下一条消息。
  4. 当 Edgecore 收到消息时,它将首先将消息保存到本地数据存储中,然后将 ACK 消息返回给云端。如果 cloudhub 在此间隔内未收到 ACK 消息,它将继续重新发送该消息 5 次。如果所有 5 次重试均失败,cloudhub 将丢弃该事件。
  5. CloudCore 中另一个模块 SyncController 将处理这些失败的事件。即使边缘节点收到该消息,返回的 ACK 消息也可能在传输过程中丢失。在这种情况下,SyncController 将再次发送消息给 cloudhub,再次触发下行消息分发,直至成功。
func (ns *NodeSession)SendMessageWithRetry(copyMsg, msg *beehivemodel.Message)error{
	ackChan := make(chan struct{})
	ns.ackMessageCache.Store(copyMsg.GetID(),ackChan)

	//initialize retry count and timer for sending message
	retryCount := 0
	ticker := time.NewTimer(sendRetryInterval)

	err := ns.connection.WriteMessageAsync(copyMsg)
	if err !=nil{
		return err	
	}

	for{
		select{
		case <- ackChan:
			ns.saveSuccessPoint(msg)
			return nil

		case <- ticker.C:
			if retryCount == 4{
				return ErrwaitTimeout
			}
			err := ns.connection.WriteMessageAsync(copyMsg)
			if err !=nil{
				return err
			}
			retryCount++
			ticker.Reset(sendRetryInterval)			
		}
	}
}

https://github.com/kubeedge/kubeedge/tree/master/cloud/pkg/cloudhub/session

synccontroller#

在边缘计算场景下,边缘的网络通常是不稳定的,这将导致云边的网络连接频繁断开,在云边协同通信时存在丢失数据的风险。synccontroller 是 CloudCore 中的一个模块,用来保障消息的可靠性发送。在 KubeEdge 中,使用 objectSync 对象来持久化云边协同消息状态。在云和边缘状态同步的过程中,云端会实时记录每个边缘节点同步成功的最新消息版本号(ResourceVersion),并以 CR 的形式持久化保存到 K8s 中。该机制可以保证在边缘场景下云端故障或者边缘离线重启后消息发送的顺序和连续性,避免重发旧消息引起云边状态不一致问题。与此同时,synccontroller 会周期性检查同步云边数据,保持一致性。它主要负责周期性检查每个边缘节点的同步状态,对比 K8s 中资源的信息,将不一致的状态同步到边缘,确保云边状态的最终一致性。
synccontroller 在 Cloudcore 启动时注册,通过 beehive 消息通信框架调用 Start () 函数启动 synccontroller 模块。

synccontroller.Register(c.Modules.syncController)

synccontroller 启动时,会开启周期性的检测,间隔 5s 执行一次。

func (sctl *SyncController) Start(){
	if !cache.WaitForCacheSync(beehiveContext.Done(),sctl.informersSyncedFuncs...){
		klog.Errorf("unable to sync caches for sync controller")
		return 
	}

	sctl.deleteObjectSyncs() //check outdate sync before start to reconcile
	go wait.Until(sctl.reconcile, 5*time.Second,beehiveContext.Done())
}

https://github.com/kubeedge/kubeedge/tree/master/cloud/pkg/synccontroller
ObjectSync 用于保存命名空间范围的对象。它们的名称由相关的节点名称和对象 UUID 组成,SyncController 将定期比较保存的 ObjectSync 对象中的已发送 resourceVersion 与 K8s 中的对象,然后触发诸如重试或删除之类的事件。当 cloudhub 将事件添加到 NodeMessagePool 时,它将与 NodeMessagePool 中的相应对象进行比较。如果 NodeMessagePool 中的对象较新,它将直接丢弃这些事件,否则 CloudHub 将消息发送到边缘侧。
Pasted image 20240313142006

edgehub#

EdgeHub 是一个 Web Socket 或者 Quic 协议的客户端,负责与云端 CloudCore 交互,包括同步云端资源更新、报告边缘主机和设备状态变化到云端等功能。
EdgeHub 在 EdgeCore 启动时通过 beehive 框架注册,并对 edgehub 进行了初始化。

func Register(eh *v1alpha2.EdgeHub,nodeName string){
	config.InitConfigure(eh,nodeName)
	core.Register(newEdgeHub(eh.Enable))
}

EdgeHub 启动代码如下所示:

func (eh *EdgeHub)Start(){
	eh.certManager = certificate.NewCertManager(config.config.EdgeHub,config.config.NodeName)
	eh.certManager.Start()
	for _, v := range GetCertSyncChannel(){
		v <- true
		close(v)
	}

	go eh.ifRotationDone()

	for{
		select{
			case <- beehiveContext.Done():
			klog.warning("EdgeHub stop")
			return 
		default:
		}
		err := eh.initial()
		if err != nil{
			klog.Exitf("failed to init controller:%v",err)
			return 
		}
		waitTime := time.Duration(config.Config.Heartbeat)*time.Second*2

		err = eh.chClient.Init()
		if err!= nil{
			klog.Errorf("connection failed: %v,will reconnect after %s",err,waitTime.String())
			time.Sleep(waitTime)
			continue
		}
		//execute hook func after connect
		eh.pubConnectInfo(true)
		go eh.routeToEdge()
		go eh.routeToCloud()
		go eh.keepalive()

		//wait the stop signal
		//stop authinfo manager/websocket connection
		<-eh.reconnectChan
		eh.chClient.UnInit()

		//execute hook function after disconnect
		eh.pubConnectInfo(false)

		//sleep one period of heartbeat, then try to connect cloud hub again 
		klog.warningf("connection is broken, will reconnect after %s",waitTime.String())
		time.Sleep(waitTime)

	//clean channel
	clean:
		for{
			select{
			case <- eh.reconnectChan:
			default:
				break clean
			}
		}	
	}
}

https://github.com/kubeedge/kubeedge/blob/master/edge/pkg/edgehub/edgehub.go

EdgeHub 的启动过程如下所示,主要包含以下步骤:

  1. 证书初始化,从 cloudcore 申请证书(若正确配置本地证书,则直接使用本地证书),启动证书轮转模式,然后进入循环
  2. 调用 eh.initial () 创建 eh.chClient,接着调用 eh.chClient.Init (),初始化过程通过 viaduct 库建立了 websocket/quic 的 connection
  3. 调用 eh.pubConnectInfo (true),向 edgecore 各模块广播已经连接成功的消息
  4. 接下来启动了三个协程:
    • routeToEdge
    • routeToCloud
    • keepalive

routeToEdge:接收云端发送下来的消息,如果是同步消息响应,则调用 beehive sendResp 发送响应,否则,根据消息的 group,发送到对应的 group

func (eh *EdgeHub)routeToEdge(){
	for{
		select{
		case<-beehiveContext.Done():
			klog.Warning("EdgeHub RouteToEdge stop")
			return 
		default:
		}
		message, err := eh.chClient.Receive()
		if err!=nil{
			klog.Errorf("failed to dispatch message,discard: %v",err)
		}
	}
}

routeToCloud:接收边缘侧其他 module 发送过来的消息,然后将消息通过 websocket/quic client 发送到云端

func (eh *EdgeHub)routeToCloud(){
	for{
		select{
		case<-beehiveContext.Done():
			klog.warning("EdgeHub RouteToCloud stop")
			return 
		default:
		}
		message,err:= beehiveContext.Receive(modules.EdgeHubModuleName)
		if err !=nil{
			klog.Errorf("failed to receive message from edge: %v",err)
			time.Sleep(time.Second)
			continue
		}

		err = eh.tryThrottle(message.GetID())
		if err !=nil{
			klog.Errorf("msgID: %s,client rate limiter returned an error: %v",message.GetID(),err)
			continue
		}

		//post message to cloud hub 
		err = eh.sendToCloud(message)
		if err !=nil{
			klog.Errorf("failed to send message to cloud: %v",err)
			eh.reconnectChan <- struct{}{}
			return 
		}
	}
}

keepalive:根据心跳周期定期向云端发送心跳信息

func (eh *EdgeHub)keepalive(){
	for{
		select{
		case <-beehiveContext.Done():
			klog.warning("EdgeHub KeepAlive stop")
			return 
		default:
		}
		msg := model.NewMessage("").
			BuildRouter(modules.EdgeHubModuleName,"resource","node",messagepkg.OperationKeepalive)
			FillBody("ping")

		//post message to cloud hub
		err := eh.sendToCloud(*msg)
		if err != nil{
			klog.Errorf("websocket write error: %v",err)
			eh.reconnectChan <- struct{}{}
			return 
		}
		time.Sleep(time.Duration(config.Config.Heartbeat)*time.Second)
	}
}
  1. 当云边消息传送过程中出现错误时,边缘部分会重新 init 相应的 websocket/quic client,与云端重新建立连接。
Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.