bullmq
Example BullMQ todo app with a custom BullMQ pub/sub layer
https://github.com/restuwahyu13/express-todo-bullmq
bullmq
Example BullMQ todo app with a custom BullMQ pub/sub layer
https://github.com/restuwahyu13/express-todo-bullmq
bullmq
const { Worker } = require('bullmq')
const userSchema = require('./model.js')
const { resultsPublisher } = require('./publisher')
/**
 * @description queueCreatePublisher
 *
 * Worker attached to the 'create service' queue. Every job named
 * 'create:service' is re-emitted on the worker itself as a 'create:service'
 * event whose payload is the JSON string `{"data": <job.data>}`, so that
 * in-process subscribers can react to it. Jobs with any other name are
 * acknowledged without being forwarded.
 */
const queueCreatePublisher = new Worker('create service', async (job) => {
  // Strict comparison: the job name must be exactly this string, no coercion.
  if (job.name === 'create:service') {
    queueCreatePublisher.emit('create:service', JSON.stringify({ data: job.data }))
  }
})

queueCreatePublisher.on('completed', (job) => console.log(`job create completed ${job.id}`))
// NOTE(review): BullMQ documents 'waiting' as a Queue/QueueEvents event rather
// than a Worker event - confirm this listener ever fires.
queueCreatePublisher.on('waiting', (job) => console.log(`job create waiting ${job.id}`))
queueCreatePublisher.on('active', (job) => console.log(`job create active ${job.id}`))
queueCreatePublisher.on('failed', (job, err) => {
  // The job argument may be missing, and the failure reason is the useful part of the log.
  const jobId = job ? job.id : 'unknown'
  const reason = err && err.message ? err.message : err
  console.log(`job create failed ${jobId}: ${reason}`)
})
// An 'error' event without a listener is thrown by the emitter; log it instead.
queueCreatePublisher.on('error', (err) => console.error('job create worker error', err))
exports.createSubscriber = () => {
return new Promise((resolve, reject) => {
queueCreatePublisher.once('create:service', async (data) => {
const response = await insertOne(JSON.parse(data).data)
resolve(response)
})
})
}
/**
 * Stores a new email unless a record with the same email already exists.
 *
 * @param {{email: string}} res - Payload carrying the email to store.
 * @returns {Promise<{statusCode: number, message: string}>} Resolves with
 *   409 when the email is already stored (nothing is written),
 *   201 when the record was created, or 400 when create returned a falsy value.
 *   Rejects with `{ statusCode: 500, message: 'internal server error' }` when
 *   a model call throws.
 */
async function insertOne(res) {
  try {
    const existing = await userSchema.findOne({ email: res.email }).lean()
    if (existing) {
      // Stop here: a duplicate must not fall through to the insert below.
      return { statusCode: 409, message: 'email already exist' }
    }

    const saved = await userSchema.create({ email: res.email })
    if (saved) {
      return { statusCode: 201, message: 'add new email successfully' }
    }
    return { statusCode: 400, message: 'add new email failed' }
  } catch (err) {
    // Keep the underlying cause visible in the logs; callers only get the generic shape.
    console.error('insertOne failed', err)
    // Callers receive this plain object as the rejection value, so its shape is kept.
    throw { statusCode: 500, message: 'internal server error' }
  }
}
/**
 * @description queueResultsPublisher
 *
 * Worker attached to the 'results service' queue. Every job named
 * 'results:service' is re-emitted on the worker itself as a 'results:service'
 * event whose payload is the JSON string `{"data": <job.data>}`, so that
 * in-process subscribers can react to it. Jobs with any other name are
 * acknowledged without being forwarded.
 */
const queueResultsPublisher = new Worker('results service', async (job) => {
  // Strict comparison: the job name must be exactly this string, no coercion.
  if (job.name === 'results:service') {
    queueResultsPublisher.emit('results:service', JSON.stringify({ data: job.data }))
  }
})

queueResultsPublisher.on('completed', (job) => console.log(`job results completed ${job.id}`))
// NOTE(review): BullMQ documents 'waiting' as a Queue/QueueEvents event rather
// than a Worker event - confirm this listener ever fires.
queueResultsPublisher.on('waiting', (job) => console.log(`job results waiting ${job.id}`))
queueResultsPublisher.on('active', (job) => console.log(`job results active ${job.id}`))
queueResultsPublisher.on('failed', (job, err) => {
  // The job argument may be missing, and the failure reason is the useful part of the log.
  const jobId = job ? job.id : 'unknown'
  const reason = err && err.message ? err.message : err
  console.log(`job results failed ${jobId}: ${reason}`)
})
// An 'error' event without a listener is thrown by the emitter; log it instead.
queueResultsPublisher.on('error', (err) => console.error('job results worker error', err))
exports.findAllSubscriber = async () => {
await findAll()
return new Promise((resolve, reject) => {
queueResultsPublisher.once('results:service', (data) => {
const response = JSON.parse(data).data
resolve(response)
})
})
}
/**
 * Loads every stored email record and hands the outcome to resultsPublisher.
 *
 * Publishes statusCode 404 when no record exists and 200 otherwise, each with
 * the fetched records under `data`. If anything inside the lookup/publish step
 * throws, a 500 message (without `data`) is published instead.
 *
 * @returns {Promise<void>}
 */
async function findAll() {
  try {
    const records = await userSchema.find({}).lean()
    const isEmpty = records.length < 1

    await resultsPublisher({
      statusCode: isEmpty ? 404 : 200,
      message: isEmpty ? 'email is not exist' : 'email already to use',
      data: records
    })
  } catch (error) {
    await resultsPublisher({ statusCode: 500, message: 'internal server error' })
  }
}
Copyright © 2021 Codeinu
Forgot your account's password or having trouble logging into your account? Don't worry, we'll help you get your account back. Enter your email address and we'll send you a recovery link to reset your password. If you are experiencing problems resetting your password, contact us.