- How to delete records from a Kafka topic
- Kafka-delete-records
- Deleting all the records in a topic
- Delete a topic and create it again
- Few things to be aware of when using this approach
- Would you like to learn more about Kafka?
- Java kafka clean topic
- Руководство по очистке темы Apache Kafka
- 2. Сценарий очистки
- 2.1. Сценарий
- 2.2. Моделирование
- 3. Срок действия сообщения
- 4. Выборочное удаление записи
- 5. Удалите и заново создайте тему
- 6. Заключение
How to delete records from a Kafka topic
Photo credit: Adli Wahid Every now and then I get a request from my colleagues who would like to delete some or all the records from a Kafka topic. The request usually comes after someone has produced the wrong data in a test topic while playing around or due to a bug in the producer code. Or simply because they want a clean slate. Whatever the reason, today I’ll show you a few ways to delete some or all the records from a Kafka topic. It should go without saying that you should use your best judgment and check (at least) twice before using the methods described below in a production environment.
Kafka-delete-records
The command allows you to delete all the records from the beginning of a partition, until the specified offset.
NOTE: It is not possible to delete records in the middle of the topic.
The JSON file specifies one or more partitions from which we want to remove the records. Let’s create delete-records.json file as below:
Here we’ve specified that for the partition 0 of the topic “my-topic” we want to delete all the records from the beginning until offset 3.
Now we’re ready to delete records. Execute:
kafka-delete-records --bootstrap-server localhost:9092 \ --offset-json-file delete-records.json
After the command finishes the start offset for the partition 0 will be 3.
Deleting all the records in a topic
NOTE: This will not work for compacted topics
If you want to prune all the messages, another option is to reduce the retention of the topic to a small value (e.g. 100ms), wait for the brokers to remove all the records from the topic and then set the topic retention to its original value. Here’s how to do it.
First, set the retention.ms to 100 milliseconds.
kafka-configs --zookeeper localhost:2181 \ --entity-type topics \ --entity-name my-topic \ --alter --add-config retention.ms=100
Then, wait for the brokers to remove the messages with expired retention (that is, all of them). To know if the process is finished, check whether the start offset and end offset are the same. This means there are no more records available on the topic. Depending on your setup, it might take few minutes for Kafka to clean up the topic, so keep checking the start offset.
Use the GetOffsetShell class to check the beginning and ending offset of a topic’s partitions. To check the end offset set parameter time to value -1:
kafka-run-class kafka.tools.GetOffsetShell \ --broker-list localhost:9092 \ --topic my-topic \ --time -1
To check the start offset, use —time -2
kafka-run-class kafka.tools.GetOffsetShell \ --broker-list localhost:9092 \ --topic my-topic \ --time -2
Once the topic has been purged, return the retention.ms to its original value:
kafka-configs --zookeeper localhost:2181 \ --entity-type topics \ --entity-name my-topic \ --alter --add-config retention.ms=
Delete a topic and create it again
Not as elegant as the previous two approaches, yet it might be an easier solution in some cases (e.g. if topic creation is scripted).
kafka-topics --bootstrap-server localhost:9092 \ --topic my-topic \ --delete
kafka-topics --bootstrap-server localhost:9092 \ --topic my-topic \ --create \ --partitions \ --replication-factor
Few things to be aware of when using this approach
Make sure the deletion of topics is enabled in your cluster. Set delete.topic.enable=true. From Kafka 1.0.0 this property is true by default.
Make sure all consumers have stopped consuming the data from the topic you want to delete. Otherwise, they will throw errors like:
Received unknown topic or partition error in fetch for partition my-topic-0
Error while fetching metadata with correlation id 123 :
One more thing that might happen if you have consumers up and running is that the topic will get auto-created if the cluster-wide property auto.create.topics.enable is true (and by default it is). Not bad per se, but it will use a default number of partitions (1) and a replication factor (1), which might not be what you wanted.
Moral of the story is – make sure to stop your consumers before using this approach 🙂
Would you like to learn more about Kafka?
I have created a Kafka mini-course that you can get absolutely free. Sign up for it over at Coding Harbour.
Java kafka clean topic
Tip, before deleting Topic, be sure to confirm whether Kafka’s Broker has this configuration
If False is deleted unsuccessful, it must be confirmed whether it is true
PS -EF | GREP KAFKA to see Kafka’s startup parameter configuration
1. Low version of kafka_2.11
org.apache.kafka kafka_2.11 0.9.0.1
public Map deleteTopic(List topics) if (CollectionUtils.isEmpty(topics)) return null; > String zkServer = "zk-1-svc:4180,zk-2-svc:4180,zk-3-svc:4180"; int sessionTimeoutMs = 30 * 1000; int connectionTimeoutMs = 30 * 1000; ZkClient client = new ZkClient(zkServer, sessionTimeoutMs, connectionTimeoutMs); client.setZkSerializer(new StringZkSerializer()); // Be sure to add ZKSerializer // Security for Kafka was added in Kafka 0.9.0.1 boolean isSecureKafkaCluster = false; ZkUtils zkUtils = new ZkUtils(client, new ZkConnection(zkServer), isSecureKafkaCluster); Map result = new HashMap<>(); for (String topicName : topics) try if (!AdminUtils.topicExists(zkUtils,topicName)) continue; > AdminUtils.deleteTopic(zkUtils, topicName); result.put(topicName,"success"); > catch (Exception e) < logger.error("TopicService-DeleTetopic delete topic failure, topicName = <>", topicName, e); result.put(topicName,"fail"); > > client.close(); return result; >
2. High This version of Kafka_2.11 deletes Topic
org.apache.kafka kafka_2.11 1.1.1
public void delete(String topic) int sessionTimeoutMs = 30 * 1000; int connectionTimeoutMs = 30 * 1000; ZooKeeperClient zooKeeperClient = new ZooKeeperClient("zk-1-svc:4180,zk-2-svc:4180,zk-3-svc:4180",sessionTimeoutMs,connectionTimeoutMs,10, Time.SYSTEM,"",""); KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient,false,Time.SYSTEM); try < TopicCommand.deleteTopic(kafkaZkClient, new TopicCommand.TopicCommandOptions( new String[], topic>)); >catch (Exception e)< System.out.println("fail"); > System.out.println("success"); >
3: The deletion method of the Kafka client comes with
org.apache.kafka kafka-clients 1.1.1
public Map deleteTopic(String topicName) < Properties properties = new Properties(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092,broker-2:9092,broker-3:9092"); properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); AdminClient adminClient = AdminClient.create(properties); DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName)); Map map = new HashMap<>(); try for (Map.Entry> entry : deleteTopicsResult.values().entrySet()) < String topic = entry.getKey(); KafkaFuture future = entry.getValue(); future.get();//implement map.put(topic, !future.isCompletedExceptionally()); > > catch (Exception e) return map; >
Руководство по очистке темы Apache Kafka
В этой статье мы рассмотрим несколько стратегий очистки данных из раздела Apache Kafka .
2. Сценарий очистки
Прежде чем мы изучим стратегии очистки данных, давайте познакомимся с простым сценарием, требующим действия по очистке.
2.1. Сценарий
Сообщения в Apache Kafka автоматически удаляются после настроенного времени хранения . Тем не менее, в некоторых случаях мы можем захотеть, чтобы удаление сообщения произошло немедленно.
Давайте представим, что в код приложения, которое генерирует сообщения в топике Kafka, был внесен дефект. К тому времени, когда будет интегрировано исправление ошибки, у нас уже есть много поврежденных сообщений в теме Kafka , которые готовы к употреблению.
Такие проблемы чаще всего встречаются в среде разработки, и нам нужны быстрые результаты. Так что массовое удаление сообщений — дело разумное.
2.2. Моделирование
Чтобы смоделировать сценарий, давайте начнем с создания темы purge-scenario из каталога установки Kafka:
$ bin/kafka-topics.sh \ --create --topic purge-scenario --if-not-exists \ --partitions 2 --replication-factor 1 \ --zookeeper localhost:2181
Далее воспользуемся командой shuf для генерации случайных данных и передачи их сценарию kafka-console-producer.sh :
$ /usr/bin/shuf -i 1-100000 -n 50000000 \ | tee -a /tmp/kafka-random-data \ | bin/kafka-console-producer.sh \ --bootstrap-server=0.0.0.0:9092 \ --topic purge-scenario
Мы должны отметить, что мы использовали команду tee для сохранения данных моделирования для последующего использования.
Наконец, давайте проверим, что потребитель может потреблять сообщения из темы:
$ bin/kafka-console-consumer.sh \ --bootstrap-server=0.0.0.0:9092 \ --from-beginning --topic purge-scenario \ --max-messages 3 76696 49425 1744 Processed a total of 3 messages
3. Срок действия сообщения
Сообщения, созданные в теме purge-scenario , будут иметь срок хранения по умолчанию , равный семи дням. Чтобы очистить сообщения, мы можем временно сбросить свойство уровня темы Retention.ms на десять секунд и дождаться истечения срока действия сообщений:
$ bin/kafka-configs.sh --alter \ --add-config retention.ms=10000 \ --bootstrap-server=0.0.0.0:9092 \ --topic purge-scenario \ && sleep 10
Далее, давайте проверим, истек ли срок действия сообщений из темы:
$ bin/kafka-console-consumer.sh \ --bootstrap-server=0.0.0.0:9092 \ --from-beginning --topic purge-scenario \ --max-messages 1 --timeout-ms 1000 [2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.TimeoutException Processed a total of 0 messages
Наконец, мы можем восстановить исходный семидневный срок хранения темы:
$ bin/kafka-configs.sh --alter \ --add-config retention.ms=604800000 \ --bootstrap-server=0.0.0.0:9092 \ --topic purge-scenario
При таком подходе Kafka будет удалять сообщения во всех разделах для темы purge-scenario .
4. Выборочное удаление записи
Иногда нам может понадобиться выборочно удалить записи в одном или нескольких разделах из определенной темы . Мы можем удовлетворить такие требования, используя скрипт kafka-delete-records.sh .
Во-первых, нам нужно указать смещение на уровне раздела в файле конфигурации delete-config.json .
Давайте очистим все сообщения от partition=1 , используя offset=-1 :
"partitions": [ "topic": "purge-scenario", "partition": 1, "offset": -1 > ], "version": 1 >
Далее приступим к удалению записи:
$ bin/kafka-delete-records.sh \ --bootstrap-server localhost:9092 \ --offset-json-file delete-config.json
Мы можем убедиться, что мы все еще можем читать из partition=0 :
$ bin/kafka-console-consumer.sh \ --bootstrap-server=0.0.0.0:9092 \ --from-beginning --topic purge-scenario --partition=0 \ --max-messages 1 --timeout-ms 1000 44017 Processed a total of 1 messages
Однако, когда мы читаем из partition=1 , записей для обработки не будет:
$ bin/kafka-console-consumer.sh \ --bootstrap-server=0.0.0.0:9092 \ --from-beginning --topic purge-scenario \ --partition=1 \ --max-messages 1 --timeout-ms 1000 [2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.TimeoutException Processed a total of 0 messages
5. Удалите и заново создайте тему
Другой обходной путь для очистки всех сообщений темы Kafka — удалить и создать ее заново. Однако это возможно только в том случае, если мы установим для свойства delete.topic.enable значение true при запуске сервера Kafka : ** « **
$ bin/kafka-server-start.sh config/server.properties \ --override delete.topic.enable=true
Чтобы удалить тему, мы можем использовать скрипт kafka-topics.sh :
$ bin/kafka-topics.sh \ --delete --topic purge-scenario \ --zookeeper localhost:2181 Topic purge-scenario is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
Давайте проверим это, перечислив тему:
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
Убедившись, что темы больше нет в списке, мы можем продолжить и воссоздать ее.
6. Заключение
В этом руководстве мы смоделировали сценарий, в котором нам нужно очистить тему Apache Kafka. Более того, мы рассмотрели несколько стратегий полной или выборочной очистки данных по разделам .