diff --git a/c_src/decoder.c b/c_src/decoder.c index c1af71e..db140dc 100644 --- a/c_src/decoder.c +++ b/c_src/decoder.c @@ -91,7 +91,7 @@ dec_new(ErlNifEnv* env) d->p = NULL; d->u = NULL; d->len = -1; - d->i = -1; + d->i = 0; d->st_data = (char*) enif_alloc(STACK_SIZE_INC); d->st_size = STACK_SIZE_INC; @@ -116,16 +116,6 @@ dec_init(Decoder* d, ErlNifEnv* env, ERL_NIF_TERM arg, ErlNifBinary* bin) d->p = (char*) bin->data; d->u = bin->data; d->len = bin->size; - - // I'd like to be more forceful on this check so that when - // we run a second iteration of the decoder we are sure - // that we're using the same binary. Unfortunately, I don't - // think there's a value to base this assertion on. - if(d->i < 0) { - d->i = 0; - } else { - assert(d->i <= d->len && "mismatched binary lengths"); - } } void @@ -715,38 +705,56 @@ decode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) ERL_NIF_TERM val = argv[2]; ERL_NIF_TERM trailer; ERL_NIF_TERM ret; - size_t bytes_read = 0; + ERL_NIF_TERM tmp_argv[5]; - if(argc != 5) { - return enif_make_badarg(env); - } else if(!enif_inspect_binary(env, argv[0], &bin)) { + size_t start; + size_t bytes_processed = 0; + + if(!enif_inspect_binary(env, argv[0], &bin)) { return enif_make_badarg(env); } else if(!enif_get_resource(env, argv[1], st->res_dec, (void**) &d)) { return enif_make_badarg(env); - } else if(!enif_is_list(env, argv[3])) { - return enif_make_badarg(env); - } else if(!enif_is_list(env, argv[4])) { - return enif_make_badarg(env); } dec_init(d, env, argv[0], &bin); objs = argv[3]; curr = argv[4]; + start = d->i; + while(d->i < bin.size) { - if(should_yield(env, &bytes_read, d->bytes_per_red)) { - return enif_make_tuple5( + bytes_processed = d->i - start; + + if(should_yield(bytes_processed, d->bytes_per_red)) { + assert(enif_is_list(env, objs)); + assert(enif_is_list(env, curr)); + + tmp_argv[0] = argv[0]; + tmp_argv[1] = argv[1]; + tmp_argv[2] = val; + tmp_argv[3] = objs; + tmp_argv[4] = curr; + + bump_used_reds(env, bytes_processed, d->bytes_per_red); + +#if SCHEDULE_NIF_PRESENT + return enif_schedule_nif( + env, + "nif_decode_iter", + 0, + decode_iter, + 5, + tmp_argv + ); +#else + return enif_make_tuple2( env, st->atom_iter, - argv[1], - val, - objs, - curr + enif_make_tuple(env, 5, tmp_argv) ); +#endif } - bytes_read += 1; - switch(dec_curr(d)) { case st_value: switch(d->p[d->i]) { @@ -1040,5 +1048,7 @@ decode_done: } done: + bump_used_reds(env, bytes_processed, d->bytes_per_red); + return ret; } diff --git a/c_src/encoder.c b/c_src/encoder.c index fdae2d1..08b74a1 100644 --- a/c_src/encoder.c +++ b/c_src/encoder.c @@ -731,20 +731,15 @@ encode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) ERL_NIF_TERM curr; ERL_NIF_TERM item; - ERL_NIF_TERM saved_stack; const ERL_NIF_TERM* tuple; int arity; ErlNifSInt64 lval; double dval; size_t start; - size_t bytes_written = 0; + size_t bytes_processed = 0; - if(argc != 3) { - return enif_make_badarg(env); - } else if(!enif_get_resource(env, argv[0], st->res_enc, (void**) &e)) { - return enif_make_badarg(env); - } else if(!enif_is_list(env, argv[2])) { + if(!enif_get_resource(env, argv[0], st->res_enc, (void**) &e)) { return enif_make_badarg(env); } @@ -761,19 +756,36 @@ encode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) start = e->iosize + e->i; while(!termstack_is_empty(&stack)) { - bytes_written += (e->iosize + e->i) - start; + bytes_processed = (e->iosize + e->i) - start; + + if(should_yield(bytes_processed, e->bytes_per_red)) { + ERL_NIF_TERM tmp_argv[3]; + + assert(enif_is_list(env, e->iolist)); + + tmp_argv[0] = argv[0]; + tmp_argv[1] = termstack_save(env, &stack); + tmp_argv[2] = e->iolist; - if(should_yield(env, &bytes_written, e->bytes_per_red)) { - saved_stack = termstack_save(env, &stack); termstack_destroy(&stack); + bump_used_reds(env, bytes_processed, e->bytes_per_red); - return enif_make_tuple4( +#if SCHEDULE_NIF_PRESENT + return enif_schedule_nif( + env, + "nif_encode_iter", + 0, + encode_iter, + 3, + tmp_argv + ); +#else + return enif_make_tuple2( env, st->atom_iter, - argv[0], - saved_stack, - e->iolist + enif_make_tuple(env, 3, tmp_argv) ); +#endif } curr = termstack_pop(&stack); @@ -956,6 +968,7 @@ encode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) } done: + bump_used_reds(env, bytes_processed, e->bytes_per_red); termstack_destroy(&stack); return ret; diff --git a/c_src/jiffy.h b/c_src/jiffy.h index ef03a06..7145b05 100644 --- a/c_src/jiffy.h +++ b/c_src/jiffy.h @@ -13,6 +13,12 @@ ((ERL_NIF_MAJOR_VERSION == 2 && ERL_NIF_MINOR_VERSION >= 6) \ || (ERL_NIF_MAJOR_VERSION > 2)) +#define CONSUME_TIMESLICE_PRESENT \ + ((ERL_NIF_MAJOR_VERSION >= 2 && ERL_NIF_MINOR_VERSION >= 4)) + +#define SCHEDULE_NIF_PRESENT \ + ((ERL_NIF_MAJOR_VERSION >= 2 && ERL_NIF_MINOR_VERSION >= 7)) + typedef struct { ERL_NIF_TERM atom_ok; ERL_NIF_TERM atom_error; @@ -53,7 +59,8 @@ ERL_NIF_TERM make_obj_error(jiffy_st* st, ErlNifEnv* env, const char* error, int get_bytes_per_iter(ErlNifEnv* env, ERL_NIF_TERM val, size_t* bpi); int get_bytes_per_red(ErlNifEnv* env, ERL_NIF_TERM val, size_t* bpr); int get_null_term(ErlNifEnv* env, ERL_NIF_TERM val, ERL_NIF_TERM *null_term); -int should_yield(ErlNifEnv* env, size_t* used, size_t bytes_per_red); +int should_yield(size_t used, size_t bytes_per_red); +void bump_used_reds(ErlNifEnv* env, size_t used, size_t bytes_per_red); ERL_NIF_TERM decode_init(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM decode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); diff --git a/c_src/util.c b/c_src/util.c index 7d8ac74..2890a63 100644 --- a/c_src/util.c +++ b/c_src/util.c @@ -122,20 +122,31 @@ get_null_term(ErlNifEnv* env, ERL_NIF_TERM val, ERL_NIF_TERM *null_term) } int -should_yield(ErlNifEnv* env, size_t* used, size_t bytes_per_red) +should_yield(size_t used, size_t bytes_per_red) { -#if(ERL_NIF_MAJOR_VERSION >= 2 && ERL_NIF_MINOR_VERSION >= 4) - - if(((*used) / bytes_per_red) >= 20) { - *used = 0; - return enif_consume_timeslice(env, 1); - } + return (used / bytes_per_red) >= DEFAULT_ERLANG_REDUCTION_COUNT; +} - return 0; +void +bump_used_reds(ErlNifEnv* env, size_t used, size_t bytes_per_red) +{ +#if CONSUME_TIMESLICE_PRESENT + size_t reds_used; + size_t pct_used; -#else + reds_used = used / bytes_per_red; + pct_used = 100 * reds_used / DEFAULT_ERLANG_REDUCTION_COUNT; - return ((*used) / bytes_per_red) >= DEFAULT_ERLANG_REDUCTION_COUNT; + if(pct_used > 0) { + if(pct_used > 100) { + pct_used = 100; + } + enif_consume_timeslice(env, pct_used); + } #endif + + (void) env; + (void) used; + (void) bytes_per_red; } diff --git a/src/jiffy.erl b/src/jiffy.erl index c82eba4..164fa5b 100644 --- a/src/jiffy.erl +++ b/src/jiffy.erl @@ -71,7 +71,7 @@ decode(Data, Opts) when is_binary(Data), is_list(Opts) -> error(Error); {partial, EJson} -> finish_decode(EJson); - {iter, Decoder, Val, Objs, Curr} -> + {iter, {_, Decoder, Val, Objs, Curr}} -> decode_loop(Data, Decoder, Val, Objs, Curr); EJson -> EJson @@ -99,7 +99,7 @@ encode(Data, Options) -> error(Error); {partial, IOData} -> finish_encode(IOData, []); - {iter, Encoder, Stack, IOBuf} -> + {iter, {Encoder, Stack, IOBuf}} -> encode_loop(Data, Options, Encoder, Stack, IOBuf); IOData -> IOData @@ -184,7 +184,7 @@ decode_loop(Data, Decoder, Val, Objs, Curr) -> error(Error); {partial, EJson} -> finish_decode(EJson); - {iter, NewDecoder, NewVal, NewObjs, NewCurr} -> + {iter, {_, NewDecoder, NewVal, NewObjs, NewCurr}} -> decode_loop(Data, NewDecoder, NewVal, NewObjs, NewCurr); EJson -> EJson @@ -204,7 +204,7 @@ encode_loop(Data, Options, Encoder, Stack, IOBuf) -> error(Error); {partial, IOData} -> finish_encode(IOData, []); - {iter, NewEncoder, NewStack, NewIOBuf} -> + {iter, {NewEncoder, NewStack, NewIOBuf}} -> encode_loop(Data, Options, NewEncoder, NewStack, NewIOBuf); IOData -> IOData