就从添加一个work_sturt到工作队列开始讲起吧,有两个函数可达到该目的:schedule_work、queue_work,只看queue_work吧,因为看了下面这些代码,就不用说啥了

1 int schedule_work(struct work_struct *work)
2 {   
3         return queue_work(keventd_wq, work);
4 }

1、events内核线程创建

那就先看看这个函数里面提到的keventd_wq变量,该变量的初始化在

 1 void __init init_workqueues(void)
 2 {   
 3         alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
 4 
 5         cpumask_copy(cpu_populated_map, cpu_online_mask);
 6         singlethread_cpu = cpumask_first(cpu_possible_mask);
 7         cpu_singlethread_map = cpumask_of(singlethread_cpu);
 8         hotcpu_notifier(workqueue_cpu_callback, 0);
 9         keventd_wq = create_workqueue("events");
10         BUG_ON(!keventd_wq);  
11 }
 1 #define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
 2 
 3 #define __create_workqueue(name, singlethread, freezeable, rt)  \
 4         __create_workqueue_key((name), (singlethread), (freezeable), (rt), \
 5                                NULL, NULL)
 6 
 7 struct workqueue_struct *__create_workqueue_key(const char *name,
 8                                                 int singlethread,
 9                                                 int freezeable,
10                                                 int rt,
11                                                 struct lock_class_key *key,
12                                                 const char *lock_name)
13 {
14 ...15 if (singlethread) {
16                 cwq = init_cpu_workqueue(wq, singlethread_cpu);
17                 err = create_workqueue_thread(cwq, singlethread_cpu);
18                 start_workqueue_thread(cwq, -1);
19         } else {
20 ...21                 for_each_possible_cpu(cpu) {
22                         cwq = init_cpu_workqueue(wq, cpu);
23                         if (err || !cpu_online(cpu))
24                                 continue;
25                         err = create_workqueue_thread(cwq, cpu);
26                         start_workqueue_thread(cwq, cpu);
27                 }
28                 cpu_maps_update_done();
29         }
30 ...31 }
 1 static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
 2 {
 3         struct task_struct *p = cwq->thread;
 4 
 5         if (p != NULL) {
 6                 if (cpu >= 0)
 7                         kthread_bind(p, cpu);
 8                 wake_up_process(p);
 9         }               
10 }

上面这些代码就是常见的events线程的创建,singlethread为0,就是为每个cpu创建一个events线程,创建出来的线程名字就是events/*,“*”代表cpu的id。再回过头来看看keventd_wq 工作队列的创建,工作队列的数据结构如下:

 1 struct workqueue_struct {                       
 2         struct cpu_workqueue_struct *cpu_wq;    
 3         struct list_head list;                  
 4         const char *name;                       
 5         int singlethread;
 6         int freezeable;         /* Freeze threads during suspend */
 7         int rt;
 8 #ifdef CONFIG_LOCKDEP
 9         struct lockdep_map lockdep_map;
10 #endif  
11 };
 1 struct cpu_workqueue_struct {
 2 
 3         spinlock_t lock;
 4 
 5         struct list_head worklist;
 6         wait_queue_head_t more_work;
 7         struct work_struct *current_work;
 8         
 9         struct workqueue_struct *wq;
10         struct task_struct *thread;
11 } ____cacheline_aligned;

重点是填满cpu_wq,而该成员用alloc_percpu被初始化成一个每cpu变量,为每个cpu创建一个内核线程,名字如前面所示,其他变量的填充直接略过,没啥说的,除过more_work成员,这是工作队列机制实现的重点,这部分放在queue_work流程里面讲。

2、queue_work提交work_struct

1 int queue_work(struct workqueue_struct *wq, struct work_struct *work)
2 {                       
3         int ret;
4                 
5         ret = queue_work_on(get_cpu(), wq, work);
6         put_cpu();
7         
8         return ret;
9 }
 1 int                             
 2 queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
 3 {                       
 4         int ret = 0;
 5                 
 6         if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
 7                 BUG_ON(!list_empty(&work->entry));
 8                 __queue_work(wq_per_cpu(wq, cpu), work);
 9                 ret = 1;
10         }       
11         return ret;
12 }
1 static void __queue_work(struct cpu_workqueue_struct *cwq,
2                          struct work_struct *work) 
3 {               
4         unsigned long flags;
5                 
6         spin_lock_irqsave(&cwq->lock, flags);
7         insert_work(cwq, work, &cwq->worklist);
8         spin_unlock_irqrestore(&cwq->lock, flags);
9 }
 1 static void insert_work(struct cpu_workqueue_struct *cwq,
 2                         struct work_struct *work, struct list_head *head)
 3 {
 4         trace_workqueue_insertion(cwq->thread, work);
 5 
 6         set_wq_data(work, cwq);
 7         /*
 8          * Ensure that we get the right work->data if we see the
 9          * result of list_add() below, see try_to_grab_pending().
10          */
11         smp_wmb();
12         list_add_tail(&work->entry, head);
13         wake_up(&cwq->more_work);
14 }

贴了好长一段代码,queue_work的调用流程就不说了,着重看一下insert_work函数里面的set_wq_data函数的含义,因为下面就是wake_up了,参考另外一个博文即可。

 1 /*      
 2  * Set the workqueue on which a work item is to be run
 3  * - Must *only* be called if the pending flag is set
 4  */
 5 static inline void set_wq_data(struct work_struct *work, 
 6                                 struct cpu_workqueue_struct *cwq)
 7 {       
 8         unsigned long new;
 9         
10         BUG_ON(!work_pending(work));
11 
12         new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
13         new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
14         atomic_long_set(&work->data, new);
15 }

这里的重点又在work_pending,可惜全都是汇编代码,我大概说一下吧(认为CONFIG_VM86没有定义)

 1 #define _TIF_NEED_RESCHED       (1 << TIF_NEED_RESCHED)
 2 #define TIF_NEED_RESCHED        3       /* rescheduling necessary */
 3 
 4 work_pending:
 5           testb $_TIF_NEED_RESCHED, %cl       
 6           jz work_notifysig   
 7 work_resched:
 8           call schedule       
 9         LOCKDEP_SYS_EXIT    
10         DISABLE_INTERRUPTS(CLBR_ANY)    # make sure we don\'t miss an interrupt
11                                           # setting need_resched or sigpending
12                                           # between sampling and the iret     
13         TRACE_IRQS_OFF      
14           movl TI_flags(%ebp), %ecx
15           andl $_TIF_WORK_MASK, %ecx      # is there any work to be done other
16                                           # than syscall tracing?             
17           jz restore_all      
18           testb $_TIF_NEED_RESCHED, %cl       
19           jnz work_resched    
20     
21 work_notifysig:
22           movl %esp, %eax
23           xorl %edx, %edx
24           call do_notify_resume
25           jmp resume_userspace_sig
26 END(work_pending)
 1 #define resume_userspace_sig    resume_userspace
 2 
 3 ENTRY(resume_userspace)
 4         LOCKDEP_SYS_EXIT
 5         DISABLE_INTERRUPTS(CLBR_ANY)    # make sure we don\'t miss an interrupt
 6                                           # setting need_resched or sigpending
 7                                           # between sampling and the iret
 8         TRACE_IRQS_OFF
 9           movl TI_flags(%ebp), %ecx
10           andl $_TIF_WORK_MASK, %ecx      # is there any work to be done on
11                                           # int/exception return?
12           jne work_pending
13           jmp restore_all

do_notify_resume是个C函数做主要工作,这是进程管理相关的内容,这里不详谈了,交由后面去分析,这里放一下水。

 

总结:提交work_struct的主要函数还是queue_work,区别只在于“struct workqueue_struct”是用自己定义的还是keventd_wq,一般情况都是用keventd_wq,可工作队列用的太多的时候资源会很紧张(就靠每个cpu的events线程,可能会让内核力不从心),自己创建也就很必要了,怎么做呢,直接抄内核的做法不就行了,自己玩儿一套,可以使用单线程,可以使用一部分cpu,都由自己来定,理解工作队列的核心,掌握keventd_wq就OK啦!

版权声明:本文为ziranyyp原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/ziranyyp/p/6596930.html