Fran Armijo Farmijo | Random thoughts blog

6/5/2025 ~ 2 min read

Custom transports on NestJS (and how to avoid KafkaJS)

At some point, NestJS should drop the unmaintained KafkaJS and decide which native approach substitute. In the meantime, consider this.


We’re at 05/06/2025 and kafkaJS is still unmaintained. Plus, if you’re using NestJS microservices approach, you’re using a not flexible library for the Kafka Transport that uses an unmaintained library.

Luckily, NestJS allows to define your own Transport Strategy. And it can be a Kafka Strategy through a different library.

3 simple steps:

  • Define your custom class for Transport, the way NestJS forces it
export class RDKafkaServer extends Server implements CustomTransportStrategy {
...
}
  • Implement the .listen() method in the fashion/library you prefer. Here I’m using node-rdKafka. Notice the getHandlerByPattern usage. Assume that we have a topic called my-sample-topic on kafka. Disclaimer: Don’t use this code on production
  async listen(callback: () => void) {
    const kafkaConfig: Kafka.ConsumerGlobalConfig = {
      'metadata.broker.list': this.options.brokers,
      'group.id': this.options.groupId,
      'client.id': this.options.clientId,
      'enable.auto.commit': true,
      'auto.commit.interval.ms': this.options.autoCommitInterval ?? 5000,
    };

    if (this.options.ssl) {
      kafkaConfig['security.protocol'] = 'sasl_ssl';
    } else if (this.options.sasl) {
      kafkaConfig['security.protocol'] = 'sasl_plaintext';
    }

    if (this.options.sasl) {
      kafkaConfig['sasl.mechanisms'] = this.options.sasl.mechanism;
      kafkaConfig['sasl.username'] = this.options.sasl.username;
      kafkaConfig['sasl.password'] = this.options.sasl.password;
    }

    this.consumer = new Kafka.KafkaConsumer(kafkaConfig, {
      'auto.offset.reset': this.options.fromBeginning ? 'earliest' : 'latest',
    });

    this.consumer.on('ready', () => {
      this.logger.log('Kafka consumer ready. Subscribing to topics...');
      this.consumer.subscribe(this.options.topics);
      this.consumer.consume();
      callback();
    });

    this.consumer.on('data', async (message: Kafka.Message) => {
      const topic = message.topic;
      const value = message.value?.toString() ?? '{}';
      const handler = this.getHandlerByPattern(topic);

      if (!handler) {
        this.logger.warn(`No handler for topic: ${topic}`);
        return;
      }

      try {
        const payload = JSON.parse(value);
        await handler(payload);
      } catch (err) {
        this.logger.error(`Error parsing message from ${topic}`, err);
      }
    });

    this.consumer.on('event.error', (err) => {
      this.logger.error('Kafka error:', err);
    });

    this.consumer.connect();
  }
  • Use the @EventPattern() decorator to trigger whatever you want

    @Controller()
    export class CustomListener {
      @EventPattern('my-sample-topic')
      handleSalesforceContract(@Payload() message: any) {
        console.log('Message Received on my-sample-topic:');
        console.log(message);
      }
    }

    Voila, custom transport setup. Fine-tune it in the way you prefer.

Full code available here: https://github.com/Farmijo/nestjs-kafka-confluent


Headshot of Fran Armijo

Hi, I'm Fran. I'm a software engineer and musician based in Barcelona.