I am streaming the results from an API to Google Cloud Storage.
It looks like this:
import {pipeline} from 'stream/promises';
import {Storage} from '@google-cloud/storage';
import {getData} from './data';
import {transformData} from './transform';

const datasets = await getDatasets();
for (const dataset of datasets) {
  await pipeline(
    getData({
      data: dataset
    }),
    transformData(),
    GCStream(dataset)
  );
}
getData looks like this:
export function getData(opts: any) {
  return {
    [Symbol.asyncIterator]() {
      return {
        async next() {
          // Retry loop; MAX_RETRIES is a placeholder for the retry limit.
          for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
            try {
              const res = await axios.get(
                'endpoint'
              );
              const data = res.data.result;
              if (!data.length) return {done: true};
              return {done: false, value: data};
            } catch (err) {
              console.log(err);
            }
            await delay(10);
          }
          throw new Error(`Failed to fetch.`);
        }
      };
    }
  };
}
getData returns an async iterable over the results of the API requests. It's only returning the first result for some reason.
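For comparison, here is a minimal sketch of the same idea written as an async generator, which keeps its position between calls to next(). The names paginate, FetchPage, and fakeFetch are hypothetical, and the page-number parameter is an assumption, since the real endpoint's pagination scheme isn't shown here.

```typescript
// Hypothetical page fetcher standing in for the axios call.
type FetchPage = (page: number) => Promise<string[]>;

// Async generator: each next() resumes after the last yield,
// so `page` survives between requests.
async function* paginate(fetchPage: FetchPage): AsyncGenerator<string[]> {
  for (let page = 0; ; page++) {
    const data = await fetchPage(page);
    if (data.length === 0) return; // an empty page marks the end
    yield data;
  }
}

// In-memory stand-in for the API: three pages, then nothing.
const pages: string[][] = [['a', 'b'], ['c'], ['d', 'e']];
const fakeFetch: FetchPage = async (page) => pages[page] ?? [];

// Drains the generator and returns every batch it produced.
async function collect(): Promise<string[][]> {
  const out: string[][] = [];
  for await (const batch of paginate(fakeFetch)) out.push(batch);
  return out;
}
```

An async generator like this can be passed to pipeline as the first argument in the same position as getData(...).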
transformData returns a stream.Transform: a non-async transform stream that takes data from the API request iterator and massages it.
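To make the shape of that stage concrete, here is a rough sketch of a Transform whose writable side is in object mode, so it can accept one batch (an array) per chunk. The Row type, the toNdjson name, and the newline-delimited JSON output are all assumptions for illustration; the real transformData may differ.

```typescript
import { Readable, Transform, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// Hypothetical record shape; the real API payload is not shown.
type Row = { id: number };

// Accepts arrays on the writable side and emits text on the readable side.
function toNdjson(): Transform {
  return new Transform({
    writableObjectMode: true, // incoming chunks are arrays, not Buffers
    transform(batch: Row[], _encoding, callback) {
      const text = batch.map((row) => JSON.stringify(row)).join('\n') + '\n';
      callback(null, text);
    },
  });
}

// Runs two batches through the transform and collects the output in memory.
async function demo(): Promise<string> {
  const chunks: string[] = [];
  const sink = new Writable({
    write(chunk, _encoding, callback) {
      chunks.push(chunk.toString());
      callback();
    },
  });
  await pipeline(
    Readable.from([[{ id: 1 }, { id: 2 }], [{ id: 3 }]]),
    toNdjson(),
    sink,
  );
  return chunks.join('');
}
```

Without object mode on the writable side, a Transform rejects array chunks, which is one thing worth checking when the source yields whole batches.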
GCStream looks like this, and I think it is also part of the problem:
const GCStream = (name: string) => {
  const file = destinationBucket.file(name);
  async function streamFileUpload(stream: any) {
    stream.pipe(file.createWriteStream()).on('finish', () => {
      console.log('uploaded');
    });
  }
  return streamFileUpload;
};
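One thing that may be worth trying is handing pipeline a Writable directly instead of a function that calls .pipe() itself, since pipeline can then wait for the destination to finish and forward its errors. The sketch below uses a hypothetical in-memory makeDestination helper as a stand-in for file.createWriteStream(), so it runs without credentials; it is an illustration under those assumptions, not a tested fix for the code above.

```typescript
import { Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// Hypothetical in-memory destination standing in for file.createWriteStream().
function makeDestination() {
  const state = { received: [] as string[], finished: false };
  const stream = new Writable({
    write(chunk, _encoding, callback) {
      state.received.push(chunk.toString());
      callback();
    },
    final(callback) {
      state.finished = true; // runs once all writes have been flushed
      callback();
    },
  });
  return { stream, state };
}

// pipeline() resolves only after the destination has finished,
// so the caller can safely move on to the next dataset.
async function upload(lines: string[]) {
  const { stream, state } = makeDestination();
  await pipeline(Readable.from(lines), stream);
  return state;
}
```

With the real bucket, the assumption is that the last pipeline argument would be destinationBucket.file(name).createWriteStream() in place of the in-memory stream.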
It's only ever getting the first result off of the API stream, and then it just stalls... never creating the upload. I'm having difficulty with the pipeline documentation and hope someone can spot the obvious error here.