重定向提交到调度队列的块
Retarget blocks submitted to dispatch queue
我有一个串行调度队列 Q(以另一个串行队列 T 作为目标),并且已经通过 dispatch_async(Q, block) 提交了几个块。有没有办法将尚未执行的挂起块重新定位到另一个队列 A?
我的简单测试显示,Q 会尽快将块转发给 T,因此设置新目标没有效果:
// printf wrapper that appends a newline. `##__VA_ARGS__` is a GNU extension
// that lets the macro be used with no variadic arguments.
#define print(f, ...) printf(f "\n", ##__VA_ARGS__)
// Three serial queues: Q is the queue under test, T is its initial target,
// A is the queue we later try to retarget Q to.
dispatch_queue_t Q = dispatch_queue_create("Q", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t T = dispatch_queue_create("T", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t A = dispatch_queue_create("A", DISPATCH_QUEUE_SERIAL);
dispatch_set_target_queue(Q, T);
// Occupy T for 2 s and A for 5 s so the printed order reveals which queue
// the blocks submitted to Q end up waiting behind.
dispatch_async(T, ^{ print("T sleeping"); sleep(2); print("T ready"); });
dispatch_async(A, ^{ print("A sleeping"); sleep(5); print("A ready"); });
// Block 1 attempts to retarget Q to A; blocks 2-4 are submitted right after
// it, i.e. before block 1 gets a chance to run.
dispatch_async(Q, ^{
print("block 1");
dispatch_set_target_queue(Q, A); // no effect!
});
dispatch_async(Q, ^{ print("block 2"); });
dispatch_async(Q, ^{ print("block 3"); });
dispatch_async(Q, ^{ print("block 4"); });
输出:
A sleeping
T sleeping
(wait 2 seconds)
T ready
block 1
block 2
block 3
block 4
(wait 3 seconds)
A ready
如您所见,块 2-4 仍然固定在 T 上执行,尽管手册中写道:
The new target queue setting will take effect between block executions on the object, but not in the middle of any existing block executions (non-preemptive).
我不清楚这句话是否仅适用于调度源,还是说 "existing" 指的是已经提交(即使尚未执行)的块;但无论如何,我的串行队列并没有表现出这种行为。
有办法吗?
好的,经过一番研究,我想出了以下解决方案。它基于自定义调度源,我认为这是即时(just-in-time)提交块的唯一方法。
BlockSource.h:
/*
 * Creates a DISPATCH_SOURCE_TYPE_DATA_OR dispatch source targeting `queue`
 * whose event handler runs blocks added with
 * dispatch_block_source_add_block(), one block per handler invocation, in
 * the order they were added. The usage example calls dispatch_resume() on the
 * returned source before adding blocks. Cancelling the source releases the
 * blocks that have not run yet.
 */
dispatch_source_t dispatch_block_source_create(dispatch_queue_t queue);
/*
 * Appends a copy (Block_copy) of `block` to the pending list of a source
 * created by dispatch_block_source_create() and signals the source so its
 * event handler gets scheduled.
 */
void dispatch_block_source_add_block(dispatch_source_t source, dispatch_block_t block);
BlockSource.c:
struct context {
CFMutableArrayRef array;
pthread_mutex_t mutex;
dispatch_source_t source;
};
static void
s_event(struct context *context)
{
dispatch_block_t block;
CFIndex pending;
pthread_mutex_lock(&context->mutex); {
block = CFArrayGetValueAtIndex(context->array, 0);
CFArrayRemoveValueAtIndex(context->array, 0);
pending = CFArrayGetCount(context->array);
}
pthread_mutex_unlock(&context->mutex);
block();
Block_release(block);
if (pending)
dispatch_source_merge_data(context->source, 1);
}
static void
s_cancel(struct context *context)
{
CFIndex count = CFArrayGetCount(context->array);
for (CFIndex i = 0; i < count; i++) {
dispatch_block_t block = CFArrayGetValueAtIndex(context->array, i);
Block_release(block);
}
CFRelease(context->array);
pthread_mutex_destroy(&context->mutex);
print("canceled");
}
dispatch_source_t
dispatch_block_source_create(dispatch_queue_t queue)
{
dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0, queue);
struct context *context = calloc(1, sizeof(*context));
context->array = CFArrayCreateMutable(kCFAllocatorDefault, 0, NULL);
pthread_mutex_init(&context->mutex, NULL);
context->source = source;
dispatch_set_context(source, context);
dispatch_source_set_event_handler_f(source, (dispatch_function_t)s_event);
dispatch_source_set_cancel_handler_f(source, (dispatch_function_t)s_cancel);
dispatch_set_finalizer_f(source, (dispatch_function_t)free);
return source;
}
void
dispatch_block_source_add_block(dispatch_source_t source, dispatch_block_t block)
{
struct context *context = dispatch_get_context(source);
pthread_mutex_lock(&context->mutex); {
CFArrayAppendValue(context->array, Block_copy(block));
dispatch_source_merge_data(context->source, 1);
}
pthread_mutex_unlock(&context->mutex);
}
和测试用例:
// Two serial queues, each tagged (queue-specific data) with its own name so a
// running block can report which queue hierarchy it is executing on.
dispatch_queue_t T = dispatch_queue_create("T", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t A = dispatch_queue_create("A", DISPATCH_QUEUE_SERIAL);
static int queue_name_key;
dispatch_queue_set_specific(T, &queue_name_key, "T", NULL);
dispatch_queue_set_specific(A, &queue_name_key, "A", NULL);
// The block source initially targets T. Dispatch sources are created
// suspended, so it must be resumed before it delivers events.
dispatch_source_t source = dispatch_block_source_create(T);
dispatch_resume(source);
// Queue ten blocks up front; each prints its index and current queue name,
// then sleeps for one second.
for (int i = 1; i <= 10; i++) {
dispatch_block_source_add_block(source, ^{
print("block %d on queue %s", i, dispatch_get_specific(&queue_name_key));
sleep(1);
if (i == 2) {
// Retarget from inside block 2: per the output below, blocks 3+ run on A.
dispatch_set_target_queue(source, A);
}
else if (i == 5) {
// Cancel from inside block 5 and drop our reference: per the output below,
// blocks 6-10 never run and the cancel handler prints "canceled".
dispatch_source_cancel(source);
dispatch_release(source);
}
});
}
输出:
block 1 on queue T
block 2 on queue T
block 3 on queue A
block 4 on queue A
block 5 on queue A
canceled
至少现在,在调用 dispatch_set_target_queue() 之后,它的行为符合文档对调度源的描述。这意味着,如果在某个已提交的块内设置了新的目标队列,则可以保证其余所有块都会进入新队列。
这段代码可能仍然存在错误。
我有一个串行调度队列 Q(以另一个串行队列 T 作为目标),并且已经通过 dispatch_async(Q, block) 提交了几个块。有没有办法将尚未执行的挂起块重新定位到另一个队列 A?
我的简单测试显示,Q 会尽快将块转发给 T,因此设置新目标没有效果:
// printf wrapper that appends a newline. `##__VA_ARGS__` is a GNU extension
// that lets the macro be used with no variadic arguments.
#define print(f, ...) printf(f "\n", ##__VA_ARGS__)
// Three serial queues: Q is the queue under test, T is its initial target,
// A is the queue we later try to retarget Q to.
dispatch_queue_t Q = dispatch_queue_create("Q", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t T = dispatch_queue_create("T", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t A = dispatch_queue_create("A", DISPATCH_QUEUE_SERIAL);
dispatch_set_target_queue(Q, T);
// Occupy T for 2 s and A for 5 s so the printed order reveals which queue
// the blocks submitted to Q end up waiting behind.
dispatch_async(T, ^{ print("T sleeping"); sleep(2); print("T ready"); });
dispatch_async(A, ^{ print("A sleeping"); sleep(5); print("A ready"); });
// Block 1 attempts to retarget Q to A; blocks 2-4 are submitted right after
// it, i.e. before block 1 gets a chance to run.
dispatch_async(Q, ^{
print("block 1");
dispatch_set_target_queue(Q, A); // no effect!
});
dispatch_async(Q, ^{ print("block 2"); });
dispatch_async(Q, ^{ print("block 3"); });
dispatch_async(Q, ^{ print("block 4"); });
输出:
A sleeping
T sleeping
(wait 2 seconds)
T ready
block 1
block 2
block 3
block 4
(wait 3 seconds)
A ready
如您所见,块 2-4 仍然固定在 T 上执行,尽管手册中写道:
The new target queue setting will take effect between block executions on the object, but not in the middle of any existing block executions (non-preemptive).
我不清楚这句话是否仅适用于调度源,还是说 "existing" 指的是已经提交(即使尚未执行)的块;但无论如何,我的串行队列并没有表现出这种行为。
有办法吗?
好的,经过一番研究,我想出了以下解决方案。它基于自定义调度源,我认为这是即时(just-in-time)提交块的唯一方法。
BlockSource.h:
/*
 * Creates a DISPATCH_SOURCE_TYPE_DATA_OR dispatch source targeting `queue`
 * whose event handler runs blocks added with
 * dispatch_block_source_add_block(), one block per handler invocation, in
 * the order they were added. The usage example calls dispatch_resume() on the
 * returned source before adding blocks. Cancelling the source releases the
 * blocks that have not run yet.
 */
dispatch_source_t dispatch_block_source_create(dispatch_queue_t queue);
/*
 * Appends a copy (Block_copy) of `block` to the pending list of a source
 * created by dispatch_block_source_create() and signals the source so its
 * event handler gets scheduled.
 */
void dispatch_block_source_add_block(dispatch_source_t source, dispatch_block_t block);
BlockSource.c:
struct context {
CFMutableArrayRef array;
pthread_mutex_t mutex;
dispatch_source_t source;
};
static void
s_event(struct context *context)
{
dispatch_block_t block;
CFIndex pending;
pthread_mutex_lock(&context->mutex); {
block = CFArrayGetValueAtIndex(context->array, 0);
CFArrayRemoveValueAtIndex(context->array, 0);
pending = CFArrayGetCount(context->array);
}
pthread_mutex_unlock(&context->mutex);
block();
Block_release(block);
if (pending)
dispatch_source_merge_data(context->source, 1);
}
static void
s_cancel(struct context *context)
{
CFIndex count = CFArrayGetCount(context->array);
for (CFIndex i = 0; i < count; i++) {
dispatch_block_t block = CFArrayGetValueAtIndex(context->array, i);
Block_release(block);
}
CFRelease(context->array);
pthread_mutex_destroy(&context->mutex);
print("canceled");
}
dispatch_source_t
dispatch_block_source_create(dispatch_queue_t queue)
{
dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0, queue);
struct context *context = calloc(1, sizeof(*context));
context->array = CFArrayCreateMutable(kCFAllocatorDefault, 0, NULL);
pthread_mutex_init(&context->mutex, NULL);
context->source = source;
dispatch_set_context(source, context);
dispatch_source_set_event_handler_f(source, (dispatch_function_t)s_event);
dispatch_source_set_cancel_handler_f(source, (dispatch_function_t)s_cancel);
dispatch_set_finalizer_f(source, (dispatch_function_t)free);
return source;
}
void
dispatch_block_source_add_block(dispatch_source_t source, dispatch_block_t block)
{
struct context *context = dispatch_get_context(source);
pthread_mutex_lock(&context->mutex); {
CFArrayAppendValue(context->array, Block_copy(block));
dispatch_source_merge_data(context->source, 1);
}
pthread_mutex_unlock(&context->mutex);
}
和测试用例:
// Two serial queues, each tagged (queue-specific data) with its own name so a
// running block can report which queue hierarchy it is executing on.
dispatch_queue_t T = dispatch_queue_create("T", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t A = dispatch_queue_create("A", DISPATCH_QUEUE_SERIAL);
static int queue_name_key;
dispatch_queue_set_specific(T, &queue_name_key, "T", NULL);
dispatch_queue_set_specific(A, &queue_name_key, "A", NULL);
// The block source initially targets T. Dispatch sources are created
// suspended, so it must be resumed before it delivers events.
dispatch_source_t source = dispatch_block_source_create(T);
dispatch_resume(source);
// Queue ten blocks up front; each prints its index and current queue name,
// then sleeps for one second.
for (int i = 1; i <= 10; i++) {
dispatch_block_source_add_block(source, ^{
print("block %d on queue %s", i, dispatch_get_specific(&queue_name_key));
sleep(1);
if (i == 2) {
// Retarget from inside block 2: per the output below, blocks 3+ run on A.
dispatch_set_target_queue(source, A);
}
else if (i == 5) {
// Cancel from inside block 5 and drop our reference: per the output below,
// blocks 6-10 never run and the cancel handler prints "canceled".
dispatch_source_cancel(source);
dispatch_release(source);
}
});
}
输出:
block 1 on queue T
block 2 on queue T
block 3 on queue A
block 4 on queue A
block 5 on queue A
canceled
至少现在,在调用 dispatch_set_target_queue() 之后,它的行为符合文档对调度源的描述。这意味着,如果在某个已提交的块内设置了新的目标队列,则可以保证其余所有块都会进入新队列。
这段代码可能仍然存在错误。