A Brief Look at Workqueues
Let's start from adding a work_struct to a workqueue. Two functions can do that: schedule_work and queue_work. We only need to look at queue_work, because once you see the code below, schedule_work needs no further explanation:
int schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}
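Before diving into the internals, here is a minimal usage sketch (hypothetical driver code, not taken from the kernel source) of how a work item is typically declared and handed to the default events queue:

#include <linux/workqueue.h>
#include <linux/kernel.h>

/* the deferred function: runs later in process context on an events/N thread */
static void my_work_func(struct work_struct *work)
{
	printk(KERN_INFO "deferred work executed\n");
}

/* statically declare a work item bound to my_work_func */
static DECLARE_WORK(my_work, my_work_func);

/* from interrupt or process context: */
void submit_example(void)
{
	schedule_work(&my_work);	/* i.e. queue_work(keventd_wq, &my_work) */
}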
1. Creating the events kernel threads
Let's first look at the keventd_wq variable used in that function; it is initialized in:
void __init init_workqueues(void)
{
	alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);

	cpumask_copy(cpu_populated_map, cpu_online_mask);
	singlethread_cpu = cpumask_first(cpu_possible_mask);
	cpu_singlethread_map = cpumask_of(singlethread_cpu);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
}
#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)

#define __create_workqueue(name, singlethread, freezeable, rt)	\
	__create_workqueue_key((name), (singlethread), (freezeable), (rt), \
			       NULL, NULL)

struct workqueue_struct *__create_workqueue_key(const char *name,
						int singlethread,
						int freezeable,
						int rt,
						struct lock_class_key *key,
						const char *lock_name)
{
	...
	if (singlethread) {
		cwq = init_cpu_workqueue(wq, singlethread_cpu);
		err = create_workqueue_thread(cwq, singlethread_cpu);
		start_workqueue_thread(cwq, -1);
	} else {
		...
		for_each_possible_cpu(cpu) {
			cwq = init_cpu_workqueue(wq, cpu);
			if (err || !cpu_online(cpu))
				continue;
			err = create_workqueue_thread(cwq, cpu);
			start_workqueue_thread(cwq, cpu);
		}
		cpu_maps_update_done();
	}
	...
}
static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct task_struct *p = cwq->thread;

	if (p != NULL) {
		if (cpu >= 0)
			kthread_bind(p, cpu);
		wake_up_process(p);
	}
}
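For completeness, the create_workqueue_thread() call seen above is where the thread and its events/N name come from; roughly (a paraphrased sketch, not verbatim kernel source):

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct workqueue_struct *wq = cwq->wq;
	/* single-threaded queues get a bare name, per-CPU ones get "name/cpu" */
	const char *fmt = wq->singlethread ? "%s" : "%s/%d";
	struct task_struct *p;

	p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
	if (IS_ERR(p))
		return PTR_ERR(p);

	cwq->thread = p;
	return 0;
}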
The code above is where the familiar events threads are created. Since singlethread is 0, one events thread is created per CPU, named events/*, where "*" is the CPU id. Now back to the creation of the keventd_wq workqueue itself; the data structures involved are:
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;
	struct list_head list;
	const char *name;
	int singlethread;
	int freezeable;		/* Freeze threads during suspend */
	int rt;
#ifdef CONFIG_LOCKDEP
	struct lockdep_map lockdep_map;
#endif
};
struct cpu_workqueue_struct {

	spinlock_t lock;

	struct list_head worklist;
	wait_queue_head_t more_work;
	struct work_struct *current_work;

	struct workqueue_struct *wq;
	struct task_struct *thread;
} ____cacheline_aligned;
The key step is filling in cpu_wq: this member is allocated with alloc_percpu as a per-CPU variable, and a kernel thread is created for each CPU with the names shown above. The initialization of the other members is straightforward and can be skipped, except for more_work, which is central to how the workqueue mechanism works; that part is covered in the queue_work flow below.
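The per-CPU setup referred to here looks roughly like this (a simplified sketch of init_cpu_workqueue(), not verbatim kernel source); worklist and the more_work wait queue head are what queue_work and the worker thread synchronize on later:

/* wq->cpu_wq itself was allocated earlier with alloc_percpu() */
static struct cpu_workqueue_struct *
init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

	cwq->wq = wq;
	spin_lock_init(&cwq->lock);
	INIT_LIST_HEAD(&cwq->worklist);		/* pending work items */
	init_waitqueue_head(&cwq->more_work);	/* the worker sleeps here when idle */

	return cwq;
}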
2. Submitting a work_struct with queue_work
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret;

	ret = queue_work_on(get_cpu(), wq, work);
	put_cpu();

	return ret;
}
int
queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(!list_empty(&work->entry));
		__queue_work(wq_per_cpu(wq, cpu), work);
		ret = 1;
	}
	return ret;
}
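The wq_per_cpu() helper used above maps (workqueue, cpu) to that CPU's cpu_workqueue_struct; for a single-threaded queue everything funnels to singlethread_cpu. Roughly (paraphrased sketch):

static struct cpu_workqueue_struct *
wq_per_cpu(struct workqueue_struct *wq, int cpu)
{
	if (unlikely(wq->singlethread))
		cpu = singlethread_cpu;
	return per_cpu_ptr(wq->cpu_wq, cpu);
}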
static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	insert_work(cwq, work, &cwq->worklist);
	spin_unlock_irqrestore(&cwq->lock, flags);
}
static void insert_work(struct cpu_workqueue_struct *cwq,
			struct work_struct *work, struct list_head *head)
{
	trace_workqueue_insertion(cwq->thread, work);

	set_wq_data(work, cwq);
	/*
	 * Ensure that we get the right work->data if we see the
	 * result of list_add() below, see try_to_grab_pending().
	 */
	smp_wmb();
	list_add_tail(&work->entry, head);
	wake_up(&cwq->more_work);
}
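The wake_up(&cwq->more_work) at the end is what gets the work actually executed: it wakes the events/N thread sleeping on more_work. For context, the consumer side looks roughly like this (a heavily simplified sketch of worker_thread()/run_workqueue(), with freezing and tracing details omitted):

static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DEFINE_WAIT(wait);

	for (;;) {
		/* sleep on more_work until queue_work adds something */
		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
		if (!kthread_should_stop() && list_empty(&cwq->worklist))
			schedule();
		finish_wait(&cwq->more_work, &wait);

		if (kthread_should_stop())
			break;

		/* drain the list, calling each work item's handler */
		spin_lock_irq(&cwq->lock);
		while (!list_empty(&cwq->worklist)) {
			struct work_struct *work =
				list_entry(cwq->worklist.next, struct work_struct, entry);
			work_func_t f = work->func;

			cwq->current_work = work;
			list_del_init(&work->entry);
			spin_unlock_irq(&cwq->lock);

			work_clear_pending(work);
			f(work);

			spin_lock_irq(&cwq->lock);
			cwq->current_work = NULL;
		}
		spin_unlock_irq(&cwq->lock);
	}
	return 0;
}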
That was a long stretch of code, but the queue_work call chain itself needs no more commentary. The part worth dwelling on is what set_wq_data() inside insert_work() does; the wake_up that follows it is ordinary wait-queue wakeup (see the worker sketch above, or refer to the separate post on wait queues).
/*
 * Set the workqueue on which a work item is to be run
 * - Must *only* be called if the pending flag is set
 */
static inline void set_wq_data(struct work_struct *work,
			       struct cpu_workqueue_struct *cwq)
{
	unsigned long new;

	BUG_ON(!work_pending(work));

	new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
	new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
	atomic_long_set(&work->data, new);
}
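The point of packing the cwq pointer into work->data alongside the flag bits is that later paths (the worker, cancellation, flushing) can recover which per-CPU queue a given work item belongs to. The counterpart accessor looks roughly like this (paraphrased):

static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
{
	/* mask off the low flag bits, leaving the cpu_workqueue_struct pointer */
	return (void *)(atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}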
The next thing to pin down is work_pending(). It is just a macro that tests the WORK_STRUCT_PENDING bit in work->data; it should not be confused with the work_pending label in arch/x86/kernel/entry_32.S, which merely shares the name. That assembly (quoted below, assuming CONFIG_VM86 is not defined) belongs to the return-to-userspace path, where pending reschedule and signal work is handled, and has nothing to do with workqueues.
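For reference, in this kernel series the macro is defined roughly as follows (paraphrased from include/linux/workqueue.h; check your own tree for the exact form):

#define work_data_bits(work) ((unsigned long *)(&(work)->data))

#define work_pending(work) \
	test_bit(WORK_STRUCT_PENDING, work_data_bits(work))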
#define _TIF_NEED_RESCHED	(1 << TIF_NEED_RESCHED)
#define TIF_NEED_RESCHED	3	/* rescheduling necessary */

work_pending:
	testb $_TIF_NEED_RESCHED, %cl
	jz work_notifysig
work_resched:
	call schedule
	LOCKDEP_SYS_EXIT
	DISABLE_INTERRUPTS(CLBR_ANY)	# make sure we don't miss an interrupt
					# setting need_resched or sigpending
					# between sampling and the iret
	TRACE_IRQS_OFF
	movl TI_flags(%ebp), %ecx
	andl $_TIF_WORK_MASK, %ecx	# is there any work to be done other
					# than syscall tracing?
	jz restore_all
	testb $_TIF_NEED_RESCHED, %cl
	jnz work_resched

work_notifysig:
	movl %esp, %eax
	xorl %edx, %edx
	call do_notify_resume
	jmp resume_userspace_sig
END(work_pending)
#define resume_userspace_sig	resume_userspace

ENTRY(resume_userspace)
	LOCKDEP_SYS_EXIT
	DISABLE_INTERRUPTS(CLBR_ANY)	# make sure we don't miss an interrupt
					# setting need_resched or sigpending
					# between sampling and the iret
	TRACE_IRQS_OFF
	movl TI_flags(%ebp), %ecx
	andl $_TIF_WORK_MASK, %ecx	# is there any work to be done on
					# int/exception return?
	jne work_pending
	jmp restore_all
In that path the real work is done by do_notify_resume(), a C function; it belongs to process management, so it is not covered in detail here and is left for a later post.
Summary: the main function for submitting a work_struct is still queue_work; the only difference is whether the struct workqueue_struct you pass in is one you defined yourself or keventd_wq. In most cases keventd_wq is used, but when workqueues are used heavily the shared resource gets tight (everything rides on the one events thread per CPU, which can leave the kernel struggling), and creating your own workqueue becomes necessary. How? Just copy what the kernel itself does and run your own setup: single-threaded, or using only some of the CPUs, entirely up to you (a sketch follows below). Understand the core of the workqueue mechanism and how keventd_wq works, and you are all set.
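As a concrete illustration of that advice, here is a hypothetical module sketch (names made up, error handling kept minimal) that creates its own single-threaded workqueue instead of leaning on keventd_wq:

#include <linux/module.h>
#include <linux/workqueue.h>

static struct workqueue_struct *foo_wq;
static struct work_struct foo_work;

/* the deferred function, executed by the dedicated "foo_wq" kernel thread */
static void foo_work_func(struct work_struct *work)
{
	pr_info("foo_wq: deferred work ran\n");
}

static int __init foo_init(void)
{
	/* one dedicated thread, i.e. __create_workqueue(name, 1, 0, 0) */
	foo_wq = create_singlethread_workqueue("foo_wq");
	if (!foo_wq)
		return -ENOMEM;

	INIT_WORK(&foo_work, foo_work_func);
	queue_work(foo_wq, &foo_work);
	return 0;
}

static void __exit foo_exit(void)
{
	flush_workqueue(foo_wq);	/* wait for pending work to finish */
	destroy_workqueue(foo_wq);
}

module_init(foo_init);
module_exit(foo_exit);
MODULE_LICENSE("GPL");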