实例:NodeJS 操作 Kafka

发布时间:2024-01-04 09:37:18

本人是C#出身的程序员,c#很简单就能实现,有需要的可以加我私聊。但是就目前流行的开发语言,尤其是面向web方向应用的,我感觉就是Nodejs最简单了。下面介绍:

本文将会介绍在windows环境下启动Kafka,并通过nodejs作为客户端,生产并消费消息。

步骤一,Kafka需要java运行时,先安装配置java环境。通过在命令行中输入java -version确认java是否成功安装(可能需要查看windows的环境变量中是否有java)。

步骤二Kafka官网下载最新版本的压缩包(.tgz格式),并解压。分别在两个命令行里面启动zookeeper、kafka(解压缩路径下)

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

bin\windows\kafka-server-start.bat config\server.properties

说明一下zookeeper和kafka的关系:zookeeper是集群的调度者,kafka才是消息队列。 zookeeper的默认端口:2181,kafka的默认端口:9092
相关配置可以在config文件下的server.properties和zookeeper.properties中找到

用记事本打开就可以编辑

建立data,logs,kafka-logs目录,用于日志,备用。

消费者客户端需要的group.id可以在config->consumer.properties中找到。

步骤三,使用DOS的CMD管理员命令行方式测试生产者生产、消费者消费。
//创建一个topic:test
bin\windows\kafka-topics.bat --create --zookeeper?localhost:2181 --replication-factor 1 --partitions 1 --topic test

//查看topic
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

//创建生产者主题mytest
bin\windows\kafka-console-producer.bat --broker-list?localhost:9092 --topic test-nodetopic

//创建消费者消费mytest
bin\windows\kafka-console-consumer.bat?--bootstrap-server?localhost:9092 --topic test-nodetopic --from-beginning

步骤四,生产者发送消息
在生产者窗口,随意输入一条消息,可以在消费者窗口看到该消息。

最后,使用nodejs访问kafka? 首先安装kafkajs

初始化项目

npm init -y

没有安装kafkajs的,在这里可以安装,互联网在线安装。
npm install kafkajs

新建demo2023.js,输入以下代码

const { Kafka } = require('kafkajs')

const kafka = new Kafka({

??clientId: 'my-app',

??brokers: ['localhost:9092']

})

const producer = kafka.producer()

const consumer = kafka.consumer({ groupId: 'test-consumer-group' })

const run = async () => {

??// Producing

??await producer.connect()

??await producer.send({

????topic: 'test-nodetopic',

????messages: [

??????{ value: ' Hello KafkaJS user,I?am producer ! ' },

????],

??})

??// Consuming

??await consumer.connect()

??await consumer.subscribe({ topic: 'test-nodetopic', fromBeginning: true })

??await consumer.run({

????eachMessage: async ({ topic, partition, message }) => {

??????console.log({

????????partition,

????????offset: message.offset,

????????value: message.value.toString(),

??????})

????},

??})

}

run().catch(console.error)

最后执行命令
node demo2023.js

文章来源:https://blog.csdn.net/qqxinxi/article/details/134512455
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。