Quellcode durchsuchen

Yield back to Erlang while encoding JSON

This adds a configurable limit on the number of bytes produced by
the encoder before yielding back to the Erlang VM. This is to avoid the
infamous scheduler collapse issues.

The `jiffy:encode/2` now takes an option `{bytes_per_iter,
pos_integer()}` that controls the yield frequency. The default value is
2048.
pull/65/head 0.10.0
Paul J. Davis vor 11 Jahren
Ursprung
Commit
bda503527d
2 geänderte Dateien mit 57 neuen und 4 gelöschten Zeilen
  1. +36
    -1
      c_src/encoder.c
  2. +21
    -3
      src/jiffy.erl

+ 36
- 1
c_src/encoder.c Datei anzeigen

@ -28,13 +28,17 @@ do { \
typedef struct { typedef struct {
ErlNifEnv* env; ErlNifEnv* env;
jiffy_st* atoms; jiffy_st* atoms;
size_t bytes_per_iter;
int uescape; int uescape;
int pretty; int pretty;
int shiftcnt; int shiftcnt;
int count; int count;
int iolen;
size_t iolen;
size_t iosize;
ERL_NIF_TERM iolist; ERL_NIF_TERM iolist;
ErlNifBinary bin; ErlNifBinary bin;
ErlNifBinary* curr; ErlNifBinary* curr;
@ -68,12 +72,14 @@ enc_new(ErlNifEnv* env)
Encoder* e = enif_alloc_resource(st->res_enc, sizeof(Encoder)); Encoder* e = enif_alloc_resource(st->res_enc, sizeof(Encoder));
e->atoms = st; e->atoms = st;
e->bytes_per_iter = DEFAULT_BYTES_PER_ITER;
e->uescape = 0; e->uescape = 0;
e->pretty = 0; e->pretty = 0;
e->shiftcnt = 0; e->shiftcnt = 0;
e->count = 0; e->count = 0;
e->iolen = 0; e->iolen = 0;
e->iosize = 0;
e->curr = &(e->bin); e->curr = &(e->bin);
if(!enif_alloc_binary(BIN_INC_SIZE, e->curr)) { if(!enif_alloc_binary(BIN_INC_SIZE, e->curr)) {
e->curr = NULL; e->curr = NULL;
@ -184,6 +190,12 @@ enc_unknown(Encoder* e, ERL_NIF_TERM value)
e->iolist = enif_make_list_cell(e->env, value, e->iolist); e->iolist = enif_make_list_cell(e->env, value, e->iolist);
e->iolen++; e->iolen++;
// Track the total number of bytes produced before
// splitting our IO buffer. We add 16 to this value
// as a rough estimate of the number of bytes that
// a bignum might produce when encoded.
e->iosize += e->i + 16;
// Reinitialize our binary for the next buffer. // Reinitialize our binary for the next buffer.
e->curr = bin; e->curr = bin;
@ -525,6 +537,8 @@ encode_init(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
e->pretty = 1; e->pretty = 1;
} else if(enif_compare(val, e->atoms->atom_force_utf8) == 0) { } else if(enif_compare(val, e->atoms->atom_force_utf8) == 0) {
// Ignore, handled in Erlang // Ignore, handled in Erlang
} else if(get_bytes_per_iter(env, val, &(e->bytes_per_iter))) {
continue;
} else { } else {
return enif_make_badarg(env); return enif_make_badarg(env);
} }
@ -549,6 +563,9 @@ encode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
ErlNifSInt64 lval; ErlNifSInt64 lval;
double dval; double dval;
size_t start;
size_t processed;
if(argc != 3) { if(argc != 3) {
return enif_make_badarg(env); return enif_make_badarg(env);
} else if(!enif_get_resource(env, argv[0], st->res_enc, (void**) &e)) { } else if(!enif_get_resource(env, argv[0], st->res_enc, (void**) &e)) {
@ -566,7 +583,22 @@ encode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
stack = argv[1]; stack = argv[1];
e->iolist = argv[2]; e->iolist = argv[2];
start = e->iosize + e->i;
while(!enif_is_empty_list(env, stack)) { while(!enif_is_empty_list(env, stack)) {
processed = (e->iosize + e->i) - start;
if(should_yield(processed, e->bytes_per_iter)) {
consume_timeslice(env, processed, e->bytes_per_iter);
return enif_make_tuple4(
env,
st->atom_iter,
argv[0],
stack,
e->iolist
);
}
if(!enif_get_list_cell(env, stack, &curr, &stack)) { if(!enif_get_list_cell(env, stack, &curr, &stack)) {
ret = enc_error(e, "internal_error"); ret = enc_error(e, "internal_error");
goto done; goto done;
@ -750,5 +782,8 @@ encode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
} }
done: done:
processed = (e->iosize + e->i) - start;
consume_timeslice(env, processed, e->bytes_per_iter);
return ret; return ret;
} }

+ 21
- 3
src/jiffy.erl Datei anzeigen

@ -41,6 +41,8 @@ encode(Data, Options) ->
throw(Error); throw(Error);
{partial, IOData} -> {partial, IOData} ->
finish_encode(IOData, []); finish_encode(IOData, []);
{iter, Encoder, Stack, IOBuf} ->
encode_loop(Data, Options, Encoder, Stack, IOBuf);
IOData -> IOData ->
IOData IOData
end. end.
@ -116,6 +118,23 @@ decode_loop(Data, Decoder, Objs, Curr) ->
end. end.
encode_loop(Data, Options, Encoder, Stack, IOBuf) ->
ForceUTF8 = lists:member(force_utf8, Options),
case nif_encode_iter(Encoder, Stack, IOBuf) of
{error, invalid_string} when ForceUTF8 == true ->
FixedData = jiffy_utf8:fix(Data),
encode(FixedData, Options -- [force_utf8]);
{error, _} = Error ->
throw(Error);
{partial, IOData} ->
finish_encode(IOData, []);
{iter, NewEncoder, NewStack, NewIOBuf} ->
encode_loop(Data, Options, NewEncoder, NewStack, NewIOBuf);
IOData ->
IOData
end.
not_loaded(Line) -> not_loaded(Line) ->
erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, Line}]}). erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, Line}]}).
@ -126,8 +145,7 @@ nif_decode_iter(_Data, _Decoder, _, _) ->
?NOT_LOADED. ?NOT_LOADED.
nif_encode_init(_Data, _Options) -> nif_encode_init(_Data, _Options) ->
?NOT_LOADED,
nif_encode_iter(x, y, z).
?NOT_LOADED.
nif_encode_iter(_Encoder, _Stack, _IoList) -> nif_encode_iter(_Encoder, _Stack, _IoList) ->
?NOT_LOADED.
?NOT_LOADED.

Laden…
Abbrechen
Speichern