diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl index 86c1fa2..d3c8204 100644 --- a/src/httpCli/agHttpCli.erl +++ b/src/httpCli/agHttpCli.erl @@ -21,7 +21,7 @@ -spec callAgency(poolName(), method(), path(), headers(), body()) -> term() | {error, term()}. callAgency(PoolName, Method, Path, Headers, Body) -> - callAgency(PoolName, Method, Path, Headers, Body, infinity). + callAgency(PoolName, Method, Path, Headers, Body, ?DEFAULT_TIMEOUT). -spec callAgency(poolName(), method(), path(), headers(), body(), timeout()) -> term() | {error, atom()}. callAgency(PoolName, Method, Path, Headers, Body, Timeout) -> @@ -34,7 +34,7 @@ callAgency(PoolName, Method, Path, Headers, Body, Timeout) -> -spec castAgency(poolName(), method(), path(), headers(), body()) -> {ok, requestId()} | {error, atom()}. castAgency(PoolName, Method, Path, Headers, Body) -> - castAgency(PoolName, Method, Path, Headers, Body, infinity, self()). + castAgency(PoolName, Method, Path, Headers, Body, ?DEFAULT_TIMEOUT, self()). -spec castAgency(poolName(), method(), path(), headers(), body(), timeout()) -> {ok, requestId()} | {error, atom()}. castAgency(PoolName, Method, Path, Headers, Body, Timeout) -> @@ -58,7 +58,6 @@ castAgency(PoolName, Method, Path, Headers, Body, Timeout, Pid) -> receiveResponse(RequestId) -> receive #miAgHttpCliRet{requestId = RequestId, reply = Reply} -> - %io:format("IMY************************ miAgHttpCliRet ~p ~p ~n", [111, size(element(4, Reply))]), Reply end. diff --git a/src/httpCli/agHttpProtocol.erl b/src/httpCli/agHttpProtocol.erl index 121dcb0..c8a1229 100644 --- a/src/httpCli/agHttpProtocol.erl +++ b/src/httpCli/agHttpProtocol.erl @@ -44,25 +44,25 @@ response(Data) -> -spec response(undefined | recvState(), binary:cp(), binary:cp(), binary()) -> {ok, recvState()} | error(). response(undefined, Rn, RnRn, Data) -> case parseStatusLine(Data, Rn) of - {StatusCode, Reason, Rest} -> + {StatusCode, Rest} -> case splitHeaders(Rest, Rn, RnRn) of {undefined, Headers, Body} -> - {done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = undefined, body = Body}}; + {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = undefined, body = Body}}; {0, Headers, Rest} -> - {done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = 0, body = Rest}}; + {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Rest}}; {chunked, Headers, Body} -> - RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, reason = Reason, headers = Headers}, + RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers}, response(RecvState, Rn, RnRn, Body); {ContentLength, Headers, Body} -> BodySize = erlang:size(Body), if BodySize == ContentLength -> - {done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength, body = Body}}; + {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}; BodySize > ContentLength -> ?WARN(agTcpAgencyIns, "11 contentLength get to long data why? more: ~p ~n",[BodySize - ContentLength]), - {done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength, body = Body}}; + {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}; true -> - {ok, #recvState{stage = body, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength, body = Body}} + {ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}} end; not_enough_data -> %% headers都不足 这也可以能发生么 @@ -98,31 +98,35 @@ response(#recvState{stage = body, contentLength = ContentLength, body = Body} = true -> {ok,RecvState#recvState{body = CurData}} end; -response(#recvState{stage = header, body = Body}, Rn, RnRn, Data) -> - CurData = <>, - case parseStatusLine(CurData, Rn) of - {StatusCode, Reason, Rest} -> +response(#recvState{stage = header, body = OldBody}, Rn, RnRn, Data) -> + CurBody = <>, + case parseStatusLine(CurBody, Rn) of + {StatusCode, Rest} -> case splitHeaders(Rest, Rn, RnRn) of {undefined, Headers, Body} -> - {done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = undefined, body = Body}}; + {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = undefined, body = Body}}; {0, Headers, Body} -> - {done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = 0, body = Body}}; + {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Body}}; {chunked, Headers, Rest} -> - RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, reason = Reason, headers = Headers}, + RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers}, response(RecvState, Rn, RnRn, Rest); {ContentLength, Headers, Body} -> - case erlang:size(Body) >= ContentLength of + BodySize = erlang:size(Body), + if + BodySize == ContentLength -> + {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}; + BodySize > ContentLength -> + ?WARN(agTcpAgencyIns, "33 contentLength get to long data why? more: ~p ~n",[BodySize - ContentLength]), + {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}; true -> - {done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength, body = Body}}; - _ -> - {ok, #recvState{stage = body, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength, body = Body}} + {ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}} end; not_enough_data -> %% headers都不足 这也可以能发生么 - {ok, #recvState{stage = header, body = CurData}} + {ok, #recvState{stage = header, body = CurBody}} end; not_enough_data -> - {ok, #recvState{stage = header, body = CurData}}; + {ok, #recvState{stage = header, body = CurBody}}; {error, Reason} -> {error, Reason} end. @@ -159,35 +163,35 @@ parseStatusLine(Data, Rn) -> not_enough_data; [Line, Rest] -> case parseStatusReason(Line) of - {ok, StatusCode, Reason} -> - {StatusCode, Reason, Rest}; + {ok, StatusCode} -> + {StatusCode, Rest}; {error, Reason} -> {error, Reason} end end. parseStatusReason(<<"HTTP/1.1 200 OK">>) -> - {ok, 200, <<"OK">>}; + {ok, 200}; parseStatusReason(<<"HTTP/1.1 204 No Content">>) -> - {ok, 204, <<"No Content">>}; + {ok, 204}; parseStatusReason(<<"HTTP/1.1 301 Moved Permanently">>) -> - {ok, 301, <<"Moved Permanently">>}; + {ok, 301}; parseStatusReason(<<"HTTP/1.1 302 Found">>) -> - {ok, 302, <<"Found">>}; + {ok, 302}; parseStatusReason(<<"HTTP/1.1 403 Forbidden">>) -> - {ok, 403, <<"Forbidden">>}; + {ok, 403}; parseStatusReason(<<"HTTP/1.1 404 Not Found">>) -> - {ok, 404, <<"Not Found">>}; + {ok, 404}; parseStatusReason(<<"HTTP/1.1 500 Internal Server Error">>) -> - {ok, 500, <<"Internal Server Error">>}; + {ok, 500}; parseStatusReason(<<"HTTP/1.1 502 Bad Gateway">>) -> - {ok, 502, <<"Bad Gateway">>}; -parseStatusReason(<<"HTTP/1.1 ", N1, N2, N3, " ", Reason/bits>>) + {ok, 502}; +parseStatusReason(<<"HTTP/1.1 ", N1, N2, N3, " ", _Reason/bits>>) when $0 =< N1, N1 =< $9, $0 =< N2, N2 =< $9, $0 =< N3, N3 =< $9 -> StatusCode = (N1 - $0) * 100 + (N2 - $0) * 10 + (N3 - $0), - {ok, StatusCode, Reason}; + {ok, StatusCode}; parseStatusReason(<<"HTTP/1.0 ", _/binary>>) -> {error, unsupported_feature}; parseStatusReason(_) -> diff --git a/src/httpCli/agSslAgencyIns.erl b/src/httpCli/agSslAgencyIns.erl index e9a5af2..fb8e83d 100644 --- a/src/httpCli/agSslAgencyIns.erl +++ b/src/httpCli/agSslAgencyIns.erl @@ -57,11 +57,11 @@ handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime} infinity -> undefined; _ -> - erlang:start_timer(OverTime, self(), waiting, [{abs, true}]) + erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, {ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; {error, Reason} -> - ?WARN(ServerName, ":send error: ~p~n", [Reason]), + ?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]), ssl:close(Socket), agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), dealClose(SrvState, CliState, {error, socket_send_error}) @@ -86,12 +86,12 @@ handleMsg({ssl, Socket, Data}, {ok, NewRecvState} -> {ok, SrvState, CliState#cliState{recvState = NewRecvState}}; {error, Reason} -> - ?WARN(ServerName, "handle ssl data error: ~p~n", [Reason]), + ?WARN(ServerName, "handle ssl data error: ~p ~p ~n", [Reason, CurInfo]), ssl:close(Socket), dealClose(SrvState, CliState, {error, ssl_data_error}) catch E:R:S -> - ?WARN(ServerName, "handle ssl data crash: ~p:~p~n~p~n", [E, R, S]), + ?WARN(ServerName, "handle ssl data crash: ~p:~p~n~p~n ~p~n ", [E, R, S, CurInfo]), ssl:close(Socket), dealClose(SrvState, CliState, {{error, agency_handledata_error}}) end; @@ -185,6 +185,7 @@ dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, Ov case erlang:system_time(millisecond) > OverTime of true -> %% 超时了 + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), case agAgencyUtils:getQueue(RequestsOut + 2) of undefined -> {ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1}}; diff --git a/src/httpCli/agTcpAgencyIns.erl b/src/httpCli/agTcpAgencyIns.erl index 1110c73..3a95ea8 100644 --- a/src/httpCli/agTcpAgencyIns.erl +++ b/src/httpCli/agTcpAgencyIns.erl @@ -57,11 +57,11 @@ handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime} infinity -> undefined; _ -> - erlang:start_timer(OverTime, self(), waiting, [{abs, true}]) + erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, {ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; {error, Reason} -> - ?WARN(ServerName, ":send error: ~p~n", [Reason]), + ?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]), gen_tcp:close(Socket), agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), dealClose(SrvState, CliState, {error, socket_send_error}) @@ -86,16 +86,16 @@ handleMsg({tcp, Socket, Data}, {ok, NewRecvState} -> {ok, SrvState, CliState#cliState{recvState = NewRecvState}}; {error, Reason} -> - ?WARN(ServerName, "handle tcp data error: ~p~n", [Reason]), + ?WARN(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]), gen_tcp:close(Socket), dealClose(SrvState, CliState, {error, tcp_data_error}) catch E:R:S -> - ?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n", [E, R, S]), + ?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]), gen_tcp:close(Socket), dealClose(SrvState, CliState, {{error, agency_handledata_error}}) end; -handleMsg({timeout, TimerRef, waiting}, +handleMsg({timeout, TimerRef, waiting_over}, #srvState{socket = Socket} = SrvState, #cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) -> agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), @@ -112,7 +112,6 @@ handleMsg({tcp_closed, Socket}, handleMsg({tcp_error, Socket, Reason}, #srvState{socket = Socket, serverName = ServerName} = SrvState, CliState) -> - ?WARN(ServerName, "connection error: ~p~n", [Reason]), gen_tcp:close(Socket), dealClose(SrvState, CliState, {error, tcp_error}); @@ -130,7 +129,7 @@ handleMsg(?miDoNetConnect, undefined -> {ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState}; MiRequest -> - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket}, NewCliState) + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, NewCliState) end; {error, _Reason} -> reconnectTimer(SrvState, CliState) @@ -185,9 +184,10 @@ dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, Ov case erlang:system_time(millisecond) > OverTime of true -> %% 超时了 + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), case agAgencyUtils:getQueue(RequestsOut + 2) of undefined -> - {ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1}}; + {ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}}; MiRequest -> dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}) end; @@ -200,7 +200,7 @@ dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, Ov infinity -> undefined; _ -> - erlang:start_timer(OverTime, self(), waiting, [{abs, true}]) + erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, {ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}}; {error, Reason} ->