卡夫卡的迅速实战演练以及基本概念详细说明。

环境

Kafka最开始由Linkedin企业开发设计,是一个适用系统分区和团本的分布式系统信息系统软件,根据zookeeper融洽。它最高的特征是能够并行处理很多数据信息,达到各种各样要求情景,如根据hadoop的批处理命令系统软件,低延迟时间实时系统,Storm/Spark流解决模块,web/Nginx日志,浏览日志,信息服务项目等。,他们是用scala语言开发的。Linkedin在2010年为Apache Foundation作出了奉献,变成顶尖开源软件。

kafka应用实例-kafka简单的入门案例-第1张图片卡夫卡基本要素。

Kafka是一个分布式系统和分支的信息(官方网称之为递交日志)服务项目。它给予了信息系统软件应当具有的作用,但它的确有与众不同的设计方案。那样,卡夫卡使用了JMS标准的观念,但并沒有彻底遵循JMS标准。

最先,使我们看一下基本上的信息有关专业术语。

名字表述Broker消息中间件解决连接点,一个Kafka连接点便是一个broker,一个或是好几个Broker能够构成一个Kafka群集TopicKafka依据topic对信息开展分类,公布到Kafka群集的一条信息都必须特定一个topicProducer信息经营者,向Broker推送信息的手机客户端Consumer信息顾客,从Broker载入信息的手机客户端ConsumerGroup每一个Consumer归属于一个指定的Consumer Group,一条信息能够被好几个不一样的Consumer Group消費,可是一个Consumer Group中只有有一个Consumer可以消費该信息Partition物理学上的定义,一个topic能够分成好几个partition,每一个partition小道消息是井然有序的

因而,从更多的方面而言,经营者根据互联网向卡夫卡群集推送信息,随后顾客开展消費,如下图所显示:

kafka应用实例-kafka简单的入门案例-第2张图片代理商和顾客中间的通讯是根据TCP协议书进行的。

卡夫卡基本上使用方法。

安裝前的自然环境提前准备。

因为Kafka是用Scala语言联合开发的,运作在JVM上,因此在安裝Kafka以前必须先安裝JDK。

yum install Java-1 . 8 . 0-open JDK *-y

卡夫卡依靠野生动物园管理人员,因此必须先安裝野生动物园管理人员。

wget https://mirror . bit . edu . cn/Apache/zoo keeper/zoo keeper-3 . 5 . 8/Apache-zoo keeper-3 . 5 . 8-bin . tar . gz

apache-zookeeper-3.5.8-bin.tar.gz

cd apache-zookeeper-3.5.8-bin

CP/zoo _ sample.cfgconf/zoo.cfg #运行野生动物园管理人员bin/zkServer.sh

start bin/zkCli.sh

Ls/#查询zk的网站根目录有关连接点。

流程1:下载安装包。

免费下载2.4.1版本号并压缩包解压:

ethttps://mirror . bit . edu . cn/Apache/kafka/2.4.1/Kafka _ 2.11-2 . 4 . 1 . tgz # 2.11是scala的版本号,2 . 4 . 1是Kafka的版本号。

tar -xzf kafka_2.11-2.4.1.tgz

cd kafka_2.11-2.4.1

流程2:改动配备。

改动环境变量config/server.properties:

kafka群集中的特性#broker.id务必是唯一的broker.id=0 #kafka布署的设备ip和端口给予服务项目侦听器=密文://192.168.65.60: 9092 # Kafka的信息储存文档log . dir =/usr/local/data/Kafka-logs # Kafka联接zookeeper的详细地址zookeeper。Connect = 192.168.65.60: 2181。

第三步:运行服务项目。

从现在起卡夫卡服务项目:

运行脚本制作英语的语法:Kafka-server-start . sh[-daemon]server . properties。

能够见到server.properties的配备途径是强制性主要参数,而-daemon表示运作后台进程,不然ssh手机客户端撤出后会终止服务项目。(留意,kafka运行的时候会应用与linuxIP地址关系的ip详细地址,因而必须将IP地址和linux ip映射配备到当地服务器中,并应用vim /etc/hosts。)

#运行kafka,运作日意在server.log文件的logs文件目录下。

bin/Kafka-server-start . sh-daemon config/server . properties

#在后台管理运行,不将日志打印出到控制面板或应用。

bin/kafka-server-start . shconfig/server . properties & #大家进到zookeeper文件目录,根据zookeeper手机客户端bin/zkCli.sh ls/#查询zk,Kafka有关连接点的网站根目录。

Ls /brokers/ids #查询卡夫卡连接点# stop Kafka bin/Kafka-server-stop . sh。

Server.properties关键配备详细资料:

PropertyDefaultDescriptionbroker.id0每一个broker都能够用一个唯一的非负整数id开展标志;这一id能够做为broker的“名称”,你能挑选随意你喜爱的数据做为id,只需id是唯一的就可以。log.dirs/tmp/kafka-logskafka存取数据的途径。这一途径并非唯一的,能够是好几个,途径中间只要应用分号隔开就可以;每每建立新partition时,都是会挑选在包括至少partitions的途径下开展。listenersPLAINTEXT://192.168.65.60:9092server接纳手机客户端联接的端口号,ip配置kafka本机ip就可以zookeeper.connectlocalhost:2181zooKeeper联接字符串数组的形式为:hostname:port,这里hostname和port分别是ZooKeeper群集中某一连接点的host和port;zookeeper如果是群集,接口方式为 hostname1:port1, hostname2:port2, hostname3:port3log.retention.hours168每一个日志删除文件以前储存的時间。默认设置数据信息储存時间对全部topic都一样。num.partitions1建立topic的默认设置系统分区数default.replication.factor1全自动建立topic的默认设置团本总数,提议设定为高于或等于2min.insync.replicas1当producer设定acks为-1时,min.insync.replicas特定replicas的最少数量(务必确定每一个repica的写数据信息全是取得成功的),假如这一数量沒有做到,producer推送信息会造成出现异常delete.topic.enablefalse是不是容许删掉主题风格

第四步:建立主题风格。

如今使我们建立一个名叫“test”的主题风格,它只有一个系统分区,备份数据因素配置为1:

bin/Kafka-topics . sh-create-zookeeper 192 . 168 . 65 . 60:2181-拷贝-因素1-系统分区1-主题风格检测

如今我们可以应用下边的指令来查询kafka中当今出现的主题风格。

bin/Kafka-topics . sh-list-野生动物园管理人员192.168.65.60:2181

除开大家手动式建立一个主题风格以外,当经营者向特定的主题风格推送信息时,假如该主题风格不会有,它将被全自动建立。

删掉主题风格

bin/Kafka-topics . sh-delete-topic test-zoom keeper 192 . 168 . 65 . 60:2181

第五步:发送短信。

kafka内置经营者指令手机客户端,能够从本地文件载入內容,还可以同时在cmd键入內容,以信息的方式发送至Kafka群集。默认设置状况下,每一行都被视作单独的信息。

最先,大家必须运作脚本制作来公布信息,随后在命令行中键入要推送的信息內容:

bin/Kafka-console-producer . sh-broker-list 192 . 168 . 65 . 60:9092-主题风格检测>这也是一条信息>这也是另一条信息

第六步:消費新闻报道。

针对顾客而言,kafka还带上了一个cmd手机客户端,它将在命令行中輸出得到的內容。默认设置状况下,它会应用最新动向:

bin/Kafka-console-consumer . sh-bootstrap-server 192 . 168 . 65 . 60:9092-主题风格检测

假如要应用前一条信息,能够根据–-重新开始主要参数特定,如下所示所显示:

bin/Kafka-console-consumer . sh-bootstrap-server 192 . 168 . 65 . 60:9092-重新开始-主题风格检测

假如您根据不一样的终端窗口运作以上指令,您将见到在经营者终端设备中填写的內容,而且迅速会表明在顾客的终端窗口中。

之上全部指令都是有一些额外选择项;在我们在沒有什么主要参数的情形下运作该指令时,大家将表明该指令的详尽使用方法。

多主题风格消費

bin/Kafka-console-consumer . sh-bootstrap-server 192 . 168 . 65 . 60:9092-授权管理“test|test-2”

单播消費

一条信息只有由某一顾客消費。类似序列方式,全部顾客必须在同一个顾客组里。

各自在2个手机客户端上实行下列消費指令,随后向主题风格推送信息。因而,只有一个手机客户端能够接受信息。

bin/Kafka-console-consumer . sh-bootstrap-server 192 . 168 . 65 . 60:9092-consumer-property group . id = TestGrouP-topic test

组播消費

一条信息能够被好几个顾客消費的方式类似公布-定阅方式花费。由于卡夫卡的特性,即同一条信息只有由同一消費人群中的某一顾客消費,要完成组播,只必须确保这种顾客归属于不一样的消费者人群。大家加上另一个顾客,他归属于testGroup-2顾客组,2个手机客户端都能够接受信息。

bin/Kafka-console-consumer . sh-bootstrap-server 192 . 168 . 65 . 60:9092-consumer-property group . id = TestGrouP-2-主题风格检测

查询顾客组名字。

bin/Kafka-consumer-groups . sh-bootstrap-server 192 . 168 . 65 . 60:9092-list

查验消費组的消費冲抵。

bin/Kafka-consumer-groups . sh-bootstrap-server 192 . 168 . 65 . 60:9092-description-group testGroup

kafka应用实例-kafka简单的入门案例-第3张图片当今冲抵:当今耗费组的耗费冲抵。

Log-end-offset:与主题风格相匹配的系统分区信息的完毕偏移(HW)。

落后:当今消費组未消費的信息数。

主题风格和信息日志。

能够解释为:Topic是一个类型的名字,类似信息在同一个Topic下推送。针对每一个主题风格,下边可能有好几个系统分区日志文档:

kafka应用实例-kafka简单的入门案例-第4张图片系统分区是一个井然有序信息编码序列,他们被先后加上到一个名叫递交日志的文档中。每一个系统分区中的信息都是有一个唯一的序号,称之为offset,用以唯一标志系统分区中的信息。

每一个系统分区相匹配一个递交日志文档。系统分区中的信息偏移是唯一的,可是不一样系统分区中的信息偏移可能是同样的。

不管信息是不是被消費,卡夫卡一般都不容易删掉。仅依据配备的日志保存期(log.retention.hours),确定信息将被删掉多久,默认设置保存近期一周的日志信息。kafka的功能与留下的信息信息量不相干,因而储存很多数据信息信息日志信息内容不容易有其他危害。

每一个顾客都根据他自己在递交日志中的购物方案(偏移)工作中。在卡夫卡那边,消費相抵是由顾客自身保持的;一般,大家按序逐一应用递交日志中的信息。自然,我能根据特定偏移来反复耗费一些信息,或是绕过一些信息。

这代表着卡夫卡中的顾客对群集的危害十分小。提升或降低一个用户对群集或别的顾客沒有危害,由于每一个顾客都维持自身的消費赔偿。

为好几个系统分区建立主题风格:

bin/Kafka-topics . sh-create-zookeeper 192 . 168 . 65 . 60:2181-拷贝-因素1-系统分区2-topic test 1

查验题型的状况。

bin/Kafka-topics . sh–description–野生动物园管理人员192 . 168 . 65 . 60:2181–topic test1

kafka应用实例-kafka简单的入门案例-第5张图片leader连接点承担给出系统分区的全部读写能力要求。团本标示系统分区在哪个代理商上面有备份数据。无论这一点是否“领导干部”,即便这一连接点去世了,也会被整理出来。Isr是团本的一个非空子集,它只列举依然活跃性而且早已同歩备份数据系统分区的连接点。提升话题讨论系统分区数(kafka现阶段不兼容降低系统分区):

bin/Kafka-topics . sh-alter–系统分区3–野生动物园管理人员192 . 168 . 65 . 60:2181–主题风格检测

您能够那样了解主题风格,系统分区和代理商。

一个主题风格在逻辑性上表明一个业务流程数据,例如依据数据库查询中不一样表的信息实际操作信息放不一样的主题风格,订单信息有关的实际操作信息放进订单信息主题风格,客户有关的实际操作信息放进客户主题风格。针对商业网站而言,后面数据信息是大量的,订单信息信息很可能是十分极大的,例如有上一百多个G乃至做到TB级别。假如在一台设备上面这么多数据信息必定会出现容积约束的难题,那麼能够在主题风格中区划好几个系统分区,将数据储存在分块中,不一样的系统分区能够坐落于不一样的设备上,每台设备运转一个Kafka process Broker。

为何要对题材下的信息开展系统分区?

1.递交日志文档将遭受其所属设备的系统文件尺寸的限定。系统分区后,不一样的系统分区能够放到不一样的设备上,等同于数据库的分布式系统。理论上,一个主题风格能够解决随意总数的数据信息。

2.为了更好地提升并行性。

卡夫卡群集实战演练

针对kafka而言,单独代理商代表着kafka群集中只有一个连接点。为了更好地提升kafka群集中的进程总数,只必须重新启动好多个代理商案例。为了更好地能够更好地了解,如今我们在一台设备上运行三个代理商案例。

最先,大家必须创建此外2个代理商的环境变量:

cp配备/服务端.特性配备/网络服务器-1 .特性

cp配备/网络服务器.特性配备/网络服务器-2 .特性

环境变量时要改动的信息如下所示:

config/server-1 .特性:

#broker.id特性务必是唯一的broker.id = 1 # kafka布署的用以提高业务的设备ip和端口侦听器=密文://192 . 168 . 65 . 60:9093 log . dir =/usr/local/data/Kafka-logs-1 #。卡夫卡联接的植物园管理人员的详细地址与野生动物园管理人员同样。Connect = 192.168.65.60: 2181,便于集群好几个kafka案例。

config/server-2 .特性:

broker.id = 2 listeners =密文://192 . 168 . 65 . 60:9094 log . dir =/usr/local/data/Kafka-logs-2 zookeeper . connect = 192 . 168 . 65 . 60:2181

现阶段,大家有一个zookeeper案例和一个broker案例已经运作,如今大家只必须运行2个broker案例:

bin/Kafka-server-start . sh-daemon config/server-1 .特性bin/Kafka-server-start . sh-daemon config/server-2 .特性

查验zookeeper以确定是不是全部群集连接点早已取得成功申请注册:

kafka应用实例-kafka简单的入门案例-第6张图片建立一个新主题,将团本数设定为3,系统分区数设定为2:

bin/Kafka-topics . sh–create–zookeeper 192 . 168 . 65 . 60:2181–拷贝因素3–系统分区2–topic my-replicated-topic

查验题型的状况。

bin/Kafka-topics . sh–description–野生动物园管理人员192 . 168 . 65 . 60:2181–topic my-replicated-topic

kafka应用实例-kafka简单的入门案例-第7张图片leader连接点承担给出系统分区的全部读/写要求,在同一主题风格的不一样系统分区中,leader团本一般是不一样的(用以容灾备份)。团本标示系统分区在哪个代理商上面有备份数据。无论这一点是否“领导干部”,即便这一连接点去世了,也会被整理出来。Isr是团本的一个非空子集,它只列举依然活跃性而且早已同歩备份数据系统分区的连接点。Kafka在zookeeper中纪录了大批量的重要群集信息内容,确保了其无状态情况,十分有利于横着拓展。

群集消費

的日志系统分区遍布在kafka群集的不一样代理商上,每一个代理商都能够要求备份数据别的代理商上系统分区的数据信息。Kafka群集适用配备系统分区备份数据的总数。

针对每一个系统分区,一个代理商当做“管理者”,0个或大量的别的代理商当做“跟随者”。管理者解决该系统分区的全部读写能力要求,而跟随者处于被动拷贝管理者的結果,不给予读写能力(主要是为了更好地确保多团本数据信息和耗费中间的一致性)。假如这些管理者失败了,在其中一个跟随者将全自动变成新的管理者。

经营者

经营者将讯息发送至主题风格,并承担挑选将讯息发送至主题风格的哪一个系统分区。循环系统用以简易的负载均衡。还可以依据信息中的某一关键词来区别。一般第二种方式用的比较多。

消费者

有二种传统式的消息传递方式:序列和公布-定阅。

queue方式:好几个consumer从服务器中获取数据,信息只能抵达一个consumer。publish-subscribe方式:信息会被广播节目给全部的consumer。

卡夫卡根据这2种形式给予了一个抽象性的消費定义:消費人群。

queue方式:全部的consumer都在同一个consumer group 下。publish-subscribe方式:全部的consumer都拥有自身唯一的consumer group。

由2个艺人经纪人构成的卡夫卡群集在一个主题风格上一共有四个系统分区(P0-P3-P3),他们坐落于不一样的艺人经纪人上。该群集由2个顾客组消費,A有两个顾客案例,B有四个。

一般,一个主题风格会几个消费人群,每一个消费人群全是一个逻辑性定阅者。每一个顾客组由好几个顾客案例构成,进而完成可扩展性和容灾备份作用。

消費次序

针对一个系统分区,一个消費组里只有与此同时消費一个consumersinstance,进而保障了消費的次序。

使用人组里的使用人案例数不可以超出主题风格中的系统分区数,不然,附加的使用人将会耗费信息。

Kafka只确保系统分区范畴内信息消費的部分次序,不可以确保同一主题风格好几个分区域内消費的总次序。

假如必须确保总体上的消費次序,那麼我们可以将主题风格的系统分区数设定为1,将消費组里的消費案例数设定为1,可是那样会危害特性,因此非常少应用卡夫卡的次序消費。

Java手机客户端浏览卡夫卡。

引进maven依靠。

org . Apache . Kafka Kafka-clients 2 . 4 . 1

信息发布者编码(经营者)

package com.yundasys.usercenter.collect.api.vo.req;import com.alibaba.fastjson.JSON;import com.fasterxml.jackson.databind.ser.std.StringSerializer;import org.apache.kafka.clients.producer.*;import java.util.Properties;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;/** * @program: usercenter-portrait-collect * @description: MsgProducer * @author: yxh-word * @create: 2021-07-14 * @version: v1.0.0 创建文件, yxh-word, 2021-07-14 **/public class MsgProducer { private final static String TOPIC_NAME = "my-replicated-topic"; public static void main(String[] args) throws InterruptedException, ExecutionException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094"); /* 传出信息分布式锁体制主要参数 (1)acks=0: 表明producer不用等候一切broker确定接到信息的回应,就可以再次推送下一条信息。特性最大,可是最可能丢信息。 (2)acks=1: 最少要等候leader早已取得成功将信息载入当地log,可是不用等候全部follower是不是取得成功载入。就可以再次推送下一 条信息。这类情形下,假如follower沒有取得成功备份数据数据信息,而这时leader又挂了,则信息会遗失。 (3)acks=-1或all: 必须等候 min.insync.replicas(默认设置为1,电脑配置推荐高于或等于2) 这一参数配置的团本数量都取得成功载入日志,这类对策会确保 只需有一个备份数据生存就不容易损坏数据信息。这也是最強的数据信息确保。一般除非是是金融业等级,或跟钱相处的情景才会应用这类配备。 */ /*props.put(ProducerConfig.ACKS_CONFIG, "1"); *//* 推送不成功会再试,默认设置再试间距100ms,再试能确保信息推送的稳定性,可是也可能导致信息反复推送,例如网络抖动,因此必须在 接受者那里搞好信息接受的幂等性解决 *//* props.put(ProducerConfig.RETRIES_CONFIG, 3); //再试间距设定 props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); //设定推送信息的当地缓冲区域,假如设定了该缓冲区域,信息会先发送至当地缓冲区域,能够提升信息推送特性,初始值是33554432,即32MB props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); *//* kafka本地进程会从缓冲区域取数据信息,大批量发送至broker, 设定大批量推送信息的尺寸,初始值是16384,即16kb,就是一个batch满了16kb就推送出来 *//* props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); *//* 初始值是0,含意便是信息务必马上被推送,但那样会危害特性 一般设定10ms上下,就是这一信息推送完后会进到当地的一个batch,假如10ms内,这一batch满了16kb便会随batch一起被推送出来 假如10ms内,batch未满,那麼也务必把信息推送出来,不可以让信息的推送时间延迟过长 *//* props.put(ProducerConfig.LINGER_MS_CONFIG, 10);*/ //把推送的key从字符串数组实例化为字节数二维数组 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //把推送信息value从字符串数组实例化为字节数二维数组 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer producer = new KafkaProducer(props); int msgNum = 5; final CountDownLatch countDownLatch = new CountDownLatch(msgNum); for (int i = 1; i 0) { // 手动式同歩递交offset,当今进程会堵塞直至offset操作成功 // 一般应用同歩递交,由于递交以后一般都没有什么逻辑编码了 consumer.commitSync(); // 手动式多线程递交offset,当今进程递交offset不容易堵塞,能够再次解决之后的程序结构 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map offsets, Exception exception) { if (exception != null) { System.err.println("Commit failed for " offsets); System.err.println("Commit failed exception: " exception.getStackTrace()); } } }); }*/ } }}

春靴融进了卡夫卡。

详细介绍扭簧靴卡夫卡依靠:

org . spring framework . Kafka spring-Kafka

application.yml配备如下所示:

server: port: 8080spring: kafka: bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094 producer: # 经营者 retries: 3 # 设定超过0的值,则手机客户端会将推送不成功的纪录再次推送 batch-size: 16384 buffer-memory: 33554432 acks: 1 # 特定信息key和信息体的编解码方法 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: # 当每一条纪录被顾客窃听器(ListenerConsumer)解决以后递交 # RECORD # 当每一批poll()的数据信息被顾客窃听器(ListenerConsumer)解决以后递交 # BATCH # 当每一批poll()的数据信息被顾客窃听器(ListenerConsumer)解决以后,间距之前递交時间超过TIME时递交 # TIME # 当每一批poll()的数据信息被顾客窃听器(ListenerConsumer)解决以后,被解决record总数高于或等于COUNT时递交 # COUNT # TIME | COUNT 有一个标准符合时递交 # COUNT_TIME # 当每一批poll()的数据信息被顾客窃听器(ListenerConsumer)解决以后, 手动式启用Acknowledgment.acknowledge()后递交 # MANUAL # 手动式启用Acknowledgment.acknowledge()后马上递交,一般应用这类 # MANUAL_IMMEDIATE ack-mode: manual_immediate

经营者编码:

package com.yundasys.usercenter.collect.api.vo.req;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.RequestMapping;/** * @program: usercenter-portrait-collect * @description: KafkaController * @author: yxh-word * @create: 2021-07-14 * @version: v1.0.0 创建文件, yxh-word, 2021-07-14 **/public class KafkaController { private final static String TOPIC_NAME = "my-replicated-topic"; @Autowired private KafkaTemplate kafkaTemplate; @RequestMapping("/send") public void send() { kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg"); }}

顾客编码:

package com.yundasys.usercenter.collect.api.vo.req;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;/** * @program: usercenter-portrait-collect * @description: MyConsumer * @author: yxh-word * @create: 2021-07-14 * @version: v1.0.0 创建文件, yxh-word, 2021-07-14 **/public class MyConsumer { /** * @KafkaListener(groupId = "testGroup", topicPartitions = { * @TopicPartition(topic = "topic1", partitions = {"0", "1"}), * @TopicPartition(topic = "topic2", partitions = "0", * partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) * },concurrency = "6") * //concurrency便是一个组下的顾客数量,便是高并发消費数,务必不大于系统分区数量 * @param record */ @KafkaListener(topics = "my-replicated-topic",groupId = "yundaGroup") public void listenZhugeGroup(ConsumerRecord record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //手动式递交offset ack.acknowledge(); } /*//配备好几个消費组 @KafkaListener(topics = "my-replicated-topic",groupId = "likeGroup") public void listenTulingGroup(ConsumerRecord record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); ack.acknowledge(); }*/}

评论(0条)

刀客源码 游客评论