Procházet zdrojové kódy

Revamp yields back to Erlang

In the original PR for `return_trailer` @vlm pointed out that I wasn't
using enif_consume_timeslice correctly. This fixes that by changing out
its called.

Previously we attempted to define the total number of bytes to decode or
encode in a single NIF call and then would consume as much of the
timeslice as we processed. This is wrong because we may start the NIF
call with less than an entire timeslice left.

The new approach is to define the number of bytes to encode or decode
per reduction and then iteratively call enif_consume_timeslice until it
indicates that we should return.
return-trailer
Paul J. Davis před 9 roky
rodič
revize
fa3902ab38
4 změnil soubory, kde provedl 67 přidání a 43 odebrání
  1. +11
    -8
      c_src/decoder.c
  2. +11
    -11
      c_src/encoder.c
  3. +4
    -3
      c_src/jiffy.h
  4. +41
    -21
      c_src/util.c

+ 11
- 8
c_src/decoder.c Zobrazit soubor

@ -49,7 +49,7 @@ typedef struct {
ERL_NIF_TERM arg;
ErlNifBinary bin;
size_t bytes_per_iter;
size_t bytes_per_red;
int is_partial;
int return_maps;
int return_trailer;
@ -78,7 +78,7 @@ dec_new(ErlNifEnv* env)
d->atoms = st;
d->bytes_per_iter = DEFAULT_BYTES_PER_ITER;
d->bytes_per_red = DEFAULT_BYTES_PER_REDUCTION;
d->is_partial = 0;
d->return_maps = 0;
d->return_trailer = 0;
@ -704,7 +704,9 @@ decode_init(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}
while(enif_get_list_cell(env, opts, &val, &opts)) {
if(get_bytes_per_iter(env, val, &(d->bytes_per_iter))) {
if(get_bytes_per_iter(env, val, &(d->bytes_per_red))) {
continue;
} else if(get_bytes_per_red(env, val, &(d->bytes_per_red))) {
continue;
} else if(enif_compare(val, d->atoms->atom_return_maps) == 0) {
#if MAP_TYPE_PRESENT
@ -739,7 +741,7 @@ decode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
ERL_NIF_TERM val = argv[2];
ERL_NIF_TERM trailer;
ERL_NIF_TERM ret;
size_t start;
size_t bytes_read = 0;
if(argc != 5) {
return enif_make_badarg(env);
@ -758,11 +760,10 @@ decode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
curr = argv[4];
//fprintf(stderr, "Parsing:\r\n");
start = d->i;
while(d->i < bin.size) {
//fprintf(stderr, "state: %d\r\n", dec_curr(d));
if(should_yield(d->i - start, d->bytes_per_iter)) {
consume_timeslice(env, d->i - start, d->bytes_per_iter);
if(should_yield(env, &bytes_read, d->bytes_per_red)) {
return enif_make_tuple5(
env,
st->atom_iter,
@ -772,6 +773,9 @@ decode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
curr
);
}
bytes_read += 1;
switch(dec_curr(d)) {
case st_value:
switch(d->p[d->i]) {
@ -1064,6 +1068,5 @@ decode_done:
}
done:
consume_timeslice(env, d->i - start, d->bytes_per_iter);
return ret;
}

+ 11
- 11
c_src/encoder.c Zobrazit soubor

@ -29,7 +29,7 @@ typedef struct {
ErlNifEnv* env;
jiffy_st* atoms;
size_t bytes_per_iter;
size_t bytes_per_red;
int uescape;
int pretty;
@ -74,7 +74,7 @@ enc_new(ErlNifEnv* env)
Encoder* e = enif_alloc_resource(st->res_enc, sizeof(Encoder));
e->atoms = st;
e->bytes_per_iter = DEFAULT_BYTES_PER_ITER;
e->bytes_per_red = DEFAULT_BYTES_PER_REDUCTION;
e->uescape = 0;
e->pretty = 0;
e->use_nil = 0;
@ -200,7 +200,7 @@ enc_unknown(Encoder* e, ERL_NIF_TERM value)
e->iolist = enif_make_list_cell(e->env, value, e->iolist);
e->iolen++;
// Track the total number of bytes produced before
// splitting our IO buffer. We add 16 to this value
// as a rough estimate of the number of bytes that
@ -221,7 +221,7 @@ enc_unknown(Encoder* e, ERL_NIF_TERM value)
e->p = (char*) e->curr->data;
e->u = (unsigned char*) e->curr->data;
e->i = 0;
e->i = 0;
}
return 1;
@ -612,7 +612,9 @@ encode_init(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
e->use_nil = 1;
} else if(enif_compare(val, e->atoms->atom_force_utf8) == 0) {
// Ignore, handled in Erlang
} else if(get_bytes_per_iter(env, val, &(e->bytes_per_iter))) {
} else if(get_bytes_per_iter(env, val, &(e->bytes_per_red))) {
continue;
} else if(get_bytes_per_red(env, val, &(e->bytes_per_red))) {
continue;
} else {
return enif_make_badarg(env);
@ -639,7 +641,7 @@ encode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
double dval;
size_t start;
size_t processed;
size_t bytes_written = 0;
if(argc != 3) {
return enif_make_badarg(env);
@ -662,9 +664,9 @@ encode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
while(!enif_is_empty_list(env, stack)) {
processed = (e->iosize + e->i) - start;
if(should_yield(processed, e->bytes_per_iter)) {
consume_timeslice(env, processed, e->bytes_per_iter);
bytes_written += (e->iosize + e->i) - start;
if(should_yield(env, &bytes_written, e->bytes_per_red)) {
return enif_make_tuple4(
env,
st->atom_iter,
@ -870,8 +872,6 @@ encode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}
done:
processed = (e->iosize + e->i) - start;
consume_timeslice(env, processed, e->bytes_per_iter);
return ret;
}

+ 4
- 3
c_src/jiffy.h Zobrazit soubor

@ -6,7 +6,8 @@
#include "erl_nif.h"
#define DEFAULT_BYTES_PER_ITER 2048
#define DEFAULT_BYTES_PER_REDUCTION 20
#define DEFAULT_ERLANG_REDUCTION_COUNT 2000
#define MAP_TYPE_PRESENT \
((ERL_NIF_MAJOR_VERSION == 2 && ERL_NIF_MINOR_VERSION >= 6) \
@ -48,9 +49,9 @@ ERL_NIF_TERM make_error(jiffy_st* st, ErlNifEnv* env, const char* error);
ERL_NIF_TERM make_obj_error(jiffy_st* st, ErlNifEnv* env, const char* error,
ERL_NIF_TERM obj);
int get_bytes_per_iter(ErlNifEnv* env, ERL_NIF_TERM val, size_t* bpi);
int get_bytes_per_red(ErlNifEnv* env, ERL_NIF_TERM val, size_t* bpr);
int get_null_term(ErlNifEnv* env, ERL_NIF_TERM val, ERL_NIF_TERM *null_term);
int should_yield(size_t used, size_t limit);
int consume_timeslice(ErlNifEnv* env, size_t used, size_t limit);
int should_yield(ErlNifEnv* env, size_t* used, size_t bytes_per_red);
ERL_NIF_TERM decode_init(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM decode_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);

+ 41
- 21
c_src/util.c Zobrazit soubor

@ -2,6 +2,7 @@
// See the LICENSE file for more information.
#include "jiffy.h"
#include <stdio.h>
ERL_NIF_TERM
make_atom(ErlNifEnv* env, const char* name)
@ -57,6 +58,36 @@ get_bytes_per_iter(ErlNifEnv* env, ERL_NIF_TERM val, size_t* bpi)
return 0;
}
// Calculate the number of bytes per reduction
*bpi = (size_t) (bytes / DEFAULT_ERLANG_REDUCTION_COUNT);
return 1;
}
int
get_bytes_per_red(ErlNifEnv* env, ERL_NIF_TERM val, size_t* bpi)
{
jiffy_st* st = (jiffy_st*) enif_priv_data(env);
const ERL_NIF_TERM* tuple;
int arity;
unsigned int bytes;
if(!enif_get_tuple(env, val, &arity, &tuple)) {
return 0;
}
if(arity != 2) {
return 0;
}
if(enif_compare(tuple[0], st->atom_bytes_per_iter) != 0) {
return 0;
}
if(!enif_get_uint(env, tuple[1], &bytes)) {
return 0;
}
*bpi = (size_t) bytes;
return 1;
@ -91,31 +122,20 @@ get_null_term(ErlNifEnv* env, ERL_NIF_TERM val, ERL_NIF_TERM *null_term)
}
int
should_yield(size_t used, size_t limit)
{
if(limit == 0 || used < limit) {
return 0;
}
return 1;
}
int
consume_timeslice(ErlNifEnv* env, size_t used, size_t limit)
should_yield(ErlNifEnv* env, size_t* used, size_t bytes_per_red)
{
#if(ERL_NIF_MAJOR_VERSION >= 2 && ERL_NIF_MINOR_VERSION >= 4)
double u = (double) used;
double l = (double) limit;
int perc = (int) (100.0 * (u / l));
if(perc < 1) {
perc = 1;
} else if(perc > 100) {
perc = 100;
if(((*used) / bytes_per_red) >= 20) {
*used = 0;
return enif_consume_timeslice(env, 1);
}
return enif_consume_timeslice(env, perc);
#else
return 0;
#else
return ((*used) / bytes_per_red) >= DEFAULT_ERLANG_REDUCTION_COUNT;
#endif
}

Načítá se…
Zrušit
Uložit