回顾一下上一篇在Server的Start的方法,订阅socketAsyncEventArgs的Completed事件,交给了recvEventArg_Completed方法进行处理。
void Start()
{
var receiveEventArgs = new SocketAsyncEventArgs { AcceptSocket = socket };
receiveEventArgs.SetBuffer(networkReceiveBuffer, 0, networkReceiveBuffer.Length);
//此处订阅了接收消息的事件
receiveEventArgs.Completed = RecvEventArg_Completed;
// if the client already have packets, avoid handling it here on the handler so we don't block future accepts.
try
{
if (!socket.ReceiveAsync(receiveEventArgs))
Task.Run(() => RecvEventArg_Completed(null, receiveEventArgs));
}
catch (Exception ex)
{
logger?.LogError(ex, "An error occurred at Start.ReceiveAsync");
Dispose(receiveEventArgs);
}
}
SocketAsyncEventArgs的Completed事件
public event EventHandler<SocketAsyncEventArgs>? Completed;
最开始接收信息是通过PortableThreadPool中调用WorkerThreadStart,开启一个工作线程,处理,由Execution的Run方法,通过回调执行了SocketAsyncEventArgs的ExecutionCallback方法。
private static void ExecutionCallback(object state)
{
SocketAsyncEventArgs thisRef = (SocketAsyncEventArgs)state;
thisRef.OnCompletedInternal();
}
ExecutionCallback的OnCompletedInternal方法调用了OnCompleted
private void OnCompletedInternal()
{
if (LastOperation <= SocketAsyncOperation.Connect)
{
AfterConnectAcceptTelemetry();
}
OnCompleted(this);
}
从而触发了Completed事件。
protected virtual void OnCompleted(SocketAsyncEventArgs e)
{
this.Completed?.Invoke(e._currentSocket, e);
}
所以消息过来就会执行RecvEventArg_Completed方法。
void RecvEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
try
{
do
{
//忽略一段代码
OnNetworkReceive(e.BytesTransferred);
e.SetBuffer(networkReceiveBuffer, networkbytesRead, networkReceiveBuffer.Length - networkbytesRead);
} while (!e.AcceptSocket.ReceiveAsync(e));
}
catch (Exception ex)
{
//忽略了一段代码
}
}
调用了OnNetworkReceive(e.BytesTransferred);
public unsafe void OnNetworkReceive(int bytesTransferred)
{
//删除了一段代码,重点是这个方法
Process();
}
这个方法是NetworkHandler
unsafe void Process()
{
if (transportBytesRead > 0)
{
if (session != null || serverHook.TryCreateMessageConsumer(new Span<byte>(transportReceiveBufferPtr, transportBytesRead), GetNetworkSender(), out session))
TryProcessRequest();
}
}
TryProcessRequest的定义
unsafe bool TryProcessRequest()
{
transportReadHead = session.TryConsumeMessages(transportReceiveBufferPtr transportReadHead, transportBytesRead - transportReadHead);
// We cannot shift or double transport buffer if a read may be waiting on
// the old transport buffer and offset.
if (readerStatus == TlsReaderStatus.Rest)
{
ShiftTransportReceiveBuffer();
}
return true;
}
紧接着会调用IMessageCumsumer的TryConsumeMessages,然后调用ProcessMessages方法处理接收到的消息
public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived)
{
bytesRead = bytesReceived;
if (!txnManager.IsSkippingOperations())
readHead = 0;
try
{
latencyMetrics?.Start(LatencyMetricsType.NET_RS_LAT);
clusterSession?.AcquireCurrentEpoch();
recvBufferPtr = reqBuffer;
networkSender.GetResponseObject();
//处理消息
ProcessMessages();
recvBufferPtr = null;
}
catch (Exception ex)
{
}
finally
{
}
}
ProcessMessages通过var CMD = _authenticator.IsAuthenticated ? FastParseCommand(ptr) : RespCommand.NOAUTH;这行代码解析出要处理的消息类型,然后调用ProcessBasicCommands(ptr, cmd, ref basicGarnetApi);进行命令处理。
private void ProcessMessages()
{
// #if DEBUG
// logger?.LogTrace("RECV: [{recv}]", Encoding.UTF8.GetString(new Span<byte>(recvBufferPtr, bytesRead)).Replace("\n", "|").Replace("\r", ""));
// #endif
dcurr = networkSender.GetResponseObjectHead();
dend = networkSender.GetResponseObjectTail();
var _origReadHead = readHead;
while (bytesRead - readHead >= 4)
{
var ptr = recvBufferPtr readHead;
var cmd = _authenticator.IsAuthenticated ? FastParseCommand(ptr) : RespCommand.NOAUTH;
if (cmd == RespCommand.NONE && MakeUpperCase(ptr)) continue;
bool success;
if (txnManager.state != TxnState.None)
{
if (txnManager.state == TxnState.Running)
{
success = ProcessBasicCommands(ptr, cmd, ref lockableGarnetApi);
}
else success = cmd switch
{
RespCommand.EXEC => NetworkEXEC(),
RespCommand.MULTI => NetworkMULTI(),
RespCommand.DISCARD => NetworkDISCARD(),
_ => NetworkSKIP(cmd),
};
}
else
{
//处理命令行
success = ProcessBasicCommands(ptr, cmd, ref basicGarnetApi);
}
}
}
可以看到ProcessBasicCommands方法,会根据解析出来的CMD的类型,分别调用不同的方法进行消息处理。
private bool ProcessBasicCommands<TGarnetApi>(byte* ptr, RespCommand cmd, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
bool success = cmd switch
{
RespCommand.GET => NetworkGET(ptr, ref storageApi),
RespCommand.SET => NetworkSET(ptr, ref storageApi),
RespCommand.SETEX => NetworkSETEX(ptr, false, ref storageApi),
RespCommand.PSETEX => NetworkSETEX(ptr, true, ref storageApi),
RespCommand.SETEXNX => NetworkSETEXNX(ptr, ref storageApi),
RespCommand.DEL => NetworkDEL(ptr, ref storageApi),
RespCommand.RENAME => NetworkRENAME(ptr, ref storageApi),
RespCommand.EXISTS => NetworkEXISTS(ptr, ref storageApi),
RespCommand.EXPIRE => NetworkEXPIRE(ptr, RespCommand.EXPIRE, ref storageApi),
RespCommand.PEXPIRE => NetworkEXPIRE(ptr, RespCommand.PEXPIRE, ref storageApi),
RespCommand.PERSIST => NetworkPERSIST(ptr, ref storageApi),
RespCommand.GETRANGE => NetworkGetRange(ptr, ref storageApi),
RespCommand.TTL => NetworkTTL(ptr, RespCommand.TTL, ref storageApi),
RespCommand.PTTL => NetworkTTL(ptr, RespCommand.PTTL, ref storageApi),
RespCommand.SETRANGE => NetworkSetRange(ptr, ref storageApi),
RespCommand.GETDEL => NetworkGETDEL(ptr, ref storageApi),
RespCommand.APPEND => NetworkAppend(ptr, ref storageApi),
RespCommand.INCR => NetworkIncrement(ptr, RespCommand.INCR, ref storageApi),
RespCommand.INCRBY => NetworkIncrement(ptr, RespCommand.INCRBY, ref storageApi),
RespCommand.DECR => NetworkIncrement(ptr, RespCommand.DECR, ref storageApi),
RespCommand.DECRBY => NetworkIncrement(ptr, RespCommand.DECRBY, ref storageApi),
RespCommand.SETBIT => StringSetBit(ptr, ref storageApi),
RespCommand.GETBIT => StringGetBit(ptr, ref storageApi),
RespCommand.BITCOUNT => StringBitCount(ptr, ref storageApi),
RespCommand.BITPOS => StringBitPosition(ptr, ref storageApi),
RespCommand.PUBLISH => NetworkPUBLISH(ptr),
RespCommand.PING => NetworkPING(),
RespCommand.ASKING => NetworkASKING(),
RespCommand.MULTI => NetworkMULTI(),
RespCommand.EXEC => NetworkEXEC(),
RespCommand.UNWATCH => NetworkUNWATCH(),
RespCommand.DISCARD => NetworkDISCARD(),
RespCommand.QUIT => NetworkQUIT(),
RespCommand.RUNTXP => NetworkRUNTXPFast(ptr),
RespCommand.READONLY => NetworkREADONLY(),
RespCommand.READWRITE => NetworkREADWRITE(),
_ => ProcessArrayCommands(ref storageApi),
};
return success;
}
最终会交给GarnetApi这个类下定义的各种处理方法进行命令处理,感兴趣的可以自行去github继续深入阅读其中的细节,这里就不作详细展开了。
Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved