java - Apache Camel と Kafka を使用するときにバックプレッシャーを処理するにはどうすればよいですか?

okwaves2024-01-25  7

私は Camel を使用して Kafka と統合するアプリケーションを作成しようとしています。 (バージョン - 3.4.2)

私はこの質問への回答から借用したアプローチをとります。

Kafka トピックからのメッセージをリッスンするルートがあります。このメッセージの処理は、単純なエグゼキュータを使用することで消費から切り離されます。各処理はタスクとしてこの実行者に送信されます。メッセージの順序は重要ではありません。唯一の懸念要素は、メッセージをいかに迅速かつ効率的に処理できるかということです。自動コミットを無効にし、タスクがエグゼキュータに送信されたらメッセージを手動でコミットします。現在処理中のメッセージ (クラッシュ/シャットダウンによる) が失われるのは問題ありませんが、Kafka 内で一度も送信されていないメッセージは失われます。(オフセットのコミットにより) 処理が失われることはありません。それでは質問に移ります。

どうすれば負荷を効率的に処理できるでしょうか?たとえば、メッセージが 1000 件ありますが、一度に並列処理できるのは 100 件だけです。

現時点での解決策は、コンシューマのポーリング スレッドをブロックし、ジョブを継続的に送信しようとすることです。しかし、投票を一時停止する方がはるかに良いアプローチだと思いますが、Camel ではそれを実現する方法が見つかりません。

処理を消費から切り離し、バックプレッシャーを処理する、より良い方法 (Camel の方法) はありますか?

public static void main(String[] args) throws Exception {
        String consumerId = System.getProperty("consumerId", "1");
        ExecutorService executor = new ThreadPoolExecutor(100, 100, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
        LOGGER.info("Consumer {} starting....", consumerId);

        Main main = new Main();
        main.init();

        CamelContext context = main.getCamelContext();
        ComponentsBuilderFactory.kafka().brokers("localhost:9092").metadataMaxAgeMs(120000).groupId("consumer")
                .autoOffsetReset("earliest").autoCommitEnable(false).allowManualCommit(true).maxPollRecords(100)
                .register(context, "kafka");

        ConsumerBean bean = new ConsumerBean();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("kafka:test").process(exchange -> {
                    LOGGER.info("Consumer {} - Exhange is {}", consumerId, exchange.getIn().getHeaders());
                    processTask(exchange);
                    commitOffset(exchange);
                });
            }

            private void processTask(Exchange exchange) throws InterruptedException {
                try {
                    executor.submit(() -> bean.execute(exchange.getIn().getBody(String.class)));
                } catch (Exception e) {
                    LOGGER.error("Exception occured {}", e.getMessage());
                    Thread.sleep(1000);
                    processTask(exchange);
                }
            }

            private void commitOffset(Exchange exchange) {
                boolean lastOne = exchange.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);
                if (lastOne) {
                    KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,
                            KafkaManualCommit.class);
                    if (manual != null) {
                        LOGGER.info("manually committing the offset for batch");
                        manual.commitSync();
                    }
                } else {
                    LOGGER.info("NOT time to commit the offset yet");
                }
            }
        });

        main.run();
    }

1

processTask を seda と options blockWhenFull=true&concurrentConsumers=100&size=100 に置き換えるとよいでしょう

– ベドラ

2020 年 9 月 4 日 1:07



------------------------

この目的には、スロットル EIP を使用できます。

from("your uri here")
.throttle(maxRequestCount)
.timePeriodMillis(inTimePeriodMs)
.to(yourProcessorUri)
.end()

ここでオリジナルのドキュメントをご覧ください。

総合生活情報サイト - OKWAVES
総合生活情報サイト - OKWAVES
生活総合情報サイトokwaves(オールアバウト)。その道のプロ(専門家)が、日常生活をより豊かに快適にするノウハウから業界の最新動向、読み物コラムまで、多彩なコンテンツを発信。