iOS 开发之 生产者与消费者模式及其实现
概念:
在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式。
优点:
1、解耦:
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那 么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化, 可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也 就相应降低了。
举个例子,我们去邮局投递信件,如果不使用邮筒(也就是缓冲区),你必须得把 信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他(光凭身上穿的制服,万一有人假冒,就惨了)。这 就产生和你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员 换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮筒相对 来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。
2、支持并发
由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区作为桥梁连接,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区了拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。
接上面的例子,如果我们不使用邮筒,我们就得在邮局等邮递员,直到他回来,我们把信件交给他,这期间我们啥事儿都不能干(也就是生产者阻塞),或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。
3、支持忙闲不均
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来 了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。 等生产者的制造速度慢下来,消费者再慢慢处理掉。
为了充分复用,我们再拿寄信的例子来说事。假设邮递员一次只能带走1000封信。 万一某次碰上情人节(也可能是圣诞节)送贺卡,需要寄出去的信超过1000封,这时 候邮筒这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮筒中,等下次过来 时再拿走。
实现:
GCD 和 信号量实现 生产者消费者模式:
GCD提供两种方式支持dispatch队列同步,即dispatch组和信号量。
一、dispatch组(dispatch group)
// 1. 创建dispatch组 dispatch_group_t group = dispatch_group_create(); dispatch_queue_t queue = dispatch_queue_create("www.ztb.queue", DISPATCH_QUEUE_SERIAL); // 2. 启动dispatch队列中的block关联到group中 dispatch_group_async(group, queue, ^{ // 。。。 }); // 3. 等待group关联的block执行完毕,也可以设置超时参数 dispatch_group_wait(group, DISPATCH_TIME_FOREVER); // 4. 为group设置通知一个block,当group关联的block执行完毕后,就调用这个block。类似dispatch_barrier_async。 dispatch_group_notify(group, queue, ^{ // 。。。 }); // 5. 手动管理group关联的block的运行状态(或计数),进入和退出group次数必须匹配 dispatch_group_enter(group); dispatch_group_leave(group); // 所以下面的两种调用其实是等价的, //A) dispatch_group_async(group, queue, ^{ // 。。。 }); // B) dispatch_group_enter(group); dispatch_async(queue, ^{ //。。。 dispatch_group_leave(group); });
dispatch_group_enter、 dispatch_group_leave和dispatch_group_wait来实现同步.
dispatch_group_t group = dispatch_group_create(); MyCoreDataObject *coreDataObject; dispatch_group_enter(group); AFHTTPRequestOperation *operation1 = [[AFHTTPRequestOperation alloc] initWithRequest:request1]; [operation1 setCompletionBlockWithSuccess:^(AFHTTPRequestOperation *operation, id responseObject) { coreDataObject.attribute1 = responseObject; sleep(5); dispatch_group_leave(group); }]; [operation1 start]; dispatch_group_enter(group); AFHTTPRequestOperation *operation2 = [[AFHTTPRequestOperation alloc] initWithRequest:request1]; [operation2 setCompletionBlockWithSuccess:^(AFHTTPRequestOperation *operation, id responseObject) { coreDataObject.attribute2 = responseObject; sleep(10); dispatch_group_leave(group); }]; [operation2 start]; dispatch_group_wait(group, DISPATCH_TIME_FOREVER); dispatch_release(group); [context save:nil];
二、dispatch信号量(dispatch semaphore)
// 1. 创建信号量,可以设置信号量的资源数。0表示没有资源,调用dispatch_semaphore_wait会立即等待。 dispatch_semaphore_t semaphore = dispatch_semaphore_create(0); // 2. 等待信号,可以设置超时参数。该函数返回0表示得到通知,非0表示超时。 dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER); //3. 通知信号,如果等待线程被唤醒则返回非0,否则返回0。 dispatch_semaphore_signal(semaphore);
使用dispatch信号量是如何实现同步:
NSCondition *condition = [[NSCondition alloc] init];//使用NSCondition实现多线程同步的问题,也就是解决生产者消费者问题(如收发同步等等)。 dispatch_semaphore_t sem = dispatch_semaphore_create(0); dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ //消费者队列 while (condition) { if (dispatch_semaphore_wait(sem, dispatch_time(DISPATCH_TIME_NOW, 10*NSEC_PER_SEC))) //等待10秒 continue; //得到数据 } }); dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ //生产者队列 while (condition) { if (!dispatch_semaphore_signal(sem)) { sleep(1); //wait for a while continue; } //通知成功 } });
// 生产者生产数据 -(void)producerHandle { while (1) { NSString *currentThreadName = [NSThread currentThread].name; // NSLog(@"%@" ,currentThreadName); //生产者限制生产数 [self.cond lock]; if (self.productNum > 10) { NSLog(@"!!!!生产太多,限制"); [self.cond unlock]; sleep(1); continue; }else { [self.cond unlock]; } [self.cond lock]; NSLog(@"============================"); NSLog(@"%@...Begin - %d" ,currentThreadName ,self.productNum); self.productNum++; [self.cond signal]; NSLog(@"%@...end - %d" ,currentThreadName ,self.productNum); NSLog(@"============================"); [self.cond unlock]; } } // 消费者消费资源 -(void)customerHandle { while (1) { NSString *currentThreadName = [NSThread currentThread].name; // NSLog(@"%@" ,currentThreadName); [self.cond lock]; NSLog(@"============================"); NSLog(@"%@...Begin - %d" ,currentThreadName ,self.productNum); while (self.productNum <= 0) { [self.cond wait]; } self.productNum--; NSLog(@"%@...end - %d" ,currentThreadName ,self.productNum); NSLog(@"============================"); [self.cond unlock]; } }