2010年8月30日 星期一

2010年8月26日 星期四

C++問題 (轉)

1. 什麼是virtual inheritance? 請舉例說明。
2. 什麼是virtual function? 什麼是pure virtual function? 
3. pointer與reference的差別?
4. 什麼是static_cast, dynamic_cast, reinterpret_cast, const_cast? 他們之間的差異是?
5. run-time polymorphism與compile-time polymorphism的差異? 分別要如何達成?
6. 什麼是explicit constructor?
7. 什麼是template disambiguator? 請舉出例子。
8. 有沒有聽過boost or loki? 有沒有用過boost or loki? 請舉出例子。
9. 有沒有聽過CRTP (Curiously Recurring Template Pattern)與PBCP (Parametric Base Class Pattern)? 請寫出兩者的sample code並比較它們的差異。
10. 請問在C++中, struct與class最大的差異是?
11. 請問什麼是template partial specialization?
12. 什麼是type traits? 什麼是tag dispatch? 請以簡單的程式碼說明之。
13. 有聽過template metaprogramming嗎? 請舉例解釋之。
14. 什麼是SFINAE? 請解釋之。
15. 為什麼常常看到template的程式碼寫在hpp裡,而不是放在cpp檔中?
16. 什麼是functor? 用過(或寫過)任何的functor library嗎? 例如boost function/bind或是fast delegate?
17. std::list與std::vector的差異是什麼? std::map裡的資料結構是如何實作的?
18. 何謂copy constructor? 如何阻止一個class被copy?
19. virtual function是如何實作的? 什麼是vtbl?
20. 在template的宣告裡,typename與class的差異是? 為什麼有typename這個keyword?
21. 什麼是forward declaration? 
22. 如何解決header file mutual inclusion的問題?
23. 什麼是templated class? 什麼是class template?
24. 什麼是wchar_t? 有沒有用過std::locale跟std::codecvt?
25. 用過那些c++ compiler? 有沒有聽過c++0x? 試舉出c++0x的新功能。



轉自
http://nctusdk.blogspot.com/2009/07/c.html

建構式與解構式

http://imandrewliao.blogspot.com/2009/01/blog-post_22.html

2010年8月24日 星期二

spinlock

http://translate.google.com.tw/translate?hl=zh-TW&sl=zh-CN&tl=zh-TW&u=http://blog.chinaunix.net/u3/108974/showart_2151612.html

[轉]linux內核同步


帖子 timeboy於2009-03-15 14:51
本文系本站原創,歡迎轉載! 
轉載請註明出處:http://ericxiao.cublog.cn/ 
------------------------------------------ 
同步是linux內核中一種很重要的操作.它為內核提供了一種臨界區和SMP系統中的數據保護機制.今天就來分析一下在linux內核是怎麼樣實現這些操作的. 
一:原子操作(摘自《understanding the linux kernel 2.4》) 
原子操作是指在執行過程中不能被打斷的操作.它包括以下幾種類型: 
進行一次或者零次對齊內存的訪問操作都是原子操作.因為這些指令一般都是單指令.不可能單指令在執行過程中被搶占。 
如果在讀操作之後,寫操作之前沒有其他處理器佔用內存總線。 那麼從內存中讀取數據,更新數據並把更新後的數據寫回內存的這些“讀—修改—寫”彙編語言指令是原子的。 例如:inc或dec 
操作碼前綴是lock字節的(0xf0)的“讀—修改—寫”彙編程序指令即使在SMP系統中也是原子的。 當控制單元檢測lock後,就會把總線鎖住,一直到這些加鎖指令執行完為止. 
操作碼前綴是一個rep字節的彙編語言指令都不是原子的。 這條指令強行讓控制單元執行相同的指令.控制單元在執行新的指令之前要檢查掛起的中斷. 
1.1: 在內核中提供了以下的幾組操作接口: 
void atomic_set(atomic_t *v, int i) 
int atomic_read(atomic_t *v) 
void atomic_add(int i, atomic_t *v) 
void atomic_sub(int i, atomic_t *v) 
void atomic_inc(atomic_t *v) 
void atomic_dec(atomic_t *v) 
int atomic_inc_and_test(atomic_t *v) 
int atomic_dec_and_test(atomic_t *v) 
int atomic_sub_and_test(int i, atomic_t *v) 
int atomic_add_negative(int i, atomic_t *v) 
int atomic_add_return(int i, atomic_t *v) 
int atomic_sub_return(int i, atomic_t *v) 
int atomic_inc_return(atomic_t *v) 
int atomic_dec_return(atomic_t *v) 
atomic_sub(amount, &first_atomic) 
atomic_add(amount, &second_atomic) 
上述原子操作對像是一個32位數。 之所以定義成這樣,是為了和一般的操作區別。 提醒用戶這是一個原子操作接口。 
1.2: 原子的位操作: 
void set_bit(nr, void *addr) 
void clear_bit(nr, void *addr) 
void change_bit(nr, void *addr) 
test_bit(nr, void *addr) 
int test_and_set_bit(nr, void *addr) 
int test_and_clear_bit(nr, void *addr) 
int test_and_change_bit(nr, void *addr) 
以atomic_inc(atomic_t *v), atomic_dec(atomic_t *v)為例來分析內核中對應的代碼. 
static __inline__ void atomic_dec(atomic_t *v) 
{ 
__asm__ __volatile__( 
LOCK_PREFIX "decl %0" 
:"+m" (v->counter)); 
} 
LOCK_PREFIX為鎖操作前綴 
Atomic_inc的代碼如下: 
static __inline__ void atomic_inc(atomic_t *v) 
{ 
__asm__ __volatile__( 
LOCK_PREFIX "incl %0" 
:"+m" (v->counter)); 
} 
二:自旋鎖上述的原子操作操作接口提供了單變量的保護操作,但是在實際項目中,情況並沒有這麼簡單,我們需要保護的是一個臨界區.所以我們需要用到其它的加鎖方法.自旋鎖就是其中之一. 
2.1:自旋鎖的特性自旋鎖只能被一個控制路徑所持有.進入臨界區,自旋鎖鎖住.其後如果有其它線程來取自旋鎖的自己就會原地“自旋” ,一直到持有者放開鎖為止. 
很顯然,當得不到鎖的時候,CPU就會在一直在做輪詢判斷.也就是說如果一個進程取不到自旋鎖.也會一直判斷下去,不會主動去調度其它進程代替當前進程.但是這樣鎖機制的開銷極少.可以快速的開鎖和加鎖.持有自旋鎖的進程一般都會對臨界區有快速的處理另外,內核還規定持有自旋鎖的進程不能被搶占,這樣做主要是出於系統性能考慮.如果一個持自旋鎖的進程被調度出CPU.那其它需要這些鎖的進程被調度進來之後,也只做“自旋”工作. 
2.2: 自旋鎖的實現自旋鎖在內核中對應的類型為: spinlock_t 
typedef struct { 
raw_spinlock_t raw_lock; 
#if defined(CONFIG_PREEMPT) && defined(CONFIG_SMP) 
//配置了內核搶占與SMP 
unsigned int break_lock; 
#endif 
//自旋鎖DEBUG 
#ifdef CONFIG_DEBUG_SPINLOCK 
unsigned int magic, owner_cpu; 
void *owner; 
#endif 
#ifdef CONFIG_DEBUG_LOCK_ALLOC 
struct lockdep_map dep_map; 
#endif 
} spinlock_t; 

typedef struct { 
unsigned int slock; 
} raw_spinlock_t; 
在接下來的代碼中再來分析結構中各成員的含義. 
2.2.1:spin_lock_init :自旋鎖的初始化代碼如下: 
# define spin_lock_init(lock) \ 
do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0) 
#define SPIN_LOCK_UNLOCKED __SPIN_LOCK_UNLOCKED(old_style_spin_init) 
# define __SPIN_LOCK_UNLOCKED(lockname) \ 
(spinlock_t) { .raw_lock = __RAW_SPIN_LOCK_UNLOCKED, \ 
SPIN_DEP_MAP_INIT(lockname) } 
#define __RAW_SPIN_LOCK_UNLOCKED { 1 } 
上述操作就是把sipinlock_t中的raw_lock置為__RAW_SPIN_LOCK_UNLOCKED.即為1. 
此時鎖是打開狀態. 

2.2.2: spin_lock() :獲取自旋鎖. 
#define spin_lock(lock) _spin_lock(lock) 
_spin_lock()定義如下: 
void __lockfunc _spin_lock(spinlock_t *lock) 
{ 
//禁止搶占 
preempt_disable(); 
//這個函數在沒有定義自旋鎖調試的時候是空函數 
spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); 
//相當於_raw_spin_lock(lock) 
LOCK_CONTENDED(lock, _raw_spin_trylock, _raw_spin_lock); 
} 
LOCK_CONTENDED的定義如下: 
#define LOCK_CONTENDED(_lock, try, lock) \ 
lock(_lock) 
_raw_spin_lock()的操作如下: 
# define _raw_spin_lock(lock) __raw_spin_lock(&(lock)->raw_lock) 

static inline void __raw_spin_lock(raw_spinlock_t *lock) 
{ 
asm volatile( 
"\n1:\t" 
//decl: lock->slock減1 
LOCK_PREFIX " ; decl %0\n\t" 
//如果不為負.跳轉到2f.2f後面沒有任何指令.即為退出 
"jns 2f\n" 
"3:\n" 
//重複執行nop.nop是x86的小延遲函數 
"rep;nop\n\t" 
//比較0與lock->slock的值,如果lock->slock不大於0.跳轉到標號3.即繼續重複執行nop 
"cmpl $0,%0\n\t" 
//如果lock->slock不大於0.跳轉到標號3.即繼續重複執行nop 
"jle 3b\n\t" 
//如果lock->slock大於0.跳轉到標號1.重新判斷鎖的slock成員 
"jmp 1b\n" 
"2:\t" : "=m" (lock->slock) : : "memory"); 
} 
在上面的函數中,可能對"jmp 1b\n"比較難以理解.在我們一般的觀念裡.獲得一個鎖.將其值減1.釋放鎖時將其值加1.實際上在自旋鎖的實現中lock->slock只有兩個可能值,一個是0. 一個是1.釋放鎖的時候並不是將lock->slock加1.而是將其賦為1.在後面的自旋鎖釋放代碼中再詳細分析. 

# define spin_unlock(lock) _spin_unlock(lock) 
void __lockfunc _spin_unlock(spinlock_t *lock) 
{ 
//在沒有配置自旋鎖調試的時候.該函數為空函數 
spin_release(&lock->dep_map, 1, _RET_IP_); 
// 
_raw_spin_unlock(lock); 
//釋放自旋鎖了.允許內核搶占 
preempt_enable(); 
} 
# define _raw_spin_unlock(lock) __raw_spin_unlock(&(lock)->raw_lock) 
static inline void __raw_spin_unlock(raw_spinlock_t *lock) 
{ 
//將lock->slock賦為1 
asm volatile("movb $1,%0" : "+m" (lock->slock) :: "memory"); 
} 
總體來說,釋放自旋鎖的分為兩步: 
第一步:將鎖的slock字段回歸到1.注意這裡不是加1,而是賦值1 
第二步:允許內核搶占,與加鎖時的禁止內核搶占相對應 

在內核中還有幾個與硬中斷和軟中斷有關的幾個自旋鎖API.如下所示: 
void spin_lock(spinlock_t *lock) 
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags) 
void spin_lock_irq(spinlock_t *lock) 
void spin_lock_bh(spinlock_t *lock) 

void spin_unlock(spinlock_t *lock); 
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags); 
void spin_unlock_irq(spinlock_t *lock); 
void spin_unlock_bh(spinlock_t *lock); 
代碼都不復雜,可自行結合前面分析的IRQ中斷部份理解這幾個API的操作. 

三:信號量的操作 
3.1:信號量的特點: 
根據前面的分析看到.自旋鎖是一種輕快但效率極其低下的一種加鎖,對於需要對臨界區長時間操作的情況下不太合適.因此,需要一種等待獲鎖的機制.這就是信號量.信號量在請求鎖的時候,如果不成功,便會將進程投入睡眠.因此要在允許睡眠的場合使用信號量. 
3.2:信號量的實現信號量的結構如下所示: 
struct semaphore { 
//信號量資源數 
atomic_t count; 
//是否有一些進程在睡眠 
int sleepers; 
//信號的等待隊列 
wait_queue_head_t wait; 
} 
3.3: sema_init() :信號量的初始化 
static inline void sema_init (struct semaphore *sem, int val) 
{ 
//設置信號量的資源數為val.sem->count表示允許同時有多少個進程佔有此信號量 
atomic_set(&sem->count, val); 
//sleepers成員置0,表示沒有進程在等待 
sem->sleepers = 0; 
//初始化等待隊列 
init_waitqueue_head(&sem->wait); 
} 
3.4:down :獲取信號量 
static inline void down(struct semaphore * sem) 
{ 
//可能會引起睡眠 
might_sleep(); 
__asm__ __volatile__( 
"# atomic down operation\n\t" 
//LOCK:鎖定總線標誌 
//使sem->count -1 如果小於0.跳轉到2 
LOCK "decl %0\n\t" /* --sem->count */ 
"js 2f\n" 
//後面沒有指令了,退出 
"1:\n" 
LOCK_SECTION_START("") 
//如果sem->count -1小於0.說明該信號已經沒有足夠的資源了.調用__down_failed 
"2:\tcall __down_failed\n\t" 
"jmp 1b\n" 
LOCK_SECTION_END 
:"=m" (sem->count) 
:"c" (sem) 
:"memory"); 
} 
__down_failed的代碼如下: 
asm( 
".section .sched.text\n" 
".align 4\n" 
".globl __down_failed\n" 
"__down_failed:\n\t" 
#if defined(CONFIG_FRAME_POINTER) 
"pushl %ebp\n\t" 
"movl %esp,%ebp\n\t" 
#endif 
//之所以把eax edx ecx壓棧,是因為__down()函數的參數就是eax edx ecx.它從堆棧中取參數.所以//將其入棧.這樣做是從2.4遺留下來的,因為2.4的__down函數為fastcall型 
"pushl %eax\n\t" 
"pushl %edx\n\t" 
"pushl %ecx\n\t" 
"call __down\n\t" 
"popl %ecx\n\t" 
"popl %edx\n\t" 
"popl %eax\n\t" 
#if defined(CONFIG_FRAME_POINTER) 
"movl %ebp,%esp\n\t" 
"popl %ebp\n\t" 
#endif 
"ret" 
) 
__down的代碼如下: 
asmlinkage void __sched __down(struct semaphore * sem) 
{ 
struct task_struct *tsk = current; 
//聲明一個等待隊列 
DECLARE_WAITQUEUE(wait, tsk); 
unsigned long flags; 

//將進程狀態置為不可中斷型.外部信號不可以將其中斷 
tsk->state = TASK_UNINTERRUPTIBLE; 
//獲取自旋鎖且關中斷 
spin_lock_irqsave(&sem->wait.lock, flags); 
//加入等待隊列,並將等待標誌高為WQ_FLAG_EXCLUSIVE 
add_wait_queue_exclusive_locked(&sem->wait, &wait); 

//sleepers之前只能為0或者是1 
sem->sleepers++; 
for (;;) { 
int sleepers = sem->sleepers; 

/* 
* Add "everybody else" into it. They aren't 
* playing, because we own the spinlock in 
* the wait_queue_head. 
*/ 
//如果之前有進程在線程,則使sem->count+1 (剛好使count等於down進來的值) 
//因為之前在down()中有減1操作 

//負數返回真 
if (!atomic_add_negative(sleepers - 1, &sem->count)) { 
//已經有信號了,將sleeps置零,退出循環後會喚醒等列中的一個進程 
sem->sleepers = 0; 
break; 
} 
//將sleepers 置1 
sem->sleepers = 1; /* us - see -1 above */ 
//釋放自旋量並恢復之前的中斷標誌 
spin_unlock_irqrestore(&sem->wait.lock, flags); 

//重新調度 
schedule(); 

//重新調度後,因為其狀態是TASK_UNINTERRUPTIBLE .所以要釋放信號量的進程將其喚醒 
//才能繼續運行 
spin_lock_irqsave(&sem->wait.lock, flags); 
//將狀態再設為TASK_UNINTERRUPTIBLE . 重新判斷是否有信號量資源 
tsk->state = TASK_UNINTERRUPTIBLE; 
} 

//已經有信號量資源了,將等待隊列刪除 
remove_wait_queue_locked(&sem->wait, &wait); 
//喚醒其它隊列 
wake_up_locked(&sem->wait); 
//恢復中斷標誌和釋放自旋鎖 
spin_unlock_irqrestore(&sem->wait.lock, flags); 
tsk->state = TASK_RUNNING; 
} 
這裡要注意的是對sleeper成員的理解.結合後面的代碼.進程在沒有獲得鎖時會將其設為1.在獲得了鎖的時候將其設為0.所以,sleeper=1時表示有進程在等待.sleeper=0時,有進程獲得信號量退出,或者沒有進程在等待此信號量. 
如果在等待隊列中進程被喚醒,並獲得了鎖.則將sleeper設為0.然後用break退出循環,再用wake_up_locked(&sem->wait)喚醒等待隊列中的一個進程,這個進程下次被調度的時候從shedule()後面開始運行.更改進程狀態好,調用if (!atomic_add_negative(sleepers - 1, &sem->count))判斷,此時sleeper=0.sleep-1 = -1.經過atomic_add_negative()使sem->count-1了.因此前面論述過.進程進來取信號量down()裡count有減1.然後如果有等待線程在獲取這個信號,剛又會將count+1,使其變到了原值.現在在我們這個情景中,已經有線程進程去了,應該要+1. 
那思考一下:為什麼要對count sleeper做這樣的處理呢?直接使用使用計數不就完了嗎?進程等待獲取信號量的時候sleep+1 count-1.釋放信號量的時候sleep-1.count+1不可以嗎? 
表面上上述的方法也能工作的很好,但是如果進程數目一多就要考慮到溢出問題了. 
Linux在這裡設計的很巧妙,可以好好的回味. 

3.5:up(): 釋放信號 
static inline void up(struct semaphore * sem) 
{ 
__asm__ __volatile__( 
"# atomic up operation\n\t" 
//sem->count 計數加1 
LOCK "incl %0\n\t" /* ++sem->count */ 
//如果大於0的話,說明信號量資源充足,沒有進程在等待 
//無需喚醒等待隊列.直接退出即可 
"jle 2f\n" 
"1:\n" 
LOCK_SECTION_START("") 
//否則喚醒隊列 
"2:\tcall __up_wakeup\n\t" 
"jmp 1b\n" 
LOCK_SECTION_END 
".subsection 0\n" 
:"=m" (sem->count) 
:"c" (sem) 
:"memory"); 
} 
asm( 
".section .sched.text\n" 
".align 4\n" 
".globl __up_wakeup\n" 
"__up_wakeup:\n\t" 
"pushl %eax\n\t" 
"pushl %edx\n\t" 
"pushl %ecx\n\t" 
"call __up\n\t" 
"popl %ecx\n\t" 
"popl %edx\n\t" 
"popl %eax\n\t" 
"ret" 
); 
同理調用__up()進行處理 
asmlinkage void __up(struct semaphore *sem) 
{ 
wake_up(&sem->wait); 
} 
這個函數很簡單,就是將信號量中的等待隊列喚醒.注意之前提到過的,加入等待隊列中所用的標誌WQ_FLAG_EXCLUSIVE.這個標誌表示只需將掛成隊列前面的進程喚醒.無需全部喚醒. 
信號量的使用內核中還有很多的變體API.究其原理都差不多.可以自行了解^_^ 

四:完成變量 
4.1:完全變量的特點在內核中經常有這樣的需要,在一個進程中要等待某個進程完成某個事情.內核為其提供了一個簡單的接口,其實它只是一個簡化版的信號量而已. 
4.2:完全變量的結構完全變量在代碼中的結構如下: 
struct completion { 
//等待標誌 
unsigned int done; 
//等待隊列 
wait_queue_head_t wait; 
}; 
4.3:完全變量的操作與實現 
wait_for_completion():等待條件的完成,它的代碼如下: 
void __sched wait_for_completion(struct completion *x) 
{ 
wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_UNINTERRUPTIBLE); 
} 
wait_for_common() à do_wait_for_common(): 
static inline long __sched 
do_wait_for_common(struct completion *x, long timeout, int state) 
{ 
//!x->done:條件末完成.投入睡眠等待 
if (!x->done) { 
DECLARE_WAITQUEUE(wait, current); 

wait.flags |= WQ_FLAG_EXCLUSIVE; 
//加入等待隊列 
__add_wait_queue_tail(&x->wait, &wait); 
do { 
//如果是可中斷狀態且有末處理的信號。 將其移出等待隊列 
if (state == TASK_INTERRUPTIBLE && 
signal_pending(current)) { 
__remove_wait_queue(&x->wait, &wait); 
return -ERESTARTSYS; 
} 
__set_current_state(state); 
spin_unlock_irq(&x->wait.lock); 
timeout = schedule_timeout(timeout); 
spin_lock_irq(&x->wait.lock); 
if (!timeout) { 
//延時到了.將其移出等待隊列 
__remove_wait_queue(&x->wait, &wait); 
return timeout; 
} 
} while (!x->done); 
//運行到這裡,條件已經完成了,將其從等待隊列移出 
__remove_wait_queue(&x->wait, &wait); 
} 
//如果條件完成了,將x->done-- 直接返回 
x->done--; 
return timeout; 
} 

Complete()來用通告已經完成了這個條件.代碼如下: 
void complete(struct completion *x) 
{ 
unsigned long flags; 

//持完全變量的自旋鎖且禁中斷 
spin_lock_irqsave(&x->wait.lock, flags); 
//x- 
x->done++; 
__wake_up_common(&x->wait, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, 
1, 0, NULL); 
//釋放自旋鎖且恢復中斷到以前的狀態 
spin_unlock_irqrestore(&x->wait.lock, flags); 
} 
__wake_up_common()的代碼如下所示: 
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, 
int nr_exclusive, int sync, void *key) 
{ 
wait_queue_t *curr, *next; 

//遍歷等待隊列且喚醒等待的進程 
list_for_each_entry_safe(curr, next, &q->task_list, task_list) { 
unsigned flags = curr->flags; 

if (curr->func(curr, mode, sync, key) && 
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) 
break; 
} 
} 
分析完了信號量之後,看完全變量的實現還是挺簡單.另外需要注意的是信號量與完成變量裡的自旋鎖的使用.信號量是在其中的一段操作中使用了自旋鎖.這是為了在這一段操作中保持串行化.而完全變量是一開始就加鎖.不允許wait_for_completion()和completion()同時調用. 

五:讀寫信號量 
5.1:讀寫信號量的特點通常受保護的數據是允許多個進程去讀的。 只需要保護寫操作就可以了。 如果多個讀者去排隊等待其它讀者的操作完成的話,顯然是有失效率的。 讀寫信號量就是為了解瘊這個問題而產生的。 讀寫信號量的等待隊列是一個嚴格的FIFO.有以下幾個特點: 
1):讀者去獲取信號量的時候: 
1:如果臨界區是空的或者有其它讀者在操作,以時如果沒有寫者在排隊,就把讀者放進去。 如果有寫者在排隊,就把讀者加入到等待隊列。 
2:如果臨界區有寫者在操作。 就把這個讀者加入等待隊列。 
2):寫者去獲取信號量的時候: 
1:如果臨界區是空的且等待隊列為空。 放這個寫者進去。 如果等待隊不為空,就將其加至等待隊列 
2:如果臨界區中有讀者在操作,就將其加入到等待隊列. 
3):釋放信號量的時候: 
1:如果這個進程是讀者。 檢查是不是臨界區內的最後一個讀者。 如果是最後一個讀者,檢查是否有寫者在等待。 如果有。 將寫者喚醒。如果沒有,直接退出。 
2:如果釋放信號量是一個寫者,通常會喚醒等待隊列中的第一個進程 
2.1:如果這個進程是讀者。 那它後面的所有讀者也會被喚醒(不包含寫者後面的) 
2.2:如果這個進程是寫者。 那麼它後面的所有等待進程繼續睡眠. 
總之:在這裡要注意對待讀者與寫者的區別。 在臨界區中可以允許有多個讀者,但不允許有多個寫者。 

5.2:讀寫信號量結構: 
讀寫信號量在內核中對應的數據結構為: 
struct rw_semaphore { 
//activity = 0:表示沒有對像在操作信號量 
//activity > 0:表示有一個或者多個讀者在操作信號量 
//activity < 0:表示有一個寫者在操作信號量 
__s32 activity; 
//讀寫信號量對應的自旋鎖 
spinlock_t wait_lock; 
//對應的等待隊列 
struct list_head wait_list; 
#if RWSEM_DEBUG 
int debug; 
#endif 
} 

5.3:讀寫信號量的操作與實現: 
5.3.1:down_read():讀者獲得信號量 
down_read()用來在讀者操作臨界區的情況。 它的代碼如下: 
static inline void down_read(struct rw_semaphore *sem) 
{ 
//可能會引起睡眠 
might_sleep(); 
//調試用 
rwsemtrace(sem,"Entering down_read"); 
__down_read(sem); 
//調試用 
rwsemtrace(sem,"Leaving down_read"); 
} 
__down_read()的代碼如下: 
void fastcall __sched __down_read(struct rw_semaphore *sem) 
{ 
struct rwsem_waiter waiter; 
struct task_struct *tsk; 

rwsemtrace(sem, "Entering __down_read"); 

//獲取自旋鎖 
spin_lock(&sem->wait_lock); 

//sem->activeity>=0:說明有讀者進入了臨界區 
//sem->wait_list:等待隊列 
//如果activeity>=0. wait_list不為空:說明有寫者在等待 
//如果activeity<0:說明有寫者在對臨界區操作 

//如果臨界區中有讀者然後沒有寫者在等待.可以直接獲得此信號量 
if (sem->activity >= 0 && list_empty(&sem->wait_list)) { 
/* granted */ 
sem->activity++; 
spin_unlock(&sem->wait_lock); 
goto out; 
} 

//否則,將讀者加入等待隊列末尾 
tsk = current; 
set_task_state(tsk, TASK_UNINTERRUPTIBLE); 

/* set up my own style of waitqueue */ 
waiter.task = tsk; 
//對應的等待標誌是RWSEM_WAITING_FOR_READ 
waiter.flags = RWSEM_WAITING_FOR_READ; 
get_task_struct(tsk); 

list_add_tail(&waiter.list, &sem->wait_list); 

/* we don't need to touch the semaphore struct anymore */ 
spin_unlock(&sem->wait_lock); 

/* wait to be given the lock */ 
//一個死循環,這樣是為了是被期待的喚醒 
for (;;) { 
//如果waiter.task==NULL.確實是被喚醒的.因為喚醒的時候,會將waiter,task清空 
if (!waiter.task) 
break; 
//睡眠 
schedule(); 
//設置進程狀態為不可中斷型 
set_task_state(tsk, TASK_UNINTERRUPTIBLE); 
} 

//已經獲得信號量了.設置狀態為RUNNING 退出 
tsk->state = TASK_RUNNING; 

out: 
rwsemtrace(sem, "Leaving __down_read"); 
} 
5.3.2: up_read():讀者釋放信號量讀者操作完臨界區了之後,使用up_read()將信號量釋放。 代碼如下: 
up_read()à 
void fastcall __up_read(struct rw_semaphore *sem) 
{ 
rwsemtrace(sem, "Entering __up_read"); 

spin_lock(&sem->wait_lock); 

//--sem->activity == 0:此讀者是臨界區中的最後一個對象 
//sem->wait_list不為空說明臨界區外有寫者在等待 
if (--sem->activity == 0 && !list_empty(&sem->wait_list)) 
//喚醒等待隊列中的第一個進程 
sem = __rwsem_wake_one_writer(sem); 

//如果不是臨界區中的最後一個讀者或者是沒有寫者在等待,退出 
spin_unlock(&sem->wait_lock); 

rwsemtrace(sem, "Leaving __up_read"); 
} 
__rwsem_wake_one_writer()代碼如下: 
static inline struct rw_semaphore * 
__rwsem_wake_one_writer(struct rw_semaphore *sem) 
{ 
struct rwsem_waiter *waiter; 
struct task_struct *tsk; 

//將activity置為-1.因為臨界區是被寫者佔據了 
sem->activity = -1; 

//取等待隊列中的第一個進程 
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list); 
//將其從等待鍊錶中刪除 
list_del(&waiter->list); 

tsk = waiter->task; 
mb(); 
//將waiter->task清空 
waiter->task = NULL; 
//喚醒過程 
wake_up_process(tsk); 
//減少進程引用計數,因為在把它投入隊列的時候被+1 了.這樣做是為了防止task結構被釋放掉 
put_task_struct(tsk); 
return sem; 
} 
注意裡面對activity和waiter->task的操作. 
5.3.3: down_write():寫者獲得信號量在更改臨界區的情況下,使用down_write()獲得信號量。 代碼如下: 
down_write() à __down_write(): 
void fastcall __sched __down_write(struct rw_semaphore *sem) 
{ 
struct rwsem_waiter waiter; 
struct task_struct *tsk; 

rwsemtrace(sem, "Entering __down_write"); 

spin_lock(&sem->wait_lock); 

//如果臨界區沒有對像在操作.且沒有其它的寫者在等待 
if (sem->activity == 0 && list_empty(&sem->wait_list)) { 
/* granted */ 
//獲得信號 
sem->activity = -1; 
spin_unlock(&sem->wait_lock); 
goto out; 
} 

//如果臨界區被其它對象佔據或者有其它的寫者在等待,將其加到等待隊列末尾 
tsk = current; 
set_task_state(tsk, TASK_UNINTERRUPTIBLE); 

/* set up my own style of waitqueue */ 
waiter.task = tsk; 
//置標誌為RWSEM_WAITING_FOR_WRITE 
waiter.flags = RWSEM_WAITING_FOR_WRITE; 
get_task_struct(tsk); 

list_add_tail(&waiter.list, &sem->wait_list); 

/* we don't need to touch the semaphore struct anymore */ 
spin_unlock(&sem->wait_lock); 

/* wait to be given the lock */ 
//判斷是否被正常喚醒 
for (;;) { 
if (!waiter.task) 
break; 
schedule(); 
set_task_state(tsk, TASK_UNINTERRUPTIBLE); 
} 

tsk->state = TASK_RUNNING; 

out: 
rwsemtrace(sem, "Leaving __down_write"); 
} 
注意讀者進入臨界的判斷條件和寫者進入臨界區的判斷條件的不同.讀者只需要sem->activity >= 0且沒有等待隊列就可以進行。 寫者必須要求sem->activity == 0且無等待隊列. 
5.3.4:up_write():寫者釋放信號量寫者操作完了之後,調用up_write()釋放信號量。 代碼如下: 
Up_write() à __up_write(): 
void fastcall __up_write(struct rw_semaphore *sem) 
{ 
rwsemtrace(sem, "Entering __up_write"); 

spin_lock(&sem->wait_lock); 

//寫者釋放信號量.將activity重置為0.因為臨界區已經沒有對像在操作了 
sem->activity = 0; 
//如果有進程在等待,將其喚醒 
if (!list_empty(&sem->wait_list)) 
sem = __rwsem_do_wake(sem, 1); 

spin_unlock(&sem->wait_lock); 

rwsemtrace(sem, "Leaving __up_write"); 
} 
寫進程釋放信號喚醒其它進程的處理有點特別,代碼如下: 
static inline struct rw_semaphore * 
__rwsem_do_wake(struct rw_semaphore *sem, int wakewrite) 
{ 
struct rwsem_waiter *waiter; 
struct task_struct *tsk; 
int woken; 

rwsemtrace(sem, "Entering __rwsem_do_wake"); 

waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list); 

//在__up_write()調用的時候.wakewrite是置為1的.表示是一個寫者在進行喚醒 
if (!wakewrite) { 
if (waiter->flags & RWSEM_WAITING_FOR_WRITE) 
goto out; 
goto dont_wake_writers; 
} 

/* if we are allowed to wake writers try to grant a single write lock 
* if there's a writer at the front of the queue 
* - we leave the 'waiting count' incremented to signify potential 
* contention 
*/ 

//如果等待隊列的第一個進程是一個寫等待,只需要將這個寫者喚醒就行了 
if (waiter->flags & RWSEM_WAITING_FOR_WRITE) { 
sem->activity = -1; 
list_del(&waiter->list); 
tsk = waiter->task; 
/* Don't touch waiter after ->task has been NULLed */ 
mb(); 
waiter->task = NULL; 
wake_up_process(tsk); 
put_task_struct(tsk); 
goto out; 
} 

/* grant an infinite number of read locks to the front of the queue */ 
dont_wake_writers: 
//如果第一個進程是讀等待,將將其後所有為讀等待的進程喚醒 
woken = 0; 
while (waiter->flags & RWSEM_WAITING_FOR_READ) { 
struct list_head *next = waiter->list.next; 

list_del(&waiter->list); 
tsk = waiter->task; 
mb(); 
waiter->task = NULL; 
wake_up_process(tsk); 
put_task_struct(tsk); 
woken++; 
if (list_empty(&sem->wait_list)) 
break; 
waiter = list_entry(next, struct rwsem_waiter, list); 
} 

sem->activity += woken; 

out: 
rwsemtrace(sem, "Leaving __rwsem_do_wake"); 
return sem; 
} 
六:小結在linux內核中還有其它的幾種同步機制,我在這裡沒有一一列出分析。 在上面分析的幾種機制中,設得都很巧妙。 需要慢慢的琢磨。