diff --git a/.gitignore b/.gitignore index cb2494d..11646f6 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ ebin/ *~ .eunit/ test/ +*beam + diff --git a/src/ibrowse.erl b/src/ibrowse.erl index 5f26d45..f706879 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -96,6 +96,7 @@ trace_off/2, all_trace_off/0, show_dest_status/0, + show_dest_status/1, show_dest_status/2 ]). @@ -353,15 +354,16 @@ try_routing_request(_, _, _, _, _, _, _, _, _, _, _) -> {error, retry_later}. merge_options(Host, Port, Options) -> - Config_options = get_config_value({options, Host, Port}, []), + Config_options = get_config_value({options, Host, Port}, []) ++ + get_config_value({options, global}, []), lists:foldl( fun({Key, Val}, Acc) -> - case lists:keysearch(Key, 1, Options) of - false -> - [{Key, Val} | Acc]; - _ -> - Acc - end + case lists:keysearch(Key, 1, Options) of + false -> + [{Key, Val} | Acc]; + _ -> + Acc + end end, Options, Config_options). get_lb_pid(Url) -> @@ -618,7 +620,11 @@ show_dest_status() -> ) end end, Dests). - + +show_dest_status(Url) -> + #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url), + show_dest_status(Host, Port). + %% @doc Shows some internal information about load balancing to a %% specified Host:Port. Info about workers spawned using %% spawn_worker_process/2 or spawn_link_worker_process/2 is not @@ -637,19 +643,22 @@ show_dest_status(Host, Port) -> [] -> io:format("Couldn't locate ETS table for ~p~n", [Lb_pid]); [Tid | _] -> - First = ets:first(Tid), - Last = ets:last(Tid), - Size = ets:info(Tid, size), - io:format("LB ETS table id : ~p~n", [Tid]), - io:format("Num Connections : ~p~n", [Size]), - case Size of - 0 -> - ok; + Tid_rows = [{X, Y, Z} || {X, Y, Z} <- ets:tab2list(Tid), + is_pid(X), is_integer(Y)], + case Tid_rows of + [] -> + io:format("No active connections~n", []); _ -> - {First_p_sz, _} = First, - {Last_p_sz, _} = Last, - io:format("Smallest pipeline : ~1000.p~n", [First_p_sz]), - io:format("Largest pipeline : ~1000.p~n", [Last_p_sz]) + Tid_rows_sorted = lists:keysort(2, Tid_rows), + First = hd(Tid_rows_sorted), + Last = lists:last(Tid_rows_sorted), + Size = length(Tid_rows), + io:format("LB ETS table id : ~p~n", [Tid]), + io:format("Num Connections : ~p~n", [Size]), + {_, First_p_sz, First_speculative_sz} = First, + {_, Last_p_sz, Last_spec_sz} = Last, + io:format("Smallest pipeline : ~p:~p~n", [First_p_sz, First_speculative_sz]), + io:format("Largest pipeline : ~p:~p~n", [Last_p_sz, Last_spec_sz]) end end end. @@ -730,16 +739,26 @@ import_config(Filename) -> %% @doc Internal export get_config_value(Key) -> - [#ibrowse_conf{value = V}] = ets:lookup(ibrowse_conf, Key), - V. + try + [#ibrowse_conf{value = V}] = ets:lookup(ibrowse_conf, Key), + V + catch + error:badarg -> + throw({error, ibrowse_not_running}) + end. %% @doc Internal export get_config_value(Key, DefVal) -> - case ets:lookup(ibrowse_conf, Key) of - [] -> - DefVal; - [#ibrowse_conf{value = V}] -> - V + try + case ets:lookup(ibrowse_conf, Key) of + [] -> + DefVal; + [#ibrowse_conf{value = V}] -> + V + end + catch + error:badarg -> + throw({error, ibrowse_not_running}) end. set_config_value(Key, Val) -> diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl index 70493c2..e634c4e 100644 --- a/src/ibrowse_http_client.erl +++ b/src/ibrowse_http_client.erl @@ -179,7 +179,6 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- handle_info({tcp, _Sock, Data}, #state{status = Status} = State) -> -%% io:format("Recvd data: ~p~n", [Data]), do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]), handle_sock_data(Data, State); handle_info({ssl, _Sock, Data}, State) -> @@ -187,7 +186,6 @@ handle_info({ssl, _Sock, Data}, State) -> handle_info({stream_next, Req_id}, #state{socket = Socket, cur_req = #request{req_id = Req_id}} = State) -> - %% io:format("Client process set {active, once}~n", []), do_setopts(Socket, [{active, once}], State), {noreply, set_inac_timer(State)}; @@ -198,8 +196,6 @@ handle_info({stream_next, _Req_id}, State) -> _ -> undefined end, -%% io:format("Ignoring stream_next as ~1000.p is not cur req (~1000.p)~n", -%% [_Req_id, _Cur_req_id]), {noreply, State}; handle_info({stream_close, _Req_id}, State) -> @@ -1417,6 +1413,11 @@ parse_headers_1([$\n, H |T], [$\r | L], Acc) when H =:= 32; parse_headers_1(lists:dropwhile(fun(X) -> is_whitespace(X) end, T), [32 | L], Acc); +parse_headers_1([$\n, H |T], L, Acc) when H =:= 32; + H =:= $\t -> + parse_headers_1(lists:dropwhile(fun(X) -> + is_whitespace(X) + end, T), [32 | L], Acc); parse_headers_1([$\n|T], [$\r | L], Acc) -> case parse_header(lists:reverse(L)) of invalid -> @@ -1424,6 +1425,13 @@ parse_headers_1([$\n|T], [$\r | L], Acc) -> NewHeader -> parse_headers_1(T, [], [NewHeader | Acc]) end; +parse_headers_1([$\n|T], L, Acc) -> + case parse_header(lists:reverse(L)) of + invalid -> + parse_headers_1(T, [], Acc); + NewHeader -> + parse_headers_1(T, [], [NewHeader | Acc]) + end; parse_headers_1([H|T], L, Acc) -> parse_headers_1(T, [H|L], Acc); parse_headers_1([], [], Acc) -> @@ -1469,6 +1477,9 @@ scan_header(Bin) -> {yes, Pos} -> {Headers, <<_:4/binary, Body/binary>>} = split_binary(Bin, Pos), {yes, Headers, Body}; + {yes_dodgy, Pos} -> + {Headers, <<_:2/binary, Body/binary>>} = split_binary(Bin, Pos), + {yes, Headers, Body}; no -> {no, Bin} end. @@ -1485,18 +1496,22 @@ scan_header(Bin1, Bin2) -> {yes, Pos} -> {Headers_suffix, <<_:4/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos), {yes, <>, Body}; + {yes_dodgy, Pos} -> + {Headers_suffix, <<_:2/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos), + {yes, <>, Body}; no -> {no, <>} end. get_crlf_crlf_pos(<<$\r, $\n, $\r, $\n, _/binary>>, Pos) -> {yes, Pos}; +get_crlf_crlf_pos(<<$\n, $\n, _/binary>>, Pos) -> {yes_dodgy, Pos}; get_crlf_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_crlf_pos(Rest, Pos + 1); get_crlf_crlf_pos(<<>>, _) -> no. scan_crlf(Bin) -> case get_crlf_pos(Bin) of - {yes, Pos} -> - {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin, Pos), + {yes, Offset, Pos} -> + {Prefix, <<_:Offset/binary, Suffix/binary>>} = split_binary(Bin, Pos), {yes, Prefix, Suffix}; no -> {no, Bin} @@ -1513,8 +1528,8 @@ scan_crlf_1(Bin1_head_size, Bin1, Bin2) -> <> = Bin1, Bin3 = <>, case get_crlf_pos(Bin3) of - {yes, Pos} -> - {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin3, Pos), + {yes, Offset, Pos} -> + {Prefix, <<_:Offset/binary, Suffix/binary>>} = split_binary(Bin3, Pos), {yes, list_to_binary([Bin1_head, Prefix]), Suffix}; no -> {no, list_to_binary([Bin1, Bin2])} @@ -1523,7 +1538,8 @@ scan_crlf_1(Bin1_head_size, Bin1, Bin2) -> get_crlf_pos(Bin) -> get_crlf_pos(Bin, 0). -get_crlf_pos(<<$\r, $\n, _/binary>>, Pos) -> {yes, Pos}; +get_crlf_pos(<<$\r, $\n, _/binary>>, Pos) -> {yes, 2, Pos}; +get_crlf_pos(<<$\n, _/binary>>, Pos) -> {yes, 1, Pos}; get_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_pos(Rest, Pos + 1); get_crlf_pos(<<>>, _) -> no. @@ -1534,21 +1550,22 @@ fmt_val(Term) -> io_lib:format("~p", [Term]). crnl() -> "\r\n". +method(connect) -> "CONNECT"; +method(copy) -> "COPY"; +method(delete) -> "DELETE"; method(get) -> "GET"; -method(post) -> "POST"; method(head) -> "HEAD"; -method(options) -> "OPTIONS"; -method(put) -> "PUT"; -method(delete) -> "DELETE"; -method(trace) -> "TRACE"; +method(lock) -> "LOCK"; method(mkcol) -> "MKCOL"; +method(move) -> "MOVE"; +method(options) -> "OPTIONS"; +method(patch) -> "PATCH"; +method(post) -> "POST"; method(propfind) -> "PROPFIND"; method(proppatch) -> "PROPPATCH"; -method(lock) -> "LOCK"; -method(unlock) -> "UNLOCK"; -method(move) -> "MOVE"; -method(copy) -> "COPY"; -method(connect) -> "CONNECT". +method(put) -> "PUT"; +method(trace) -> "TRACE"; +method(unlock) -> "UNLOCK". %% From RFC 2616 %% @@ -1772,11 +1789,15 @@ shutting_down(#state{lb_ets_tid = undefined}) -> ok; shutting_down(#state{lb_ets_tid = Tid, cur_pipeline_size = _Sz}) -> - catch ets:match_delete(Tid, {{'_', self()}, '_'}). + catch ets:match_delete(Tid, self()). inc_pipeline_counter(#state{is_closing = true} = State) -> State; -inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) -> +inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> + State; +inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz, + lb_ets_tid = Tid} = State) -> + ets:update_counter(Tid, self(), {2,1,99999,9999}), State#state{cur_pipeline_size = Pipe_sz + 1}. dec_pipeline_counter(#state{is_closing = true} = State) -> @@ -1785,8 +1806,13 @@ dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> State; dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz, lb_ets_tid = Tid} = State) -> - ets:delete(Tid, {Pipe_sz, self()}), - ets:insert(Tid, {{Pipe_sz - 1, self()}, []}), + try + ets:update_counter(Tid, self(), {2,-1,0,0}), + ets:update_counter(Tid, self(), {3,-1,0,0}) + catch + _:_ -> + ok + end, State#state{cur_pipeline_size = Pipe_sz - 1}. flatten([H | _] = L) when is_integer(H) -> diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl index 0e001d4..aac6534 100644 --- a/src/ibrowse_lb.erl +++ b/src/ibrowse_lb.erl @@ -104,14 +104,6 @@ stop(Lb_pid) -> %% {stop, Reason, Reply, State} | (terminate/2 is called) %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- -% handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From, -% #state{max_sessions = Max_sess, -% ets_tid = Tid, -% max_pipeline_size = Max_pipe_sz, -% num_cur_sessions = Num} = State) -% when Num >= Max -> -% Reply = find_best_connection(Tid), -% {reply, sorry_dude_reuse, State}; %% Update max_sessions in #state with supplied value handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From, @@ -119,15 +111,18 @@ handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From, when Num >= Max_sess -> State_1 = maybe_create_ets(State), Reply = find_best_connection(State_1#state.ets_tid, Max_pipe), - {reply, Reply, State_1#state{max_sessions = Max_sess}}; + {reply, Reply, State_1#state{max_sessions = Max_sess, + max_pipeline_size = Max_pipe}}; -handle_call({spawn_connection, Url, _Max_sess, _Max_pipe, SSL_options}, _From, +handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options}, _From, #state{num_cur_sessions = Cur} = State) -> State_1 = maybe_create_ets(State), Tid = State_1#state.ets_tid, {ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}), - ets:insert(Tid, {{1, Pid}, []}), - {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1}}; + ets:insert(Tid, {Pid, 0, 0}), + {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1, + max_sessions = Max_sess, + max_pipeline_size = Max_pipe}}; handle_call(stop, _From, #state{ets_tid = undefined} = State) -> gen_server:reply(_From, ok), @@ -180,7 +175,7 @@ handle_info({'EXIT', Pid, _Reason}, _ -> State end, - {noreply, State_1#state{num_cur_sessions = Cur_1}}; + {noreply, State_1#state{num_cur_sessions = Cur_1}, 10000}; handle_info({trace, Bool}, #state{ets_tid = undefined} = State) -> put(my_trace_flag, Bool), @@ -196,6 +191,9 @@ handle_info({trace, Bool}, #state{ets_tid = Tid} = State) -> put(my_trace_flag, Bool), {noreply, State}; +handle_info(timeout, State) -> + {noreply, State}; + handle_info(_Info, State) -> {noreply, State}. @@ -219,17 +217,25 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%-------------------------------------------------------------------- find_best_connection(Tid, Max_pipe) -> - case ets:first(Tid) of - {Cur_sz, Pid} when Cur_sz < Max_pipe -> - ets:delete(Tid, {Cur_sz, Pid}), - ets:insert(Tid, {{Cur_sz + 1, Pid}, []}), - {ok, Pid}; - _ -> - {error, retry_later} + Res = find_best_connection(ets:first(Tid), Tid, Max_pipe), + Res. + +find_best_connection('$end_of_table', _, _) -> + {error, retry_later}; +find_best_connection(Pid, Tid, Max_pipe) -> + case ets:lookup(Tid, Pid) of + [{Pid, Cur_sz, Speculative_sz}] when Cur_sz < Max_pipe, + Speculative_sz < Max_pipe -> + ets:update_counter(Tid, Pid, {3, 1, 9999999, 9999999}), + {ok, Pid}; + _ -> + find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe) end. maybe_create_ets(#state{ets_tid = undefined} = State) -> - Tid = ets:new(ibrowse_lb, [public, ordered_set]), + Tid = ets:new(ibrowse_lb, [public, + {write_concurrency, true}, + {read_concurrency, true}]), State#state{ets_tid = Tid}; maybe_create_ets(State) -> State. diff --git a/src/ibrowse_test.erl b/src/ibrowse_test.erl index cd5b7f4..9b521b6 100644 --- a/src/ibrowse_test.erl +++ b/src/ibrowse_test.erl @@ -84,7 +84,7 @@ send_reqs_1(Url, NumWorkers, NumReqsPerWorker) -> log_msg("Starting spawning of workers...~n", []), spawn_workers(Url, NumWorkers, NumReqsPerWorker), log_msg("Finished spawning workers...~n", []), - do_wait(), + do_wait(Url), End_time = now(), log_msg("All workers are done...~n", []), log_msg("ibrowse_test_results table: ~n~p~n", [ets:tab2list(ibrowse_test_results)]), @@ -114,24 +114,28 @@ spawn_workers(Url, NumWorkers, NumReqsPerWorker) -> ets:insert(pid_table, {Pid, []}), spawn_workers(Url, NumWorkers - 1, NumReqsPerWorker). -do_wait() -> +do_wait(Url) -> receive {'EXIT', _, normal} -> - do_wait(); + catch ibrowse:show_dest_status(Url), + catch ibrowse:show_dest_status(), + do_wait(Url); {'EXIT', Pid, Reason} -> ets:delete(pid_table, Pid), ets:insert(ibrowse_errors, {Pid, Reason}), ets:update_counter(ibrowse_test_results, crash, 1), - do_wait(); + do_wait(Url); Msg -> io:format("Recvd unknown message...~p~n", [Msg]), - do_wait() + do_wait(Url) after 1000 -> case ets:info(pid_table, size) of 0 -> done; _ -> - do_wait() + catch ibrowse:show_dest_status(Url), + catch ibrowse:show_dest_status(), + do_wait(Url) end end. @@ -236,16 +240,19 @@ unit_tests(Options) -> (catch ibrowse_test_server:start_server(8181, tcp)), ibrowse:start(), Options_1 = Options ++ [{connect_timeout, 5000}], + Test_timeout = proplists:get_value(test_timeout, Options, 60000), {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]), receive {done, Pid} -> ok; {'DOWN', Ref, _, _, Info} -> io:format("Test process crashed: ~p~n", [Info]) - after 60000 -> + after Test_timeout -> exit(Pid, kill), io:format("Timed out waiting for tests to complete~n", []) - end. + end, + catch ibrowse_test_server:stop_server(8181), + ok. unit_tests_1(Parent, Options) -> lists:foreach(fun({local_test_fun, Fun_name, Args}) -> diff --git a/test/ibrowse_test_server.erl b/test/ibrowse_test_server.erl index fcd75f6..a9d6e1e 100644 --- a/test/ibrowse_test_server.erl +++ b/test/ibrowse_test_server.erl @@ -35,7 +35,8 @@ start_server(Port, Sock_type) -> spawn_link(Fun). stop_server(Port) -> - exit(whereis(server_proc_name(Port)), kill). + catch exit(whereis(server_proc_name(Port)), kill), + ok. server_proc_name(Port) -> list_to_atom("ibrowse_test_server_"++integer_to_list(Port)).