研究数据结构QuestDB是一个时间序列数据库,提供快速的摄取速度、InfluxDB 线路协议和 PGWire 支持以及 SQL 查询语法。QuestDB 主要是用 Java 编写的,我们学到了很多困难而有趣的教训。我们很高兴与您分享。
并发数据结构设计很难。该博客提供了有关构建非常有利于读者的专用并发地图的指导。本文不仅会介绍另一种现成的数据结构。相反,我将引导您完成设计过程,同时解决实际问题。我什至会介绍我一路上遇到的死胡同。对于对并发编程感兴趣的程序员来说,这是一个侦探故事。
在本文结束时,我们将拥有一个用于在本机内存中存储数据 blob 的并发映射。该映射在读取路径上是无锁的,并且在内存分配方面也非常保守。让我们开始吧!
本文假设您具备 Java 或类 Java 编程语言的基础知识。
问题我需要一个并发映射,其中键是字符串,值是固定大小的 blob(公共加密密钥)。这听起来像是JDK 中普通的旧式 ConcurentHashMap 的工作,但有一点不同:blob 必须在 Java 堆之外可用。
为什么?这样调用者就可以获取指向 blob 的指针并通过 JNI 将其传递给 Rust 代码。然后 Rust 代码使用公钥来验证数字签名。
这是该界面的简化版本:
interface ConcurrentString2KeyMap {
void set(String username, long keyPtr);
long get(String username);
void remove(String username);
}
该set()方法接收用户名和指向密钥的指针。映射的寿命比它接收到的指针的寿命长,因此它必须将接收到的指针下的内存复制到自己的缓冲区中。换句话说:该get()方法必须返回一个指向该内部缓冲区的指针,而不是用于 的原始指针set()。
我可以假设该get()方法将在热路径上频繁使用,而突变方法将很少被调用,并且永远不会在热路径上被调用。
读者大致将如何使用它:
boolean verifySignature(CharSequence username, long challengePtr, int
challengeLen, long signaturePtr, int signatureLen) {
long keyPtr = map.get(username);
return AuthCrypto.verifySignature(keyPtr, PUBLIC_KEY_SIZE_BYTES, challengePtr, challengeLen, signaturePtr, signatureLen);
}
如果没有突变,我可以只实现一个预先填充的不可变查找目录,然后就到此为止了。然而,共享可变状态带来了两类挑战:
第一个问题归结为确保当 amap.get()返回指针时,该指针必须保持有效,并且其后面的内存在需要的时间内不得更改。在我们的例子中,这意味着直到AuthCrypto.verifySignature()返回。
第二个问题是关于并发数据结构设计的,我们稍后将更详细地讨论这一点。我们
先探讨第一个问题。
如果我们的映射值只是 JVM 管理的常规对象,那么事情可能会很简单:map.get()返回对对象的引用,然后它可能会忘记get()曾经发生过这个调用。和方法只会删除映射对值对象的引用,并且永远不会更改已返回的对象remove()。set()简单的。但这不是我们的情况,我们正在使用堆外内存,并且必须自己管理它。
从根本上来说,有两种方法可以解决:
第一个选项看起来很有趣。新合同可能如下所示:
interface ConcurrentString2KeyMap {
void set(String username, long srcKeyPtr);
void get(String username, long dstKeyPtr);
void remove(String username);
}
调用者将拥有该dstKeyPtr指针,并且映射会将密钥从其内部复制到该指针,并忘记get()曾经发生过此调用。
起初这听起来相当不错,直到我们意识到它只是把罐头踢了下去:它强制每个调用线程维护自己的缓冲区以传递给get(). 如果调用者都是单线程的,这仍然很容易:每个调用对象都拥有一个要传递给 的缓冲区get()。
但如果调用函数本身是并发的,那就会变得更加复杂。我们必须确保每个调用线程使用不同的缓冲区。
理想情况下,缓冲区应该在堆栈上分配,但这是 Java,所以这是不可能的。我们当然不希望
为每次调用在进程堆中分配/取消分配新的缓冲区。
那么还剩下什么呢?汇集?那很乱。
线程局部?更混乱且更难限制缓冲区的数量。
也许选项 1 并不像乍看起来那样有趣。
选项 2:生命周期通知让我们探讨第二个选项。合同与原始提案中概述的相同:long get(String username)。我们必须确保指针后面的内存保持不变,直到完成为止。
最简单的事情就是使用读写锁。
每个映射都会有一个关联的读写锁,然后读者在调用之前获取读锁get(),并仅在从 返回后释放它AuthCrypto.verifySignature():
boolean verifySignature(CharSequence username, long challengePtr, int
challengeLen, long signaturePtr, int signatureLen) {
map.acquireReadLock();
try {
long keyPtr = map.get(username);
return AuthCrypto.verifySignature(keyPtr, PUBLIC_KEY_SIZE_BYTES, challengePtr, challengeLen, signaturePtr, signatureLen);
} finally {
map.releaseReadLock();
}
}
变异器只需在调用set()or之前获取写锁remove()。这种设计不仅推理简单,而且实现起来也很简单。
假设只set()改变remove()内部状态,我们可以采用单线程映射实现,它就会做到这一点。但有一个问题...它违反了我们最初的要求!
读者经常处于热路径上,我们希望他们保持无锁状态。所提出的设计会在地图更新时阻止读者,因此这是不行的。
我们可以做什么?我们可以将锁定模式更改为更细粒度 - 我们可以锁定特定条目,而不是锁定整个映射。虽然这会改善实际行为,但也会使地图设计复杂化,并且当更新相同的密钥时,读者仍然可能被阻止。
还有什么?我们可以使用乐观锁定模式,但这
会带来其自身的复杂性。
越来越明显的是,指针生命周期管理必须与内部映射实现协同工作。那么这次的演习就完全没有结果了吗?不完全的。
我们仍然可以重用一种设计思想:地图用户必须
明确通知他们不再使用指针。
让我们探索如何
设计地图内部!
我认为自己是一位经验丰富的多面手。我对并发编程、分布式系统和各种其他领域有所了解,但我并不真正专注于任何特定主题。
是万事通却一事无成?大概。因此,当我在考虑合适的数据结构时,我做了每个通才在 2023 年都会做的事情:问 ChatGPT!
我很惊讶 GPT 意识到我的意思是写“单一作者”而不是“单一读者”,我认为这是 GPT 知道它在说什么的证据! 所以我进一步阅读:我以前可能听说过 RCU,但我自己从未使用过它。我发现这个描述有点太模糊了,无法用作实施指南,而且无论如何,那是午餐时间。
写时复制插曲当我走向一个吃午饭的地方时,我思考了更多,并有了一个想法。为什么不使用Copy-On-Write技术来实现持久映射?
这样,我就可以采用常规的单线程映射,并且变异器将克隆当前映射,执行其操作,然后以原子方式将这个新创建的映射设置为读者的映射。然后,读者将使用最新发布的地图。已发布的映射是不可变的,因此对于并发读取者来说始终是安全的,即使来自多个线程,也是无锁的。事实上,甚至无需等待。耶!
此外,当过时的(=不再是最新发布的地图)地图没有读者时,我们必须引入一种安全地重新分配地图内部缓冲区的机制。否则,我们就会泄漏内存。这是一个复杂的问题,但感觉只要有足够的奉献精神和原子引用计数器就可以轻松修复。
所以这一切听起来不错,但正如所预料的那样……仍然有一个问题。我们需要为每个突变的映射内容分配一块内存。我们说过突变很罕见,所以也许这没什么大不了的?也许不是,但 QuestDB 设计原则之一是对内存分配持保守态度,因为它们会消耗 CPU 周期、内存带宽,导致 CPU 缓存抖动,并且通常会引入不可预测的行为。
回到绘图板:地图回收因此,我无法实现简单的“写时复制”映射,但我觉得我正走在实现目标的正确道路上:无锁读取器。在某些时候,我意识到,我可以只重复使用 2 个地图,而不是在发生更改时分配新地图:一个可供读者使用,另一个可供作者使用。
一旦地图向读者发布,只要至少有一个读者仍在访问它,它就保证是不可变的。
它看起来类似于:
class ConcurrentMap {
private InternalMap readerMap = new ...
private InternalMap writerMap = new ...
void set(String username, long keyPtr) {
getMapForWriters().set(username, keyPtr);
swapMaps();
}
long get(String username) {
return getMapForReaders().get(username);
}
void remove(String username) {
getMapForWriters().remove(username);
swapMaps();
}
}
这个想法看起来很简洁,但很明显,上面概述的代码存在许多问题和悬而未决的问题:
让我们从问题#2 开始——多个变异线程。
我们说过突变是罕见的,而且从来不会出现在热门路径上。因此,我们可以采取残酷的做法并使用简单的互斥体 - 以确保始终最多只有一个变异器。无论如何,单写入器原则可以简化并发算法的设计。
因此地图现在看起来像这样:
class ConcurrentMap {
private InternalMap readerMap = new ...
private InternalMap writerMap = new ...
private final Object writeMutex = new Object();
void set(String username, long keyPtr) {
synchronized(writeMutex) {
getMapForWriters().set(username, keyPtr);
swapMaps();
}
}
long get(String username) {
return getMapForReaders().get(username);
}
void remove(String username) {
synchronized(writeMutex) {
getMapForWriters().remove(username);
swapMaps();
}
}
}
那很简单。也许暴力,但很容易。
赛车线让我们探讨一些更复杂的问题 - 问题#3 - 多个连续的写入操作。我这是什么意思?
考虑这种情况:
如果场景看起来太长且无聊并且您跳过了它,这里有一个简短的摘要:读者获得mapForReaders并在下一刻这张地图变成writerMap。所以曾经的 areaderMap现在是 a writerMap,因此下一个写操作可以随意改变它。除了陈旧的读者仍然认为同一张地图可以安全阅读。这是一个严重的并发错误!
我们如何才能防止上述的不良情况发生?我们已经在写入路径上使用了互斥体,这几乎是最糟糕的。几乎?!我们还能更恶心吗?我们当然可以!
每个内部映射都可以有一个读取器计数器,并且getMapForWriters()在当前的读取器计数器mapForWriters达到 0 之前不会返回。换句话说:写入器不会发生变异,writerMap直到所有读取器表明他们不再使用该映射。
新来的读者怎么样?新读者根本不接触writerMap,他们总是加载电流readerMap,所以这不是问题。
说够了!让我们看一些代码:
class ConcurrentMap {
private InternalMap readerMap = new InternalMap();
private InternalMap writerMap = new InternalMap();
private final Object writeMutex = new Object();
void set(String username, long keyPtr) {
synchronized(writeMutex) {
getMapForWriters().set(username, keyPtr);
swapMaps();
}
}
Reader concurrentReader() {
InternalMap map;
for (;;) {
map = readerMap;
map.readerArrived();
if (map == readerMap) {
return map;
}
map.readerGone();
}
}
private InternalMap getMapForWriters() {
InternalMap map = writerMap;
while(map.hasReaders()) {
backoff();
}
return map;
}
void remove(String username) {
synchronized(writeMutex) {
getMapForWriters().remove(username);
swapMaps();
}
}
interface Reader {
long get(String username);
void readerGone();
}
static class InternalMap implement Reader {
private final AtomicInteger readerCounter;
public void readerGone() {
readerCounter.decrement();
}
public void readerArrived() {
readerCounter.increment();
}
public boolean hasReaders() {
return readerCounter.get() > 0;
}
// the rest of a single threaded map impl
}
}
上面的代码看起来比之前有问题的版本要复杂得多。
让我们简要回顾一下这些变化:
让我们仔细看看每一个变化。
为什么我们要引入Reader接口?这难道不是一种不必要的复杂化,也是 Java 文化中普遍存在的过度设计的一个例子吗?
好吧,也许吧,但它简化了读者通知映射他们将不再访问返回的指针的机制。
如何?每个内部地图都有自己的读者计数器。当读者不再需要先前返回的指针时,它必须在正确的内部映射上get()调用。readerGone()
该Reader接口正是这样做的——它知道正在使用哪个
实例。InternalMap当线程调用 时reader.readerGone(),它会减少该映射上的读取器计数器。
例如:
boolean verifySignature(CharSequence username, long challengePtr, int
challengeLen, long signaturePtr, int signatureLen) {
ConcurrentMap.Reader reader = map.concurrentReader();
try {
long keyPtr = map.get(username);
return AuthCrypto.verifySignature(keyPtr, [...]);
} finally {
reader.readerGone();
}
}
希望这能让我们更清楚为什么我们需要这个Reader接口。
题外话:你还记得读写锁的设计思想吗?我决定不使用它,因为它可能会阻止读者。但锁的使用模式是这种通知机制的灵感来源。
避免先检查后行动的错误我们重点关注concurrentReader()方法的实现。
它看起来像这样:
Reader concurrentReader() {
InternalMap map;
for (;;) {
map = readerMap;
map.readerArrived();
if (map == readerMap) {
return map;
}
map.readerGone();
}
}
它加载 current readerMap,增加其读取器计数器,并且当且仅当该readerMap字段仍然指向同一InternalMap实例时将其返回给调用者。否则,它会递减读取器计数器以撤消增量,并从头开始重试所有操作。
为什么这么复杂?为什么我们需要重试机制?这是为了保护我们免受我们已经讨论过的陈旧读者的类似问题的影响。
考虑这个更简单的实现concurrentReader():
Reader concurrentReader() { // buggy!
InternalMap map = readerMap;
map.readerArrived(); // increment the reader counter
return map;
}
// getMapForWriters() shown for reference only
private InternalMap getMapForWriters() {
InternalMap map = writerMap;
while (map.hasReaders()) {
backoff();
}
return map;
}
细分下来,我们看到:
额外的签到concurrentReader()是为了防止上述情况的发生。它保证它增加了地图上的读者计数器,该计数器仍然是当前的readerMap:
Reader concurrentReader() {
InternalMap map;
for (;;) {
map = readerMap;
map.readerArrived(); // increment the reader counter
if (map == readerMap) {
return map;
}
map.readerGone();
}
}
读取器线程仍然有可能增加读取器计数器,将 . 返回Reader给调用者,并且在下一微秒内写入器线程交换映射,因此返回给调用者的映射实例现在被设置为writerMap. 这是完全可能的,但不会造成任何损害。在读取器计数器达到零之前,写入
器将无法访问。writerMap
至此我们解决了并发算法最难的部分,但是还有一些问题没有解决:
我们可以解决第一个问题。交换地图后,我们可以等到当前writerMap没有读者再更新。
所以变异操作看起来像这样:
void set(String username, long keyPtr) {
synchronized(writeMutex) {
getMapForWriters().set(username, keyPtr);
swapMaps();
getMapForWriters().set(username, keyPtr);
}
}
这是一个安全的实现,因为可以getMapForWriters()保证返回的映射没有读取器,并且在下一次交换之前不会有新的读取器到达。
另一方面,它效率低下:当我们在写入后切换映射时,新映射writerMap可能会有陈旧的读取器,从而导致延迟,直到它们被清除。
有更好的选择吗?事实证明是有的!
我们可以更改第一个地图,交换它们并记住包括所有参数在内的操作。在下一次突变期间,我们将重播对 的操作mapForWriters,如果
突变足够罕见,那么当我们重播操作时, 已writerMap不再有任何读者。
我们看一下代码:
void set(String username, long keyPtr) {
synchronized(writeMutex) {
InternalMap map = getMapForWriters();
replayLastOperationOn(map);
map.set(username, keyPtr);
swapMaps();
rememberSetOperation(username, keyPtr);
}
}
看起来map.remove()像这样:
void remove(String username) {
synchronized(writeMutex) {
InternalMap map = getMapForWriters();
replayLastOperationOn(map);
map.remove(username);
swapMaps();
rememberRemoveOperation(username);
}
}
rememberSetOperation()必须将指针下的内存复制到自己的缓冲区,但我们只需要记住单个操作。鉴于我们的 blob 是固定大小的,它允许我们继续重用相同的重播缓冲区。零分配。
遵循 Java 内存模型规则现在让我们做最后一个重要的改变。
看起来是这样的ConcurrentMap:
class ConcurrentMap {
private InternalMap readerMap = new InternalMap();
private InternalMap writerMap = new InternalMap();
private final Object writeMutex = new Object();
private final WriterOperation lastWriterOperation;
[...]
}
整个可变状态被封装在这 4 个对象中。字段writerMap和lastWriterOperations只能由持有互斥锁的编写器线程访问。但该readerMap字段是由编写器线程设置,然后由读取器加载。
读取器是无锁的,在访问读取器映射之前它们不会获取任何互斥体。这是一场数据竞争,可能会导致可见性问题。
修复很简单,只需将其标记readerMap为volatile:
class ConcurrentMap {
private volatile InternalMap readerMap = new InternalMap();
private InternalMap writerMap = new InternalMap();
private final Object writeMutex = new Object();
private final WriterOperation lastWriterOperation;
[...]
编写器路径现在如下所示:
现在被readerMap标记为易失性,为我们提供了顺序一致性。
通俗地说,读者将看到编写器线程完成的最新地图交换。读者还可以保证看到在编写器线程将地图设置为 之前执行的所有更改mapForReaders。就是这样!
概括我们经历了设计读取路径上无锁的并发数据结构的过程。我们可以将我们应用的一些设计原则概括为以下规则:
我们有并发映射的有效实现,但尚未准备好投入生产。还有一些问题需要解决:
完成实施后,我非常兴奋,想与世界分享。我天真地以为我是第一个想出这个想法的人。我错了。
首先,我在令人惊叹的Concurrency Freaks博客中发现了双实例锁定模式。这种模式与我在这里描述的模式非常相似。它还使用两种内部结构,读者可以在它们之间交替使用。它使用读写锁来保护映射被改变。假设只有一个写入者,那么在任何给定时间都至少有一个内部映射可供读取。这为读者提供了锁定自由。
公平地说,双实例锁定
模式更容易推理。它更好地分解问题。但我仍然认为我的贡献是延迟重播上次操作的技巧 - 如果写入者足够稀有,那么写入者根本不会被阻止。
关注并回复1即可领取【Java学习资料大礼包】
Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved