As of 05/06/2025, KafkaJS is still unmaintained. On top of that, if you use the NestJS microservices approach, the built-in Kafka transport is an inflexible layer built on top of that same unmaintained library.
Luckily, NestJS lets you define your own transport strategy, and that strategy can talk to Kafka through a different library.
3 simple steps:
- Define your custom transport class the way NestJS requires: extend Server and implement CustomTransportStrategy
export class RDKafkaServer extends Server implements CustomTransportStrategy {
...
}
- Implement the .listen() method with the approach and library you prefer. Here I’m using node-rdkafka. Notice the getHandlerByPattern usage: it looks up the handler registered for the topic of each incoming message. Assume that we have a topic called my-sample-topic on Kafka. Disclaimer: don’t use this code in production.
async listen(callback: () => void) {
  // Base librdkafka consumer settings
  const kafkaConfig: Kafka.ConsumerGlobalConfig = {
    'metadata.broker.list': this.options.brokers,
    'group.id': this.options.groupId,
    'client.id': this.options.clientId,
    'enable.auto.commit': true,
    'auto.commit.interval.ms': this.options.autoCommitInterval ?? 5000,
  };
  // Pick the security protocol from the ssl/sasl combination
  if (this.options.ssl) {
    kafkaConfig['security.protocol'] = this.options.sasl ? 'sasl_ssl' : 'ssl';
  } else if (this.options.sasl) {
    kafkaConfig['security.protocol'] = 'sasl_plaintext';
  }
  if (this.options.sasl) {
    kafkaConfig['sasl.mechanisms'] = this.options.sasl.mechanism;
    kafkaConfig['sasl.username'] = this.options.sasl.username;
    kafkaConfig['sasl.password'] = this.options.sasl.password;
  }
  this.consumer = new Kafka.KafkaConsumer(kafkaConfig, {
    'auto.offset.reset': this.options.fromBeginning ? 'earliest' : 'latest',
  });
  // Subscribe and start consuming once the client is connected
  this.consumer.on('ready', () => {
    this.logger.log('Kafka consumer ready. Subscribing to topics...');
    this.consumer.subscribe(this.options.topics);
    this.consumer.consume();
    callback();
  });
  // Route every message to the handler registered for its topic
  this.consumer.on('data', async (message: Kafka.Message) => {
    const topic = message.topic;
    const value = message.value?.toString() ?? '{}';
    const handler = this.getHandlerByPattern(topic);
    if (!handler) {
      this.logger.warn(`No handler for topic: ${topic}`);
      return;
    }
    try {
      const payload = JSON.parse(value);
      await handler(payload);
    } catch (err) {
      // Covers both JSON parsing failures and errors thrown by the handler
      this.logger.error(`Error handling message from ${topic}`, err);
    }
  });
  this.consumer.on('event.error', (err) => {
    this.logger.error('Kafka error:', err);
  });
  this.consumer.connect();
}
- Use the @EventPattern() decorator to trigger whatever you want
@Controller()
export class CustomListener {
  @EventPattern('my-sample-topic')
  handleSalesforceContract(@Payload() message: any) {
    console.log('Message Received on my-sample-topic:');
    console.log(message);
  }
}
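Steps 2 and 3 meet at the topic name: @EventPattern('my-sample-topic') registers a handler under that pattern, and getHandlerByPattern(topic) retrieves it for each incoming message. The sketch below imitates that flow with a plain lookup table, without NestJS or Kafka, so it illustrates the idea rather than the framework's actual internals. `HandlerRegistry`, `route` and `RouteResult` are hypothetical names made up for this illustration. It also tells a JSON parsing failure apart from a missing handler, which makes each case easier to log on its own.

```typescript
// Simplified stand-in for a pattern registry (hypothetical names, not NestJS code).
type Handler = (payload: unknown) => unknown;

type RouteResult =
  | { kind: 'no-handler'; topic: string }
  | { kind: 'invalid-json'; topic: string; raw: string }
  | { kind: 'ok'; handler: Handler; payload: unknown };

class HandlerRegistry {
  // Prototype-free object so topic names never collide with Object members.
  private readonly handlers: Record<string, Handler | undefined> =
    Object.create(null);

  // Plays the role of @EventPattern(topic): bind a handler to a topic name.
  register(topic: string, handler: Handler): void {
    this.handlers[topic] = handler;
  }

  // Plays the role of getHandlerByPattern(topic), plus JSON decoding.
  route(topic: string, raw: string | null): RouteResult {
    const handler = this.handlers[topic];
    if (!handler) {
      return { kind: 'no-handler', topic };
    }
    const text = raw ?? '{}'; // same fallback as the listen() code
    try {
      return { kind: 'ok', handler, payload: JSON.parse(text) };
    } catch (err) {
      return { kind: 'invalid-json', topic, raw: text };
    }
  }
}

const registry = new HandlerRegistry();
const received: unknown[] = [];
registry.register('my-sample-topic', (payload) => {
  received.push(payload);
});

const hit = registry.route('my-sample-topic', '{"id": 1}');
if (hit.kind === 'ok') {
  hit.handler(hit.payload); // a real consumer would await this call
}
const miss = registry.route('other-topic', '{}');
const broken = registry.route('my-sample-topic', 'not json');
```

Keeping the routing decision separate from the handler call means the consumer's 'data' callback only has to switch on `kind` and log accordingly.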
Voilà, the custom transport is set up. Fine-tune it however you prefer.
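One example of such fine-tuning is the security setup. librdkafka's `security.protocol` setting accepts four values (`plaintext`, `ssl`, `sasl_plaintext`, `sasl_ssl`), so the choice depends on the `ssl` and `sasl` options together. Here is a minimal sketch of that mapping as a pure function. `TransportSecurityOptions` and `buildSecurityConfig` are hypothetical names, and the option shape is only an assumption based on the fields the listen() code reads.

```typescript
// Assumed option shape, inferred from the fields used in listen() (hypothetical type).
interface TransportSecurityOptions {
  ssl?: boolean;
  sasl?: { mechanism: string; username: string; password: string };
}

// Subset of librdkafka configuration keys related to security.
type SecurityConfig = {
  'security.protocol': 'plaintext' | 'ssl' | 'sasl_plaintext' | 'sasl_ssl';
  'sasl.mechanisms'?: string;
  'sasl.username'?: string;
  'sasl.password'?: string;
};

function buildSecurityConfig(options: TransportSecurityOptions): SecurityConfig {
  const { ssl, sasl } = options;
  if (!sasl) {
    // No credentials: encryption only, or nothing at all.
    return { 'security.protocol': ssl ? 'ssl' : 'plaintext' };
  }
  // Credentials present: SASL over TLS or over a plain connection.
  return {
    'security.protocol': ssl ? 'sasl_ssl' : 'sasl_plaintext',
    'sasl.mechanisms': sasl.mechanism,
    'sasl.username': sasl.username,
    'sasl.password': sasl.password,
  };
}

const plain = buildSecurityConfig({});
const tlsOnly = buildSecurityConfig({ ssl: true });
const saslTls = buildSecurityConfig({
  ssl: true,
  sasl: { mechanism: 'PLAIN', username: 'user', password: 'secret' },
});
```

The result could then be merged into the consumer settings, for instance with `Object.assign(kafkaConfig, buildSecurityConfig(this.options))`, which keeps the branching out of listen().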
Full code available here: https://github.com/Farmijo/nestjs-kafka-confluent