
Improve parallelism of search workers

Previously the list of candidates was split between threads a priori,
with each thread being assigned an equal, contiguous range of the
search candidates.
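
As a simplified sketch of that scheme (generic names here, standing in
for fzy's worker/choices structs; the exact removed lines appear in the
diff below):

    /* Previous scheme (simplified): each worker gets one fixed,
     * contiguous slice of the candidate list up front. */
    size_t start = worker_num * candidate_count / worker_count;
    size_t end = (worker_num + 1) * candidate_count / worker_count;
    /* Whatever falls inside [start, end) is that worker's entire share. */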

This did a poor job of distributing the work evenly. Some regions of
the list are likely to contain significantly more matches than others
(e.g. files within directories whose names match the search terms),
and some contain longer strings than others (e.g. deeply nested
directories).

Because of the type of data fzy receives, work allocation needs to be
dynamic.

This commit changes the workers to operate on the candidates in batches,
until they have all been processed. Batches are allocated by locking a
mutex and grabbing the next available range of BATCH_SIZE candidates.
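
As an illustration of that pattern only (this is not fzy's code; the
actual change is in the diff below), here is a minimal, self-contained
sketch of a mutex-guarded batch dispenser with pthreads. The names
job_t, get_next_batch, NUM_ITEMS and NUM_THREADS are made up for the
example:

    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define BATCH_SIZE  512
    #define NUM_ITEMS   100000  /* arbitrary amount of work for the demo */
    #define NUM_THREADS 4

    /* Shared job state: a mutex plus a cursor over the work handed out so far. */
    typedef struct {
        pthread_mutex_t lock;
        size_t processed;  /* items handed out so far */
        size_t total;      /* total items to process */
    } job_t;

    /* Hand the caller the next contiguous [start, end) range, or an empty
     * range once everything has been claimed. */
    static void get_next_batch(job_t *job, size_t *start, size_t *end) {
        pthread_mutex_lock(&job->lock);
        *start = job->processed;
        job->processed += BATCH_SIZE;
        if (job->processed > job->total)
            job->processed = job->total;
        *end = job->processed;
        pthread_mutex_unlock(&job->lock);
    }

    static void *worker(void *arg) {
        job_t *job = arg;
        size_t done = 0;
        size_t start, end;
        for (;;) {
            get_next_batch(job, &start, &end);
            if (start == end)  /* no work left */
                break;
            for (size_t i = start; i < end; i++)
                done++;        /* stand-in for has_match()/match() on candidate i */
        }
        printf("worker processed %zu items\n", done);
        return NULL;
    }

    int main(void) {
        job_t job = { .processed = 0, .total = NUM_ITEMS };
        pthread_t threads[NUM_THREADS];

        if (pthread_mutex_init(&job.lock, NULL) != 0)
            abort();
        for (int i = 0; i < NUM_THREADS; i++)
            pthread_create(&threads[i], NULL, worker, &job);
        for (int i = 0; i < NUM_THREADS; i++)
            pthread_join(threads[i], NULL);
        pthread_mutex_destroy(&job.lock);
        return 0;
    }

Compile with something like cc -pthread batch_demo.c (file name made up).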

BATCH_SIZE is currently set at 512, which worked best on my laptop in a
quick test. This will always be a compromise. Small batch sizes will
distribute the work more evenly, but larger batch sizes will be
friendlier to CPU caches.
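
To make the trade-off concrete: the number of mutex round-trips per
search is roughly the candidate count divided by the batch size
(candidate_count and the 60,000-entry figure below are illustrative,
not measured):

    /* Batches handed out per search; each worker also takes the lock one
     * extra time to learn that no work is left. */
    size_t handoffs = (candidate_count + BATCH_SIZE - 1) / BATCH_SIZE;
    /* e.g. ~60,000 candidates: BATCH_SIZE 512 -> ~118 handoffs,
     *                          BATCH_SIZE 64  -> ~938 handoffs. */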

Quick testing:

Before:
./fzy -e drivers --benchmark < linux_files.txt 1.69s user 0.03s system 163% cpu 1.053 total

After:
./fzy -e drivers --benchmark < linux_files.txt 2.12s user 0.02s system 296% cpu 0.721 total

Total user CPU time rises (1.69s to 2.12s), but wall-clock time drops
from 1.053s to 0.721s as CPU utilization climbs from 163% to 296%.

John Hawthorn authored on 08/01/2017 09:18:43

@@ -138,9 +138,13 @@ size_t choices_available(choices_t *c) {
 	return c->available;
 }
 
+#define BATCH_SIZE 512
+
 struct search_job {
+	pthread_mutex_t lock;
 	choices_t *choices;
 	const char *search;
+	size_t processed;
 };
 
 struct worker {
@@ -151,19 +155,41 @@ struct worker {
 	size_t available;
 };
 
+static void worker_get_next_batch(struct search_job *job, size_t *start, size_t *end) {
+	pthread_mutex_lock(&job->lock);
+
+	*start = job->processed;
+
+	job->processed += BATCH_SIZE;
+	if (job->processed > job->choices->size) {
+		job->processed = job->choices->size;
+	}
+
+	*end = job->processed;
+
+	pthread_mutex_unlock(&job->lock);
+}
+
 static void *choices_search_worker(void *data) {
 	struct worker *w = (struct worker *)data;
 	struct search_job *job = w->job;
 	const choices_t *c = job->choices;
 
-	size_t start = (w->worker_num) * c->size / c->worker_count;
-	size_t end = (w->worker_num + 1) * c->size / c->worker_count;
+	size_t start, end;
+
+	for(;;) {
+		worker_get_next_batch(job, &start, &end);
+
+		if(start == end) {
+			break;
+		}
 
-	for(size_t i = start; i < end; i++) {
-		if (has_match(job->search, c->strings[i])) {
-			w->results[w->available].str = c->strings[i];
-			w->results[w->available].score = match(job->search, c->strings[i]);
-			w->available++;
+		for(size_t i = start; i < end; i++) {
+			if (has_match(job->search, c->strings[i])) {
+				w->results[w->available].str = c->strings[i];
+				w->results[w->available].score = match(job->search, c->strings[i]);
+				w->available++;
+			}
 		}
 	}
 
@@ -176,6 +202,10 @@ void choices_search(choices_t *c, const char *search) {
 	struct search_job *job = calloc(1, sizeof(struct search_job));
 	job->search = search;
 	job->choices = c;
+	if (pthread_mutex_init(&job->lock, NULL) != 0) {
+		fprintf(stderr, "Error: pthread_mutex_init failed\n");
+		abort();
+	}
 
 	/* allocate storage for our results */
 	c->results = malloc(c->size * sizeof(struct scored_result));
@@ -208,6 +238,8 @@ void choices_search(choices_t *c, const char *search) {
 
 		free(w->results);
 	}
+
+	pthread_mutex_destroy(&job->lock);
 	free(workers);
 
 	if(*search) {
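
A note on the design space, beyond what this commit does: the shared
state is a single monotonically increasing cursor, so the same
dispenser could in principle be written lock-free with a C11 atomic
fetch-and-add. The sketch below is purely for comparison; it is not
part of the commit, and fzy may well target compilers without
<stdatomic.h>:

    #include <stdatomic.h>
    #include <stddef.h>

    #define BATCH_SIZE 512

    /* Hypothetical lock-free variant: the cursor is an atomic counter rather
     * than a mutex-guarded size_t. Each worker claims its batch with one
     * fetch-and-add and clamps the range to the number of candidates. */
    struct search_job_atomic {
        atomic_size_t processed;
        size_t size;  /* total number of candidates */
    };

    static void get_next_batch_atomic(struct search_job_atomic *job,
                                      size_t *start, size_t *end) {
        size_t s = atomic_fetch_add(&job->processed, BATCH_SIZE);
        if (s > job->size)
            s = job->size;
        size_t e = s + BATCH_SIZE;
        if (e > job->size)
            e = job->size;
        *start = s;
        *end = e;
    }

The mutex version in the commit is simpler, and with the lock taken
only once per 512 candidates, contention is unlikely to matter.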