如何使用atomic_compare_exchange_weak解决生产者-消费者问题?

ct3nt3jp  于 2023-05-16  发布在  其他
关注(0)|答案(2)|浏览(140)

我正在用C实现“生产者-消费者问题”,它解决了多线程时冗余生产、冗余消费和消费未生产项的同步问题。

bool expected = false; 
while (!atomic_compare_exchange_weak(&lock, &expected, true))
    expected = false;

为了防止进程的重复访问,使用了 atomic_compare_exchange_weak
我知道当一个进程进入临界区时,atomic_compare_exchange_weak 可以阻止另一个进程的访问。但是,进程的重复访问仍在继续。我该如何解决这个问题?下面是我的生产者和消费者代码。

生产者代码

void *producer(void *arg) {
    int i = *(int *)arg;
    int item;
    
    while(alive) {

        while(counter >= BUFSIZE);
        
        expected_p = false;
        while (!atomic_compare_exchange_weak(&lock_p, &expected_p, true))
            expected_p = false;
        
        item = next_item++;
        
        buffer[in] = item;
        in = (in + 1) % BUFSIZE;
        counter++;  
            
        if (task_log[item][0] == -1) {
            task_log[item][0] = i;
            produced++;
            lock_p = false;
        }
        else {
            printf("ERROR\n");
            continue;
        } 
        printf("<P%d,%d>", i, item); 
    }
    pthread_exit(NULL); 
}

消费者代码

void *consumer(void *arg)
{
    int i = *(int *)arg;
    int item;

    while (alive) {

        while (counter <= 0);

        expected_c = false;
        while (!atomic_compare_exchange_weak(&lock_c, &expected_c, true)) 
            expected_c = false;
        
        item = buffer[out];
        out = (out + 1) % BUFSIZE;    
        counter--; 
        
        if (task_log[item][0] == -1) {
            printf("ERROR\n");
            continue;
        }
        else if (task_log[item][1] == -1) {
            task_log[item][1] = i;
            consumed++;
            lock_c = false; 
        }
        else {
            printf("ERROR\n");
            continue;
        }
        printf("<C%d,%d>\n", i, item);   
    }
    pthread_exit(NULL);
}

全局值

int buffer[BUFSIZE];
int in = 0;
int out = 0;
int counter = 0;
int next_item = 0;

bool expected_p = false;
bool expected_c = false;
atomic_bool lock_p = false;
atomic_bool lock_c = false;
mqkwyuun

mqkwyuun1#

只需使用pthread_cond和pthread_mutex,而不是尝试使用原子做一些疯狂的事情。下面的代码使用一个真实的的锁可能比原子原语有更多的开销,但是它更安全,更易于维护。此外,用条件变量代替自旋循环将显著提高两个线程的性能--克服了使用真实的锁的开销。
声明一个互斥体和一对条件变量:

pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_can_write, cand_can_read;

然后启动两个线程并修改如下:

void *producer(void *arg) {

    int i = *(int *)arg;
    int item;
    
    while(alive) {

        pthread_mutex_lock(&mut);  // TAKE THE LOCK

        // wait for counter to go below BUFSIZE
        while (counter >= BUFSIZE) {
            // atomically release lock and wait for consumer thread to signal
            pthread_cond_wait(&cond_can_write, &mut);

           // when pthread_cond_wait returns, this thread has reacquired the lock
        }

        // DO YOUR PRODUCE LOGIC HERE        

        pthread_mutex_unlock(&mut); // EXIT THE LOCK

        // signal to consumer that it can read
        pthread_cond_broadcast(&cond_can_read);
    }
    pthread_exit(NULL); 
}

void *consumer(void *arg)
{
    int i = *(int *)arg;
    int item;

    while (alive) {

        pthread_mutex_lock(&mut);   // TAKE THE LOCK

        // Wait for the counter to go above 0
        while (counter <= 0) {
            // atomically release lock and wait for producer thread to signal
            pthread_cond_wait(&cond_can_read, &mut);

            // when pthread_cond_wait returns, this thread has reacquired the lock
        }

        // DO YOUR CONSUME LOGIC HERE

        pthread_mutex_unlock(&mut);  // EXIT THE LOCK

        // signal to producer that it can write again
        pthread_cond_broadcast(&cond_write);

    }
    pthread_exit(NULL);
}
h7appiyu

h7appiyu2#

@seiblie为你的问题提供了好得多的答案;至于为什么你发布的东西不起作用……
您正在使用两个不同的自旋锁(lock_c,lock_p);然而,counter 经由两个路径被修改,因此是不可预测的。
而且你在 counter 上的旋转也是错误的。buffer[out]和counter之间存在依赖关系,但您的程序允许它们无序更新。也就是说,counter 可能告诉您在 buffer[out] 处有一些内容要读取,但完全有可能 counterbuffer[out] 之前已经更新;因此您将在那里找到陈旧数据。

相关问题