RaspberryPiでfluentdを使ってDynamoDBに書き込む

概要

本コンテンツは、JAWS-UG関西IoT専門支部主催のハンズオン向けに作成したものです。
下図の通り、IoTデバイス(RaspberryPi)でセンシングしたデータをリアルタイムで可視化することが目標です。
通信にはWi-Fiまたは3Gを、データベースにはAWS DynamoDBを、可視化にはIoT.kyoto VISという無料サービスを利用します。

989c03b4-4479-373a-b785-2b08c5b35bc1.png
LinuxやAWSの基礎的なスキルを有している方向けの<上級編>はこちら

前提条件

本コンテンツは、「Raspberry Pi でSORACOM 接続+MQTTをしてみよう 後編」が完了している方を対象としており、下記の前提条件については説明いたしません。

  • IoTデバイスには、最新のRaspbianがインストールされているRaspberryPi 2 Model Bを使用
  • RaspberryPiがインターネットに接続できること(Wi-Fi/3G)
  • RaspberryPiにPCからSSHで接続できること
  • RaspberryPiに照度センサーが接続済みで、センサーログをJSON形式でテキストファイルに保存できること

また、AWSアカウントが必要です。

今回利用するツール/サービス

fluentd

fluentdとは、米トレジャーデータ社が中心となり、オープンソースとして開発を行っているログ収集ミドルウェアです。いわゆるPub-Subモデルによりログの配送経路を柔軟に制御することが可能で、データのinputとoutputはプラグインという形で実装されています。サーバ等のLOG収集に使われることが多いですが、IoTデバイスのストリームデータの収集にも向いています。

さくらのナレッジの「柔軟なログ収集を可能にする「fluentd」入門」がよくまとまっていて分かりやすいです。

今回はRaspberryPiにfluentdをインストールし、下図のように利用します。
fluentd.PNG

AWS DynamoDB

AWS DynamoDBとは、アマゾン ウェブ サービスが提供する完全マネージド型のNoSQLデータベース(いわゆるKVS)です。IoTと非常に親和性が高く、下記のような特徴があります。

  • サイジング不要、I/Oスループットのみ指定する
  • スキーマレス、KEYが増えてもマイグレーションしなくていい
  • SQLではなくAPIで利用する。ストリームデータを連続的に書き込みつつ即座に読み出すようなことは得意領域。一方、SQLが得意とする複雑な処理は苦手
  • 排他制御やトランザクション処理はできない(ライブラリを利用すれば可能)が、その分高速
  • テーブル内のデータの追加・変更をトリガーにコードを実行することができる(DynamoDB Streams -> Lambda)
  • indexの張り方に失敗すると痛い目に遭う。LSI/GSIの使い分けが難しい
  • 1回のQueryで取得できるデータが約1MBに制限されている

詳しく知りたい方はBlack Belt Tech シリーズ開発者ガイドを読んでみてください。

ハンズオン終了後にテーブルを削除する手順

テーブルを選択して、[Actions] -> [Delete table]

DynamoDB · AWS Console 2016-04-02 16-15-05.png

IoT.kyoto VIS

IoT.kyoto VISとは、株式会社KYOSOが提供するDynamoDB特化のグラフ化Webサービスで、リアルタイムでのグラフ表示に強みがあります。
このハンズオンのために私とjsエンジニアの2名で立ち上げました( ー`дー´)キリッ (半分冗談、半分本気w)

[ToDo 0] RaspberryPiの準備

照度計回路を準備する

「Raspberry Pi でSORACOM 接続+MQTTをしてみよう 後編」で作製した回路をそのまま利用します。未作製の方は「RaspberryPiで照度計をつくろう」にしたがって回路を作製してRaspberryPiに接続して下さい。必要なパーツはこちらを参照してください。

インターネット接続確認とRaspbianのアップデート

RaspberryPiをインターネットに接続し、PCから有線LANまたはWi-Fi経由でSSH接続します。
RaspberryPiのIPアドレスが分からない場合は、こちらを参考にGUIやifconfigコマンド等で調べてください。下記の場合、192.168.0.209がRaspberryPiのIPアドレス(Wi-Fi)であることが分かります。
Wi-Fiを利用するのにwlan0にIPアドレスが割り当てられていない場合は、こちらを参考にWi-Fiアクセスポイントへの接続設定を行って下さい。

$ ifconfig
wlan0 Link encap:Ethernet HWaddr cc:e1:d5:4d:59:5e
inet addr:192.168.0.209 Bcast:192.168.0.255 Mask:255.255.255.0
inet6 addr: 2001:268:d006:d652::2/128 Scope:Global

wgetコマンドで適当なURLを打ってみればインターネットに接続できているか確認ができます。

$ wget https://google.co.jp
--2016-03-23 17:53:19-- https://google.co.jp/
Resolving google.co.jp (google.co.jp)... 2404:6800:4003:809::2003, 106.162.198.104, 106.162.198.123, ...
Connecting to google.co.jp (google.co.jp)|2404:6800:4003:809::2003|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://www.google.co.jp/ [following]
--2016-03-23 17:53:30-- https://www.google.co.jp/
Resolving www.google.co.jp (www.google.co.jp)... 2404:6800:4007:805::2003, 106.162.192.173, 106.162.192.167, ...
Connecting to www.google.co.jp (www.google.co.jp)|2404:6800:4007:805::2003|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘index.html’

index.html [ <=> ] 18.90K --.-KB/s in 0.1s

2016-03-23 17:53:32 (130 KB/s) - ‘index.html’ saved [19358]

アップデートとvimのインストールを行います。

$ sudo apt-get update
$ sudo apt-get install -y vim

[ToDo 1] 照度計回路の動作を確認する

下記Pythonコードを実行して、照度センサーの値が取得できることを確認します。
正常に動作すれば、~/shodo-logs/shodo-dynamo.logにJSONが記録され、標準出力にも表示されます。
※ piユーザーでログインしている前提です

$ cd ~
$ mkdir shodo-logs
$ vim shodo-dynamo.py
#coding: utf-8

# spi, time ライブラリをインポート
import spidev
import time
# コマンド実行ライブラリ
import subprocess
# センサーデータのJSON化のためにインポート
import json
from datetime import datetime

# SpiDev オブジェクトのインスタンスを生成
spi = spidev.SpiDev()
# ポート0、デバイス0のSPI をオープン
spi.open(0, 0)
# 最大クロックスピードを1MHz に設定
spi.max_speed_hz=1000000
# 1 ワードあたり8ビットに設定
spi.bits_per_word=8

# ダミーデータを設定(1111 1111)
dummy = 0xff
# スタートビットを設定(0100 0111)
start = 0x47
# シングルエンドモードを設定 (0010 0000)
sgl = 0x20
# ch0 を選択(0000 0000)
ch0 = 0x00
# ch1 を選択(0001 0000)
ch1 = 0x10
# MSB ファーストモードを選択(0000 1000)
msbf = 0x08
# IC からデータを取得する関数を定義
def measure(ch):
    # SPI インターフェイスでデータの送受信を行う
    ad = spi.xfer2( [ (start + sgl + ch + msbf), dummy ] )
    #
    val = ((ad[0] & 0x03) << 8) + ad[1]
    # 受信した2バイトのデータを10 ビットデータにまとめる
    voltage =  ( val * 3.3 ) / 1023
    # 結果を返す
    return val, voltage
try:
    # 無限ループ
    while 1:
        # 関数を呼び出してch1 のデータを取得
        ch1_val, ch1_voltage  = measure(ch1)
        # 結果を表示
        #print  'ch1 = {:4d}, {:2.2f}[V]'.format(ch1_val, ch1_voltage)
        # センサーデータをJSON化するため辞書に入れる。後々のデータ処理に用いる
        # JSONデータ構造 {"brightness": ch1_val, "ID": "id001", "time_sensor": "2015-10-15 16:21:56"}
        # "ID": "id001"の001の部分はデバイスによって変える
        json_data = {"time_sensor": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "ID": "id001", "brightness": ch1_val}
        encode_json_data = json.dumps(json_data)
        #json_dataを標準出力し、ログファイルに追記
        print encode_json_data
        f = open("/home/pi/shodo-logs/shodo-dynamo.log","a")
        f.write(encode_json_data)
        f.write("\n")
        f.close()
        # 1秒待つ
        time.sleep(1)

# キーボード例外を検出
except KeyboardInterrupt:
    # 何も処理をしない
    pass

# SPI を開放
spi.close()

照度センシングPythonコードを実行

$ python shodo-dynamo.py
{"brightness": 164, "ID": "id000", "time_sensor": "2016-03-28 15:16:48"}
{"brightness": 692, "ID": "id000", "time_sensor": "2016-03-28 15:16:49"}
{"brightness": 720, "ID": "id000", "time_sensor": "2016-03-28 15:16:50"}
{"brightness": 945, "ID": "id000", "time_sensor": "2016-03-28 15:16:51"}
{"brightness": 9, "ID": "id000", "time_sensor": "2016-03-28 15:16:52"}
{"brightness": 4, "ID": "id000", "time_sensor": "2016-03-28 15:16:53"}
# [ctrl + C]で停止
$ tail ~/shodo-logs/shodo-dynamo.log

[ToDo 2] APIアクセス用のKeyを作成する

DynamoDBにAPIでアクセスするために、IAM Access Keyを作成します。 今回作成するのは下記にある2つのユーザーおよびそれにひも付くKeyです。

ユーザー名 アクセス権 使用箇所
dynamo-full AmazonDynamoDBFullAccess RaspberryPiからDynamoDBへの書き込み
dynamo-read AmazonDynamoDBReadOnlyAccess IoT.kyoto VISからDynamoDBへの読み出し
  • AWSにサインインする
  • Identity and Access Management(IAM)コンソールを開く
    ※ 英語画面で説明するので日本語表示になっている場合は、画面左下の言語選択を「English」に変更する

    AWS Management Console 2017-02-05 15-21-17.png

  • 「dynamo-full」という名前でProgrammatic accessユーザーを作成する

    IAM Management Console 2017-02-05 15-25-26.png

    IAM Management Console 2017-02-15 22-18-07.png

  • [Attach existing policies directly]をクリックし、[dynamo]で検索して、[AmazonDynamoDBFullAccess]にチェックを入れる
  • [Next: Review]をクリックする

    IAM Management Console 2017-02-15 22-20-06.png

  • [Create user]をクリックする

    IAM Management Console 2017-02-05 15-38-24.png

  • 認証情報の書かれたCSVをダウンロードし、[Close]をクリックする

    IAM Management Console 2017-02-05 15-41-43.png

  • 同様に「dynamo-read」ユーザーを作成してIDとKeyを取得し、[AmazonDynamoDBReadOnlyAccess]の権限をアタッチする

[ToDo 3] fluentdのインストール

RaspberryPiにfluentdをインストールします。

fluentd本体のインストール

プラグインとの適合性を考慮して、0.12系の最新版をインストールします。

$ sudo apt-get install ruby-dev libssl-dev
$ sudo gem install fluentd -v "~> 0.12.0" --no-ri --no-rdoc

DynamoDBプラグインのインストール

$ sudo fluent-gem install fluent-plugin-dynamodb --no-ri --no-rdoc

本プラグインのGitHubリポジトリ

DynamoDBプラグインのソース修正

デフォルトのままではリージョンが[us-east-1]になっていますので、[ap-northeast-1]に書き換えます。
プラグインのバージョンが下記のパスと異なる場合は、ファイル名をTABキーで補完してください。

$ sudo vim /var/lib/gems/2.1.0/gems/fluent-plugin-dynamodb-0.1.11/lib/fluent/plugin/out_dynamodb.rb
#29行目あたり
# config_param :dynamo_db_region, :string, default: ENV["AWS_REGION"] || "us-east-1"-コメントアウトまたは削除

config_param :dynamo_db_region, :string, default: ENV["AWS_REGION"] || "ap-northeast-1"

インストールの確認

バージョンは若干異なるかもしれません。

$ gem list fluent

*** LOCAL GEMS ***

fluent-plugin-dynamodb (0.1.11)
fluentd (0.12.29, 0.10.61)

[ToDo 4] fluentdの設定

設定ファイルとログファイルの作成

AWSのKeyには、[ToDo 2]で作成した「dynamo-full」ユーザーのKeyを使用します。**********************の部分を書き換えて下さい。

$ sudo touch /home/pi/shodo-logs/shodo-dynamo.pos
$ sudo mkdir /fluent
$ sudo touch /fluent/fluent.log
$ sudo vim /fluent/fluent.conf
<source>
  # inputにtailプラグインを指定
  @type tail
  # フォーマットを指定
  format json
  # ログファイルをフルパスで指定
  path /home/pi/shodo-logs/shodo-dynamo.log
  # ファイル内のどの行までを読んだかを記録しておくファイルを指定
  pos_file /home/pi/shodo-logs/shodo-dynamo.pos
  # 分かりやすいタグを指定
  tag dynamodb.shodo
</source>

# 以下のタグにマッチしたものだけ処理される
<match dynamodb.**>
  # outputにDynamoDBプラグインを指定
  @type dynamodb
  # AWSのKeyを指定
  aws_key_id **********************
  aws_sec_key **********************
  # DynamoDBのリージョンを指定
  dynamo_db_endpoint https://dynamodb.ap-northeast-1.amazonaws.com
  # DynamoDBのテーブルを指定
  dynamo_db_table fluent-shodo
  # flush間隔(送信間隔)を指定
  flush_interval 1s
</match>

テスト

[error]の行がないか確認します。

$ sudo fluentd -c /fluent/fluent.conf --dry-run
2016-09-04 12:16:31 +0900 [info]: reading config file path="/fluent/fluent.conf"
2016-09-04 12:16:31 +0900 [info]: starting fluentd-0.12.29 as dry run mode
2016-09-04 12:16:31 +0900 [info]: gem 'fluent-plugin-dynamodb' version '0.1.11'
2016-09-04 12:16:31 +0900 [info]: gem 'fluentd' version '0.12.29'
2016-09-04 12:16:31 +0900 [info]: gem 'fluentd' version '0.10.61'
2016-09-04 12:16:31 +0900 [info]: adding match pattern="dynamodb.**" type="dynamodb"
2016-09-04 12:16:32 +0900 [info]: adding source type="tail"
2016-09-04 12:16:32 +0900 [info]: using configuration file: <ROOT>
  <source>
    @type tail
    format json
    path "/home/pi/shodo-logs/shodo-dynamo.log"
    pos_file "/home/pi/shodo-logs/shodo-dynamo.pos"
    tag "dynamodb.shodo"
  </source>
  <match dynamodb.**>
    @type dynamodb
    aws_key_id "**********************"
    aws_sec_key "**********************"
    dynamo_db_endpoint "dynamodb.ap-northeast-1.amazonaws.com"
    dynamo_db_table "fluent-shodo"
    flush_interval 1s
    <buffer>
      flush_mode interval
      retry_type exponential_backoff
      flush_interval 1s
    </buffer>
  </match>
</ROOT>

[ToDo 5] DynamoDBのテーブルを作成する

  • DynamoDBコンソールを開く
    ※ 英語画面で説明するので日本語表示になっている場合は、画面左下の言語選択を「English」に変更する

AWS Management Console 2017-02-15 22-15-58.png

  • リージョンが「Asia Pacific (Tokyo)」でない場合は変更する

DynamoDB · AWS Console 2016-04-02 16-55-11.png

  • [Create table]をクリックする

DynamoDB · AWS Console 2016-04-01 23-02-21.png

  • [Table name]に先ほどfluent.confで指定したテーブル名を入力する
  • [Partition key]にデバイスを識別するKey(今回は「ID」)を入力する
  • [Add sort key]にチェックを入れ、タイムスタンプのKey(今回は「time_sensor」)を入力する
  • [Create]をクリックする

DynamoDB · AWS Console 2016-04-01 23-05-56.png
– テーブル作成が終わるのを待つ

[ToDo 6] DynamoDBにセンサーログを送信する

fluentdを実行する

$ sudo fluentd -c /fluent/fluent.conf -v -o /fluent/fluent.log &
[1] 20284

正常に実行されているかどうかは/fluent/fluent.logを確認します。
※「20284」はPIDなので、値は実行の度に変わります

Pythonコードを実行する

$ python ~/shodo-dynamo.py
{"brightness": 169, "ID": "id000", "time_sensor": "2016-03-28 18:45:25"}
{"brightness": 171, "ID": "id000", "time_sensor": "2016-03-28 18:45:26"}
{"brightness": 171, "ID": "id000", "time_sensor": "2016-03-28 18:45:27"}
{"brightness": 170, "ID": "id000", "time_sensor": "2016-03-28 18:45:28"}

DynamoDBに正常にPutできているか確認する

  • DynamoDBコンソールを開く
    ※ 英語画面で説明するので日本語表示になっている場合は、画面左下の言語選択を「English」に変更する

AWS Management Console 2016-04-01 22-50-43.png

  • [Tables] -> (作成したテーブル) -> [Items]の順にクリックする

DynamoDB · AWS Console 2016-04-01 23-18-32.png

  • [Query]を選択する
  • [Partition key]に現在送信中のデバイスIDを入力する
  • [Descending]を選択する
  • [Start search]をクリックする度に最新のレコードが追加されていくのがわかる

DynamoDB · AWS Console 2016-04-01 23-25-44.png

[ToDo 7] IoT.kyoto VISでグラフ表示する

ブラウザはChrome(Win/Mac)またはSafari(Mac)をお使い下さい。

サインアップ

https://vis.iot.kyoto にアクセスし、「新規登録」をクリックします。

設定

IoT.kyoto VIS公式マニュアルの3~4章を参照。
AWSのKeyには、[ToDo 2]で作成した「dynamo-read」ユーザーのKeyを使用します。

いろいろ試してみる

[レンジ設定]を[手動]に変更して0~1000くらいに設定するとグラフが見やすくなります。下記のようなことを試してみましょう。

  • フォトトランジスタにライトを当てたり手で覆ったりして、グラフの変化や描画までの遅延を確認する
  • 閾値超えのアラートメールが届くか試してみる
  • 日時指定で過去の履歴を表示してみる
  • 別windowでターミナルを開き、デバイスIDを変えたPythonコードを実行(擬似的に複数デバイスが接続された状態となる)して、IoT.kyoto VISの画面で2個のグラフを同時に表示してみる
  • データの送信間隔を変えてみる(「グラフ設定(歯車)」でグラフの更新間隔を変えることができます)

スクリーンショット 2016-04-02 16.39.38.png

[ToDo 8] 自動実行設定

RaspberryPi電源投入時に各種処理を自動実行させるようにします。自動実行の手段はいくつかありますが、/etc/rc.localにスクリプトを記述する方法が一番手軽で確実です。今回は/etc/rc.localには親スクリプト1つだけを登録し、親スクリプトに子スクリプトを記述します。

親スクリプト

$ sudo touch /fluent/start.sh
$ sudo vim /etc/rc.local

fiexit 0の間に追記

_IP=$(hostname -I) || true
if [ "$_IP" ]; then
printf "My IP address is %s\n" "$_IP"
fi

sudo sh /fluent/start.sh # 親スクリプト

exit 0

子スクリプト

fluentd起動

RaspberryPiは内部クロックのバッテリバックアップをもっておらず、起動都度NTPで時刻合わせを行っています。一方、fluentd起動時に時刻が合っていないとDynamoDBプラグイン読込みでエラーが発生するため、30~60秒程度待ってから起動するとよいでしょう。3Gドングルを利用する場合は、スリープ時間を少し長めにとった方が確実です。

$ sudo vim /fluent/fluent_start.sh
#!/bin/sh
sleep 30s
fluentd -c /fluent/fluent.conf -v -o /fluent/fluent.log &
親スクリプトへの記述

SORACOM自動接続は、必要な人のみ記述して下さい。自動接続に失敗する場合は、start.shの1行目と2行目の間にsleep 10sを挿入してみましょう。

$ sudo vim /fluent/start.sh
#!/bin/sh
sh /fluent/fluent_start.sh & # fluentd起動スクリプト
sh /opt/sora/connect_air.sh & # SORACOM自動接続
python /home/pi/shodo-dynamo.py & # 照度センサーPythonコード

動作確認

再起動後、しばらく待ってIoT.kyoto VISのグラフが表示されることを確かめましょう。もし、表示されない場合は、DynamoDBのテーブル更新状況や各種ログを調べてデバッグします。

$ sudo reboot

[ToDo 9] シャットダウンボタンをつくろう

自動起動ができるようになったら、シャットダウンのためにSSHでログインするのは面倒ですよね! シャットダウンのための物理スイッチを作ってPCレス運用ができるようにしましょう。
今回は、物理スイッチはパーツリストに入れていませんでしたので、ジャンパワイヤで代用します。物理スイッチを購入するのであれば、ブレッドボードに挿さる6mm程度のタクトスイッチを選べばよいでしょう。
http://www.amazon.co.jp/dp/B00H3CVRGY

配線

GPIO 23(16番ピン)、GND(14番ピン等)にそれぞれジャンパワイヤを繋ぎ、下図の通り、タクトスイッチの代わりにオス-オスのジャンパワイヤで短絡できるようにします。

IMG_1547.png

Pythonコード

コードを実行した後に、ジャンパワイヤで4秒以上短絡させてみましょう。now power-off !と表示された後にシャットダウンされるはずです。

$ cd ~
$ vim sw.py
$ sudo python sw.py
#coding: utf-8
import RPi.GPIO as GPIO
import os
import time

GPIO.setmode(GPIO.BCM)
GPIO.setup(23, GPIO.IN, pull_up_down=GPIO.PUD_UP)

cnt = 0
# 4回実行=4秒でループを抜ける
while cnt &lt; 4:
if GPIO.input(23) == GPIO.LOW:
cnt += 1
# 1秒待ってループ
time.sleep(1)
else:
print "now power-off !"
GPIO.cleanup()
# 4秒以上短絡状態が続いたらシャットダウン実行
os.system("/sbin/shutdown -h now")

自動実行設定

[ToDo 8]で作成した/fluent/start.shを修正します。

$ sudo vim /fluent/start.sh
#!/bin/sh
sh /fluent/fluent_start.sh & # fluentd起動スクリプト
sh /opt/sora/connect_air.sh & # SORACOM自動接続
python /home/pi/shodo-dynamo.py & # 照度センサーPythonコード
python /home/pi/sw.py & # シャットダウンボタンPythonコード

再起動して、IoT.kyoto VISのグラフが表示されること、短絡でシャットダウンが実行されることを確かめましょう。
正常に動作していることが確認できたら本コンテンツは終了です。お疲れ様でした!