2010年8月24日 星期二

[轉]linux內核同步


帖子 timeboy於2009-03-15 14:51
本文系本站原創,歡迎轉載! 
轉載請註明出處:http://ericxiao.cublog.cn/ 
------------------------------------------ 
同步是linux內核中一種很重要的操作.它為內核提供了一種臨界區和SMP系統中的數據保護機制.今天就來分析一下在linux內核是怎麼樣實現這些操作的. 
一:原子操作(摘自《understanding the linux kernel 2.4》) 
原子操作是指在執行過程中不能被打斷的操作.它包括以下幾種類型: 
進行一次或者零次對齊內存的訪問操作都是原子操作.因為這些指令一般都是單指令.不可能單指令在執行過程中被搶占。 
如果在讀操作之後,寫操作之前沒有其他處理器佔用內存總線。 那麼從內存中讀取數據,更新數據並把更新後的數據寫回內存的這些“讀—修改—寫”彙編語言指令是原子的。 例如:inc或dec 
操作碼前綴是lock字節的(0xf0)的“讀—修改—寫”彙編程序指令即使在SMP系統中也是原子的。 當控制單元檢測lock後,就會把總線鎖住,一直到這些加鎖指令執行完為止. 
操作碼前綴是一個rep字節的彙編語言指令都不是原子的。 這條指令強行讓控制單元執行相同的指令.控制單元在執行新的指令之前要檢查掛起的中斷. 
1.1: 在內核中提供了以下的幾組操作接口: 
void atomic_set(atomic_t *v, int i) 
int atomic_read(atomic_t *v) 
void atomic_add(int i, atomic_t *v) 
void atomic_sub(int i, atomic_t *v) 
void atomic_inc(atomic_t *v) 
void atomic_dec(atomic_t *v) 
int atomic_inc_and_test(atomic_t *v) 
int atomic_dec_and_test(atomic_t *v) 
int atomic_sub_and_test(int i, atomic_t *v) 
int atomic_add_negative(int i, atomic_t *v) 
int atomic_add_return(int i, atomic_t *v) 
int atomic_sub_return(int i, atomic_t *v) 
int atomic_inc_return(atomic_t *v) 
int atomic_dec_return(atomic_t *v) 
atomic_sub(amount, &first_atomic) 
atomic_add(amount, &second_atomic) 
上述原子操作對像是一個32位數。 之所以定義成這樣,是為了和一般的操作區別。 提醒用戶這是一個原子操作接口。 
1.2: 原子的位操作: 
void set_bit(nr, void *addr) 
void clear_bit(nr, void *addr) 
void change_bit(nr, void *addr) 
test_bit(nr, void *addr) 
int test_and_set_bit(nr, void *addr) 
int test_and_clear_bit(nr, void *addr) 
int test_and_change_bit(nr, void *addr) 
以atomic_inc(atomic_t *v), atomic_dec(atomic_t *v)為例來分析內核中對應的代碼. 
static __inline__ void atomic_dec(atomic_t *v) 
{ 
__asm__ __volatile__( 
LOCK_PREFIX "decl %0" 
:"+m" (v->counter)); 
} 
LOCK_PREFIX為鎖操作前綴 
Atomic_inc的代碼如下: 
static __inline__ void atomic_inc(atomic_t *v) 
{ 
__asm__ __volatile__( 
LOCK_PREFIX "incl %0" 
:"+m" (v->counter)); 
} 
二:自旋鎖上述的原子操作操作接口提供了單變量的保護操作,但是在實際項目中,情況並沒有這麼簡單,我們需要保護的是一個臨界區.所以我們需要用到其它的加鎖方法.自旋鎖就是其中之一. 
2.1:自旋鎖的特性自旋鎖只能被一個控制路徑所持有.進入臨界區,自旋鎖鎖住.其後如果有其它線程來取自旋鎖的自己就會原地“自旋” ,一直到持有者放開鎖為止. 
很顯然,當得不到鎖的時候,CPU就會在一直在做輪詢判斷.也就是說如果一個進程取不到自旋鎖.也會一直判斷下去,不會主動去調度其它進程代替當前進程.但是這樣鎖機制的開銷極少.可以快速的開鎖和加鎖.持有自旋鎖的進程一般都會對臨界區有快速的處理另外,內核還規定持有自旋鎖的進程不能被搶占,這樣做主要是出於系統性能考慮.如果一個持自旋鎖的進程被調度出CPU.那其它需要這些鎖的進程被調度進來之後,也只做“自旋”工作. 
2.2: 自旋鎖的實現自旋鎖在內核中對應的類型為: spinlock_t 
typedef struct { 
raw_spinlock_t raw_lock; 
#if defined(CONFIG_PREEMPT) && defined(CONFIG_SMP) 
//配置了內核搶占與SMP 
unsigned int break_lock; 
#endif 
//自旋鎖DEBUG 
#ifdef CONFIG_DEBUG_SPINLOCK 
unsigned int magic, owner_cpu; 
void *owner; 
#endif 
#ifdef CONFIG_DEBUG_LOCK_ALLOC 
struct lockdep_map dep_map; 
#endif 
} spinlock_t; 

typedef struct { 
unsigned int slock; 
} raw_spinlock_t; 
在接下來的代碼中再來分析結構中各成員的含義. 
2.2.1:spin_lock_init :自旋鎖的初始化代碼如下: 
# define spin_lock_init(lock) \ 
do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0) 
#define SPIN_LOCK_UNLOCKED __SPIN_LOCK_UNLOCKED(old_style_spin_init) 
# define __SPIN_LOCK_UNLOCKED(lockname) \ 
(spinlock_t) { .raw_lock = __RAW_SPIN_LOCK_UNLOCKED, \ 
SPIN_DEP_MAP_INIT(lockname) } 
#define __RAW_SPIN_LOCK_UNLOCKED { 1 } 
上述操作就是把sipinlock_t中的raw_lock置為__RAW_SPIN_LOCK_UNLOCKED.即為1. 
此時鎖是打開狀態. 

2.2.2: spin_lock() :獲取自旋鎖. 
#define spin_lock(lock) _spin_lock(lock) 
_spin_lock()定義如下: 
void __lockfunc _spin_lock(spinlock_t *lock) 
{ 
//禁止搶占 
preempt_disable(); 
//這個函數在沒有定義自旋鎖調試的時候是空函數 
spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); 
//相當於_raw_spin_lock(lock) 
LOCK_CONTENDED(lock, _raw_spin_trylock, _raw_spin_lock); 
} 
LOCK_CONTENDED的定義如下: 
#define LOCK_CONTENDED(_lock, try, lock) \ 
lock(_lock) 
_raw_spin_lock()的操作如下: 
# define _raw_spin_lock(lock) __raw_spin_lock(&(lock)->raw_lock) 

static inline void __raw_spin_lock(raw_spinlock_t *lock) 
{ 
asm volatile( 
"\n1:\t" 
//decl: lock->slock減1 
LOCK_PREFIX " ; decl %0\n\t" 
//如果不為負.跳轉到2f.2f後面沒有任何指令.即為退出 
"jns 2f\n" 
"3:\n" 
//重複執行nop.nop是x86的小延遲函數 
"rep;nop\n\t" 
//比較0與lock->slock的值,如果lock->slock不大於0.跳轉到標號3.即繼續重複執行nop 
"cmpl $0,%0\n\t" 
//如果lock->slock不大於0.跳轉到標號3.即繼續重複執行nop 
"jle 3b\n\t" 
//如果lock->slock大於0.跳轉到標號1.重新判斷鎖的slock成員 
"jmp 1b\n" 
"2:\t" : "=m" (lock->slock) : : "memory"); 
} 
在上面的函數中,可能對"jmp 1b\n"比較難以理解.在我們一般的觀念裡.獲得一個鎖.將其值減1.釋放鎖時將其值加1.實際上在自旋鎖的實現中lock->slock只有兩個可能值,一個是0. 一個是1.釋放鎖的時候並不是將lock->slock加1.而是將其賦為1.在後面的自旋鎖釋放代碼中再詳細分析. 

# define spin_unlock(lock) _spin_unlock(lock) 
void __lockfunc _spin_unlock(spinlock_t *lock) 
{ 
//在沒有配置自旋鎖調試的時候.該函數為空函數 
spin_release(&lock->dep_map, 1, _RET_IP_); 
// 
_raw_spin_unlock(lock); 
//釋放自旋鎖了.允許內核搶占 
preempt_enable(); 
} 
# define _raw_spin_unlock(lock) __raw_spin_unlock(&(lock)->raw_lock) 
static inline void __raw_spin_unlock(raw_spinlock_t *lock) 
{ 
//將lock->slock賦為1 
asm volatile("movb $1,%0" : "+m" (lock->slock) :: "memory"); 
} 
總體來說,釋放自旋鎖的分為兩步: 
第一步:將鎖的slock字段回歸到1.注意這裡不是加1,而是賦值1 
第二步:允許內核搶占,與加鎖時的禁止內核搶占相對應 

在內核中還有幾個與硬中斷和軟中斷有關的幾個自旋鎖API.如下所示: 
void spin_lock(spinlock_t *lock) 
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags) 
void spin_lock_irq(spinlock_t *lock) 
void spin_lock_bh(spinlock_t *lock) 

void spin_unlock(spinlock_t *lock); 
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags); 
void spin_unlock_irq(spinlock_t *lock); 
void spin_unlock_bh(spinlock_t *lock); 
代碼都不復雜,可自行結合前面分析的IRQ中斷部份理解這幾個API的操作. 

三:信號量的操作 
3.1:信號量的特點: 
根據前面的分析看到.自旋鎖是一種輕快但效率極其低下的一種加鎖,對於需要對臨界區長時間操作的情況下不太合適.因此,需要一種等待獲鎖的機制.這就是信號量.信號量在請求鎖的時候,如果不成功,便會將進程投入睡眠.因此要在允許睡眠的場合使用信號量. 
3.2:信號量的實現信號量的結構如下所示: 
struct semaphore { 
//信號量資源數 
atomic_t count; 
//是否有一些進程在睡眠 
int sleepers; 
//信號的等待隊列 
wait_queue_head_t wait; 
} 
3.3: sema_init() :信號量的初始化 
static inline void sema_init (struct semaphore *sem, int val) 
{ 
//設置信號量的資源數為val.sem->count表示允許同時有多少個進程佔有此信號量 
atomic_set(&sem->count, val); 
//sleepers成員置0,表示沒有進程在等待 
sem->sleepers = 0; 
//初始化等待隊列 
init_waitqueue_head(&sem->wait); 
} 
3.4:down :獲取信號量 
static inline void down(struct semaphore * sem) 
{ 
//可能會引起睡眠 
might_sleep(); 
__asm__ __volatile__( 
"# atomic down operation\n\t" 
//LOCK:鎖定總線標誌 
//使sem->count -1 如果小於0.跳轉到2 
LOCK "decl %0\n\t" /* --sem->count */ 
"js 2f\n" 
//後面沒有指令了,退出 
"1:\n" 
LOCK_SECTION_START("") 
//如果sem->count -1小於0.說明該信號已經沒有足夠的資源了.調用__down_failed 
"2:\tcall __down_failed\n\t" 
"jmp 1b\n" 
LOCK_SECTION_END 
:"=m" (sem->count) 
:"c" (sem) 
:"memory"); 
} 
__down_failed的代碼如下: 
asm( 
".section .sched.text\n" 
".align 4\n" 
".globl __down_failed\n" 
"__down_failed:\n\t" 
#if defined(CONFIG_FRAME_POINTER) 
"pushl %ebp\n\t" 
"movl %esp,%ebp\n\t" 
#endif 
//之所以把eax edx ecx壓棧,是因為__down()函數的參數就是eax edx ecx.它從堆棧中取參數.所以//將其入棧.這樣做是從2.4遺留下來的,因為2.4的__down函數為fastcall型 
"pushl %eax\n\t" 
"pushl %edx\n\t" 
"pushl %ecx\n\t" 
"call __down\n\t" 
"popl %ecx\n\t" 
"popl %edx\n\t" 
"popl %eax\n\t" 
#if defined(CONFIG_FRAME_POINTER) 
"movl %ebp,%esp\n\t" 
"popl %ebp\n\t" 
#endif 
"ret" 
) 
__down的代碼如下: 
asmlinkage void __sched __down(struct semaphore * sem) 
{ 
struct task_struct *tsk = current; 
//聲明一個等待隊列 
DECLARE_WAITQUEUE(wait, tsk); 
unsigned long flags; 

//將進程狀態置為不可中斷型.外部信號不可以將其中斷 
tsk->state = TASK_UNINTERRUPTIBLE; 
//獲取自旋鎖且關中斷 
spin_lock_irqsave(&sem->wait.lock, flags); 
//加入等待隊列,並將等待標誌高為WQ_FLAG_EXCLUSIVE 
add_wait_queue_exclusive_locked(&sem->wait, &wait); 

//sleepers之前只能為0或者是1 
sem->sleepers++; 
for (;;) { 
int sleepers = sem->sleepers; 

/* 
* Add "everybody else" into it. They aren't 
* playing, because we own the spinlock in 
* the wait_queue_head. 
*/ 
//如果之前有進程在線程,則使sem->count+1 (剛好使count等於down進來的值) 
//因為之前在down()中有減1操作 

//負數返回真 
if (!atomic_add_negative(sleepers - 1, &sem->count)) { 
//已經有信號了,將sleeps置零,退出循環後會喚醒等列中的一個進程 
sem->sleepers = 0; 
break; 
} 
//將sleepers 置1 
sem->sleepers = 1; /* us - see -1 above */ 
//釋放自旋量並恢復之前的中斷標誌 
spin_unlock_irqrestore(&sem->wait.lock, flags); 

//重新調度 
schedule(); 

//重新調度後,因為其狀態是TASK_UNINTERRUPTIBLE .所以要釋放信號量的進程將其喚醒 
//才能繼續運行 
spin_lock_irqsave(&sem->wait.lock, flags); 
//將狀態再設為TASK_UNINTERRUPTIBLE . 重新判斷是否有信號量資源 
tsk->state = TASK_UNINTERRUPTIBLE; 
} 

//已經有信號量資源了,將等待隊列刪除 
remove_wait_queue_locked(&sem->wait, &wait); 
//喚醒其它隊列 
wake_up_locked(&sem->wait); 
//恢復中斷標誌和釋放自旋鎖 
spin_unlock_irqrestore(&sem->wait.lock, flags); 
tsk->state = TASK_RUNNING; 
} 
這裡要注意的是對sleeper成員的理解.結合後面的代碼.進程在沒有獲得鎖時會將其設為1.在獲得了鎖的時候將其設為0.所以,sleeper=1時表示有進程在等待.sleeper=0時,有進程獲得信號量退出,或者沒有進程在等待此信號量. 
如果在等待隊列中進程被喚醒,並獲得了鎖.則將sleeper設為0.然後用break退出循環,再用wake_up_locked(&sem->wait)喚醒等待隊列中的一個進程,這個進程下次被調度的時候從shedule()後面開始運行.更改進程狀態好,調用if (!atomic_add_negative(sleepers - 1, &sem->count))判斷,此時sleeper=0.sleep-1 = -1.經過atomic_add_negative()使sem->count-1了.因此前面論述過.進程進來取信號量down()裡count有減1.然後如果有等待線程在獲取這個信號,剛又會將count+1,使其變到了原值.現在在我們這個情景中,已經有線程進程去了,應該要+1. 
那思考一下:為什麼要對count sleeper做這樣的處理呢?直接使用使用計數不就完了嗎?進程等待獲取信號量的時候sleep+1 count-1.釋放信號量的時候sleep-1.count+1不可以嗎? 
表面上上述的方法也能工作的很好,但是如果進程數目一多就要考慮到溢出問題了. 
Linux在這裡設計的很巧妙,可以好好的回味. 

3.5:up(): 釋放信號 
static inline void up(struct semaphore * sem) 
{ 
__asm__ __volatile__( 
"# atomic up operation\n\t" 
//sem->count 計數加1 
LOCK "incl %0\n\t" /* ++sem->count */ 
//如果大於0的話,說明信號量資源充足,沒有進程在等待 
//無需喚醒等待隊列.直接退出即可 
"jle 2f\n" 
"1:\n" 
LOCK_SECTION_START("") 
//否則喚醒隊列 
"2:\tcall __up_wakeup\n\t" 
"jmp 1b\n" 
LOCK_SECTION_END 
".subsection 0\n" 
:"=m" (sem->count) 
:"c" (sem) 
:"memory"); 
} 
asm( 
".section .sched.text\n" 
".align 4\n" 
".globl __up_wakeup\n" 
"__up_wakeup:\n\t" 
"pushl %eax\n\t" 
"pushl %edx\n\t" 
"pushl %ecx\n\t" 
"call __up\n\t" 
"popl %ecx\n\t" 
"popl %edx\n\t" 
"popl %eax\n\t" 
"ret" 
); 
同理調用__up()進行處理 
asmlinkage void __up(struct semaphore *sem) 
{ 
wake_up(&sem->wait); 
} 
這個函數很簡單,就是將信號量中的等待隊列喚醒.注意之前提到過的,加入等待隊列中所用的標誌WQ_FLAG_EXCLUSIVE.這個標誌表示只需將掛成隊列前面的進程喚醒.無需全部喚醒. 
信號量的使用內核中還有很多的變體API.究其原理都差不多.可以自行了解^_^ 

四:完成變量 
4.1:完全變量的特點在內核中經常有這樣的需要,在一個進程中要等待某個進程完成某個事情.內核為其提供了一個簡單的接口,其實它只是一個簡化版的信號量而已. 
4.2:完全變量的結構完全變量在代碼中的結構如下: 
struct completion { 
//等待標誌 
unsigned int done; 
//等待隊列 
wait_queue_head_t wait; 
}; 
4.3:完全變量的操作與實現 
wait_for_completion():等待條件的完成,它的代碼如下: 
void __sched wait_for_completion(struct completion *x) 
{ 
wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_UNINTERRUPTIBLE); 
} 
wait_for_common() à do_wait_for_common(): 
static inline long __sched 
do_wait_for_common(struct completion *x, long timeout, int state) 
{ 
//!x->done:條件末完成.投入睡眠等待 
if (!x->done) { 
DECLARE_WAITQUEUE(wait, current); 

wait.flags |= WQ_FLAG_EXCLUSIVE; 
//加入等待隊列 
__add_wait_queue_tail(&x->wait, &wait); 
do { 
//如果是可中斷狀態且有末處理的信號。 將其移出等待隊列 
if (state == TASK_INTERRUPTIBLE && 
signal_pending(current)) { 
__remove_wait_queue(&x->wait, &wait); 
return -ERESTARTSYS; 
} 
__set_current_state(state); 
spin_unlock_irq(&x->wait.lock); 
timeout = schedule_timeout(timeout); 
spin_lock_irq(&x->wait.lock); 
if (!timeout) { 
//延時到了.將其移出等待隊列 
__remove_wait_queue(&x->wait, &wait); 
return timeout; 
} 
} while (!x->done); 
//運行到這裡,條件已經完成了,將其從等待隊列移出 
__remove_wait_queue(&x->wait, &wait); 
} 
//如果條件完成了,將x->done-- 直接返回 
x->done--; 
return timeout; 
} 

Complete()來用通告已經完成了這個條件.代碼如下: 
void complete(struct completion *x) 
{ 
unsigned long flags; 

//持完全變量的自旋鎖且禁中斷 
spin_lock_irqsave(&x->wait.lock, flags); 
//x- 
x->done++; 
__wake_up_common(&x->wait, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, 
1, 0, NULL); 
//釋放自旋鎖且恢復中斷到以前的狀態 
spin_unlock_irqrestore(&x->wait.lock, flags); 
} 
__wake_up_common()的代碼如下所示: 
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, 
int nr_exclusive, int sync, void *key) 
{ 
wait_queue_t *curr, *next; 

//遍歷等待隊列且喚醒等待的進程 
list_for_each_entry_safe(curr, next, &q->task_list, task_list) { 
unsigned flags = curr->flags; 

if (curr->func(curr, mode, sync, key) && 
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) 
break; 
} 
} 
分析完了信號量之後,看完全變量的實現還是挺簡單.另外需要注意的是信號量與完成變量裡的自旋鎖的使用.信號量是在其中的一段操作中使用了自旋鎖.這是為了在這一段操作中保持串行化.而完全變量是一開始就加鎖.不允許wait_for_completion()和completion()同時調用. 

五:讀寫信號量 
5.1:讀寫信號量的特點通常受保護的數據是允許多個進程去讀的。 只需要保護寫操作就可以了。 如果多個讀者去排隊等待其它讀者的操作完成的話,顯然是有失效率的。 讀寫信號量就是為了解瘊這個問題而產生的。 讀寫信號量的等待隊列是一個嚴格的FIFO.有以下幾個特點: 
1):讀者去獲取信號量的時候: 
1:如果臨界區是空的或者有其它讀者在操作,以時如果沒有寫者在排隊,就把讀者放進去。 如果有寫者在排隊,就把讀者加入到等待隊列。 
2:如果臨界區有寫者在操作。 就把這個讀者加入等待隊列。 
2):寫者去獲取信號量的時候: 
1:如果臨界區是空的且等待隊列為空。 放這個寫者進去。 如果等待隊不為空,就將其加至等待隊列 
2:如果臨界區中有讀者在操作,就將其加入到等待隊列. 
3):釋放信號量的時候: 
1:如果這個進程是讀者。 檢查是不是臨界區內的最後一個讀者。 如果是最後一個讀者,檢查是否有寫者在等待。 如果有。 將寫者喚醒。如果沒有,直接退出。 
2:如果釋放信號量是一個寫者,通常會喚醒等待隊列中的第一個進程 
2.1:如果這個進程是讀者。 那它後面的所有讀者也會被喚醒(不包含寫者後面的) 
2.2:如果這個進程是寫者。 那麼它後面的所有等待進程繼續睡眠. 
總之:在這裡要注意對待讀者與寫者的區別。 在臨界區中可以允許有多個讀者,但不允許有多個寫者。 

5.2:讀寫信號量結構: 
讀寫信號量在內核中對應的數據結構為: 
struct rw_semaphore { 
//activity = 0:表示沒有對像在操作信號量 
//activity > 0:表示有一個或者多個讀者在操作信號量 
//activity < 0:表示有一個寫者在操作信號量 
__s32 activity; 
//讀寫信號量對應的自旋鎖 
spinlock_t wait_lock; 
//對應的等待隊列 
struct list_head wait_list; 
#if RWSEM_DEBUG 
int debug; 
#endif 
} 

5.3:讀寫信號量的操作與實現: 
5.3.1:down_read():讀者獲得信號量 
down_read()用來在讀者操作臨界區的情況。 它的代碼如下: 
static inline void down_read(struct rw_semaphore *sem) 
{ 
//可能會引起睡眠 
might_sleep(); 
//調試用 
rwsemtrace(sem,"Entering down_read"); 
__down_read(sem); 
//調試用 
rwsemtrace(sem,"Leaving down_read"); 
} 
__down_read()的代碼如下: 
void fastcall __sched __down_read(struct rw_semaphore *sem) 
{ 
struct rwsem_waiter waiter; 
struct task_struct *tsk; 

rwsemtrace(sem, "Entering __down_read"); 

//獲取自旋鎖 
spin_lock(&sem->wait_lock); 

//sem->activeity>=0:說明有讀者進入了臨界區 
//sem->wait_list:等待隊列 
//如果activeity>=0. wait_list不為空:說明有寫者在等待 
//如果activeity<0:說明有寫者在對臨界區操作 

//如果臨界區中有讀者然後沒有寫者在等待.可以直接獲得此信號量 
if (sem->activity >= 0 && list_empty(&sem->wait_list)) { 
/* granted */ 
sem->activity++; 
spin_unlock(&sem->wait_lock); 
goto out; 
} 

//否則,將讀者加入等待隊列末尾 
tsk = current; 
set_task_state(tsk, TASK_UNINTERRUPTIBLE); 

/* set up my own style of waitqueue */ 
waiter.task = tsk; 
//對應的等待標誌是RWSEM_WAITING_FOR_READ 
waiter.flags = RWSEM_WAITING_FOR_READ; 
get_task_struct(tsk); 

list_add_tail(&waiter.list, &sem->wait_list); 

/* we don't need to touch the semaphore struct anymore */ 
spin_unlock(&sem->wait_lock); 

/* wait to be given the lock */ 
//一個死循環,這樣是為了是被期待的喚醒 
for (;;) { 
//如果waiter.task==NULL.確實是被喚醒的.因為喚醒的時候,會將waiter,task清空 
if (!waiter.task) 
break; 
//睡眠 
schedule(); 
//設置進程狀態為不可中斷型 
set_task_state(tsk, TASK_UNINTERRUPTIBLE); 
} 

//已經獲得信號量了.設置狀態為RUNNING 退出 
tsk->state = TASK_RUNNING; 

out: 
rwsemtrace(sem, "Leaving __down_read"); 
} 
5.3.2: up_read():讀者釋放信號量讀者操作完臨界區了之後,使用up_read()將信號量釋放。 代碼如下: 
up_read()à 
void fastcall __up_read(struct rw_semaphore *sem) 
{ 
rwsemtrace(sem, "Entering __up_read"); 

spin_lock(&sem->wait_lock); 

//--sem->activity == 0:此讀者是臨界區中的最後一個對象 
//sem->wait_list不為空說明臨界區外有寫者在等待 
if (--sem->activity == 0 && !list_empty(&sem->wait_list)) 
//喚醒等待隊列中的第一個進程 
sem = __rwsem_wake_one_writer(sem); 

//如果不是臨界區中的最後一個讀者或者是沒有寫者在等待,退出 
spin_unlock(&sem->wait_lock); 

rwsemtrace(sem, "Leaving __up_read"); 
} 
__rwsem_wake_one_writer()代碼如下: 
static inline struct rw_semaphore * 
__rwsem_wake_one_writer(struct rw_semaphore *sem) 
{ 
struct rwsem_waiter *waiter; 
struct task_struct *tsk; 

//將activity置為-1.因為臨界區是被寫者佔據了 
sem->activity = -1; 

//取等待隊列中的第一個進程 
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list); 
//將其從等待鍊錶中刪除 
list_del(&waiter->list); 

tsk = waiter->task; 
mb(); 
//將waiter->task清空 
waiter->task = NULL; 
//喚醒過程 
wake_up_process(tsk); 
//減少進程引用計數,因為在把它投入隊列的時候被+1 了.這樣做是為了防止task結構被釋放掉 
put_task_struct(tsk); 
return sem; 
} 
注意裡面對activity和waiter->task的操作. 
5.3.3: down_write():寫者獲得信號量在更改臨界區的情況下,使用down_write()獲得信號量。 代碼如下: 
down_write() à __down_write(): 
void fastcall __sched __down_write(struct rw_semaphore *sem) 
{ 
struct rwsem_waiter waiter; 
struct task_struct *tsk; 

rwsemtrace(sem, "Entering __down_write"); 

spin_lock(&sem->wait_lock); 

//如果臨界區沒有對像在操作.且沒有其它的寫者在等待 
if (sem->activity == 0 && list_empty(&sem->wait_list)) { 
/* granted */ 
//獲得信號 
sem->activity = -1; 
spin_unlock(&sem->wait_lock); 
goto out; 
} 

//如果臨界區被其它對象佔據或者有其它的寫者在等待,將其加到等待隊列末尾 
tsk = current; 
set_task_state(tsk, TASK_UNINTERRUPTIBLE); 

/* set up my own style of waitqueue */ 
waiter.task = tsk; 
//置標誌為RWSEM_WAITING_FOR_WRITE 
waiter.flags = RWSEM_WAITING_FOR_WRITE; 
get_task_struct(tsk); 

list_add_tail(&waiter.list, &sem->wait_list); 

/* we don't need to touch the semaphore struct anymore */ 
spin_unlock(&sem->wait_lock); 

/* wait to be given the lock */ 
//判斷是否被正常喚醒 
for (;;) { 
if (!waiter.task) 
break; 
schedule(); 
set_task_state(tsk, TASK_UNINTERRUPTIBLE); 
} 

tsk->state = TASK_RUNNING; 

out: 
rwsemtrace(sem, "Leaving __down_write"); 
} 
注意讀者進入臨界的判斷條件和寫者進入臨界區的判斷條件的不同.讀者只需要sem->activity >= 0且沒有等待隊列就可以進行。 寫者必須要求sem->activity == 0且無等待隊列. 
5.3.4:up_write():寫者釋放信號量寫者操作完了之後,調用up_write()釋放信號量。 代碼如下: 
Up_write() à __up_write(): 
void fastcall __up_write(struct rw_semaphore *sem) 
{ 
rwsemtrace(sem, "Entering __up_write"); 

spin_lock(&sem->wait_lock); 

//寫者釋放信號量.將activity重置為0.因為臨界區已經沒有對像在操作了 
sem->activity = 0; 
//如果有進程在等待,將其喚醒 
if (!list_empty(&sem->wait_list)) 
sem = __rwsem_do_wake(sem, 1); 

spin_unlock(&sem->wait_lock); 

rwsemtrace(sem, "Leaving __up_write"); 
} 
寫進程釋放信號喚醒其它進程的處理有點特別,代碼如下: 
static inline struct rw_semaphore * 
__rwsem_do_wake(struct rw_semaphore *sem, int wakewrite) 
{ 
struct rwsem_waiter *waiter; 
struct task_struct *tsk; 
int woken; 

rwsemtrace(sem, "Entering __rwsem_do_wake"); 

waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list); 

//在__up_write()調用的時候.wakewrite是置為1的.表示是一個寫者在進行喚醒 
if (!wakewrite) { 
if (waiter->flags & RWSEM_WAITING_FOR_WRITE) 
goto out; 
goto dont_wake_writers; 
} 

/* if we are allowed to wake writers try to grant a single write lock 
* if there's a writer at the front of the queue 
* - we leave the 'waiting count' incremented to signify potential 
* contention 
*/ 

//如果等待隊列的第一個進程是一個寫等待,只需要將這個寫者喚醒就行了 
if (waiter->flags & RWSEM_WAITING_FOR_WRITE) { 
sem->activity = -1; 
list_del(&waiter->list); 
tsk = waiter->task; 
/* Don't touch waiter after ->task has been NULLed */ 
mb(); 
waiter->task = NULL; 
wake_up_process(tsk); 
put_task_struct(tsk); 
goto out; 
} 

/* grant an infinite number of read locks to the front of the queue */ 
dont_wake_writers: 
//如果第一個進程是讀等待,將將其後所有為讀等待的進程喚醒 
woken = 0; 
while (waiter->flags & RWSEM_WAITING_FOR_READ) { 
struct list_head *next = waiter->list.next; 

list_del(&waiter->list); 
tsk = waiter->task; 
mb(); 
waiter->task = NULL; 
wake_up_process(tsk); 
put_task_struct(tsk); 
woken++; 
if (list_empty(&sem->wait_list)) 
break; 
waiter = list_entry(next, struct rwsem_waiter, list); 
} 

sem->activity += woken; 

out: 
rwsemtrace(sem, "Leaving __rwsem_do_wake"); 
return sem; 
} 
六:小結在linux內核中還有其它的幾種同步機制,我在這裡沒有一一列出分析。 在上面分析的幾種機制中,設得都很巧妙。 需要慢慢的琢磨。

沒有留言:

張貼留言