The new scanner with an internal read-ahead buffer contains an
inefficiency in which the underlying buffer is shifted left every time a
token is consumed.
Fix this by deferring the shift until a read-ahead from the source I/O
handle is actually made, and performing it only when there is no free
space left at the end of the buffer.
Signed-off-by: Jose Lombera <jose@lombera.dev>
---
We can observe the performance inefficiency with the following simple
program, which counts the newline-delimited lines in a file:
use bufio;
use fmt;
use io;
use os;
use types;
// Prints how many '\n'-delimited tokens the file named by the single
// command-line argument contains, using a growable [[bufio::scanner]].
export fn main() void = {
	if (len(os::args) != 2) {
		fmt::fatal("Usage:", os::args[0], "<file>");
	};

	const file = os::open(os::args[1])!;
	// SIZE_MAX lets the read-ahead buffer grow to fit the longest line.
	let scan = bufio::newscanner(file, types::SIZE_MAX);

	let nlines = 0z;
	for (true) {
		const tok = bufio::scan_bytes(&scan, '\n')!;
		if (tok is io::EOF) {
			break;
		};
		nlines += 1;
	};

	fmt::println(nlines)!;
};
Let's test it on a file with 8Mi (8388608) empty lines:
$ yes "" | pv -bSs 8m > l.txt
8.00MiB
$ time ./newlines l.txt
8388608
real 0m11.981s
user 0m11.981s
sys 0m0.000s
Let's make this worse by prepending a very long (8KiB) line to the file.
This forces the scanner's buffer to grow, and since every shift copies
the whole buffer, each consumed token becomes even more expensive:
$ (while true; do echo -n "12345678"; done) | pv -bSs 8k > l8k.txt
8.00KiB
$ yes "" | pv -bSs 8m >> l8k.txt
8.00MiB
$ time ./newlines l8k.txt
8388608
real 0m35.167s
user 0m35.165s
sys 0m0.000s
These are the results after applying this patch:
$ time ./newlines l.txt
8388608
real 0m0.282s
user 0m0.281s
sys 0m0.001s
$ time ./newlines l8k.txt
8388608
real 0m0.280s
user 0m0.276s
sys 0m0.004s
bufio/scanner.ha | 111 +++++++++++++++++++++--------------------------
1 file changed, 50 insertions(+), 61 deletions(-)
diff --git a/bufio/scanner.ha b/bufio/scanner.ha
index 3eda47564a23..bc7cfe413331 100644
--- a/bufio/scanner.ha
@@ -14,10 +14,10 @@ def BUFSIZ: size = 4096;
export type scanner = struct {
src: io::handle,
buffer: []u8,
- // Number of bytes available in buffer
- pending: size,
- // Number of bytes returned to the user
- readout: size,
+ // Index of start of pending bytes in buffer
+ start: size,
+ // Sub-slice with pending bytes in buffer
+ pending: []u8,
// User-confirmed maximum size of read buffer
maxread: size,
};
@@ -34,8 +34,8 @@ export fn newscanner(src: io::handle, maxread: size) scanner = {
src = src,
buffer = alloc([0...], BUFSIZ),
maxread = maxread,
- pending = 0,
- readout = 0,
+ start = 0,
+ pending = [],
};
};
@@ -48,8 +48,8 @@ export fn newscanner_static(src: io::handle, buffer: []u8) scanner = {
src = src,
buffer = buffer,
maxread = len(buffer),
- pending = 0,
- readout = 0,
+ start = 0,
+ pending = [],
};
};
@@ -65,49 +65,51 @@ export fn finish(scan: *scanner) void = {
// is updated accordingly. Returns the number of bytes which had been available
// prior to the call.
fn scan_readahead(scan: *scanner) (size | io::EOF | io::error) = {
- if (scan.pending >= len(scan.buffer)) {
- let readahead = scan.pending + BUFSIZ;
- if (readahead > scan.maxread) {
- readahead = scan.maxread;
- };
- if (scan.pending >= readahead) {
- return errors::overflow;
+ let start = scan.start;
+ const pending = len(scan.pending);
+ const buf_len = len(scan.buffer);
+
+ if ((start + pending) >= buf_len) {
+ if (start > 0) {
+ // Shift buffer to the left to free space at the end
+ scan.buffer[..buf_len - start] = scan.buffer[start..];
+ scan.pending = scan.buffer[..pending];
+ start = 0;
+ scan.start = 0;
+ } else {
+ // Buffer is full, expand it
+ let readahead = pending + BUFSIZ;
+ if (readahead > scan.maxread) {
+ readahead = scan.maxread;
+ };
+ if (pending >= readahead) {
+ return errors::overflow;
+ };
+ append(scan.buffer, [0...], readahead);
 };
- append(scan.buffer, [0...], readahead);
+ scan.pending = scan.buffer[start..start + pending]; // append() may have reallocated the buffer; don't leave pending dangling on EOF/error
 };
- const prev = scan.pending;
- match (io::read(scan.src, scan.buffer[scan.pending..])?) {
+ match (io::read(scan.src, scan.buffer[start + pending..])?) {
 case let z: size =>
- scan.pending += z;
- return prev;
+ scan.pending = scan.buffer[start..start + pending + z];
+ return pending;
 case io::EOF =>
 return io::EOF;
 };
 };
-// Shifts the buffer towards the start, discarding bytes which were read out.
-fn scan_shift(scan: *scanner) void = {
- const n = scan.readout;
- if (n == 0) {
- return;
- };
- scan.buffer[..len(scan.buffer) - n] = scan.buffer[n..];
- scan.readout = 0;
- scan.pending -= n;
-};
-
-// Consumes N bytes from the buffer, updating scan.readout. User must call
-// [[scan_shift]] before calling scan_consume again.
+// Consumes N bytes from the buffer.
fn scan_consume(scan: *scanner, n: size) []u8 = {
- assert(len(scan.buffer) >= n && scan.readout == 0);
- scan.readout = n;
- return scan.buffer[..n];
+ assert(len(scan.pending) >= n);
+ scan.start += n;
+ defer scan.pending = scan.pending[n..];
+ return scan.pending[..n];
};
// Reads one byte from a [[scanner]].
export fn scan_byte(scan: *scanner) (u8 | io::EOF | io::error) = {
- if (scan.pending == 0) {
+ if (len(scan.pending) == 0) {
match (scan_readahead(scan)?) {
case io::EOF =>
return io::EOF;
@@ -116,11 +118,6 @@ export fn scan_byte(scan: *scanner) (u8 | io::EOF | io::error) = {
};
};
- // Consume previous read, if any
- scan_shift(scan);
- // Consume this read right away
- defer scan_shift(scan);
-
return scan_consume(scan, 1)[0];
};
@@ -131,13 +128,11 @@ export fn scan_bytes(
scan: *scanner,
delim: (u8 | []u8),
) ([]u8 | io::EOF | io::error) = {
- scan_shift(scan);
-
- let i = 0z, nread = 0z;
+ let i = 0z;
for (true) {
- match (bytes::index(scan.buffer[nread..scan.pending], delim)) {
+ match (bytes::index(scan.pending[i..], delim)) {
case let ix: size =>
- i = ix;
+ i += ix;
break;
case void =>
yield;
@@ -145,13 +140,13 @@ export fn scan_bytes(
match (scan_readahead(scan)?) {
case io::EOF =>
- if (scan.pending == 0) {
+ if (len(scan.pending) == 0) {
return io::EOF;
};
- return scan_consume(scan, scan.pending);
+ return scan_consume(scan, len(scan.pending));
case let z: size =>
// No need to re-index the earlier part of the buffer
- nread += z;
+ i = z;
};
};
@@ -161,15 +156,15 @@ export fn scan_bytes(
case let u: []u8 =>
yield len(u);
};
- const nuser = nread + i, nconsume = nuser + ndelim;
- return scan_consume(scan, nconsume)[..nuser];
+ const nconsume = i + ndelim;
+ return scan_consume(scan, nconsume)[..i];
};
// Reads one rune from a [[scanner]].
export fn scan_rune(
scan: *scanner,
) (rune | io::EOF | io::error | utf8::invalid) = {
- if (scan.pending < 4) {
+ if (len(scan.pending) < 4) {
match (scan_readahead(scan)?) {
case io::EOF =>
return io::EOF;
@@ -177,18 +172,13 @@ export fn scan_rune(
yield;
};
};
- const sz = match (utf8::utf8sz(scan.buffer[0])) {
+ const sz = match (utf8::utf8sz(scan.pending[0])) {
case let z: size =>
yield z;
case void =>
return utf8::invalid;
};
- // Consume previous read, if any
- scan_shift(scan);
- // Consume this read right away
- defer scan_shift(scan);
-
const buf = scan_consume(scan, sz);
const dec = utf8::decode(buf[..sz]);
match (utf8::next(&dec)?) {
@@ -229,8 +219,7 @@ export fn scan_line(
// Returns the internal scanner buffer, which contains all bytes read ahead by
// the scanner up to this point.
export fn scan_buffer(scan: *scanner) []u8 = {
- scan_shift(scan);
- return scan.buffer[..scan.pending];
+ return scan.pending[..];
};
// Reads a single byte from an [[io::handle]].
base-commit: 1cbc3d0453055fa75b15797e937f4abafe53bcbc
--
2.39.1