kafka刷盘机制(吊炸天的Kafka图形化工具)
Kafka是当下非常流行的消息中间件,据官网透露,已有成千上万的公司在使用它。最近实践了一波Kafka,确实很好很强大。今天我们来从三个方面学习下Kafka:Kafaka在Linux下的安装,Kafka的可视化工具,Kafka和SpringBoot结合使用。希望大家看完后能快速入门Kafka,掌握这个流行的消息中间件!
Kafka简介Kafka是由LinkedIn公司开发的一款开源分布式消息流平台,由Scala和JAVA编写。主要作用是为处理实时数据提供一个统一、高吞吐、低延迟的平台,其本质是基于发布订阅模式的消息引擎系统。
Kafka具有以下特性:
Kafka安装
- 高吞吐、低延迟:Kafka收发消息非常快,使用集群处理消息延迟可低至2ms。
- 高扩展性:Kafka可以弹性地扩展和收缩,可以扩展到上千个broker,数十万个partition,每天处理数万亿条消息。
- 永久存储:Kafka可以将数据安全地存储在分布式的,持久的,容错的群集中。
- 高可用性:Kafka在可用区上可以有效地扩展群集,某个节点宕机,集群照样能够正常工作。
我们将采用Linux下的安装方式,安装环境为CentOS 7.6。此处没有采用Docker来安装部署,个人感觉直接安装更简单(主要是官方没提供Docker镜像)!
- 首先我们需要下载Kafka的安装包,下载地址:https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
- 下载完成后将Kafka解压到指定目录:
cd/mydata/kafka/ tar-xzfkafka_2.13-2.8.0.tgz
- 解压完成后进入到解压目录:
cdkafka_2.13-2.8.0
- 虽然有消息称Kafka即将移除zookeeper,但是在Kafka最新版本中尚未移除,所以启动Kafka前还是需要先启动Zookeeper;
- 启动Zookeeper服务,服务将运行在2181端口;
#后台运行服务,并把日志输出到当前文件夹下的zookeeper-out.file文件中 nohupbin/zookeeper-server-start.shconfig/zookeeper.properties>zookeeper-out.file2>&1&
- 由于目前Kafka是部署在Linux服务器上的,外网如果想要访问,需要修改Kafka的配置文件config/server.properties,修改下Kafka的监听地址,否则会无法连接;
############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://192.168.5.78:9092
- 最后启动Kafka服务,服务将运行在9092端口。
Kafka命令行操作
#后台运行服务,并把日志输出到当前文件夹下的kafka-out.file文件中 nohupbin/kafka-server-start.shconfig/server.properties>kafka-out.file2>&1&
接下来我们使用命令行来操作下Kafka,熟悉下Kafka的使用。
- 首先创建一个叫consoleTopic的Topic;
bin/kafka-topics.sh--create--topicconsoleTopic--bootstrap-server192.168.5.78:9092
- 接下来查看Topic;
bin/kafka-topics.sh--describe--topicconsoleTopic--bootstrap-server192.168.5.78:9092
- 会显示如下Topic信息;
Topic:consoleTopicTopicId:tJmxUQ8QRJGlhCSf2ojuGwPartitionCount:1ReplicationFactor:1Configs:segment.bytes=1073741824 Topic:consoleTopicPartition:0Leader:0Replicas:0Isr:0
- 向Topic中发送消息:
bin/kafka-console-producer.sh--topicconsoleTopic--bootstrap-server192.168.5.78:9092
- 直接在命令行中输入信息即可发送;
- 重新打开一个窗口,通过如下命令可以从Topic中获取消息:
bin/kafka-console-consumer.sh --topic consoleTopic --from-beginning --bootstrap-server 192.168.5.78:9092
Kafka可视化
使用命令行操作Kafka确实有点麻烦,接下来我们试试可视化工具kafka-eagle。
安装JDK如果你使用的是CentOS的话,默认没有安装完整版的JDK,需要自行安装!
- 下载JDK 8,下载地址:https://mirrors.tuna.tsinghua.edu.cn/AdoptOpenJDK/
- 下载完成后将JDK解压到指定目录;
cd/mydata/java tar-zxvfOpenJDK8U-jdk_x64_linux_xxx.tar.gz mvOpenJDK8U-jdk_x64_linux_xxx.tar.gzjdk1.8
- 在/etc/profile文件中添加环境变量JAVA_HOME。
安装kafka-eagle
vi/etc/profile #在profile文件中添加 exportJAVA_HOME=/mydata/java/jdk1.8 exportPATH=$PATH:$JAVA_HOME/bin #使修改后的profile文件生效 ./etc/profile
- 下载kafka-eagle的安装包,下载地址:https://github.com/smartloli/kafka-eagle-bin/releases
- 下载完成后将kafka-eagle解压到指定目录;
cd/mydata/kafka/ tar-zxvfkafka-eagle-web-2.0.5-bin.tar.gz
- 在/etc/profile文件中添加环境变量KE_HOME;
vi/etc/profile #在profile文件中添加 exportKE_HOME=/mydata/kafka/kafka-eagle-web-2.0.5 exportPATH=$PATH:$KE_HOME/bin #使修改后的profile文件生效 ./etc/profile
- 安装MySQL并添加数据库ke,kafka-eagle之后会用到它;
- 修改配置文件$KE_HOME/conf/system-config.properties,主要是修改Zookeeper的配置和数据库配置,注释掉sqlite配置,改为使用MySQL;
###################################### # multi zookeeper & kafka cluster list ###################################### kafka.eagle.zk.cluster.alias=cluster1 cluster1.zk.list=localhost:2181 ###################################### # kafka eagle webui port ###################################### kafka.eagle.webui.port=8048 ###################################### # kafka sqlite jdbc driver address ###################################### # kafka.eagle.driver=org.sqlite.JDBC # kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db # kafka.eagle.username=root # kafka.eagle.password=www.kafka-eagle.org ###################################### # kafka mysql jdbc driver address ###################################### kafka.eagle.driver=com.mysql.cj.jdbc.Driver kafka.eagle.url=jdbc:mysql://localhost:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull kafka.eagle.username=root kafka.eagle.password=root
- 使用如下命令启动kafka-eagle;
$KE_HOME/bin/ke.shstart
- 命令执行完成后会显示如下信息,但并不代表服务已经启动成功,还需要等待一会;
- 再介绍几个有用的kafka-eagle命令:
#停止服务 $KE_HOME/bin/ke.shstop #重启服务 $KE_HOME/bin/ke.shrestart #查看服务运行状态 $KE_HOME/bin/ke.shstatus #查看服务状态 $KE_HOME/bin/ke.shstats #动态查看服务输出日志 tail-f$KE_HOME/logs/ke_console.out
- 启动成功可以直接访问,输入账号密码admin:123456,访问地址:http://192.168.5.78:8048/
- 登录成功后可以访问到Dashboard,界面还是很棒的!
可视化工具使用
- 之前我们使用命令行创建了Topic,这里可以直接通过界面来创建;
- 我们还可以直接通过kafka-eagle来发送消息;
- 我们可以通过命令行来消费Topic中的消息;
bin/kafka-console-consumer.sh--topictestTopic--from-beginning--bootstrap-server192.168.5.78:9092
- 控制台获取到信息显示如下;
- 还有一个很有意思的功能叫KSQL,可以通过SQL语句来查询Topic中的消息;
- 可视化工具自然少不了监控,如果你想开启kafka-eagle对Kafka的监控功能的话,需要修改Kafka的启动脚本,暴露JMX的端口;
vikafka-server-start.sh #暴露JMX端口 if["x$KAFKA_HEAP_OPTS"="x"];then exportKAFKA_HEAP_OPTS="-server-Xms2G-Xmx2G-XX:PermSize=128m-XX: UseG1GC-XX:MaxGCPauseMillis=200-XX:ParallelGCThreads=8-XX:ConcGCThreads=5-XX:InitiatingHeapOccupancyPercent=70" exportJMX_PORT="9999" fi
- 来看下监控图表界面;
- 还有一个很骚气的监控大屏功能;
- 还有Zookeeper的命令行功能,总之功能很全,很强大!
SpringBoot整合Kafka
在SpringBoot中操作Kafka也是非常简单的,比如Kafka的消息模式很简单,没有队列,只有Topic。
- 首先在应用的pom.xml中添加Spring Kafka依赖;
<!--Spring整合Kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.7.1</version> </dependency>
- 修改应用配置文件application.yml,配置Kafka服务地址及consumer的group-id;
server: port:8088 spring: kafka: bootstrap-servers:'192.168.5.78:9092' consumer: group-id:"bootGroup"
- 创建一个生产者,用于向Kafka的Topic中发送消息;
/** *Kafka消息生产者 *Createdbymacroon2021/5/19. */ @Component publicclassKafkaProducer{ @Autowired privateKafkaTemplatekafkaTemplate; publicvoidsend(Stringmessage){ kafkaTemplate.send("bootTopic",message); } }
- 创建一个消费者,用于从Kafka中获取消息并消费;
/** *Kafka消息消费者 *Createdbymacroon2021/5/19. */ @Slf4j @Component publicclassKafkaConsumer{ @KafkaListener(topics="bootTopic") publicvoidprocessMessage(Stringcontent){ log.info("consumerprocessMessage:{}",content); } }
- 创建一个发送消息的接口,调用生产者去发送消息;
/** *Kafka功能测试 *Createdbymacroon2021/5/19. */ @Api(tags="KafkaController",description="Kafka功能测试") @Controller @RequestMapping("/kafka") publicclassKafkaController{ @Autowired privateKafkaProducerkafkaProducer; @ApiOperation("发送消息") @RequestMapping(value="/sendMessage",method=RequestMethod.GET) @ResponseBody publicCommonResultsendMessage(@RequestParamStringmessage){ kafkaProducer.send(message); returnCommonResult.success(null); } }
- 直接在Swagger中调用接口进行测试;
- 项目控制台会输出如下信息,表明消息已经被接收并消费掉了。
总结
2021-05-1916:59:21.016INFO2344---[ntainer#0-0-C-1]c.m.mall.tiny.component.KafkaConsumer:consumerprocessMessage:SpringBootmessage!
通过本文的一波实践,大家基本就能入门Kafka了。安装、可视化工具、结合SpringBoot,这些基本都是和开发者相关的操作,也是学习Kafka的必经之路。
,
免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com