Browse code

Perform sort in parallel

John Hawthorn authored on 15/01/2017 05:19:45
Showing 1 changed files

... ...
@@ -193,9 +193,48 @@ static void *choices_search_worker(void *data) {
193 193
 		}
194 194
 	}
195 195
 
196
+	/* Sort the partial result */
197
+	qsort(w->results, w->available, sizeof(struct scored_result), cmpchoice);
198
+
196 199
 	return w;
197 200
 }
198 201
 
202
+static void merge_step(struct search_job *job, struct worker *all_workers) {
203
+	/* Merge our sorted partial-results */
204
+	choices_t *c = job->choices;
205
+	size_t worker_count = 0;
206
+	struct worker *workers[c->worker_count];
207
+	size_t indexes[c->worker_count];
208
+
209
+	for (unsigned int w = 0; w < c->worker_count; w++) {
210
+		indexes[w] = 0;
211
+		if (all_workers[w].available) {
212
+			workers[worker_count++] = &all_workers[w];
213
+		}
214
+	}
215
+
216
+	while (worker_count) {
217
+		/* Loop over each sorted block to find the lowest scoring result */
218
+		unsigned int min_w = 0;
219
+		for (unsigned int w = 0; w < worker_count; w++) {
220
+			if (cmpchoice(&workers[w]->results[indexes[w]], &workers[min_w]->results[indexes[min_w]]) < 0) {
221
+				min_w = w;
222
+			}
223
+		}
224
+
225
+		/* Move that result onto our global list */
226
+		c->results[c->available++] = workers[min_w]->results[indexes[min_w]++];
227
+
228
+		/* If we have merged all the results for this worker, shuffle it
229
+		 * out of our list */
230
+		if (indexes[min_w] == workers[min_w]->available) {
231
+			indexes[min_w] = indexes[worker_count - 1];
232
+			workers[min_w] = workers[worker_count - 1];
233
+			worker_count--;
234
+		}
235
+	}
236
+}
237
+
199 238
 void choices_search(choices_t *c, const char *search) {
200 239
 	choices_reset_search(c);
201 240
 
... ...
@@ -226,26 +265,21 @@ void choices_search(choices_t *c, const char *search) {
226 265
 	}
227 266
 
228 267
 	for (unsigned int i = 0; i < c->worker_count; i++) {
229
-		struct worker *w = &workers[i];
230
-
231
-		if (pthread_join(w->thread_id, NULL)) {
268
+		if (pthread_join(workers[i].thread_id, NULL)) {
232 269
 			perror("pthread_join");
233 270
 			exit(EXIT_FAILURE);
234 271
 		}
235 272
 
236
-		memcpy(&c->results[c->available], w->results,  w->available * sizeof(struct scored_result));
237
-		c->available += w->available;
238
-
239
-		free(w->results);
240 273
 	}
241 274
 
242
-	pthread_mutex_destroy(&job->lock);
243
-	free(workers);
244
-	free(job);
275
+	merge_step(job, workers);
245 276
 
246
-	if(*search) {
247
-		qsort(c->results, c->available, sizeof(struct scored_result), cmpchoice);
277
+	for (unsigned int i = 0; i < c->worker_count; i++) {
278
+		free(workers[i].results);
248 279
 	}
280
+	free(workers);
281
+	pthread_mutex_destroy(&job->lock);
282
+	free(job);
249 283
 }
250 284
 
251 285
 const char *choices_get(choices_t *c, size_t n) {