diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl index 1bb95d2..c9161b0 100644 --- a/src/ibrowse_http_client.erl +++ b/src/ibrowse_http_client.erl @@ -762,11 +762,10 @@ send_req_1(From, {ok, _Sent_body} -> trace_request_body(Body_1), _ = active_once(State_1), - State_1_1 = inc_pipeline_counter(State_1), - State_2 = State_1_1#state{status = get_header, - cur_req = NewReq, - proxy_tunnel_setup = in_progress, - tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]}, + State_2 = State_1#state{status = get_header, + cur_req = NewReq, + proxy_tunnel_setup = in_progress, + tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]}, State_3 = set_inac_timer(State_2), {noreply, State_3}; Err -> @@ -853,15 +852,14 @@ send_req_1(From, Raw_req = list_to_binary([Req, Sent_body]), NewReq_1 = NewReq#request{raw_req = Raw_req}, State_1 = State#state{reqs=queue:in(NewReq_1, State#state.reqs)}, - State_2 = inc_pipeline_counter(State_1), - _ = active_once(State_2), - State_3 = case Status of + _ = active_once(State_1), + State_2 = case Status of idle -> - State_2#state{ + State_1#state{ status = get_header, cur_req = NewReq_1}; _ -> - State_2 + State_1 end, case StreamTo of undefined -> @@ -875,8 +873,8 @@ send_req_1(From, catch StreamTo ! {ibrowse_async_raw_req, Raw_req} end end, - State_4 = set_inac_timer(State_3), - {noreply, State_4}; + State_3 = set_inac_timer(State_2), + {noreply, State_3}; Err -> shutting_down(State), do_trace("Send failed... Reason: ~p~n", [Err]), @@ -1815,13 +1813,13 @@ format_response_data(Resp_format, Body) -> do_reply(State, From, undefined, _, Resp_format, {ok, St_code, Headers, Body}) -> Msg_1 = {ok, St_code, Headers, format_response_data(Resp_format, Body)}, gen_server:reply(From, Msg_1), - dec_pipeline_counter(State); + report_request_complete(State); do_reply(State, From, undefined, _, _, Msg) -> gen_server:reply(From, Msg), - dec_pipeline_counter(State); + report_request_complete(State); do_reply(#state{prev_req_id = Prev_req_id} = State, _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) -> - State_1 = dec_pipeline_counter(State), + State_1 = report_request_complete(State), case Body of [] -> ok; @@ -1843,7 +1841,7 @@ do_reply(#state{prev_req_id = Prev_req_id} = State, ets:delete(?STREAM_TABLE, {req_id_pid, Prev_req_id}), State_1#state{prev_req_id = ReqId}; do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) -> - State_1 = dec_pipeline_counter(State), + State_1 = report_request_complete(State), Msg_1 = format_response_data(Resp_format, Msg), catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1}, State_1. @@ -1946,19 +1944,11 @@ shutting_down(#state{lb_ets_tid = undefined}) -> shutting_down(#state{lb_ets_tid = Tid}) -> ibrowse_lb:report_connection_down(Tid). -inc_pipeline_counter(#state{is_closing = true} = State) -> - State; -inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> - State; -inc_pipeline_counter(#state{lb_ets_tid = Tid} = State) -> - ibrowse_lb:report_request_underway(Tid), - State. - -dec_pipeline_counter(#state{is_closing = true} = State) -> +report_request_complete(#state{is_closing = true} = State) -> State; -dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> +report_request_complete(#state{lb_ets_tid = undefined} = State) -> State; -dec_pipeline_counter(#state{lb_ets_tid = Tid} = State) -> +report_request_complete(#state{lb_ets_tid = Tid} = State) -> ibrowse_lb:report_request_complete(Tid), State. diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl index cc067fc..3d487d4 100644 --- a/src/ibrowse_lb.erl +++ b/src/ibrowse_lb.erl @@ -16,7 +16,6 @@ spawn_connection/6, stop/1, report_connection_down/1, - report_request_underway/1, report_request_complete/1 ]). @@ -39,6 +38,9 @@ proc_state}). -define(PIPELINE_MAX, 99999). +-define(KEY_MATCHSPEC_BY_PID(Pid), [{{{'_', '_', Pid}, '_'}, [], ['$_']}]). +-define(KEY_MATCHSPEC(Key), [{{Key, '_'}, [], ['$_']}]). +-define(KEY_MATCHSPEC_FOR_DELETE(Key), [{{Key, '_'}, [], [true]}]). -include("ibrowse.hrl"). @@ -74,13 +76,23 @@ stop(Lb_pid) -> end. report_connection_down(Tid) -> - catch ets:delete(Tid, self()). - -report_request_underway(Tid) -> - catch ets:update_counter(Tid, self(), {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}). + %% Don't cascade errors since Tid is really managed by other process + catch ets:select_delete(Tid, ?KEY_MATCHSPEC_BY_PID(self())). report_request_complete(Tid) -> - catch ets:update_counter(Tid, self(), {2, -1, 0, 0}). + %% Don't cascade errors since Tid is really managed by other process + catch case ets:select(Tid, ?KEY_MATCHSPEC_BY_PID(self())) of + [MatchKey] -> + case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(MatchKey)) of + 1 -> + ets:insert(Tid, {decremented(MatchKey), undefined}), + true; + _ -> + false + end; + _ -> + false + end. %%==================================================================== %% Server functions @@ -210,23 +222,17 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- -find_best_connection(Tid, Max_pipe) -> - find_best_connection(ets:first(Tid), Tid, Max_pipe). - -find_best_connection('$end_of_table', _, _) -> - {error, retry_later}; -find_best_connection(Pid, Tid, Max_pipe) -> - case ets:lookup(Tid, Pid) of - [{Pid, Cur_sz}] when Cur_sz < Max_pipe -> - case record_request_for_connection(Tid, Pid) of - {'EXIT', _} -> - %% The selected process has shutdown - find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe); - _ -> - {ok, Pid} +find_best_connection(Tid, Max_pipeline_size) -> + case ets:first(Tid) of + {Size, _Timestamp, Pid} = Key when Size < Max_pipeline_size -> + case record_request_for_connection(Tid, Key) of + true -> + {ok, Pid}; + false -> + find_best_connection(Tid, Max_pipeline_size) end; - _ -> - find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe) + _ -> + {error, retry_later} end. maybe_create_ets(#state{ets_tid = undefined} = State) -> @@ -240,10 +246,25 @@ num_current_connections(Tid) -> catch ets:info(Tid, size). record_new_connection(Tid, Pid) -> - catch ets:insert(Tid, {Pid, 0}). + catch ets:insert(Tid, {new_key(Pid), undefined}). + +record_request_for_connection(Tid, Key) -> + case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(Key)) of + 1 -> + ets:insert(Tid, {incremented(Key), undefined}), + true; + _ -> + false + end. + +new_key(Pid) -> + {1, os:timestamp(), Pid}. + +incremented({Size, Timestamp, Pid}) -> + {Size + 1, Timestamp, Pid}. -record_request_for_connection(Tid, Pid) -> - catch ets:update_counter(Tid, Pid, {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}). +decremented({Size, _Timestamp, Pid}) -> + {Size - 1, os:timestamp(), Pid}. for_each_connection_pid(Tid, Fun) -> catch ets:foldl(fun({Pid, _}, _) -> Fun(Pid) end, undefined, Tid), diff --git a/test/ibrowse_functional_tests.erl b/test/ibrowse_functional_tests.erl index 0a68e14..fc7afec 100644 --- a/test/ibrowse_functional_tests.erl +++ b/test/ibrowse_functional_tests.erl @@ -56,15 +56,10 @@ balanced_connections() -> timer:sleep(1000), - Diffs = [Count - BalancedNumberOfRequestsPerConnection || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()], - ?assertEqual(MaxSessions, length(Diffs)), + Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()], + ?assertEqual(MaxSessions, length(Counts)), - lists:foreach(fun(X) -> ?assertEqual(yep, close_to_zero(X)) end, Diffs). - -close_to_zero(0) -> yep; -close_to_zero(-1) -> yep; -close_to_zero(1) -> yep; -close_to_zero(X) -> {nope, X}. + ?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts). times(0, _) -> ok;