并行程序设计第二次作业

4.5 请修改使用互斥量的π计算程序,使临界区在for循环内。这个版本的性能与原来的忙等待版本相比如何?我们怎样解释它?

首先查询$\pi$的计算公式

思路相对简单,用多个线程处理计算,并将所有的计算都累加,最后得出$\pi$的值.

忙等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>

int thread_count;//线程数量
double sum = 0.0;
int flag = 0;

void *Thread_sum(void* rank);

int main(int argc,char* argv[]){
long thread;
pthread_t* thread_handles;
thread_count = strtol(argv[1],NULL,10);//获取线程数量(即终端输入)
thread_handles = malloc(thread_count*sizeof(pthread_t));//分配空间

for(thread = 0;thread < thread_count;thread++){
//创建线程
pthread_create(&thread_handles[thread],NULL,Thread_sum,(void*)thread);
}
for(thread = 0;thread < thread_count;thread++){
//等待上一个进程结束
pthread_join(thread_handles[thread],NULL);

}
free(thread_handles);//释放空间
printf("%f",4*sum);//输出结果
return 0;
}//main

void *Thread_sum(void *rank){
long my_rank=(long)rank;
double factor,my_sum = 0.0;
long long i;
long long my_n = n/thread_count;
long long my_first_i = my_n*my_rank;
long long my_last_i = my_first_i + my_n;

if(my_first_i % 2 == 0)
factor = 1.0;
else
factor = -1.0;

for(i = my_first_i;i < my_last_i;i++,factor = -factor){
my_sum += factor/(2*i+1);
}

//Use Busy-Waiting to solve critical sections after loop
while(flag != my_rank);
sum += my_sum;
flag = (flag+1) % thread_count;
return NULL;
}//Thread_sum

忙等待的方法使用了while循环,持续占用CPU资源,且执行相加的顺序是按照编号的,可行性不足。

互斥量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>

int thread_count;
double sum = 0.0;
int flag = 0;
pthread_mutex_t mutex;//实现互斥访问
void *Thread_sum(void* rank);

int main(int argc,char* argv[]){
long thread;
pthread_t* thread_handles;
//输入线程总数
thread_count = strtol(argv[1],NULL,10);
thread_handles = malloc(thread_count*sizeof(pthread_t));
//初始化互斥访问区

pthread_mutex_init(&mutex,NULL);
for(thread = 0;thread < thread_count;thread++){
//创建一个线程
pthread_create(&thread_handles[thread],NULL,Thread_sum,(void*)thread);
}
for(thread = 0;thread < thread_count;thread++){
pthread_join(thread_handles[thread],NULL);
}
free(thread_handles);//释放空间
pthread_mutex_destroy(&mutex);

printf("%f",4*sum);
}//main

void *Thread_sum(void *rank){
long my_rank=(long)rank;
double factor,my_sum = 0.0;
long long i;
long long my_n = n/thread_count;
long long my_first_i = my_n*my_rank;
long long my_last_i = my_first_i + my_n;
if(my_first_i % 2 == 0)
factor = 1.0;
else
factor = -1.0;
for(i = my_first_i;i < my_last_i;i++,factor = -factor){
my_sum += factor/(2*i+1);
}
pthread_mutex_lock(&mutex);
sum += my_sum;
pthread_mutex_unlock(&mutex);
return NULL;
}//Thread_sum

通过使用枷锁方法pthread_)mutex_lock()和解锁方法pthread_mutex_unlock()实现每次只有一个线程访问临界区,可以解决对关键部分的冲突访问。

4.6 请修改使用互斥量的π计算程序,使用信号量替代互斥量。这个版本的性能与互斥量版本相比如何?

信号量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<semaphore.h>

int thread_count;//线程数量
double sum = 0.0;
int flag = 0;
sem_t sem;//信号量sem

void *Thread_sum(void* rank);

int main(int argc,char* argv[]){
long thread;
pthread_t* thread_handles;
//输入线程数量
thread_count = strtol(argv[1],NULL,10);
thread_handles = malloc(thread_count*sizeof(pthread_t));//为线程分配空间
//初始化信号量
sem_init(&sem,0,1);

for(thread = 0;thread < thread_count;thread++){
//创建线程
pthread_create(&thread_handles[thread],NULL,Thread_sum,(void*)thread);
}
for(thread = 0;thread < thread_count;thread++){
pthread_join(thread_handles[thread],NULL);
}
free(thread_handles);//释放空间
sem_destroy(&sem);//释放空间
printf("%f",4*sum);
}

void *Thread_sum(void *rank){
long my_rank=(long)rank;
double factor,my_sum = 0.0;
long long i;
long long my_n = n/thread_count;
long long my_first_i = my_n*my_rank;
long long my_last_i = my_first_i + my_n;
if(my_first_i % 2 == 0)
factor = 1.0;
else
factor = -1.0;
for(i = my_first_i;i < my_last_i;i++,factor = -factor){
my_sum += factor/(2*i+1);
}
sem_wait(&sem);
sum += my_sum;
sem_post(&sem);
return NULL;
}

初始化设定sem=1,所以在第一个要访问临界区的线程,运行sem_wait(sem),sem减一,并可以访问临界区。而其他线程执行sem_wait(sem)时,因sem=0,所以处于等待的状态。相比于互斥量的实现,信号量可以将信号初始化为一个任意非负值。

4.7 尽管生产者一消费者同步采用信号量很容易实现,但它也能用互斥量来实现。

基本的想法是:让生产者线程和消费者线程共享一个互斥量。用一个被主线程初始化为false的标志变量来表示是否有产品可以被“消费”。这两个线程的执行如下:

image.png

如果消费者线程首先进入循环,它会看到没有可用的信息(message_available值为false)并在调用pthread_mutex_unlock后返回。消费者线程重复上述过程,直到生产者线程生产出信息。请编写一个双线程程序,实现这个版本的生产者一消费者同步。可以将这个程序一般化吗?让它能够运行2k个线程,其中奇数线程是消费者,偶数线程是生产者。另外,将程序一般化为每个线程既是生产者又是消费者,能做到吗?例如,线程q既要发送一条信息给线程(q+1) mod t,又要从线程(q-1+t) mod t接收一条信息,如何编写程序?要使用忙等待吗?

解决思路

我们使用队列queue来表示上述具有n个缓冲区的缓冲池。每投入(取出)一个产品,缓存池中就相应的插入(删除)一个节点。由于该缓冲池是被组织成队列的形式,因此,队空队满的判断条件分别如下:

1
2
q->front == q->rear;//队空
q->front == (q->rear+1)%SIZE;//队满

引入一个整型变量num,置其初值为0,当生产者(消费者)进程向缓冲池投入(取走)一个产品时,num对应的加一(减一)。同时,由于缓冲区是共享的,因此需要对生产者、消费者使用缓冲区进行限制,以此达到同步的效果,即在生产者向缓冲区投入产品时,消费者不得使用缓冲池;消费者向缓冲区取出产品时同理。

生产者和消费者的操作进程可如下表示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void producer()
{
sem_wait();
lockf();
QueueFull();
Enqueue();
unlockf();
sem_post();
}
void customer()
{
sem_wait();
lockf();
QueueEmpty();
Dequeue();
unlockf();
sem_post();
}
双线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#define TRUE  1
#define FALSE 0
#define SIZE 2

int main()
{
srand((unsigned int)time(NULL));
//信号量地址,信号量在线程间共享,信号量的初始值
sem_init(&sem.count, 0, 10); //信号量初始化(做多容纳10条消息,容纳了10条生产者将不会生产消息)
pthread_mutex_init(&mutex, NULL); //互斥锁初始化
InitQueue(&(sem.q)); //队列初始化
pthread_t producid;
pthread_t consumid;
pthread_create(&producid, NULL, Producer, NULL); //创建生产者线程
pthread_create(&consumid, NULL, Customer, NULL); //创建消费者线程
pthread_join(consumid, NULL); //线程等待,如果没有这一步,主程序会直接结束,导致线程也直接退出。
sem_destroy(&sem.count); //信号量的销毁
pthread_mutex_destroy(&mutex); //互斥锁的销毁
return 0;
}
2k进程

与上述双线程大致一致,将队列大小SIZE更改为2k即可。

既是消费者又是生产者

将队列大小SIZE更改为1能满足一部分操作。

q向(q+1)modt发送,从(q-1)modt接收

需要创建t个队列,通过忙等待的方式进行生产者消费者进程同步。