The reader/writer lock I wrote, backed by GCD, causes a deadlock in a parallel test
I implemented this reader/writer lock with GCD, but it fails in a parallel test. Can I get an explanation of why it fails?
This is for iOS development, and the code is Objective-C. For data protection, I wrote an RWCache with a reader/writer lock built on GCD.
@interface RWCache : NSObject

- (void)setObject:(id)object forKey:(id <NSCopying>)key;
- (id)objectForKey:(id <NSCopying>)key;

@end

@interface RWCache()

@property (nonatomic, strong) NSMutableDictionary *memoryStorage;
@property (nonatomic, strong) dispatch_queue_t storageQueue;

@end

@implementation RWCache

- (instancetype)init {
    self = [super init];
    if (self) {
        _memoryStorage = [NSMutableDictionary new];
        _storageQueue = dispatch_queue_create("Storage Queue", DISPATCH_QUEUE_CONCURRENT);
    }
    return self;
}

- (void)setObject:(id)object forKey:(id <NSCopying>)key {
    dispatch_barrier_async(self.storageQueue, ^{
        self.memoryStorage[key] = object;
    });
}

- (id)objectForKey:(id <NSCopying>)key {
    __block id object = nil;
    dispatch_sync(self.storageQueue, ^{
        object = self.memoryStorage[key];
    });
    return object;
}

@end
int main(int argc, const char * argv[]) {
    @autoreleasepool {
        RWCache *cache = [RWCache new];
        dispatch_queue_t testQueue = dispatch_queue_create("Test Queue", DISPATCH_QUEUE_CONCURRENT);
        dispatch_group_t group = dispatch_group_create();
        for (int i = 0; i < 100; i++) {
            dispatch_group_async(group, testQueue, ^{
                [cache setObject:@(i) forKey:@(i)];
            });
            dispatch_group_async(group, testQueue, ^{
                [cache objectForKey:@(i)];
            });
        }
        dispatch_group_wait(group, DISPATCH_TIME_FOREVER);
    }
    return 0;
}
If there is no deadlock, the program exits with 0; otherwise it hangs and never exits.
The problem isn't the reader/writer pattern per se, but rather general thread explosion in this code. See the discussion of deadlocks caused by thread explosion in the WWDC 2015 video Building Responsive and Efficient Apps with GCD. The WWDC 2016 video Concurrent Programming With GCD in Swift 3 is also good. Both links drop you off at the relevant portion of the video, but both are worth watching in their entirety.
Bottom line, you are exhausting the very limited pool of GCD worker threads. There are only 64 of them. But you have 100 writes with barriers, which means each dispatched block cannot run until everything previously queued on that queue has finished. Those are interleaved with 100 reads which, because they are synchronous, block the worker thread they were dispatched from until the read returns.
Let's reduce this to a simpler problem:
dispatch_queue_t queue1 = dispatch_queue_create("queue1", DISPATCH_QUEUE_CONCURRENT);
dispatch_queue_t queue2 = dispatch_queue_create("queue2", DISPATCH_QUEUE_CONCURRENT);

for (int i = 0; i < 100; i++) {
    dispatch_async(queue2, ^{
        dispatch_barrier_async(queue1, ^{
            NSLog(@"barrier async %d", i);
        });
        dispatch_sync(queue1, ^{
            NSLog(@"sync %d", i);
        });
    });
}

NSLog(@"done dispatching all blocks to queue1");
That produces something like:
starting
done dispatching all blocks to queue1
barrier async 0
sync 0
And it deadlocks: all of the worker threads end up blocked inside dispatch_sync, leaving none free to run the barrier blocks on queue1 that those dispatch_sync calls are waiting behind.
But if we constrain it so that no more than, say, 30 items can run on queue2 at a time, the problem goes away:
dispatch_queue_t queue1 = dispatch_queue_create("queue1", DISPATCH_QUEUE_CONCURRENT);
dispatch_queue_t queue2 = dispatch_queue_create("queue2", DISPATCH_QUEUE_CONCURRENT);
dispatch_semaphore_t semaphore = dispatch_semaphore_create(30);

for (int i = 0; i < 100; i++) {
    dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
    dispatch_async(queue2, ^{
        dispatch_barrier_async(queue1, ^{
            NSLog(@"barrier async %d", i);
        });
        dispatch_sync(queue1, ^{
            NSLog(@"sync %d", i);
        });
        dispatch_semaphore_signal(semaphore);
    });
}

NSLog(@"done dispatching all blocks to queue1");
Alternatively, another approach is to use dispatch_apply, which is effectively a parallelized for loop, but which limits the number of concurrent tasks at any given moment to the number of cores on your machine (keeping us well below the threshold at which we would exhaust the worker threads):
dispatch_queue_t queue1 = dispatch_queue_create("queue1", DISPATCH_QUEUE_CONCURRENT);
dispatch_queue_t queue2 = dispatch_queue_create("queue2", DISPATCH_QUEUE_CONCURRENT);

dispatch_apply(100, queue2, ^(size_t i) {
    dispatch_barrier_async(queue1, ^{
        NSLog(@"barrier async %ld", i);
    });
    dispatch_sync(queue1, ^{
        NSLog(@"sync %ld", i);
    });
});

NSLog(@"done dispatching all blocks to queue1");