Uthread: switching between threads实验

实验目的

实现一个用户级的线程切换:测试程序user/uthread.c会创建三个线程,每个线程打印一个语句,大部分逻辑已经完成,只剩下context切换逻辑。抢占式中断因为经过陷入机制,因此流程是比较繁琐的。核心的切换是在内核中调用yieldyield调用schedsched做了一些故障检查,就调用了swtch,从这里我们会进入线程调度器线程,保存14个用户进程的内核寄存器,并且返回到scheduler函数调用的swtch函数,去除原来进程信息、释放锁,此后调度器又将另一个线程装载,调用swtch,并且从当前新进程的sched调用的swtch返回。继续执行陷入处理 直至恢复执行;

在用户级线程切换设计测试代码中,则省去了很多繁琐的跳转步骤;首先thread_schedule模拟了一部分scheduler函数功能,将线程存储在all_thread数组中,最大线程数量为4;线程初始化时取第一个线程,现在只需要解决两个问题:

  1. 线程如何执行程序?

  2. 如何实现线程的切换?

具体实现

直接看代码,三个位置新增代码已经在其中注明,全是围绕context进行处理,swtch实现两个线程上下文的切换,线程创建时,内核抢占式中断的ra指向的是一个返回用户空间代码(详见基础理论),这里只需要指向我们需要执行的函数即可,也即thread_create的函数指针参数。另外此处使用char数组char stack[STACK_SIZE]模拟栈空间,数组地址由低至高,栈地址高向低延申,因此初始化的sp应该是指向最高地址(注:内核栈地址是sp=p->kstack+PGSIZE,其中uint64 kstack)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#include "kernel/types.h"
#include "kernel/stat.h"
#include "user/user.h"
/* Possible states of a thread: */
#define FREE 0x0
#define RUNNING 0x1
#define RUNNABLE 0x2
#define STACK_SIZE 8192
#define MAX_THREAD 4

struct context { //新增1 :context上下文定义
uint64 ra;
uint64 sp;

// callee-saved
uint64 s0;
uint64 s1;
uint64 s2;
uint64 s3;
uint64 s4;
uint64 s5;
uint64 s6;
uint64 s7;
uint64 s8;
uint64 s9;
uint64 s10;
uint64 s11;
};

struct thread {
char stack[STACK_SIZE]; /* the thread's stack */
int state; /* FREE, RUNNING, RUNNABLE */
struct context context; //新增1:结构体

};
struct thread all_thread[MAX_THREAD];
struct thread *current_thread;
extern void thread_switch(uint64, uint64);

void
thread_init(void) //将线程数组第一个标记为RUNNING状态
{
// main() is thread 0, which will make the first invocation to
// thread_schedule(). it needs a stack so that the first thread_switch() can
// save thread 0's state. thread_schedule() won't run the main thread ever
// again, because its state is set to RUNNING, and thread_schedule() selects
// a RUNNABLE thread.
current_thread = &all_thread[0];
current_thread->state = RUNNING;
}
void
thread_schedule(void) //实现线程切换函数
{
struct thread *t, *next_thread;
/* Find another runnable thread. */
next_thread = 0;
t = current_thread + 1; //取线程数组下一个对象
for(int i = 0; i < MAX_THREAD; i++){ //看下一个线程对象是否RUNABLE,不是就再跳转至下一个,遍历一次
if(t >= all_thread + MAX_THREAD) //t越界就到开头寻找
t = all_thread;
if(t->state == RUNNABLE) {
next_thread = t;
break;
}
t = t + 1;
}
if (next_thread == 0) { //没有找到退出
printf("thread_schedule: no runnable threads\n");
exit(-1);
}
if (current_thread != next_thread) { //有下个线程可运行,不满足说明数组只有一个进程可以运行
next_thread->state = RUNNING; //状态
t = current_thread;
current_thread = next_thread; //切换
/* YOUR CODE HERE
* Invoke thread_switch to switch from t to next_thread:
* thread_switch(??, ??);
*/
thread_switch((uint64)&t->context,(uint64)&next_thread->context);//新增2:切换上下文
} else
next_thread = 0;
}
void
thread_create(void (*func)())
{
struct thread *t;
for (t = all_thread; t < all_thread + MAX_THREAD; t++) {
if (t->state == FREE) break;
}
t->state = RUNNABLE;
// YOUR CODE HERE
t->context.ra = (uint64)func; //新增3:起始运行函数
t->context.sp = (uint64)t->stack+ STACK_SIZE-1; //新增3:栈向下延申,因此开始时stack要到char数组位置的末尾向前使用
}

void
thread_yield(void) //自动让出CPU
{
current_thread->state = RUNNABLE;
thread_schedule();
}
volatile int a_started, b_started, c_started;
volatile int a_n, b_n, c_n;
void
thread_a(void)
{
int i;
printf("thread_a started\n");
a_started = 1;
while(b_started == 0 || c_started == 0) //b和c还没开始启动
thread_yield(); //就让出使其运行

for (i = 0; i < 100; i++) {
printf("thread_a %d\n", i);
a_n += 1;
thread_yield(); //每打印一次就切换线程交替打印
}
printf("thread_a: exit after %d\n", a_n);
current_thread->state = FREE;
thread_schedule();
}
void
thread_b(void)
{
int i;
printf("thread_b started\n");
b_started = 1;
while(a_started == 0 || c_started == 0)
thread_yield();

for (i = 0; i < 100; i++) {
printf("thread_b %d\n", i);
b_n += 1;
thread_yield();
}
printf("thread_b: exit after %d\n", b_n);
current_thread->state = FREE;
thread_schedule();
}
void
thread_c(void)
{
int i;
printf("thread_c started\n");
c_started = 1;
while(a_started == 0 || b_started == 0)
thread_yield();

for (i = 0; i < 100; i++) {
printf("thread_c %d\n", i);
c_n += 1;
thread_yield();
}
printf("thread_c: exit after %d\n", c_n);
current_thread->state = FREE;
thread_schedule();
}
int
main(int argc, char *argv[])
{
a_started = b_started = c_started = 0;
a_n = b_n = c_n = 0;
thread_init();
thread_create(thread_a);
thread_create(thread_b);
thread_create(thread_c);
thread_schedule();
exit(0);
}
其中thread_switch同内核的swtch函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
sd ra, 0(a0)
sd sp, 8(a0)
sd s0, 16(a0)
sd s1, 24(a0)
sd s2, 32(a0)
sd s3, 40(a0)
sd s4, 48(a0)
sd s5, 56(a0)
sd s6, 64(a0)
sd s7, 72(a0)
sd s8, 80(a0)
sd s9, 88(a0)
sd s10, 96(a0)
sd s11, 104(a0)

ld ra, 0(a1)
ld sp, 8(a1)
ld s0, 16(a1)
ld s1, 24(a1)
ld s2, 32(a1)
ld s3, 40(a1)
ld s4, 48(a1)
ld s5, 56(a1)
ld s6, 64(a1)
ld s7, 72(a1)
ld s8, 80(a1)
ld s9, 88(a1)
ld s10, 96(a1)
ld s11, 104(a1)
ret #返回新用户空间
这个代码简化了很多细节,有很多细节略去了但是能实现线程的切换,这让我觉得不可思议,可能我对这个代码理解还不够深入吧,所以全部贴了上来,日后再看看。

验证:xv6中运行

1
uthread

完整Commit:Test1 Uthread Done(仅参考uthread文件即可)

Using threads实验

实验目的

这个实验需要了解一点散列表的基础,没有学过或者有点模糊的可以参考数据结构与算法(三):查找与排序算法里面提到的哈希表。

为了让我们熟悉多线程下锁的重要性设置了这样的实验,这个实验代码运行在Linux的gcc环境,看一下完整源代码:目前源代码的逻辑比较简单,首先创建了一个哈希表结构,其接收用户参数线程数量nthread,随机生成10万个键,对应的值为线程编号n;通过创建线程运行put_thread函数,将这10w个键值对分配给nthread个线程,插入到哈希表中,计算其操作需要的时间。随即会创建线程get_thread函数,每个线程都会按key编号在哈希表搜索10万个,计算这10万个键值对的丢失数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include <sys/time.h>

#define NBUCKET 5 //表长5
#define NKEYS 100000

struct entry { //链式哈希表结点
int key;
int value;
struct entry *next;
};
struct entry *table[NBUCKET]; //哈希表
int keys[NKEYS];
int nthread = 1;

double
now()
{
struct timeval tv;
gettimeofday(&tv, 0); //内置函数,获取目前时区时间
return tv.tv_sec + tv.tv_usec / 1000000.0; //秒、微秒
}

//将键值key、value填入新结点e,头指针为p、头结点为n,头插法插e
static void
insert(int key, int value, struct entry **p, struct entry *n)
{
struct entry *e = malloc(sizeof(struct entry));
e->key = key;
e->value = value;
e->next = n;
*p = e;
}

//搜索表中是否存在key,修改其值为value,不存在则插入新key
static
void put(int key, int value)
{
int i = key % NBUCKET; //哈希函数,获取插入表位置的下标

// is the key already present?
struct entry *e = 0;
for (e = table[i]; e != 0; e = e->next) { //找到下标位置,遍历链是否为有key
if (e->key == key)
break;
}
if(e){ //key存在就更新,不存在e为0
// update the existing key.
e->value = value;
} else { //不存在就插入
// the new is new.
insert(key, value, &table[i], table[i]);
}
}

//根据key值搜索,返回结点e
static struct entry*
get(int key)
{
int i = key % NBUCKET;
struct entry *e = 0;
for (e = table[i]; e != 0; e = e->next) {
if (e->key == key) break;
}
return e;
}

static void *
put_thread(void *xa)
{
int n = (int) (long) xa; // thread number
int b = NKEYS/nthread; //NKETS为最大键值对数100000,nthread是线程数,计算每个线程需要处理的键值对数目

for (int i = 0; i < b; i++) {
put(keys[b*n + i], n); //插入key,线程编号n为0~nthread-1
}

return NULL;
}

static void *
get_thread(void *xa)
{
int n = (int) (long) xa; // thread number
int missing = 0;

for (int i = 0; i < NKEYS; i++) { //计算没有被插入成功的键值对数量
struct entry *e = get(keys[i]);
if (e == 0) missing++;
}
printf("%d: %d keys missing\n", n, missing);
return NULL;
}

int
main(int argc, char *argv[])
{
pthread_t *tha;
void *value;
double t1, t0;

if (argc < 2) {
fprintf(stderr, "Usage: %s nthreads\n", argv[0]);
exit(-1);
}
nthread = atoi(argv[1]); //线程数为参数
tha = malloc(sizeof(pthread_t) * nthread);
srandom(0);
assert(NKEYS % nthread == 0); //确保每个线程处理整数个数据
for (int i = 0; i < NKEYS; i++) {
keys[i] = random(); //随机生成键
}

//
// first the puts
//
t0 = now(); //起始时间
for(int i = 0; i < nthread; i++) { //创建put_thread线程
assert(pthread_create(&tha[i], NULL, put_thread, (void *) (long) i) == 0);
}
for(int i = 0; i < nthread; i++) { //回收线程
assert(pthread_join(tha[i], &value) == 0);
}
t1 = now(); //终止时间

printf("%d puts, %.3f seconds, %.0f puts/second\n", //计算us级耗费时间、单位时间处理的数据量
NKEYS, t1 - t0, NKEYS / (t1 - t0));

//
// now the gets
//
t0 = now();
for(int i = 0; i < nthread; i++) { //创建put_thread线程
assert(pthread_create(&tha[i], NULL, get_thread, (void *) (long) i) == 0);
}
for(int i = 0; i < nthread; i++) { //回收put_thread线程
assert(pthread_join(tha[i], &value) == 0);
}
t1 = now();

printf("%d gets, %.3f seconds, %.0f gets/second\n",
NKEYS*nthread, t1 - t0, (NKEYS*nthread) / (t1 - t0)); //get_phread每个线程都会搜NKEYS次,因此计算的数据量是NKEYS*nthread
}
为什么键值对会丢失,这里要考虑一种特殊情况,在插入哈希表时,假如线程t1和线程t2都需要对同一个链表进行插入,假设t1插入操作进行更新头指针前,执行t2插入操作,那么t1插入的结点就会被覆盖。这不是唯一的可能,但是这些非原子操作会带来各种问题。

具体实现

只需要考虑加锁问题即可,实现很简单:
1. 声明一个全局变量:pthread_mutex_t lock
2. main函数添加初始化pthread_mutex_init(&lock, NULL)
3. 在put函数内加锁,确保insert操作是受锁保护的。

1
2
3
4
5
6
else {  //不存在就插入
// the new is new.
pthread_mutex_lock(&lock);
insert(key, value, &table[i], table[i]);
pthread_mutex_lock(&lock);
}

这样的代码似乎也没有问题,ph_fastph_safe都S能够顺利通过,但是实际上insert冲突只会发生在同一个链表上,因此没必要一个链表插入获得锁,其他链表也要阻塞等待锁,大可以每个链表都定义一个锁,就成为了锁数组: 1. 声明一个全局变量:pthread_mutex_t lock[NBUCKET] ,其中NBUCKET为哈希表长,也就是链表的数目。 2. main函数添加初始化:

1
2
for(int i=0;i<NBUCKET;i++)
pthread_mutex_init(&lock[i], NULL)
3. 在put函数内加锁,确保insert操作是受锁保护的。
1
2
3
4
5
6
else {  //不存在就插入
// the new is new.
pthread_mutex_lock(&lock[i]);
insert(key, value, &table[i], table[i]);
pthread_mutex_lock(&lock[i]);
}

除了这个优化,加锁位置的优化也是可以考虑的。首先将锁放在insert函数好像不能正常避免missing,这个极端情况我也没有想到。其次这里只有else加锁,因为我认为基本只有触发插入操作才有可能导致missing,但是这也有一个问题,就是两个线程同时插入一个key,几乎运行到else,其中一个会首先获得锁,另一个后获得锁会对同一个key进行insert,而在设计上我们的原意是遇到相同的key,只对key进行value的修改即可。所以这个可能是代码潜在的bug。但是这种小概率情况在很多次测试也没有发生,只有else才会申请锁也降低了开销,所以就没改过来了。

验证:Linux中运行

1
2
make ph
./ph 2

完整Commit:Using threads Done

Barrier实验

实验目的

实现一个“同步屏障”:同步屏障的意思是,使得每个线程都运行到同一个位置,再一起继续运行,这种机制可能在某些任务用到。这里是通过线程同步和互斥机制实现的,思路和我半年前撰写的Linux进程与线程其中的条件变量实现线程调度基本一致。线程的条件变量和pthread_cond_wait函数、pthread_mutex_lock函数一起结合使用,使用场景:假设现在有父子线程运行,为了父子线程对同一批数据的操作互不打扰,需要使用pthread_mutex_lock加锁,父线程运行时子线程无法访问数据,子线程也同理;现在我们希望父线程程序完成了某些操作,才执行子线程的程序,这时候我们就在子线程定义一个pthread_cond_wait,假设某次子线程优先获得锁,pthread_cond_wait函数会主动交出锁,阻塞等待父进程发出pthread_cond_signal信号,这样父进程在运行特定操作使用pthread_cond_signal就可以唤醒子进程了。

这里再多说一句,条件变量和信号量不同,条件变量不是PV操作,因此如果需要多个条件变量,就需要重新命名和初始化一个(不是加减法);对于我们上面的任务,往往需要2个以上的条件变量,因为我们还需要保证第一次是子进程先获得锁,因为如果父进程先获得锁,可能就会父进程运行很多次特定操作,才会运行子进程,这不一定是我们想要的效果。

因此同步屏障的实现方法就是判断现在阻塞等待的线程是否等于系统线程数量,不是就使用pthread_cond_wait交出锁,一旦等于,使用pthread_cond_broadcast(&cond)函数就可以唤醒所有因为这个条件变量阻塞的线程,不用pthread_cond_signal这样的单一对象函数。

具体实现

描述完毕,上函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static void 
barrier()
{
// YOUR CODE HERE
//
// Block until all threads have called barrier() and
// then increment bstate.round.
//
//for barrier
pthread_mutex_lock(&bstate.barrier_mutex); //锁
bstate.nthread++; //阻塞进程++
if(bstate.nthread<nthread){ //没有全部阻塞
pthread_cond_wait(&bstate.barrier_cond,&bstate.barrier_mutex); //阻塞等待
}
else { //全部都已经阻塞了
bstate.nthread = 0;
bstate.round++; //round++(来自tips)
pthread_cond_broadcast(&bstate.barrier_cond); //全部唤醒运行
}
pthread_mutex_unlock(&bstate.barrier_mutex);
}

验证:Linux中运行

1
2
make barrier
./barrier 2

完整Commit:Barrier Done