Apache Camel camel-kafka Component to Commit Consumer Offsets Manually

Chiwa Kantawong (Pea)
1 min readDec 10, 2019

--

โดยปรกติถ้าไม่มีการ set ค่า allowManualCommit=true Camel จะทำ Auto Commit ให้อัตโนมัติ ทุกๆ 5 วินาที ไม่พูดพร่ำทำเพลงดูตัวอย่างกันเลยนะครับ

from("kafka:topic?bokers=xxxx.........&allowManualCommit=true&offsetRepository=#fileStore")
.autoStartup(true)
.bean(SomeBean1.class)
.bean(SomeBean2.class)
.bean(SomeBean3.class)
.process(exchange -> {
KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
if (manual != null) {
manual.commitSync();
}
});
@Bean(name = "fileStore", initMethod = "start", destroyMethod = "stop")
private FileStateRepository fileStore() {
FileStateRepository fileStateRepository =
FileStateRepository.fileStateRepository(new File(camelKafkaFileStore));
fileStateRepository.setMaxFileStoreSize(10485760); // 10MB max

return fileStateRepository;
}

จาก Router ข้างบ้านเราต้องการให้ SomeBean1–3 ทำงานให้ครอบถ้วนทุกตัวแล้วถึงจะ commit ถ้าเกิดมีการทำงานผิดพลาดสมมติว่าที่ SomeBean2 แต่ Camel ได้ทำการ Auto Commit ไปแล้ว Message นั้นก็จะหายไปเรียบร้อย ครับ ลองเอาไปประยุกต์ใช้ดูนะครับ

--

--

Chiwa Kantawong (Pea)
Chiwa Kantawong (Pea)

Written by Chiwa Kantawong (Pea)

Software Development Expert at Central Tech

No responses yet