Initial write stream implementation

pull/22966/head
Felix Geisendörfer 2010-03-02 23:12:52 +01:00
parent 6670154819
commit 61785afb3d
2 changed files with 134 additions and 0 deletions

View File

@ -375,3 +375,90 @@ exports.realpath = function(path, callback) {
callback(null, normalize(path));
});
}
exports.fileWriteStream = function(path, options) {
return new FileWriteStream(path, options);
};
var FileWriteStream = exports.FileWriteStream = function(path, options) {
this.path = path;
this.fd = null;
this.closed = false;
this.flags = 'w';
this.encoding = 'binary';
this.mode = 0666;
process.mixin(this, options || {});
var
self = this,
queue = [],
busy = false;
queue.push([fs.open, this.path, this.flags, this.mode]);
function pump() {
if (busy) {
return;
}
var args = queue.shift();
if (!args) {
return self.emit('drain');
}
busy = true;
var method = args.shift();
args.push(function(err) {
busy = false;
if (err) {
self.emit('error', err);
return;
}
// save reference for file pointer
if (method === fs.open) {
self.fd = arguments[1];
self.emit('open', self.fd);
}
// stop pumping after close
if (method === fs.close) {
self.emit('close');
return;
}
pump();
});
// Inject the file pointer
if (method !== fs.open) {
args.unshift(self.fd);
}
method.apply(null, args);
};
this.write = function(data) {
if (this.closed) {
throw new Error('stream already closed');
}
queue.push([fs.write, data, undefined, this.encoding]);
pump();
return false;
};
this.close = function() {
this.closed = true;
queue.push([fs.close,]);
pump();
};
pump();
};
FileWriteStream.prototype.__proto__ = process.EventEmitter.prototype;

View File

@ -0,0 +1,47 @@
process.mixin(require('../common'));
var
fn = path.join(fixturesDir, "write.txt"),
file = fs.fileWriteStream(fn),
EXPECTED = '0123456789',
callbacks = {
open: -1,
drain: -2,
close: -1
};
file
.addListener('open', function(fd) {
callbacks.open++;
assert.equal('number', typeof fd);
})
.addListener('drain', function() {
callbacks.drain++;
if (callbacks.drain == -1) {
assert.equal(EXPECTED, fs.readFileSync(fn));
file.write(EXPECTED);
} else if (callbacks.drain == 0) {
assert.equal(EXPECTED+EXPECTED, fs.readFileSync(fn));
file.close();
}
})
.addListener('close', function() {
callbacks.close++;
assert.throws(function() {
file.write('should not work anymore');
});
fs.unlinkSync(fn);
});
for (var i = 0; i < 10; i++) {
assert.strictEqual(false, file.write(i));
}
process.addListener('exit', function() {
for (var k in callbacks) {
assert.equal(0, callbacks[k], k+' count off by '+callbacks[k]);
}
});