Concurrency in C: why does an uninitialized mutex work while an initialized mutex fails?
My C program creates a producer thread that saves data as fast as possible. The main thread consumes the data and prints it.
After days of bug hunting, I noticed that if the mutexes are initialized, the program stops within 30 seconds (deadlock?).
However, if the mutexes are left uninitialized, it works perfectly.
Can anyone explain this? To avoid undefined behavior, I would prefer to initialize them if possible.
Further info: it locks up specifically if "pthread_mutex_t signalm" (the signaling mutex) is initialized.
initialized
#include <stdlib.h>   // EXIT_FAILURE, EXIT_SUCCESS
#include <stdio.h>    // stdin, stdout, printf
#include <pthread.h>  // threads
#include <string.h>   // string
#include <unistd.h>   // sleep
#include <stdbool.h>  // bool
#include <fcntl.h>    // open

/* One event counter plus the synchronization objects used around it. */
struct event {
    pthread_mutex_t critical;  // guards eventcount
    pthread_mutex_t signalm;   // mutex paired with signalc in pthread_cond_wait
    pthread_cond_t signalc;    // signalled each time eventcount is advanced
    int eventcount;
};

/* Everything shared between the producer thread and the consumer (main). */
struct allvars {
    struct event inevents;   // counts items written into buffer
    struct event outevents;  // counts items read out of buffer
    int buffersize;          // number of slots used for the modulo indexing
    char buffer[10][128];
};

/**
 * Advance the event counter by one and signal the condition variable.
 */
void advance(struct event *event)
{
    // increment the event counter
    pthread_mutex_lock(&event->critical);
    event->eventcount++;
    pthread_mutex_unlock(&event->critical);

    // signal await to continue
    pthread_mutex_lock(&event->signalm);
    pthread_cond_signal(&event->signalc);
    pthread_mutex_unlock(&event->signalm);
}

/**
 * Wait until the event counter has reached `ticket`.
 *
 * NOTE: this is the version that hangs. The counter is read under `critical`,
 * but the wait happens under the different mutex `signalm`, so the check and
 * the wait are not atomic with respect to advance(). advance() can increment
 * the counter and signal between the read and pthread_cond_wait(); that
 * signal is then lost and this thread can block forever.
 */
void await(struct event *event, int ticket)
{
    int eventcount;

    // read the counter
    pthread_mutex_lock(&event->critical);
    eventcount = event->eventcount;
    pthread_mutex_unlock(&event->critical);

    // lock the signaling mutex
    pthread_mutex_lock(&event->signalm);

    // loop until the counter has reached the ticket number
    while (ticket > eventcount) {
        // wait until advance() signals
        pthread_cond_wait(&event->signalc, &event->signalm);

        // re-read the counter
        pthread_mutex_lock(&event->critical);
        eventcount = event->eventcount;
        pthread_mutex_unlock(&event->critical);
    }

    // unlock the signaling mutex
    pthread_mutex_unlock(&event->signalm);
}

/**
 * Copy `data` into the next write slot of the buffer, then advance inevents.
 */
void putbuffer(struct allvars *allvars, char data[])
{
    // read the current write position
    pthread_mutex_lock(&allvars->inevents.critical);
    int in = allvars->inevents.eventcount;
    pthread_mutex_unlock(&allvars->inevents.critical);

    // wait until there is free space in the buffer
    await(&allvars->outevents, in - allvars->buffersize + 1); // set to 2 to keep 1 index distance

    // copy the data into the buffer slot
    strcpy(allvars->buffer[in % allvars->buffersize], data);

    // increment the event counter and signal
    advance(&allvars->inevents);
}

/**
 * Return a malloc'ed copy of the next unread slot, then advance outevents.
 * The caller frees the returned string.
 */
char *getbuffer(struct allvars *allvars)
{
    // read the current read position
    pthread_mutex_lock(&allvars->outevents.critical);
    int out = allvars->outevents.eventcount;
    pthread_mutex_unlock(&allvars->outevents.critical);

    // wait until there is something in the buffer
    await(&allvars->inevents, out + 1);

    // allocate memory for the returned string
    char *str = malloc(128);

    // copy the buffer data
    strcpy(str, allvars->buffer[out % allvars->buffersize]);

    // increment the event counter and signal
    advance(&allvars->outevents);

    return str;
}

/** Child thread (producer): puts an increasing counter into the buffer as text. */
void *childthread(void *allvars)
{
    char str[10];
    int count = 0;

    while (true) {
        sprintf(str, "%d", count++);
        putbuffer(allvars, str);
    }

    pthread_exit(EXIT_SUCCESS);
}

int main(void)
{
    // init structs
    struct event inevents = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    };
    struct event outevents = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    };
    struct allvars allvars = {
        inevents,   // events
        outevents,
        10,         // buffersize
        {"", {""}}  // buffer[][]
    };

    // create the child thread (producer)
    pthread_t thread;
    if (pthread_create(&thread, NULL, childthread, &allvars)) {
        fprintf(stderr, "failed create child thread");
        exit(EXIT_FAILURE);
    }

    // consumer: print every item taken from the buffer
    while (true) {
        char *out = getbuffer(&allvars);
        printf("buf: %s\n", out);
        free(out);
    }

    return (EXIT_SUCCESS);
}
/* Heading of the next listing in the post: "uninitialized" */
#include <stdlib.h>   // EXIT_FAILURE, EXIT_SUCCESS
#include <stdio.h>    // stdin, stdout, printf
#include <pthread.h>  // threads
#include <string.h>   // string
#include <unistd.h>   // sleep
#include <stdbool.h>  // bool
#include <fcntl.h>    // open

/* One event counter plus the synchronization objects used around it. */
struct event {
    pthread_mutex_t critical;  // guards eventcount
    pthread_mutex_t signalm;   // mutex paired with signalc in pthread_cond_wait
    pthread_cond_t signalc;    // signalled each time eventcount is advanced
    int eventcount;
};

/* Everything shared between the producer thread and the consumer (main). */
struct allvars {
    struct event inevents;   // counts items written into buffer
    struct event outevents;  // counts items read out of buffer
    int buffersize;          // number of slots used for the modulo indexing
    char buffer[10][128];
};

/**
 * Advance the event counter by one and signal the condition variable.
 */
void advance(struct event *event)
{
    // increment the event counter
    pthread_mutex_lock(&event->critical);
    event->eventcount++;
    pthread_mutex_unlock(&event->critical);

    // signal await to continue
    pthread_mutex_lock(&event->signalm);
    pthread_cond_signal(&event->signalc);
    pthread_mutex_unlock(&event->signalm);
}

/**
 * Wait until the event counter has reached `ticket`.
 * The counter is read under `critical`; the wait itself uses `signalm`.
 */
void await(struct event *event, int ticket)
{
    int eventcount;

    // read the counter
    pthread_mutex_lock(&event->critical);
    eventcount = event->eventcount;
    pthread_mutex_unlock(&event->critical);

    // lock the signaling mutex
    pthread_mutex_lock(&event->signalm);

    // loop until the counter has reached the ticket number
    while (ticket > eventcount) {
        // wait until advance() signals
        pthread_cond_wait(&event->signalc, &event->signalm);

        // re-read the counter
        pthread_mutex_lock(&event->critical);
        eventcount = event->eventcount;
        pthread_mutex_unlock(&event->critical);
    }

    // unlock the signaling mutex
    pthread_mutex_unlock(&event->signalm);
}

/**
 * Copy `data` into the next write slot of the buffer, then advance inevents.
 */
void putbuffer(struct allvars *allvars, char data[])
{
    // read the current write position
    pthread_mutex_lock(&allvars->inevents.critical);
    int in = allvars->inevents.eventcount;
    pthread_mutex_unlock(&allvars->inevents.critical);

    // wait until there is free space in the buffer
    await(&allvars->outevents, in - allvars->buffersize + 1); // set to 2 to keep 1 index distance

    // copy the data into the buffer slot
    strcpy(allvars->buffer[in % allvars->buffersize], data);

    // increment the event counter and signal
    advance(&allvars->inevents);
}

/**
 * Return a malloc'ed copy of the next unread slot, then advance outevents.
 * The caller frees the returned string.
 */
char *getbuffer(struct allvars *allvars)
{
    // read the current read position
    pthread_mutex_lock(&allvars->outevents.critical);
    int out = allvars->outevents.eventcount;
    pthread_mutex_unlock(&allvars->outevents.critical);

    // wait until there is something in the buffer
    await(&allvars->inevents, out + 1);

    // allocate memory for the returned string
    char *str = malloc(128);

    // copy the buffer data
    strcpy(str, allvars->buffer[out % allvars->buffersize]);

    // increment the event counter and signal
    advance(&allvars->outevents);

    return str;
}

/** Child thread (producer): puts an increasing counter into the buffer as text. */
void *childthread(void *allvars)
{
    char str[10];
    int count = 0;

    while (true) {
        sprintf(str, "%d", count++);
        putbuffer(allvars, str);
    }

    pthread_exit(EXIT_SUCCESS);
}

int main(void)
{
    // init structs
    // NOTE: both event structs are deliberately left uninitialized in this
    // variant, so every mutex, condition variable and counter copied into
    // allvars below has an indeterminate value. Using them is undefined
    // behavior; this listing exists only to show the variant being asked about.
    struct event inevents; /* = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    }; */
    struct event outevents; /* = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    }; */
    struct allvars allvars = {
        inevents,   // events
        outevents,
        10,         // buffersize
        {"", {""}}  // buffer[][]
    };

    // create the child thread (producer)
    pthread_t thread;
    if (pthread_create(&thread, NULL, childthread, &allvars)) {
        fprintf(stderr, "failed create child thread");
        exit(EXIT_FAILURE);
    }

    // consumer: print every item taken from the buffer
    while (true) {
        char *out = getbuffer(&allvars);
        printf("buf: %s\n", out);
        free(out);
    }

    return (EXIT_SUCCESS);
}
Jonathan explained why the code that didn't initialize the mutexes didn't deadlock (essentially because trying to use an uninitialized mutex never blocks; it just returns an error).
The problem causing the infinite wait in the version of the program that does initialize the mutexes is that you aren't using the condition variables properly. The check of the predicate expression and the wait on the condition variable must be done atomically with respect to whatever other thread might be modifying the predicate. Your code is checking a predicate that is a local variable which the other thread doesn't even have access to. Your code reads the actual predicate into a local variable within a critical section, but then the mutex for reading the predicate is released and a different mutex is acquired to read the 'false' predicate (which cannot be modified by the other thread anyway) atomically with the condition variable wait.
So you have a situation where the actual predicate, event->eventcount, can be modified, and the signal for that modification issued, in between when the waiting thread reads the predicate and when it blocks on the condition variable.
I think the following will fix your deadlock, but I haven't had a chance to perform any testing. The change is essentially to remove the signalm mutex from struct event and replace any use of it with the critical mutex:
#include <stdlib.h>   // EXIT_FAILURE, EXIT_SUCCESS, malloc, free
#include <stdio.h>    // printf, fprintf, snprintf, perror
#include <pthread.h>  // threads, mutexes, condition variables
#include <string.h>   // strcpy
#include <unistd.h>   // sleep
#include <stdbool.h>  // bool
#include <fcntl.h>    // open

enum {
    BUFFER_SLOTS = 10,  // number of slots in the ring buffer
    SLOT_SIZE = 128     // bytes per slot, including the terminating NUL
};

/* One event counter with the mutex and condition variable that guard it. */
struct event {
    pthread_mutex_t critical;  // guards eventcount and is the mutex used with signalc
    pthread_cond_t signalc;    // signalled each time eventcount is advanced
    int eventcount;
};

/* Everything shared between the producer thread and the consumer (main). */
struct allvars {
    struct event inevents;   // counts items written into buffer
    struct event outevents;  // counts items read out of buffer
    int buffersize;          // number of slots used for the modulo indexing
    char buffer[BUFFER_SLOTS][SLOT_SIZE];
};

/**
 * Advance the event counter by one and wake a thread blocked in await().
 */
void advance(struct event *event)
{
    // increment the event counter
    pthread_mutex_lock(&event->critical);
    event->eventcount++;
    pthread_mutex_unlock(&event->critical);

    // signal await to continue
    pthread_cond_signal(&event->signalc);
}

/**
 * Block until the event counter has reached `ticket`.
 *
 * The predicate (eventcount) is tested while holding the same mutex that is
 * handed to pthread_cond_wait(), so advance() cannot change the counter and
 * signal between the test and the wait.
 */
void await(struct event *event, int ticket)
{
    pthread_mutex_lock(&event->critical);

    // loop until the counter has reached the ticket number
    while (ticket > event->eventcount) {
        // releases critical while blocked and re-acquires it before returning
        pthread_cond_wait(&event->signalc, &event->critical);
    }

    pthread_mutex_unlock(&event->critical);
}

/**
 * Copy `data` into the next write slot of the buffer, then advance inevents.
 * Data longer than a slot is truncated to fit.
 */
void putbuffer(struct allvars *allvars, char data[])
{
    // read the current write position
    pthread_mutex_lock(&allvars->inevents.critical);
    int in = allvars->inevents.eventcount;
    pthread_mutex_unlock(&allvars->inevents.critical);

    // wait until there is free space in the buffer
    await(&allvars->outevents, in - allvars->buffersize + 1); // set to 2 to keep 1 index distance

    // bounded copy: never writes past the slot and always NUL-terminates
    snprintf(allvars->buffer[in % allvars->buffersize],
             sizeof allvars->buffer[0], "%s", data);

    // increment the event counter and signal
    advance(&allvars->inevents);
}

/**
 * Return a malloc'ed copy of the next unread slot, then advance outevents.
 *
 * The caller frees the returned string. Returns NULL if the allocation
 * fails; in that case the slot is not consumed.
 */
char *getbuffer(struct allvars *allvars)
{
    // read the current read position
    pthread_mutex_lock(&allvars->outevents.critical);
    int out = allvars->outevents.eventcount;
    pthread_mutex_unlock(&allvars->outevents.critical);

    // wait until there is something in the buffer
    await(&allvars->inevents, out + 1);

    // allocate memory for the returned string
    char *str = malloc(sizeof allvars->buffer[0]);
    if (str == NULL) {
        return NULL;
    }

    // copy the buffer data (slots are always NUL-terminated by putbuffer)
    strcpy(str, allvars->buffer[out % allvars->buffersize]);

    // increment the event counter and signal
    advance(&allvars->outevents);

    return str;
}

/** Child thread (producer): puts an increasing counter into the buffer as text. */
void *childthread(void *allvars)
{
    // large enough for any unsigned int in decimal plus the NUL
    char str[16];
    // unsigned so the counter wraps instead of overflowing a signed int
    unsigned int count = 0;

    while (true) {
        snprintf(str, sizeof str, "%u", count++);
        putbuffer(allvars, str);
    }

    return NULL; // not reached
}

int main(void)
{
    // Initialize the mutexes and condition variables in place: using a copy
    // of an already-initialized pthread_mutex_t is undefined. Members not
    // named here (the buffer) are zero-initialized.
    struct allvars allvars = {
        .inevents = {
            .critical = PTHREAD_MUTEX_INITIALIZER,
            .signalc = PTHREAD_COND_INITIALIZER,
            .eventcount = 0
        },
        .outevents = {
            .critical = PTHREAD_MUTEX_INITIALIZER,
            .signalc = PTHREAD_COND_INITIALIZER,
            .eventcount = 0
        },
        .buffersize = BUFFER_SLOTS
    };

    // create the child thread (producer)
    pthread_t thread;
    if (pthread_create(&thread, NULL, childthread, &allvars)) {
        fprintf(stderr, "failed create child thread");
        exit(EXIT_FAILURE);
    }

    // consumer: print every item taken from the buffer
    while (true) {
        char *out = getbuffer(&allvars);
        if (out == NULL) {
            perror("malloc");
            exit(EXIT_FAILURE);
        }
        printf("buf: %s\n", out);
        free(out);
    }

    return (EXIT_SUCCESS);
}
Comments
Post a Comment