diff options
Diffstat (limited to 'net/colo-compare.c')
-rw-r--r-- | net/colo-compare.c | 189 |
1 files changed, 108 insertions, 81 deletions
diff --git a/net/colo-compare.c b/net/colo-compare.c index 162fd6a570..282727b28a 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -83,9 +83,9 @@ typedef struct CompareState { GHashTable *connection_track_table; /* compare thread, a thread for each NIC */ QemuThread thread; - /* Timer used on the primary to find packets that are never matched */ - QEMUTimer *timer; - QemuMutex timer_check_lock; + + GMainContext *worker_context; + GMainLoop *compare_loop; } CompareState; typedef struct CompareClass { @@ -180,7 +180,7 @@ static int packet_enqueue(CompareState *s, int mode) * return: 0 means packet same * > 0 || < 0 means packet different */ -static int colo_packet_compare(Packet *ppkt, Packet *spkt) +static int colo_packet_compare_common(Packet *ppkt, Packet *spkt, int offset) { trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src), inet_ntoa(ppkt->ip->ip_dst), spkt->size, @@ -188,8 +188,10 @@ static int colo_packet_compare(Packet *ppkt, Packet *spkt) inet_ntoa(spkt->ip->ip_dst)); if (ppkt->size == spkt->size) { - return memcmp(ppkt->data, spkt->data, spkt->size); + return memcmp(ppkt->data + offset, spkt->data + offset, + spkt->size - offset); } else { + trace_colo_compare_main("Net packet size are not the same"); return -1; } } @@ -205,12 +207,6 @@ static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt) int res; trace_colo_compare_main("compare tcp"); - if (ppkt->size != spkt->size) { - if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) { - trace_colo_compare_main("pkt size not same"); - } - return -1; - } ptcp = (struct tcphdr *)ppkt->transport_header; stcp = (struct tcphdr *)spkt->transport_header; @@ -229,8 +225,11 @@ static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt) spkt->ip->ip_sum = ppkt->ip->ip_sum; } - res = memcmp(ppkt->data + ETH_HLEN, spkt->data + ETH_HLEN, - (spkt->size - ETH_HLEN)); + if (ptcp->th_sum == stcp->th_sum) { + res = colo_packet_compare_common(ppkt, spkt, ETH_HLEN); + } else { + res = -1; + } if (res != 0 && trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) { trace_colo_compare_pkt_info_src(inet_ntoa(ppkt->ip->ip_src), @@ -261,15 +260,32 @@ static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt) static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt) { int ret; + int network_header_length = ppkt->ip->ip_hl * 4; trace_colo_compare_main("compare udp"); - ret = colo_packet_compare(ppkt, spkt); + + /* + * Because of ppkt and spkt are both in the same connection, + * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are + * same with spkt. In addition, IP header's Identification is a random + * field, we can handle it in IP fragmentation function later. + * COLO just concern the response net packet payload from primary guest + * and secondary guest are same or not, So we ignored all IP header include + * other field like TOS,TTL,IP Checksum. we only need to compare + * the ip payload here. + */ + ret = colo_packet_compare_common(ppkt, spkt, + network_header_length + ETH_HLEN); if (ret) { trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size); - qemu_hexdump((char *)ppkt->data, stderr, "colo-compare", ppkt->size); trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size); - qemu_hexdump((char *)spkt->data, stderr, "colo-compare", spkt->size); + if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) { + qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt", + ppkt->size); + qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt", + spkt->size); + } } return ret; @@ -281,24 +297,32 @@ static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt) */ static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt) { - int network_length; + int network_header_length = ppkt->ip->ip_hl * 4; trace_colo_compare_main("compare icmp"); - network_length = ppkt->ip->ip_hl * 4; - if (ppkt->size != spkt->size || - ppkt->size < network_length + ETH_HLEN) { - return -1; - } - if (colo_packet_compare(ppkt, spkt)) { + /* + * Because of ppkt and spkt are both in the same connection, + * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are + * same with spkt. In addition, IP header's Identification is a random + * field, we can handle it in IP fragmentation function later. + * COLO just concern the response net packet payload from primary guest + * and secondary guest are same or not, So we ignored all IP header include + * other field like TOS,TTL,IP Checksum. we only need to compare + * the ip payload here. + */ + if (colo_packet_compare_common(ppkt, spkt, + network_header_length + ETH_HLEN)) { trace_colo_compare_icmp_miscompare("primary pkt size", ppkt->size); - qemu_hexdump((char *)ppkt->data, stderr, "colo-compare", - ppkt->size); trace_colo_compare_icmp_miscompare("Secondary pkt size", spkt->size); - qemu_hexdump((char *)spkt->data, stderr, "colo-compare", - spkt->size); + if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) { + qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt", + ppkt->size); + qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt", + spkt->size); + } return -1; } else { return 0; @@ -316,7 +340,7 @@ static int colo_packet_compare_other(Packet *spkt, Packet *ppkt) inet_ntoa(ppkt->ip->ip_dst), spkt->size, inet_ntoa(spkt->ip->ip_src), inet_ntoa(spkt->ip->ip_dst)); - return colo_packet_compare(ppkt, spkt); + return colo_packet_compare_common(ppkt, spkt, 0); } static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time) @@ -374,9 +398,7 @@ static void colo_compare_connection(void *opaque, void *user_data) while (!g_queue_is_empty(&conn->primary_list) && !g_queue_is_empty(&conn->secondary_list)) { - qemu_mutex_lock(&s->timer_check_lock); pkt = g_queue_pop_tail(&conn->primary_list); - qemu_mutex_unlock(&s->timer_check_lock); switch (conn->ip_proto) { case IPPROTO_TCP: result = g_queue_find_custom(&conn->secondary_list, @@ -411,9 +433,7 @@ static void colo_compare_connection(void *opaque, void *user_data) * until next comparison. */ trace_colo_compare_main("packet different"); - qemu_mutex_lock(&s->timer_check_lock); g_queue_push_tail(&conn->primary_list, pkt); - qemu_mutex_unlock(&s->timer_check_lock); /* TODO: colo_notify_checkpoint();*/ break; } @@ -486,25 +506,45 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) } } +/* + * Check old packet regularly so it can watch for any packets + * that the secondary hasn't produced equivalents of. + */ +static gboolean check_old_packet_regular(void *opaque) +{ + CompareState *s = opaque; + + /* if have old packet we will notify checkpoint */ + colo_old_packet_check(s); + + return TRUE; +} + static void *colo_compare_thread(void *opaque) { - GMainContext *worker_context; - GMainLoop *compare_loop; CompareState *s = opaque; + GSource *timeout_source; - worker_context = g_main_context_new(); + s->worker_context = g_main_context_new(); qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read, - compare_pri_chr_in, NULL, s, worker_context, true); + compare_pri_chr_in, NULL, s, s->worker_context, true); qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read, - compare_sec_chr_in, NULL, s, worker_context, true); + compare_sec_chr_in, NULL, s, s->worker_context, true); + + s->compare_loop = g_main_loop_new(s->worker_context, FALSE); - compare_loop = g_main_loop_new(worker_context, FALSE); + /* To kick any packets that the secondary doesn't match */ + timeout_source = g_timeout_source_new(REGULAR_PACKET_CHECK_MS); + g_source_set_callback(timeout_source, + (GSourceFunc)check_old_packet_regular, s, NULL); + g_source_attach(timeout_source, s->worker_context); - g_main_loop_run(compare_loop); + g_main_loop_run(s->compare_loop); - g_main_loop_unref(compare_loop); - g_main_context_unref(worker_context); + g_source_unref(timeout_source); + g_main_loop_unref(s->compare_loop); + g_main_context_unref(s->worker_context); return NULL; } @@ -604,26 +644,6 @@ static int find_and_check_chardev(Chardev **chr, } /* - * Check old packet regularly so it can watch for any packets - * that the secondary hasn't produced equivalents of. - */ -static void check_old_packet_regular(void *opaque) -{ - CompareState *s = opaque; - - timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + - REGULAR_PACKET_CHECK_MS); - /* if have old packet we will notify checkpoint */ - /* - * TODO: Make timer handler run in compare thread - * like qemu_chr_add_handlers_full. - */ - qemu_mutex_lock(&s->timer_check_lock); - colo_old_packet_check(s); - qemu_mutex_unlock(&s->timer_check_lock); -} - -/* * Called from the main thread on the primary * to setup colo-compare. */ @@ -665,7 +685,6 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize); g_queue_init(&s->conn_list); - qemu_mutex_init(&s->timer_check_lock); s->connection_track_table = g_hash_table_new_full(connection_key_hash, connection_key_equal, @@ -678,15 +697,26 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) QEMU_THREAD_JOINABLE); compare_id++; - /* A regular timer to kick any packets that the secondary doesn't match */ - s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest runs */ - check_old_packet_regular, s); - timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + - REGULAR_PACKET_CHECK_MS); - return; } +static void colo_flush_packets(void *opaque, void *user_data) +{ + CompareState *s = user_data; + Connection *conn = opaque; + Packet *pkt = NULL; + + while (!g_queue_is_empty(&conn->primary_list)) { + pkt = g_queue_pop_head(&conn->primary_list); + compare_chr_send(&s->chr_out, pkt->data, pkt->size); + packet_destroy(pkt, NULL); + } + while (!g_queue_is_empty(&conn->secondary_list)) { + pkt = g_queue_pop_head(&conn->secondary_list); + packet_destroy(pkt, NULL); + } +} + static void colo_compare_class_init(ObjectClass *oc, void *data) { UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc); @@ -711,24 +741,21 @@ static void colo_compare_finalize(Object *obj) { CompareState *s = COLO_COMPARE(obj); - qemu_chr_fe_deinit(&s->chr_pri_in); - qemu_chr_fe_deinit(&s->chr_sec_in); + qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL, + s->worker_context, true); + qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL, + s->worker_context, true); qemu_chr_fe_deinit(&s->chr_out); - g_queue_free(&s->conn_list); - - if (qemu_thread_is_self(&s->thread)) { - /* compare connection */ - g_queue_foreach(&s->conn_list, colo_compare_connection, s); - qemu_thread_join(&s->thread); - } + g_main_loop_quit(s->compare_loop); + qemu_thread_join(&s->thread); - if (s->timer) { - timer_del(s->timer); - } + /* Release all unhandled packets after compare thead exited */ + g_queue_foreach(&s->conn_list, colo_flush_packets, s); - qemu_mutex_destroy(&s->timer_check_lock); + g_queue_clear(&s->conn_list); + g_hash_table_destroy(s->connection_track_table); g_free(s->pri_indev); g_free(s->sec_indev); g_free(s->outdev); |