【Java×分散処理】ログ分析基盤を完全自作!Kafka+Elasticsearchで爆速データ処理を実現

プログラミング

「大量のログをどう処理すればいいのかわからない」
「Javaでログ分析基盤を作れるのか不安」

このような悩みを持つ方に向けて、この記事ではJavaと分散処理技術を組み合わせたログ分析基盤の作り方を解説します。

Apache KafkaやElasticsearchを連携させることで、リアルタイムかつ大規模なログデータの収集・検索・可視化が可能になります。

サンプルコードとともに、構成・実装・エラー回避まで網羅していますので、誰でも安心して構築に取り組めます。


ログ分析基盤の役割と必要性

分散処理が求められる理由を知りましょう

結論:ログ分析にはリアルタイム性と拡張性が必要です。

現代のWebアプリやIoTサービスでは、1日あたり数百万件のログが発生します。
この膨大なデータを効率よく収集・検索・可視化するためには、以下のような要素が求められます:

  • 高速なログの受信(Apache Kafkaなど)
  • 効率的な保存と検索(Elasticsearch)
  • リアルタイム可視化(Kibanaなど)

経済産業省も、DX推進の中でデータ活用とリアルタイム分析の重要性を示しています。
(出典:経済産業省 DXレポート


全体構成と技術スタックの解説

JavaとKafkaを中心に構成を設計します

結論:ログの収集→転送→検索→可視化までの流れを押さえましょう。

全体構成:

1
[アプリケーション(Java)] → [Kafka] → [Log Consumer(Java)] → [Elasticsearch] → [Kibana]

使用技術:

  • Java:ログ生成・転送・受信処理に使用
  • Apache Kafka:ログの分散受信とバッファ
  • Elasticsearch:全文検索エンジン
  • Kibana:データの可視化ダッシュボード

JavaでKafka Producerを実装

アプリケーション側でログを送信します

結論:Kafkaにログを送るProducerをJavaで実装します。

pom.xml の依存追加

1
2
3
4
5
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.0.0</version>
</dependency>

LogProducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.kafka.clients.producer.*;
 
import java.util.Properties;
 
public class LogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
        Producer<String, String> producer = new KafkaProducer<>(props);
 
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("logs", "log-" + i, "アクセスログ " + i));
        }
 
        producer.close();
    }
}

JavaでKafka Consumerを実装しElasticsearchへ連携

Kafkaから受け取りElasticsearchに保存します

結論:Consumerがログを受け取り、Elasticsearchに格納します。

LogConsumer.java(抜粋)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.kafka.clients.consumer.*;
import org.elasticsearch.client.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.action.index.*;
 
public class LogConsumer {
    public static void main(String[] args) {
        // Kafka設定
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "log-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("logs"));
 
        // Elasticsearch設定
        RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")));
 
        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                IndexRequest request = new IndexRequest("log-index")
                    .source(Map.of("message", record.value()));
                client.index(request, RequestOptions.DEFAULT);
            }
        }
    }
}

躓きやすいエラーと解決策

エラー内容原因解決策
Connection refused: localhost:9092Kafkaが起動していないkafka-server-start.shで起動確認
IndexNotFoundExceptionElasticsearchにインデックス未作成自動作成を許可、またはPUT /log-indexで作成
Javaの依存解決エラーMavenの設定不備pom.xmlを更新後mvn clean install実行

可視化と応用例

Kibanaでダッシュボードを作成しよう

結論:Kibanaを使えばログ内容をグラフや表で直感的に表示できます。

  • ログ数の推移を時間別に表示
  • 特定のエラーメッセージをフィルタ
  • レスポンス時間をヒートマップ化

応用アイデア:

  • アラート機能(一定数のエラー発生時に通知)
  • ユーザー行動ログのマーケティング活用

完成構成とまとめ

1
2
3
4
5
6
src/
├─ LogProducer.java
├─ LogConsumer.java
├─ pom.xml
├─ Elasticsearch + Kibana
└─ Kafka Server

起動順:

  1. Kafka起動
  2. Elasticsearch起動
  3. Kibana起動
  4. Producer実行
  5. Consumer実行

まとめ:Javaでも分散ログ基盤は実現できる

この記事では、JavaとKafkaを使った分散処理対応のログ分析基盤を構築する手順を解説しました。

習得できたこと:

  • JavaからKafkaにログを送信する方法
  • KafkaからElasticsearchに転送する処理
  • よくあるエラーと解決方法
  • Kibanaでログを可視化する方法

今後は、セキュリティログやアクセスログの監視システムに応用することで、より実用的なシステム開発が可能になります。

タイトルとURLをコピーしました