Sfoglia il codice sorgente

More fixes to pipelining

pull/122/head
Chandrashekhar Mullaparthi 10 anni fa
parent
commit
47ae6a4217
1 ha cambiato i file con 33 aggiunte e 51 eliminazioni
  1. +33
    -51
      src/ibrowse_lb.erl

+ 33
- 51
src/ibrowse_lb.erl Vedi File

@ -36,7 +36,6 @@
port,
max_sessions,
max_pipeline_size,
num_cur_sessions = 0,
proc_state
}).
@ -123,24 +122,23 @@ handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
handle_call(_, _From, #state{proc_state = shutting_down} = State) ->
{reply, {error, shutting_down}, State};
%% Update max_sessions in #state with supplied value
handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _, _}, _From,
#state{num_cur_sessions = Num} = State)
when Num >= Max_sess ->
State_1 = maybe_create_ets(State),
Reply = find_best_connection(State_1#state.ets_tid, Max_pipe),
{reply, Reply, State_1#state{max_sessions = Max_sess,
max_pipeline_size = Max_pipe}};
handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_options}, _From,
#state{num_cur_sessions = Cur} = State) ->
State_1 = maybe_create_ets(State),
Tid = State_1#state.ets_tid,
{ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}, Process_options),
ets:insert(Tid, {{0, Pid}, []}),
{reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1,
max_sessions = Max_sess,
max_pipeline_size = Max_pipe}};
State) ->
State_1 = maybe_create_ets(State),
Tid = State_1#state.ets_tid,
Tid_size = ets:info(Tid, size),
case Tid_size > Max_sess of
true ->
Reply = find_best_connection(Tid, Max_pipe, Tid_size),
{reply, Reply, State_1#state{max_sessions = Max_sess,
max_pipeline_size = Max_pipe}};
false ->
{ok, Pid} = ibrowse_http_client:start({Tid, Url, SSL_options}, Process_options),
Ts = os:timestamp(),
ets:insert(Tid, {{0, Ts, Pid}, []}),
{reply, {ok, {0, Ts, Pid}}, State_1#state{max_sessions = Max_sess,
max_pipeline_size = Max_pipe}}
end;
handle_call(Request, _From, State) ->
Reply = {unknown_request, Request},
@ -163,24 +161,6 @@ handle_cast(_Msg, State) ->
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_info({'EXIT', Parent, _Reason}, #state{parent_pid = Parent} = State) ->
{stop, normal, State};
handle_info({'EXIT', _Pid, _Reason}, #state{ets_tid = undefined} = State) ->
{noreply, State};
handle_info({'EXIT', Pid, _Reason},
#state{num_cur_sessions = Cur,
ets_tid = Tid} = State) ->
ets:match_delete(Tid, {{'_', Pid}, '_'}),
Cur_1 = Cur - 1,
case Cur_1 of
0 ->
ets:delete(Tid),
{noreply, State#state{ets_tid = undefined, num_cur_sessions = 0}, 10000};
_ ->
{noreply, State#state{num_cur_sessions = Cur_1}}
end;
handle_info({trace, Bool}, #state{ets_tid = undefined} = State) ->
put(my_trace_flag, Bool),
@ -216,7 +196,8 @@ handle_info(_Info, State) ->
%% Description: Shutdown the server
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
terminate(_Reason, #state{host = Host, port = Port} = _State) ->
catch ets:delete(ibrowse_lb, {Host, Port}),
ok.
%%--------------------------------------------------------------------
@ -230,23 +211,24 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
find_best_connection(Tid, Max_pipe) ->
First = ets:first(Tid),
case First of
{Pid_pipeline_size, Pid} when Pid_pipeline_size < Max_pipe ->
ets:delete(Tid, First),
ets:insert(Tid, {{Pid_pipeline_size, Pid}, []}),
{ok, Pid};
_ ->
{error, retry_later}
find_best_connection(Tid, Max_pipe, _Num_cur) ->
case ets:first(Tid) of
{Spec_size, Ts, Pid} = First ->
case Spec_size >= Max_pipe of
true ->
{error, retry_later};
false ->
ets:delete(Tid, First),
ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}),
{ok, First}
end;
'$end_of_table' ->
{error, retry_later}
end.
maybe_create_ets(#state{ets_tid = undefined,
host = Host, port = Port} = State) ->
maybe_create_ets(#state{ets_tid = undefined, host = Host, port = Port} = State) ->
Tid = ets:new(ibrowse_lb, [public, ordered_set]),
ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port},
pid = self(),
ets_tid = Tid}),
ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = self(), ets_tid = Tid}),
State#state{ets_tid = Tid};
maybe_create_ets(State) ->
State.

Caricamento…
Annulla
Salva