diff --git a/src/ibrowse.erl b/src/ibrowse.erl index 5f26d45..0694465 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -20,14 +20,14 @@ %%

Here are a few sample invocations.

%% %% -%% ibrowse:send_req("http://intranet/messenger/", [], get). +%% ibrowse:send_req("http://intranet/messenger/", [], get). %%

-%% -%% ibrowse:send_req("http://www.google.com/", [], get, [], +%% +%% ibrowse:send_req("http://www.google.com/", [], get, [], %% [{proxy_user, "XXXXX"}, %% {proxy_password, "XXXXX"}, %% {proxy_host, "proxy"}, -%% {proxy_port, 8080}], 1000). +%% {proxy_port, 8080}], 1000). %%

%% %%ibrowse:send_req("http://www.erlang.org/download/otp_src_R10B-3.tar.gz", [], get, [], @@ -47,7 +47,7 @@ %% ibrowse:send_req("http://www.bbc.co.uk", [], trace). %% %%

-%% ibrowse:send_req("http://www.google.com", [], get, [], +%% ibrowse:send_req("http://www.google.com", [], get, [], %% [{stream_to, self()}]). %%
%% @@ -96,7 +96,9 @@ trace_off/2, all_trace_off/0, show_dest_status/0, - show_dest_status/2 + show_dest_status/2, + get_metrics/0, + get_metrics/2 ]). -ifdef(debug). @@ -108,7 +110,7 @@ get_value/3, do_trace/2 ]). - + -record(state, {trace = false}). -include("ibrowse.hrl"). @@ -156,7 +158,7 @@ stop() -> send_req(Url, Headers, Method) -> send_req(Url, Headers, Method, [], []). -%% @doc Same as send_req/3. +%% @doc Same as send_req/3. %% If a list is specified for the body it has to be a flat list. The body can also be a fun/0 or a fun/1.
%% If fun/0, the connection handling process will repeatdely call the fun until it returns an error or eof.
Fun() = {ok, Data} | eof

%% If fun/1, the connection handling process will repeatedly call the fun with the supplied state until it returns an error or eof.
Fun(State) = {ok, Data} | {ok, Data, NewState} | eof
@@ -166,7 +168,7 @@ send_req(Url, Headers, Method) -> send_req(Url, Headers, Method, Body) -> send_req(Url, Headers, Method, Body, []). -%% @doc Same as send_req/4. +%% @doc Same as send_req/4. %% For a description of SSL Options, look in the ssl manpage. If the %% HTTP Version to use is not specified, the default is 1.1. %%
@@ -187,7 +189,7 @@ send_req(Url, Headers, Method, Body) -> %% will have to invoke ibrowse:stream_next(Request_id) to %% receive the next packet. %% -%%
  • When both the options save_response_to_file and stream_to +%%
  • When both the options save_response_to_file and stream_to %% are specified, the former takes precedence.
  • %% %%
  • For the save_response_to_file option, the response body is saved to @@ -202,8 +204,8 @@ send_req(Url, Headers, Method, Body) -> %% cases, it might be hard to estimate how long a request will take to %% complete. In such cases, the client might want to timeout if no %% data has been received on the link for a certain time interval. -%% -%% This value is also used to close connections which are not in use for +%% +%% This value is also used to close connections which are not in use for %% the specified timeout value. %%
  • %% @@ -221,15 +223,15 @@ send_req(Url, Headers, Method, Body) -> %% ibrowse:send_req("http://www.example.com/cgi-bin/request", [], get, [], [{connect_timeout, 100}], 1000). %% %% In the above invocation, if the connection isn't established within -%% 100 milliseconds, the request will fail with +%% 100 milliseconds, the request will fail with %% {error, conn_failed}.
    %% If connection setup succeeds, the total time allowed for the %% request to complete will be 1000 milliseconds minus the time taken %% for connection setup. %% -%% +%% %%
  • The socket_options option can be used to set -%% specific options on the socket. The {active, true | false | once} +%% specific options on the socket. The {active, true | false | once} %% and {packet_type, Packet_type} will be filtered out by ibrowse.
  • %% %%
  • The headers_as_is option is to enable the caller @@ -253,7 +255,7 @@ send_req(Url, Headers, Method, Body) -> %% {response_format,response_format()}| %% {stream_chunk_size, integer()} | %% {max_pipeline_size, integer()} | -%% {trace, boolean()} | +%% {trace, boolean()} | %% {is_ssl, boolean()} | %% {ssl_options, [SSLOpt]} | %% {pool_name, atom()} | @@ -273,7 +275,7 @@ send_req(Url, Headers, Method, Body) -> %% {inactivity_timeout, integer()} | %% {connect_timeout, integer()} | %% {socket_options, Sock_opts} | -%% {transfer_encoding, {chunked, ChunkSize}} | +%% {transfer_encoding, {chunked, ChunkSize}} | %% {headers_as_is, boolean()} | %% {give_raw_headers, boolean()} | %% {preserve_chunked_encoding,boolean()} @@ -292,7 +294,7 @@ send_req(Url, Headers, Method, Body) -> send_req(Url, Headers, Method, Body, Options) -> send_req(Url, Headers, Method, Body, Options, 30000). -%% @doc Same as send_req/5. +%% @doc Same as send_req/5. %% All timeout values are in milliseconds. %% @spec send_req(Url, Headers::headerList(), Method::method(), Body::body(), Options::optionList(), Timeout) -> response() %% Timeout = integer() | infinity @@ -317,21 +319,21 @@ send_req(Url, Headers, Method, Body, Options, Timeout) -> true -> {get_value(ssl_options, Options_1, []), true} end, try_routing_request(Lb_pid, Parsed_url, - Max_sessions, + Max_sessions, Max_pipeline_size, - {SSLOptions, IsSSL}, + {SSLOptions, IsSSL}, Headers, Method, Body, Options_1, Timeout, 0); Err -> {error, {url_parsing_failed, Err}} end. try_routing_request(Lb_pid, Parsed_url, - Max_sessions, + Max_sessions, Max_pipeline_size, - {SSLOptions, IsSSL}, + {SSLOptions, IsSSL}, Headers, Method, Body, Options_1, Timeout, Try_count) when Try_count < 3 -> case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url, - Max_sessions, + Max_sessions, Max_pipeline_size, {SSLOptions, IsSSL}) of {ok, Conn_Pid} -> @@ -339,9 +341,9 @@ try_routing_request(Lb_pid, Parsed_url, Method, Body, Options_1, Timeout) of {error, sel_conn_closed} -> try_routing_request(Lb_pid, Parsed_url, - Max_sessions, + Max_sessions, Max_pipeline_size, - {SSLOptions, IsSSL}, + {SSLOptions, IsSSL}, Headers, Method, Body, Options_1, Timeout, Try_count + 1); Res -> Res @@ -406,7 +408,7 @@ set_dest(_Host, _Port, [H | _]) -> exit({invalid_option, H}); set_dest(_, _, []) -> ok. - + %% @doc Set the maximum number of connections allowed to a specific Host:Port. %% @spec set_max_sessions(Host::string(), Port::integer(), Max::integer()) -> ok set_max_sessions(Host, Port, Max) when is_integer(Max), Max > 0 -> @@ -525,7 +527,7 @@ send_req_direct(Conn_pid, Url, Headers, Method, Body, Options, Timeout) -> %% caller. Should be used in conjunction with the %% stream_to option %% @spec stream_next(Req_id :: req_id()) -> ok | {error, unknown_req_id} -stream_next(Req_id) -> +stream_next(Req_id) -> case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of [] -> {error, unknown_req_id}; @@ -540,7 +542,7 @@ stream_next(Req_id) -> %% the connection which is serving this Req_id will be aborted, and an %% error returned. %% @spec stream_close(Req_id :: req_id()) -> ok | {error, unknown_req_id} -stream_close(Req_id) -> +stream_close(Req_id) -> case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of [] -> {error, unknown_req_id}; @@ -559,7 +561,7 @@ trace_off() -> %% @doc Turn tracing on for all connections to the specified HTTP %% server. Host is whatever is specified as the domain name in the URL %% @spec trace_on(Host, Port) -> ok -%% Host = string() +%% Host = string() %% Port = integer() trace_on(Host, Port) -> ibrowse ! {trace, true, Host, Port}, @@ -582,74 +584,99 @@ all_trace_off() -> %% about workers spawned using spawn_worker_process/2 or %% spawn_link_worker_process/2 is not included. show_dest_status() -> - Dests = lists:filter(fun({lb_pid, {Host, Port}, _}) when is_list(Host), - is_integer(Port) -> - true; - (_) -> - false - end, ets:tab2list(ibrowse_lb)), - All_ets = ets:all(), io:format("~-40.40s | ~-5.5s | ~-10.10s | ~s~n", ["Server:port", "ETS", "Num conns", "LB Pid"]), io:format("~80.80.=s~n", [""]), - lists:foreach(fun({lb_pid, {Host, Port}, Lb_pid}) -> - case lists:dropwhile( - fun(Tid) -> - ets:info(Tid, owner) /= Lb_pid - end, All_ets) of - [] -> - io:format("~40.40s | ~-5.5s | ~-5.5s | ~s~n", - [Host ++ ":" ++ integer_to_list(Port), - "", - "", - io_lib:format("~p", [Lb_pid])] - ); - [Tid | _] -> - catch ( - begin - Size = ets:info(Tid, size), - io:format("~40.40s | ~-5.5s | ~-5.5s | ~s~n", - [Host ++ ":" ++ integer_to_list(Port), - io_lib:format("~p", [Tid]), - integer_to_list(Size), - io_lib:format("~p", [Lb_pid])] - ) - end - ) - end - end, Dests). - + lists:foreach(fun({Host, Port, Lb_pid, Tid, Size}) -> + case Tid of + unknown -> + io:format("~40.40s | ~-5.5s | ~-5.5s | ~s~n", + [Host ++ ":" ++ integer_to_list(Port), + "", + "", + io_lib:format("~p", [Lb_pid])] + ); + _ActualTid -> + io:format("~40.40s | ~-5.5s | ~-5.5s | ~s~n", + [Host ++ ":" ++ integer_to_list(Port), + io_lib:format("~p", [Tid]), + integer_to_list(Size), + io_lib:format("~p", [Lb_pid])] + ) + end + end, get_metrics()). + %% @doc Shows some internal information about load balancing to a %% specified Host:Port. Info about workers spawned using %% spawn_worker_process/2 or spawn_link_worker_process/2 is not %% included. show_dest_status(Host, Port) -> + case get_metrics(Host, Port) of + {Lb_pid, MsgQueueSize, Tid, Size, PipelineSizes} -> + io:format("Load Balancer Pid : ~p~n", [Lb_pid]), + io:format("LB process msg q size : ~p~n", [MsgQueueSize]), + case Tid of + unknown -> + io:format("Couldn't locate ETS table for ~p~n", [Lb_pid]); + _ -> + io:format("LB ETS table id : ~p~n", [Tid]), + io:format("Num Connections : ~p~n", [Size]), + {First_p_sz, Last_p_sz} = PipelineSizes, + io:format("Smallest pipeline : ~1000.p~n", [First_p_sz]), + io:format("Largest pipeline : ~1000.p~n", [Last_p_sz]) + end; + _ -> + no_active_processes + end. + +get_metrics() -> + Dests = lists:filter(fun({lb_pid, {Host, Port}, _}) when is_list(Host), + is_integer(Port) -> + true; + (_) -> + false + end, ets:tab2list(ibrowse_lb)), + All_ets = ets:all(), + lists:map(fun({lb_pid, {Host, Port}, Lb_pid}) -> + case lists:dropwhile( + fun(Tid) -> + ets:info(Tid, owner) /= Lb_pid + end, All_ets) of + [] -> + {Host, Port, Lb_pid, unknown, 0}; + [Tid | _] -> + Size = case catch (ets:info(Tid, size)) of + N when is_integer(N) -> N; + _ -> 0 + end, + {Host, Port, Lb_pid, Tid, Size} + end + end, Dests). + +get_metrics(Host, Port) -> case ets:lookup(ibrowse_lb, {Host, Port}) of [] -> no_active_processes; [#lb_pid{pid = Lb_pid}] -> - io:format("Load Balancer Pid : ~p~n", [Lb_pid]), - io:format("LB process msg q size : ~p~n", [(catch process_info(Lb_pid, message_queue_len))]), + MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)), + %% {Lb_pid, MsgQueueSize, case lists:dropwhile( fun(Tid) -> ets:info(Tid, owner) /= Lb_pid end, ets:all()) of [] -> - io:format("Couldn't locate ETS table for ~p~n", [Lb_pid]); + {Lb_pid, MsgQueueSize, unknown, 0, unknown}; [Tid | _] -> First = ets:first(Tid), Last = ets:last(Tid), Size = ets:info(Tid, size), - io:format("LB ETS table id : ~p~n", [Tid]), - io:format("Num Connections : ~p~n", [Size]), case Size of 0 -> ok; _ -> {First_p_sz, _} = First, {Last_p_sz, _} = Last, - io:format("Smallest pipeline : ~1000.p~n", [First_p_sz]), - io:format("Largest pipeline : ~1000.p~n", [Last_p_sz]) + {Lb_pid, MsgQueueSize, Tid, Size, {First_p_sz, Last_p_sz}} end end end. @@ -703,7 +730,7 @@ import_config(Filename) -> case file:consult(Filename) of {ok, Terms} -> ets:delete_all_objects(ibrowse_conf), - Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options}) + Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options}) when is_list(Host), is_integer(Port), is_integer(MaxSess), MaxSess > 0, is_integer(MaxPipe), MaxPipe > 0, is_list(Options) -> @@ -713,7 +740,7 @@ import_config(Filename) -> lists:foreach( fun({X, Y}) -> ets:insert(ibrowse_conf, - #ibrowse_conf{key = X, + #ibrowse_conf{key = X, value = Y}) end, I); ({K, V}) -> @@ -816,7 +843,7 @@ handle_info(all_trace_off, State) -> ets:foldl(Fun, undefined, ibrowse_lb), ets:select_delete(ibrowse_conf, [{{ibrowse_conf,{trace,'$1','$2'},true},[],['true']}]), {noreply, State}; - + handle_info({trace, Bool}, State) -> put(my_trace_flag, Bool), {noreply, State}; @@ -833,7 +860,7 @@ handle_info({trace, Bool, Host, Port}, State) -> ets:insert(ibrowse_conf, #ibrowse_conf{key = {trace, Host, Port}, value = Bool}), {noreply, State}; - + handle_info(_Info, State) -> {noreply, State}.