/* Worker thread pool for slow items, such as filesystem lookups or mkdirs
 *
 * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public Licence
 * as published by the Free Software Foundation; either version
 * 2 of the Licence, or (at your option) any later version.
 *
 * See Documentation/slow-work.txt
 */

#include <linux/module.h>
#include <linux/slow-work.h>
#include <linux/kthread.h>
#include <linux/freezer.h>
#include <linux/wait.h>
#include <linux/proc_fs.h>
#include "slow-work.h"

static void slow_work_cull_timeout(unsigned long);
static void slow_work_oom_timeout(unsigned long);

#ifdef CONFIG_SYSCTL
static int slow_work_min_threads_sysctl(struct ctl_table *, int,
					void __user *, size_t *, loff_t *);

static int slow_work_max_threads_sysctl(struct ctl_table *, int,
					void __user *, size_t *, loff_t *);
#endif

/*
 * The pool of threads has at least min threads in it as long as someone is
 * using the facility, and may have as many as max.
 *
 * A portion of the pool may be processing very slow operations.
 */
static unsigned slow_work_min_threads = 2;
static unsigned slow_work_max_threads = 4;
static unsigned vslow_work_proportion = 50; /* % of threads that may process
					     * very slow work */

#ifdef CONFIG_SYSCTL
static const int slow_work_min_min_threads = 2;
static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
static const int slow_work_min_vslow = 1;
static const int slow_work_max_vslow = 99;

ctl_table slow_work_sysctls[] = {
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "min-threads",
		.data		= &slow_work_min_threads,
		.maxlen		= sizeof(unsigned),
		.mode		= 0644,
		.proc_handler	= slow_work_min_threads_sysctl,
		.extra1		= (void *) &slow_work_min_min_threads,
		.extra2		= &slow_work_max_threads,
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "max-threads",
		.data		= &slow_work_max_threads,
		.maxlen		= sizeof(unsigned),
		.mode		= 0644,
		.proc_handler	= slow_work_max_threads_sysctl,
		.extra1		= &slow_work_min_threads,
		.extra2		= (void *) &slow_work_max_max_threads,
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "vslow-percentage",
		.data		= &vslow_work_proportion,
		.maxlen		= sizeof(unsigned),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.extra1		= (void *) &slow_work_min_vslow,
		.extra2		= (void *) &slow_work_max_vslow,
	},
	{ .ctl_name = 0 }
};
#endif

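/*
 * Note (illustrative, not part of the original source): slow_work_sysctls is
 * deliberately non-static so that it can be hooked into the sysctl tree
 * elsewhere; per Documentation/slow-work.txt the knobs are then expected to
 * appear as, for example:
 *
 *	/proc/sys/kernel/slow-work/min-threads
 *	/proc/sys/kernel/slow-work/max-threads
 *	/proc/sys/kernel/slow-work/vslow-percentage
 */
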
/*
 * The active state of the thread pool
 */
static atomic_t slow_work_thread_count;
static atomic_t vslow_work_executing_count;

static bool slow_work_may_not_start_new_thread;
static bool slow_work_cull; /* cull a thread due to lack of activity */
static DEFINE_TIMER(slow_work_cull_timer, slow_work_cull_timeout, 0, 0);
static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0);
static struct slow_work slow_work_new_thread; /* new thread starter */

/*
 * slow work ID allocation (use slow_work_queue_lock)
 */
static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);

/*
 * Unregistration tracking to prevent put_ref() from disappearing during module
 * unload
 */
#ifdef CONFIG_MODULES
static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
static struct module *slow_work_unreg_module;
static struct slow_work *slow_work_unreg_work_item;
static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
static DEFINE_MUTEX(slow_work_unreg_sync_lock);
#endif

/*
 * Data for tracking currently executing items for indication through /proc
 */
#ifdef CONFIG_SLOW_WORK_PROC
struct slow_work *slow_work_execs[SLOW_WORK_THREAD_LIMIT];
pid_t slow_work_pids[SLOW_WORK_THREAD_LIMIT];
DEFINE_RWLOCK(slow_work_execs_lock);
#endif

/*
 * The queues of work items and the lock governing access to them. These are
 * shared between all the CPUs. It doesn't make sense to have per-CPU queues
 * as the number of threads bears no relation to the number of CPUs.
 *
 * There are two queues of work items: one for slow work items, and one for
 * very slow work items.
 */
LIST_HEAD(slow_work_queue);
LIST_HEAD(vslow_work_queue);
DEFINE_SPINLOCK(slow_work_queue_lock);

/*
 * The following are two wait queues that get pinged when a work item is placed
 * on an empty queue. These allow work items that are hogging a thread by
 * sleeping in a way that could be deferred to yield their thread and enqueue
 * themselves.
 */
static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation);
static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation);

/*
 * The thread controls. A variable used to signal to the threads that they
 * should exit when the queue is empty, a waitqueue used by the threads to wait
 * for signals, and a completion set by the last thread to exit.
 */
static bool slow_work_threads_should_exit;
static DECLARE_WAIT_QUEUE_HEAD(slow_work_thread_wq);
static DECLARE_COMPLETION(slow_work_last_thread_exited);

/*
 * The number of users of the thread pool and its lock. Whilst this is zero we
 * have no threads hanging around; once it drops back to zero, we wait for all
 * active or queued work items to complete and kill all the threads we do have.
 */
static int slow_work_user_count;
static DEFINE_MUTEX(slow_work_user_lock);

static inline int slow_work_get_ref(struct slow_work *work)
{
	if (work->ops->get_ref)
		return work->ops->get_ref(work);

	return 0;
}

static inline void slow_work_put_ref(struct slow_work *work)
{
	if (work->ops->put_ref)
		work->ops->put_ref(work);
}

/*
 * Calculate the maximum number of active threads in the pool that are
 * permitted to process very slow work items.
 *
 * The answer is rounded up to at least 1, but may not equal or exceed the
 * maximum number of threads in the pool. This means we always have at least
 * one thread that can process very slow work items, and we always have at
 * least one thread that won't get tied up doing so.
 */
static unsigned slow_work_calc_vsmax(void)
{
	unsigned vsmax;

	vsmax = atomic_read(&slow_work_thread_count) * vslow_work_proportion;
	vsmax /= 100;
	vsmax = max(vsmax, 1U);
	return min(vsmax, slow_work_max_threads - 1);
}

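/*
 * Worked example (illustrative only): with 4 threads in the pool and the
 * default 50% proportion, vsmax = max(4 * 50 / 100, 1) = 2, then capped at
 * slow_work_max_threads - 1 = 3, so at most 2 threads may be executing very
 * slow items at any one time.
 */
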
/*
 * Attempt to execute stuff queued on a slow thread. Return true if we managed
 * it, false if there was nothing to do.
 */
static noinline bool slow_work_execute(int id)
{
#ifdef CONFIG_MODULES
	struct module *module;
#endif
	struct slow_work *work = NULL;
	unsigned vsmax;
	bool very_slow;

	vsmax = slow_work_calc_vsmax();

	/* see if we can schedule a new thread to be started if we're not
	 * keeping up with the work */
	if (!waitqueue_active(&slow_work_thread_wq) &&
	    (!list_empty(&slow_work_queue) || !list_empty(&vslow_work_queue)) &&
	    atomic_read(&slow_work_thread_count) < slow_work_max_threads &&
	    !slow_work_may_not_start_new_thread)
		slow_work_enqueue(&slow_work_new_thread);

	/* find something to execute */
	spin_lock_irq(&slow_work_queue_lock);
	if (!list_empty(&vslow_work_queue) &&
	    atomic_read(&vslow_work_executing_count) < vsmax) {
		work = list_entry(vslow_work_queue.next,
				  struct slow_work, link);
		if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
			BUG();
		list_del_init(&work->link);
		atomic_inc(&vslow_work_executing_count);
		very_slow = true;
	} else if (!list_empty(&slow_work_queue)) {
		work = list_entry(slow_work_queue.next,
				  struct slow_work, link);
		if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
			BUG();
		list_del_init(&work->link);
		very_slow = false;
	} else {
		very_slow = false; /* avoid the compiler warning */
	}

#ifdef CONFIG_MODULES
	if (work)
		slow_work_thread_processing[id] = work->owner;
#endif
	if (work) {
		slow_work_mark_time(work);
		slow_work_begin_exec(id, work);
	}

	spin_unlock_irq(&slow_work_queue_lock);

	if (!work)
		return false;

	if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
		BUG();

	/* don't execute if the work is in the process of being cancelled */
	if (!test_bit(SLOW_WORK_CANCELLING, &work->flags))
		work->ops->execute(work);

	if (very_slow)
		atomic_dec(&vslow_work_executing_count);
	clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);

	/* wake up anyone waiting for this work to be complete */
	wake_up_bit(&work->flags, SLOW_WORK_EXECUTING);

	slow_work_end_exec(id, work);

	/* if someone tried to enqueue the item whilst we were executing it,
	 * then it'll be left unenqueued to avoid multiple threads trying to
	 * execute it simultaneously
	 *
	 * there is, however, a race between us testing the pending flag and
	 * getting the spinlock, and between the enqueuer setting the pending
	 * flag and getting the spinlock, so we use a deferral bit to tell us
	 * if the enqueuer got there first
	 */
	if (test_bit(SLOW_WORK_PENDING, &work->flags)) {
		spin_lock_irq(&slow_work_queue_lock);

		if (!test_bit(SLOW_WORK_EXECUTING, &work->flags) &&
		    test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags))
			goto auto_requeue;

		spin_unlock_irq(&slow_work_queue_lock);
	}

	/* sort out the race between module unloading and put_ref() */
	slow_work_put_ref(work);

#ifdef CONFIG_MODULES
	module = slow_work_thread_processing[id];
	slow_work_thread_processing[id] = NULL;
	smp_mb();
	if (slow_work_unreg_work_item == work ||
	    slow_work_unreg_module == module)
		wake_up_all(&slow_work_unreg_wq);
#endif

	return true;

auto_requeue:
	/* we must complete the enqueue operation
	 * - we transfer our ref on the item back to the appropriate queue
	 * - don't wake another thread up as we're awake already
	 */
	slow_work_mark_time(work);
	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
		list_add_tail(&work->link, &vslow_work_queue);
	else
		list_add_tail(&work->link, &slow_work_queue);
	spin_unlock_irq(&slow_work_queue_lock);
	slow_work_thread_processing[id] = NULL;
	return true;
}

/**
 * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work
 * @work: The work item under execution that wants to sleep
 * @_timeout: Scheduler sleep timeout
 *
 * Allow a requeueable work item to sleep on a slow-work processor thread until
 * that thread is needed to do some other work or the sleep is interrupted by
 * some other event.
 *
 * The caller must set up a wake up event before calling this and must have set
 * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own
 * condition before calling this function as no test is made here.
 *
 * False is returned if there is nothing on the queue; true is returned if the
 * work item should be requeued.
 */
bool slow_work_sleep_till_thread_needed(struct slow_work *work,
					signed long *_timeout)
{
	wait_queue_head_t *wfo_wq;
	struct list_head *queue;

	DEFINE_WAIT(wait);

	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
		wfo_wq = &vslow_work_queue_waits_for_occupation;
		queue = &vslow_work_queue;
	} else {
		wfo_wq = &slow_work_queue_waits_for_occupation;
		queue = &slow_work_queue;
	}

	if (!list_empty(queue))
		return true;

	add_wait_queue_exclusive(wfo_wq, &wait);
	if (list_empty(queue))
		*_timeout = schedule_timeout(*_timeout);
	finish_wait(wfo_wq, &wait);

	return !list_empty(queue);
}
EXPORT_SYMBOL(slow_work_sleep_till_thread_needed);

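/*
 * Example (illustrative only): a sketch of how an item's ->execute() routine
 * might use slow_work_sleep_till_thread_needed() to give up its thread whilst
 * waiting for an event, following the constraints documented above.  The
 * names my_event_wq and my_event_happened() are hypothetical.
 *
 *	static void my_execute(struct slow_work *work)
 *	{
 *		DECLARE_WAITQUEUE(wq_entry, current);
 *		signed long timeout = 5 * HZ;
 *
 *		add_wait_queue(&my_event_wq, &wq_entry);
 *		for (;;) {
 *			set_current_state(TASK_UNINTERRUPTIBLE);
 *			if (my_event_happened(work))
 *				break;	// condition tested by the caller
 *			if (slow_work_sleep_till_thread_needed(work, &timeout))
 *				break;	// yield; arrange to be re-enqueued
 *			if (timeout <= 0)
 *				break;	// gave up waiting
 *		}
 *		__set_current_state(TASK_RUNNING);
 *		remove_wait_queue(&my_event_wq, &wq_entry);
 *	}
 */
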
/**
 * slow_work_enqueue - Schedule a slow work item for processing
 * @work: The work item to queue
 *
 * Schedule a slow work item for processing. If the item is already undergoing
 * execution, this guarantees not to re-enter the execution routine until the
 * first execution finishes.
 *
 * The item is pinned by this function as it retains a reference to it, managed
 * through the item operations. The item is unpinned once it has been
 * executed.
 *
 * An item may hog the thread that is running it for a relatively large amount
 * of time, sufficient, for example, to perform several lookup, mkdir, create
 * and setxattr operations. It may sleep on I/O and may sleep to obtain locks.
 *
 * Conversely, if a number of items are awaiting processing, it may take some
 * time before any given item is given attention. The number of threads in the
 * pool may be increased to deal with demand, but only up to a limit.
 *
 * If SLOW_WORK_VERY_SLOW is set on the work item, then it will be placed in
 * the very slow queue, from which only a portion of the threads will be
 * allowed to pick items to execute. This ensures that very slow items won't
 * overly block ones that are just ordinarily slow.
 *
 * Returns 0 if successful, -EAGAIN if not, or -ECANCELED if an attempt is
 * made to queue a work item that is being cancelled.
 */
int slow_work_enqueue(struct slow_work *work)
{
	wait_queue_head_t *wfo_wq;
	struct list_head *queue;
	unsigned long flags;
	int ret;

	if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
		return -ECANCELED;

	BUG_ON(slow_work_user_count <= 0);
	BUG_ON(!work);
	BUG_ON(!work->ops);

	/* when honouring an enqueue request, we only promise that we will run
	 * the work function in the future; we do not promise to run it once
	 * per enqueue request
	 *
	 * we use the PENDING bit to merge together repeat requests without
	 * having to disable IRQs and take the spinlock, whilst still
	 * maintaining our promise
	 */
	if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
		if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
			wfo_wq = &vslow_work_queue_waits_for_occupation;
			queue = &vslow_work_queue;
		} else {
			wfo_wq = &slow_work_queue_waits_for_occupation;
			queue = &slow_work_queue;
		}

		spin_lock_irqsave(&slow_work_queue_lock, flags);

		if (unlikely(test_bit(SLOW_WORK_CANCELLING, &work->flags)))
			goto cancelled;

		/* we promise that we will not attempt to execute the work
		 * function in more than one thread simultaneously
		 *
		 * this, however, leaves us with a problem if we're asked to
		 * enqueue the work whilst someone is executing the work
		 * function as simply queueing the work immediately means that
		 * another thread may try executing it whilst it is already
		 * under execution
		 *
		 * to deal with this, we set the ENQ_DEFERRED bit instead of
		 * enqueueing, and the thread currently executing the work
		 * function will enqueue the work item when the work function
		 * returns and it has cleared the EXECUTING bit
		 */
		if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
			set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
		} else {
			ret = slow_work_get_ref(work);
			if (ret < 0)
				goto failed;
			slow_work_mark_time(work);
			list_add_tail(&work->link, queue);
			wake_up(&slow_work_thread_wq);

			/* if someone who could be requeued is sleeping on a
			 * thread, then ask them to yield their thread */
			if (work->link.prev == queue)
				wake_up(wfo_wq);
		}

		spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	}
	return 0;

cancelled:
	ret = -ECANCELED;
failed:
	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	return ret;
}
EXPORT_SYMBOL(slow_work_enqueue);

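/*
 * Example (illustrative only): minimal client use of the enqueue API.  The
 * structure my_obj, its refcounting helpers and my_obj_do_lookup() are
 * hypothetical; the facility itself only requires the ops below and a prior
 * call to slow_work_register_user().
 *
 *	struct my_obj {
 *		struct slow_work work;
 *		atomic_t usage;
 *	};
 *
 *	static int my_obj_get_ref(struct slow_work *work)
 *	{
 *		atomic_inc(&container_of(work, struct my_obj, work)->usage);
 *		return 0;
 *	}
 *
 *	static void my_obj_put_ref(struct slow_work *work)
 *	{
 *		struct my_obj *obj = container_of(work, struct my_obj, work);
 *
 *		if (atomic_dec_and_test(&obj->usage))
 *			kfree(obj);
 *	}
 *
 *	static void my_obj_execute(struct slow_work *work)
 *	{
 *		my_obj_do_lookup(container_of(work, struct my_obj, work));
 *	}
 *
 *	static const struct slow_work_ops my_obj_slow_work_ops = {
 *		.owner		= THIS_MODULE,
 *		.get_ref	= my_obj_get_ref,
 *		.put_ref	= my_obj_put_ref,
 *		.execute	= my_obj_execute,
 *	};
 *
 *	// after slow_work_register_user(THIS_MODULE):
 *	slow_work_init(&obj->work, &my_obj_slow_work_ops);
 *	slow_work_enqueue(&obj->work);
 */
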
static int slow_work_wait(void *word)
{
	schedule();
	return 0;
}

/**
 * slow_work_cancel - Cancel a slow work item
 * @work: The work item to cancel
 *
 * This function will cancel a previously enqueued work item. If we cannot
 * cancel the work item, it is guaranteed to have run when this function
 * returns.
 */
void slow_work_cancel(struct slow_work *work)
{
	bool wait = true, put = false;

	set_bit(SLOW_WORK_CANCELLING, &work->flags);
	smp_mb();

	/* if the work item is a delayed work item with an active timer, we
	 * need to wait for the timer to finish _before_ getting the spinlock,
	 * lest we deadlock against the timer routine
	 *
	 * the timer routine will leave DELAYED set if it notices the
	 * CANCELLING flag in time
	 */
	if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
		struct delayed_slow_work *dwork =
			container_of(work, struct delayed_slow_work, work);
		del_timer_sync(&dwork->timer);
	}

	spin_lock_irq(&slow_work_queue_lock);

	if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
		/* the timer routine aborted or never happened, so we are left
		 * holding the timer's reference on the item and should just
		 * drop the pending flag and wait for any ongoing execution to
		 * finish */
		struct delayed_slow_work *dwork =
			container_of(work, struct delayed_slow_work, work);

		BUG_ON(timer_pending(&dwork->timer));
		BUG_ON(!list_empty(&work->link));

		clear_bit(SLOW_WORK_DELAYED, &work->flags);
		put = true;
		clear_bit(SLOW_WORK_PENDING, &work->flags);

	} else if (test_bit(SLOW_WORK_PENDING, &work->flags) &&
		   !list_empty(&work->link)) {
		/* the link in the pending queue holds a reference on the item
		 * that we will need to release */
		list_del_init(&work->link);
		wait = false;
		put = true;
		clear_bit(SLOW_WORK_PENDING, &work->flags);

	} else if (test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags)) {
		/* the executor is holding our only reference on the item, so
		 * we merely need to wait for it to finish executing */
		clear_bit(SLOW_WORK_PENDING, &work->flags);
	}

	spin_unlock_irq(&slow_work_queue_lock);

	/* the EXECUTING flag is set by the executor whilst the spinlock is set
	 * and before the item is dequeued - so assuming the above doesn't
	 * actually dequeue it, simply waiting for the EXECUTING flag to be
	 * released here should be sufficient */
	if (wait)
		wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait,
			    TASK_UNINTERRUPTIBLE);

	clear_bit(SLOW_WORK_CANCELLING, &work->flags);
	if (put)
		slow_work_put_ref(work);
}
EXPORT_SYMBOL(slow_work_cancel);

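/*
 * Example (illustrative only): cancellation from a client's teardown path,
 * using the hypothetical my_obj from the sketch above.  Note that the call
 * may wait for the EXECUTING flag to clear, so calling it from the item's own
 * execute routine would block indefinitely.
 *
 *	slow_work_cancel(&obj->work);
 *	my_obj_put(obj);		// drop the caller's own reference
 */
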
/*
 * Handle expiry of the delay timer, indicating that a delayed slow work item
 * should now be queued if not cancelled
 */
static void delayed_slow_work_timer(unsigned long data)
{
	wait_queue_head_t *wfo_wq;
	struct list_head *queue;
	struct slow_work *work = (struct slow_work *) data;
	unsigned long flags;
	bool queued = false, put = false, first = false;

	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
		wfo_wq = &vslow_work_queue_waits_for_occupation;
		queue = &vslow_work_queue;
	} else {
		wfo_wq = &slow_work_queue_waits_for_occupation;
		queue = &slow_work_queue;
	}

	spin_lock_irqsave(&slow_work_queue_lock, flags);
	if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) {
		clear_bit(SLOW_WORK_DELAYED, &work->flags);

		if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
			/* we discard the reference the timer was holding in
			 * favour of the one the executor holds */
			set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
			put = true;
		} else {
			slow_work_mark_time(work);
			list_add_tail(&work->link, queue);
			queued = true;
			if (work->link.prev == queue)
				first = true;
		}
	}

	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	if (put)
		slow_work_put_ref(work);
	if (first)
		wake_up(wfo_wq);
	if (queued)
		wake_up(&slow_work_thread_wq);
}

/**
 * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
 * @dwork: The delayed work item to queue
 * @delay: When to start executing the work, in jiffies from now
 *
 * This is similar to slow_work_enqueue(), but it adds a delay before the work
 * is actually queued for processing.
 *
 * The item can have delayed processing requested on it whilst it is being
 * executed. The delay will begin immediately, and if it expires before the
 * item finishes executing, the item will be placed back on the queue when it
 * has finished executing.
 */
int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
			      unsigned long delay)
{
	struct slow_work *work = &dwork->work;
	unsigned long flags;
	int ret;

	if (delay == 0)
		return slow_work_enqueue(&dwork->work);

	BUG_ON(slow_work_user_count <= 0);
	BUG_ON(!work);
	BUG_ON(!work->ops);

	if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
		return -ECANCELED;

	if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
		spin_lock_irqsave(&slow_work_queue_lock, flags);

		if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
			goto cancelled;

		/* the timer holds a reference whilst it is pending */
		ret = work->ops->get_ref(work);
		if (ret < 0)
			goto cant_get_ref;

		if (test_and_set_bit(SLOW_WORK_DELAYED, &work->flags))
			BUG();
		dwork->timer.expires = jiffies + delay;
		dwork->timer.data = (unsigned long) work;
		dwork->timer.function = delayed_slow_work_timer;
		add_timer(&dwork->timer);

		spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	}

	return 0;

cancelled:
	ret = -ECANCELED;
cant_get_ref:
	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	return ret;
}
EXPORT_SYMBOL(delayed_slow_work_enqueue);

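/*
 * Example (illustrative only): retrying the hypothetical lookup from the
 * earlier sketch one second later.  It is assumed here that the declaring
 * header provides delayed_slow_work_init() for setting up the embedded timer;
 * everything else is the same as for an ordinary slow work item.
 *
 *	struct my_obj {
 *		struct delayed_slow_work dwork;
 *		atomic_t usage;
 *	};
 *
 *	delayed_slow_work_init(&obj->dwork, &my_obj_slow_work_ops);
 *	ret = delayed_slow_work_enqueue(&obj->dwork, HZ);
 *	if (ret == -ECANCELED)
 *		;	// the item is being cancelled; don't retry
 */
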
/*
 * Schedule a cull of the thread pool at some time in the near future
 */
static void slow_work_schedule_cull(void)
{
	mod_timer(&slow_work_cull_timer,
		  round_jiffies(jiffies + SLOW_WORK_CULL_TIMEOUT));
}

/*
 * Worker thread culling algorithm
 */
static bool slow_work_cull_thread(void)
{
	unsigned long flags;
	bool do_cull = false;

	spin_lock_irqsave(&slow_work_queue_lock, flags);

	if (slow_work_cull) {
		slow_work_cull = false;

		if (list_empty(&slow_work_queue) &&
		    list_empty(&vslow_work_queue) &&
		    atomic_read(&slow_work_thread_count) >
		    slow_work_min_threads) {
			slow_work_schedule_cull();
			do_cull = true;
		}
	}

	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	return do_cull;
}

/*
 * Determine if there is slow work available for dispatch
 */
static inline bool slow_work_available(int vsmax)
{
	return !list_empty(&slow_work_queue) ||
		(!list_empty(&vslow_work_queue) &&
		 atomic_read(&vslow_work_executing_count) < vsmax);
}

/*
 * Worker thread dispatcher
 */
static int slow_work_thread(void *_data)
{
	int vsmax, id;

	DEFINE_WAIT(wait);

	set_freezable();
	set_user_nice(current, -5);

	/* allocate ourselves an ID */
	spin_lock_irq(&slow_work_queue_lock);
	id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
	BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
	__set_bit(id, slow_work_ids);
	slow_work_set_thread_pid(id, current->pid);
	spin_unlock_irq(&slow_work_queue_lock);

	sprintf(current->comm, "kslowd%03u", id);

	for (;;) {
		vsmax = vslow_work_proportion;
		vsmax *= atomic_read(&slow_work_thread_count);
		vsmax /= 100;

		prepare_to_wait_exclusive(&slow_work_thread_wq, &wait,
					  TASK_INTERRUPTIBLE);
		if (!freezing(current) &&
		    !slow_work_threads_should_exit &&
		    !slow_work_available(vsmax) &&
		    !slow_work_cull)
			schedule();
		finish_wait(&slow_work_thread_wq, &wait);

		try_to_freeze();

		vsmax = vslow_work_proportion;
		vsmax *= atomic_read(&slow_work_thread_count);
		vsmax /= 100;

		if (slow_work_available(vsmax) && slow_work_execute(id)) {
			cond_resched();
			if (list_empty(&slow_work_queue) &&
			    list_empty(&vslow_work_queue) &&
			    atomic_read(&slow_work_thread_count) >
			    slow_work_min_threads)
				slow_work_schedule_cull();
			continue;
		}

		if (slow_work_threads_should_exit)
			break;

		if (slow_work_cull && slow_work_cull_thread())
			break;
	}

	spin_lock_irq(&slow_work_queue_lock);
	slow_work_set_thread_pid(id, 0);
	__clear_bit(id, slow_work_ids);
	spin_unlock_irq(&slow_work_queue_lock);

	if (atomic_dec_and_test(&slow_work_thread_count))
		complete_and_exit(&slow_work_last_thread_exited, 0);
	return 0;
}

/*
 * Handle thread cull timer expiration
 */
static void slow_work_cull_timeout(unsigned long data)
{
	slow_work_cull = true;
	wake_up(&slow_work_thread_wq);
}

/*
 * Start a new slow work thread
 */
static void slow_work_new_thread_execute(struct slow_work *work)
{
	struct task_struct *p;

	if (slow_work_threads_should_exit)
		return;

	if (atomic_read(&slow_work_thread_count) >= slow_work_max_threads)
		return;

	if (!mutex_trylock(&slow_work_user_lock))
		return;

	slow_work_may_not_start_new_thread = true;
	atomic_inc(&slow_work_thread_count);
	p = kthread_run(slow_work_thread, NULL, "kslowd");
	if (IS_ERR(p)) {
		printk(KERN_DEBUG "Slow work thread pool: OOM\n");
		if (atomic_dec_and_test(&slow_work_thread_count))
			BUG(); /* we're running on a slow work thread... */
		mod_timer(&slow_work_oom_timer,
			  round_jiffies(jiffies + SLOW_WORK_OOM_TIMEOUT));
	} else {
		/* ratelimit the starting of new threads */
		mod_timer(&slow_work_oom_timer, jiffies + 1);
	}

	mutex_unlock(&slow_work_user_lock);
}

static const struct slow_work_ops slow_work_new_thread_ops = {
	.owner		= THIS_MODULE,
	.execute	= slow_work_new_thread_execute,
#ifdef CONFIG_SLOW_WORK_PROC
	.desc		= slow_work_new_thread_desc,
#endif
};

/*
 * post-OOM new thread start suppression expiration
 */
static void slow_work_oom_timeout(unsigned long data)
{
	slow_work_may_not_start_new_thread = false;
}

#ifdef CONFIG_SYSCTL
/*
 * Handle adjustment of the minimum number of threads
 */
static int slow_work_min_threads_sysctl(struct ctl_table *table, int write,
					void __user *buffer,
					size_t *lenp, loff_t *ppos)
{
	int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
	int n;

	if (ret == 0) {
		mutex_lock(&slow_work_user_lock);
		if (slow_work_user_count > 0) {
			/* see if we need to start or stop threads */
			n = atomic_read(&slow_work_thread_count) -
				slow_work_min_threads;

			if (n < 0 && !slow_work_may_not_start_new_thread)
				slow_work_enqueue(&slow_work_new_thread);
			else if (n > 0)
				slow_work_schedule_cull();
		}
		mutex_unlock(&slow_work_user_lock);
	}

	return ret;
}

/*
 * Handle adjustment of the maximum number of threads
 */
static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
					void __user *buffer,
					size_t *lenp, loff_t *ppos)
{
	int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
	int n;

	if (ret == 0) {
		mutex_lock(&slow_work_user_lock);
		if (slow_work_user_count > 0) {
			/* see if we need to stop threads */
			n = slow_work_max_threads -
				atomic_read(&slow_work_thread_count);

			if (n < 0)
				slow_work_schedule_cull();
		}
		mutex_unlock(&slow_work_user_lock);
	}

	return ret;
}
#endif /* CONFIG_SYSCTL */

/**
 * slow_work_register_user - Register a user of the facility
 * @module: The module about to make use of the facility
 *
 * Register a user of the facility, starting up the initial threads if there
 * aren't any other users at this point. This will return 0 if successful, or
 * an error if not.
 */
int slow_work_register_user(struct module *module)
{
	struct task_struct *p;
	int loop;

	mutex_lock(&slow_work_user_lock);

	if (slow_work_user_count == 0) {
		printk(KERN_NOTICE "Slow work thread pool: Starting up\n");
		init_completion(&slow_work_last_thread_exited);

		slow_work_threads_should_exit = false;
		slow_work_init(&slow_work_new_thread,
			       &slow_work_new_thread_ops);
		slow_work_may_not_start_new_thread = false;
		slow_work_cull = false;

		/* start the minimum number of threads */
		for (loop = 0; loop < slow_work_min_threads; loop++) {
			atomic_inc(&slow_work_thread_count);
			p = kthread_run(slow_work_thread, NULL, "kslowd");
			if (IS_ERR(p))
				goto error;
		}
		printk(KERN_NOTICE "Slow work thread pool: Ready\n");
	}

	slow_work_user_count++;
	mutex_unlock(&slow_work_user_lock);
	return 0;

error:
	if (atomic_dec_and_test(&slow_work_thread_count))
		complete(&slow_work_last_thread_exited);
	if (loop > 0) {
		printk(KERN_ERR "Slow work thread pool:"
		       " Aborting startup on ENOMEM\n");
		slow_work_threads_should_exit = true;
		wake_up_all(&slow_work_thread_wq);
		wait_for_completion(&slow_work_last_thread_exited);
		printk(KERN_ERR "Slow work thread pool: Aborted\n");
	}
	mutex_unlock(&slow_work_user_lock);
	return PTR_ERR(p);
}
EXPORT_SYMBOL(slow_work_register_user);

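/*
 * Example (illustrative only): a module that uses the facility registers
 * itself before queueing anything and unregisters on exit, which also waits
 * for its outstanding items to go away:
 *
 *	static int __init my_module_init(void)
 *	{
 *		return slow_work_register_user(THIS_MODULE);
 *	}
 *
 *	static void __exit my_module_exit(void)
 *	{
 *		slow_work_unregister_user(THIS_MODULE);
 *	}
 */
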
/*
 * wait for all outstanding items from the calling module to complete
 * - note that more items may be queued whilst we're waiting
 */
static void slow_work_wait_for_items(struct module *module)
{
	DECLARE_WAITQUEUE(myself, current);
	struct slow_work *work;
	int loop;

	mutex_lock(&slow_work_unreg_sync_lock);
	add_wait_queue(&slow_work_unreg_wq, &myself);

	for (;;) {
		spin_lock_irq(&slow_work_queue_lock);

		/* first of all, we wait for the last queued item in each list
		 * to be processed */
		list_for_each_entry_reverse(work, &vslow_work_queue, link) {
			if (work->owner == module) {
				set_current_state(TASK_UNINTERRUPTIBLE);
				slow_work_unreg_work_item = work;
				goto do_wait;
			}
		}
		list_for_each_entry_reverse(work, &slow_work_queue, link) {
			if (work->owner == module) {
				set_current_state(TASK_UNINTERRUPTIBLE);
				slow_work_unreg_work_item = work;
				goto do_wait;
			}
		}

		/* then we wait for the items being processed to finish */
		slow_work_unreg_module = module;
		smp_mb();
		for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
			if (slow_work_thread_processing[loop] == module)
				goto do_wait;
		}
		spin_unlock_irq(&slow_work_queue_lock);
		break; /* okay, we're done */

	do_wait:
		spin_unlock_irq(&slow_work_queue_lock);
		schedule();
		slow_work_unreg_work_item = NULL;
		slow_work_unreg_module = NULL;
	}

	remove_wait_queue(&slow_work_unreg_wq, &myself);
	mutex_unlock(&slow_work_unreg_sync_lock);
}

/**
 * slow_work_unregister_user - Unregister a user of the facility
 * @module: The module whose items should be cleared
 *
 * Unregister a user of the facility, killing all the threads if this was the
 * last one.
 *
 * This waits for all the work items belonging to the nominated module to go
 * away before proceeding.
 */
void slow_work_unregister_user(struct module *module)
{
	/* first of all, wait for all outstanding items from the calling module
	 * to complete */
	if (module)
		slow_work_wait_for_items(module);

	/* then we can actually go about shutting down the facility if need
	 * be */
	mutex_lock(&slow_work_user_lock);

	BUG_ON(slow_work_user_count <= 0);

	slow_work_user_count--;
	if (slow_work_user_count == 0) {
		printk(KERN_NOTICE "Slow work thread pool: Shutting down\n");
		slow_work_threads_should_exit = true;
		del_timer_sync(&slow_work_cull_timer);
		del_timer_sync(&slow_work_oom_timer);
		wake_up_all(&slow_work_thread_wq);
		wait_for_completion(&slow_work_last_thread_exited);
		printk(KERN_NOTICE "Slow work thread pool:"
		       " Shut down complete\n");
	}

	mutex_unlock(&slow_work_user_lock);
}
EXPORT_SYMBOL(slow_work_unregister_user);

/*
 * Initialise the slow work facility
 */
static int __init init_slow_work(void)
{
	unsigned nr_cpus = num_possible_cpus();

	if (slow_work_max_threads < nr_cpus)
		slow_work_max_threads = nr_cpus;
#ifdef CONFIG_SYSCTL
	if (slow_work_max_max_threads < nr_cpus * 2)
		slow_work_max_max_threads = nr_cpus * 2;
#endif
#ifdef CONFIG_SLOW_WORK_PROC
	proc_create("slow_work_rq", S_IFREG | 0400, NULL,
		    &slow_work_runqueue_fops);
#endif
	return 0;
}

subsys_initcall(init_slow_work);