Browse code

Merge partially sorted lists in parallel

John Hawthorn authored on 15/01/2017 06:10:16
Showing 1 changed file

... ...
@@ -3,6 +3,7 @@
3 3
 #include <string.h>
4 4
 #include <pthread.h>
5 5
 #include <unistd.h>
6
+#include <errno.h>
6 7
 
7 8
 #include "choices.h"
8 9
 #include "match.h"
... ...
@@ -150,14 +151,14 @@ struct search_job {
150 151
 	choices_t *choices;
151 152
 	const char *search;
152 153
 	size_t processed;
154
+	struct worker *workers;
153 155
 };
154 156
 
155 157
 struct worker {
156 158
 	pthread_t thread_id;
157 159
 	struct search_job *job;
158
-	size_t worker_num;
159
-	struct scored_result *results;
160
-	size_t available;
160
+	unsigned int worker_num;
161
+	struct result_list result;
161 162
 };
162 163
 
163 164
 static void worker_get_next_batch(struct search_job *job, size_t *start, size_t *end) {
... ...
@@ -175,35 +176,6 @@ static void worker_get_next_batch(struct search_job *job, size_t *start, size_t
175 176
 	pthread_mutex_unlock(&job->lock);
176 177
 }
177 178
 
178
-static void *choices_search_worker(void *data) {
179
-	struct worker *w = (struct worker *)data;
180
-	struct search_job *job = w->job;
181
-	const choices_t *c = job->choices;
182
-
183
-	size_t start, end;
184
-
185
-	for(;;) {
186
-		worker_get_next_batch(job, &start, &end);
187
-
188
-		if(start == end) {
189
-			break;
190
-		}
191
-
192
-		for(size_t i = start; i < end; i++) {
193
-			if (has_match(job->search, c->strings[i])) {
194
-				w->results[w->available].str = c->strings[i];
195
-				w->results[w->available].score = match(job->search, c->strings[i]);
196
-				w->available++;
197
-			}
198
-		}
199
-	}
200
-
201
-	/* Sort the partial result */
202
-	qsort(w->results, w->available, sizeof(struct scored_result), cmpchoice);
203
-
204
-	return w;
205
-}
206
-
207 179
 static struct result_list merge2(struct result_list list1, struct result_list list2) {
208 180
 	size_t result_index = 0, index1 = 0, index2 = 0;
209 181
 
... ...
@@ -236,25 +208,51 @@ static struct result_list merge2(struct result_list list1, struct result_list li
236 208
 	return result;
237 209
 }
238 210
 
239
-static void merge_step(struct search_job *job, struct worker *workers) {
240
-	/* Merge our sorted partial-results */
241
-	choices_t *c = job->choices;
211
+static void *choices_search_worker(void *data) {
212
+	struct worker *w = (struct worker *)data;
213
+	struct search_job *job = w->job;
214
+	const choices_t *c = job->choices;
215
+	struct result_list *result = &w->result;
242 216
 
243
-	struct result_list result = {NULL, 0};
217
+	size_t start, end;
218
+
219
+	for(;;) {
220
+		worker_get_next_batch(job, &start, &end);
221
+
222
+		if(start == end) {
223
+			break;
224
+		}
225
+
226
+		for(size_t i = start; i < end; i++) {
227
+			if (has_match(job->search, c->strings[i])) {
228
+				result->list[result->size].str = c->strings[i];
229
+				result->list[result->size].score = match(job->search, c->strings[i]);
230
+				result->size++;
231
+			}
232
+		}
233
+	}
244 234
 
245
-	for (unsigned int w = 0; w < c->worker_count; w++) {
246
-		struct result_list new_result;
247
-		struct worker *worker = &workers[w];
235
+	/* Sort the partial result */
236
+	qsort(result->list, result->size, sizeof(struct scored_result), cmpchoice);
237
+
238
+	/* Fan-in, merging results */
239
+	for(unsigned int step = 0;; step++) {
240
+		if (w->worker_num % (2 << step))
241
+			break;
248 242
 
249
-		struct result_list worker_result = {worker->results, worker->available};
250
-		new_result = merge2(result, worker_result);
243
+		unsigned int next_worker = w->worker_num | (1 << step);
244
+		if (next_worker >= c->worker_count)
245
+			break;
246
+
247
+		if ((errno = pthread_join(job->workers[next_worker].thread_id, NULL))) {
248
+			perror("pthread_join");
249
+			exit(EXIT_FAILURE);
250
+		}
251 251
 
252
-		free(result.list);
253
-		result = new_result;
252
+		w->result = merge2(w->result, job->workers[next_worker].result);
254 253
 	}
255 254
 
256
-	c->results = result.list;
257
-	c->available = result.size;
255
+	return NULL;
258 256
 }
259 257
 
260 258
 void choices_search(choices_t *c, const char *search) {
... ...
@@ -267,31 +265,30 @@ void choices_search(choices_t *c, const char *search) {
267 265
 		fprintf(stderr, "Error: pthread_mutex_init failed\n");
268 266
 		abort();
269 267
 	}
268
+	job->workers = calloc(c->worker_count, sizeof(struct worker));
270 269
 
271
-	struct worker *workers = calloc(c->worker_count, sizeof(struct worker));
272
-	for (unsigned int i = 0; i < c->worker_count; i++) {
270
+	struct worker *workers = job->workers;
271
+	for (int i = c->worker_count - 1; i >= 0; i--) {
273 272
 		workers[i].job = job;
274 273
 		workers[i].worker_num = i;
275
-		workers[i].results = malloc(c->size * sizeof(struct scored_result)); /* FIXME: This is overkill */
276
-		if (pthread_create(&workers[i].thread_id, NULL, &choices_search_worker, &workers[i])) {
274
+		workers[i].result.size = 0;
275
+		workers[i].result.list = malloc(c->size * sizeof(struct scored_result)); /* FIXME: This is overkill */
276
+
277
+		/* These must be created last-to-first to avoid a race condition when fanning in */
278
+		if ((errno = pthread_create(&workers[i].thread_id, NULL, &choices_search_worker, &workers[i]))) {
277 279
 			perror("pthread_create");
278 280
 			exit(EXIT_FAILURE);
279 281
 		}
280 282
 	}
281 283
 
282
-	for (unsigned int i = 0; i < c->worker_count; i++) {
283
-		if (pthread_join(workers[i].thread_id, NULL)) {
284
-			perror("pthread_join");
285
-			exit(EXIT_FAILURE);
286
-		}
287
-
284
+	if (pthread_join(workers[0].thread_id, NULL)) {
285
+		perror("pthread_join");
286
+		exit(EXIT_FAILURE);
288 287
 	}
289 288
 
290
-	merge_step(job, workers);
289
+	c->results = workers[0].result.list;
290
+	c->available = workers[0].result.size;
291 291
 
292
-	for (unsigned int i = 0; i < c->worker_count; i++) {
293
-		free(workers[i].results);
294
-	}
295 292
 	free(workers);
296 293
 	pthread_mutex_destroy(&job->lock);
297 294
 	free(job);