C语言 进程从未命名管道阅读或更新共享内存中的变量时出现问题

yrwegjxp  于 2023-05-16  发布在  其他
关注(0)|答案(1)|浏览(240)

第一件事,谢谢你的阅读和帮助!
这是一个多进程和多线程的C代码它包括创建多个工作进程和一个通过未命名管道与工作进程通信的调度器线程。
下面是代码的解释:

结构

程序定义了两种结构类型:

  • worker_data有两个字段,一个整数id和一个状态字符串。
  • dispatcher_args,字段dispatcher_pipe是指向2D整数数组的指针。

共享内存和信号量

程序设置共享内存和信号量。共享内存用于存储可由多个进程访问的工作进程的数据(worker_data)。信号量用于进程/线程之间的同步。

全局变量

声明了几个全局变量,包括共享内存变量、信号量变量、工作进程的计数以及存储工作进程PID的数组。

Dispatcher函数

分派器是一个无限循环的线程函数。在每次迭代中,它都会检查共享内存中每个工作线程的状态。如果工作者的状态是“就绪”,则调度器将状态更改为“工作”,将消息写入工作者的管道,然后关闭管道。

Worker函数

每个工作进程都运行Worker函数。此函数使用select()等待来自调度程序的数据在其管道上可用。当数据可用时,它读取数据,执行一些工作(未在提供的代码中指定),将其在共享内存中的状态更改为“就绪”,然后无限期地重复该过程。

主要功能

main函数执行以下操作:
1.为工作进程数据分配共享内存。
1.取消链接并打开信号量。
1.初始化dispatcher_args结构并为其分配dispatcher_pipe的地址。
1.使用循环和fork()创建辅助进程。在每次迭代中,它设置一个管道,派生一个新进程,并分配子进程来运行Worker函数。
1.创建运行dispatcher函数的调度器线程。
1.等待调度程序线程完成。
1.通过取消Map共享内存、关闭并取消链接共享内存和信号量以及释放已分配的内存来进行清理。
此代码的目的是Dispatcher Threat与Worker Process通信,Dispatcher将任务分配给Worker Process并等待它们准备好执行新任务。每个工作进程通过改变其在共享内存中的状态来指示其对新任务的准备。
可用于使用共享存储器、管道和信号量来执行任务。
当我用gcc updatestatu.c -o updsts -lpthread -lrt编译程序时,程序执行“第一次循环”,这意味着它对一个Worker Process(例如1)进行读写,并正确地改变了该Worker(1)的状态,但在“第二次循环”中,之前执行任务的Worker(1)似乎已经准备好了,但在Worker Process中它没有得到任何消息,只有当它转到下一个Worker(一个不同的Worker,如Worker 2)时,它才得到消息。我不知道这是与工人进程的阅读或工人进程的状态变化的东西。
下面是我得到的输出(发送和获取消息的“循环”由\n分隔)

Worker 1 Status ready
Worker 1 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 1 Status ready

Worker 1 Status ready
Worker 1 Status working
message away
not read Worker 1 Status working

Worker 2 Status ready
Worker 2 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 2 Status ready
not read Worker 1 Status working

Worker 2 Status ready
Worker 2 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working

Worker 3 Status ready
Worker 3 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 3 Status ready
not read Worker 1 Status working
not read Worker 2 Status working

Worker 3 Status ready
Worker 3 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working

Worker 4 Status ready
Worker 4 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 4 Status ready
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working

Worker 4 Status ready
Worker 4 Status working
message away
^C

注意,我按了^C,因为正如我在循环中的问题**”for(int i = 0; i < n_worker; i++)“**因为没有Worker就绪(所有5个worker都应该在工作),它会创建一个无限循环。
下面是代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/mman.h>

typedef struct worker_data{
    int id;
    char status[20];
} worker_data;

typedef struct dispatcher_args{
    int** dispatcher_pipe;
} dispatcher_args;

// variables for shared memory
worker_data* workers;
size_t workers_size;

// variables for semaphores
sem_t *sem_log;

int n_workers = 5;
pid_t *pid_workers;

void* dispatcher(void *args) {
    dispatcher_args* arg = args;
    int (*dispatcher_pipe)[2] = (int (*)[2])arg->dispatcher_pipe;

    char msg[100];
    strcpy(msg,"Hello!");
    int j=0;

    printf("THREAD DISPATCHER CREATED\n");

    strcpy(msg,"Hello");

    while(1){

        for (int i = 0; i < n_workers; i++) {
            if (strcmp(workers[i].status, "ready") == 0) {
                printf("\nWorker %d Status %s\n",workers[i].id,workers[i].status);

                // update status of the worker in shared memory
                strcpy(workers[i].status, "working");

                printf("Worker %d Status %s\n",workers[i].id,workers[i].status);
                close(dispatcher_pipe[i][0]); // close the read end of the pipe
                printf("message away\n");
                write(dispatcher_pipe[i][1], &msg, sizeof(msg));
                close(dispatcher_pipe[i][1]); // close the write end of the pipe
                break;
            }
            else {
                printf("not read Worker %d Status %s\n",workers[i].id,workers[i].status);
            }
        }
        //j+=1;
        sleep(2);
    }
}

void Worker(int *worker_pipe, int worker_index) {
    char msg[100];
    sprintf(msg, "WORKER %d READY\n", workers[worker_index].id);

    fd_set set;
    FD_ZERO(&set);
    FD_SET(worker_pipe[0], &set);

    while(1){

        int ready = select(worker_pipe[0]+1, &set, NULL, NULL, NULL);
        if (ready < 0) {
            perror("select");
            exit(1);
        }
        printf("message get 1\n");
        if (FD_ISSET(worker_pipe[0], &set)) {
            printf("message get 2\n");
            // data is available, read from the pipe
            ssize_t nbytes = read(worker_pipe[0], &msg, sizeof(msg));
            if (nbytes < 0) {
                perror("read");
                exit(1);
            }
            printf("Worker message received from Dispatcher: %s\n", msg);
        }
        close(worker_pipe[1]);

        // functions of worker
        // ...

        // change status of the worker in shared memory
        strcpy(workers[worker_index].status, "ready");
        printf("JOB FINISHED Worker %d Status %s\n", workers[worker_index].id, workers[worker_index].status);

        // sleep(3);
        // printf("I am Worker and Im alive\n");
    }
}

int main() {
    pthread_t dispatcher_thread;
    int dispatcher_pipe[n_workers][2];
    workers_size = n_workers * sizeof(worker_data);
    int workers_fd = shm_open("workers", O_CREAT | O_RDWR, 0666);
    ftruncate(workers_fd, workers_size);
    workers = mmap(NULL, workers_size, PROT_READ | PROT_WRITE, MAP_SHARED, workers_fd, 0);

    pid_workers = malloc(n_workers * sizeof(pid_t));

// create semaphores
    sem_unlink("sem_log");
    sem_log = sem_open("sem_log", O_CREAT | O_EXCL, 0700, 1);

    dispatcher_args* disp_args = malloc(sizeof(dispatcher_args));
    disp_args->dispatcher_pipe= (int **) dispatcher_pipe;

// creating worker processes
    for (int i = 0; i < n_workers; i++) {
        worker_data worker;
        worker.id = i+1;
        strcpy(worker.status, "ready");
        workers[i] = worker;

        // create the unnamed pipe for the dispatcher and corresponding association
        if (pipe(dispatcher_pipe[i]) == -1) {
            printf("Error creating dispatcher pipe n1\n");
            exit(0);
        }

        pid_workers[i] = fork();

        if (pid_workers[i] == 0) {
            // worker process
            Worker(dispatcher_pipe[i], i);
            // printf("Hey\n");
        } else if (pid_workers[i] > 0) {
            // parent process
        } else {
            // error forking
            perror("fork");
            exit(EXIT_FAILURE);
        }
    }

// create dispatcher thread
    pthread_create(&dispatcher_thread, NULL, dispatcher, disp_args);

// wait for the threads to finish
    pthread_join(dispatcher_thread, NULL);

// clean up
    munmap(workers, workers_size);
    close(workers_fd);
    shm_unlink("workers");
    sem_close(sem_log);
    sem_unlink("sem_log");
    free(pid_workers);
    free(workers);
    free(disp_args);

    return 0;

}
1l5u6lss

1l5u6lss1#

别关上水管!
1.在主进程/线程中,发送第一条消息后,关闭管道的发送端。所以,你永远不能再发一条信息了。
1.在辅助进程中,接收到第一条消息后,关闭管道的接收端。所以,你永远不会收到另一条消息。
1.我不会使用 string 来表示worker状态。我会使用一个enum。字符串 * 不能 * 自动更新。

  1. master不应该设置worker状态。只有工人才应该这样做。否则,可能会出现争用条件。
    1.您希望在worker设置状态时使用stdatomic.h原语。它们执行缓存更新/刷新/同步。

相关问题