RocketMQ源码分析三之删除Topic

RocketMQ源码分析三之删除Topic

首页枪战射击源代码: 删除更新时间:2024-05-01

上一章中。我们分析了 RocketMQ 创建 Topic 的命令过程。本章,我们开始分析 删除 Topic 的过程。

命令参数

用法:sh mqadmin deleteTopic -n 192.168.1.100:9876
指令:deleteTopic
代码入口:org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand

参数

是否必填

说明

-c

cluster 名称,表示topic 建在该集群(集群可通过clusterList 查询)

-h

打印帮助

-n

nameserve 服务地址列表,格式ip:port;ip:port;…

-t

Topic 名字

解析命令行参数入口

// RocketMQ 配置了 命令行的执行 shell 脚本入口。就是下面的 mqadmin.sh 这个文件 mqadmin.sh // 解析命令行入口 org.apache.rocketmq.tools.command.MQAdminStartup#main0 // 设置 namesrvAddr 为全局变量。 if (commandLine.hasOption('n')) { String namesrvAddr = commandLine.getOptionValue('n'); System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr); } 整体流程

RocketMQ 删除 Topic 大概流程

Broker 删除无用Topic代码片段

@Override public int cleanUnusedTopic(Set<String> topics) { Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next(); String topic = next.getKey(); // Topic可以删除 if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue(); for (ConsumeQueue cq : queueTable.values()) { // 消费者队列销毁,mapped 文件删除 cq.destroy(); log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", cq.getTopic(), cq.getQueueId() ); this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId()); } it.remove(); log.info("cleanUnusedTopic: {},topic destroyed", topic); } } return 0; } Consume Queue 关闭&删除

在 RocketMQ 的基础概念的一章中提到。Topic 是有 N 个queue,而且,每个 Consume queue 都是独立的一个 MappFile 文件。Consumer queue 就相当于 消息在 commitLog 的二级索引。

public void destroy() { this.maxPhysicOffset = -1; this.minLogicOffset = 0; this.mappedFileQueue.destroy(); if (isExtReadEnable()) { this.consumeQueueExt.destroy(); } } public void destroy() { for (MappedFile mf : this.mappedFiles) { mf.destroy(1000 * 3); } // 移除所有的元素,让垃圾回收器回收垃圾 this.mappedFiles.clear(); this.flushedWhere = 0; // delete parent directory File file = new File(storePath); if (file.isDirectory()) { file.delete(); } } 总结




关于我
前 去哪儿网 技术专家!混迹中间件职场8 年!分享各种Java中间件知识!

查看全文
大家还看了
也许喜欢
更多游戏

Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved