diff options
author | Gerald Combs <gerald@wireshark.org> | 2018-11-29 15:59:25 -0800 |
---|---|---|
committer | Anders Broman <a.broman58@gmail.com> | 2018-11-30 06:11:59 +0000 |
commit | 1bab83de53995d87c6032cc4872dbc7be2c04cca (patch) | |
tree | f365d63ca83ff5f23f9f28caac4ae577bb4c68ca /epan/maxmind_db.c | |
parent | af6aa3f76aca1d143a81a67b679cf2a333415d47 (diff) |
maxmind: Move response processing to a thread.
Move response processing to a thread. Blind attempt at fixing bug 14701.
Bug: 14701
Change-Id: I2b7e6a0669c4784c7c169e659fa37ea2e62c96a3
Reviewed-on: https://code.wireshark.org/review/30837
Petri-Dish: Gerald Combs <gerald@wireshark.org>
Tested-by: Petri Dish Buildbot
Reviewed-by: Anders Broman <a.broman58@gmail.com>
Diffstat (limited to 'epan/maxmind_db.c')
-rw-r--r-- | epan/maxmind_db.c | 248 |
1 files changed, 150 insertions, 98 deletions
diff --git a/epan/maxmind_db.c b/epan/maxmind_db.c index 5e8fb1696c..c939b871a4 100644 --- a/epan/maxmind_db.c +++ b/epan/maxmind_db.c @@ -41,14 +41,22 @@ static mmdb_lookup_t mmdb_not_found; // - Switch to a different format? I was going to use g_key_file_* to parse // the mmdbresolve output, but it was easier to just parse it directly. -static GThread *mmdbr_thread; +static GThread *write_mmdbr_stdin_thread; static GAsyncQueue *mmdbr_request_q; // g_allocated char * -static GMutex mmdbr_pipe_mtx; +static GRWLock mmdbr_pipe_mtx; // Hashes of mmdb_lookup_t +typedef struct _mmdbr_response_t { + gboolean is_ipv4; + ws_in4_addr ipv4_addr; + ws_in6_addr ipv6_addr; + mmdb_lookup_t mmdb_val; +} mmdb_response_t; + static wmem_map_t *mmdb_ipv4_map; static wmem_map_t *mmdb_ipv6_map; -static gboolean new_entries; +static GAsyncQueue *mmdbr_response_q; // g_allocated mmdbr_response_t * +static GThread *read_mmdbr_stdout_thread; // Interned strings static wmem_map_t *mmdb_str_chunk; @@ -56,7 +64,6 @@ static wmem_map_t *mmdb_ipv6_chunk; /* Child mmdbresolve process */ static ws_pipe_t mmdbr_pipe; // Requires mutex -static FILE *mmdbr_stdout; // Requires mutex /* UAT definitions. Copied from oids.c */ typedef struct _maxmind_db_path_t { @@ -91,7 +98,7 @@ static GPtrArray *mmdb_file_arr; // .mmdb files #define MMDB_DEBUG(...) #endif -static void mmdb_resolve_stop(gboolean lock_mutex); +static void mmdb_resolve_stop(void); // Hopefully scanning a few lines asynchronously has less overhead than // reading in a child thread. @@ -135,20 +142,21 @@ static void init_lookup(mmdb_lookup_t *lookup) { } static gboolean mmdbr_pipe_valid(void) { - g_mutex_lock(&mmdbr_pipe_mtx); + g_rw_lock_reader_lock(&mmdbr_pipe_mtx); gboolean pipe_valid = ws_pipe_valid(&mmdbr_pipe); - g_mutex_unlock(&mmdbr_pipe_mtx); + g_rw_lock_reader_unlock(&mmdbr_pipe_mtx); return pipe_valid; } // Writing to mmdbr_pipe.stdin_fd can block. Do so in a separate thread. #define MMDB_WAIT_TIME (150 * 1000) // microseconds static gpointer -write_mmdbr_stdin_worker(gpointer data _U_) { +write_mmdbr_stdin_worker(gpointer sifd_data) { + int stdin_fd = GPOINTER_TO_INT(sifd_data); while (1) { if (!mmdbr_pipe_valid()) { // Should be due to mmdb_resolve_stop. - MMDB_DEBUG("invalid mmdbr pipe. exiting thread."); + MMDB_DEBUG("invalid mmdbr stdin pipe. exiting thread."); return NULL; } @@ -158,9 +166,7 @@ write_mmdbr_stdin_worker(gpointer data _U_) { } MMDB_DEBUG("write %s ql %d", request, g_async_queue_length(mmdbr_request_q)); - g_mutex_lock(&mmdbr_pipe_mtx); - ssize_t req_status = ws_write(mmdbr_pipe.stdin_fd, request, (unsigned int)strlen(request)); - g_mutex_unlock(&mmdbr_pipe_mtx); + ssize_t req_status = ws_write(stdin_fd, request, (unsigned int)strlen(request)); if (req_status < 0) { MMDB_DEBUG("write error %s. exiting thread.", g_strerror(errno)); return NULL; @@ -170,33 +176,38 @@ write_mmdbr_stdin_worker(gpointer data _U_) { return NULL; } -static void -read_mmdbr_stdout(void) { - static char cur_addr[WS_INET6_ADDRSTRLEN]; - static mmdb_lookup_t cur_lookup; - - g_mutex_lock(&mmdbr_pipe_mtx); - if (!ws_pipe_valid(&mmdbr_pipe)) { - g_mutex_unlock(&mmdbr_pipe_mtx); - return; - } - MMDB_DEBUG("read mmdbr %d", ws_pipe_data_available(mmdbr_pipe.stdout_fd)); +static gpointer +read_mmdbr_stdout_worker(gpointer sofd_data) { + mmdb_response_t *response = g_new0(mmdb_response_t, 1); + int stdout_fd = GPOINTER_TO_INT(sofd_data); + FILE *stdout_fp = ws_fdopen(stdout_fd, "r"); + GString *country_iso = g_string_new(""); + GString *country = g_string_new(""); + GString *city = g_string_new(""); + GString *as_org = g_string_new(""); int read_buf_size = 2048; char *read_buf = (char *) g_malloc(read_buf_size); - while (ws_pipe_data_available(mmdbr_pipe.stdout_fd)) { + while (1) { + char cur_addr[WS_INET6_ADDRSTRLEN]; + + if (!mmdbr_pipe_valid()) { + // Should be due to mmdb_resolve_stop. + MMDB_DEBUG("invalid mmdbr stdout pipe. exiting thread."); + break; + } + read_buf[0] = '\0'; - char *line = fgets(read_buf, read_buf_size, mmdbr_stdout); - if (!line || ferror(mmdbr_stdout)) { + char *line = fgets(read_buf, read_buf_size, stdout_fp); + if (!line || ferror(stdout_fp)) { MMDB_DEBUG("read error %s", g_strerror(errno)); - mmdb_resolve_stop(FALSE); break; } line = g_strstrip(line); size_t line_len = strlen(line); - MMDB_DEBUG("read %zd bytes, feof %d: %s", line_len, feof(mmdbr_stdout), line); + MMDB_DEBUG("read %zd bytes, feof %d: %s", line_len, feof(stdout_fp), line); if (line_len < 1) continue; char *val_start = strchr(line, ':'); @@ -206,67 +217,82 @@ read_mmdbr_stdout(void) { // [init] or resolved address in square brackets. line[line_len - 1] = '\0'; g_strlcpy(cur_addr, line + 1, WS_INET6_ADDRSTRLEN); - init_lookup(&cur_lookup); + init_lookup(&response->mmdb_val); } else if (strcmp(line, RES_STATUS_ERROR) == 0) { // Error during init. cur_addr[0] = '\0'; - init_lookup(&cur_lookup); - mmdb_resolve_stop(FALSE); + init_lookup(&response->mmdb_val); + break; } else if (val_start && g_str_has_prefix(line, RES_COUNTRY_ISO_CODE)) { - cur_lookup.found = TRUE; - cur_lookup.country_iso = chunkify_string(val_start); + response->mmdb_val.found = TRUE; + g_string_printf(country_iso, "%s", val_start); } else if (val_start && g_str_has_prefix(line, RES_COUNTRY_NAMES_EN)) { - cur_lookup.found = TRUE; - cur_lookup.country = chunkify_string(val_start); + response->mmdb_val.found = TRUE; + g_string_printf(country, "%s", val_start); } else if (val_start && g_str_has_prefix(line, RES_CITY_NAMES_EN)) { - cur_lookup.found = TRUE; - cur_lookup.city = chunkify_string(val_start); + response->mmdb_val.found = TRUE; + g_string_printf(city, "%s", val_start); } else if (val_start && g_str_has_prefix(line, RES_ASN_ORG)) { - cur_lookup.found = TRUE; - cur_lookup.as_org = chunkify_string(val_start); + response->mmdb_val.found = TRUE; + g_string_printf(as_org, "%s", val_start); } else if (val_start && g_str_has_prefix(line, RES_ASN_NUMBER)) { - if (ws_strtou32(val_start, NULL, &cur_lookup.as_number)) { - cur_lookup.found = TRUE; + if (ws_strtou32(val_start, NULL, &response->mmdb_val.as_number)) { + response->mmdb_val.found = TRUE; } else { MMDB_DEBUG("Invalid as number: %s", val_start); } } else if (val_start && g_str_has_prefix(line, RES_LOCATION_LATITUDE)) { - cur_lookup.found = TRUE; - cur_lookup.latitude = g_ascii_strtod(val_start, NULL); + response->mmdb_val.found = TRUE; + response->mmdb_val.latitude = g_ascii_strtod(val_start, NULL); } else if (val_start && g_str_has_prefix(line, RES_LOCATION_LONGITUDE)) { - cur_lookup.found = TRUE; - cur_lookup.longitude = g_ascii_strtod(val_start, NULL); + response->mmdb_val.found = TRUE; + response->mmdb_val.longitude = g_ascii_strtod(val_start, NULL); } else if (g_str_has_prefix(line, RES_END)) { - if (cur_lookup.found) { - mmdb_lookup_t *mmdb_val = (mmdb_lookup_t *) wmem_memdup(wmem_epan_scope(), &cur_lookup, sizeof(cur_lookup)); + if (response->mmdb_val.found) { + if (country_iso->len) { + response->mmdb_val.country_iso = g_strdup(country_iso->str); + } + if (country->len) { + response->mmdb_val.country = g_strdup(country->str); + } + if (city->len) { + response->mmdb_val.city = g_strdup(city->str); + } + if (as_org->len) { + response->mmdb_val.as_org = g_strdup(as_org->str); + } if (strstr(cur_addr, ".")) { - MMDB_DEBUG("inserting v4 %p %s: city %s country %s", (void *) mmdb_val, cur_addr, mmdb_val->city, mmdb_val->country); - guint32 addr; - ws_inet_pton4(cur_addr, &addr); - wmem_map_insert(mmdb_ipv4_map, GUINT_TO_POINTER(addr), mmdb_val); - new_entries = TRUE; + ws_inet_pton4(cur_addr, &response->ipv4_addr); + response->is_ipv4 = TRUE; + MMDB_DEBUG("queued v4 %s: city %s country %s", cur_addr, response->mmdb_val.city, response->mmdb_val.country); } else if (strstr(cur_addr, ":")) { - MMDB_DEBUG("inserting v6 %p %s: city %s country %s", (void *) mmdb_val, cur_addr, mmdb_val->city, mmdb_val->country); - ws_in6_addr addr; - ws_inet_pton6(cur_addr, &addr); - wmem_map_insert(mmdb_ipv6_map, chunkify_v6_addr(&addr), mmdb_val); - new_entries = TRUE; + ws_inet_pton6(cur_addr, &response->ipv6_addr); + MMDB_DEBUG("queued v6 %s: city %s country %s", cur_addr, response->mmdb_val.city, response->mmdb_val.country); } + g_async_queue_push(mmdbr_response_q, response); // Will be freed by maxmind_db_lookup_process. + response = g_new0(mmdb_response_t, 1); } cur_addr[0] = '\0'; - init_lookup(&cur_lookup); + init_lookup(&response->mmdb_val); } } - g_mutex_unlock(&mmdbr_pipe_mtx); + g_string_free(country_iso, TRUE); + g_string_free(country, TRUE); + g_string_free(city, TRUE); + g_string_free(as_org, TRUE); g_free(read_buf); + g_free(response); + return NULL; } /** * Stop our mmdbresolve process. + * Can be called from any thread. */ -static void mmdb_resolve_stop(gboolean lock_mutex) { +static void mmdb_resolve_stop(void) { char *request; + mmdb_response_t *response; while (mmdbr_request_q && (request = (char *) g_async_queue_try_pop(mmdbr_request_q)) != NULL) { g_free(request); @@ -277,18 +303,26 @@ static void mmdb_resolve_stop(gboolean lock_mutex) { return; } - if (lock_mutex) - g_mutex_lock(&mmdbr_pipe_mtx); + g_rw_lock_writer_lock(&mmdbr_pipe_mtx); ws_close(mmdbr_pipe.stdin_fd); - fclose(mmdbr_stdout); + ws_close(mmdbr_pipe.stdout_fd); MMDB_DEBUG("closing pid %d", mmdbr_pipe.pid); ws_pipe_close(&mmdbr_pipe); - mmdbr_stdout = NULL; - if (lock_mutex) - g_mutex_unlock(&mmdbr_pipe_mtx); + g_rw_lock_writer_unlock(&mmdbr_pipe_mtx); + + g_thread_join(write_mmdbr_stdin_thread); + write_mmdbr_stdin_thread = NULL; + + g_thread_join(read_mmdbr_stdout_thread); + read_mmdbr_stdout_thread = NULL; - g_thread_join(mmdbr_thread); - mmdbr_thread = NULL; + while (mmdbr_response_q && (response = (mmdb_response_t *) g_async_queue_try_pop(mmdbr_response_q)) != NULL) { + g_free((char *) response->mmdb_val.country_iso); + g_free((char *) response->mmdb_val.country); + g_free((char *) response->mmdb_val.city); + g_free((char *) response->mmdb_val.as_org); + g_free(response); + } } /** @@ -299,6 +333,10 @@ static void mmdb_resolve_start(void) { mmdbr_request_q = g_async_queue_new(); } + if (!mmdbr_response_q) { + mmdbr_response_q = g_async_queue_new(); + } + if (!mmdb_ipv4_map) { mmdb_ipv4_map = wmem_map_new(wmem_epan_scope(), g_direct_hash, g_direct_equal); } @@ -320,7 +358,7 @@ static void mmdb_resolve_start(void) { return; } - mmdb_resolve_stop(TRUE); + mmdb_resolve_stop(); if (mmdb_file_arr->len == 0) { MMDB_DEBUG("no GeoIP databases found"); @@ -337,7 +375,6 @@ static void mmdb_resolve_start(void) { g_ptr_array_add(args, NULL); ws_pipe_init(&mmdbr_pipe); - mmdbr_stdout = NULL; GPid pipe_pid = ws_pipe_spawn_async(&mmdbr_pipe, args); MMDB_DEBUG("spawned %s pid %d", mmdbresolve, pipe_pid); @@ -353,11 +390,8 @@ static void mmdb_resolve_start(void) { return; } - // XXX Should we set O_NONBLOCK similar to dumpcap? - mmdbr_stdout = ws_fdopen(mmdbr_pipe.stdout_fd, "r"); - setvbuf(mmdbr_stdout, NULL, _IONBF, 0); - - mmdbr_thread = g_thread_new("write_mmdbr_stdin_worker", write_mmdbr_stdin_worker, NULL); + write_mmdbr_stdin_thread = g_thread_new("write_mmdbr_stdin_worker", write_mmdbr_stdin_worker, GINT_TO_POINTER(mmdbr_pipe.stdin_fd)); + read_mmdbr_stdout_thread = g_thread_new("read_mmdbr_stdout_worker", read_mmdbr_stdout_worker, GINT_TO_POINTER(mmdbr_pipe.stdout_fd)); } /** @@ -404,7 +438,7 @@ static void maxmind_db_path_free_cb(void* p) { static void maxmind_db_cleanup(void) { guint i; - mmdb_resolve_stop(TRUE); + mmdb_resolve_stop(); /* If we have old data, clear out the whole thing * and start again. TODO: Just update the ones that @@ -481,7 +515,7 @@ maxmind_db_pref_init(module_t *nameres) void maxmind_db_pref_cleanup(void) { - mmdb_resolve_stop(TRUE); + mmdb_resolve_stop(); } /** @@ -490,13 +524,43 @@ void maxmind_db_pref_cleanup(void) gboolean maxmind_db_lookup_process(void) { - if (mmdbr_pipe_valid()) { - read_mmdbr_stdout(); + gboolean new_entries = FALSE; + mmdb_response_t *response; + + while (mmdbr_response_q && (response = (mmdb_response_t *) g_async_queue_try_pop(mmdbr_response_q)) != NULL) { + mmdb_lookup_t *mmdb_val = (mmdb_lookup_t *) g_memdup(&response->mmdb_val, sizeof(mmdb_lookup_t)); + if (response->mmdb_val.country_iso) { + char *country_iso = (char *) response->mmdb_val.country_iso; + mmdb_val->country_iso = chunkify_string(country_iso); + g_free(country_iso); + } + if (response->mmdb_val.country) { + char *country = (char *) response->mmdb_val.country; + mmdb_val->country = chunkify_string(country); + g_free(country); + } + if (response->mmdb_val.city) { + char *city = (char *) response->mmdb_val.city; + mmdb_val->city = chunkify_string(city); + g_free(city); + } + if (response->mmdb_val.as_org) { + char *as_org = (char *) response->mmdb_val.as_org; + mmdb_val->as_org = chunkify_string(as_org); + g_free(as_org); + } + MMDB_DEBUG("popped response v4 %d city %s country %s", response->is_ipv4, mmdb_val->city, mmdb_val->country); + + if (response->is_ipv4) { + wmem_map_insert(mmdb_ipv4_map, GUINT_TO_POINTER(response->ipv4_addr), mmdb_val); + } else { + wmem_map_insert(mmdb_ipv6_map, chunkify_v6_addr(&response->ipv6_addr), mmdb_val); + } + new_entries = TRUE; + g_free(response); } - gboolean prev_ne = new_entries; - new_entries = FALSE; - return prev_ne; + return new_entries; } const mmdb_lookup_t * @@ -504,21 +568,15 @@ maxmind_db_lookup_ipv4(guint32 addr) { mmdb_lookup_t *result = (mmdb_lookup_t *) wmem_map_lookup(mmdb_ipv4_map, GUINT_TO_POINTER(addr)); if (!result) { - // Try again, mainly so that we empty our pipe buffers. - read_mmdbr_stdout(); - result = (mmdb_lookup_t *) wmem_map_lookup(mmdb_ipv4_map, GUINT_TO_POINTER(addr)); - } + result = &mmdb_not_found; + wmem_map_insert(mmdb_ipv4_map, GUINT_TO_POINTER(addr), result); - if (!result) { if (mmdbr_pipe_valid()) { char addr_str[WS_INET_ADDRSTRLEN]; ws_inet_ntop4(&addr, addr_str, WS_INET_ADDRSTRLEN); MMDB_DEBUG("looking up %s", addr_str); g_async_queue_push(mmdbr_request_q, g_strdup_printf("%s\n", addr_str)); } - - result = &mmdb_not_found; - wmem_map_insert(mmdb_ipv4_map, GUINT_TO_POINTER(addr), result); } return result; @@ -529,21 +587,15 @@ maxmind_db_lookup_ipv6(const ws_in6_addr *addr) { mmdb_lookup_t * result = (mmdb_lookup_t *) wmem_map_lookup(mmdb_ipv6_map, addr->bytes); if (!result) { - // Try again, mainly so that we empty our pipe buffers. - read_mmdbr_stdout(); - result = (mmdb_lookup_t *) wmem_map_lookup(mmdb_ipv6_map, addr->bytes); - } + result = &mmdb_not_found; + wmem_map_insert(mmdb_ipv6_map, chunkify_v6_addr(addr), result); - if (!result) { if (mmdbr_pipe_valid()) { char addr_str[WS_INET6_ADDRSTRLEN]; ws_inet_ntop6(addr, addr_str, WS_INET6_ADDRSTRLEN); MMDB_DEBUG("looking up %s", addr_str); g_async_queue_push(mmdbr_request_q, g_strdup_printf("%s\n", addr_str)); } - - result = &mmdb_not_found; - wmem_map_insert(mmdb_ipv6_map, chunkify_v6_addr(addr), result); } return result; |