Garnet命令处理源码阅读

Garnet命令处理源码阅读

首页枪战射击源代码删除官网更新时间:2024-04-16

回顾一下上一篇在Server的Start的方法,订阅socketAsyncEventArgs的Completed事件,交给了recvEventArg_Completed方法进行处理。

void Start() { var receiveEventArgs = new SocketAsyncEventArgs { AcceptSocket = socket }; receiveEventArgs.SetBuffer(networkReceiveBuffer, 0, networkReceiveBuffer.Length); //此处订阅了接收消息的事件 receiveEventArgs.Completed = RecvEventArg_Completed; // if the client already have packets, avoid handling it here on the handler so we don't block future accepts. try { if (!socket.ReceiveAsync(receiveEventArgs)) Task.Run(() => RecvEventArg_Completed(null, receiveEventArgs)); } catch (Exception ex) { logger?.LogError(ex, "An error occurred at Start.ReceiveAsync"); Dispose(receiveEventArgs); } }

SocketAsyncEventArgs的Completed事件

public event EventHandler<SocketAsyncEventArgs>? Completed;

最开始接收信息是通过PortableThreadPool中调用WorkerThreadStart,开启一个工作线程,处理,由Execution的Run方法,通过回调执行了SocketAsyncEventArgs的ExecutionCallback方法。

private static void ExecutionCallback(object state) { SocketAsyncEventArgs thisRef = (SocketAsyncEventArgs)state; thisRef.OnCompletedInternal(); }

ExecutionCallback的OnCompletedInternal方法调用了OnCompleted

private void OnCompletedInternal() { if (LastOperation <= SocketAsyncOperation.Connect) { AfterConnectAcceptTelemetry(); } OnCompleted(this); }

从而触发了Completed事件。

protected virtual void OnCompleted(SocketAsyncEventArgs e) { this.Completed?.Invoke(e._currentSocket, e); }

所以消息过来就会执行RecvEventArg_Completed方法。

void RecvEventArg_Completed(object sender, SocketAsyncEventArgs e) { try { do { //忽略一段代码 OnNetworkReceive(e.BytesTransferred); e.SetBuffer(networkReceiveBuffer, networkbytesRead, networkReceiveBuffer.Length - networkbytesRead); } while (!e.AcceptSocket.ReceiveAsync(e)); } catch (Exception ex) { //忽略了一段代码 } }

调用了OnNetworkReceive(e.BytesTransferred);

public unsafe void OnNetworkReceive(int bytesTransferred) { //删除了一段代码,重点是这个方法 Process(); }

这个方法是NetworkHandler

unsafe void Process() { if (transportBytesRead > 0) { if (session != null || serverHook.TryCreateMessageConsumer(new Span<byte>(transportReceiveBufferPtr, transportBytesRead), GetNetworkSender(), out session)) TryProcessRequest(); } }

TryProcessRequest的定义

unsafe bool TryProcessRequest() { transportReadHead = session.TryConsumeMessages(transportReceiveBufferPtr transportReadHead, transportBytesRead - transportReadHead); // We cannot shift or double transport buffer if a read may be waiting on // the old transport buffer and offset. if (readerStatus == TlsReaderStatus.Rest) { ShiftTransportReceiveBuffer(); } return true; }

紧接着会调用IMessageCumsumer的TryConsumeMessages,然后调用ProcessMessages方法处理接收到的消息

public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived) { bytesRead = bytesReceived; if (!txnManager.IsSkippingOperations()) readHead = 0; try { latencyMetrics?.Start(LatencyMetricsType.NET_RS_LAT); clusterSession?.AcquireCurrentEpoch(); recvBufferPtr = reqBuffer; networkSender.GetResponseObject(); //处理消息 ProcessMessages(); recvBufferPtr = null; } catch (Exception ex) { } finally { } }

ProcessMessages通过var CMD = _authenticator.IsAuthenticated ? FastParseCommand(ptr) : RespCommand.NOAUTH;这行代码解析出要处理的消息类型,然后调用ProcessBasicCommands(ptr, cmd, ref basicGarnetApi);进行命令处理。

private void ProcessMessages() { // #if DEBUG // logger?.LogTrace("RECV: [{recv}]", Encoding.UTF8.GetString(new Span<byte>(recvBufferPtr, bytesRead)).Replace("\n", "|").Replace("\r", "")); // #endif dcurr = networkSender.GetResponseObjectHead(); dend = networkSender.GetResponseObjectTail(); var _origReadHead = readHead; while (bytesRead - readHead >= 4) { var ptr = recvBufferPtr readHead; var cmd = _authenticator.IsAuthenticated ? FastParseCommand(ptr) : RespCommand.NOAUTH; if (cmd == RespCommand.NONE && MakeUpperCase(ptr)) continue; bool success; if (txnManager.state != TxnState.None) { if (txnManager.state == TxnState.Running) { success = ProcessBasicCommands(ptr, cmd, ref lockableGarnetApi); } else success = cmd switch { RespCommand.EXEC => NetworkEXEC(), RespCommand.MULTI => NetworkMULTI(), RespCommand.DISCARD => NetworkDISCARD(), _ => NetworkSKIP(cmd), }; } else { //处理命令行 success = ProcessBasicCommands(ptr, cmd, ref basicGarnetApi); } } }

可以看到ProcessBasicCommands方法,会根据解析出来的CMD的类型,分别调用不同的方法进行消息处理。

private bool ProcessBasicCommands<TGarnetApi>(byte* ptr, RespCommand cmd, ref TGarnetApi storageApi) where TGarnetApi : IGarnetApi { bool success = cmd switch { RespCommand.GET => NetworkGET(ptr, ref storageApi), RespCommand.SET => NetworkSET(ptr, ref storageApi), RespCommand.SETEX => NetworkSETEX(ptr, false, ref storageApi), RespCommand.PSETEX => NetworkSETEX(ptr, true, ref storageApi), RespCommand.SETEXNX => NetworkSETEXNX(ptr, ref storageApi), RespCommand.DEL => NetworkDEL(ptr, ref storageApi), RespCommand.RENAME => NetworkRENAME(ptr, ref storageApi), RespCommand.EXISTS => NetworkEXISTS(ptr, ref storageApi), RespCommand.EXPIRE => NetworkEXPIRE(ptr, RespCommand.EXPIRE, ref storageApi), RespCommand.PEXPIRE => NetworkEXPIRE(ptr, RespCommand.PEXPIRE, ref storageApi), RespCommand.PERSIST => NetworkPERSIST(ptr, ref storageApi), RespCommand.GETRANGE => NetworkGetRange(ptr, ref storageApi), RespCommand.TTL => NetworkTTL(ptr, RespCommand.TTL, ref storageApi), RespCommand.PTTL => NetworkTTL(ptr, RespCommand.PTTL, ref storageApi), RespCommand.SETRANGE => NetworkSetRange(ptr, ref storageApi), RespCommand.GETDEL => NetworkGETDEL(ptr, ref storageApi), RespCommand.APPEND => NetworkAppend(ptr, ref storageApi), RespCommand.INCR => NetworkIncrement(ptr, RespCommand.INCR, ref storageApi), RespCommand.INCRBY => NetworkIncrement(ptr, RespCommand.INCRBY, ref storageApi), RespCommand.DECR => NetworkIncrement(ptr, RespCommand.DECR, ref storageApi), RespCommand.DECRBY => NetworkIncrement(ptr, RespCommand.DECRBY, ref storageApi), RespCommand.SETBIT => StringSetBit(ptr, ref storageApi), RespCommand.GETBIT => StringGetBit(ptr, ref storageApi), RespCommand.BITCOUNT => StringBitCount(ptr, ref storageApi), RespCommand.BITPOS => StringBitPosition(ptr, ref storageApi), RespCommand.PUBLISH => NetworkPUBLISH(ptr), RespCommand.PING => NetworkPING(), RespCommand.ASKING => NetworkASKING(), RespCommand.MULTI => NetworkMULTI(), RespCommand.EXEC => NetworkEXEC(), RespCommand.UNWATCH => NetworkUNWATCH(), RespCommand.DISCARD => NetworkDISCARD(), RespCommand.QUIT => NetworkQUIT(), RespCommand.RUNTXP => NetworkRUNTXPFast(ptr), RespCommand.READONLY => NetworkREADONLY(), RespCommand.READWRITE => NetworkREADWRITE(), _ => ProcessArrayCommands(ref storageApi), }; return success; }

最终会交给GarnetApi这个类下定义的各种处理方法进行命令处理,感兴趣的可以自行去github继续深入阅读其中的细节,这里就不作详细展开了。

查看全文
大家还看了
也许喜欢
更多游戏

Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved