Browse Source

Merge pull request #117 from basho/adt-sync-switch

Switch between gen_event notify/sync_notify based on message queue length
pull/118/merge
Andrew Thompson 12 years ago
parent
commit
cf408671db
4 changed files with 148 additions and 2 deletions
  1. +8
    -2
      src/lager.erl
  2. +12
    -0
      src/lager_app.erl
  3. +77
    -0
      src/lager_backend_throttle.erl
  4. +51
    -0
      test/lager_test_backend.erl

+ 8
- 2
src/lager.erl View File

@ -78,8 +78,14 @@ dispatch_log(Severity, Metadata, Format, Args, Size) when is_atom(Severity)->
_ ->
Format
end,
gen_event:sync_notify(Pid, {log, lager_msg:new(Msg, Timestamp,
Severity, Metadata, Destinations)});
LagerMsg = lager_msg:new(Msg, Timestamp,
Severity, Metadata, Destinations),
case lager_config:get(async, false) of
true ->
gen_event:notify(Pid, {log, LagerMsg});
false ->
gen_event:sync_notify(Pid, {log, LagerMsg})
end;
false ->
ok
end;

+ 12
- 0
src/lager_app.erl View File

@ -34,6 +34,18 @@ start() ->
start(_StartType, _StartArgs) ->
{ok, Pid} = lager_sup:start_link(),
case application:get_env(lager, async_threshold) of
undefined ->
ok;
{ok, Threshold} when is_integer(Threshold), Threshold >= 0 ->
_ = supervisor:start_child(lager_handler_watcher_sup, [lager_event, lager_backend_throttle, Threshold]);
{ok, BadThreshold} ->
error_logger:error_msg("Invalid value for 'async_threshold': ~p~n", [BadThreshold]),
throw({error, bad_config})
end,
Handlers = case application:get_env(lager, handlers) of
undefined ->
[{lager_console_backend, info},

+ 77
- 0
src/lager_backend_throttle.erl View File

@ -0,0 +1,77 @@
%% Copyright (c) 2011-2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%% @doc A simple gen_event backend used to monitor mailbox size and
%% switch log messages between synchronous and asynchronous modes.
%% A gen_event handler is used because a process getting its own mailbox
%% size doesn't involve getting a lock, and gen_event handlers run in their
%% parent's process.
-module(lager_backend_throttle).
-include("lager.hrl").
-behaviour(gen_event).
-export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2,
code_change/3]).
-record(state, {
hwm,
async = true
}).
init(Hwm) ->
lager_config:set(async, true),
{ok, #state{hwm=Hwm}}.
handle_call(get_loglevel, State) ->
{ok, {mask, ?LOG_NONE}, State};
handle_call({set_loglevel, _Level}, State) ->
{ok, ok, State};
handle_call(_Request, State) ->
{ok, ok, State}.
handle_event({log, _Message},State) ->
{message_queue_len, Len} = erlang:process_info(self(), message_queue_len),
case {Len > State#state.hwm, State#state.async} of
{true, true} ->
%% need to flip to sync mode
lager_config:set(async, false),
{ok, State#state{async=false}};
{false, false} ->
%% need to flip to async mode
lager_config:set(async, true),
{ok, State#state{async=true}};
_ ->
%% nothing needs to change
{ok, State}
end;
handle_event(_Event, State) ->
{ok, State}.
handle_info(_Info, State) ->
{ok, State}.
%% @private
terminate(_Reason, _State) ->
ok.
%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

+ 51
- 0
test/lager_test_backend.erl View File

@ -1024,6 +1024,57 @@ safe_format_test() ->
?assertEqual("FORMAT ERROR: \"~p ~p ~p\" [foo,bar]", lists:flatten(lager:safe_format("~p ~p ~p", [foo, bar], 1024))),
ok.
async_threshold_test_() ->
{foreach,
fun() ->
error_logger:tty(false),
application:load(lager),
application:set_env(lager, error_logger_redirect, false),
application:set_env(lager, async_threshold, 10),
application:set_env(lager, handlers, [{?MODULE, info}]),
application:start(lager)
end,
fun(_) ->
application:unset_env(lager, async_threshold),
application:stop(lager),
error_logger:tty(true)
end,
[
{"async threshold works",
fun() ->
%% we start out async
?assertEqual(true, lager_config:get(async)),
%% put a ton of things in the queue
Workers = [spawn_monitor(fun() -> [lager:info("hello world") || _ <- lists:seq(1, 1000)] end) || _ <- lists:seq(1, 10)],
%% serialize on mailbox
_ = gen_event:which_handlers(lager_event),
%% there should be a ton of outstanding messages now, so async is false
?assertEqual(false, lager_config:get(async)),
%% wait for all the workers to return, meaning that all the messages have been logged (since we're in sync mode)
collect_workers(Workers),
%% serialize ont the mailbox again
_ = gen_event:which_handlers(lager_event),
%% just in case...
timer:sleep(100),
%% async is true again now that the mailbox has drained
?assertEqual(true, lager_config:get(async)),
ok
end
}
]
}.
collect_workers([]) ->
ok;
collect_workers(Workers) ->
receive
{'DOWN', Ref, _, _, _} ->
collect_workers(lists:keydelete(Ref, 2, Workers))
end.
-endif.

Loading…
Cancel
Save