From 13d42a094583fbcc6c750ee1a0a34e0075b6c7bc Mon Sep 17 00:00:00 2001 From: Johan Hedberg Date: Thu, 30 Jun 2011 00:26:19 +0300 Subject: [PATCH] gobex: Add request cancellation support --- gobex/gobex-defs.h | 1 + gobex/gobex.c | 99 ++++++++++++++++++++++++++++--- gobex/gobex.h | 3 +- unit/test-gobex.c | 143 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 236 insertions(+), 10 deletions(-) diff --git a/gobex/gobex-defs.h b/gobex/gobex-defs.h index 6d7e37964..867307749 100644 --- a/gobex/gobex-defs.h +++ b/gobex/gobex-defs.h @@ -35,6 +35,7 @@ typedef enum { G_OBEX_ERROR_INVALID_ARGS, G_OBEX_ERROR_DISCONNECTED, G_OBEX_ERROR_TIMEOUT, + G_OBEX_ERROR_CANCELLED, G_OBEX_ERROR_FAILED, } GObexError; diff --git a/gobex/gobex.c b/gobex/gobex.c index 1556629dc..de2fba20e 100644 --- a/gobex/gobex.c +++ b/gobex/gobex.c @@ -63,11 +63,13 @@ struct _GObex { struct pending_pkt { guint id; + GObex *obex; GObexPacket *pkt; guint timeout; guint timeout_id; GObexResponseFunc rsp_func; gpointer rsp_data; + gboolean cancelled; }; struct connect_data { @@ -118,6 +120,8 @@ static ssize_t rsp_header_offset(guint8 opcode) static void pending_pkt_free(struct pending_pkt *p) { + if (p->obex != NULL) + g_obex_unref(p->obex); g_obex_packet_free(p->pkt); g_free(p); } @@ -244,15 +248,13 @@ static gboolean g_obex_send_internal(GObex *obex, struct pending_pkt *p, return FALSE; } - g_queue_push_tail(obex->tx_queue, p); - - if (g_queue_get_length(obex->tx_queue) > 1) - return TRUE; - - if (p->id > 0 && obex->pending_req != NULL) - return TRUE; + if (g_obex_packet_get_operation(p->pkt, NULL) == G_OBEX_OP_ABORT) + g_queue_push_head(obex->tx_queue, p); + else + g_queue_push_tail(obex->tx_queue, p); - enable_tx(obex); + if (obex->pending_req == NULL || p->id == 0) + enable_tx(obex); return TRUE; } @@ -305,8 +307,80 @@ guint g_obex_send_req(GObex *obex, GObexPacket *req, gint timeout, return p->id; } -gboolean g_obex_cancel_req(GObex *obex, guint req_id) +static gint pending_pkt_cmp(gconstpointer a, gconstpointer b) { + const struct pending_pkt *p = a; + guint id = GPOINTER_TO_INT(b); + + return (p->id - id); +} + +static gboolean pending_req_abort(GObex *obex, GError **err) +{ + GObexPacket *pkt; + + if (obex->pending_req->cancelled) + return TRUE; + + obex->pending_req->cancelled = TRUE; + + pkt = g_obex_packet_new(G_OBEX_OP_ABORT, TRUE); + + return g_obex_send(obex, pkt, err); +} + +static gboolean cancel_complete(gpointer user_data) +{ + struct pending_pkt *p = user_data; + GObex *obex = p->obex; + GError *err; + + g_assert(p->rsp_func != NULL); + + err = g_error_new(G_OBEX_ERROR, G_OBEX_ERROR_CANCELLED, + "The request was cancelled"); + p->rsp_func(obex, err, NULL, p->rsp_data); + + g_error_free(err); + + pending_pkt_free(p); + + return FALSE; +} + +gboolean g_obex_cancel_req(GObex *obex, guint req_id, gboolean remove_callback) +{ + GList *match; + struct pending_pkt *p; + + if (obex->pending_req && obex->pending_req->id == req_id) { + if (!pending_req_abort(obex, NULL)) { + p = obex->pending_req; + obex->pending_req = NULL; + goto immediate_completion; + } + + return TRUE; + } + + match = g_queue_find_custom(obex->tx_queue, GINT_TO_POINTER(req_id), + pending_pkt_cmp); + if (match == NULL) + return FALSE; + + p = match->data; + + g_queue_delete_link(obex->tx_queue, match); + +immediate_completion: + p->cancelled = TRUE; + p->obex = g_obex_ref(obex); + + if (remove_callback || p->rsp_func == NULL) + pending_pkt_free(p); + else + g_idle_add(cancel_complete, p); + return TRUE; } @@ -343,9 +417,16 @@ static void handle_response(GObex *obex, GError *err, GObexPacket *rsp) parse_connect_data(obex, rsp); } + if (p->cancelled) + err = g_error_new(G_OBEX_ERROR, G_OBEX_ERROR_CANCELLED, + "The operation was cancelled"); + if (p->rsp_func) p->rsp_func(obex, err, rsp, p->rsp_data); + if (p->cancelled) + g_error_free(err); + pending_pkt_free(p); obex->pending_req = NULL; diff --git a/gobex/gobex.h b/gobex/gobex.h index 59cec801b..4bf7b1df5 100644 --- a/gobex/gobex.h +++ b/gobex/gobex.h @@ -43,7 +43,8 @@ gboolean g_obex_send(GObex *obex, GObexPacket *pkt, GError **err); guint g_obex_send_req(GObex *obex, GObexPacket *req, gint timeout, GObexResponseFunc func, gpointer user_data, GError **err); -gboolean g_obex_cancel_req(GObex *obex, guint req_id); +gboolean g_obex_cancel_req(GObex *obex, guint req_id, + gboolean remove_callback); void g_obex_set_event_function(GObex *obex, GObexEventFunc func, gpointer user_data); diff --git a/unit/test-gobex.c b/unit/test-gobex.c index 764da28fc..3bbc65ced 100644 --- a/unit/test-gobex.c +++ b/unit/test-gobex.c @@ -41,6 +41,7 @@ static uint8_t pkt_connect_rsp[] = { 0x10 | FINAL_BIT, 0x00, 0x07, 0x10, 0x00, 0x10, 0x00 }; static uint8_t pkt_nval_connect_rsp[] = { 0x10 | FINAL_BIT, 0x00, 0x05, 0x10, 0x00, }; +static uint8_t pkt_abort_rsp[] = { 0x90, 0x00, 0x03 }; static gboolean test_timeout(gpointer user_data) { @@ -130,6 +131,11 @@ static void create_endpoints(GObex **obex, GIOChannel **io, int sock_type) *obex = create_gobex(sv[0], transport_type, TRUE); g_assert(*obex != NULL); + if (io == NULL) { + close(sv[1]); + return; + } + *io = g_io_channel_unix_new(sv[1]); g_assert(*io != NULL); @@ -315,6 +321,138 @@ static void test_send_connect_req_timeout_stream(void) send_connect(timeout_rsp, send_nothing, 1); } +struct req_info { + GObex *obex; + guint id; + GError *err; +}; + +static void req_done(GObex *obex, GError *err, GObexPacket *rsp, + gpointer user_data) +{ + struct req_info *r = user_data; + + if (!g_error_matches(err, G_OBEX_ERROR, G_OBEX_ERROR_CANCELLED)) + g_set_error(&r->err, TEST_ERROR, TEST_ERROR_UNEXPECTED, + "Did not get expected cancelled error"); + + g_main_loop_quit(mainloop); +} + +static void test_cancel_req_immediate(void) +{ + GObexPacket *req; + struct req_info r; + gboolean ret; + + create_endpoints(&r.obex, NULL, SOCK_STREAM); + + r.err = NULL; + + req = g_obex_packet_new(G_OBEX_OP_PUT, TRUE); + r.id = g_obex_send_req(r.obex, req, -1, req_done, &r, &r.err); + g_assert_no_error(r.err); + g_assert(r.id != 0); + + ret = g_obex_cancel_req(r.obex, r.id, FALSE); + g_assert(ret == TRUE); + + mainloop = g_main_loop_new(NULL, FALSE); + + g_main_loop_run(mainloop); + + g_assert_no_error(r.err); + + g_obex_unref(r.obex); + g_main_loop_unref(mainloop); +} + +static gboolean cancel_server(GIOChannel *io, GIOCondition cond, + gpointer user_data) +{ + struct req_info *r = user_data; + GIOStatus status; + gsize bytes_written, rbytes; + char buf[255]; + + status = g_io_channel_read_chars(io, buf, sizeof(buf), &rbytes, NULL); + if (status != G_IO_STATUS_NORMAL) { + g_set_error(&r->err, TEST_ERROR, TEST_ERROR_UNEXPECTED, + "Reading data failed with status %d", status); + goto failed; + } + + if (rbytes < 3) { + g_set_error(&r->err, TEST_ERROR, TEST_ERROR_UNEXPECTED, + "Not enough data from socket"); + goto failed; + } + + if ((uint8_t) buf[0] == (G_OBEX_OP_PUT | FINAL_BIT)) { + if (!g_obex_cancel_req(r->obex, r->id, FALSE)) { + g_set_error(&r->err, TEST_ERROR, TEST_ERROR_UNEXPECTED, + "Cancelling request failed"); + goto failed; + } + return TRUE; + } + + if ((uint8_t) buf[0] != (G_OBEX_OP_ABORT | FINAL_BIT)) { + g_set_error(&r->err, TEST_ERROR, TEST_ERROR_UNEXPECTED, + "Neither Put nor Abort packet received"); + goto failed; + } + + g_io_channel_write_chars(io, (gchar *) pkt_abort_rsp, + sizeof(pkt_abort_rsp), &bytes_written, NULL); + if (bytes_written != sizeof(pkt_abort_rsp)) { + g_set_error(&r->err, TEST_ERROR, TEST_ERROR_UNEXPECTED, + "Unable to write to socket"); + goto failed; + } + + return TRUE; + +failed: + g_main_loop_quit(mainloop); + return FALSE; +} + +static void test_cancel_req_delay(void) +{ + GIOChannel *io; + guint io_id, timer_id; + struct req_info r; + GObexPacket *req; + GIOCondition cond; + + create_endpoints(&r.obex, &io, SOCK_STREAM); + + r.err = NULL; + + req = g_obex_packet_new(G_OBEX_OP_PUT, TRUE); + r.id = g_obex_send_req(r.obex, req, -1, req_done, &r, &r.err); + g_assert_no_error(r.err); + g_assert(r.id != 0); + + mainloop = g_main_loop_new(NULL, FALSE); + + cond = G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL; + io_id = g_io_add_watch(io, cond, cancel_server, &r); + + timer_id = g_timeout_add_seconds(2, test_timeout, &r.err); + + g_main_loop_run(mainloop); + + g_assert_no_error(r.err); + + g_source_remove(timer_id); + g_io_channel_unref(io); + g_source_remove(io_id); + g_obex_unref(r.obex); + g_main_loop_unref(mainloop); +} + static void test_send_connect_stream(void) { guint8 connect_data[] = { 0x10, 0x00, 0x10, 0x00 }; @@ -503,6 +641,11 @@ int main(int argc, char *argv[]) g_test_add_func("/gobex/test_send_connect_req_timeout_stream", test_send_connect_req_timeout_stream); + g_test_add_func("/gobex/test_cancel_req_immediate", + test_cancel_req_immediate); + g_test_add_func("/gobex/test_cancel_req_delay", + test_cancel_req_delay); + g_test_run(); return 0; -- 2.47.3