瀏覽代碼

代码修改

erlArango_v1
lijie 5 年之前
父節點
當前提交
29f18af29e
共有 4 個檔案被更改,包括 52 行新增48 行删除
  1. +2
    -3
      src/httpCli/agHttpCli.erl
  2. +36
    -32
      src/httpCli/agHttpProtocol.erl
  3. +5
    -4
      src/httpCli/agSslAgencyIns.erl
  4. +9
    -9
      src/httpCli/agTcpAgencyIns.erl

+ 2
- 3
src/httpCli/agHttpCli.erl 查看文件

@ -21,7 +21,7 @@
-spec callAgency(poolName(), method(), path(), headers(), body()) -> term() | {error, term()}. -spec callAgency(poolName(), method(), path(), headers(), body()) -> term() | {error, term()}.
callAgency(PoolName, Method, Path, Headers, Body) -> callAgency(PoolName, Method, Path, Headers, Body) ->
callAgency(PoolName, Method, Path, Headers, Body, infinity).
callAgency(PoolName, Method, Path, Headers, Body, ?DEFAULT_TIMEOUT).
-spec callAgency(poolName(), method(), path(), headers(), body(), timeout()) -> term() | {error, atom()}. -spec callAgency(poolName(), method(), path(), headers(), body(), timeout()) -> term() | {error, atom()}.
callAgency(PoolName, Method, Path, Headers, Body, Timeout) -> callAgency(PoolName, Method, Path, Headers, Body, Timeout) ->
@ -34,7 +34,7 @@ callAgency(PoolName, Method, Path, Headers, Body, Timeout) ->
-spec castAgency(poolName(), method(), path(), headers(), body()) -> {ok, requestId()} | {error, atom()}. -spec castAgency(poolName(), method(), path(), headers(), body()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolName, Method, Path, Headers, Body) -> castAgency(PoolName, Method, Path, Headers, Body) ->
castAgency(PoolName, Method, Path, Headers, Body, infinity, self()).
castAgency(PoolName, Method, Path, Headers, Body, ?DEFAULT_TIMEOUT, self()).
-spec castAgency(poolName(), method(), path(), headers(), body(), timeout()) -> {ok, requestId()} | {error, atom()}. -spec castAgency(poolName(), method(), path(), headers(), body(), timeout()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolName, Method, Path, Headers, Body, Timeout) -> castAgency(PoolName, Method, Path, Headers, Body, Timeout) ->
@ -58,7 +58,6 @@ castAgency(PoolName, Method, Path, Headers, Body, Timeout, Pid) ->
receiveResponse(RequestId) -> receiveResponse(RequestId) ->
receive receive
#miAgHttpCliRet{requestId = RequestId, reply = Reply} -> #miAgHttpCliRet{requestId = RequestId, reply = Reply} ->
%io:format("IMY************************ miAgHttpCliRet ~p ~p ~n", [111, size(element(4, Reply))]),
Reply Reply
end. end.

+ 36
- 32
src/httpCli/agHttpProtocol.erl 查看文件

@ -44,25 +44,25 @@ response(Data) ->
-spec response(undefined | recvState(), binary:cp(), binary:cp(), binary()) -> {ok, recvState()} | error(). -spec response(undefined | recvState(), binary:cp(), binary:cp(), binary()) -> {ok, recvState()} | error().
response(undefined, Rn, RnRn, Data) -> response(undefined, Rn, RnRn, Data) ->
case parseStatusLine(Data, Rn) of case parseStatusLine(Data, Rn) of
{StatusCode, Reason, Rest} ->
{StatusCode, Rest} ->
case splitHeaders(Rest, Rn, RnRn) of case splitHeaders(Rest, Rn, RnRn) of
{undefined, Headers, Body} -> {undefined, Headers, Body} ->
{done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = undefined, body = Body}};
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = undefined, body = Body}};
{0, Headers, Rest} -> {0, Headers, Rest} ->
{done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = 0, body = Rest}};
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Rest}};
{chunked, Headers, Body} -> {chunked, Headers, Body} ->
RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, reason = Reason, headers = Headers},
RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers},
response(RecvState, Rn, RnRn, Body); response(RecvState, Rn, RnRn, Body);
{ContentLength, Headers, Body} -> {ContentLength, Headers, Body} ->
BodySize = erlang:size(Body), BodySize = erlang:size(Body),
if if
BodySize == ContentLength -> BodySize == ContentLength ->
{done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength, body = Body}};
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
BodySize > ContentLength -> BodySize > ContentLength ->
?WARN(agTcpAgencyIns, "11 contentLength get to long data why? more: ~p ~n",[BodySize - ContentLength]), ?WARN(agTcpAgencyIns, "11 contentLength get to long data why? more: ~p ~n",[BodySize - ContentLength]),
{done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength, body = Body}};
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
true -> true ->
{ok, #recvState{stage = body, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength, body = Body}}
{ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}
end; end;
not_enough_data -> not_enough_data ->
%% headers都不足 %% headers都不足
@ -98,31 +98,35 @@ response(#recvState{stage = body, contentLength = ContentLength, body = Body} =
true -> true ->
{ok,RecvState#recvState{body = CurData}} {ok,RecvState#recvState{body = CurData}}
end; end;
response(#recvState{stage = header, body = Body}, Rn, RnRn, Data) ->
CurData = <<Body/binary, Data/binary>>,
case parseStatusLine(CurData, Rn) of
{StatusCode, Reason, Rest} ->
response(#recvState{stage = header, body = OldBody}, Rn, RnRn, Data) ->
CurBody = <<OldBody/binary, Data/binary>>,
case parseStatusLine(CurBody, Rn) of
{StatusCode, Rest} ->
case splitHeaders(Rest, Rn, RnRn) of case splitHeaders(Rest, Rn, RnRn) of
{undefined, Headers, Body} -> {undefined, Headers, Body} ->
{done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = undefined, body = Body}};
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = undefined, body = Body}};
{0, Headers, Body} -> {0, Headers, Body} ->
{done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = 0, body = Body}};
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Body}};
{chunked, Headers, Rest} -> {chunked, Headers, Rest} ->
RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, reason = Reason, headers = Headers},
RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers},
response(RecvState, Rn, RnRn, Rest); response(RecvState, Rn, RnRn, Rest);
{ContentLength, Headers, Body} -> {ContentLength, Headers, Body} ->
case erlang:size(Body) >= ContentLength of
BodySize = erlang:size(Body),
if
BodySize == ContentLength ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
BodySize > ContentLength ->
?WARN(agTcpAgencyIns, "33 contentLength get to long data why? more: ~p ~n",[BodySize - ContentLength]),
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
true -> true ->
{done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength, body = Body}};
_ ->
{ok, #recvState{stage = body, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength, body = Body}}
{ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}
end; end;
not_enough_data -> not_enough_data ->
%% headers都不足 %% headers都不足
{ok, #recvState{stage = header, body = CurData}}
{ok, #recvState{stage = header, body = CurBody}}
end; end;
not_enough_data -> not_enough_data ->
{ok, #recvState{stage = header, body = CurData}};
{ok, #recvState{stage = header, body = CurBody}};
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
end. end.
@ -159,35 +163,35 @@ parseStatusLine(Data, Rn) ->
not_enough_data; not_enough_data;
[Line, Rest] -> [Line, Rest] ->
case parseStatusReason(Line) of case parseStatusReason(Line) of
{ok, StatusCode, Reason} ->
{StatusCode, Reason, Rest};
{ok, StatusCode} ->
{StatusCode, Rest};
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
end end
end. end.
parseStatusReason(<<"HTTP/1.1 200 OK">>) -> parseStatusReason(<<"HTTP/1.1 200 OK">>) ->
{ok, 200, <<"OK">>};
{ok, 200};
parseStatusReason(<<"HTTP/1.1 204 No Content">>) -> parseStatusReason(<<"HTTP/1.1 204 No Content">>) ->
{ok, 204, <<"No Content">>};
{ok, 204};
parseStatusReason(<<"HTTP/1.1 301 Moved Permanently">>) -> parseStatusReason(<<"HTTP/1.1 301 Moved Permanently">>) ->
{ok, 301, <<"Moved Permanently">>};
{ok, 301};
parseStatusReason(<<"HTTP/1.1 302 Found">>) -> parseStatusReason(<<"HTTP/1.1 302 Found">>) ->
{ok, 302, <<"Found">>};
{ok, 302};
parseStatusReason(<<"HTTP/1.1 403 Forbidden">>) -> parseStatusReason(<<"HTTP/1.1 403 Forbidden">>) ->
{ok, 403, <<"Forbidden">>};
{ok, 403};
parseStatusReason(<<"HTTP/1.1 404 Not Found">>) -> parseStatusReason(<<"HTTP/1.1 404 Not Found">>) ->
{ok, 404, <<"Not Found">>};
{ok, 404};
parseStatusReason(<<"HTTP/1.1 500 Internal Server Error">>) -> parseStatusReason(<<"HTTP/1.1 500 Internal Server Error">>) ->
{ok, 500, <<"Internal Server Error">>};
{ok, 500};
parseStatusReason(<<"HTTP/1.1 502 Bad Gateway">>) -> parseStatusReason(<<"HTTP/1.1 502 Bad Gateway">>) ->
{ok, 502, <<"Bad Gateway">>};
parseStatusReason(<<"HTTP/1.1 ", N1, N2, N3, " ", Reason/bits>>)
{ok, 502};
parseStatusReason(<<"HTTP/1.1 ", N1, N2, N3, " ", _Reason/bits>>)
when $0 =< N1, N1 =< $9, when $0 =< N1, N1 =< $9,
$0 =< N2, N2 =< $9, $0 =< N2, N2 =< $9,
$0 =< N3, N3 =< $9 -> $0 =< N3, N3 =< $9 ->
StatusCode = (N1 - $0) * 100 + (N2 - $0) * 10 + (N3 - $0), StatusCode = (N1 - $0) * 100 + (N2 - $0) * 10 + (N3 - $0),
{ok, StatusCode, Reason};
{ok, StatusCode};
parseStatusReason(<<"HTTP/1.0 ", _/binary>>) -> parseStatusReason(<<"HTTP/1.0 ", _/binary>>) ->
{error, unsupported_feature}; {error, unsupported_feature};
parseStatusReason(_) -> parseStatusReason(_) ->

+ 5
- 4
src/httpCli/agSslAgencyIns.erl 查看文件

@ -57,11 +57,11 @@ handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime}
infinity -> infinity ->
undefined; undefined;
_ -> _ ->
erlang:start_timer(OverTime, self(), waiting, [{abs, true}])
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end, end,
{ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; {ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} -> {error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
ssl:close(Socket), ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, CliState, {error, socket_send_error}) dealClose(SrvState, CliState, {error, socket_send_error})
@ -86,12 +86,12 @@ handleMsg({ssl, Socket, Data},
{ok, NewRecvState} -> {ok, NewRecvState} ->
{ok, SrvState, CliState#cliState{recvState = NewRecvState}}; {ok, SrvState, CliState#cliState{recvState = NewRecvState}};
{error, Reason} -> {error, Reason} ->
?WARN(ServerName, "handle ssl data error: ~p~n", [Reason]),
?WARN(ServerName, "handle ssl data error: ~p ~p ~n", [Reason, CurInfo]),
ssl:close(Socket), ssl:close(Socket),
dealClose(SrvState, CliState, {error, ssl_data_error}) dealClose(SrvState, CliState, {error, ssl_data_error})
catch catch
E:R:S -> E:R:S ->
?WARN(ServerName, "handle ssl data crash: ~p:~p~n~p~n", [E, R, S]),
?WARN(ServerName, "handle ssl data crash: ~p:~p~n~p~n ~p~n ", [E, R, S, CurInfo]),
ssl:close(Socket), ssl:close(Socket),
dealClose(SrvState, CliState, {{error, agency_handledata_error}}) dealClose(SrvState, CliState, {{error, agency_handledata_error}})
end; end;
@ -185,6 +185,7 @@ dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, Ov
case erlang:system_time(millisecond) > OverTime of case erlang:system_time(millisecond) > OverTime of
true -> true ->
%% %%
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
case agAgencyUtils:getQueue(RequestsOut + 2) of case agAgencyUtils:getQueue(RequestsOut + 2) of
undefined -> undefined ->
{ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1}}; {ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1}};

+ 9
- 9
src/httpCli/agTcpAgencyIns.erl 查看文件

@ -57,11 +57,11 @@ handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime}
infinity -> infinity ->
undefined; undefined;
_ -> _ ->
erlang:start_timer(OverTime, self(), waiting, [{abs, true}])
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end, end,
{ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; {ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} -> {error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
gen_tcp:close(Socket), gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, CliState, {error, socket_send_error}) dealClose(SrvState, CliState, {error, socket_send_error})
@ -86,16 +86,16 @@ handleMsg({tcp, Socket, Data},
{ok, NewRecvState} -> {ok, NewRecvState} ->
{ok, SrvState, CliState#cliState{recvState = NewRecvState}}; {ok, SrvState, CliState#cliState{recvState = NewRecvState}};
{error, Reason} -> {error, Reason} ->
?WARN(ServerName, "handle tcp data error: ~p~n", [Reason]),
?WARN(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]),
gen_tcp:close(Socket), gen_tcp:close(Socket),
dealClose(SrvState, CliState, {error, tcp_data_error}) dealClose(SrvState, CliState, {error, tcp_data_error})
catch catch
E:R:S -> E:R:S ->
?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n", [E, R, S]),
?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]),
gen_tcp:close(Socket), gen_tcp:close(Socket),
dealClose(SrvState, CliState, {{error, agency_handledata_error}}) dealClose(SrvState, CliState, {{error, agency_handledata_error}})
end; end;
handleMsg({timeout, TimerRef, waiting},
handleMsg({timeout, TimerRef, waiting_over},
#srvState{socket = Socket} = SrvState, #srvState{socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) -> #cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
@ -112,7 +112,6 @@ handleMsg({tcp_closed, Socket},
handleMsg({tcp_error, Socket, Reason}, handleMsg({tcp_error, Socket, Reason},
#srvState{socket = Socket, serverName = ServerName} = SrvState, #srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) -> CliState) ->
?WARN(ServerName, "connection error: ~p~n", [Reason]), ?WARN(ServerName, "connection error: ~p~n", [Reason]),
gen_tcp:close(Socket), gen_tcp:close(Socket),
dealClose(SrvState, CliState, {error, tcp_error}); dealClose(SrvState, CliState, {error, tcp_error});
@ -130,7 +129,7 @@ handleMsg(?miDoNetConnect,
undefined -> undefined ->
{ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState}; {ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState};
MiRequest -> MiRequest ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket}, NewCliState)
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, NewCliState)
end; end;
{error, _Reason} -> {error, _Reason} ->
reconnectTimer(SrvState, CliState) reconnectTimer(SrvState, CliState)
@ -185,9 +184,10 @@ dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, Ov
case erlang:system_time(millisecond) > OverTime of case erlang:system_time(millisecond) > OverTime of
true -> true ->
%% %%
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
case agAgencyUtils:getQueue(RequestsOut + 2) of case agAgencyUtils:getQueue(RequestsOut + 2) of
undefined -> undefined ->
{ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1}};
{ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}};
MiRequest -> MiRequest ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}) dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1})
end; end;
@ -200,7 +200,7 @@ dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, Ov
infinity -> infinity ->
undefined; undefined;
_ -> _ ->
erlang:start_timer(OverTime, self(), waiting, [{abs, true}])
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end, end,
{ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}}; {ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} -> {error, Reason} ->

Loading…
取消
儲存