博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka1 利用虚拟机搭建自己的Kafka集群
阅读量:5963 次
发布时间:2019-06-19

本文共 5723 字,大约阅读时间需要 19 分钟。

前言:

      上周末自己学习了一下Kafka,参考网上的文章,学习过程中还是比较顺利的,遇到的一些问题最终也都解决了,现在将学习的过程记录与此,供以后自己查阅,如果能帮助到其他人,自然是更好的。

===============================================================长长的分割线====================================================================

正文:

  关于Kafka的理论介绍,网上可以搜到到很多的资料,大家可以自行搜索,我这里就不在重复赘述。

      本文中主要涉及三块内容: 第一,就是搭建Zookeeper环境;第二,搭建Kafka环境,并学习使用基本命令发送接收消息;第三,使用Java API完成操作,以便初步了解在实际项目中的使用方式。

  闲话少说,言归正传,本次的目的是利用VMware搭建一个属于自己的ZooKeeper和Kafka集群。本次我们选择的是VMware10,具体的安装步骤大家可以到网上搜索,资源很多。

  第一步,确定目标:

      ZooKeeperOne       192.168.224.170  CentOS

      ZooKeeperTwo       192.168.224.171  CentOS

      ZooKeeperThree     192.168.224.172  CentOS

      KafkaOne                192.168.224.180  CentOS

      KafkaTwo                192.168.224.181  CentOS

      我们安装的ZooKeeper是3.4.6版本,可以从这里; Kafka安装的是0.8.1版本,可以从这里; JDK安装的版本是1.7版本。

      另: 我在学习的时候,搭建了两台Kafka服务器,正式环境中我们最好是搭建2n+1台,此处仅作为学些之用,暂不计较。

 

      第二步,搭建Zookeeper集群:

      此处大家可以参照我之前写的一篇文章  ,我在搭建Kafka的环境的时候就是使用的之前搭建好的Zookeeper集群。

       

      第三步,搭建Kafka集群:

      (1). 将第一步中下载的 kafka_2.10-0.8.1.tgz 解压缩后,进入config目录,会看到如下图所示的一些配置文件,我们准备编辑server.properties文件。

      

      (2). 打开 server.properties 文件,需要编辑的属性如下所示:

1 broker.id=02 port=90923 host.name=192.168.224.1804 5 log.dirs=/opt/kafka0.8.1/kafka-logs6 7 zookeeper.connect=192.168.224.170:2181,192.168.224.171:2181,192.168.224.172:2181

      注意: 

      a. broker.id: 每个kafka对应一个唯一的id,自行分配即可

      b. port: 默认的端口号是9092,使用默认端口即可

      c. host.name: 配置的是当前机器的ip地址

      d. log.dirs: 日志目录,此处自定义一个目录路径即可

      e. zookeeper.connect: 将我们在第二步搭建的Zookeeper集群的配置全部写上

      (3). 上边的配置完毕后,我们需要执行命令 vi /etc/hosts,将相关服务器的host配置如下图,如果没有执行此步,后边我们在执行一些命令的时候,会报无法识别主机的错误

       

      (4).  经过上述操作,我们已经完成了对Kafka的配置,很简单吧?!但是如果我们执行 bin/kafka-server-start.sh   config/server.properties  & 这个启动命令,可能我们会遇到如下两个问题:

       a. 我们在启动的报 Unrecognized VM option '+UseCompressedOops'.Could not create the Java virtual machine. 这个错误。

       

       解决方式:

       查看 bin/kafka-run-class.sh 

       找到下面这段代码,去掉-XX:+UseCompressedOops     

1 if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then2 KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"3 fi

        b. 解决了第一个问题,我们还有可能在启动的时候遇到 java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder 这个错误。

        

        解决方式:

        从网上的下载 slf4j-nop-1.6.0.jar 这个jar包,然后放到kafka安装目录下的libs目录中即可。注意,基于我目前的kafka版本,我最开始从网上下载的slf4j-nop-1.5.0.jar 这个jar包,但是启动的时候依然会报错,所以一定要注意版本号哦~

      (5). 现在我们执行 bin/kafka-server-start.sh   config/server.properties  & 这个启动命令,应该就可以正常的启动Kafka了。命令最后的 & 符号是为了让启动程序在后台执行。如果不加这个 & 符号,当执行完启动后,我们通常会使用 ctrl + c 退出当前控制台,kafka此时会自动执行shutdown,所以此处最好加上 & 符号。

 

      第三步,使用基本命令创建消息主题,发送和接收主题消息:

      (1). 创建、查看消息主题

1 #连接zookeeper, 创建一个名为myfirsttopic的topic 2 bin/kafka-topics.sh --create --zookeeper 192.168.224.170:2181 --replication-factor 2 --partitions 1 --topic myfirsttopic 3  4 # 查看此topic的属性   5 bin/kafka-topics.sh --describe --zookeeper 192.168.224.170:2181 --topic myfirsttopic   6  7 # 查看已经创建的topic列表   8 bin/kafka-topics.sh --list --zookeeper 192.168.224.170:2181

      上述命令执行完毕后,截图如下:

       

       

      (2). 创建一个消息的生产者:

1 #启动生产者,发送消息2 bin/kafka-console-producer.sh --broker-list 192.168.224.180:9092 --topic myfirsttopic3 4 #启动消费者,接收消息5 bin/kafka-console-consumer.sh --zookeeper 192.168.224.170:2181 --from-beginning --topic myfirsttopic

      上述命令执行完毕后,截图如下:

       

      (3). 按照(1)、(2)这两步,你应该可以利用Kafka感受到了分布式消息系统。这里需要着重的再说一下我在这个过程中发现的一个问题: 大家可以看下上图中的consumer的命令,我选择了zookeeper的其中一台192.168.224.170:2181接收消息是可以正常接收的!不要忘了,我是三台zookeeper的,所以我又尝试了向192.168.224.171:2181和192.168.224.172:2181接收myfirsttopic这个主题的消息。正常情况下,三台访问的结果应该都是可以正常的接收消息,但是当时我的情况在访问了192.168.224.171:2181这台时会报 org.apache.zookeeper.clientcnxn 这个错误!!!

             我当时多试了两遍,发现我的三台zookeeper中,谁是leader(zkServer.sh status命令),concumer连接的时候就会报上面的那个异常。后来定位到了zookeeper的zoo.cfg配置文件中的maxClientCnxns属性,即客户端最大连接数,我当时使用的是默认配置是2。后来我把这个属性的值调大一些,consumer连接zookeeper leader时,就不会报这个错误了。如果你选择将这个属性注释掉(从网上查询到注释掉该属性默认值是10),也不会报这个错误了。其实网上的很多文章也只是说了此属性可以尽量设置的大一些,没有解释其他的。

             但我后来还是仔细想了想,当我把maxClientCnxns这个属性设置为2时,如果两台kafka启动时,每个kafka和zookeeper的节点之间建立了一个客户端连接,那么此时zookeeper的每个节点的客户端连接数就已经达到了最大连接数2,那么我创建consumer的时候,应该是三台zookeeper连接都有问题,而不是只有leader会有问题。所以,此处需要各位有见解的再帮忙解释一下!!!       

 

      第四步,使用Java API 操作Kafka:

      其实Java API提供的功能基本也是基于上边的客户端命令来实现的,万变不离其宗,我将我整理的网上的例子贴到下面,大家可以在本地Java工程中执行一下,即可了解调用方法。

      (1). 我的maven工程中pom.xml的配置

1 
2
4.0.0
3
com.ismurf.study
4
com.ismurf.study.kafka
5
0.0.1-SNAPSHOT
6
Kafka_Project_0001
7
war
8 9
10
11
org.apache.kafka
12
kafka_2.10
13
0.8.1.1
14
15
16 17
18
19
20
org.apache.maven.plugins
21
maven-war-plugin
22
2.1.1
23
24
@{artifactId}@.@{extension}@
25
26
27 28
29
30
org.apache.maven.plugins
31
maven-compiler-plugin
32
33
1.634
1.6
35
36
37 38
39
org.apache.maven.plugins
40
maven-surefire-plugin
41
42
true
43
44
45
46
47 48

      (2). 实例代码: 大家可以参考这片文章的   中的代码,粘贴到工程后即可使用,上述文章中的代码整理后目录截图如下:

      

   

             

   

      

 

转载于:https://www.cnblogs.com/PurpleDream/p/4845102.html

你可能感兴趣的文章
C语言 · 输出日历
查看>>
inode与ln命令
查看>>
Uncaught TypeError: this.canvas.getContext is not a function
查看>>
CSS 之 样式优先级机制
查看>>
Jenkins + GitHub + fir-cli 一行命令从源码到fir.im
查看>>
kill-9导致weblogic无法启动
查看>>
WebService服务发布与使用(JDK自带WebService)
查看>>
Java DES 加解密("DES/CBC/PKCS5Padding")
查看>>
C#编程(七十六)----------使用指针实现基于栈的高性能数组
查看>>
PostgreSql 分页limit
查看>>
在MySQL中创建cm-hive使用的数据库及账号
查看>>
HDU 2503 a/b + c/d(最大公约数与最小公倍数,板子题)
查看>>
python总结
查看>>
hdu 5215 Cycle
查看>>
GCD学习(五) dispatch_barrier_async
查看>>
file_get_contents("php://input")的使用方法
查看>>
MeasureSpec学习
查看>>
Android View体系(五)从源码解析View的事件分发机制
查看>>
数据结构 之 并查集(Disjoint Set)
查看>>
枚举类的创建和使用
查看>>