生产者消费者程序,用semaphore控制

anminliu 2009-03-14 02:14:17
生产者消费者程序,用semaphore控制
程序接受3个命令行参数,生产者数量,此消费者数量和缓冲区大小,运行程序的结果如下:

>./produce_consume 3 1 2
Main program has started ...
Producer 0 has started ...
Producer 0: Deposited item 0.
Producer 1 has started ...
Producer 1: Deposited item 1.
Producer 2 has started ...
Consumer 0 has started ...

Consumer 0: Removed item 0.
Producer 2: Deposited item 2.
2 item(s) left in the buffer!
Threads terminated!

或者:
>./produce_consume 2 3 2
Main program has started ...
Producer 0 has started ...
Producer 0: Deposited item 0.
Producer 1 has started ...
Producer 1: Deposited item 1.
Consumer 0 has started ...
Consumer 0: Removed item 0.
Consumer 1 has started ...
Consumer 1: Removed item 1.
Consumer 2 has started ...

或者:
>./produce_consume 2 2 2
Main program has started ...
Producer 0 has started ...
Producer 0: Deposited item 0.
Producer 1 has started ...
Producer 1: Deposited item 1.
Consumer 0 has started ...
Consumer 0: Removed item 0.
Consumer 1 has started ...
Consumer 1: Removed item 1.
0 item(s) left in the buffer!
Threads terminated!

输入的参数不同结果不同!



...全文
703 21 打赏 收藏 转发到动态 举报
写回复
用AI写文章
21 条回复
切换为时间正序
请发表友善的回复…
发表回复
hatingfarmer 2010-08-01
  • 打赏
  • 举报
回复

#define DECLARE_SEMAPHORE(_hSemaphore) HANDLE _hSemaphore = NULL;

#define INIT_SEMAPHORE(_hSemaphore,_initCount,_maxCount) \
_hSemaphore = CreateSemaphore(NULL, _initCount, _maxCount ,NULL);

#define P(_hSemaphore) assert(WAIT_OBJECT_0 == WaitForSingleObject(_hSemaphore, INFINITE));

#define V(_hSemaphore) ReleaseSemaphore( _hSemaphore, 1,NULL);

DECLARE_SEMAPHORE(hEmpty) ;

DECLARE_SEMAPHORE(hFull) ;

DECLARE_SEMAPHORE(hMutex) ;

class Producer_comsumer
{
public:
void Producer()
{
P(hMutex);
//生产产品
V(hEmpty);
P(hFull);

V(hMutex);
}

void Comsumer()
{
P(hMutex);
P(hEmpty);
//卖产品
V(hFull);
V(hMutex);
}

public:
Producer_comsumer()
{
INIT_SEMAPHORE(hMutex,0,1);
INIT_SEMAPHORE(hEmpty,0,10);
INIT_SEMAPHORE(hFull,10,10);
}

};
chin_chen 2009-03-15
  • 打赏
  • 举报
回复
const.h头文件
 1#ifndef CONST_H
2#define CONST_H
3#define MaxSize 10 //栈中最多只有十个元素 
4#define TRUE 1
5#define FALSE 0
6#define ERROR 0
7#define OVERFLOW -2
8#define OK 1
9#define MAXSIZE 20
10#define MAXLEN 100//字符串的最大长度
11#endif
12


主程序exp1.c
#include <stdio.h>
2#include <stdlib.h>
3#include <unistd.h>
4#include <pthread.h>
5#include <errno.h>
6#include <sys/ipc.h>
7#include <semaphore.h>
8#include <fcntl.h>
9#include "Queue.h"
10#include "const.h"
11#define N 5
12time_t end_time;
13sem_t mutex,full,empty;
14int fd;
15Queue * qt; /**//*缓冲区*/
16Elemtype p;
17
18void consumer(void *arg);
19void productor(void *arg);
20
21int main(int argc, char *argv[])
22{
23 pthread_t id1,id2;
24 pthread_t mon_th_id;
25 int ret;
26 end_time = time(NULL)+30;
27 qt = InitQueue();
28 p.lNumber = 1000;
29 ret=sem_init(&mutex,0,1); /**//*初使化互斥信号量为1*/
30 ret=sem_init(&empty,0,N); /**//*初使化empty信号量为N*/
31 ret=sem_init(&full,0,0); /**//*初使化full信号量为0*/
32 if(ret!=0)
33 {
34 perror("sem_init");
35 }
36 ret=pthread_create(&id1,NULL,(void *)productor, NULL); /**//*创建两个线程*/
37 if(ret!=0)
38 perror("pthread cread1");
39 ret=pthread_create(&id2,NULL,(void *)consumer, NULL);
40 if(ret!=0)
41 perror("pthread cread2");
42 pthread_join(id1,NULL);
43 pthread_join(id2,NULL);
44
45 exit(0);
46}
47/**//*生产者线程*/
48void productor(void *arg)
49{
50 int i,nwrite;
51 while(time(NULL) < end_time){
52 sem_wait(&empty);// p(empty)
53 sem_wait(&mutex);// p(mutex)
54 if(TRUE==QueueFull(*qt))
55 {
56 //队满
57 printf("Productor:buffer is full ,please try to write later.\n");
58 }
59 else
60 {
61 EnQueue(qt,p);
62 printf("Productor:write [%d] to buffer \n",p.lNumber);
63 p.lNumber++;
64 }
65
66 sem_post(&full);//v(full)
67 sem_post(&mutex);//v(mutex)
68 sleep(1);
69 }
70}//productor
71
72/**//*消费者线程*/
73void consumer(void *arg)
74{
75 int nolock=0;
76 int ret,nread;
77 Elemtype p2;
78 while((time(NULL) < end_time)||(FALSE==(QueueEmpty(*qt))))
79 {
80 sem_wait(&full);//p(full)
81 sem_wait(&mutex);//p(mutex)
82 if(TRUE==QueueEmpty(*qt))
83 {
84 //队空
85 printf("Consumer:the buffer is empty,please try to read later.\n");
86 }
87 else
88 {
89 DeQueue(qt,&p2);
90 printf("Consumer:read [%d] from buffer.\n",p2.lNumber);
91 }
92 sem_post(&empty);//v(empty)
93 sem_post(&mutex);//v(mutex)
94 sleep(2);
95 }/**//*end of while((time(NULL) < end_time)||(FALSE==(QueueEmpty(*qt))))*/
96}//consumer
97


--Queue.c 循环队列的实现
/**//*
2队列的基本操作函数
3*/
4#include "stdio.h"
5#include "malloc.h"
6#include "const.h"
7#include "Queue.h"
8/**//*
9 初使化队列
10*/
11Queue * InitQueue()
12{
13 Queue * Q = (Queue *)malloc(sizeof(Queue));
14
15 Q->front = Q->rear = 0;
16 return Q;
17}
18
19/**//*
20 进队
21*/
22int EnQueue(Queue *Q,Elemtype x)
23{
24 if((Q->rear + 1)%MaxSize==Q->front)/**//* 队满 */
25 return FALSE;
26 else
27 {
28 Q->q[Q->rear] = x;
29 Q->rear = (Q->rear+1)%MaxSize;
30 return TRUE;
31 }
32}
33
34/**//*
35 出队
36*/
37int DeQueue(Queue *Q,Elemtype *x)
38{
39 if(Q->rear==Q->front)/**//*队空*/
40 return FALSE;
41 else
42 {
43 *x = Q->q[Q->front];
44 Q->front = (Q->front+1)%MaxSize;
45 return TRUE;
46 }
47}
48
49/**//*
50 判断队是否为空,空返回0
51 非空返回 1
52*/
53int QueueEmpty(Queue Q)
54{
55 if(Q.rear==Q.front)/**//*队空*/
56 return TRUE;
57 else
58 return FALSE;
59}
60
61/**//*
62 返回队例中的最后的一个元素(原队列还要存在)
63*/
64Elemtype Last(Queue *Q)
65{
66
67 Elemtype *prElem = NULL;
68 Queue *prTempQueue;
69 /**//*这个临时队列,把原队列放到里面,当取完最后一
70 个元素后,就把这个队列中的元素放回原来的队列*/
71 prTempQueue = InitQueue();
72 while(QueueEmpty(*Q)==1)
73 {
74 DeQueue(Q,prElem);
75 EnQueue(prTempQueue,*prElem);
76 }
77 while(QueueEmpty(*prTempQueue)==1)
78 {
79 DeQueue(prTempQueue,prElem);
80 EnQueue(Q,*prElem);
81 }
82 return *prElem;
83}/**//*Last*/
84
85/**//*
86 判断队是否为满,满返回TRUE
87 非满返回FALSE
88*/
89int QueueFull(Queue Q)
90{
91 if(((Q.rear+1)%MaxSize)==Q.front)/**//*队空*/
92 return TRUE;
93 else
94 return FALSE;
95}
96


--Queue.h 循环队列的函数说明
#ifndef QUEUE_H_LY
2#define QUEUE_H_LY
3#include "Typedefine.h"
4/**//*
52007-12-23 10:31:14
6*/
7/**//*
8 初使化队列信息
9*/
10Queue * InitQueue();
11/**//*
12 进队(因为没有队满的情况,所以没有返回值)
13*/
14
15int EnQueue(Queue *Q,Elemtype x);
16/**//*
17 出队,x为出队的那个结点的数据域
18 返回:1成功/0失败
19*/
20int DeQueue(Queue *Q,Elemtype *x);
21/**//*
22 判断队空1(不空)/0(为空)
23*/
24int QueueEmpty(Queue Q);
25/**//*
26 统计一个栈链中的元素的个数
27*/
28int QueueCount(Queue *HQ);
29
30/**//*
31 判断队是否为满,满返回TRUE
32 非满返回FALSE
33*/
34int QueueFull(Queue Q);
35
36#endif
37
anminliu 2009-03-15
  • 打赏
  • 举报
回复
问题:Practice multi-threaded programming with semaphores of the
Pthread libraries.
程序接受3个命令行参数,生产者线程数量,消费者线程数量和缓冲区大小
生产者和消费者线程都有序列号,从零开始,每次加1,
每个线程打印信息当它启动和存取东西,存取的就是线程序列号
生产者存东西到缓冲区,直到缓冲区满时,切换到消费者线程,消费者消费完,如果生产者
还生产东西,继续存到缓冲区,如果东西不够消费者消费,程序继续等待,
最后检查缓冲区是否剩东西,打印出剩余的数量


运行程序的结果如下:

>./produce_consume 3 1 2
Main program has started ...
Producer 0 has started ...
Producer 0: Deposited item 0.
Producer 1 has started ...
Producer 1: Deposited item 1.
Producer 2 has started ...
Consumer 0 has started ...
Consumer 0: Removed item 0.
Producer 2: Deposited item 2.
2 item(s) left in the buffer!
Threads terminated!

或者:
>./produce_consume 2 3 2
Main program has started ...
Producer 0 has started ...
Producer 0: Deposited item 0.
Producer 1 has started ...
Producer 1: Deposited item 1.
Consumer 0 has started ...
Consumer 0: Removed item 0.
Consumer 1 has started ...
Consumer 1: Removed item 1.
Consumer 2 has started ...

或者:
>./produce_consume 2 2 2
Main program has started ...
Producer 0 has started ...
Producer 0: Deposited item 0.
Producer 1 has started ...
Producer 1: Deposited item 1.
Consumer 0 has started ...
Consumer 0: Removed item 0.
Consumer 1 has started ...
Consumer 1: Removed item 1.
0 item(s) left in the buffer!
Threads terminated!

输入的参数不同结果不同!

zhongliang871014 2009-03-15
  • 打赏
  • 举报
回复
lz你太搓,想拿代码,不给
anminliu 2009-03-15
  • 打赏
  • 举报
回复
有c++的代码没??
sherrik 2009-03-14
  • 打赏
  • 举报
回复
up~
太乙 2009-03-14
  • 打赏
  • 举报
回复
upup~~
太乙 2009-03-14
  • 打赏
  • 举报
回复
upup~~
chin_chen 2009-03-14
  • 打赏
  • 举报
回复
[Quote=引用 12 楼 anminliu 的回复:]
楼上的是网上找的,也不错,但不怎么和我的题意啊
[/Quote]
我不知道你的题意啊。
anminliu 2009-03-14
  • 打赏
  • 举报
回复
楼上的是网上找的,也不错,但不怎么和我的题意啊
chin_chen 2009-03-14
  • 打赏
  • 举报
回复
产者-消费者问题是一个经典的进程同步问题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制。在同一个进程地址空间内执行的两个线程。生产者线程生产物品,然后将物品放置在一个空缓冲区中供消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放出一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来。



#include <windows.h>
#include <iostream>

const unsigned short SIZE_OF_BUFFER = 10; //缓冲区长度
unsigned short ProductID = 0; //产品号
unsigned short ConsumeID = 0; //将被消耗的产品号
unsigned short in = 0; //产品进缓冲区时的缓冲区下标
unsigned short out = 0; //产品出缓冲区时的缓冲区下标

int g_buffer[SIZE_OF_BUFFER]; //缓冲区是个循环队列
bool g_continue = true; //控制程序结束
HANDLE g_hMutex; //用于线程间的互斥
HANDLE g_hFullSemaphore; //当缓冲区满时迫使生产者等待
HANDLE g_hEmptySemaphore; //当缓冲区空时迫使消费者等待

DWORD WINAPI Producer(LPVOID); //生产者线程
DWORD WINAPI Consumer(LPVOID); //消费者线程

int main()
{
//创建各个互斥信号
g_hMutex = CreateMutex(NULL,FALSE,NULL);
g_hFullSemaphore = CreateSemaphore(NULL,SIZE_OF_BUFFER-1,SIZE_OF_BUFFER-1,NULL);
g_hEmptySemaphore = CreateSemaphore(NULL,0,SIZE_OF_BUFFER-1,NULL);

//调整下面的数值,可以发现,当生产者个数多于消费者个数时,
//生产速度快,生产者经常等待消费者;反之,消费者经常等待
const unsigned short PRODUCERS_COUNT = 3; //生产者的个数
const unsigned short CONSUMERS_COUNT = 1; //消费者的个数

//总的线程数
const unsigned short THREADS_COUNT = PRODUCERS_COUNT+CONSUMERS_COUNT;

HANDLE hThreads[PRODUCERS_COUNT]; //各线程的handle
DWORD producerID[CONSUMERS_COUNT]; //生产者线程的标识符
DWORD consumerID[THREADS_COUNT]; //消费者线程的标识符

//创建生产者线程
for (int i=0;i<PRODUCERS_COUNT;++i){
hThreads[i]=CreateThread(NULL,0,Producer,NULL,0,&producerID[i]);
if (hThreads[i]==NULL) return -1;
}
//创建消费者线程
for (i=0;i<CONSUMERS_COUNT;++i){
hThreads[PRODUCERS_COUNT+i]=CreateThread(NULL,0,Consumer,NULL,0,&consumerID[i]);
if (hThreads[i]==NULL) return -1;
}

while(g_continue){
if(getchar()){ //按回车后终止程序运行
g_continue = false;
}
}

return 0;
}

//生产一个产品。简单模拟了一下,仅输出新产品的ID号
void Produce()
{
std::cerr << "Producing " << ++ProductID << " ... ";
std::cerr << "Succeed" << std::endl;
}

//把新生产的产品放入缓冲区
void Append()
{
std::cerr << "Appending a product ... ";
g_buffer[in] = ProductID;
in = (in+1)%SIZE_OF_BUFFER;
std::cerr << "Succeed" << std::endl;

//输出缓冲区当前的状态
for (int i=0;i<SIZE_OF_BUFFER;++i){
std::cout << i <<": " << g_buffer[i];
if (i==in) std::cout << " <-- 生产";
if (i==out) std::cout << " <-- 消费";
std::cout << std::endl;
}
}

//从缓冲区中取出一个产品
void Take()
{
std::cerr << "Taking a product ... ";
ConsumeID = g_buffer[out];
out = (out+1)%SIZE_OF_BUFFER;
std::cerr << "Succeed" << std::endl;

//输出缓冲区当前的状态
for (int i=0;i<SIZE_OF_BUFFER;++i){
std::cout << i <<": " << g_buffer[i];
if (i==in) std::cout << " <-- 生产";
if (i==out) std::cout << " <-- 消费";
std::cout << std::endl;
}
}

//消耗一个产品
void Consume()
{
std::cerr << "Consuming " << ConsumeID << " ... ";
std::cerr << "Succeed" << std::endl;
}

//生产者
DWORD WINAPI Producer(LPVOID lpPara)
{
while(g_continue){
WaitForSingleObject(g_hFullSemaphore,INFINITE);
WaitForSingleObject(g_hMutex,INFINITE);
Produce();
Append();
Sleep(1500);
ReleaseMutex(g_hMutex);
ReleaseSemaphore(g_hEmptySemaphore,1,NULL);
}
return 0;
}

//消费者
DWORD WINAPI Consumer(LPVOID lpPara)
{
while(g_continue){
WaitForSingleObject(g_hEmptySemaphore,INFINITE);
WaitForSingleObject(g_hMutex,INFINITE);
Take();
Consume();
Sleep(1500);
ReleaseMutex(g_hMutex);
ReleaseSemaphore(g_hFullSemaphore,1,NULL);
}
return 0;
}
ltc_mouse 2009-03-14
  • 打赏
  • 举报
回复
anminliu 2009-03-14
  • 打赏
  • 举报
回复
不好意思,我以为说到这,大家应该知道了。
问题:Practice multi-threaded programming with semaphores of the
Pthread libraries.
程序接受3个命令行参数,生产者线程数量,消费者线程数量和缓冲区大小
生产者和消费者线程都有序列号,从零开始,每次加1,
每个线程打印信息当它启动和存取东西,存取的就是线程序列号
生产者存东西到缓冲区,直到缓冲区满时,切换到消费者线程,消费者消费完,如果生产者
还生产东西,继续存到缓冲区,如果东西不够消费者消费,程序继续等待,
最后检查缓冲区是否剩东西,打印出剩余的数量
chin_chen 2009-03-14
  • 打赏
  • 举报
回复
[Quote=引用楼主 anminliu 的帖子:]
生产者消费者程序,用semaphore控制
程序接受3个命令行参数,生产者数量,此消费者数量和缓冲区大小,运行程序的结果如下:

>./produce_consume 3 1 2
Main program has started ...
Producer 0 has started ...
Producer 0: Deposited item 0.
Producer 1 has started ...
Producer 1: Deposited item 1.
Producer 2 has started ...
Consumer 0 has started ...
Consumer 0: Removed item 0.
Producer 2: Deposit…
[/Quote]
up
dongpy 2009-03-14
  • 打赏
  • 举报
回复
输入的参数不同结果不同!
========================
没错啊,有什么问题?
ztenv 版主 2009-03-14
  • 打赏
  • 举报
回复
生产者/消费者写为两个不同的类,然后再用一个类来提供资源,用多线程实现,可以搞定吧?
arong1234 2009-03-14
  • 打赏
  • 举报
回复
^_^,这种傻事咱不干
[Quote=引用 4 楼 Treazy 的回复:]
问题就是让你根据结果给代码!
[/Quote]
Treazy 2009-03-14
  • 打赏
  • 举报
回复
问题就是让你根据结果给代码!
sagegz 2009-03-14
  • 打赏
  • 举报
回复
What's your problem?
  • 打赏
  • 举报
回复
问题呢?
加载更多回复(1)
(1)创建生产者和消费者线程 在Windows2000环境下,创建一个控制台进程,在此进程中创建n个线程来模拟生产者或者消费者。这些线程的信息由本程序定义的“测试用例文件”中予以指定。 该文件的格式和含义如下: 3 1 P 3 2 P 4 3 C 4 1 4 P 2 5 C 3 1 2 4 第一行说明程序中设置几个临界区,其余每行分别描述了一个生产者或者消费者线程的信息。每一行的各字段间用Tab键隔开。不管是消费者还是生产者,都有一个对应的线程号,即每一行开始字段那个整数。第二个字段用字母P或者C区分是生产者还是消费者。第三个字段表示在进入相应线程后,在进行生产和消费动作前的休眠时间,以秒计时;这样做的目的是可以通过调整这一列参数,控制开始进行生产和消费动作的时间。如果是代表生产者,则该行只有三个字段。如果代表消费者,则该行后边还有若干字段,代表要求消费的产品所对应的生产者的线程号。所以务必确认这些对应的线程号存在并且该线程代表一个生产者。 (2)生产和消费的规则 在按照上述要求创建线程进行相应的读写操作时,还需要符合以下要求: ①共享缓冲区存在空闲空间时,生产者即可使用共享缓冲区。 ②从上边的测试数据文件例子可以看出,某一生产者生产一个产品后,可能不止一个消费者,或者一个消费者多次地请求消费该产品。此时,只有当所有的消费需求都被满足以后,该产品所在的共享缓冲区才可以被释放,并作为空闲空间允许新的生产者使用。 ③每个消费者线程的各个消费需求之间存在先后顺序。例如上述测试用例文件包含一行信息“5 C 3 l 2 4”,可知这代表一个消费者线程,该线程请求消费1,2,4号生产者线程生产的产品。而这种消费是有严格顺序的,消费1号线程产品的请求得到满足后才能继续往下请求2号生产者线程的产品。 ④要求在每个线程发出读写操作申请、开始读写操作和结束读写操作时分别显示提示信息。 (3)相关基础知识 本实验所使用的生产者和消费者模型具有如下特点: 本实验的多个缓冲区不是环形循环的,也不要求按顺序访问。生产者可以把产品放到目前某一个空缓冲区中。 消费者只消费指定生产者的产品。 在测试用例文件中指定了所有的生产和消费的需求,只有当共享缓冲区的数据满足了所有关于它的消费需求后,此共享缓冲区才可以作为空闲空间允许新的生产者使用。 本实验在为生产者分配缓冲区时各生产者间必须互斥,此后各个生产者的具体生产活动可以并发。而消费者之间只有在对同一产品进行消费时才需要互斥,同时它们在消费过程结束时需要判断该消费对象是否已经消费完毕并清除该产品。 Windows用来实现同步和互斥的实体。在Windows中,常见的同步对象有:信号量(Semaphore)、互斥量(Mutex)、临界段(CriticalSection)等。使用这些对象都分为三个步骤,一是创建或者初始化:接着请求该同步对象,随即进入临界区,这一步对应于互斥量的上锁;最后释放该同步对象,这对应于互斥量的解锁。这些同步对象在一个线程中创建,在其他线程中都可以使用,从而实现同步互斥。
#include #include #include #include #include //定义一些常量; //本程序允许的最大临界区数; #define MAX_BUFFER_NUM 10 //秒到微秒的乘法因子; #define INTE_PER_SEC 1000 //本程序允许的生产和消费线程的总数; #define MAX_THREAD_NUM 64 //定义一个结构,记录在测试文件中指定的每一个线程的参数 struct ThreadInfo { int serial; //线程序列号 char entity; //是P还是C double delay; //线程延迟 int thread_request[MAX_THREAD_NUM]; //线程请求队列 int n_request; //请求个数 }; //全局变量的定义 //临界区对象的声明,用于管理缓冲区的互斥访问; int Buffer_Critical[MAX_BUFFER_NUM]; //缓冲区声明,用于存放产品; ThreadInfo Thread_Info[MAX_THREAD_NUM]; //线程信息数组; HANDLE h_Thread[MAX_THREAD_NUM]; //用于存储每个线程句柄的数组; HANDLE empty_semaphore; //一个信号量; HANDLE h_mutex; //一个互斥量; HANDLE h_Semaphore[MAX_THREAD_NUM]; //生产者允许消费者开始消费的信号量; CRITICAL_SECTION PC_Critical[MAX_BUFFER_NUM]; DWORD n_Thread = 0; //实际的线程的数目; DWORD n_Buffer_or_Critical; //实际的缓冲区或者临界区的数目; //生产消费及辅助函数的声明 void Produce(void *p); void Consume(void *p); bool IfInOtherRequest(int); int FindProducePositon(); int FindBufferPosition(int); int main(int argc, char **argv) { //声明所需变量; DWORD wait_for_all; ifstream inFile; if (argc!=2) { printf("Usage:%s \n",argv[0]); return 1; } //初始化缓冲区; for(int i=0;i< MAX_BUFFER_NUM;i++) Buffer_Critical[i] = -1; //初始化每个线程的请求队列; for(int j=0;j> n_Buffer_or_Critical; inFile.get(); // 读取测试文件中的空格,将文件指针指向下一行; printf("输入文件是:\n"); //回显获得的缓冲区的数目信息; printf("%d \n",(int) n_Buffer_or_Critical); //提取每个线程的信息到相应数据结构中; while(inFile){ inFile >> Thread_Info[n_Thread].serial; inFile >> Thread_Info[n_Thread].entity; inFile >> Thread_Info[n_Thread].delay; char c; inFile.get(c); while(c!='\n'&& !inFile.eof()) { inFile>> Thread_Info[n_Thread].thread_request[Thread_Info[n_Thread].n_request++]; inFile.get(c); } n_Thread++; } //回显获得的线程信息,便于确认正确性; for(j=0;j<(int) n_Thread;j++) { int Temp_serial = Thread_Info[j].serial; char Temp_entity = Thread_Info[j].entity; double Temp_delay = Thread_Info[j].delay; printf(" \nthread%2d %c %f ",Temp_serial,Temp_entity,Temp_delay); int Temp_request = Thread_Info[j].n_request; for(int k=0;ksemaphore = CreateSemaphore(NULL,n_Buffer_or_Critical,n_Buffer_or_Critical, "semaphore_for_empty"); h_mutex = CreateMutex(NULL,FALSE,"mutex_for_update"); //下面这个循环用线程的ID号来为相应生产线程的产品读写时所 //使用的同步信号量命名; for(j=0;j<(int)n_Thread;j++) { char lp[]="semaphore_for_produce_"; int temp =j; while(temp){ char c = (char)(temp%10); strcat(lp,&c); temp/=10; } h_Semaphore[j+1]=CreateSemaphore(NULL,0,n_Thread,lp); } //创建生产者和消费者线程; for(i =0;i< (int) n_Thread;i++) { if(Thread_Info[i].entity =='P') h_Thread[i]= CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)(Produce), &(Thread_Info[i]),0,NULL); else h_Thread[i]=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)(Consume), &(Thread_Info[i]),0,NULL); } //主程序等待各个线程的动作结束; wait_for_all = WaitForMultipleObjects(n_Thread,h_Thread,TRUE,-1); printf(" \n \nALL Producer and consumer have finished their work. \n"); printf("Press any key to quit!\n"); _getch(); return 0; } //确认是否还有对同一产品的消费请求未执行; bool IfInOtherRequest(int req) { for(int i=0;isemaphore,wait_for_mutex,m_delay; int m_serial; //获得本线程的信息; m_serial = ((ThreadInfo*)(p))->serial; m_delay = (DWORD)(((ThreadInfo*)(p))->delay *INTE_PER_SEC); Sleep(m_delay); //开始请求生产 printf("Producer %2d sends the produce require.\n",m_serial); //互斥访问下一个可用于生产的空临界区,实现写写互斥; wait_for_mutex = WaitForSingleObject(h_mutex,-1); //确认有空缓冲区可供生产,同时将空位置数empty减1;用于生产者和消费者的同步; //若没有则一直等待,直到消费者进程释放资源为止; wait_for_semaphore = WaitForSingleObject(empty_semaphore,-1); int ProducePos = FindProducePosition(); ReleaseMutex(h_mutex); //生产者在获得自己的空位置并做上标记后,以下的写操作在生产者之间可以并发; //核心生产步骤中,程序将生产者的ID作为产品编号放入,方便消费者识别; printf("Producer %2d begin to produce at position %2d.\n",m_serial,ProducePos); Buffer_Critical[ProducePos] = m_serial; printf("Producer %2d finish producing :\n ",m_serial); printf(" position[ %2d ]:%3d \n\n" ,ProducePos,Buffer_Critical[ProducePos]); //使生产者写的缓冲区可以被多个消费者使用,实现读写同步; ReleaseSemaphore(h_Semaphore[m_serial],n_Thread,NULL); } //消费者进程 void Consume(void * p) { //局部变量声明; DWORD wait_for_semaphore,m_delay; int m_serial,m_requestNum; //消费者的序列号和请求的数目; int m_thread_request[MAX_THREAD_NUM]; //本消费线程的请求队列; //提取本线程的信息到本地; m_serial = ((ThreadInfo*)(p))->serial; m_delay = (DWORD)(((ThreadInfo*)(p))->delay *INTE_PER_SEC); m_requestNum = ((ThreadInfo *)(p))->n_request; for (int i = 0;ithread_request[i]; Sleep(m_delay); //循环进行所需产品的消费 for(i =0;isemaphore=WaitForSingleObject(h_Semaphore[m_thread_request[i]],-1); //查询所需产品放到缓冲区的号 int BufferPos=FindBufferPosition(m_thread_request[i]); //开始进行具体缓冲区的消费处理,读和读在该缓冲区上仍然是互斥的; //进入临界区后执行消费动作;并在完成此次请求后,通知另外的消费者本处请求已 //经满足;同时如果对应的产品使用完毕,就做相应处理;并给出相应动作的界面提 //示;该相应处理指将相应缓冲区清空,并增加代表空缓冲区的信号量; EnterCriticalSection(&PC_Critical[BufferPos]); printf("Consumer %2d begin to consume %2d product \n",m_serial,m_thread_request[i]); ((ThreadInfo*)(p))->thread_request[i] =-1; if(!IfInOtherRequest(m_thread_request[i])) { Buffer_Critical[BufferPos] = -1; //-1标记缓冲区为空; printf("Consumer %2d finish consuming %2d:\n ",m_serial,m_thread_request[i]); printf(" position[ %2d ]:%3d \n\n" ,BufferPos,Buffer_Critical[BufferPos]); ReleaseSemaphore(empty_semaphore,1,NULL); } else { printf("Consumer %2d finish consuming product %2d\n\n ",m_serial,m_thread_request[i]); } //离开临界区 LeaveCriticalSection(&PC_Critical[BufferPos]); } }

64,666

社区成员

发帖
与我相关
我的任务
社区描述
C++ 语言相关问题讨论,技术干货分享,前沿动态等
c++ 技术论坛(原bbs)
社区管理员
  • C++ 语言社区
  • encoderlee
  • paschen
加入社区
  • 近7日
  • 近30日
  • 至今
社区公告
  1. 请不要发布与C++技术无关的贴子
  2. 请不要发布与技术无关的招聘、广告的帖子
  3. 请尽可能的描述清楚你的问题,如果涉及到代码请尽可能的格式化一下

试试用AI创作助手写篇文章吧