ようこそゲストさん

無能日記

2012/04/10(火) flume NGのプラグインを作成してみた

はてブ 2012/04/10 5:41 R&D (flume)poti
前回のつづき

flume NGのpluginを作成してみた

  • 1. eclipseにflumeMyPluginというプロジェクトを新たに作る

  • 2. 前回FLUME_CLASSPATHにしていしたディレクトリないのjarをコピーしてきてeclipseのビルドパスに通す。
 # scp -r /root/download/apache-flume-1.1.0-incubating/flume-ng-dist/target/flume-1.1.0-incubating/lib worker@workhost:
 コピーしたlibをD&DでflumetMyPluginにコピーして、コピーしたjarをbuild pathへ登録する
 (全部じゃなくてもいいけど、面倒なので全部がお手軽)

  • 3. srcのtestパッケージに新規にFlumeMySourceというclassファイルを作成
以下のように編集
package test;

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class FlumeMySource extends AbstractSource implements PollableSource,
		Configurable {
	String msg = null;

	@Override
	public void configure(Context context) {
		msg = context.getString("message");
	}

	@Override
	public void start() {
		// 開始処理
	}

	@Override
	public void stop() {
		// 停止処理
	}

	@Override
	public Status process() throws EventDeliveryException {
		// headerを作成
		Map<String, String> headers = new HashMap<String, String>();
		headers.put("hoge", "fuga");
		// bodyを作成
		String body = "hello " + msg + "!!";
		// イベントの作成
		Event e = EventBuilder.withBody(body.getBytes(), headers);
		// イベントをチャンネルへ送る
		getChannelProcessor().processEvent(e);
		// 1秒待つ
		try {
			Thread.sleep(1000);
		} catch (InterruptedException ex) {
			throw new EventDeliveryException("fail", ex);
		}
		return Status.READY;
	}
}

  • 4. propertiesというファイルで設定ファイルを作り、FlumeMySourceを設定する
foo.sources = mySource
foo.channels = memoryChannel
foo.sinks = loggerSink

foo.sources.mySource.type = test.FlumeMySource
foo.sources.mySource.message = world
foo.sources.mySource.channels = memoryChannel

foo.sinks.loggerSink.type = logger
foo.sinks.loggerSink.channel = memoryChannel

foo.channels.memoryChannel.type = memory
foo.channels.memoryChannel.capacity = 100

  • 5. 実行設定
    • run configurationでjava applicationを選ぶ
    • Main classで org.apache.flume.node.Application を指定する
    • argumentsで -n foo -f properties を指定する

  • 6. 実行してみる
以下のような出力が得られる
12/04/10 04:46:50 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@80d3d6f }
12/04/10 04:46:51 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@48ee22f7 }
12/04/10 04:46:52 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@a39ab89 }
12/04/10 04:46:53 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@2993a66f }
12/04/10 04:46:54 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@2df6df4c }
12/04/10 04:46:55 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@2d5253d5 }
12/04/10 04:46:56 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@77fddc31 }
12/04/10 04:46:57 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@3b835282 }
headersにhoge=fugaが入っている事が分かる。
ただ、これだとbodyの中身が分からないのでログメッセージをもう少しかっこ良く出力するsinkが欲しくなる

  • 7. srcのtestパッケージに新規にFlumeMySinkというclassファイルを作成
以下のように編集する(loggerSinkとほとんど同じ)
package test;

import java.util.Map.Entry;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlumeMySink extends AbstractSink implements Configurable {

	private static final Logger logger = LoggerFactory
			.getLogger(FlumeMySink.class);

	@Override
	public void configure(Context context) {
		// 設定処理
	}

	@Override
	public void start() {
		// 開始処理
	}

	@Override
	public void stop() {
		// 停止処理
	}

	@Override
	public Status process() throws EventDeliveryException {
		Status result = Status.READY;
                // チャンネルを取得
		Channel channel = getChannel();
                // チャンネルのトランザクションを取得
		Transaction transaction = channel.getTransaction();
		Event e = null;

		try {
			transaction.begin();
                        // チャンネルからイベントを取得
			e = channel.take();
			if (e != null) {
				if (logger.isInfoEnabled()) {
                                        // ログを作る
					StringBuilder sb = new StringBuilder();
					for(Entry<String, String> entry : e.getHeaders().entrySet()){
						sb.append(':').append(entry.getKey()).append('=').append(entry.getValue());
					}
					sb.append(":body=").append(new String(e.getBody()));
					logger.info(sb.toString());
				}
			} else {
				result = Status.BACKOFF;
			}
			transaction.commit();
		} catch (Exception ex) {
			transaction.rollback();
			throw new EventDeliveryException("fail", ex);
		} finally {
			transaction.close();
		}

		return result;
	}
}

  • 8. propertiesにFlumeMySinkを設定する
foo.sources = mySource
foo.channels = memoryChannel
foo.sinks = mySink

foo.sources.mySource.type = test.FlumeMySource
foo.sources.mySource.message = world
foo.sources.mySource.channels = memoryChannel

foo.sinks.mySink.type = test.FlumeMySink
foo.sinks.mySink.channel = memoryChannel

foo.channels.memoryChannel.type = memory
foo.channels.memoryChannel.capacity = 100
  • 9. 実行する
以下のような出力が得られたら成功
12/04/10 05:18:51 INFO test.FlumeMySink: :hoge=fuga:body=hello world!!
12/04/10 05:18:52 INFO test.FlumeMySink: :hoge=fuga:body=hello world!!
12/04/10 05:18:53 INFO test.FlumeMySink: :hoge=fuga:body=hello world!!
12/04/10 05:18:54 INFO test.FlumeMySink: :hoge=fuga:body=hello world!!
12/04/10 05:18:55 INFO test.FlumeMySink: :hoge=fuga:body=hello world!!

補足

sinkにgetTransactionというメソッドがあるが、transactionはchannnelに
実装されていて、sinkがデータを取り出してから失敗した場合に、transaction.rollback()を呼ぶ事で、ロールバックされるようになっている。
また、processメソッドで、Statusを返すようになっているが、
BACKOFFステータースを返すと、runnerがポーリングでwaitを入れるようになる。
繰り返しBACKOFFを返すと、一定時間まで少しずつwaitが入る時間がのびるようだ。
(READYを返した時点でwaitはしなくなる)


let's enjoy flume!!