Párhuzamos programozást támogató nyelvi eszközök összehasonlítása: Termelô - fogyasztó probléma


4. Termelô - fogyasztó probléma

Ez a megoldás a termelõ-fogyasztó problémát a C++ nyelv objektum orientált, és a PVM könyvtár párhuzamos lehetõségeit kihasználva oldja meg.

A megoldás hasonlít a CC++ nyelv által adható példára, de itt a részleteket kézzel kellett elõállítani. Ehhez a módszerhez képest a CC++ nyelv alapvetõen a folyamatszálak kezelésével mutat többet.

Következô hardware/software környezetekben teszteltem:

4.1 Az ötlet

Elsô lépésben a PVM C nyelvû felületét alakítottam át, hogy objektum orientált elvárásoknak megfeleljen.

Az ötlet alapja tulajdonképpen az, hogy a C++ nyelvben megírt objektumok ne csak egy önálló memóriaterületet kapjanak a programon belül, és a függvényekként definiált belépési pontokon kívül is kifejthessenek valamilyen tevékenységet.

A cél az volt, hogy egy C++-beli objektum egy önálló folyamatként, önálló tárterülettel fusson, és a külvilággal csak az elôre definiált üzenetekkel kommunikáljon. A megoldásban egy ilyen objektum egy önálló program lett, mely így teljesíti az összes elvárást.

A PVM rendszer az objektumok közötti üzenetekkel megvalósított kommunikáció megszervezésére, valamint egy több processzorból álló - valóban párhuzamos - rendszer kihasználására kellett.

A PVM rendszerbeli lehetôségek a dinamikus programindítást is megengedik, így a C++ program szemszögébôl dinamikusan lehet létrehozni, illetve megszüntetni ilyen objektum/programokat.

4.2 Az alapok

Az leírt program nem csak egy egyedi megoldás, hanem egy módszer alapja is lehet. A módszer lényegében arra szolgál, hogy egy hagyományos C++ osztályt átalakítsunk objektum/programmá.

Az átalakítás alapja a PvmCore osztály, mely elrejti a PVM programindítási függvényeit, illetve a kommunikáció egy részét.

Létrehozásának célja az volt, hogy egy absztrakt osztályt definiáljak, amelyet új osztályok kényelmesebb létrehozására lehet majd használni.

Az osztálynak két fô megvalósítása van:

// PvmCoreC.h
#ifndef _PVMCORE_H_
#define _PVMCORE_H_

#include "pvm3.h"
#include "debug.h"

class PvmCore
{
 protected:
  PvmCore(char* ObjectCodeName);
  PvmCore(const PvmCore&); 		// calls inc_ref()
  PvmCore(char* ObjectCodeName, int);	
		// sets object's tid and name
  		// and calls inc_ref()
  PvmCore& operator=(const PvmCore&);
 public:
  int GetId(void) const { return object_tid; }
 protected:
  virtual ~PvmCore(void);		// calls dec_ref()
 private:
  int object_tid;
  void inc_ref(void);
  void dec_ref(void);
  void sync_call(int);
  char *codeName;
};
#endif

// PvmCoreCS.h
#define MSG_PVMCORE_INC_REF	30000
#define MSG_PVMCORE_DEC_REF	30001

// PvmCoreC.C
#include "PvmCoreC.h"
#include "PvmCoreCS.h"
#include <stdlib.h>
#include <string.h>

PvmCore::PvmCore(char *ObjectCodeName)
  {
    int ret;

    codeName = new char[strlen(ObjectCodeName)+1];
    strcpy(codeName, ObjectCodeName);

    DPRINT(("%s->PvmCore::PvmCore() spawns %s\n", 
		codeName, codeName));
    pvm_mytid();
    ret = pvm_spawn(codeName, NULL, PvmTaskDefault,
				 "", 1, &object_tid);
    if(ret!=1)
      {
		DPRINT(("%s->PvmCore::PvmCore() spawn "
			"was not succesfull\n", codeName));
		exit(-1);
      }
    inc_ref();
  }

PvmCore::PvmCore(const PvmCore& pc)
  {
    codeName = new char[strlen(pc.codeName)+1];
    strcpy(codeName, pc.codeName);

    DPRINT(("%s->PvmCore::PvmCore(PvmCore&)\n", codeName));
    object_tid = pc.GetId();
    inc_ref();
  }

PvmCore::PvmCore(char *CodeName, int ObjectTid)
  {
    codeName = new char[strlen(CodeName)+1];
    strcpy(codeName, CodeName);

    DPRINT(("%s->PvmCore::PvmCore(PvmCore&)\n", codeName));
    object_tid = ObjectTid;
    inc_ref();
  }

PvmCore& PvmCore::operator=(const PvmCore& pc)
  {
    char *temp = codeName;
    codeName = new char[strlen(pc.codeName)+1];
    strcpy(codeName, pc.codeName);

    DPRINT(("%s->PvmCore& PvmCore::operator=(const PvmCore&)\n", codeName));
    if(pvm_pstat(object_tid)==PvmOk)
      dec_ref();
    object_tid = pc.GetId();
    inc_ref();

    delete [] temp;
    return *this;
  }

PvmCore::~PvmCore(void)
  {
    DPRINT(("%s->PvmCore::~PvmCore()\n", codeName));
    dec_ref();
    delete [] codeName;
  }
 
void PvmCore::sync_call(int call_type)
  {
   int sbuf, rbuf;

   if( (sbuf=pvm_mkbuf(PvmDataDefault)) < 0) 	pvm_perror("sync_call");
   if( (sbuf=pvm_setsbuf(sbuf)) < 0)         	pvm_perror("sync_call");
   if( pvm_send(object_tid, call_type) < 0)  	pvm_perror("sync_call");
   if( (rbuf=pvm_setrbuf(0)) < 0)            	pvm_perror("sync_call");
   if( pvm_recv(object_tid, call_type) < 0)  	pvm_perror("sync_call");

   pvm_freebuf(pvm_setsbuf(sbuf));
   pvm_freebuf(pvm_setrbuf(rbuf));
  }

void PvmCore::inc_ref(void)
  {
    DPRINT(("%s->void PvmCore::inc_ref()\n", codeName));
    sync_call(MSG_PVMCORE_INC_REF);
  }

void PvmCore::dec_ref(void)
  {
    DPRINT(("%s->void PvmCore::dec_ref()\n", codeName));
    sync_call(MSG_PVMCORE_DEC_REF);
  }

// PvmCoreS.h
#ifndef _PVMCORE_S_H_
#define _PVMCORE_S_H_

#include "pvm3.h"
#include "debug.h"

class PvmCoreS
{
  public:
    PvmCoreS(void);
    void Run(void);
    virtual ~PvmCoreS(void);
    virtual void HandleMessage(int, int);
    virtual void IdleProc(void);
    void Detach(void);
    void Synchronize(void);
  private:
    int reference_counter;
    int is_entity;
};
#endif
// PvmCoreS.C
#include "PvmCoreS.h"
#include "PvmCoreCS.h"

#define	True	(1==1)
#define False	!True

PvmCoreS::PvmCoreS(void)
  {
   DPRINT(("PvmCoreS::PvmCoreS()\n"));
   pvm_mytid();
   reference_counter=0;
   is_entity = False;
  }

PvmCoreS::~PvmCoreS(void)
  {
   DPRINT(("PvmCoreS::~PvmCoreS(void)\n"));
   if(reference_counter!=0)
     DPRINT(("PvmCore::~PvmCore(), reference_counter=%d!!!\n", 
     		reference_counter));
   pvm_exit();
  }

void PvmCoreS::Run(void)
  {
    int bufid, reply_tid, reply_type;

    bufid = pvm_recv(-1, MSG_PVMCORE_INC_REF);
    pvm_bufinfo(bufid, NULL, &reply_type, &reply_tid);
    pvm_initsend(PvmDataDefault);
    HandleMessage(bufid, MSG_PVMCORE_INC_REF);
    pvm_send(reply_tid, reply_type);

    while(reference_counter!=0)
      {
        if(is_entity)
	  bufid = pvm_nrecv(-1, -1);
	else
          bufid = pvm_recv(-1, -1);
	
	if(bufid<=0)
	  IdleProc();
	else
	  {
	   pvm_bufinfo(bufid, NULL, &reply_type, &reply_tid);
	   pvm_initsend(PvmDataDefault);

	   HandleMessage(bufid, reply_type);

	   pvm_send(reply_tid, reply_type);
	  }
      } // end-while
  }


void PvmCoreS::HandleMessage(int bufid, int msg_type)
  {
    switch(msg_type)
      {
       case MSG_PVMCORE_INC_REF:
       		reference_counter++;
		DPRINT(("PvmCoreS::INC_REF => "
			"reference_counter==%d\n",reference_counter));
		break;
       case MSG_PVMCORE_DEC_REF:
       		reference_counter--;
		DPRINT(("PvmCoreS::DEC_REF => "
			"reference_counter==%d\n",reference_counter));
		break;
       default: 
      		DPRINT(("PvmCoreS::HandleMessage(): "
					"UNKNOWN MESSAGE\n"));
      }
  }


void PvmCoreS::Detach(void)
  {
   is_entity = True;
   DPRINT(("PvmCoreS::Detach()\n"));
  }

void PvmCoreS::Synchronize(void)
  {
   is_entity = False;
   DPRINT(("PvmCoreS::Synchronize()\n"));
  }

void PvmCoreS::IdleProc(void)
  {
   ;
  }

Az interface konstruktorának végrehajtásakor létrejön a munkát végzô program. Ennek nevét az interface az ObjectCodeName paraméterben kapja meg. A kód elindítása a PVM feladata. Ha több processzor van a rendszerben, akkor a munkavégzô kód lehet, hogy egy másik processzoron fog elindulni (azt is elô lehet írni, hogy biztosan másikon induljon el), ami valódi párhuzamos mûködéshez vezet.

Az interface objektum másolásakor nem indulnak újabb programok, hanem a már meglévôben fog egy referencia számláló nôni, jelezve, hogy több helyrôl kapcsolódnak már rá.

Az interface objektum megszûnésekor a fent említett számláló csökkenni fog, és amikor eléri a nullát - tehát már senki sem kíváncsi rá - meg fog szûnni a kiszolgáló program is.

A kiszolgáló program kétféle üzemmódban futhat:

4.3 Átalakítás

Egy egyszerû példában, a sor típus átalakításában mutatom be a módszert.

Elsô lépésben létrehoztam a hagyományos Queue típust, mely a szokásos mûveleteket valósítja meg egész számokon.

A típus átalakítás után két osztályra bomlik szét:

// Queue.h
#ifndef _QUEUE_H_
#define _QUEUE_H_

class Queue {
  public:
    Queue(int size=30);
    Queue(const Queue&);
    Queue& operator=(const Queue&);
    ~Queue(void);
    void Resize(int size);     	// if Empty()
    int Size(void);	
    int Empty(void);
    int Full(void);
    void Put(int item);		// if not Full()
    int Get(void);		// if not Empty()
  protected:
    int *queue;
    int qsize;
    int qhead;
    int qtail;
    int qmax;
};
#endif
// Queue.C
#include "Queue.h"
#include <string.h>

Queue::Queue(int size)
  {
   qmax = (size<1)?1:size;
   queue = new int[qmax];
   qsize = 0;
   qhead = 0;
   qtail = 0;
  }

Queue::Queue(const Queue& q)
  {
   int *temp = queue;
   qmax = q.qmax;
   qsize = q.qsize;
   qhead = q.qhead;
   qtail = q.qtail;
   queue = new int[qmax];
   memcpy(queue, q.queue, sizeof(int) * qmax);
   delete [] temp;
  }

Queue& Queue::operator=(const Queue& q)
  {
   int *temp = queue;
   qmax = q.qmax;
   qsize = q.qsize;
   qhead = q.qhead;
   qtail = q.qtail;
   queue = new int[qmax];
   memcpy(queue, q.queue, sizeof(int) * qmax);
   delete [] temp;

   return *this;
  }

Queue::~Queue(void)
  {
   delete [] queue;
  }

void Queue::Resize(int size)     	// if Empty()
  {
   if(!Empty()) return;
   delete [] queue;

   qmax = (size<1)?1:size;
   queue = new int[qmax];
   qsize = 0;
   qhead = 0;
   qtail = 0;
  }

int Queue::Size(void)
  {
   return qsize;
  }

int Queue::Empty(void)
  {
   return qsize==0;
  }

int Queue::Full(void)
  {
   return qsize==qmax;
  }

void Queue::Put(int item)		// if not Full()
  {
   if(Full()) return;

   queue[qtail] = item;
   qtail = (qtail + 1) % qmax;
   qsize++;
  }

int Queue::Get(void)		// if not Empty()
  {
   if(Empty()) return -32768;

   int item = queue[qhead];
   qhead = (qhead + 1) % qmax;
   qsize--;

   return item;
  }
// QueueC.h
// client part

#ifndef _QUEUEC_H_
#define _QUEUEC_H_

#include "PvmCoreC.h"

class Queue : public PvmCore {
  public:
    Queue(int size=30);
    Queue(const Queue&);	// only a new reference 
    Queue(int tid, int dummy);	// only a new reference
    Queue& operator=(const Queue&);
    virtual ~Queue(void);
    void Resize(int size);     	
    int Size(void);	
    int Empty(void);
    int Full(void);
    void Put(int item);		// if not Full()
    int Get(void);		// if not Empty()
};
#endif

// QueueCS.h
#ifndef _QUEUECS_H_
#define _QUEUECS_H_

#define Queue__Queue		1
#define Queue__Queue_copy	2
#define Queue__Queue_request	3
#define Queue__Queue_answer	4
#define Queue___Queue		5
#define Queue__Resize		6
#define Queue__Size		7
#define Queue__Empty		8
#define Queue__Full		9
#define Queue__Put		10
#define Queue__Get		11

#endif

// QueueC.C
// client part

#include "QueueC.h"
#include "QueueCS.h"

#define OBJECT_NAME	"QueueS"

Queue::Queue(int size)
  : PvmCore(OBJECT_NAME)
  {
   pvm_packf("%+%d", PvmDataDefault, size);
   pvm_send(GetId(), Queue__Queue);
   pvm_recv(GetId(), Queue__Queue);
  }

Queue::Queue(const Queue& q)
  : PvmCore(q)
  {
   ;  // it will be only a new reference, not a new object instance
  }

Queue::Queue(int tid, int dummy)
  : PvmCore(OBJECT_NAME, tid)
  {
   ;  // it will be only a new reference, not a new object instance
  }

Queue& Queue::operator=(const Queue& q)
  {
   pvm_packf("%+%d", PvmDataDefault, q.GetId());
   pvm_send(GetId(), Queue__Queue_copy);
   pvm_recv(GetId(), Queue__Queue_copy);

   return *this;
  }

Queue::~Queue(void)
  {
   pvm_packf("%+", PvmDataDefault);
   pvm_send(GetId(), Queue___Queue);
   pvm_recv(GetId(), Queue___Queue); 
  }

void Queue::Resize(int size)     	// if Empty()
  {
   pvm_packf("%+%d", PvmDataDefault, size);
   pvm_send(GetId(), Queue__Resize);
   pvm_recv(GetId(), Queue__Resize);
  }

int Queue::Size(void)
  {
   int qsize;

   pvm_packf("%+", PvmDataDefault);
   pvm_send(GetId(), Queue__Size);
   pvm_recv(GetId(), Queue__Size);
   pvm_unpackf("%d", &qsize);

   return qsize;
  }

int Queue::Empty(void)
  {
   int empty;

   pvm_packf("%+", PvmDataDefault);
   pvm_send(GetId(), Queue__Empty);
   pvm_recv(GetId(), Queue__Empty);
   pvm_unpackf("%d", &empty);

   return empty;
  }

int Queue::Full(void)
  {
   int full;

   pvm_packf("%+", PvmDataDefault);
   pvm_send(GetId(), Queue__Full);
   pvm_recv(GetId(), Queue__Full);
   pvm_unpackf("%d", &full);

   return full;
  }

void Queue::Put(int item)		// if not Full()
  {
   pvm_packf("%+%d", PvmDataDefault, item);
   pvm_send(GetId(), Queue__Put);
   pvm_recv(GetId(), Queue__Put);
  }

int Queue::Get(void)		// if not Empty()
  {
   int item;

   pvm_packf("%+", PvmDataDefault);
   pvm_send(GetId(), Queue__Get);
   pvm_recv(GetId(), Queue__Get);
   pvm_unpackf("%d", &item);

   return item;
  }
// QueueS.C
// server part

#include "PvmCoreS.h"
#include "Queue.h"
#include "QueueCS.h"

class QueueS : public Queue, public PvmCoreS {
  public:
    QueueS(void);
  protected:
    virtual void HandleMessage(int, int);
};

static QueueS *main_obj_ptr;

int sieve(int bufid, int tid, int msg_type)
  {
   int mtype, cc;
   if((cc=pvm_bufinfo(bufid, NULL, &mtype, NULL)) < 0)
     return cc;
   
//   DPRINT(("message type: %d\n", mtype));
   if(mtype==Queue__Put && main_obj_ptr->Full()) return 0;
   if(mtype==Queue__Get && main_obj_ptr->Empty()) return 0;
   return 1;
  }

QueueS::QueueS(void)
  : Queue(10), PvmCoreS()
  {
   // we need this to avoid the under and overflow
   pvm_recvf(sieve);
  }

void QueueS::HandleMessage(int bufid, int msg_type)
  {
   switch(msg_type)
     {
	case Queue__Queue:
	  {
	   int size;
	   pvm_unpackf("%d", &size);
	   Resize(size);
	  }
	  break;
	case Queue__Queue_copy:
	  {
	   int partner_tid;
	   pvm_unpackf("%d", &partner_tid);
	   if(partner_tid == pvm_mytid())
	     break; 

	   pvm_packf("%+", PvmDataDefault);
	   if(pvm_send(partner_tid, Queue__Queue_request)!=PvmOk)
	     break; // Error!!!
	   pvm_recv(partner_tid, Queue__Queue_request);
	   
	   delete [] queue;
	   pvm_unpackf("%d %d %d %d", 
		&qmax, &qsize, &qhead, &qtail);
	   queue = new int[qmax];
	   pvm_upkint(queue, qmax, 1);

	   pvm_packf("%+", PvmDataDefault);
	  }
	  break;
	case Queue__Queue_request:
	  {
	   pvm_packf("%d %d %d %d", qmax, qsize, qhead, qtail);
	   pvm_pkint(queue, qmax, 1);
	  }
	  break;
	case Queue___Queue:
	  break;
	case Queue__Resize:
	  {
	   int size;
	   pvm_unpackf("%d", &size);
	   Resize(size);
	  }
	  break;
	case Queue__Size:
	  {
	   pvm_packf("%d", Size());
	  }
	  break;
	case Queue__Empty:
	  {
	   pvm_packf("%d", Empty());
	  }
	  break;
	case Queue__Full:
	  {
	   pvm_packf("%d", Full());
	  }
	  break;
	case Queue__Put:
	  {
	   int item;
	   pvm_unpackf("%d", &item);
	   Put(item);
	  }
	  break;
	case Queue__Get:
	  {
	   pvm_packf("%d", Get());
	  }
	  break;
       default:
         PvmCoreS::HandleMessage(bufid, msg_type);
     } // end-switch
  }

int main(int argc, char *argv[])
{
  QueueS main_obj;

  main_obj_ptr = &main_obj;

  main_obj.Run();
} // end-main

Az interface rész feladata csak a típus mûveleteinél a paraméterek becsomagolása és elküldése, illetve az eredmények fogadása, kinyitása és visszaadása. Ez a kódrész nagyon unalmas, és ennek átírása gépies feladat. Ezt a részt egy alkalmas programmal automatizálni is lehetne, ami az osztott osztályok készítését nagyon felgyorsítaná.

A kiszolgáló részben hasonló feladatokat kell megoldani, csak a másik irányban.

Itt érdekességet az egyes belépési pontok, vagy üzenetek fogadása jelenti. Azaz nem lehet mindig minden függvényt végrehajtani, azokhoz feltételeket is lehet kötni.

Eseményorientált típusspecifikáció fogalmait használva a request esemény kezelését a PVM elintézi, azaz fogadja az üzenetet a kiszolgáló program. A megkötést itt az enter eseményre lehet tenni, ahogy azt a specifikáció során is kikötöttük. A feltétel nagyon sokféle lehet, csak néhány megkötés van: · a feltételben nem szerepelhet az aktuális mûvelet paramétere, mert ezt még nem lehet tudni (ehhez fogadni kéne ténylegesen az eseményt) · korábbi eseményekre való hivatkozásnál a programozónak kell megoldania a korábbi esemény tárolását, erre nincs támogatás Az exit esemény itt nincsen kezelve, de ha szükséges be lehet illeszteni a megoldásba.
Az üzenetek szûrését a sieve függvény végzi. Ennek specifikációját a pvm_recvf() függvény leírásánál lehet megnézni.

A kiszolgáló osztály létrehozásában csak a PvmCoreS és a megvalósítani kívánt típusból kell egy új osztályt létrehozni, és a kommunikációs részt megírni. Az egyetlen pont, amire szerintem nem nagyon lehetne automatikus konvertáló programot írni, az az üzenetek szûrése. Ez az eredeti C++ kódban nincs specifikálva, így valamilyen külön módon kéne ezt leírni.

4.4 Új osztályok

A User osztály nem egy hagyományos C++ osztály átirata, hanem a PvmCore lehetôségeit kihasználó, csak több folyamatot futtató rendszerben használható dolog.

// UserC.h
// client part

#ifndef _USERC_H_
#define _USERC_H_

#include "PvmCoreC.h"
#include "QueueC.h"
#include "UserCS.h"

class User : public PvmCore {
 public:
  User(const Queue&, UserType);
  void Run(void);
  void Stop(void);
  virtual ~User(void);
};
#endif

// UserCS.h
#ifndef _USERCS_H_
#define _USERCS_H_

#define User__User	1
#define User__Run	2
#define User__Stop	3
#define User___User	4

typedef enum { Producer, Consumer } UserType;

#endif

// UserC.C
// client part

#include "UserC.h"
#include "UserCS.h"

#define OBJECT_NAME	"UserS"

User::User(const Queue& q, UserType utype)
  :PvmCore(OBJECT_NAME)
  {
   int user_type=(int)utype;
   pvm_packf("%+%d%d", PvmDataDefault, user_type, q.GetId());
   pvm_send(GetId(), User__User); 
   pvm_recv(GetId(), User__User);
  }

void User::Run(void)
  {
   pvm_packf("%+", PvmDataDefault);
   pvm_send(GetId(), User__Run); 
   pvm_recv(GetId(), User__Run);
  }

void User::Stop(void)
  {
   pvm_packf("%+", PvmDataDefault);
   pvm_send(GetId(), User__Stop); 
   pvm_recv(GetId(), User__Stop);
  }

User::~User(void)
  {
   Stop();
  }

// UserS.C
// server part

#include <stdio.h>
#include "QueueC.h"
#include "UserCS.h"
#include "PvmCoreS.h"

class UserS : public PvmCoreS {
 public:
   UserS(void);
   virtual ~UserS(void);
 protected:
   virtual void HandleMessage(int, int);
   virtual void IdleProc(void);
 private:
   Queue *my_queue;
   UserType utype;
};

UserS::UserS(void)
  : PvmCoreS()
  {
   my_queue = NULL;
   utype = Producer;
  }

UserS::~UserS(void)
  {
   delete my_queue;
  }

void UserS::HandleMessage(int bufid, int msg_type)
  {
   switch(msg_type)
     {
      case User__User:
        {
	 int user_type;
	 int qtid;
	 pvm_unpackf("%d %d", &user_type, &qtid);

	 utype=(UserType)user_type;
	 my_queue = new Queue(qtid, 0);
	}
        break;
      case User__Run:
        Detach();
        break;
      case User__Stop:
        Synchronize();
        break;
      case User___User:
        break;
      default:
        PvmCoreS::HandleMessage(bufid, msg_type);
     } // end-switch
  }

void UserS::IdleProc(void)
  {
   static int i=0;
   if(my_queue == NULL) return;

   switch(utype)
     {
      case Producer:
	printf("Producer produced %d\n", i);
        my_queue->Put(i++);
	break;
      case Consumer:
        i=my_queue->Get();
	printf("Consumer consumed %d\n", i);
	break;
     }
  }

int main(int argc, char *argv[])
  {
   UserS main_obj;

   main_obj.Run();
  }

Ez a termelô, és fogyasztó típusok összevont megvalósítása, csak az induláskor derül ki, hogy ez vajon mi is lesz.

Érdekességet itt a Run() és a Stop() osztályfüggvények megvalósítása jelenti, melyekben az interface-hez kapcsolt kiszolgáló program aszinkron, illetve szinkron üzemmódba kapcsol.

4.5 A megoldás

A terelô fogyasztó probléma egy egyszerû szimulációja a ProdCons.C forrásban lehet megtalálni.

// ProdCons.C
// Producer-Consumer problem C++ version

#include <iostream.h>
#include "QueueC.h"
#include "UserC.h"
#include <unistd.h>

int main(int argc, char *argv[])
  {
   Queue theQueue(30);
   User  producer(theQueue, Producer);
   User  consumer(theQueue, Consumer);
   
   producer.Run();
   consumer.Run();
   sleep(1);
   producer.Stop();
   consumer.Stop();
  }

Itt egyetlen osztott sor jön létre, amire rákapcsolódik egy termelô, majd egy fogyasztó. Bizonyos idô elteltével a folyamatok leállnak, és véget ér a program.



P�rhuzamos Programoz�si Eszközök, Frohner Ákos
Hosted by www.Geocities.ws

1