xv6 在进程切换之时还有一些限制:不允许进程在执行switch函数的过程中,持有任何其他的锁。即进程在调用switch函数的过程中,必须要持有 p->lock,但是同时又不能持有任何其他的锁

假设我们在一个只有一个CPU核的机器上,进程 P1 调用了 swtch 函数将 CPU 控制转给了调度器线程,调度器线程发现还有一个进程 P2 的内核线程正在等待被运行,所以调度器线程会切换到运行进程 P2。假设 P2 也想使用磁盘,UART 或者 console,它会对 P1 持有的锁调用 acquire,这是对于同一个锁的第二个 acquire 调用。这形成了死锁

后面讨论 Sleep & Wakeup 如何工作时会再次使用它们

Sleep & Wakeup

当我们写一个线程的代码时,有些场景需要等待一些特定的事件,比如读取 pipe 等待非空,磁盘写入一类,可能来自于 I/O,也可能来自于另一个进程

我们怎么能让进程或者线程等待一些特定的事件呢?一种非常直观的方法是通过循环实现 busy-wait。假设我们想从一个 pipe 读取数据,我们就写一个循环一直等待 pipe 的 buffer 不为空

如果你知道你要等待的事件极有可能在 0.1 微秒内发生,通过一个类似的循环等待或许是最正确的方式。通常来说在操作设备硬件的代码中会采用这样的等待方式

如果事件可能需要数个毫秒(要知道现代 PC 每秒运行数亿次指令)甚至你都不知道事件要多久才能发生,那么我们就不想在这一直循环并且浪费本可以用来完成其他任务的 CPU 时间。这时我们想要通过类似 swtch 函数调用的方式出让 CPU,并在我们关心的事件发生时重新获取 CPU。Coordination 就是有关出让 CPU ,直到等待的事件发生再恢复执行的工具。与许多 Unix 风格操作系统一样,xv6 使用 Sleep & Wakeup 的方式

// Atomically release lock and sleep on chan.
// Reacquires lock when awakened.
void
sleep(void *chan, struct spinlock *lk)
{
    struct proc *p = myproc();
      
    // Must acquire p->lock in order to
    // change p->state and then call sched.
    // Once we hold p->lock, we can be
    // guaranteed that we won't miss any wakeup
    // (wakeup locks p->lock),
    // so it's okay to release lk.
    
    acquire(&p->lock);  //DOC: sleeplock1
    release(lk);
    
    // Go to sleep.
    p->chan = chan;
    p->state = SLEEPING;
    
    sched();
    
    // Tidy up.
    p->chan = 0;
    
    // Reacquire original lock.
    release(&p->lock);
    acquire(lk);
}
    

// Wake up all processes sleeping on chan.
// Must be called without any p->lock.
void
wakeup(void *chan)
{
    struct proc *p;
    
    for(p = proc; p < &proc[NPROC]; p++) {
        if(p != myproc()){
            acquire(&p->lock);
            if(p->state == SLEEPING && p->chan == chan) {
                p->state = RUNNABLE;
            }
            release(&p->lock);
        }
    }
}

sleep 在获取,释放锁之后,设置 channel 与 state 就切换到另外的线程

wakeup 则根据 channel 将 SLEEPING 的进程状态改为 RUNABLE

当我们调用 sleep 函数时,我们传入一个 sleep channel 表明我们等待的特定事件,当然,并不关心这个数值代表什么,当调用 wakeup 时我们希望能传入与调用 sleep 函数相同的 sleep channel 来表明想唤醒哪个线程

Sleep & wakeup 的一个优点是它们可以很灵活,它们不关心代码正在执行什么操作,你不用告诉 sleep 函数你在等待什么事件,你也不用告诉 wakeup 函数发生了什么事件,你只需要匹配好 64bit 的 sleep channel 就行

不过,对于sleep函数,我们需要将一个锁作为第二个参数传入,这是一个稍微丑陋的实现。总的来说,不太可能设计一个通用的 sleep 函数并完全忽略需要等待的事件

Lost Wakeup

sleep 若只传入 sleep channel 可能会触发 lost wakeup,即 wakeup 在 proc 的状态变为 sleep 之前便已调用

我们以 uart 设备的 uartwrite 与 uartintr 为例:这里牵涉到两个锁:进程的锁与 uart 设备的锁

假如 sleep 没有接受 uart 锁的传参,就称这种 sleep 叫 broken_sleep 吧

void
uartwrite(char buf[],int n)
{
    acquire(&uart_tx_lock);
    int i = 0;
    while(i < n){
        while(tx_done == 0){
            //UART is busy sending a character.
            //wait for it to interrupt.
            //sleep(&tx_chan,&uart_tx_lock);
            release(&uart_tx_lock);
            broken_sleep(&tx_chan);
            acquire(&uart_tx_lock);
        }
        
        WriteReg(THR, buf[i]);
        i +=1 ;
        tx_done =0;
    }
      
    release(&uart_tx_lock);
}

void
uartintr(void)
{
    acquire(&uart_tx_lock);
    if(ReadReg(LSR) & LSR_TX_IDLE){
        //UART finished transmitting;wake up any sending thread.
        tx_done = 1;
        wakeup(&tx_chan);
    }
    release(&uart_tx_lock);
}

可以看到 uartwrite 的 broken_sleep 上下,先释放了 uart_tx_lock,sleep 结束后又将其锁上

假如说没有这两把锁是不行的,uartwrite 函数一开始就请求了 uart 设备的锁(毕竟 tx_done 和 uart 本身是共享的,必须要加锁),而在其 while 循环内,只有当 tx_done != 0 时才能退出循环,但是只有在 uartintr 里面才可能将这个值设为非 0,但是这个中断处理函数最开始就会请求 uart 的锁,但这是锁被 uartwrite 持有,deadly embrace

上面加锁方式的问题是,uartwrite 在期望中断处理程序执行的同时又持有了锁。而我们唯一期望中断处理程序执行的位置就是 sleep 执行期间,其他的时候 uartwrite 持有锁是没有问题的。所以我们在 sleep 之前释放了锁,待其放回再锁上

好的,假如我们有一个核正运行到 uartwrite 的 sleep 前面,此时另一个 CPU 核正在执行 uartintr 的 acquire。在锁释放的一瞬间,后者得到了锁,发现 UART 硬件完成了发送上一个字符,之后会设置 tx_done 为 1,最后再调用 wakeup 函数。但假如在 wakeup 的时候 sleep 还没调用,uartwrite 的线程并没又进入 SLEEPING 呢?那么 wakeup 并没有唤醒任何进程,而这次的 sleep 没有人会唤醒它。这就出现了 Lost Wakeup 问题

学生提问:当从sleep函数中唤醒时,不是已经知道是来自UART的中断处理程序调用wakeup的结果吗?这样的话tx_done有些多余。

Robert教授:我想你的问题也可以描述为:为什么需要通过一个循环while(tx_done == 0)来调用sleep函数?这个问题的答案适用于一个更通用的场景:实际中不太可能将sleep和wakeup精确匹配。并不是说sleep函数返回了,你等待的事件就一定会发生。举个例子,假设我们有两个进程同时想写UART,它们都在uartwrite函数中。可能发生这种场景,当一个进程写完一个字符之后,会进入SLEEPING状态并释放锁,而另一个进程可以在这时进入到循环并等待UART空闲下来。之后两个进程都进入到SLEEPING状态,当发生中断时UART可以再次接收一个字符,两个进程都会被唤醒,但是只有一个进程应该写入字符,所以我们才需要在sleep外面包一层while循环。实际上,你可以在XV6中的每一个sleep函数调用都被一个while循环包着。因为事实是,你或许被唤醒了,但是其他人将你等待的事件拿走了,所以你还得继续sleep。这种现象还挺普遍的。

在最初给的 sleep 与 wakeup 的源代码便解决了这个问题,这是由下面这些规则确保的:

  • 调用sleep时需要持有 condition lock,这样 sleep 函数才能知道相应的锁

  • sleep 函数只有在获取到进程的锁 p->lock 之后,才能释放 condition lock

  • wakeup 需要同时持有两个锁才能查看进程

Exit

在 xv6 中,一个进程如果退出的话,我们需要释放用户内存,释放 page table,释放 rapframe 对象,将进程在进程表单中标为 REUSABLE

这里会产生的两大问题:

  • 首先我们不能直接单方面的摧毁另一个线程。另一个线程可能正在另一个CPU核上运行,并使用着自己的栈;也可能另一个线程正在内核中持有了锁;也可能另一个线程正在更新一个复杂的内核数据

  • 另一个问题是,即使一个线程调用了exit系统调用,并且是自己决定要退出,它仍然还是要运行一小段代码。但只要它还在执行代码,它就不能释放正在使用的资源

xv6 有两个函数与关闭线程进程相关。第一个是 exit,第二个是 kill。让我们先来看位于 proc.c 中的 exit 函数

// Exit the current process.  Does not return.
// An exited process remains in the zombie state
// until its parent calls wait().
void
exit(int status)
{
    struct proc *p = myproc();
    
    if(p == initproc)
        panic("init exiting");
    
    // Close all open files.
    for(int fd = 0; fd < NOFILE; fd++){
        if(p->ofile[fd]){
            struct file *f = p->ofile[fd];
            fileclose(f);
            p->ofile[fd] = 0;
        }
    }
    
    begin_op();
    iput(p->cwd);
    end_op();
    p->cwd = 0;
    
    acquire(&wait_lock);
    
    // Give any children to init.
    reparent(p);
    
    // Parent might be sleeping in wait().
    wakeup(p->parent);
      
    acquire(&p->lock);
    
    p->xstate = status;
    p->state = ZOMBIE;
    
    release(&wait_lock);
    
    // Jump into the scheduler, never to return.
    sched();
    panic("zombie exit");
}

首先 exit 关闭了所有已打开的文件,将当前目录的引用释放给文件系统,接下来需要设置子进程的父进程为 init 进程,之后通过调用 wakeup 函数唤醒当前进程的父进程,然后设置进程状态为 ZOMBIE。现在进程还没有完全释放它的资源,还不能被重用,并且进程不会再运行。最后 sched 调度其他的进程

Wait

// Wait for a child process to exit and return its pid.
// Return -1 if this process has no children.
int
wait(uint64 addr)
{
    struct proc *np;
    int havekids, pid;
    struct proc *p = myproc();
    
    acquire(&wait_lock);
    
    for(;;){
        // Scan through table looking for exited children.
        havekids = 0;
        for(np = proc; np < &proc[NPROC]; np++){
            if(np->parent == p){
                // make sure the child isn't still in exit() or swtch().
                acquire(&np->lock);
    
                havekids = 1;
                if(np->state == ZOMBIE){
                    // Found one.
                    pid = np->pid;
                    if(addr != 0 && copyout(p->pagetable, addr, (char *)&np->xstate,sizeof(np->xstate)) < 0) {
                    release(&np->lock);
                    release(&wait_lock);
                    return -1;
                }
                freeproc(np);
                release(&np->lock);
                release(&wait_lock);
                return pid;
            }
            release(&np->lock);
      }
    }
    
    // No point waiting if we don't have any children.
    if(!havekids || p->killed){
        release(&wait_lock);
          return -1;
        }
        
        // Wait for a child to exit.
        sleep(p, &wait_lock);  //DOC: wait-sleep
    }
}

可以看到,wait 会循环扫描进程表单,去查找自己 ZOMBIE 的子进程。找到后,它会调用 freeproc 去释放子进程例如 trapframe,pagetable 等资源,并将 state 置为 UNUSED

wait 实际上也是进程退出的一个重要组成部分。在 Unix 中,对于每一个退出的进程,都需要有一个对应的 wait 系统调用,这就是为什么当一个进程退出时,它的子进程需要变成 init 进程的子进程。init 进程的工作就是在一个循环中不停调用 wait,因为每个进程都需要对应一个 wait,这样它的父进程才能调用 freeproc 函数,并清理进程的资源

当父进程完成了清理进程的所有资源,子进程的状态会被设置成 UNUSED。之后,fork 系统调用才能重用进程在进程表单的位置。直到子进程 exit 的最后,它都没有释放所有的资源,因为它还在运行的过程中,所以不能释放这些资源。最后是父进程释放了运行子进程代码所需要的资源。这样的设计可以让我们极大的精简 exit 的实现

Kill

kill 的实现比想象中更加的……朴素。Uni x中的一个进程可以将另一个进程的 ID 传递给 kill 系统调用,并让另一个进程停止运行。但是这会牵扯到上面讲过的问题,比如我们想要杀掉的进程的内核线程还在更新一些数据,像更新文件系统,创建一个文件之类的,我们不能就这样杀掉。所以 kill 系统调用不能直接停止目标进程的运行。实际上,在 xv6 和其他的 Unix 系统中,kill 基本上不做任何事情

// Kill the process with the given pid.
// The victim won't exit until it tries to return
// to user space (see usertrap() in trap.c).
int
kill(int pid)
{
    struct proc *p;
    
    for(p = proc; p < &proc[NPROC]; p++){
        acquire(&p->lock);
        if(p->pid == pid){
            p->killed = 1;
            if(p->state == SLEEPING){
                // Wake process from sleep().
                p->state = RUNNABLE;
            }
            release(&p->lock);
            return 0;
        }
        release(&p->lock);
    }
    return -1;
}

它先扫描进程表单,找到目标进程。然后只是将进程的 proc 结构体中 killed 标志位设置为 1。如果进程正在 SLEEPING 状态,将其设置为 RUNNABLE

而目标进程运行到内核代码中能安全停止运行的位置时,会检查自己的 killed 标志位,如果为 1,目标进程会自愿的执行 exit(-1)。在 usertrap 的最后就会做这样的检查,例如当一个定时器中断到来,usertrap 发现 p->killed === 1,就直接 exit 了

所以 kill 系统调用并不是真正的立即停止进程的运行,从调用 kill,到另一个进程真正退出,中间可能有很明显的延时

但其实在 xv6 的很多位置中,如果进程在 SLEEPING 状态时被 kill 了,进程会实际的退出。首先 kill 会将 SLEEPING 改为 RUNABLE,这样调度器线程会重新运行这个线程,而在很多地方的 sleep 循环内(比如下面的 piperead),会对 killed 进行检测:

while(pi->nread == pi->nwrite && pi->writeopen){  //DOC: pipe-empty
    if(pr->killed){
        release(&pi->lock);
        return -1;
    }
    sleep(&pi->nread, &pi->lock); //DOC: piperead-sleep
}

于是函数会立马返回到 usertrap,然后 exit

因为 pipe 再度 sleep 的时候,pipe 内大概率还是没有数据,所以 kill 了影响不大。但是在更新文件系统这样的操作时, while 里面就不会检查 killed 了,因为必须等待这个操作完成

while(b->disk == 1)
    sleep(b, &disk.vdisk_lock);