D. Walsh D. Walsh - 2 months ago 14
Node.js Question

Resizing Pictures with RxJS and Node

I'm new to RxJS and trying this (seemingly) simple task, but I just can't figure it out.

I want to:

1. Read an image from file

2. Convert that image to several smaller images

3. Save all images to file


I've converted fs.readFile and fs.writeFile to observables.

// Observable-returning versions of Node's callback-style fs.readFile and
// fs.writeFile, built with Rx.Observable.bindNodeCallback. The `$` suffix
// marks the wrapped functions.
const readFile$ = Rx.Observable.bindNodeCallback(fs.readFile);
const writeFile$ = Rx.Observable.bindNodeCallback(fs.writeFile);


I made an array of picture sizes to drive the pipeline.

// Output variants to generate from the source image. `width` is the target
// width handed to gm's resize(); `suffix` is intended for the output file
// name; `size` is a human-readable label.
// Declared with `const` because the table is never reassigned.
const pictureSizes = [
  {width: 100, size: 'thumbnail', suffix: '_t'},
  {width: 300, size: 'small', suffix: '_s'},
  {width: 600, size: 'medium', suffix: '_m'},
  {width: 1000, size: 'large', suffix: '_l'}
];


And I made a resizeImage$ function using GraphicsMagick (the gm module):

/**
 * Build an Observable that resizes an image with gm and emits the result.
 *
 * On subscription the image `data` is resized to `picture.width` and encoded
 * as JPEG. The Observable emits the resulting buffer once and then completes,
 * or logs and forwards the error reported by gm.
 *
 * @param {{width: number}} picture - Size descriptor; only `width` is read.
 * @param {*} data - Image input handed straight to gm().
 * @returns {Observable} Emits the resized buffer, then completes.
 */
function resizeImage$(picture, data) {
  return Rx.Observable.create((subscriber) => {
    // gm reports the outcome through a Node-style (err, buffer) callback.
    const onBuffer = (err, buffer) => {
      if (err) {
        console.log(err);
        subscriber.error(err);
        return;
      }
      subscriber.next(buffer);
      subscriber.complete();
    };

    gm(data).resize(picture.width).toBuffer('jpg', onBuffer);
  });
}


I think (hope) the above is ok. I can't figure out how to chain my operators.

// Attempted pipeline: read the source image, resize it once per entry in
// pictureSizes, write each result, and log what the subscriber receives.
readFile$('./largeimage.jpg')
// NOTE(review): the projection returns an ARRAY of Observables (one per size).
// mergeMap presumably flattens that array itself, so the values flowing
// downstream would be the un-subscribed inner Observables rather than resized
// buffers — a likely cause of the corrupted output; confirm against RxJS docs.
.mergeMap(data => pictureSizes.map(picture => resizeImage$(picture, data)))
// Every write targets the same literal path, so each result overwrites the
// previous one.
.flatMap(picture => writeFile$('./testImages/resized.jpg', picture))
.subscribe(
(x) => console.log('Next', x),
(e) => console.log('Error', e),
(c) => console.log('Complete',c )
)


The code above writes corrupted data to a JPEG file. (It also overwrites that same file every time, because I can't figure out how to get pictureSizes.suffix into the output file name.)

Anything helps! Thank you.

UPDATE

I got it to work, but I know this bizarre multiple subscription is a horrid anti-pattern. The main subscription completes before the images are resized. I have a feeling this is a hot/cold observable issue, but I have no idea how to fix it. Here's my now-working code:

// Output variants: `width` is the resize width and `suffix` is appended to the
// output file name; `size` is a label that the code shown here does not read.
const pictureSizes = [
  { width: 100, size: 'thumbnail', suffix: '_t' },
  { width: 300, size: 'small', suffix: '_s' },
  { width: 600, size: 'medium', suffix: '_m' },
  { width: 1000, size: 'large', suffix: '_l' },
];

// Source image file name; it is read from the current directory.
const image = 'truck.jpg';

// Resizes `binary` to pictureSize.width with gm and, inside this same
// function, subscribes to the result and writes it to
// ./testImages/<name><suffix>.<extension>.
// Returns whatever .subscribe() returns, not the Observable itself, so callers
// cannot compose further operators onto the resize step.
function resizeImage$(binary, pictureSize) {
return new Rx.Observable(observer => {
gm(binary)
.resize(pictureSize.width)
.toBuffer('jpg', function(err, buffer) {
console.log('BUFFER');
if (err) {
console.log(err);
observer.error(err);
} else {
// Emit the buffer together with its size descriptor so the subscriber below
// can build the output file name.
observer.next({binary: buffer, pictureSize: pictureSize});
// NOTE(review): the 'done' argument is presumably ignored by complete() — confirm.
observer.complete('done');
}
});
}).subscribe(
(resizedImage) => {
console.log(resizedImage);
const binary = resizedImage.binary;
const pictureSize = resizedImage.pictureSize;
// File name parts come from the outer `image` constant, split on '.'.
const fileName = image.split('.')[0];
const fileExtension = image.split('.')[1];
// No callback is passed, so a failed write is not observed here.
fs.writeFile(`./testImages/${fileName}${pictureSize.suffix}.${fileExtension}`, binary);
})
}
// Observable-returning wrapper around fs.readFile. Note that `new` is applied
// to the bindNodeCallback(...) call here.
var readFile$ = new Rx.Observable.bindNodeCallback(fs.readFile);
// Read the image, pair its contents with the sizes array, and call
// resizeImage$ once per size.
readFile$(`./${image}`)
// The zip selector wraps the pair in Rx.Observable.of(...), so `x` below is an
// Observable rather than a plain object.
.zip(Rx.Observable.of(pictureSizes), (binary, sizes) =>
Rx.Observable.of({ binary: binary, sizes: sizes }))
// NOTE(review): `x.value` reads a property off that Observable — presumably an
// internal field of the RxJS implementation in use; confirm before relying on it.
.mergeMap(x => x.value.sizes.map(pictureSize =>
resizeImage$(x.value.binary, pictureSize)))
// resizeImage$ subscribes and writes internally, so this empty subscribe()
// only starts the chain.
.subscribe()

Answer

In case anyone is interested, I have an answer. If anyone would like to refactor it further, please do so.

// Output variants to generate for each source image. `width` is the target
// width handed to gm's resize(); `suffix` is inserted before the file
// extension of the written file; `size` is a human-readable label.
// Declared with `const` because the table is never reassigned.
const pictureSizes = [
  {width: 100, size: 'thumbnail', suffix: '_t'},
  {width: 300, size: 'small', suffix: '_s'},
  {width: 600, size: 'medium', suffix: '_m'},
  {width: 1000, size: 'large', suffix: '_l'}
];

/**
 * Create an Observable that scales an image with gm.
 *
 * On subscription `binary` is resized to `pictureSize.width` and encoded as
 * JPEG. The Observable emits a single `{ binary, pictureSize }` object (the
 * scaled buffer plus the descriptor it was scaled for) and then completes;
 * an error from gm is forwarded to the subscriber.
 *
 * @param {*} binary - Image input handed straight to gm().
 * @param {{width: number}} pictureSize - Size descriptor; only `width` is read here.
 * @returns {Observable} Emits `{ binary, pictureSize }`, then completes.
 */
function scaleImage$(binary, pictureSize) {
  return new Rx.Observable((subscriber) => {
    // gm reports the outcome through a Node-style (err, buffer) callback.
    const handleBuffer = (err, buffer) => {
      if (err) {
        subscriber.error(err);
        return;
      }
      subscriber.next({ binary: buffer, pictureSize });
      subscriber.complete();
    };

    gm(binary).resize(pictureSize.width).toBuffer('jpg', handleBuffer);
  });
}

/**
 * Write a scaled image to ./resized/, inserting the size suffix before the
 * file extension (e.g. "truck.jpg" + "_t" -> "./resized/truck_t.jpg").
 *
 * @param {*} binary - Image data to write.
 * @param {{suffix: string}} pictureSize - Size descriptor; only `suffix` is read.
 * @param {string} image - Source file name or path; only its last path
 *   segment is used to name the output.
 * @param {function(?Error): void} [callback] - Optional completion callback.
 *   When omitted, a failed write is reported with console.error.
 */
function writeFile(binary, pictureSize, image, callback) {
  // Drop any directory part ("./truck.jpg", "photos/truck.jpg"): the output
  // always lands in ./resized/ and a leading "./" must not be mistaken for
  // an extension separator.
  const lastSeparator = Math.max(image.lastIndexOf('/'), image.lastIndexOf('\\'));
  const baseName = image.slice(lastSeparator + 1);

  // Split on the LAST dot so "my.truck.jpg" keeps its full stem. A dot at
  // index 0 is a dotfile name, not an extension.
  const dotIndex = baseName.lastIndexOf('.');
  const hasExtension = dotIndex > 0;
  const fileName = hasExtension ? baseName.slice(0, dotIndex) : baseName;
  const fileExtension = hasExtension ? baseName.slice(dotIndex) : '';

  const target = `./resized/${fileName}${pictureSize.suffix}${fileExtension}`;

  // fs.writeFile needs a callback; without one, current Node versions throw
  // and a failed write would otherwise go unnoticed.
  fs.writeFile(target, binary, (err) => {
    if (callback) {
      callback(err);
      return;
    }
    if (err) {
      console.error(`Failed to write ${target}:`, err);
    }
  });
}

/**
 * Read the image at `imagePath`, scale it to every entry in `pictureSizes`,
 * and write each scaled copy to disk via `writeFile`.
 *
 * @param {string} imagePath - Path of the source image to read.
 * @returns {Subscription} The subscription driving the pipeline, so callers
 *   can unsubscribe. (Nothing was returned before, so existing callers are
 *   unaffected.)
 */
function resizeImage(imagePath) {
  // bindNodeCallback is a static factory, not a constructor: call it without `new`.
  const readFile$ = Rx.Observable.bindNodeCallback(fs.readFile);

  return readFile$(imagePath)
    // Build one scaling Observable per size. Returning the array lets
    // mergeMap emit them one by one; mergeAll then subscribes to each, so
    // the scalings run concurrently.
    .mergeMap((binary) => pictureSizes.map((pictureSize) => scaleImage$(binary, pictureSize)))
    .mergeAll()
    .subscribe(
      // Name the outputs after the file that was actually read; the
      // `imagePath` parameter is the only source-name binding in scope.
      (scaled) => writeFile(scaled.binary, scaled.pictureSize, imagePath),
      // Surface read/scale failures instead of leaving them unhandled.
      (err) => console.error(`Failed to resize ${imagePath}:`, err)
    );
}

And if you want sequential behavior (scale image 1 -> write image 1 -> scale image 2 ...), use concatMap and concatAll in place of mergeMap and mergeAll.

Comments