因此,我必须用C语言实现一个队列,用一个系统来保护它,并使它可以通过多个线程访问。为此,我使用了生产者-消费者逻辑:
#include <stdlib.h>
#include <threads.h>
#include <stdbool.h>
#include "queue.h"
#include <stdio.h>
int metidos=1;
int quitados=1;
// circular array
typedef struct _queue {
int size;
int used;
int first;
void **data;
mtx_t * mutex;
cnd_t * full;
cnd_t * empty;
bool terminado;
} _queue;
void q_terminar(queue q){
printf("Entra en q_terminar\n");
mtx_lock(q->mutex);
q->terminado=true;
mtx_unlock(q->mutex);
cnd_broadcast(q->empty);
}
queue q_create(int size) {
queue q = malloc(sizeof(_queue));
q->size = size;
q->used = 0;
q->first = 0;
q->data = malloc(size * sizeof(void *));
q->mutex = malloc(sizeof (mtx_t));
q->full = malloc(sizeof(cnd_t));
q->empty = malloc(sizeof(cnd_t));
q->terminado=false;
mtx_init(q->mutex, mtx_plain);
cnd_init(q->full);
cnd_init(q->empty);
return q;
}
int q_elements(queue q) {
mtx_lock(q->mutex);
int res= q->used;
mtx_unlock(q->mutex);
return res;
}
int q_insert(queue q, void *elem) {
if(q->terminado==true){
return 1;
}
printf("Entra en insert\n");
mtx_lock(q->mutex);
while(q->used == q->size){
printf("ESperando para insertar\n");
cnd_wait(q->full, q->mutex);
printf("Recibiendo señal para insertar\n");
}
//if(q->size == q->used) return -1;
q->data[(q->first + q->used) % q->size] = elem;
q->used++;
printf("Insertado, este es el elemento %d en ser insertado\n",metidos);
printf("En la cola hay %d elementos\n",q->used);
metidos++;
if(q->used == 1){
cnd_broadcast(q->empty);
printf("Enviando señal para despertar a los que borran\n");
}
mtx_unlock(q->mutex);
return 0;
}
void *q_remove(queue q) {
printf("Entra en remove\n");
void *res;
mtx_lock(q->mutex);
if(q->terminado == true){
mtx_unlock(q->mutex);
return NULL;
}
while(q->used ==0 && q->terminado==false){
printf("Esperando para quitar\n");
cnd_wait(q->empty, q->mutex);
printf("Recibiendo señal para quitar\n");
}
if(q->used == 0) {
mtx_unlock(q->mutex);
return NULL;
}
res = q->data[q->first];
q->first = (q->first + 1) % q->size;
q->used--;
cnd_signal(q->full);
printf("Quitado, este es el elemento %d en ser quitado\n", quitados);
printf("En la cola hay %d elementos\n",q->used);
quitados++;
if(q->used == q->size-1){
cnd_broadcast(q->full);
printf("Enviando señal para despertar a los que insertan\n");
}
mtx_unlock(q->mutex);
return res;
}
void q_destroy(queue q) {
mtx_destroy(q->mutex);
cnd_destroy(q->full);
cnd_destroy(q->empty);
free(q->full);
free(q->empty);
free(q->mutex);
free(q->data);
free(q);
}
现在,在主文件中,我必须将函数“sum”对get_entries的调用分离到一个单独的线程中,因此我创建了一个名为get_entries_thread的函数
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <openssl/evp.h>
#include <threads.h>
#include "options.h"
#include "queue.h"
#define MAX_PATH 1024
#define BLOCK_SIZE (10*1024*1024)
#define MAX_LINE_LENGTH (MAX_PATH * 2)
struct file_md5 {
char *file;
unsigned char *hash;
unsigned int hash_size;
};
struct thread_get_entries_args{
int id;
char *dir;
queue q;
};
struct thread_get_entries_info {
thrd_t id;
struct thread_get_entries_args * entries_args;
cnd_t condicion;
};
thrd_t global;
void get_entries(char *dir, queue q);
void print_hash(struct file_md5 *md5) {
for(int i = 0; i < md5->hash_size; i++) {
printf("%02hhx", md5->hash[i]);
}
}
void read_hash_file(char *file, char *dir, queue q) {
FILE *fp;
char line[MAX_LINE_LENGTH];
char *file_name, *hash;
int hash_len;
if((fp = fopen(file, "r")) == NULL) {
printf("Could not open %s : %s\n", file, strerror(errno));
exit(0);
}
while(fgets(line, MAX_LINE_LENGTH, fp) != NULL) {
char *field_break;
struct file_md5 *md5 = malloc(sizeof(struct file_md5));
if((field_break = strstr(line, ": ")) == NULL) {
printf("Malformed md5 file\n");
exit(0);
}
*field_break = '\0';
file_name = line;
hash = field_break + 2;
hash_len = strlen(hash);
md5->file = malloc(strlen(file_name) + strlen(dir) + 2);
sprintf(md5->file, "%s/%s", dir, file_name);
md5->hash = malloc(hash_len / 2);
md5->hash_size = hash_len / 2;
for(int i = 0; i < hash_len; i+=2)
sscanf(hash + i, "%02hhx", &md5->hash[i / 2]);
printf("Se llama a q_insert\n");
q_insert(q, md5);
}
fclose(fp);
}
void sum_file(struct file_md5 *md5) {
EVP_MD_CTX *mdctx;
int nbytes;
FILE *fp;
char *buf;
if((fp = fopen(md5->file, "r")) == NULL) {
printf("Could not open %s\n", md5->file);
return;
}
buf = malloc(BLOCK_SIZE);
const EVP_MD *md = EVP_get_digestbyname("md5");
mdctx = EVP_MD_CTX_create();
EVP_DigestInit_ex(mdctx, md, NULL);
while((nbytes = fread(buf, 1, BLOCK_SIZE, fp)) >0)
EVP_DigestUpdate(mdctx, buf, nbytes);
md5->hash = malloc(EVP_MAX_MD_SIZE);
EVP_DigestFinal_ex(mdctx, md5->hash, &md5->hash_size);
EVP_MD_CTX_destroy(mdctx);
free(buf);
fclose(fp);
}
void recurse(char *entry, void *arg) {
queue q = * (queue *) arg;
struct stat st;
stat(entry, &st);
if(S_ISDIR(st.st_mode))/////
get_entries(entry, q);
}
void add_files(char *entry, void *arg) {
queue q = * (queue *) arg;
struct stat st;
stat(entry, &st);
if(S_ISREG(st.st_mode)) {
printf("Se llama a q_insert\n");
q_insert(q, strdup(entry));
}
}
void walk_dir(char *dir, void (*action)(char *entry, void *arg), void *arg) {
DIR *d;
struct dirent *ent;
char full_path[MAX_PATH];
if((d = opendir(dir)) == NULL) {
printf("Could not open dir %s\n", dir);
return;
}
while((ent = readdir(d)) != NULL) {
if(strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") ==0)
continue;
snprintf(full_path, MAX_PATH, "%s/%s", dir, ent->d_name);
action(full_path, arg);
}
closedir(d);
}
void get_entries(char *dir, queue q) {
walk_dir(dir, add_files, &q);
walk_dir(dir, recurse, &q);
}
void check(struct options opt) {
queue in_q;
struct file_md5 *md5_in, md5_file;
in_q = q_create(opt.queue_size);
read_hash_file(opt.file, opt.dir, in_q);
while((md5_in = q_remove(in_q))) {
printf("Se llama a q_remove in en check\n");
md5_file.file = md5_in->file;
sum_file(&md5_file);
if(memcmp(md5_file.hash, md5_in->hash, md5_file.hash_size)!=0) {
printf("File %s doesn't match.\nFound: ", md5_file.file);
print_hash(&md5_file);
printf("\nExpected: ");
print_hash(md5_in);
printf("\n");
}
free(md5_file.hash);
free(md5_in->file);
free(md5_in->hash);
free(md5_in);
}
q_destroy(in_q);
}
int get_entries_cast(void*ptr){
struct thread_get_entries_args * entries_args = ptr;
get_entries(entries_args->dir, entries_args->q);
printf("Llamada a q_terminar\n");
q_terminar(entries_args->q);
return 0;
}
void start_get_entries_thread(char *dir, queue in_q){
struct thread_get_entries_info* thread;
thread = malloc(sizeof(struct thread_get_entries_info));
if(thread == NULL){
printf("Not enough memory available.\n");
exit(1);
}
thread->entries_args = malloc(sizeof(struct thread_get_entries_args));
thread->entries_args->dir=dir;
thread->entries_args->q=in_q;
thread->entries_args->id=0;
if(0!= thrd_create(&thread->id, get_entries_cast, thread->entries_args)){
printf("FALLO AL CREAR\n");
}
global = thread->id;
}
void sum(struct options opt) {
queue in_q, out_q;
char *ent;
FILE *out;
struct file_md5 *md5;
int dirname_len;
in_q = q_create(opt.queue_size);
out_q = q_create(opt.queue_size);
start_get_entries_thread(opt.dir, in_q); //Use thread here instead of calling get_entries
printf("Va a entrar en remove\n");
while((ent = q_remove(in_q)) != NULL) {
md5 = malloc(sizeof(struct file_md5));
md5->file = ent;
sum_file(md5);
printf("Se llama a q_insert\n");
q_insert(out_q, md5);
}
printf("Llamada a q_terminar\n");
q_terminar(out_q);
if((out = fopen(opt.file, "w")) == NULL) {
printf("Could not open output file\n");
exit(0);
}
dirname_len = strlen(opt.dir) + 1; // length of dir + /
while((md5 = q_remove(out_q)) != NULL) {
printf("Se llama a q_remove out\n");
fprintf(out, "%s: ", md5->file + dirname_len);
for(int i = 0; i < md5->hash_size; i++)
fprintf(out, "%02hhx", md5->hash[i]);
fprintf(out, "\n");
free(md5->file);
free(md5->hash);
free(md5);
}
//if(thrd_join(thread->id, NULL)){
//printf("FALLO AL UNIR\n");
//}
fclose(out);
q_destroy(in_q);
q_destroy(out_q);
}
int main(int argc, char *argv[]) {
struct options opt;
opt.num_threads = 5;
opt.queue_size = 1;
opt.check = true;
opt.file = NULL;
opt.dir = NULL;
read_options (argc, argv, &opt);
if(opt.check)
check(opt);
else
sum(opt);
}
问题是,一开始它能工作,但是后来它在尝试插入时被无限地卡住了,我也不知道是否要使用thrd_join,提前感谢。
1条答案
按热度按时间to94eoyn1#
在队列代码中使用同步对象有几个缺点,其中包括:
q_terminar()
只唤醒在q->empty
CV上等待的线程,而不唤醒在q->full
CV上等待的线程。也许您预期只有在队列为空时才调用此函数,但对于该函数来说,额外广播到q->full
会更安全,而且成本也更低。q_insert()
读取q->terminado
时没有首先获取队列的互斥锁。这通常会造成数据争用。任何线程在不保持互斥锁锁定或依赖其他有效同步机制的情况下,都不应访问队列结构中除条件变量和互斥锁本身以外的任何成员。q->insert()
只在函数入口处检查q->terminado
一次。每次从CV等待中唤醒时,它都应该再次检查,如果发现该成员为真,则采取适当的操作。q_insert()
仅在插入一个条目后队列大小正好为1时才向q->empty
CV广播。这 * 可能 * 足够了,但我建议无条件地执行该广播,因为这样更容易推理并确信它在所有情况下都是正确的。q_remove()
中,我建议将if(q->terminado == true)
块移到while
之后,并删除if (q->used == 0)
块(使用q->used == 0
时控件可以到达该点的唯一方法是如果q->termindo == true
)。当前代码在这方面没有错误,但它 * 是 * 冗余的。如果您愿意,在if(q->terminado == true)
块中添加一个assert(q->used == 0)
(我会这样做)。q_remove()
来说,既向q->full
CV发送信号又向其广播是多余的。广播可能是要保留的广播。但是,最好无条件地执行该广播,而不是仅在删除项后的队列大小正好为q->size - 1
时才执行。此外,
sum()
在不先加入第二个线程的情况下拆除队列。只有在互斥锁被销毁后没有线程将其锁定或可以尝试锁定它的情况下,销毁互斥锁才是安全的。只有在没有线程正在等待CV或可以尝试等待它、向它发送信号、或者在队列销毁之后向它广播。我不明白你怎么能确信你的队列销毁在这些方面是安全的,直到另一个线程(s)被确认为已经终止,并且加入该线程似乎是获得这种确认的最佳方式。out_q
上的项入队,但没有其他线程将这些项出队。初始线程只有在将所有项从in_q
出队之后才尝试将它们出队。因此,如果存在多于out_q
一次可以容纳的项目,则初始线程将最终填充该队列并阻止尝试添加另一项目。在此之后,它将不再从in_q
中取出任何项目,这可能会导致其他线程也将该队列填满并阻塞。