我需要创建一个管理者-工作者服务器,其中一个工作者一次只处理一个请求。在我想到的代码中,管理者将文件描述符存储在一个队列中;线程检索文件描述符并处理对它的请求。
我的问题是,在当前代码中,在开始时创建了N个线程,它们等待处理N个请求;但是一旦处理了N个请求,clientFun()
函数就不再运行,因为初始线程已经完成了它们的工作。
服务器代码:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include "utils.h"
#include "conn.h"
// Size in bytes of the buffer a worker uses for one read from a client.
#define DIM_BUFFER 100
// Number of worker threads created by main().
#define N_THREADS 1
// Node of the singly linked queue of file descriptors waiting for a worker.
struct nodo
{
// Queued file descriptor.
int fd;
// Next node in the queue, or NULL for the last one.
struct nodo *prossimoPtr;
};
typedef struct nodo Nodo;
typedef Nodo *NodoPtr;
// Head of the queue: pop() removes descriptors from here.
static Nodo *testaPtr = NULL;
// Tail of the queue: push() appends new descriptors here.
static Nodo *codaPtr = NULL;
// Locked around every access to the queue (see gestioneCoda and clientFun).
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Workers wait on this while the queue is empty; signalled after each push.
static pthread_cond_t emptyFd = PTHREAD_COND_INITIALIZER;
// Highest descriptor below maxFd that is set in `set`; -1 (converted to unsigned) if none.
unsigned int updateMaxSelect(int maxFd, fd_set set);
// Manager loop: select() on the listening socket, the pipe and the client descriptors.
static void run_server(int pipeW2M_Read);
// Appends fdF at the tail of the queue.
void push(NodoPtr *testaPtrF, NodoPtr *codaPtrF, int fdF);
// Worker thread body; receives a pointer to the write end of the worker-to-manager pipe.
static void *clientFun(void *pipeW2M_WriteF);
// Removes and returns the descriptor at the head of the queue; exits if the queue is empty.
int pop(NodoPtr *lPtrF);
// Queues fd for the workers and signals emptyFd; maxFdF and set are received by value.
void gestioneCoda(int maxFdF, int fd, fd_set set);
// Prints every descriptor in the list, then "NULL".
void stampa(NodoPtr lPtrF);
// Unlinks the socket file SOCKNAME.
void cleanup();
/*
 * Server entry point: creates the worker-to-manager pipe, starts N_THREADS
 * workers running clientFun and then runs the manager loop.
 */
int main()
{
    /* Unlink the socket path now, and register the same call for exit. */
    cleanup();
    atexit(cleanup);

    int pipeW2M[2];
    SYSCALL(pipe(pipeW2M), "Errore: pipe(pipeW2M)");

    /* Every worker receives the address of the pipe's write end. */
    pthread_t threadFd[N_THREADS];
    int started = 0;
    while (started < N_THREADS)
    {
        THREAD_CREATE(threadFd + started, NULL, clientFun, (void *) (pipeW2M + WRITE_END), "Thread setId");
        started++;
    }

    /* run_server loops without a break, so the code below is only reached
     * if that loop ever gets an exit path. */
    run_server(pipeW2M[READ_END]);

    SYSCALL(close(pipeW2M[WRITE_END]), "Errore: close(pipeW2M[WRITE_END])");
    SYSCALL(close(pipeW2M[READ_END]), "Errore: close(pipeW2M[READ_END])");

    int joined = 0;
    while (joined < N_THREADS)
    {
        THREAD_JOIN(threadFd[joined], NULL, "Impossibile fare la join: seetId");
        joined++;
    }
    return 0;
}
/**
 * Manager loop: owns the listening socket and the select() descriptor set.
 *
 * Accepts new connections, hands every readable client descriptor to the
 * worker queue (gestioneCoda) and starts watching again the descriptors that
 * workers send back through the worker-to-manager pipe.
 *
 * \param pipeW2M_Read read end of the pipe on which workers write the
 *                     descriptors they have finished serving
 *
 * A failing system call terminates the process through the SYSCALL macros.
 */
static void run_server(int pipeW2M_Read)
{
    /* Listening socket */
    int fdSkt;
    RETURN_SYSCALL(fdSkt, socket(AF_UNIX, SOCK_STREAM, 0), "Errore creazione socket - fdSkt");

    /* Zero the whole address first, then copy the path leaving room for the
     * terminating '\0' whatever the platform's sun_path size is. */
    struct sockaddr_un sckAddr;
    memset(&sckAddr, 0, sizeof(sckAddr));
    sckAddr.sun_family = AF_UNIX;
    strncpy(sckAddr.sun_path, SOCKNAME, sizeof(sckAddr.sun_path) - 1);

    SYSCALL(bind(fdSkt, (struct sockaddr *) &sckAddr, sizeof(sckAddr)), "Errore bind - fdSkt");
    SYSCALL(listen(fdSkt, SOMAXCONN), "Errore listen - fdSkt");

    /* Highest descriptor currently watched */
    int maxFd = fdSkt;

    /* `set` is the master set; `readSet` is the scratch copy select() modifies. */
    fd_set set, readSet;
    FD_ZERO(&set);
    FD_SET(fdSkt, &set);
    FD_SET(pipeW2M_Read, &set);
    if (pipeW2M_Read > maxFd)
    {
        maxFd = pipeW2M_Read;
    }

    int fdSkt_accept;
    while (1)
    {
        readSet = set;
        SYSCALL(select(maxFd + 1, &readSet, NULL, NULL, NULL), "select(fd_num + 1, &rdset, NULL, NULL, NULL)");

        for (int i = 0; i <= maxFd; i++)
        {
            if (!FD_ISSET(i, &readSet))
            {
                continue;
            }

            if (i == fdSkt)
            {
                /* New connection: start watching it. */
                RETURN_SYSCALL(fdSkt_accept, accept(fdSkt, NULL, 0), "fdSkt_accept = accept(fdSkt, NULL, 0)");
                if (fdSkt_accept >= FD_SETSIZE)
                {
                    /* An fd_set cannot represent this descriptor; FD_SET would
                     * write out of bounds, so refuse the connection instead. */
                    fprintf(stderr, "Troppi descrittori aperti: connessione rifiutata\n");
                    close(fdSkt_accept);
                    continue;
                }
                FD_SET(fdSkt_accept, &set);
                if (fdSkt_accept > maxFd)
                {
                    maxFd = fdSkt_accept;
                }
                continue;
            }

            if (i == pipeW2M_Read)
            {
                /* A worker finished a request: watch its descriptor again. */
                int pipeFdSoccket;
                SYSCALL(read(pipeW2M_Read, &pipeFdSoccket, sizeof(int)), "Errore");
                printf("%d\n", pipeFdSoccket);
                FD_SET(pipeFdSoccket, &set);
                if (pipeFdSoccket > maxFd)
                {
                    maxFd = pipeFdSoccket;
                }
                continue;
            }

            /* Client descriptor with pending data: hand it to the workers. */
            gestioneCoda(maxFd, i, set);

            /* gestioneCoda receives `set` by value, so it cannot update the
             * master set. Stop watching the descriptor here until a worker
             * gives it back through the pipe; otherwise select() would report
             * it again immediately and it would be queued over and over. */
            FD_CLR(i, &set);
        }
    }

    /* Not reached while the loop above has no exit path. */
    SYSCALL(close(fdSkt), "Errore close - fdSkt");
}
static void *clientFun(void *pipeW2M_WriteF)
{
puts("Entro");
int pipeW2M_Write = *((int *) pipeW2M_WriteF);
LOCK(&mutex)
while (testaPtr == NULL)
{
WAIT(&emptyFd, &mutex)
}
int fdAccept = pop(&testaPtr);
printf("Fd in thread: %d\n", fdAccept);
UNLOCK(&mutex)
char buffer[DIM_BUFFER];
memset(buffer, '\0', DIM_BUFFER);
int lenghtRead;
RETURN_SYSCALL(lenghtRead, read(fdAccept, buffer, DIM_BUFFER), "Errore: read(fdSkt_com, buffer, DIM_BUFFER)")
if(lenghtRead == 0)
{
SYSCALL(write(pipeW2M_Write, &fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, &fdAccept, sizeof(int))")
return NULL;
}
//lenghtRead comprende conta tutti i caretteri letti (compreso il '\0' se è presente)
for(int i = 0; i < lenghtRead-1; i++)
{
buffer[i] = toupper((unsigned char) buffer[i]);
}
SYSCALL(writen(fdAccept, buffer, lenghtRead), "Errore: writen(fdSkt_com, buffer, lengthBuffer)")
SYSCALL(write(pipeW2M_Write, &fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, &fdAccept, sizeof(int))")
puts("Esco");
return NULL;
}
/**
 * Finds the highest descriptor strictly below maxFd that is set in `set`.
 *
 * \param maxFd exclusive upper bound of the search
 * \param set   descriptor set to inspect (received by value)
 * \return the descriptor found, or -1 converted to unsigned int when no
 *         descriptor below maxFd is set
 */
unsigned int updateMaxSelect(int maxFd, fd_set set)
{
    int candidate = maxFd - 1;

    /* Walk downwards until a member of the set is met or 0 is passed. */
    while (candidate >= 0)
    {
        if (FD_ISSET(candidate, &set))
        {
            break;
        }
        candidate--;
    }

    return (candidate >= 0) ? (unsigned int) candidate : (unsigned int) -1;
}
void gestioneCoda(int maxFdF, int fd, fd_set set)
{
LOCK(&mutex)
push(&testaPtr, &codaPtr, fd);
FD_CLR(fd, &set);
if(fd == maxFdF)
maxFdF = updateMaxSelect(fd, set);
SIGNAL(&emptyFd)
UNLOCK(&mutex)
}
/**
 * Appends a descriptor at the tail of the queue.
 *
 * \param testaPtrF address of the head pointer (set when the queue is empty)
 * \param codaPtrF  address of the tail pointer (always moved to the new node)
 * \param fdF       descriptor to store
 *
 * The process exits through RETURN_NULL_SYSCALL if the node cannot be
 * allocated.
 */
void push(NodoPtr *testaPtrF, NodoPtr *codaPtrF, int fdF)
{
    NodoPtr fresh = NULL;
    RETURN_NULL_SYSCALL(fresh, malloc(sizeof(Nodo)), "nuovoPtr = malloc(sizeof(Nodo))");
    fresh->prossimoPtr = NULL;
    fresh->fd = fdF;

    /* The link to update is the head itself for an empty queue, otherwise
     * the `next` field of the current tail. */
    NodoPtr *link = (*testaPtrF == NULL) ? testaPtrF : &(*codaPtrF)->prossimoPtr;
    *link = fresh;
    *codaPtrF = fresh;
}
/**
 * Removes the first node of the list and returns its descriptor.
 *
 * \param lPtrF address of the head pointer; advanced to the next node
 * \return the descriptor stored in the removed node
 *
 * Prints a message and exits with EXIT_FAILURE when the list is empty.
 * Only the head pointer is updated; the tail pointer is not touched here.
 */
int pop(NodoPtr *lPtrF)
{
    NodoPtr first = *lPtrF;

    if (first == NULL)
    {
        puts("la lista è vuota");
        exit(EXIT_FAILURE);
    }

    int fdValue = first->fd;
    *lPtrF = first->prossimoPtr;
    free(first);
    return fdValue;
}
/**
 * Prints every descriptor stored in the list, one per line, followed by a
 * final "NULL" line marking the end of the list.
 *
 * \param lPtrF first node of the list (may be NULL)
 */
void stampa(NodoPtr lPtrF)
{
    for (NodoPtr cur = lPtrF; cur != NULL; cur = cur->prossimoPtr)
    {
        printf("Parola: %d\n", cur->fd);
    }
    puts("NULL");
}
/**
 * Removes the socket file SOCKNAME from the file system.
 *
 * Called directly at start-up and registered with atexit() by main. The
 * result of unlink() is deliberately ignored: the file may not exist.
 * Declared with (void) so the definition is a real prototype.
 */
void cleanup(void)
{
    unlink(SOCKNAME);
}
客户端代码:
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
//Socket libraries:
#include <sys/socket.h>
#include <sys/un.h>
#include "utils.h"
#include "conn.h"
#define DIM_BUFFER 256
/*
 * Client entry point: connects to the server socket SOCKNAME, then sends each
 * word read from stdin and prints the server's reply, until "quit" is typed,
 * stdin ends, or the server closes the connection.
 */
int main()
{
    int fdSkt;
    /* Socket creation */
    RETURN_SYSCALL(fdSkt, socket(AF_UNIX, SOCK_STREAM, 0), "Errore creazione socket - fdSkt");

    /* The address must match the one the server binds to. Zero it first and
     * leave room for the terminating '\0' in sun_path. */
    struct sockaddr_un sckAddr;
    memset(&sckAddr, 0, sizeof(sckAddr));
    sckAddr.sun_family = AF_UNIX;
    strncpy(sckAddr.sun_path, SOCKNAME, sizeof(sckAddr.sun_path) - 1);

    /* The server may not have created the socket file yet: retry on ENOENT. */
    while (connect(fdSkt, (struct sockaddr *) &sckAddr, sizeof(sckAddr)) == -1)
    {
        /* Test errno before any stdio call, which may overwrite it. */
        if (errno != ENOENT)
        {
            perror("Errore connect - fdSkt");
            exit(EXIT_FAILURE);
        }
        puts("Bloccato");
        sleep(1); /* avoid spinning at full speed while waiting */
    }

    while (1)
    {
        char buffer[DIM_BUFFER];
        memset(buffer, '\0', DIM_BUFFER);

        /* NOTE(review): "%s" is unbounded; a word of DIM_BUFFER bytes or
         * more overflows buffer. Fixing it needs a width in SCANF_STRINGA. */
        SCANF_STRINGA(buffer);

        /* "%s" never stores an empty word, so an empty buffer means nothing
         * was read (end of input): stop instead of looping forever. */
        if (buffer[0] == '\0')
        {
            break;
        }
        if (strncmp(buffer, "quit", strlen("quit")) == 0)
        {
            break;
        }

        /* The terminating '\0' is sent too; the reply has the same length. */
        int lenghtBuffer = strlen(buffer) + 1;
        SYSCALL(writen(fdSkt, buffer, lenghtBuffer), "Errore: writen(fdSkt, buffer, strlen(buffer)+1)");

        int lenghtReply;
        RETURN_SYSCALL(lenghtReply, readn(fdSkt, buffer, lenghtBuffer), "Errore: readn(fdSkt, buffer, strlen(buffer)+1)");
        if (lenghtReply == 0)
        {
            /* readn returns 0 on EOF: the server went away. */
            fprintf(stderr, "Connessione chiusa dal server\n");
            break;
        }
        printf("%s\n", buffer);
    }

    SYSCALL(close(fdSkt), "Errore close - fdSkt");
    return 0;
}
utils.h 代码:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
// Indexes of the two ends of a pipe() descriptor pair.
#define READ_END 0
#define WRITE_END 1

// NOTE: these macros stay bare `if` statements (not do { } while (0)) because
// existing call sites invoke them without a trailing semicolon.
// Arguments are parenthesized so compound expressions are compared as a
// whole, and errno is saved before perror(), which may overwrite it.

// Evaluates c, stores the result in r; if it is -1 prints e and exits with errno.
#define RETURN_SYSCALL(r,c,e) if(((r) = (c)) == -1) { int utils_saved_errno = errno; perror(e); exit(utils_saved_errno); }
// Evaluates c; if it is -1 prints e and exits with errno.
#define SYSCALL(c,e) if((c) == -1) { int utils_saved_errno = errno; perror(e); exit(utils_saved_errno); }
// pthread functions return the error number instead of setting errno:
// store it in errno so that perror() prints the real cause.
#define THREAD_CREATE(a, b, c, d, text) if((errno = pthread_create(a, b, c, d)) != 0) { perror(text); exit(EXIT_FAILURE); }
#define THREAD_JOIN(a, b, text) if((errno = pthread_join(a, b)) != 0) { perror(text); exit(EXIT_FAILURE); }
// Use with functions that return NULL on failure and whose return value must be kept (e.g. fopen).
#define RETURN_NULL_SYSCALL(retrunVar, fun, text) if(((retrunVar) = (fun)) == NULL) { int utils_saved_errno = errno; perror(text); exit(utils_saved_errno); }
// Use with calls that return a value != 0 on failure.
#define SYSCALL_ZERO(syscall, text) if((syscall) != 0) { int utils_saved_errno = errno; perror(text); exit(utils_saved_errno); }
// Locks mutex l. On failure prints a fatal message on stderr and terminates
// the calling thread with pthread_exit (exit value EXIT_FAILURE).
#define LOCK(l) \
if (pthread_mutex_lock(l) != 0) \
{ \
fprintf(stderr, "ERRORE FATALE lock\n"); \
pthread_exit((void*)EXIT_FAILURE); \
}
// Unlocks mutex l. Same failure handling as LOCK.
#define UNLOCK(l) \
if (pthread_mutex_unlock(l) != 0) \
{ \
fprintf(stderr, "ERRORE FATALE unlock\n"); \
pthread_exit((void*)EXIT_FAILURE); \
}
// Signals condition variable c. Same failure handling as LOCK.
#define SIGNAL(c) \
if (pthread_cond_signal(c) != 0) \
{ \
fprintf(stderr, "ERRORE FATALE signal\n"); \
pthread_exit((void*)EXIT_FAILURE); \
}
// Waits on condition variable c with mutex l. Same failure handling as LOCK.
#define WAIT(c, l) \
if (pthread_cond_wait(c,l) != 0) \
{ \
fprintf(stderr, "ERRORE FATALE wait\n"); \
pthread_exit((void*)EXIT_FAILURE); \
}
// Reads one whitespace-delimited word from stdin into `stringa`.
// scanf("%s") returns 1 on success and EOF (not 0) when the input ends, so
// the check is "!= 1": end of input and read errors both stop the program.
// Kept as a bare `if` so it still works at call sites without a semicolon.
// NOTE(review): "%s" has no field width, so the caller's buffer must be large
// enough for any word that can arrive.
#define SCANF_STRINGA(stringa) \
    if (scanf("%s", stringa) != 1) \
    { \
        perror("Impossibile leggere la stringa"); \
        exit(EXIT_FAILURE); \
    }
conn.h 代码:
#if !defined(CONN_H)
#define CONN_H
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#define SOCKNAME "./cs_sock"
#define MAXBACKLOG 108
/** Avoids partial reads: keeps reading until `size` bytes have arrived.
 *
 * A read interrupted by a signal (EINTR) is retried.
 *
 * \retval -1   error (errno set)
 * \retval 0    EOF met while reading from fd
 * \retval size on success
 */
static inline int readn(long fd, void *buf, size_t size) {
    char *cursor = (char *) buf;
    size_t remaining = size;

    for (;;) {
        if (remaining == 0) {
            return size;
        }

        int got = read((int) fd, cursor, remaining);
        if (got == 0) {
            return 0;   // EOF
        }
        if (got == -1) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }

        cursor += got;
        remaining -= got;
    }
}
/** Avoids partial writes: keeps writing until `size` bytes have been sent.
 *
 * A write interrupted by a signal (EINTR) is retried.
 *
 * \retval -1 error (errno set)
 * \retval 0  write returned 0 while writing
 * \retval 1  the whole buffer was written
 */
static inline int writen(long fd, void *buf, size_t size) {
    char *cursor = (char *) buf;
    size_t remaining = size;

    for (;;) {
        if (remaining == 0) {
            return 1;
        }

        int sent = write((int) fd, cursor, remaining);
        if (sent == 0) {
            return 0;
        }
        if (sent == -1) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }

        cursor += sent;
        remaining -= sent;
    }
}
#endif /* CONN_H */
2条答案
按热度按时间ftf50wuq1#
线程通常不会运行到完成;相反,它们在执行循环中等待某个触发事件,例如消息或信号量的到达。典型线程函数的模式是:
线程函数的签名取决于线程库/操作系统;以上写法对于 pthreads 来说是常见的。同样,一个线程可以使用
while(1)
或for(;;)
循环无限期地运行。确切的终止机制(例如我的例子中的 terminate)
并不是必需的。线程循环通常是状态机的实现。
当然,您可以有一个运行到完成的线程,但在这种情况下,您必须为每个事件创建一个新线程-这样做效率相当低。
阻塞调用可以像周期性任务的
sleep()
调用一样简单,也可以在一个任务的多个地方阻塞,但是这样做会使编码和调试变得更加复杂,状态机模式通常是一个更好的解决方案。t3psigkw2#
既然Clifford已经给出了一般信息,我将讨论您的程序中的具体问题,并提出适当的修改建议。
clientFun
不仅无法处理多个(连续的)客户端连接,而且无法处理来自一个连接的多个单独的消息(在多个线程中处理来自一个连接的消息是不明智的,因为客户端无论如何都不是为发送并行请求而设计的);作为第一步,我们需要围绕一个消息的处理的循环,即围绕行放置一个
for (; ; ) { … }
循环,并用break;
替换其中的return NULL;
。for (; ; ) { … }
循环run_server
中进行调整。服务器不再需要监视接受的套接字描述符,因此更改到
并删除后面对
gestioneCoda
的调用,这样接受的套接字就立即传递给工作线程。到