Apache Camel camel-kafka Component to Commit Consumer Offsets Manually
1 min readDec 10, 2019
โดยปรกติถ้าไม่มีการ set ค่า allowManualCommit=true Camel จะทำ Auto Commit ให้อัตโนมัติ ทุกๆ 5 วินาที ไม่พูดพร่ำทำเพลงดูตัวอย่างกันเลยนะครับ
from("kafka:topic?bokers=xxxx.........&allowManualCommit=true&offsetRepository=#fileStore")
.autoStartup(true)
.bean(SomeBean1.class)
.bean(SomeBean2.class)
.bean(SomeBean3.class)
.process(exchange -> {
KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
if (manual != null) {
manual.commitSync();
}
});
@Bean(name = "fileStore", initMethod = "start", destroyMethod = "stop")
private FileStateRepository fileStore() {
FileStateRepository fileStateRepository =
FileStateRepository.fileStateRepository(new File(camelKafkaFileStore));
fileStateRepository.setMaxFileStoreSize(10485760); // 10MB max
return fileStateRepository;
}
จาก Router ข้างบ้านเราต้องการให้ SomeBean1–3 ทำงานให้ครอบถ้วนทุกตัวแล้วถึงจะ commit ถ้าเกิดมีการทำงานผิดพลาดสมมติว่าที่ SomeBean2 แต่ Camel ได้ทำการ Auto Commit ไปแล้ว Message นั้นก็จะหายไปเรียบร้อย ครับ ลองเอาไปประยุกต์ใช้ดูนะครับ