diff --git a/src/dns.c b/src/dns.c index 3aca50986..f16d68186 100644 --- a/src/dns.c +++ b/src/dns.c @@ -493,147 +493,146 @@ static void dns_session_io_handler(struct appctx *appctx) HA_ATOMIC_INC(b_orig(buf) + ds->ofs); } - /* in this loop, ofs always points to the counter byte that precedes - * the message so that we can take our reference there if we have to - * stop before the end (ret=0). + /* we were already there, adjust the offset to be relative to + * the buffer's head and remove us from the counter. */ - if (sc_opposite(sc)->state == SC_ST_EST) { - /* we were already there, adjust the offset to be relative to - * the buffer's head and remove us from the counter. - */ - ofs = ds->ofs - b_head_ofs(buf); - if (ds->ofs < b_head_ofs(buf)) - ofs += b_size(buf); + ofs = ds->ofs - b_head_ofs(buf); + if (ds->ofs < b_head_ofs(buf)) + ofs += b_size(buf); - BUG_ON(ofs >= buf->size); - HA_ATOMIC_DEC(b_peek(buf, ofs)); + BUG_ON(ofs >= buf->size); + HA_ATOMIC_DEC(b_peek(buf, ofs)); - ret = 1; - while (ofs + 1 < b_data(buf)) { - struct dns_query *query; - uint16_t original_qid; - uint16_t new_qid; + /* in following loop, ofs always points to the counter byte that + * precedes the message so that we can take our reference there if we + * have to stop before the end (ret=0). + */ + ret = 1; + while (ofs + 1 < b_data(buf)) { + struct dns_query *query; + uint16_t original_qid; + uint16_t new_qid; - cnt = 1; - len = b_peek_varint(buf, ofs + cnt, &msg_len); - if (!len) - break; - cnt += len; - BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); + cnt = 1; + len = b_peek_varint(buf, ofs + cnt, &msg_len); + if (!len) + break; + cnt += len; + BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); - /* retrieve available room on output channel */ - available_room = channel_recv_max(sc_ic(sc)); + /* retrieve available room on output channel */ + available_room = channel_recv_max(sc_ic(sc)); - /* tx_msg_offset null means we are at the start of a new message */ - if (!ds->tx_msg_offset) { - uint16_t slen; + /* tx_msg_offset null means we are at the start of a new message */ + if (!ds->tx_msg_offset) { + uint16_t slen; - /* check if there is enough room to put message len and query id */ - if (available_room < sizeof(slen) + sizeof(new_qid)) { - sc_need_room(sc); - ret = 0; - break; - } - - /* put msg len into then channel */ - slen = (uint16_t)msg_len; - slen = htons(slen); - applet_putblk(appctx, (char *)&slen, sizeof(slen)); - available_room -= sizeof(slen); - - /* backup original query id */ - len = b_getblk(buf, (char *)&original_qid, sizeof(original_qid), ofs + cnt); - if (!len) { - /* should never happen since messages are atomically - * written into ring - */ - ret = 0; - break; - } - - /* generates new query id */ - new_qid = ++ds->query_counter; - new_qid = htons(new_qid); - - /* put new query id into the channel */ - applet_putblk(appctx, (char *)&new_qid, sizeof(new_qid)); - available_room -= sizeof(new_qid); - - /* keep query id mapping */ - - query = pool_alloc(dns_query_pool); - if (query) { - query->qid.key = new_qid; - query->original_qid = original_qid; - query->expire = tick_add(now_ms, 5000); - LIST_INIT(&query->list); - if (LIST_ISEMPTY(&ds->queries)) { - /* enable task to handle expire */ - ds->task_exp->expire = query->expire; - /* ensure this will be executed by the same - * thread than ds_session_release - * to ensure session_release is free - * to destroy the task */ - task_queue(ds->task_exp); - } - LIST_APPEND(&ds->queries, &query->list); - eb32_insert(&ds->query_ids, &query->qid); - ds->onfly_queries++; - } - - /* update the tx_offset to handle output in 16k streams */ - ds->tx_msg_offset = sizeof(original_qid); - - } - - /* check if it remains available room on output chan */ - if (unlikely(!available_room)) { + /* check if there is enough room to put message len and query id */ + if (available_room < sizeof(slen) + sizeof(new_qid)) { sc_need_room(sc); ret = 0; break; } - chunk_reset(&trash); - if ((msg_len - ds->tx_msg_offset) > available_room) { - /* remaining msg data is too large to be written in output channel at one time */ + /* put msg len into then channel */ + slen = (uint16_t)msg_len; + slen = htons(slen); + applet_putblk(appctx, (char *)&slen, sizeof(slen)); + available_room -= sizeof(slen); - len = b_getblk(buf, trash.area, available_room, ofs + cnt + ds->tx_msg_offset); - - /* update offset to complete mesg forwarding later */ - ds->tx_msg_offset += len; - } - else { - /* remaining msg data can be written in output channel at one time */ - len = b_getblk(buf, trash.area, msg_len - ds->tx_msg_offset, ofs + cnt + ds->tx_msg_offset); - - /* reset tx_msg_offset to mark forward fully processed */ - ds->tx_msg_offset = 0; - } - trash.data += len; - - if (applet_putchk(appctx, &trash) == -1) { - /* should never happen since we - * check available_room is large - * enough here. + /* backup original query id */ + len = b_getblk(buf, (char *)&original_qid, sizeof(original_qid), ofs + cnt); + if (!len) { + /* should never happen since messages are atomically + * written into ring */ ret = 0; break; } - if (ds->tx_msg_offset) { - /* msg was not fully processed, we must be awake to drain pending data */ + /* generates new query id */ + new_qid = ++ds->query_counter; + new_qid = htons(new_qid); - sc_need_room(sc); - ret = 0; - break; + /* put new query id into the channel */ + applet_putblk(appctx, (char *)&new_qid, sizeof(new_qid)); + available_room -= sizeof(new_qid); + + /* keep query id mapping */ + + query = pool_alloc(dns_query_pool); + if (query) { + query->qid.key = new_qid; + query->original_qid = original_qid; + query->expire = tick_add(now_ms, 5000); + LIST_INIT(&query->list); + if (LIST_ISEMPTY(&ds->queries)) { + /* enable task to handle expire */ + ds->task_exp->expire = query->expire; + /* ensure this will be executed by the same + * thread than ds_session_release + * to ensure session_release is free + * to destroy the task */ + task_queue(ds->task_exp); + } + LIST_APPEND(&ds->queries, &query->list); + eb32_insert(&ds->query_ids, &query->qid); + ds->onfly_queries++; } - /* switch to next message */ - ofs += cnt + msg_len; + + /* update the tx_offset to handle output in 16k streams */ + ds->tx_msg_offset = sizeof(original_qid); + } - HA_ATOMIC_INC(b_peek(buf, ofs)); - ds->ofs = b_peek_ofs(buf, ofs); + /* check if it remains available room on output chan */ + if (unlikely(!available_room)) { + sc_need_room(sc); + ret = 0; + break; + } + + chunk_reset(&trash); + if ((msg_len - ds->tx_msg_offset) > available_room) { + /* remaining msg data is too large to be written in output channel at one time */ + + len = b_getblk(buf, trash.area, available_room, ofs + cnt + ds->tx_msg_offset); + + /* update offset to complete mesg forwarding later */ + ds->tx_msg_offset += len; + } + else { + /* remaining msg data can be written in output channel at one time */ + len = b_getblk(buf, trash.area, msg_len - ds->tx_msg_offset, ofs + cnt + ds->tx_msg_offset); + + /* reset tx_msg_offset to mark forward fully processed */ + ds->tx_msg_offset = 0; + } + trash.data += len; + + if (applet_putchk(appctx, &trash) == -1) { + /* should never happen since we + * check available_room is large + * enough here. + */ + ret = 0; + break; + } + + if (ds->tx_msg_offset) { + /* msg was not fully processed, we must be awake to drain pending data */ + + sc_need_room(sc); + ret = 0; + break; + } + /* switch to next message */ + ofs += cnt + msg_len; } + + HA_ATOMIC_INC(b_peek(buf, ofs)); + ds->ofs = b_peek_ofs(buf, ofs); + HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock); if (ret) {