浏览代码

Yield back to Erlang while decoding JSON

This adds a configurable limit on the number of bytes consumed by
the decoder before yielding back to the Erlang VM. This is to avoid the
infamous scheduler collapse issues.

The new `jiffy:decode/2` accepts an option `{bytes_per_iter,
pos_integer()}` that controls how often the decoder yields. The default
value is 2048 bytes.
pull/65/head
Paul J. Davis 11 年前
父节点
当前提交
e9a102af7d
共有 5 个文件被更改,包括 124 次插入10 次删除
  1. +31
    -1
      c_src/decoder.c
  2. +3
    -1
      c_src/jiffy.c
  3. +7
    -0
      c_src/jiffy.h
  4. +56
    -0
      c_src/util.c
  5. +27
    -8
      src/jiffy.erl

+ 31
- 1
c_src/decoder.c 查看文件

@ -49,6 +49,7 @@ typedef struct {
ERL_NIF_TERM arg;
ErlNifBinary bin;
size_t bytes_per_iter;
int is_partial;
char* p;
@ -74,6 +75,7 @@ dec_new(ErlNifEnv* env)
d->atoms = st;
d->bytes_per_iter = DEFAULT_BYTES_PER_ITER;
d->is_partial = 0;
d->p = NULL;
@ -639,8 +641,10 @@ decode_init(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
Decoder* d;
jiffy_st* st = (jiffy_st*) enif_priv_data(env);
ERL_NIF_TERM tmp_argv[4];
ERL_NIF_TERM opts;
ERL_NIF_TERM val;
if(argc != 1) {
if(argc != 2) {
return enif_make_badarg(env);
}
@ -656,6 +660,19 @@ decode_init(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
enif_release_resource(d);
opts = argv[1];
if(!enif_is_list(env, opts)) {
return enif_make_badarg(env);
}
while(enif_get_list_cell(env, opts, &val, &opts)) {
if(get_bytes_per_iter(env, val, &(d->bytes_per_iter))) {
continue;
} else {
return enif_make_badarg(env);
}
}
return decode_iter(env, 4, tmp_argv);
}
@ -671,6 +688,7 @@ decode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
ERL_NIF_TERM curr;
ERL_NIF_TERM val;
ERL_NIF_TERM ret;
size_t start;
if(argc != 4) {
return enif_make_badarg(env);
@ -689,8 +707,19 @@ decode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
curr = argv[3];
//fprintf(stderr, "Parsing:\r\n");
start = d->i;
while(d->i < bin.size) {
//fprintf(stderr, "state: %d\r\n", dec_curr(d));
if(should_yield(d->i - start, d->bytes_per_iter)) {
consume_timeslice(env, d->i - start, d->bytes_per_iter);
return enif_make_tuple4(
env,
st->atom_iter,
argv[1],
objs,
curr
);
}
switch(dec_curr(d)) {
case st_value:
switch(d->p[d->i]) {
@ -971,5 +1000,6 @@ decode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}
done:
consume_timeslice(env, d->i - start, d->bytes_per_iter);
return ret;
}

+ 3
- 1
c_src/jiffy.c 查看文件

@ -23,6 +23,8 @@ load(ErlNifEnv* env, void** priv, ERL_NIF_TERM info)
st->atom_uescape = make_atom(env, "uescape");
st->atom_pretty = make_atom(env, "pretty");
st->atom_force_utf8 = make_atom(env, "force_utf8");
st->atom_iter = make_atom(env, "iter");
st->atom_bytes_per_iter = make_atom(env, "bytes_per_iter");
// Markers used in encoding
st->ref_object = make_atom(env, "$object_ref$");
@ -72,7 +74,7 @@ unload(ErlNifEnv* env, void* priv)
static ErlNifFunc funcs[] =
{
{"nif_decode_init", 1, decode_init},
{"nif_decode_init", 2, decode_init},
{"nif_decode_iter", 4, decode_iter},
{"nif_encode_init", 2, encode_init},
{"nif_encode_iter", 3, encode_iter}

+ 7
- 0
c_src/jiffy.h 查看文件

@ -6,6 +6,8 @@
#include "erl_nif.h"
#define DEFAULT_BYTES_PER_ITER 2048
typedef struct {
ERL_NIF_TERM atom_ok;
ERL_NIF_TERM atom_error;
@ -19,6 +21,8 @@ typedef struct {
ERL_NIF_TERM atom_uescape;
ERL_NIF_TERM atom_pretty;
ERL_NIF_TERM atom_force_utf8;
ERL_NIF_TERM atom_iter;
ERL_NIF_TERM atom_bytes_per_iter;
ERL_NIF_TERM ref_object;
ERL_NIF_TERM ref_array;
@ -30,6 +34,9 @@ typedef struct {
ERL_NIF_TERM make_atom(ErlNifEnv* env, const char* name);
ERL_NIF_TERM make_ok(jiffy_st* st, ErlNifEnv* env, ERL_NIF_TERM data);
ERL_NIF_TERM make_error(jiffy_st* st, ErlNifEnv* env, const char* error);
int get_bytes_per_iter(ErlNifEnv* env, ERL_NIF_TERM val, size_t* bpi);
int should_yield(size_t used, size_t limit);
int consume_timeslice(ErlNifEnv* env, size_t used, size_t limit);
ERL_NIF_TERM decode_init(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM decode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);

+ 56
- 0
c_src/util.c 查看文件

@ -24,3 +24,59 @@ make_error(jiffy_st* st, ErlNifEnv* env, const char* error)
{
return enif_make_tuple2(env, st->atom_error, make_atom(env, error));
}
// Parse one decoder option of the form {bytes_per_iter, N}.
//
// On success stores N in *bpi and returns 1. Returns 0 and leaves *bpi
// untouched when val is not a 2-tuple tagged with the bytes_per_iter
// atom, when N is not an unsigned integer, or when N does not fit in a
// size_t.
int
get_bytes_per_iter(ErlNifEnv* env, ERL_NIF_TERM val, size_t* bpi)
{
    jiffy_st* st = (jiffy_st*) enif_priv_data(env);
    const ERL_NIF_TERM* tuple;
    ErlNifUInt64 bytes;
    int arity;

    if(!enif_get_tuple(env, val, &arity, &tuple)) {
        return 0;
    }

    if(arity != 2) {
        return 0;
    }

    if(enif_compare(tuple[0], st->atom_bytes_per_iter) != 0) {
        return 0;
    }

    // enif_get_uint64 stores a full 64-bit value, so read into a local of
    // the matching type rather than through the size_t pointer, which may
    // be narrower than 64 bits.
    if(!enif_get_uint64(env, tuple[1], &bytes)) {
        return 0;
    }

    // Reject values that the conversion to size_t would truncate.
    if((ErlNifUInt64) ((size_t) bytes) != bytes) {
        return 0;
    }

    *bpi = (size_t) bytes;
    return 1;
}
// Report whether the per-iteration byte budget has been used up.
//
// Returns 1 once `used` has reached or passed a non-zero `limit`, and 0
// otherwise. A limit of 0 never requests a yield.
int
should_yield(size_t used, size_t limit)
{
    return (limit != 0 && used >= limit) ? 1 : 0;
}
// Tell the scheduler what share of a timeslice the last chunk of work
// used, expressed as used/limit and clamped to the range 1..100.
//
// A limit of 0 gives no budget to measure against, so a full timeslice
// is charged instead of dividing by zero. Returns whatever
// enif_consume_timeslice returns, or 0 when the NIF API is older than
// 2.4 and the call is compiled out.
int
consume_timeslice(ErlNifEnv* env, size_t used, size_t limit)
{
#if(ERL_NIF_MAJOR_VERSION > 2 || \
        (ERL_NIF_MAJOR_VERSION == 2 && ERL_NIF_MINOR_VERSION >= 4))
    int perc;

    if(limit == 0 || used >= limit) {
        // Covers the zero-limit case and keeps the ratio below 1.0 in
        // the branch below, so the cast to int cannot overflow.
        perc = 100;
    } else {
        perc = (int) (100.0 * ((double) used / (double) limit));
        if(perc < 1) {
            perc = 1;
        }
    }

    return enif_consume_timeslice(env, perc);
#else
    return 0;
#endif
}

+ 27
- 8
src/jiffy.erl 查看文件

@ -2,22 +2,29 @@
% See the LICENSE file for more information.
-module(jiffy).
-export([decode/1, encode/1, encode/2]).
-export([decode/1, decode/2, encode/1, encode/2]).
-define(NOT_LOADED, not_loaded(?LINE)).
-on_load(init/0).
decode(Data) when is_binary(Data) ->
case nif_decode_init(Data) of
decode(Data) ->
decode(Data, []).
decode(Data, Opts) when is_binary(Data), is_list(Opts) ->
case nif_decode_init(Data, Opts) of
{error, _} = Error ->
throw(Error);
{partial, EJson} ->
finish_decode(EJson);
{iter, Decoder, Objs, Curr} ->
decode_loop(Data, Decoder, Objs, Curr);
EJson ->
EJson
end;
decode(Data) when is_list(Data) ->
decode(iolist_to_binary(Data)).
decode(Data, Opts) when is_list(Data) ->
decode(iolist_to_binary(Data), Opts).
encode(Data) ->
@ -96,12 +103,24 @@ init() ->
erlang:load_nif(filename:join(PrivDir, "jiffy"), 0).
% Keep calling the iterative NIF decoder until it stops returning an
% {iter, ...} continuation. An {error, _} result is thrown, a
% {partial, _} result is handed to finish_decode/1, and anything else is
% returned unchanged.
decode_loop(Data, Decoder, Objs, Curr) ->
    Step = nif_decode_iter(Data, Decoder, Objs, Curr),
    case Step of
        {iter, NextDecoder, NextObjs, NextCurr} ->
            decode_loop(Data, NextDecoder, NextObjs, NextCurr);
        {partial, Partial} ->
            finish_decode(Partial);
        {error, _} ->
            throw(Step);
        _ ->
            Step
    end.
% Raise a nif_error whose reason names this module and the given line.
not_loaded(Line) ->
    Where = [{module, ?MODULE}, {line, Line}],
    erlang:nif_error({not_loaded, Where}).
% One-argument decoder entry point stub. ?NOT_LOADED expands to
% not_loaded(?LINE), which raises a nif_error, so the call below is never
% reached.
% NOTE(review): the nif_decode_iter/4 call with dummy atoms looks like it
% exists only so tooling sees that function as used - confirm.
nif_decode_init(_Data) ->
?NOT_LOADED,
nif_decode_iter(w, x, y, z).
% Erlang-side stub for the two-argument decoder entry point. This body
% raises a not_loaded nif_error through ?NOT_LOADED.
% NOTE(review): presumably replaced by the C decode_init function once
% init/0 loads the NIF library - confirm against the funcs table.
nif_decode_init(_Data, _Opts) ->
?NOT_LOADED.
% Erlang-side stub for the decoder continuation step called from
% decode_loop/4. This body raises a not_loaded nif_error through
% ?NOT_LOADED.
% NOTE(review): presumably replaced by the C decode_iter function once
% init/0 loads the NIF library - confirm against the funcs table.
nif_decode_iter(_Data, _Decoder, _, _) ->
?NOT_LOADED.

正在加载...
取消
保存