kafka的暂停消费和重新开始消费问题 - 呢喃的歌声

yaohaitao 2021-08-08 原文


kafka的暂停消费和重新开始消费问题


//暂停kafka的消费 暂停分区的分配
consumer.unsubscribe();//此处不取消订阅暂停太久会出现订阅超时的错误
consumer.pause(consumer.assignment());



//重新消费分区,此处不重新分配会出错
this.open(null,null,null);
    if (null == consumer) {
Properties props = new Properties();
props.put("bootstrap.servers", PropertiesUtil.getValue("bootstrap.servers"));
// 消费者的组id
props.put("group.id", constant.kafka_groupName);//Spider2
props.put("enable.auto.commit", "false");
// max.poll.interval.ms(官网给得默认值为3000)的意思为,当我们从kafkaServer端poll消息时,poll()的调用之间的最大延迟。
// 这提供了消费者在获取更多记录之前可以空闲的时间量的上限。 如果在此超时到期之前未调用poll(),则认为使用者失败,并且消费
// 者组将重新平衡以便将分区重新分配给其他消费者,而恰好这里我们设置了Thread.sleep(6000) > max.poll.interval.ms值,
// 也就是我们在手动提交的时候,实际上分区信息已经被分配到整个消费者组里面的其它消费者了
props.put("auto.commit.interval.ms", "3000");
// 从poll(拉)的回话处理时长
props.put("session.timeout.ms", "100000");
props.put("request.timeout.ms", "200000");
props.put("max.poll.records", "2");
// poll的数量限制
// props.put("max.poll.records", "100");
/* props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");*/
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("group.name", UUID.randomUUID().toString().replaceAll("-", ""));
consumer = new KafkaConsumer<String, String>(props);

// 订阅主题列表topic
//consumer.subscribe(Arrays.asList("test_input"));
}
//注册kafka rebalanceListener
//consumer.subscribe(Arrays.asList("test_etl"), new ConsumerRebalanceListener(){

listener = new ConsumerRebalanceListener(){
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.printf("threadId = {}, onPartitionsRevoked.", Thread.currentThread().getId());
consumer.commitSync(offsetsMap);
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(
Collection<TopicPartition> partitions) {
System.out.printf("threadId = {}, onPartitionsAssigned.", Thread.currentThread().getId());
consumer.commitSync();
offsetsMap.clear();
}};

consumer.subscribe(Arrays.asList(topicName.split(",")[0],topicName.split(",")[1],topicName.split(",")[2]), listener);
consumer.resume(consumer.assignment());
发表于
2020-01-09 18:08 
呢喃的歌声 
阅读(4275
评论(0
编辑 
收藏 
举报

 

版权声明:本文为yaohaitao原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/yaohaitao/p/12172867.html

kafka的暂停消费和重新开始消费问题 - 呢喃的歌声的更多相关文章

  1. matlab中画三维图形 – Dec-Fth

    matlab中画三维图形 这里主要讲述两个方法用matlab画三维图形: 1.mesh函数 先看一个简单的例子 […]...

  2. Java从控制台输入获取数据的几种常用方法 – android开发实例

    Java从控制台输入获取数据的几种常用方法 1、使用标准输入串对象System.in System.in.re […]...

  3. jumpserver安装 – fatyao

    jumpserver安装 一. 准备 Python3 和 Python 虚拟环境  1.1 安装依赖包 yum […]...

  4. CSS像素、物理像素、逻辑像素、设备像素比、PPI、Viewport(转载):

    最近看了很多这方面的文章,能搜到的基本看了个遍,但感觉还是似懂非懂,知道这个东西,很难说出这是个什么东西,先整 […]...

  5. 135首经典欧美歌曲 —附下载地址 – _o~ 努力!

    135首经典欧美歌曲 —附下载地址 135首经典欧美歌曲 —附下载地址  (2011- […]...

  6. 快速排序算法C语言实现(源代码) – ultimate

    快速排序算法C语言实现(源代码) 快速排序算法 快速排序算法在很多的数据结构与算法书中都有讲解,关于它不过多介 […]...

  7. IDEA安装Git – 百里登峰

    IDEA安装Git 1、下载Git 官方地址为:https://git-scm.com/download/wi […]...

  8. 异常“itunes无法连接iphone 因为收到来自此设备的无效响应”的解决办法 – z计划

    异常“itunes无法连接iphone 因为收到来自此设备的无效响应”的解决办法 删除电脑上以下位置的Lock […]...

随机推荐

  1. RFC2616-HTTP1.1-Methods(方法规定部分—译文)

    part of Hypertext Transfer Protocol — HTTP/1.1RFC […]...

  2. URL参数编码

    简单明了区分escape、encodeURI和encodeURIComponent 一、前言讲这3个方法区别的 […]...

  3. 【数据结构与算法】时间复杂度的计算

    算法时间复杂度的计算 [整理] 博客分类:   算法学习 时间复杂度算法  基本的计算步骤  时间复杂度的定义 […]...

  4. 【ipad神坑】ipad麦克风听不到声音怎么回事 微信QQ语音视频对方都听不到

      今天遇到了这个问题 说话听不见,但是敲击ipad,可以明显的听到击打的声音  siri也是可以听到 上网上 […]...

  5. spider基础知识

    1. 描述下scrapy框架运行的机制?答:从start_urls里获取第一批url并发送请求,请求由引擎交给 […]...

  6. Axure RP8默认元件库无法显示 无法使用的问题

    Axure RP8在windows10系统,过段时间就会出现默认元件库不显示图标,只显示标题,并且无法拖入使用 […]...

  7. 斯坦福大学机器学习课程第一周笔记

    课程地址:https://www.coursera.org/learn/machine-learning/ho […]...

  8. ubuntu 18.04下greenplum安装笔记(一)Linux下基础环境的搭建

    背景 需要构建一个用于数据仓库的分布式数据库集群。 每一个节点暂时不需要进行备份,同时也不考虑坏掉的情况。 每 […]...

展开目录

目录导航