如何在C++和Rust之间共享pthread同步原语?

a8jjtwal  于 2023-02-12  发布在  其他
关注(0)|答案(1)|浏览(183)

我有一个C程序和一个Rust程序,在它们之间,我成功地让它们讨论POSIX共享内存(C++rust)。
我现在要做的是同步它们,我已经设法使用原子布尔创建了一个工作正常但效率低下的原始系统(在like this的基础上创建了AtomicBool)。
然而,我真的希望使用互斥锁/condvar在线程之间进行同步,这就是我遇到的问题。
我似乎能够初始化它的C
端,几乎逐字逐句地遵循这个例子。
我曾试图把它直接翻译成铁 rust :

let raw_shm = shm.get_shm();

    let mut mtx_attrs = MaybeUninit::<nix::libc::pthread_mutexattr_t>::uninit();
    if unsafe { nix::libc::pthread_mutexattr_init(mtx_attrs.as_mut_ptr()) } != 0 {
        panic!("failed to create mtx_attrs");
    };
    let mtx_attrs = unsafe { mtx_attrs.assume_init() };

    let mut cond_attrs = MaybeUninit::<nix::libc::pthread_condattr_t>::uninit();
    if unsafe { nix::libc::pthread_condattr_init(cond_attrs.as_mut_ptr()) } != 0 {
        panic!("failed to create cond_attrs");
    };
    let cond_attrs = unsafe { cond_attrs.assume_init() };

    if unsafe {
        nix::libc::pthread_mutexattr_setpshared(
            &mtx_attrs as *const _ as *mut _,
            PTHREAD_PROCESS_SHARED,
        )
    } != 0
    {
        panic!("failed to set mtx as process shared");
    };

    if unsafe {
        nix::libc::pthread_condattr_setpshared(
            &cond_attrs as *const _ as *mut _,
            PTHREAD_PROCESS_SHARED,
        )
    } != 0
    {
        panic!("failed to set cond as process shared");
    };

    // I know that these offsets are correct, having used `offsetof` on the C++ side
    let mtx_start = unsafe { &raw_shm.as_slice()[3110416] };
    let mtx = unsafe { &*(mtx_start as *const _ as *const pthread_mutex_t) };
    let cond_start = unsafe { &raw_shm.as_slice()[3110440] };
    let cond = unsafe { &*(cond_start as *const _ as *const pthread_mutex_t) };

    if unsafe {
        nix::libc::pthread_mutex_init(&mtx as *const _ as *mut _, &mtx_attrs as *const _ as *mut _)
    } != 0
    {
        panic!("failed to init mtx");
    };
    if unsafe {
        nix::libc::pthread_cond_init(
            &cond as *const _ as *mut _,
            &cond_attrs as *const _ as *mut _,
        )
    } != 0
    {
        panic!("failed to init cond");
    };

所有这些都通过了,返回值为0......到目前为止一切顺利。
我现在可以用两种方法之一测试它:
1.我可以设置这个简单的C++程序,让它停止在condvar中等待:

if (pthread_mutex_lock(&shmp->mutex) != 0)
    throw("Error locking mutex");
if (pthread_cond_wait(&shmp->condition, &shmp->mutex) != 0)
    throw("Error waiting for condition variable");

在铁 rust 中:

let sig = unsafe { nix::libc::pthread_cond_signal(&cond as *const _ as *mut _) };
    dbg!(sig);

尽管返回了0(即成功),但我的C程序没有发布超过condvar;它保持等待,就好像它从未接收到信号一样。
1.我可以设置另一个平凡的C
程序,它在循环中不断地向条件变量发送信号:

for (unsigned int count = 0;; count++) {
        if (pthread_cond_signal(condition) != 0)
            throw("Error")
        // sleep for a bit
    }

然后在铁 rust 里,就像这样:

loop {
        if unsafe { nix::libc::pthread_mutex_lock(&mtx as *const _ as *mut _) } > 0 {
            panic!("Failed to acquire lock")
        };
        if unsafe {
            nix::libc::pthread_cond_wait(&cond as *const _ as *mut _, &mtx as *const _ as *mut _)
        } > 0
        {
            panic!("Failed to acquire lock")
        };
    }

这样做,锁定互斥锁的调用是成功的,但是我得到了一个EINVALpthread_cond_wait定义的here,我似乎无法纠正...
我觉得我快成功了......有什么想法可以让它成功吗?(这主要是一个概念验证)。

toe95027

toe950271#

为了子孙后代,我已经设法让这个工作。
为了阐明程序的架构,有两个二进制文件:一个C和一个Rust。Rust程序使用std::process::Command生成C程序。
为简洁起见,省略了错误处理和导入。

  1. rust程序启动并创建一个新的共享内存块(删除现有的内存块,以确保程序总是以新的状态启动),我使用shared_memory crate来处理细节,它还提供了一些有用的帮助,例如访问指向内存块开头的原始指针。
    共享内存块的结构如下:
#[repr(c)]
struct SharedMemoryLayout {
    ready: std::sync::atomic::AtomicBool,
    mutex: libc::pthread_mutex_t,
    condition: libc::pthread_cond_t,
}

共享内存块初始化为零,因此ready将始终以false开始。

  1. rust程序使用std::process::Command::spawn生成c++程序,然后在循环中等待,直到ready变为true
let proc = Command::new("/path/to/c++/binary").spawn().unwrap();

let ptr: *mut u8 = // pointer to first byte of shared memory block;
let ready: &AtomicBool =  unsafe { &*(ptr as *mut bool as *const AtomicBool) };

loop {
    if ready.load(Ordering::SeqCst) {
        break
    } else {
        thread::sleep(Duration::from_secs(1));
    }
}
  1. C++程序打开共享内存块,并将其mmap到本地地址空间。
struct SharedMemoryLayout
{
    std::atomic_bool ready;
    pthread_mutex_t mutex;
    pthread_cond_t condition;
};

int fd = shm_open("name_of_shared_memory_block", O_RDWR, S_IRUSR | S_IWUSR);
struct SharedMemoryLayout *sync = (SharedMemoryLayout *)mmap(NULL, sizeof(*sync), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  1. C++程序继续初始化mutexcondition,然后将内存块标记为就绪。
pthread_mutexattr_t mutex_attributes;
pthread_condattr_t condition_attributes;

pthread_mutexattr_init(&mutex_attributes);
pthread_condattr_init(&condition_attributes);

pthread_mutexattr_setpshared(&mutex_attributes, PTHREAD_PROCESS_SHARED);
pthread_condattr_setpshared(&condition_attributes, PTHREAD_PROCESS_SHARED);

pthread_mutex_init(&sync->mutex, &mutex_attributes);
pthread_cond_init(&sync->condition, &condition_attributes);

pthread_mutexattr_destroy(&mutex_attributes);
pthread_condattr_destroy(&condition_attributes);

std::atomic_bool *ready = &syncp->ready;
ready->store(true);

然后在以下条件下进入循环信令:

for (unsigned int count = 0;; count++) {
    // do something
    sleep(1);
    pthread_cond_signal(&sync->condition);
}

1.现在,rust程序已经从步骤2)中的循环中释放出来了,具体化在步骤4)中初始化的互斥锁和条件。

let mutex = unsafe {ptr.offset(4) as *mut pthread_mutex_t};
let condition = unsafe {ptr.offset(32) as *mut pthread_cond_t};

现在我们可以等待条件,得到c++程序的通知。

loop {
    unsafe {
        pthread_mutex_lock(mutex);
        pthread_cond_wait(condition, mutex);
        pthread_mutex_unlock(mutex);
        
        // Do something
    }
}

相关问题