▼ 2012/04/10(火) flume NGのプラグインを作成してみた
前回のつづき
■ flume NGのpluginを作成してみた
- 1. eclipseにflumeMyPluginというプロジェクトを新たに作る
- 2. 前回FLUME_CLASSPATHにしていしたディレクトリないのjarをコピーしてきてeclipseのビルドパスに通す。
# scp -r /root/download/apache-flume-1.1.0-incubating/flume-ng-dist/target/flume-1.1.0-incubating/lib worker@workhost: コピーしたlibをD&DでflumetMyPluginにコピーして、コピーしたjarをbuild pathへ登録する (全部じゃなくてもいいけど、面倒なので全部がお手軽)
- 3. srcのtestパッケージに新規にFlumeMySourceというclassファイルを作成
package test; import java.util.HashMap; import java.util.Map; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.source.AbstractSource; public class FlumeMySource extends AbstractSource implements PollableSource, Configurable { String msg = null; @Override public void configure(Context context) { msg = context.getString("message"); } @Override public void start() { // 開始処理 } @Override public void stop() { // 停止処理 } @Override public Status process() throws EventDeliveryException { // headerを作成 Map<String, String> headers = new HashMap<String, String>(); headers.put("hoge", "fuga"); // bodyを作成 String body = "hello " + msg + "!!"; // イベントの作成 Event e = EventBuilder.withBody(body.getBytes(), headers); // イベントをチャンネルへ送る getChannelProcessor().processEvent(e); // 1秒待つ try { Thread.sleep(1000); } catch (InterruptedException ex) { throw new EventDeliveryException("fail", ex); } return Status.READY; } }
- 4. propertiesというファイルで設定ファイルを作り、FlumeMySourceを設定する
foo.sources = mySource foo.channels = memoryChannel foo.sinks = loggerSink foo.sources.mySource.type = test.FlumeMySource foo.sources.mySource.message = world foo.sources.mySource.channels = memoryChannel foo.sinks.loggerSink.type = logger foo.sinks.loggerSink.channel = memoryChannel foo.channels.memoryChannel.type = memory foo.channels.memoryChannel.capacity = 100
- 5. 実行設定
- run configurationでjava applicationを選ぶ
- Main classで org.apache.flume.node.Application を指定する
- argumentsで -n foo -f properties を指定する
- 6. 実行してみる
12/04/10 04:46:50 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@80d3d6f } 12/04/10 04:46:51 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@48ee22f7 } 12/04/10 04:46:52 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@a39ab89 } 12/04/10 04:46:53 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@2993a66f } 12/04/10 04:46:54 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@2df6df4c } 12/04/10 04:46:55 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@2d5253d5 } 12/04/10 04:46:56 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@77fddc31 } 12/04/10 04:46:57 INFO sink.LoggerSink: Event: { headers:{hoge=fuga} body:[B@3b835282 }headersにhoge=fugaが入っている事が分かる。
ただ、これだとbodyの中身が分からないのでログメッセージをもう少しかっこ良く出力するsinkが欲しくなる
- 7. srcのtestパッケージに新規にFlumeMySinkというclassファイルを作成
package test; import java.util.Map.Entry; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class FlumeMySink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory .getLogger(FlumeMySink.class); @Override public void configure(Context context) { // 設定処理 } @Override public void start() { // 開始処理 } @Override public void stop() { // 停止処理 } @Override public Status process() throws EventDeliveryException { Status result = Status.READY; // チャンネルを取得 Channel channel = getChannel(); // チャンネルのトランザクションを取得 Transaction transaction = channel.getTransaction(); Event e = null; try { transaction.begin(); // チャンネルからイベントを取得 e = channel.take(); if (e != null) { if (logger.isInfoEnabled()) { // ログを作る StringBuilder sb = new StringBuilder(); for(Entry<String, String> entry : e.getHeaders().entrySet()){ sb.append(':').append(entry.getKey()).append('=').append(entry.getValue()); } sb.append(":body=").append(new String(e.getBody())); logger.info(sb.toString()); } } else { result = Status.BACKOFF; } transaction.commit(); } catch (Exception ex) { transaction.rollback(); throw new EventDeliveryException("fail", ex); } finally { transaction.close(); } return result; } }
- 8. propertiesにFlumeMySinkを設定する
foo.sources = mySource foo.channels = memoryChannel foo.sinks = mySink foo.sources.mySource.type = test.FlumeMySource foo.sources.mySource.message = world foo.sources.mySource.channels = memoryChannel foo.sinks.mySink.type = test.FlumeMySink foo.sinks.mySink.channel = memoryChannel foo.channels.memoryChannel.type = memory foo.channels.memoryChannel.capacity = 100
- 9. 実行する
12/04/10 05:18:51 INFO test.FlumeMySink: :hoge=fuga:body=hello world!! 12/04/10 05:18:52 INFO test.FlumeMySink: :hoge=fuga:body=hello world!! 12/04/10 05:18:53 INFO test.FlumeMySink: :hoge=fuga:body=hello world!! 12/04/10 05:18:54 INFO test.FlumeMySink: :hoge=fuga:body=hello world!! 12/04/10 05:18:55 INFO test.FlumeMySink: :hoge=fuga:body=hello world!!
■ 補足
sinkにgetTransactionというメソッドがあるが、transactionはchannnelに実装されていて、sinkがデータを取り出してから失敗した場合に、transaction.rollback()を呼ぶ事で、ロールバックされるようになっている。
また、processメソッドで、Statusを返すようになっているが、
BACKOFFステータースを返すと、runnerがポーリングでwaitを入れるようになる。
繰り返しBACKOFFを返すと、一定時間まで少しずつwaitが入る時間がのびるようだ。
(READYを返した時点でwaitはしなくなる)
let's enjoy flume!!