瀏覽代碼

Use enif_consume_timeslice and don't monopolize scheduler thread

pull/49/head
Sergey Urbanovich 11 年之前
父節點
當前提交
f63c10afdc
共有 6 個文件被更改,包括 200 次插入31 次删除
  1. +82
    -13
      c_src/decoder.c
  2. +45
    -10
      c_src/encoder.c
  3. +7
    -1
      c_src/jiffy.c
  4. +10
    -0
      c_src/jiffy.h
  5. +29
    -0
      c_src/util.c
  6. +27
    -7
      src/jiffy.erl

+ 82
- 13
c_src/decoder.c 查看文件

@ -47,7 +47,6 @@ typedef struct {
jiffy_st* atoms;
ERL_NIF_TERM arg;
ErlNifBinary bin;
int is_partial;
@ -59,22 +58,37 @@ typedef struct {
char* st_data;
int st_size;
int st_top;
int is_resource;
size_t reds;
} Decoder;
void
dec_init(Decoder* d, ErlNifEnv* env, ERL_NIF_TERM arg, ErlNifBinary* bin)
dec_init_bin(Decoder* d, ErlNifEnv* env, ERL_NIF_TERM arg, ErlNifBinary* bin)
{
d->arg = arg;
d->p = (char*) bin->data;
d->u = bin->data;
d->len = bin->size;
d->env = env;
}
int
dec_init(Decoder* d, ErlNifEnv* env, ERL_NIF_TERM arg,
ERL_NIF_TERM opts, ErlNifBinary* bin)
{
int i;
ERL_NIF_TERM val;
d->env = env;
d->atoms = enif_priv_data(env);
d->arg = arg;
d->is_partial = 0;
d->p = (char*) bin->data;
d->u = bin->data;
d->len = bin->size;
dec_init_bin(d, env, arg, bin);
d->i = 0;
d->st_data = (char*) enif_alloc(STACK_SIZE_INC * sizeof(char));
@ -87,11 +101,26 @@ dec_init(Decoder* d, ErlNifEnv* env, ERL_NIF_TERM arg, ErlNifBinary* bin)
d->st_data[0] = st_value;
d->st_top++;
d->reds = REDUCTIONS;
if(!enif_is_list(env, opts)) {
return 0;
}
while(enif_get_list_cell(env, opts, &val, &opts)) {
if(!get_reductions(env, val, d->atoms, &d->reds)) {
return 0;
}
}
d->is_resource = 0;
return 1;
}
void
dec_destroy(Decoder* d)
dec_destroy(ErlNifEnv* env, void* dec)
{
Decoder* d = dec;
if(d->st_data != NULL) {
enif_free(d->st_data);
}
@ -604,26 +633,60 @@ make_array(ErlNifEnv* env, ERL_NIF_TERM list)
return ret;
}
static ERL_NIF_TERM
dec_yield(Decoder* d, ERL_NIF_TERM objs, ERL_NIF_TERM curr)
{
Decoder* dec = d;
if(!d->is_resource) {
dec = enif_alloc_resource(d->atoms->res_decoder, sizeof(Decoder));
*dec = *d;
dec->is_resource = 1;
}
ERL_NIF_TERM val = enif_make_resource(d->env, dec);
return enif_make_tuple4(d->env,
d->atoms->atom_partial, val, objs, curr);
}
ERL_NIF_TERM
decode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
Decoder dec;
Decoder* d = &dec;
ErlNifBinary bin;
ERL_NIF_TERM objs = enif_make_list(env, 0);
ERL_NIF_TERM curr = enif_make_list(env, 0);
ERL_NIF_TERM objs;
ERL_NIF_TERM curr;
ERL_NIF_TERM val;
ERL_NIF_TERM ret;
if(argc != 1) {
if(argc != 2) {
return enif_make_badarg(env);
} else if(!enif_inspect_binary(env, argv[0], &bin)) {
return enif_make_badarg(env);
}
dec_init(d, env, argv[0], &bin);
int arity;
ERL_NIF_TERM* args;
if(enif_get_tuple(env, argv[1], &arity, (const ERL_NIF_TERM **) &args)) {
jiffy_st *priv = enif_priv_data(env);
if(arity != 3 ) {
return enif_make_badarg(env);
}
if(!enif_get_resource(env, args[0], priv->res_decoder, (void **) &d)) {
return enif_make_badarg(env);
}
objs = args[1];
curr = args[2];
dec_init_bin(d, env, argv[0], &bin);
} else {
objs = enif_make_list(env, 0);
curr = enif_make_list(env, 0);
if (!dec_init(d, env, argv[0], argv[1], &bin)) {
return enif_make_badarg(env);
}
}
size_t processed = d->i;
//fprintf(stderr, "Parsing:\r\n");
while(d->i < bin.size) {
@ -897,6 +960,11 @@ decode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
ret = dec_error(d, "invalid_internal_state");
goto done;
}
if(dec_curr(d) != st_done) {
if(jiffy_consume_timeslice(env, d->reds, d->i, &processed)) {
return dec_yield(d, objs, curr);
}
}
}
if(dec_curr(d) != st_done) {
@ -908,7 +976,8 @@ decode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}
done:
dec_destroy(d);
jiffy_consume_timeslice(env, d->reds, d->i, &processed);
dec_destroy(env, d);
return ret;
}

+ 45
- 10
c_src/encoder.c 查看文件

@ -36,12 +36,16 @@ typedef struct {
int iolen;
ERL_NIF_TERM iolist;
size_t iosize;
ErlNifBinary* curr;
char* p;
unsigned char* u;
size_t i;
int is_resource;
size_t reds;
} Encoder;
@ -61,7 +65,7 @@ static char* shifts[NUM_SHIFTS] = {
int
enc_init(Encoder* e, ErlNifEnv* env, ERL_NIF_TERM opts, ErlNifBinary* bin)
enc_init(Encoder* e, ErlNifEnv* env, ERL_NIF_TERM opts)
{
ERL_NIF_TERM val;
@ -71,6 +75,7 @@ enc_init(Encoder* e, ErlNifEnv* env, ERL_NIF_TERM opts, ErlNifBinary* bin)
e->pretty = 0;
e->shiftcnt = 0;
e->count = 0;
e->reds = REDUCTIONS;
if(!enif_is_list(env, opts)) {
return 0;
@ -83,15 +88,16 @@ enc_init(Encoder* e, ErlNifEnv* env, ERL_NIF_TERM opts, ErlNifBinary* bin)
e->pretty = 1;
} else if(enif_compare(val, e->atoms->atom_force_utf8) == 0) {
// Ignore, handled in Erlang
} else {
} else if(!get_reductions(env, val, e->atoms, &e->reds)) {
return 0;
}
}
e->iolen = 0;
e->iolist = enif_make_list(env, 0);
e->curr = bin;
if(!enif_alloc_binary(BIN_INC_SIZE, e->curr)) {
e->iosize = 0;
e->curr = enif_alloc(sizeof(ErlNifBinary));
if(!e->curr || !enif_alloc_binary(BIN_INC_SIZE, e->curr)) {
return 0;
}
@ -101,15 +107,19 @@ enc_init(Encoder* e, ErlNifEnv* env, ERL_NIF_TERM opts, ErlNifBinary* bin)
e->u = (unsigned char*) e->curr->data;
e->i = 0;
e->is_resource = 0;
return 1;
}
void
enc_destroy(Encoder* e)
enc_destroy(ErlNifEnv* env, void* enc)
{
Encoder *e = enc;
if(e->curr != NULL) {
enif_release_binary(e->curr);
}
enif_free(e->curr);
}
ERL_NIF_TERM
@ -189,6 +199,7 @@ enc_unknown(Encoder* e, ERL_NIF_TERM value)
e->iolist = enif_make_list_cell(e->env, value, e->iolist);
e->iolen++;
e->iosize += e->i;
// Reinitialize our binary for the next buffer.
e->curr = bin;
@ -493,13 +504,26 @@ enc_comma(Encoder* e)
return 1;
}
static ERL_NIF_TERM
enc_yield(Encoder* e, ERL_NIF_TERM stack)
{
Encoder* enc = e;
if(!e->is_resource) {
enc = enif_alloc_resource(e->atoms->res_encoder, sizeof(Encoder));
*enc = *e;
enc->is_resource = 1;
}
ERL_NIF_TERM val = enif_make_resource(e->env, enc);
return enif_make_tuple3(e->env, e->atoms->atom_partial, val, stack);
}
ERL_NIF_TERM
encode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
Encoder enc;
Encoder* e = &enc;
ErlNifBinary bin;
ERL_NIF_TERM ret;
ERL_NIF_TERM stack;
@ -514,11 +538,18 @@ encode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
return enif_make_badarg(env);
}
if(!enc_init(e, env, argv[1], &bin)) {
return enif_make_badarg(env);
jiffy_st *priv = enif_priv_data(env);
if(!enif_get_resource(env, argv[0], priv->res_encoder, (void **) &e)) {
if(!enc_init(e, env, argv[1])) {
return enif_make_badarg(env);
}
stack = enif_make_list(env, 1, argv[0]);
} else {
stack = argv[1];
e->env = env;
}
stack = enif_make_list(env, 1, argv[0]);
size_t processed = e->iosize + e->i;
while(!enif_is_empty_list(env, stack)) {
if(!enif_get_list_cell(env, stack, &curr, &stack)) {
@ -690,6 +721,9 @@ encode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
goto done;
}
}
if(jiffy_consume_timeslice(env, e->reds, e->iosize + e->i, &processed)) {
return enc_yield(e, stack);
}
}
if(!enc_done(e, &item)) {
@ -704,6 +738,7 @@ encode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}
done:
enc_destroy(e);
jiffy_consume_timeslice(env, e->reds, e->i, &processed);
enc_destroy(env, e);
return ret;
}

+ 7
- 1
c_src/jiffy.c 查看文件

@ -23,11 +23,17 @@ load(ErlNifEnv* env, void** priv, ERL_NIF_TERM info)
st->atom_uescape = make_atom(env, "uescape");
st->atom_pretty = make_atom(env, "pretty");
st->atom_force_utf8 = make_atom(env, "force_utf8");
st->atom_reductions = make_atom(env, "reductions");
// Markers used in encoding
st->ref_object = make_atom(env, "$object_ref$");
st->ref_array = make_atom(env, "$array_ref$");
st->res_encoder = enif_open_resource_type(env, "jiffy", "encoder",
enc_destroy, ERL_NIF_RT_CREATE, NULL);
st->res_decoder = enif_open_resource_type(env, "jiffy", "decoder",
dec_destroy, ERL_NIF_RT_CREATE, NULL);
*priv = (void*) st;
return 0;
@ -54,7 +60,7 @@ unload(ErlNifEnv* env, void* priv)
static ErlNifFunc funcs[] =
{
{"nif_decode", 1, decode},
{"nif_decode", 2, decode},
{"nif_encode", 2, encode}
};

+ 10
- 0
c_src/jiffy.h 查看文件

@ -6,6 +6,8 @@
#include "erl_nif.h"
#define REDUCTIONS 1000
typedef struct {
ERL_NIF_TERM atom_ok;
ERL_NIF_TERM atom_error;
@ -19,17 +21,25 @@ typedef struct {
ERL_NIF_TERM atom_uescape;
ERL_NIF_TERM atom_pretty;
ERL_NIF_TERM atom_force_utf8;
ERL_NIF_TERM atom_reductions;
ERL_NIF_TERM ref_object;
ERL_NIF_TERM ref_array;
ErlNifResourceType *res_encoder;
ErlNifResourceType *res_decoder;
} jiffy_st;
ERL_NIF_TERM make_atom(ErlNifEnv* env, const char* name);
ERL_NIF_TERM make_ok(jiffy_st* st, ErlNifEnv* env, ERL_NIF_TERM data);
ERL_NIF_TERM make_error(jiffy_st* st, ErlNifEnv* env, const char* error);
int get_reductions(ErlNifEnv *env, ERL_NIF_TERM term, jiffy_st* st, size_t* val);
int jiffy_consume_timeslice(ErlNifEnv *env, size_t reds, size_t cur, size_t* proc);
ERL_NIF_TERM decode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM encode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
void enc_destroy(ErlNifEnv* env, void* e);
void dec_destroy(ErlNifEnv* env, void* d);
int int_from_hex(const unsigned char* p);
int int_to_hex(int val, char* p);

+ 29
- 0
c_src/util.c 查看文件

@ -24,3 +24,32 @@ make_error(jiffy_st* st, ErlNifEnv* env, const char* error)
{
return enif_make_tuple2(env, st->atom_error, make_atom(env, error));
}
int
get_reductions(ErlNifEnv *env, ERL_NIF_TERM term, jiffy_st* st, size_t* val)
{
int arity;
const ERL_NIF_TERM *tuple;
return enif_get_tuple(env, term, &arity, &tuple) &&
arity == 2 &&
enif_compare(tuple[0], st->atom_reductions) == 0 &&
enif_get_int(env, tuple[1], (int*) val) &&
val >= 0;
}
int
jiffy_consume_timeslice(ErlNifEnv *env, size_t reds, size_t cur, size_t* proc) {
#if ERL_NIF_MAJOR_VERSION >= 2 && ERL_NIF_MINOR_VERSION >= 4
#define PERCENTS 10
if (reds > 0 && cur - *proc >= reds / PERCENTS) {
int percents = 100 * (cur - *proc) / reds;
percents = (percents < 1) ? 1 : (
(percents > 100) ? 100 :
percents );
*proc = cur;
return enif_consume_timeslice(env, percents);
}
#endif
return 0;
}

+ 27
- 7
src/jiffy.erl 查看文件

@ -2,13 +2,17 @@
% See the LICENSE file for more information.
-module(jiffy).
-export([decode/1, encode/1, encode/2]).
-export([decode/1, decode/2, encode/1, encode/2]).
-define(NOT_LOADED, not_loaded(?LINE)).
-on_load(init/0).
decode(Data) when is_binary(Data) ->
case nif_decode(Data) of
decode(Data) ->
decode(Data, []).
decode(Data, Options) when is_binary(Data) ->
case nif_decode_loop(Data, Options) of
{error, _} = Error ->
throw(Error);
{partial, EJson} ->
@ -16,8 +20,8 @@ decode(Data) when is_binary(Data) ->
EJson ->
EJson
end;
decode(Data) when is_list(Data) ->
decode(iolist_to_binary(Data)).
decode(Data, Options) when is_list(Data) ->
decode(iolist_to_binary(Data), Options).
encode(Data) ->
@ -26,7 +30,7 @@ encode(Data) ->
encode(Data, Options) ->
ForceUTF8 = lists:member(force_utf8, Options),
case nif_encode(Data, Options) of
case nif_encode_loop(Data, Options) of
{error, invalid_string} when ForceUTF8 == true ->
FixedData = jiffy_utf8:fix(Data),
encode(FixedData, Options -- [force_utf8]);
@ -99,9 +103,25 @@ init() ->
not_loaded(Line) ->
erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, Line}]}).
nif_decode(_Data) ->
nif_decode_loop(Data, Options) ->
case nif_decode(Data, Options) of
{partial, Decoder, Objs, Curr} ->
nif_decode_loop(Data, {Decoder, Objs, Curr});
Other ->
Other
end.
nif_decode(_Data, _Options) ->
?NOT_LOADED.
nif_encode_loop(Data, Options) ->
case nif_encode(Data, Options) of
{partial, Encoder, Stack} ->
nif_encode_loop(Encoder, Stack);
Other ->
Other
end.
nif_encode(_Data, _Options) ->
?NOT_LOADED.

Loading…
取消
儲存