ようこそゲストさん

無能日記

メッセージ欄

2012年4月の日記

一覧で表示する

2012/04/10(火) flume NGのプラグインを作成してみた

はてブ 2012/04/10 5:41 R&D (flume)poti
前回のつづき

flume NGのpluginを作成してみた

  • 1. eclipseにflumeMyPluginというプロジェクトを新たに作る

  • 2. 前回FLUME_CLASSPATHにしていしたディレクトリないのjarをコピーしてきてeclipseのビルドパスに通す。
 # scp -r /root/download/apache-flume-1.1.0-incubating/flume-ng-dist/target/flume-1.1.0-incubating/lib worker@workhost:
 コピーしたlibをD&DでflumetMyPluginにコピーして、コピーしたjarをbuild pathへ登録する
 (全部じゃなくてもいいけど、面倒なので全部がお手軽)

  • 3. srcのtestパッケージに新規にFlumeMySourceというclassファイルを作成
以下のように編集
package test;

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class FlumeMySource extends AbstractSource implements PollableSource,
		Configurable {
	String msg = null;

	@Override
	public void configure(Context context) {
		msg = context.getString("message");
	}

	@Override
	public void start() {
		// 開始処理
	}

	@Override
	public void stop() {
		// 停止処理
	}

	@Override
	public Status process() throws EventDeliveryException {
		// headerを作成
		Map<String, String> headers = new HashMap<String, String>();
		headers.put("hoge", "fuga");
		// bodyを作成
		String body = "hello " + msg + "!!";
		// イベントの作成
		Event e = EventBuilder.withBody(body.getBytes(), headers);
		// イベントをチャンネルへ送る
		getChannelProcessor().processEvent(e);
		// 1秒待つ
		try {
			Thread.sleep(1000);
		} catch (InterruptedException ex) {
			throw new EventDeliveryException("fail", ex);
		}
		return Status.READY;
	}
}

  • 4. propertiesというファイルで設定ファイルを作り、FlumeMySourceを設定する
foo.sources = mySource
foo.channels = memoryChannel
foo.sinks = loggerSink

foo.sources.mySource.type = test.FlumeMySource
foo.sources.mySource.message = world
foo.sources.mySource.channels = memoryChannel

foo.sinks.loggerSink.type = logger
foo.sinks.loggerSink.channel = memoryChannel

foo.channels.memoryChannel.type = memory
foo.channels.memoryChannel.capacity = 100

  • 5. 実行設定
    • run configurationでjava applicationを選ぶ
    • Main classで org.apache.flume.node.Application を指定する
    • argumentsで -n foo -f properties を指定する

  • 6. 実行してみる
以下のような出力が得られる
12/04/10 04:46:50 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@80d3d6f }
12/04/10 04:46:51 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@48ee22f7 }
12/04/10 04:46:52 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@a39ab89 }
12/04/10 04:46:53 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@2993a66f }
12/04/10 04:46:54 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@2df6df4c }
12/04/10 04:46:55 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@2d5253d5 }
12/04/10 04:46:56 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@77fddc31 }
12/04/10 04:46:57 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@3b835282 }
headersにhoge=fugaが入っている事が分かる。
ただ、これだとbodyの中身が分からないのでログメッセージをもう少しかっこ良く出力するsinkが欲しくなる

  • 7. srcのtestパッケージに新規にFlumeMySinkというclassファイルを作成
以下のように編集する(loggerSinkとほとんど同じ)
package test;

import java.util.Map.Entry;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlumeMySink extends AbstractSink implements Configurable {

	private static final Logger logger = LoggerFactory
			.getLogger(FlumeMySink.class);

	@Override
	public void configure(Context context) {
		// 設定処理
	}

	@Override
	public void start() {
		// 開始処理
	}

	@Override
	public void stop() {
		// 停止処理
	}

	@Override
	public Status process() throws EventDeliveryException {
		Status result = Status.READY;
                // チャンネルを取得
		Channel channel = getChannel();
                // チャンネルのトランザクションを取得
		Transaction transaction = channel.getTransaction();
		Event e = null;

		try {
			transaction.begin();
                        // チャンネルからイベントを取得
			e = channel.take();
			if (e != null) {
				if (logger.isInfoEnabled()) {
                                        // ログを作る
					StringBuilder sb = new StringBuilder();
					for(Entry<String, String> entry : e.getHeaders().entrySet()){
						sb.append(':').append(entry.getKey()).append('=').append(entry.getValue());
					}
					sb.append(":body=").append(new String(e.getBody()));
					logger.info(sb.toString());
				}
			} else {
				result = Status.BACKOFF;
			}
			transaction.commit();
		} catch (Exception ex) {
			transaction.rollback();
			throw new EventDeliveryException("fail", ex);
		} finally {
			transaction.close();
		}

		return result;
	}
}

  • 8. propertiesにFlumeMySinkを設定する
foo.sources = mySource
foo.channels = memoryChannel
foo.sinks = mySink

foo.sources.mySource.type = test.FlumeMySource
foo.sources.mySource.message = world
foo.sources.mySource.channels = memoryChannel

foo.sinks.mySink.type = test.FlumeMySink
foo.sinks.mySink.channel = memoryChannel

foo.channels.memoryChannel.type = memory
foo.channels.memoryChannel.capacity = 100
  • 9. 実行する
以下のような出力が得られたら成功
12/04/10 05:18:51 INFO test.FlumeMySink: :hoge=fuga:body=hello world!!
12/04/10 05:18:52 INFO test.FlumeMySink: :hoge=fuga:body=hello world!!
12/04/10 05:18:53 INFO test.FlumeMySink: :hoge=fuga:body=hello world!!
12/04/10 05:18:54 INFO test.FlumeMySink: :hoge=fuga:body=hello world!!
12/04/10 05:18:55 INFO test.FlumeMySink: :hoge=fuga:body=hello world!!

補足

sinkにgetTransactionというメソッドがあるが、transactionはchannnelに
実装されていて、sinkがデータを取り出してから失敗した場合に、transaction.rollback()を呼ぶ事で、ロールバックされるようになっている。
また、processメソッドで、Statusを返すようになっているが、
BACKOFFステータースを返すと、runnerがポーリングでwaitを入れるようになる。
繰り返しBACKOFFを返すと、一定時間まで少しずつwaitが入る時間がのびるようだ。
(READYを返した時点でwaitはしなくなる)


let's enjoy flume!!

2012/04/08(日) flume NGを動かしてみた。

はてブ 2012/04/08 7:29 R&D (flume)poti

flume NGを動かしてみた

fluent(ruby)が盛り上がってそうなので、ここはあえてflume(java)を使ってみた

flume 本家サイト

fluentとの比較

ログを収集するこの手のツールは、scribe, flume, fluentなどいくつかある
それらの比較表を拾ってたのが下の図

flume-fluent-scribe.png

参考: http://blog.treasure-data.com/post/13047440992/fluentd-the-m...

この図を見ると、flumeの行数がすごい事になっている
ただここに書かれているのは、古いflumeなのでflume NGではない
(現在は0.9.Xまでの古いflumeはflume OGと呼び、あたらしい、1.0.0以降のflume NGをflumeと呼ぶらしい)

じゃぁ、flume NGではどうなのか?
ざっくり行数を調べてみる
  # wget http://ftp.riken.jp/net/apache/incubator/flume/flume-1.1.0-incubating/apache-flume-1.1.0-incubating.tar.gz
  # tar -zxvf apache-flume-1.1.0-incubating.tar.gz 
  # cd apache-flume-1.1.0-incubating
  # find . -type f | grep .java | xargs wc -l
  > 25804 total 
50000の半分ぐらい
さらに、テストコードを排除してみる
  # find . -type f | grep .java | grep -v Test | xargs wc -l
    >  18086 total
まぁ、ちょっと大きい
ちなみにfluentはどうなのか?
  # git clone https://github.com/fluent/fluentd.git
  # find . -type f | grep .rb | xargs wc -l
    > 9346 total
同様にテストコードを削除してみる
  # find . -type f | grep -v test |  grep .rb | xargs wc -l
    > 7404 total
ん、表に書いている事とだいぶ違うくない?
さらにさらに、どちらもプラグイン形式なのでコア部分だけを比較してみる
  # find . -type f | egrep "flume-ng-core|flume-ng-node" | grep .java |  grep -v Test | xargs wc -l
    > 9762 total 
  # find . -type f | grep -v test | grep -v plugin |  grep .rb | xargs wc -l
    > 3917 total  
という訳で、確かにfluentの方がコード量はすくないようだが、
比較表に書いてあるほどではなくなっている
flume NGの方がfluentより3倍弱程度コード量が多いといった感じ
まぁ、許せる範囲

flume NGのアーキテクチャ

flume NGは主にsource, channel(古いflumeではsink decoratorと
呼ばれていたものにバッファリング機能をつけたものに近い?), sinkの
3部構成になっていて、それぞれpluginがかけるようになっている
また、sourceで生成したログはeventという形でchannel、sinkへ渡されていく
このeventはヘッダと実データ部を持った単純な形式になっている
さらに、sourceとsinkはrunnerが処理しておりsourceに関しては、
event駆動のrunnerとpollingベースのrunnerが選べるsinkは、
pollingベースのrunnerになっている
そして、fluentの図と比べると、まぁ大枠はだいたい同じ
  • flume NG:
flume-arch.png
  • fluent:
fluent-image.png


大きな違いは何なのかというと、前述の表にあるように、古いflumeには
master serverがあり、設定を動的に更新したりできるという点だと思う
だが、flume NGがこの辺を実装完了しているかはまだ未確認
ぱっと見まだ実装されていないように見える
現在のflume NGは、まだ開発途中なので、古いflumeからpluginを
flume NGにimportしていたり、pluginを作る場合のapiに変更が
入ったりと、慌ただしい感じ。ドキュメントも十分に揃ってない

最初の一歩、flume NGを実際に動かしてみる

  • 1. 必要なもの
    • linux環境を用意する
    • javaのインストールする
    • flumeはmavenを使っているので、mavenをインストールする
    • flume-1.1.0.tar.gzをダウンロードする
  • 2. flume-1.1.0.tar.gzを展開したディレクトリに移動し、mavenでパッケージを作る
  # mvn package  -DskipTests
  (テストスキップしないと環境によっては失敗する事がある)
  • 3. flume-ng-dist/target/のしたにできたパッケージを展開する
  # cd flume-ng-dist/target/
  # tar -zxvf flume-ng-dist-1.1.0-incubating-dist.tar.gz  
  # cd flume-1.1.0-incubating 
  • 4. confディレクトリのflume-conf.properties.template, flume-env.sh.template をリネーム
  # cd conf
  # cp flume-env.sh.template  flume-env.sh
  # cp flume-conf.properties.template  flume-conf.properties
  # cd ..
  • 5. FLUME_CLASSパスを設定する
  # vi cond/flume-env.sh
  export FLUME_CLASSPATH="/root/download/apache-flume-1.1.0-incubating/flume-ng-dist/target/flume-1.1.0-incubating/lib"

  • 6. flume実行
  # JAVA_HOME=/usr/local/java/jdk1.7.0_03 bin/flume-ng node -n foo -c ./conf -f conf/flume-conf.properties
  (JAVA_HOMEが設定されていないなといけない)
  (この他の細かいパラメータは--help参照)
これでとりあず、flume NGが動く
内部で生成したログが出力されいるだけなのでCTL+Cで止めて、次へ

設定をみてみる

conf/flume-conf.properties が設定ファイルになっている
  --- conf/flume-conf.properties ---
  foo.sources = seqGenSrc
  foo.channels = memoryChannel
  foo.sinks = loggerSink
  foo.sources.seqGenSrc.type = seq
  foo.sources.seqGenSrc.channels = memoryChannel
  foo.sinks.loggerSink.type = logger
  foo.sinks.loggerSink.channel = memoryChannel
  foo.channels.memoryChannel.type = memory
  foo.channels.memoryChannel.capacity = 100
fooというのが実行コマンドの-nで指定したnode名になる
foo.sources, foo.channels, foo.sinksでfooノードが使うソース、
チャンネル、シンクの名前を指定する

foo.sources.seqGenSrc.type = seq で seqGenSrcにはseqを
使う事を示している
プラグインを使う場合はここで、クラスパスをしていすればいい

foo.sources.seqGenSrc.channels = memoryChannelは、
sourceからsinkへeventの橋渡しにmemoryChannelを使う事を示しており、
複数書いた場合は、複数のchannelを経てsinkに渡される事になる

同様に、foo.sinks.loggerSink.type = logger でloggerSinkには
loggerを使う事を示している
プラグインを使う場合はここで、クラスパスをしていすればいい

foo.sinks.loggerSink.channel = memoryChannelはsinkがデータを
受け取るチャンネルがmemoryChannelである事を示している

次も同様に foo.channels.memoryChannel.type = memory で
memoryChannelにはmemoryを使う事をしめしている
プラグインを使う場合はここで、クラスパスをしていすればいい

foo.channels.memoryChannel.capacity = 100 はmemoryの
キャパシティが100であることを示している

補足
typeは内部的に定義されているものと定義されていないものがあり、
定義されていないものはクラスパスを指定する事で利用できる

 設定を変えてみる

  • 1. sourceをsyslogに変えてみる
  # vi conf/flume-syslog-conf.properties

  foo.sources = syslogSrc
  foo.channels = memoryChannel
  foo.sinks = loggerSink

  foo.sources.syslogSrc.type = org.apache.flume.source.SyslogUDPSource
  foo.sources.syslogSrc.port = 514
  foo.sources.syslogSrc.host = 127.0.0.1
  foo.sources.syslogSrc.channels = memoryChannel

  foo.sinks.loggerSink.type = logger
  foo.sinks.loggerSink.channel = memoryChannel

  foo.channels.memoryChannel.type = memory
  foo.channels.memoryChannel.capacity = 100

  # JAVA_HOME=/usr/local/java/jdk1.7.0_03 bin/flume-ng node -n foo -c ./conf -f conf/flume-syslog-conf.properties

  (別の端末などで)
  # netstat -an

  Active Internet connections (servers and established)
  Proto Recv-Q Send-Q Local Address           Foreign Address         
  ...
  udp6       0      0 127.0.0.1:514           :::*                               

  (514をバインドしている事が分かる)

  • 2. sinkをfile_rollに変えてみる
  # vi conf/flume-syslog-conf.properties

  foo.sources = syslogSrc
  foo.channels = memoryChannel
  foo.sinks = fileSink

  foo.sources.syslogSrc.type = org.apache.flume.source.SyslogUDPSource
  foo.sources.syslogSrc.port = 514
  foo.sources.syslogSrc.host = 127.0.0.1
  foo.sources.syslogSrc.channels = memoryChannel

  foo.sinks.fileSink.type = file_roll
  foo.sinks.fileSink.sink.directory = /var/tmp
  foo.sinks.fileSink.sink.rollInterval = 300
  foo.sinks.fileSink.channel = memoryChannel

  foo.channels.memoryChannel.type = memory
  foo.channels.memoryChannel.capacity = 100

  # JAVA_HOME=/usr/local/java/jdk1.7.0_03 bin/flume-ng node -n foo -c ./conf -f conf/flume-syslog-conf.properties

 (ログメッセージでなにやらファイルを作った風なログが出る)
  2012-04-08 06:43:46,076 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:140)] Opening output stream for file /var/tmp/1333835026070-1

  # ls /var/tmp/1333835026070-1
  /var/tmp/1333835026070-1
  (ファイルができてる事が分かる)

とまぁ、こんな感じで自分に必要な設定に変えてやればいい
で、そもそも論として、設定項目はどうやったら分かるのかというと、
各source, channel, sinkのconfigureメソッドをみると分かる
多分、今の段階ではそれが一番手っ取り早い

例えば、irc sink (IRCSink.java) であれば、
  public void configure(Context context) {
    hostname = context.getString("hostname");
    String portStr = context.getString("port");
    nick = context.getString("nick");
    password = context.getString("password");
    user = context.getString("user");
    name = context.getString("name");
    chan = context.getString("chan");
    splitLines = context.getBoolean("splitlines");
    splitChars = context.getString("splitchars");

    if (portStr != null) {
      port = Integer.parseInt(portStr);
    } else {
      port = DEFAULT_PORT;
    }

    if (splitChars == null) {
      splitChars = DEFAULT_SPLIT_CHARS;
    }
    
    Preconditions.checkState(hostname != null, "No hostname specified");
    Preconditions.checkState(nick != null, "No nick specified");
    Preconditions.checkState(chan != null, "No chan specified");
  }
というように、hostname, port, nick, password等々
いろいろ設定が必要な事が分かる

プラグインを作成してみる

次回につづく

その他参考資料