由timeboy於2009-03-15 14:51
本文系本站原創,歡迎轉載!
轉載請註明出處:http://ericxiao.cublog.cn/
------------------------------------------
同步是linux內核中一種很重要的操作.它為內核提供了一種臨界區和SMP系統中的數據保護機制.今天就來分析一下在linux內核是怎麼樣實現這些操作的.
一:原子操作(摘自《understanding the linux kernel 2.4》)
原子操作是指在執行過程中不能被打斷的操作.它包括以下幾種類型:
進行一次或者零次對齊內存的訪問操作都是原子操作.因為這些指令一般都是單指令.不可能單指令在執行過程中被搶占。
如果在讀操作之後,寫操作之前沒有其他處理器佔用內存總線。 那麼從內存中讀取數據,更新數據並把更新後的數據寫回內存的這些“讀—修改—寫”彙編語言指令是原子的。 例如:inc或dec
操作碼前綴是lock字節的(0xf0)的“讀—修改—寫”彙編程序指令即使在SMP系統中也是原子的。 當控制單元檢測lock後,就會把總線鎖住,一直到這些加鎖指令執行完為止.
操作碼前綴是一個rep字節的彙編語言指令都不是原子的。 這條指令強行讓控制單元執行相同的指令.控制單元在執行新的指令之前要檢查掛起的中斷.
1.1: 在內核中提供了以下的幾組操作接口:
void atomic_set(atomic_t *v, int i)
int atomic_read(atomic_t *v)
void atomic_add(int i, atomic_t *v)
void atomic_sub(int i, atomic_t *v)
void atomic_inc(atomic_t *v)
void atomic_dec(atomic_t *v)
int atomic_inc_and_test(atomic_t *v)
int atomic_dec_and_test(atomic_t *v)
int atomic_sub_and_test(int i, atomic_t *v)
int atomic_add_negative(int i, atomic_t *v)
int atomic_add_return(int i, atomic_t *v)
int atomic_sub_return(int i, atomic_t *v)
int atomic_inc_return(atomic_t *v)
int atomic_dec_return(atomic_t *v)
atomic_sub(amount, &first_atomic)
atomic_add(amount, &second_atomic)
上述原子操作對像是一個32位數。 之所以定義成這樣,是為了和一般的操作區別。 提醒用戶這是一個原子操作接口。
1.2: 原子的位操作:
void set_bit(nr, void *addr)
void clear_bit(nr, void *addr)
void change_bit(nr, void *addr)
test_bit(nr, void *addr)
int test_and_set_bit(nr, void *addr)
int test_and_clear_bit(nr, void *addr)
int test_and_change_bit(nr, void *addr)
以atomic_inc(atomic_t *v), atomic_dec(atomic_t *v)為例來分析內核中對應的代碼.
static __inline__ void atomic_dec(atomic_t *v)
{
__asm__ __volatile__(
LOCK_PREFIX "decl %0"
:"+m" (v->counter));
}
LOCK_PREFIX為鎖操作前綴
Atomic_inc的代碼如下:
static __inline__ void atomic_inc(atomic_t *v)
{
__asm__ __volatile__(
LOCK_PREFIX "incl %0"
:"+m" (v->counter));
}
二:自旋鎖上述的原子操作操作接口提供了單變量的保護操作,但是在實際項目中,情況並沒有這麼簡單,我們需要保護的是一個臨界區.所以我們需要用到其它的加鎖方法.自旋鎖就是其中之一.
2.1:自旋鎖的特性自旋鎖只能被一個控制路徑所持有.進入臨界區,自旋鎖鎖住.其後如果有其它線程來取自旋鎖的自己就會原地“自旋” ,一直到持有者放開鎖為止.
很顯然,當得不到鎖的時候,CPU就會在一直在做輪詢判斷.也就是說如果一個進程取不到自旋鎖.也會一直判斷下去,不會主動去調度其它進程代替當前進程.但是這樣鎖機制的開銷極少.可以快速的開鎖和加鎖.持有自旋鎖的進程一般都會對臨界區有快速的處理另外,內核還規定持有自旋鎖的進程不能被搶占,這樣做主要是出於系統性能考慮.如果一個持自旋鎖的進程被調度出CPU.那其它需要這些鎖的進程被調度進來之後,也只做“自旋”工作.
2.2: 自旋鎖的實現自旋鎖在內核中對應的類型為: spinlock_t
typedef struct {
raw_spinlock_t raw_lock;
#if defined(CONFIG_PREEMPT) && defined(CONFIG_SMP)
//配置了內核搶占與SMP
unsigned int break_lock;
#endif
//自旋鎖DEBUG
#ifdef CONFIG_DEBUG_SPINLOCK
unsigned int magic, owner_cpu;
void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
} spinlock_t;
typedef struct {
unsigned int slock;
} raw_spinlock_t;
在接下來的代碼中再來分析結構中各成員的含義.
2.2.1:spin_lock_init :自旋鎖的初始化代碼如下:
# define spin_lock_init(lock) \
do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0)
#define SPIN_LOCK_UNLOCKED __SPIN_LOCK_UNLOCKED(old_style_spin_init)
# define __SPIN_LOCK_UNLOCKED(lockname) \
(spinlock_t) { .raw_lock = __RAW_SPIN_LOCK_UNLOCKED, \
SPIN_DEP_MAP_INIT(lockname) }
#define __RAW_SPIN_LOCK_UNLOCKED { 1 }
上述操作就是把sipinlock_t中的raw_lock置為__RAW_SPIN_LOCK_UNLOCKED.即為1.
此時鎖是打開狀態.
2.2.2: spin_lock() :獲取自旋鎖.
#define spin_lock(lock) _spin_lock(lock)
_spin_lock()定義如下:
void __lockfunc _spin_lock(spinlock_t *lock)
{
//禁止搶占
preempt_disable();
//這個函數在沒有定義自旋鎖調試的時候是空函數
spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
//相當於_raw_spin_lock(lock)
LOCK_CONTENDED(lock, _raw_spin_trylock, _raw_spin_lock);
}
LOCK_CONTENDED的定義如下:
#define LOCK_CONTENDED(_lock, try, lock) \
lock(_lock)
_raw_spin_lock()的操作如下:
# define _raw_spin_lock(lock) __raw_spin_lock(&(lock)->raw_lock)
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
asm volatile(
"\n1:\t"
//decl: lock->slock減1
LOCK_PREFIX " ; decl %0\n\t"
//如果不為負.跳轉到2f.2f後面沒有任何指令.即為退出
"jns 2f\n"
"3:\n"
//重複執行nop.nop是x86的小延遲函數
"rep;nop\n\t"
//比較0與lock->slock的值,如果lock->slock不大於0.跳轉到標號3.即繼續重複執行nop
"cmpl $0,%0\n\t"
//如果lock->slock不大於0.跳轉到標號3.即繼續重複執行nop
"jle 3b\n\t"
//如果lock->slock大於0.跳轉到標號1.重新判斷鎖的slock成員
"jmp 1b\n"
"2:\t" : "=m" (lock->slock) : : "memory");
}
在上面的函數中,可能對"jmp 1b\n"比較難以理解.在我們一般的觀念裡.獲得一個鎖.將其值減1.釋放鎖時將其值加1.實際上在自旋鎖的實現中lock->slock只有兩個可能值,一個是0. 一個是1.釋放鎖的時候並不是將lock->slock加1.而是將其賦為1.在後面的自旋鎖釋放代碼中再詳細分析.
# define spin_unlock(lock) _spin_unlock(lock)
void __lockfunc _spin_unlock(spinlock_t *lock)
{
//在沒有配置自旋鎖調試的時候.該函數為空函數
spin_release(&lock->dep_map, 1, _RET_IP_);
//
_raw_spin_unlock(lock);
//釋放自旋鎖了.允許內核搶占
preempt_enable();
}
# define _raw_spin_unlock(lock) __raw_spin_unlock(&(lock)->raw_lock)
static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
//將lock->slock賦為1
asm volatile("movb $1,%0" : "+m" (lock->slock) :: "memory");
}
總體來說,釋放自旋鎖的分為兩步:
第一步:將鎖的slock字段回歸到1.注意這裡不是加1,而是賦值1
第二步:允許內核搶占,與加鎖時的禁止內核搶占相對應
在內核中還有幾個與硬中斷和軟中斷有關的幾個自旋鎖API.如下所示:
void spin_lock(spinlock_t *lock)
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags)
void spin_lock_irq(spinlock_t *lock)
void spin_lock_bh(spinlock_t *lock)
void spin_unlock(spinlock_t *lock);
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
void spin_unlock_irq(spinlock_t *lock);
void spin_unlock_bh(spinlock_t *lock);
代碼都不復雜,可自行結合前面分析的IRQ中斷部份理解這幾個API的操作.
三:信號量的操作
3.1:信號量的特點:
根據前面的分析看到.自旋鎖是一種輕快但效率極其低下的一種加鎖,對於需要對臨界區長時間操作的情況下不太合適.因此,需要一種等待獲鎖的機制.這就是信號量.信號量在請求鎖的時候,如果不成功,便會將進程投入睡眠.因此要在允許睡眠的場合使用信號量.
3.2:信號量的實現信號量的結構如下所示:
struct semaphore {
//信號量資源數
atomic_t count;
//是否有一些進程在睡眠
int sleepers;
//信號的等待隊列
wait_queue_head_t wait;
}
3.3: sema_init() :信號量的初始化
static inline void sema_init (struct semaphore *sem, int val)
{
//設置信號量的資源數為val.sem->count表示允許同時有多少個進程佔有此信號量
atomic_set(&sem->count, val);
//sleepers成員置0,表示沒有進程在等待
sem->sleepers = 0;
//初始化等待隊列
init_waitqueue_head(&sem->wait);
}
3.4:down :獲取信號量
static inline void down(struct semaphore * sem)
{
//可能會引起睡眠
might_sleep();
__asm__ __volatile__(
"# atomic down operation\n\t"
//LOCK:鎖定總線標誌
//使sem->count -1 如果小於0.跳轉到2
LOCK "decl %0\n\t" /* --sem->count */
"js 2f\n"
//後面沒有指令了,退出
"1:\n"
LOCK_SECTION_START("")
//如果sem->count -1小於0.說明該信號已經沒有足夠的資源了.調用__down_failed
"2:\tcall __down_failed\n\t"
"jmp 1b\n"
LOCK_SECTION_END
:"=m" (sem->count)
:"c" (sem)
:"memory");
}
__down_failed的代碼如下:
asm(
".section .sched.text\n"
".align 4\n"
".globl __down_failed\n"
"__down_failed:\n\t"
#if defined(CONFIG_FRAME_POINTER)
"pushl %ebp\n\t"
"movl %esp,%ebp\n\t"
#endif
//之所以把eax edx ecx壓棧,是因為__down()函數的參數就是eax edx ecx.它從堆棧中取參數.所以//將其入棧.這樣做是從2.4遺留下來的,因為2.4的__down函數為fastcall型
"pushl %eax\n\t"
"pushl %edx\n\t"
"pushl %ecx\n\t"
"call __down\n\t"
"popl %ecx\n\t"
"popl %edx\n\t"
"popl %eax\n\t"
#if defined(CONFIG_FRAME_POINTER)
"movl %ebp,%esp\n\t"
"popl %ebp\n\t"
#endif
"ret"
)
__down的代碼如下:
asmlinkage void __sched __down(struct semaphore * sem)
{
struct task_struct *tsk = current;
//聲明一個等待隊列
DECLARE_WAITQUEUE(wait, tsk);
unsigned long flags;
//將進程狀態置為不可中斷型.外部信號不可以將其中斷
tsk->state = TASK_UNINTERRUPTIBLE;
//獲取自旋鎖且關中斷
spin_lock_irqsave(&sem->wait.lock, flags);
//加入等待隊列,並將等待標誌高為WQ_FLAG_EXCLUSIVE
add_wait_queue_exclusive_locked(&sem->wait, &wait);
//sleepers之前只能為0或者是1
sem->sleepers++;
for (;;) {
int sleepers = sem->sleepers;
/*
* Add "everybody else" into it. They aren't
* playing, because we own the spinlock in
* the wait_queue_head.
*/
//如果之前有進程在線程,則使sem->count+1 (剛好使count等於down進來的值)
//因為之前在down()中有減1操作
//負數返回真
if (!atomic_add_negative(sleepers - 1, &sem->count)) {
//已經有信號了,將sleeps置零,退出循環後會喚醒等列中的一個進程
sem->sleepers = 0;
break;
}
//將sleepers 置1
sem->sleepers = 1; /* us - see -1 above */
//釋放自旋量並恢復之前的中斷標誌
spin_unlock_irqrestore(&sem->wait.lock, flags);
//重新調度
schedule();
//重新調度後,因為其狀態是TASK_UNINTERRUPTIBLE .所以要釋放信號量的進程將其喚醒
//才能繼續運行
spin_lock_irqsave(&sem->wait.lock, flags);
//將狀態再設為TASK_UNINTERRUPTIBLE . 重新判斷是否有信號量資源
tsk->state = TASK_UNINTERRUPTIBLE;
}
//已經有信號量資源了,將等待隊列刪除
remove_wait_queue_locked(&sem->wait, &wait);
//喚醒其它隊列
wake_up_locked(&sem->wait);
//恢復中斷標誌和釋放自旋鎖
spin_unlock_irqrestore(&sem->wait.lock, flags);
tsk->state = TASK_RUNNING;
}
這裡要注意的是對sleeper成員的理解.結合後面的代碼.進程在沒有獲得鎖時會將其設為1.在獲得了鎖的時候將其設為0.所以,sleeper=1時表示有進程在等待.sleeper=0時,有進程獲得信號量退出,或者沒有進程在等待此信號量.
如果在等待隊列中進程被喚醒,並獲得了鎖.則將sleeper設為0.然後用break退出循環,再用wake_up_locked(&sem->wait)喚醒等待隊列中的一個進程,這個進程下次被調度的時候從shedule()後面開始運行.更改進程狀態好,調用if (!atomic_add_negative(sleepers - 1, &sem->count))判斷,此時sleeper=0.sleep-1 = -1.經過atomic_add_negative()使sem->count-1了.因此前面論述過.進程進來取信號量down()裡count有減1.然後如果有等待線程在獲取這個信號,剛又會將count+1,使其變到了原值.現在在我們這個情景中,已經有線程進程去了,應該要+1.
那思考一下:為什麼要對count sleeper做這樣的處理呢?直接使用使用計數不就完了嗎?進程等待獲取信號量的時候sleep+1 count-1.釋放信號量的時候sleep-1.count+1不可以嗎?
表面上上述的方法也能工作的很好,但是如果進程數目一多就要考慮到溢出問題了.
Linux在這裡設計的很巧妙,可以好好的回味.
3.5:up(): 釋放信號
static inline void up(struct semaphore * sem)
{
__asm__ __volatile__(
"# atomic up operation\n\t"
//sem->count 計數加1
LOCK "incl %0\n\t" /* ++sem->count */
//如果大於0的話,說明信號量資源充足,沒有進程在等待
//無需喚醒等待隊列.直接退出即可
"jle 2f\n"
"1:\n"
LOCK_SECTION_START("")
//否則喚醒隊列
"2:\tcall __up_wakeup\n\t"
"jmp 1b\n"
LOCK_SECTION_END
".subsection 0\n"
:"=m" (sem->count)
:"c" (sem)
:"memory");
}
asm(
".section .sched.text\n"
".align 4\n"
".globl __up_wakeup\n"
"__up_wakeup:\n\t"
"pushl %eax\n\t"
"pushl %edx\n\t"
"pushl %ecx\n\t"
"call __up\n\t"
"popl %ecx\n\t"
"popl %edx\n\t"
"popl %eax\n\t"
"ret"
);
同理調用__up()進行處理
asmlinkage void __up(struct semaphore *sem)
{
wake_up(&sem->wait);
}
這個函數很簡單,就是將信號量中的等待隊列喚醒.注意之前提到過的,加入等待隊列中所用的標誌WQ_FLAG_EXCLUSIVE.這個標誌表示只需將掛成隊列前面的進程喚醒.無需全部喚醒.
信號量的使用內核中還有很多的變體API.究其原理都差不多.可以自行了解^_^
四:完成變量
4.1:完全變量的特點在內核中經常有這樣的需要,在一個進程中要等待某個進程完成某個事情.內核為其提供了一個簡單的接口,其實它只是一個簡化版的信號量而已.
4.2:完全變量的結構完全變量在代碼中的結構如下:
struct completion {
//等待標誌
unsigned int done;
//等待隊列
wait_queue_head_t wait;
};
4.3:完全變量的操作與實現
wait_for_completion():等待條件的完成,它的代碼如下:
void __sched wait_for_completion(struct completion *x)
{
wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_UNINTERRUPTIBLE);
}
wait_for_common() à do_wait_for_common():
static inline long __sched
do_wait_for_common(struct completion *x, long timeout, int state)
{
//!x->done:條件末完成.投入睡眠等待
if (!x->done) {
DECLARE_WAITQUEUE(wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
//加入等待隊列
__add_wait_queue_tail(&x->wait, &wait);
do {
//如果是可中斷狀態且有末處理的信號。 將其移出等待隊列
if (state == TASK_INTERRUPTIBLE &&
signal_pending(current)) {
__remove_wait_queue(&x->wait, &wait);
return -ERESTARTSYS;
}
__set_current_state(state);
spin_unlock_irq(&x->wait.lock);
timeout = schedule_timeout(timeout);
spin_lock_irq(&x->wait.lock);
if (!timeout) {
//延時到了.將其移出等待隊列
__remove_wait_queue(&x->wait, &wait);
return timeout;
}
} while (!x->done);
//運行到這裡,條件已經完成了,將其從等待隊列移出
__remove_wait_queue(&x->wait, &wait);
}
//如果條件完成了,將x->done-- 直接返回
x->done--;
return timeout;
}
Complete()來用通告已經完成了這個條件.代碼如下:
void complete(struct completion *x)
{
unsigned long flags;
//持完全變量的自旋鎖且禁中斷
spin_lock_irqsave(&x->wait.lock, flags);
//x-
x->done++;
__wake_up_common(&x->wait, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE,
1, 0, NULL);
//釋放自旋鎖且恢復中斷到以前的狀態
spin_unlock_irqrestore(&x->wait.lock, flags);
}
__wake_up_common()的代碼如下所示:
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int sync, void *key)
{
wait_queue_t *curr, *next;
//遍歷等待隊列且喚醒等待的進程
list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
if (curr->func(curr, mode, sync, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
分析完了信號量之後,看完全變量的實現還是挺簡單.另外需要注意的是信號量與完成變量裡的自旋鎖的使用.信號量是在其中的一段操作中使用了自旋鎖.這是為了在這一段操作中保持串行化.而完全變量是一開始就加鎖.不允許wait_for_completion()和completion()同時調用.
五:讀寫信號量
5.1:讀寫信號量的特點通常受保護的數據是允許多個進程去讀的。 只需要保護寫操作就可以了。 如果多個讀者去排隊等待其它讀者的操作完成的話,顯然是有失效率的。 讀寫信號量就是為了解瘊這個問題而產生的。 讀寫信號量的等待隊列是一個嚴格的FIFO.有以下幾個特點:
1):讀者去獲取信號量的時候:
1:如果臨界區是空的或者有其它讀者在操作,以時如果沒有寫者在排隊,就把讀者放進去。 如果有寫者在排隊,就把讀者加入到等待隊列。
2:如果臨界區有寫者在操作。 就把這個讀者加入等待隊列。
2):寫者去獲取信號量的時候:
1:如果臨界區是空的且等待隊列為空。 放這個寫者進去。 如果等待隊不為空,就將其加至等待隊列
2:如果臨界區中有讀者在操作,就將其加入到等待隊列.
3):釋放信號量的時候:
1:如果這個進程是讀者。 檢查是不是臨界區內的最後一個讀者。 如果是最後一個讀者,檢查是否有寫者在等待。 如果有。 將寫者喚醒。如果沒有,直接退出。
2:如果釋放信號量是一個寫者,通常會喚醒等待隊列中的第一個進程
2.1:如果這個進程是讀者。 那它後面的所有讀者也會被喚醒(不包含寫者後面的)
2.2:如果這個進程是寫者。 那麼它後面的所有等待進程繼續睡眠.
總之:在這裡要注意對待讀者與寫者的區別。 在臨界區中可以允許有多個讀者,但不允許有多個寫者。
5.2:讀寫信號量結構:
讀寫信號量在內核中對應的數據結構為:
struct rw_semaphore {
//activity = 0:表示沒有對像在操作信號量
//activity > 0:表示有一個或者多個讀者在操作信號量
//activity < 0:表示有一個寫者在操作信號量
__s32 activity;
//讀寫信號量對應的自旋鎖
spinlock_t wait_lock;
//對應的等待隊列
struct list_head wait_list;
#if RWSEM_DEBUG
int debug;
#endif
}
5.3:讀寫信號量的操作與實現:
5.3.1:down_read():讀者獲得信號量
down_read()用來在讀者操作臨界區的情況。 它的代碼如下:
static inline void down_read(struct rw_semaphore *sem)
{
//可能會引起睡眠
might_sleep();
//調試用
rwsemtrace(sem,"Entering down_read");
__down_read(sem);
//調試用
rwsemtrace(sem,"Leaving down_read");
}
__down_read()的代碼如下:
void fastcall __sched __down_read(struct rw_semaphore *sem)
{
struct rwsem_waiter waiter;
struct task_struct *tsk;
rwsemtrace(sem, "Entering __down_read");
//獲取自旋鎖
spin_lock(&sem->wait_lock);
//sem->activeity>=0:說明有讀者進入了臨界區
//sem->wait_list:等待隊列
//如果activeity>=0. wait_list不為空:說明有寫者在等待
//如果activeity<0:說明有寫者在對臨界區操作
//如果臨界區中有讀者然後沒有寫者在等待.可以直接獲得此信號量
if (sem->activity >= 0 && list_empty(&sem->wait_list)) {
/* granted */
sem->activity++;
spin_unlock(&sem->wait_lock);
goto out;
}
//否則,將讀者加入等待隊列末尾
tsk = current;
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
/* set up my own style of waitqueue */
waiter.task = tsk;
//對應的等待標誌是RWSEM_WAITING_FOR_READ
waiter.flags = RWSEM_WAITING_FOR_READ;
get_task_struct(tsk);
list_add_tail(&waiter.list, &sem->wait_list);
/* we don't need to touch the semaphore struct anymore */
spin_unlock(&sem->wait_lock);
/* wait to be given the lock */
//一個死循環,這樣是為了是被期待的喚醒
for (;;) {
//如果waiter.task==NULL.確實是被喚醒的.因為喚醒的時候,會將waiter,task清空
if (!waiter.task)
break;
//睡眠
schedule();
//設置進程狀態為不可中斷型
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
}
//已經獲得信號量了.設置狀態為RUNNING 退出
tsk->state = TASK_RUNNING;
out:
rwsemtrace(sem, "Leaving __down_read");
}
5.3.2: up_read():讀者釋放信號量讀者操作完臨界區了之後,使用up_read()將信號量釋放。 代碼如下:
up_read()à
void fastcall __up_read(struct rw_semaphore *sem)
{
rwsemtrace(sem, "Entering __up_read");
spin_lock(&sem->wait_lock);
//--sem->activity == 0:此讀者是臨界區中的最後一個對象
//sem->wait_list不為空說明臨界區外有寫者在等待
if (--sem->activity == 0 && !list_empty(&sem->wait_list))
//喚醒等待隊列中的第一個進程
sem = __rwsem_wake_one_writer(sem);
//如果不是臨界區中的最後一個讀者或者是沒有寫者在等待,退出
spin_unlock(&sem->wait_lock);
rwsemtrace(sem, "Leaving __up_read");
}
__rwsem_wake_one_writer()代碼如下:
static inline struct rw_semaphore *
__rwsem_wake_one_writer(struct rw_semaphore *sem)
{
struct rwsem_waiter *waiter;
struct task_struct *tsk;
//將activity置為-1.因為臨界區是被寫者佔據了
sem->activity = -1;
//取等待隊列中的第一個進程
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
//將其從等待鍊錶中刪除
list_del(&waiter->list);
tsk = waiter->task;
mb();
//將waiter->task清空
waiter->task = NULL;
//喚醒過程
wake_up_process(tsk);
//減少進程引用計數,因為在把它投入隊列的時候被+1 了.這樣做是為了防止task結構被釋放掉
put_task_struct(tsk);
return sem;
}
注意裡面對activity和waiter->task的操作.
5.3.3: down_write():寫者獲得信號量在更改臨界區的情況下,使用down_write()獲得信號量。 代碼如下:
down_write() à __down_write():
void fastcall __sched __down_write(struct rw_semaphore *sem)
{
struct rwsem_waiter waiter;
struct task_struct *tsk;
rwsemtrace(sem, "Entering __down_write");
spin_lock(&sem->wait_lock);
//如果臨界區沒有對像在操作.且沒有其它的寫者在等待
if (sem->activity == 0 && list_empty(&sem->wait_list)) {
/* granted */
//獲得信號
sem->activity = -1;
spin_unlock(&sem->wait_lock);
goto out;
}
//如果臨界區被其它對象佔據或者有其它的寫者在等待,將其加到等待隊列末尾
tsk = current;
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
/* set up my own style of waitqueue */
waiter.task = tsk;
//置標誌為RWSEM_WAITING_FOR_WRITE
waiter.flags = RWSEM_WAITING_FOR_WRITE;
get_task_struct(tsk);
list_add_tail(&waiter.list, &sem->wait_list);
/* we don't need to touch the semaphore struct anymore */
spin_unlock(&sem->wait_lock);
/* wait to be given the lock */
//判斷是否被正常喚醒
for (;;) {
if (!waiter.task)
break;
schedule();
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
}
tsk->state = TASK_RUNNING;
out:
rwsemtrace(sem, "Leaving __down_write");
}
注意讀者進入臨界的判斷條件和寫者進入臨界區的判斷條件的不同.讀者只需要sem->activity >= 0且沒有等待隊列就可以進行。 寫者必須要求sem->activity == 0且無等待隊列.
5.3.4:up_write():寫者釋放信號量寫者操作完了之後,調用up_write()釋放信號量。 代碼如下:
Up_write() à __up_write():
void fastcall __up_write(struct rw_semaphore *sem)
{
rwsemtrace(sem, "Entering __up_write");
spin_lock(&sem->wait_lock);
//寫者釋放信號量.將activity重置為0.因為臨界區已經沒有對像在操作了
sem->activity = 0;
//如果有進程在等待,將其喚醒
if (!list_empty(&sem->wait_list))
sem = __rwsem_do_wake(sem, 1);
spin_unlock(&sem->wait_lock);
rwsemtrace(sem, "Leaving __up_write");
}
寫進程釋放信號喚醒其它進程的處理有點特別,代碼如下:
static inline struct rw_semaphore *
__rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
{
struct rwsem_waiter *waiter;
struct task_struct *tsk;
int woken;
rwsemtrace(sem, "Entering __rwsem_do_wake");
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
//在__up_write()調用的時候.wakewrite是置為1的.表示是一個寫者在進行喚醒
if (!wakewrite) {
if (waiter->flags & RWSEM_WAITING_FOR_WRITE)
goto out;
goto dont_wake_writers;
}
/* if we are allowed to wake writers try to grant a single write lock
* if there's a writer at the front of the queue
* - we leave the 'waiting count' incremented to signify potential
* contention
*/
//如果等待隊列的第一個進程是一個寫等待,只需要將這個寫者喚醒就行了
if (waiter->flags & RWSEM_WAITING_FOR_WRITE) {
sem->activity = -1;
list_del(&waiter->list);
tsk = waiter->task;
/* Don't touch waiter after ->task has been NULLed */
mb();
waiter->task = NULL;
wake_up_process(tsk);
put_task_struct(tsk);
goto out;
}
/* grant an infinite number of read locks to the front of the queue */
dont_wake_writers:
//如果第一個進程是讀等待,將將其後所有為讀等待的進程喚醒
woken = 0;
while (waiter->flags & RWSEM_WAITING_FOR_READ) {
struct list_head *next = waiter->list.next;
list_del(&waiter->list);
tsk = waiter->task;
mb();
waiter->task = NULL;
wake_up_process(tsk);
put_task_struct(tsk);
woken++;
if (list_empty(&sem->wait_list))
break;
waiter = list_entry(next, struct rwsem_waiter, list);
}
sem->activity += woken;
out:
rwsemtrace(sem, "Leaving __rwsem_do_wake");
return sem;
}
六:小結在linux內核中還有其它的幾種同步機制,我在這裡沒有一一列出分析。 在上面分析的幾種機制中,設得都很巧妙。 需要慢慢的琢磨。
轉載請註明出處:http://ericxiao.cublog.cn/
------------------------------------------
同步是linux內核中一種很重要的操作.它為內核提供了一種臨界區和SMP系統中的數據保護機制.今天就來分析一下在linux內核是怎麼樣實現這些操作的.
一:原子操作(摘自《understanding the linux kernel 2.4》)
原子操作是指在執行過程中不能被打斷的操作.它包括以下幾種類型:
進行一次或者零次對齊內存的訪問操作都是原子操作.因為這些指令一般都是單指令.不可能單指令在執行過程中被搶占。
如果在讀操作之後,寫操作之前沒有其他處理器佔用內存總線。 那麼從內存中讀取數據,更新數據並把更新後的數據寫回內存的這些“讀—修改—寫”彙編語言指令是原子的。 例如:inc或dec
操作碼前綴是lock字節的(0xf0)的“讀—修改—寫”彙編程序指令即使在SMP系統中也是原子的。 當控制單元檢測lock後,就會把總線鎖住,一直到這些加鎖指令執行完為止.
操作碼前綴是一個rep字節的彙編語言指令都不是原子的。 這條指令強行讓控制單元執行相同的指令.控制單元在執行新的指令之前要檢查掛起的中斷.
1.1: 在內核中提供了以下的幾組操作接口:
void atomic_set(atomic_t *v, int i)
int atomic_read(atomic_t *v)
void atomic_add(int i, atomic_t *v)
void atomic_sub(int i, atomic_t *v)
void atomic_inc(atomic_t *v)
void atomic_dec(atomic_t *v)
int atomic_inc_and_test(atomic_t *v)
int atomic_dec_and_test(atomic_t *v)
int atomic_sub_and_test(int i, atomic_t *v)
int atomic_add_negative(int i, atomic_t *v)
int atomic_add_return(int i, atomic_t *v)
int atomic_sub_return(int i, atomic_t *v)
int atomic_inc_return(atomic_t *v)
int atomic_dec_return(atomic_t *v)
atomic_sub(amount, &first_atomic)
atomic_add(amount, &second_atomic)
上述原子操作對像是一個32位數。 之所以定義成這樣,是為了和一般的操作區別。 提醒用戶這是一個原子操作接口。
1.2: 原子的位操作:
void set_bit(nr, void *addr)
void clear_bit(nr, void *addr)
void change_bit(nr, void *addr)
test_bit(nr, void *addr)
int test_and_set_bit(nr, void *addr)
int test_and_clear_bit(nr, void *addr)
int test_and_change_bit(nr, void *addr)
以atomic_inc(atomic_t *v), atomic_dec(atomic_t *v)為例來分析內核中對應的代碼.
static __inline__ void atomic_dec(atomic_t *v)
{
__asm__ __volatile__(
LOCK_PREFIX "decl %0"
:"+m" (v->counter));
}
LOCK_PREFIX為鎖操作前綴
Atomic_inc的代碼如下:
static __inline__ void atomic_inc(atomic_t *v)
{
__asm__ __volatile__(
LOCK_PREFIX "incl %0"
:"+m" (v->counter));
}
二:自旋鎖上述的原子操作操作接口提供了單變量的保護操作,但是在實際項目中,情況並沒有這麼簡單,我們需要保護的是一個臨界區.所以我們需要用到其它的加鎖方法.自旋鎖就是其中之一.
2.1:自旋鎖的特性自旋鎖只能被一個控制路徑所持有.進入臨界區,自旋鎖鎖住.其後如果有其它線程來取自旋鎖的自己就會原地“自旋” ,一直到持有者放開鎖為止.
很顯然,當得不到鎖的時候,CPU就會在一直在做輪詢判斷.也就是說如果一個進程取不到自旋鎖.也會一直判斷下去,不會主動去調度其它進程代替當前進程.但是這樣鎖機制的開銷極少.可以快速的開鎖和加鎖.持有自旋鎖的進程一般都會對臨界區有快速的處理另外,內核還規定持有自旋鎖的進程不能被搶占,這樣做主要是出於系統性能考慮.如果一個持自旋鎖的進程被調度出CPU.那其它需要這些鎖的進程被調度進來之後,也只做“自旋”工作.
2.2: 自旋鎖的實現自旋鎖在內核中對應的類型為: spinlock_t
typedef struct {
raw_spinlock_t raw_lock;
#if defined(CONFIG_PREEMPT) && defined(CONFIG_SMP)
//配置了內核搶占與SMP
unsigned int break_lock;
#endif
//自旋鎖DEBUG
#ifdef CONFIG_DEBUG_SPINLOCK
unsigned int magic, owner_cpu;
void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
} spinlock_t;
typedef struct {
unsigned int slock;
} raw_spinlock_t;
在接下來的代碼中再來分析結構中各成員的含義.
2.2.1:spin_lock_init :自旋鎖的初始化代碼如下:
# define spin_lock_init(lock) \
do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0)
#define SPIN_LOCK_UNLOCKED __SPIN_LOCK_UNLOCKED(old_style_spin_init)
# define __SPIN_LOCK_UNLOCKED(lockname) \
(spinlock_t) { .raw_lock = __RAW_SPIN_LOCK_UNLOCKED, \
SPIN_DEP_MAP_INIT(lockname) }
#define __RAW_SPIN_LOCK_UNLOCKED { 1 }
上述操作就是把sipinlock_t中的raw_lock置為__RAW_SPIN_LOCK_UNLOCKED.即為1.
此時鎖是打開狀態.
2.2.2: spin_lock() :獲取自旋鎖.
#define spin_lock(lock) _spin_lock(lock)
_spin_lock()定義如下:
void __lockfunc _spin_lock(spinlock_t *lock)
{
//禁止搶占
preempt_disable();
//這個函數在沒有定義自旋鎖調試的時候是空函數
spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
//相當於_raw_spin_lock(lock)
LOCK_CONTENDED(lock, _raw_spin_trylock, _raw_spin_lock);
}
LOCK_CONTENDED的定義如下:
#define LOCK_CONTENDED(_lock, try, lock) \
lock(_lock)
_raw_spin_lock()的操作如下:
# define _raw_spin_lock(lock) __raw_spin_lock(&(lock)->raw_lock)
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
asm volatile(
"\n1:\t"
//decl: lock->slock減1
LOCK_PREFIX " ; decl %0\n\t"
//如果不為負.跳轉到2f.2f後面沒有任何指令.即為退出
"jns 2f\n"
"3:\n"
//重複執行nop.nop是x86的小延遲函數
"rep;nop\n\t"
//比較0與lock->slock的值,如果lock->slock不大於0.跳轉到標號3.即繼續重複執行nop
"cmpl $0,%0\n\t"
//如果lock->slock不大於0.跳轉到標號3.即繼續重複執行nop
"jle 3b\n\t"
//如果lock->slock大於0.跳轉到標號1.重新判斷鎖的slock成員
"jmp 1b\n"
"2:\t" : "=m" (lock->slock) : : "memory");
}
在上面的函數中,可能對"jmp 1b\n"比較難以理解.在我們一般的觀念裡.獲得一個鎖.將其值減1.釋放鎖時將其值加1.實際上在自旋鎖的實現中lock->slock只有兩個可能值,一個是0. 一個是1.釋放鎖的時候並不是將lock->slock加1.而是將其賦為1.在後面的自旋鎖釋放代碼中再詳細分析.
# define spin_unlock(lock) _spin_unlock(lock)
void __lockfunc _spin_unlock(spinlock_t *lock)
{
//在沒有配置自旋鎖調試的時候.該函數為空函數
spin_release(&lock->dep_map, 1, _RET_IP_);
//
_raw_spin_unlock(lock);
//釋放自旋鎖了.允許內核搶占
preempt_enable();
}
# define _raw_spin_unlock(lock) __raw_spin_unlock(&(lock)->raw_lock)
static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
//將lock->slock賦為1
asm volatile("movb $1,%0" : "+m" (lock->slock) :: "memory");
}
總體來說,釋放自旋鎖的分為兩步:
第一步:將鎖的slock字段回歸到1.注意這裡不是加1,而是賦值1
第二步:允許內核搶占,與加鎖時的禁止內核搶占相對應
在內核中還有幾個與硬中斷和軟中斷有關的幾個自旋鎖API.如下所示:
void spin_lock(spinlock_t *lock)
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags)
void spin_lock_irq(spinlock_t *lock)
void spin_lock_bh(spinlock_t *lock)
void spin_unlock(spinlock_t *lock);
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
void spin_unlock_irq(spinlock_t *lock);
void spin_unlock_bh(spinlock_t *lock);
代碼都不復雜,可自行結合前面分析的IRQ中斷部份理解這幾個API的操作.
三:信號量的操作
3.1:信號量的特點:
根據前面的分析看到.自旋鎖是一種輕快但效率極其低下的一種加鎖,對於需要對臨界區長時間操作的情況下不太合適.因此,需要一種等待獲鎖的機制.這就是信號量.信號量在請求鎖的時候,如果不成功,便會將進程投入睡眠.因此要在允許睡眠的場合使用信號量.
3.2:信號量的實現信號量的結構如下所示:
struct semaphore {
//信號量資源數
atomic_t count;
//是否有一些進程在睡眠
int sleepers;
//信號的等待隊列
wait_queue_head_t wait;
}
3.3: sema_init() :信號量的初始化
static inline void sema_init (struct semaphore *sem, int val)
{
//設置信號量的資源數為val.sem->count表示允許同時有多少個進程佔有此信號量
atomic_set(&sem->count, val);
//sleepers成員置0,表示沒有進程在等待
sem->sleepers = 0;
//初始化等待隊列
init_waitqueue_head(&sem->wait);
}
3.4:down :獲取信號量
static inline void down(struct semaphore * sem)
{
//可能會引起睡眠
might_sleep();
__asm__ __volatile__(
"# atomic down operation\n\t"
//LOCK:鎖定總線標誌
//使sem->count -1 如果小於0.跳轉到2
LOCK "decl %0\n\t" /* --sem->count */
"js 2f\n"
//後面沒有指令了,退出
"1:\n"
LOCK_SECTION_START("")
//如果sem->count -1小於0.說明該信號已經沒有足夠的資源了.調用__down_failed
"2:\tcall __down_failed\n\t"
"jmp 1b\n"
LOCK_SECTION_END
:"=m" (sem->count)
:"c" (sem)
:"memory");
}
__down_failed的代碼如下:
asm(
".section .sched.text\n"
".align 4\n"
".globl __down_failed\n"
"__down_failed:\n\t"
#if defined(CONFIG_FRAME_POINTER)
"pushl %ebp\n\t"
"movl %esp,%ebp\n\t"
#endif
//之所以把eax edx ecx壓棧,是因為__down()函數的參數就是eax edx ecx.它從堆棧中取參數.所以//將其入棧.這樣做是從2.4遺留下來的,因為2.4的__down函數為fastcall型
"pushl %eax\n\t"
"pushl %edx\n\t"
"pushl %ecx\n\t"
"call __down\n\t"
"popl %ecx\n\t"
"popl %edx\n\t"
"popl %eax\n\t"
#if defined(CONFIG_FRAME_POINTER)
"movl %ebp,%esp\n\t"
"popl %ebp\n\t"
#endif
"ret"
)
__down的代碼如下:
asmlinkage void __sched __down(struct semaphore * sem)
{
struct task_struct *tsk = current;
//聲明一個等待隊列
DECLARE_WAITQUEUE(wait, tsk);
unsigned long flags;
//將進程狀態置為不可中斷型.外部信號不可以將其中斷
tsk->state = TASK_UNINTERRUPTIBLE;
//獲取自旋鎖且關中斷
spin_lock_irqsave(&sem->wait.lock, flags);
//加入等待隊列,並將等待標誌高為WQ_FLAG_EXCLUSIVE
add_wait_queue_exclusive_locked(&sem->wait, &wait);
//sleepers之前只能為0或者是1
sem->sleepers++;
for (;;) {
int sleepers = sem->sleepers;
/*
* Add "everybody else" into it. They aren't
* playing, because we own the spinlock in
* the wait_queue_head.
*/
//如果之前有進程在線程,則使sem->count+1 (剛好使count等於down進來的值)
//因為之前在down()中有減1操作
//負數返回真
if (!atomic_add_negative(sleepers - 1, &sem->count)) {
//已經有信號了,將sleeps置零,退出循環後會喚醒等列中的一個進程
sem->sleepers = 0;
break;
}
//將sleepers 置1
sem->sleepers = 1; /* us - see -1 above */
//釋放自旋量並恢復之前的中斷標誌
spin_unlock_irqrestore(&sem->wait.lock, flags);
//重新調度
schedule();
//重新調度後,因為其狀態是TASK_UNINTERRUPTIBLE .所以要釋放信號量的進程將其喚醒
//才能繼續運行
spin_lock_irqsave(&sem->wait.lock, flags);
//將狀態再設為TASK_UNINTERRUPTIBLE . 重新判斷是否有信號量資源
tsk->state = TASK_UNINTERRUPTIBLE;
}
//已經有信號量資源了,將等待隊列刪除
remove_wait_queue_locked(&sem->wait, &wait);
//喚醒其它隊列
wake_up_locked(&sem->wait);
//恢復中斷標誌和釋放自旋鎖
spin_unlock_irqrestore(&sem->wait.lock, flags);
tsk->state = TASK_RUNNING;
}
這裡要注意的是對sleeper成員的理解.結合後面的代碼.進程在沒有獲得鎖時會將其設為1.在獲得了鎖的時候將其設為0.所以,sleeper=1時表示有進程在等待.sleeper=0時,有進程獲得信號量退出,或者沒有進程在等待此信號量.
如果在等待隊列中進程被喚醒,並獲得了鎖.則將sleeper設為0.然後用break退出循環,再用wake_up_locked(&sem->wait)喚醒等待隊列中的一個進程,這個進程下次被調度的時候從shedule()後面開始運行.更改進程狀態好,調用if (!atomic_add_negative(sleepers - 1, &sem->count))判斷,此時sleeper=0.sleep-1 = -1.經過atomic_add_negative()使sem->count-1了.因此前面論述過.進程進來取信號量down()裡count有減1.然後如果有等待線程在獲取這個信號,剛又會將count+1,使其變到了原值.現在在我們這個情景中,已經有線程進程去了,應該要+1.
那思考一下:為什麼要對count sleeper做這樣的處理呢?直接使用使用計數不就完了嗎?進程等待獲取信號量的時候sleep+1 count-1.釋放信號量的時候sleep-1.count+1不可以嗎?
表面上上述的方法也能工作的很好,但是如果進程數目一多就要考慮到溢出問題了.
Linux在這裡設計的很巧妙,可以好好的回味.
3.5:up(): 釋放信號
static inline void up(struct semaphore * sem)
{
__asm__ __volatile__(
"# atomic up operation\n\t"
//sem->count 計數加1
LOCK "incl %0\n\t" /* ++sem->count */
//如果大於0的話,說明信號量資源充足,沒有進程在等待
//無需喚醒等待隊列.直接退出即可
"jle 2f\n"
"1:\n"
LOCK_SECTION_START("")
//否則喚醒隊列
"2:\tcall __up_wakeup\n\t"
"jmp 1b\n"
LOCK_SECTION_END
".subsection 0\n"
:"=m" (sem->count)
:"c" (sem)
:"memory");
}
asm(
".section .sched.text\n"
".align 4\n"
".globl __up_wakeup\n"
"__up_wakeup:\n\t"
"pushl %eax\n\t"
"pushl %edx\n\t"
"pushl %ecx\n\t"
"call __up\n\t"
"popl %ecx\n\t"
"popl %edx\n\t"
"popl %eax\n\t"
"ret"
);
同理調用__up()進行處理
asmlinkage void __up(struct semaphore *sem)
{
wake_up(&sem->wait);
}
這個函數很簡單,就是將信號量中的等待隊列喚醒.注意之前提到過的,加入等待隊列中所用的標誌WQ_FLAG_EXCLUSIVE.這個標誌表示只需將掛成隊列前面的進程喚醒.無需全部喚醒.
信號量的使用內核中還有很多的變體API.究其原理都差不多.可以自行了解^_^
四:完成變量
4.1:完全變量的特點在內核中經常有這樣的需要,在一個進程中要等待某個進程完成某個事情.內核為其提供了一個簡單的接口,其實它只是一個簡化版的信號量而已.
4.2:完全變量的結構完全變量在代碼中的結構如下:
struct completion {
//等待標誌
unsigned int done;
//等待隊列
wait_queue_head_t wait;
};
4.3:完全變量的操作與實現
wait_for_completion():等待條件的完成,它的代碼如下:
void __sched wait_for_completion(struct completion *x)
{
wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_UNINTERRUPTIBLE);
}
wait_for_common() à do_wait_for_common():
static inline long __sched
do_wait_for_common(struct completion *x, long timeout, int state)
{
//!x->done:條件末完成.投入睡眠等待
if (!x->done) {
DECLARE_WAITQUEUE(wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
//加入等待隊列
__add_wait_queue_tail(&x->wait, &wait);
do {
//如果是可中斷狀態且有末處理的信號。 將其移出等待隊列
if (state == TASK_INTERRUPTIBLE &&
signal_pending(current)) {
__remove_wait_queue(&x->wait, &wait);
return -ERESTARTSYS;
}
__set_current_state(state);
spin_unlock_irq(&x->wait.lock);
timeout = schedule_timeout(timeout);
spin_lock_irq(&x->wait.lock);
if (!timeout) {
//延時到了.將其移出等待隊列
__remove_wait_queue(&x->wait, &wait);
return timeout;
}
} while (!x->done);
//運行到這裡,條件已經完成了,將其從等待隊列移出
__remove_wait_queue(&x->wait, &wait);
}
//如果條件完成了,將x->done-- 直接返回
x->done--;
return timeout;
}
Complete()來用通告已經完成了這個條件.代碼如下:
void complete(struct completion *x)
{
unsigned long flags;
//持完全變量的自旋鎖且禁中斷
spin_lock_irqsave(&x->wait.lock, flags);
//x-
x->done++;
__wake_up_common(&x->wait, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE,
1, 0, NULL);
//釋放自旋鎖且恢復中斷到以前的狀態
spin_unlock_irqrestore(&x->wait.lock, flags);
}
__wake_up_common()的代碼如下所示:
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int sync, void *key)
{
wait_queue_t *curr, *next;
//遍歷等待隊列且喚醒等待的進程
list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
if (curr->func(curr, mode, sync, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
分析完了信號量之後,看完全變量的實現還是挺簡單.另外需要注意的是信號量與完成變量裡的自旋鎖的使用.信號量是在其中的一段操作中使用了自旋鎖.這是為了在這一段操作中保持串行化.而完全變量是一開始就加鎖.不允許wait_for_completion()和completion()同時調用.
五:讀寫信號量
5.1:讀寫信號量的特點通常受保護的數據是允許多個進程去讀的。 只需要保護寫操作就可以了。 如果多個讀者去排隊等待其它讀者的操作完成的話,顯然是有失效率的。 讀寫信號量就是為了解瘊這個問題而產生的。 讀寫信號量的等待隊列是一個嚴格的FIFO.有以下幾個特點:
1):讀者去獲取信號量的時候:
1:如果臨界區是空的或者有其它讀者在操作,以時如果沒有寫者在排隊,就把讀者放進去。 如果有寫者在排隊,就把讀者加入到等待隊列。
2:如果臨界區有寫者在操作。 就把這個讀者加入等待隊列。
2):寫者去獲取信號量的時候:
1:如果臨界區是空的且等待隊列為空。 放這個寫者進去。 如果等待隊不為空,就將其加至等待隊列
2:如果臨界區中有讀者在操作,就將其加入到等待隊列.
3):釋放信號量的時候:
1:如果這個進程是讀者。 檢查是不是臨界區內的最後一個讀者。 如果是最後一個讀者,檢查是否有寫者在等待。 如果有。 將寫者喚醒。如果沒有,直接退出。
2:如果釋放信號量是一個寫者,通常會喚醒等待隊列中的第一個進程
2.1:如果這個進程是讀者。 那它後面的所有讀者也會被喚醒(不包含寫者後面的)
2.2:如果這個進程是寫者。 那麼它後面的所有等待進程繼續睡眠.
總之:在這裡要注意對待讀者與寫者的區別。 在臨界區中可以允許有多個讀者,但不允許有多個寫者。
5.2:讀寫信號量結構:
讀寫信號量在內核中對應的數據結構為:
struct rw_semaphore {
//activity = 0:表示沒有對像在操作信號量
//activity > 0:表示有一個或者多個讀者在操作信號量
//activity < 0:表示有一個寫者在操作信號量
__s32 activity;
//讀寫信號量對應的自旋鎖
spinlock_t wait_lock;
//對應的等待隊列
struct list_head wait_list;
#if RWSEM_DEBUG
int debug;
#endif
}
5.3:讀寫信號量的操作與實現:
5.3.1:down_read():讀者獲得信號量
down_read()用來在讀者操作臨界區的情況。 它的代碼如下:
static inline void down_read(struct rw_semaphore *sem)
{
//可能會引起睡眠
might_sleep();
//調試用
rwsemtrace(sem,"Entering down_read");
__down_read(sem);
//調試用
rwsemtrace(sem,"Leaving down_read");
}
__down_read()的代碼如下:
void fastcall __sched __down_read(struct rw_semaphore *sem)
{
struct rwsem_waiter waiter;
struct task_struct *tsk;
rwsemtrace(sem, "Entering __down_read");
//獲取自旋鎖
spin_lock(&sem->wait_lock);
//sem->activeity>=0:說明有讀者進入了臨界區
//sem->wait_list:等待隊列
//如果activeity>=0. wait_list不為空:說明有寫者在等待
//如果activeity<0:說明有寫者在對臨界區操作
//如果臨界區中有讀者然後沒有寫者在等待.可以直接獲得此信號量
if (sem->activity >= 0 && list_empty(&sem->wait_list)) {
/* granted */
sem->activity++;
spin_unlock(&sem->wait_lock);
goto out;
}
//否則,將讀者加入等待隊列末尾
tsk = current;
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
/* set up my own style of waitqueue */
waiter.task = tsk;
//對應的等待標誌是RWSEM_WAITING_FOR_READ
waiter.flags = RWSEM_WAITING_FOR_READ;
get_task_struct(tsk);
list_add_tail(&waiter.list, &sem->wait_list);
/* we don't need to touch the semaphore struct anymore */
spin_unlock(&sem->wait_lock);
/* wait to be given the lock */
//一個死循環,這樣是為了是被期待的喚醒
for (;;) {
//如果waiter.task==NULL.確實是被喚醒的.因為喚醒的時候,會將waiter,task清空
if (!waiter.task)
break;
//睡眠
schedule();
//設置進程狀態為不可中斷型
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
}
//已經獲得信號量了.設置狀態為RUNNING 退出
tsk->state = TASK_RUNNING;
out:
rwsemtrace(sem, "Leaving __down_read");
}
5.3.2: up_read():讀者釋放信號量讀者操作完臨界區了之後,使用up_read()將信號量釋放。 代碼如下:
up_read()à
void fastcall __up_read(struct rw_semaphore *sem)
{
rwsemtrace(sem, "Entering __up_read");
spin_lock(&sem->wait_lock);
//--sem->activity == 0:此讀者是臨界區中的最後一個對象
//sem->wait_list不為空說明臨界區外有寫者在等待
if (--sem->activity == 0 && !list_empty(&sem->wait_list))
//喚醒等待隊列中的第一個進程
sem = __rwsem_wake_one_writer(sem);
//如果不是臨界區中的最後一個讀者或者是沒有寫者在等待,退出
spin_unlock(&sem->wait_lock);
rwsemtrace(sem, "Leaving __up_read");
}
__rwsem_wake_one_writer()代碼如下:
static inline struct rw_semaphore *
__rwsem_wake_one_writer(struct rw_semaphore *sem)
{
struct rwsem_waiter *waiter;
struct task_struct *tsk;
//將activity置為-1.因為臨界區是被寫者佔據了
sem->activity = -1;
//取等待隊列中的第一個進程
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
//將其從等待鍊錶中刪除
list_del(&waiter->list);
tsk = waiter->task;
mb();
//將waiter->task清空
waiter->task = NULL;
//喚醒過程
wake_up_process(tsk);
//減少進程引用計數,因為在把它投入隊列的時候被+1 了.這樣做是為了防止task結構被釋放掉
put_task_struct(tsk);
return sem;
}
注意裡面對activity和waiter->task的操作.
5.3.3: down_write():寫者獲得信號量在更改臨界區的情況下,使用down_write()獲得信號量。 代碼如下:
down_write() à __down_write():
void fastcall __sched __down_write(struct rw_semaphore *sem)
{
struct rwsem_waiter waiter;
struct task_struct *tsk;
rwsemtrace(sem, "Entering __down_write");
spin_lock(&sem->wait_lock);
//如果臨界區沒有對像在操作.且沒有其它的寫者在等待
if (sem->activity == 0 && list_empty(&sem->wait_list)) {
/* granted */
//獲得信號
sem->activity = -1;
spin_unlock(&sem->wait_lock);
goto out;
}
//如果臨界區被其它對象佔據或者有其它的寫者在等待,將其加到等待隊列末尾
tsk = current;
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
/* set up my own style of waitqueue */
waiter.task = tsk;
//置標誌為RWSEM_WAITING_FOR_WRITE
waiter.flags = RWSEM_WAITING_FOR_WRITE;
get_task_struct(tsk);
list_add_tail(&waiter.list, &sem->wait_list);
/* we don't need to touch the semaphore struct anymore */
spin_unlock(&sem->wait_lock);
/* wait to be given the lock */
//判斷是否被正常喚醒
for (;;) {
if (!waiter.task)
break;
schedule();
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
}
tsk->state = TASK_RUNNING;
out:
rwsemtrace(sem, "Leaving __down_write");
}
注意讀者進入臨界的判斷條件和寫者進入臨界區的判斷條件的不同.讀者只需要sem->activity >= 0且沒有等待隊列就可以進行。 寫者必須要求sem->activity == 0且無等待隊列.
5.3.4:up_write():寫者釋放信號量寫者操作完了之後,調用up_write()釋放信號量。 代碼如下:
Up_write() à __up_write():
void fastcall __up_write(struct rw_semaphore *sem)
{
rwsemtrace(sem, "Entering __up_write");
spin_lock(&sem->wait_lock);
//寫者釋放信號量.將activity重置為0.因為臨界區已經沒有對像在操作了
sem->activity = 0;
//如果有進程在等待,將其喚醒
if (!list_empty(&sem->wait_list))
sem = __rwsem_do_wake(sem, 1);
spin_unlock(&sem->wait_lock);
rwsemtrace(sem, "Leaving __up_write");
}
寫進程釋放信號喚醒其它進程的處理有點特別,代碼如下:
static inline struct rw_semaphore *
__rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
{
struct rwsem_waiter *waiter;
struct task_struct *tsk;
int woken;
rwsemtrace(sem, "Entering __rwsem_do_wake");
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
//在__up_write()調用的時候.wakewrite是置為1的.表示是一個寫者在進行喚醒
if (!wakewrite) {
if (waiter->flags & RWSEM_WAITING_FOR_WRITE)
goto out;
goto dont_wake_writers;
}
/* if we are allowed to wake writers try to grant a single write lock
* if there's a writer at the front of the queue
* - we leave the 'waiting count' incremented to signify potential
* contention
*/
//如果等待隊列的第一個進程是一個寫等待,只需要將這個寫者喚醒就行了
if (waiter->flags & RWSEM_WAITING_FOR_WRITE) {
sem->activity = -1;
list_del(&waiter->list);
tsk = waiter->task;
/* Don't touch waiter after ->task has been NULLed */
mb();
waiter->task = NULL;
wake_up_process(tsk);
put_task_struct(tsk);
goto out;
}
/* grant an infinite number of read locks to the front of the queue */
dont_wake_writers:
//如果第一個進程是讀等待,將將其後所有為讀等待的進程喚醒
woken = 0;
while (waiter->flags & RWSEM_WAITING_FOR_READ) {
struct list_head *next = waiter->list.next;
list_del(&waiter->list);
tsk = waiter->task;
mb();
waiter->task = NULL;
wake_up_process(tsk);
put_task_struct(tsk);
woken++;
if (list_empty(&sem->wait_list))
break;
waiter = list_entry(next, struct rwsem_waiter, list);
}
sem->activity += woken;
out:
rwsemtrace(sem, "Leaving __rwsem_do_wake");
return sem;
}
六:小結在linux內核中還有其它的幾種同步機制,我在這裡沒有一一列出分析。 在上面分析的幾種機制中,設得都很巧妙。 需要慢慢的琢磨。
沒有留言:
張貼留言