本記事はSORACOMサービスリリース1周年記念リレーブログ10月26日分です。
ども、AWS Samurai 2015の辻です。
SORACOMにはIoTプレイヤーにとって嬉しいサービスがたくさんありますが、その中でも私が大好きなのがSORACOM Funnelです。AWSであればKinesisに直結してくれるサービスなのですが、Kinesis自体が割と敷居高めなサービスであるせいか、SORACOM Funnelの使い方を紹介した記事はネット上にもあまりありません。1周年記念ですので、可視化まで含めて手順を大公開したいと思いますっ!!
SORACOM Funnelの何がいいって、そりゃぁもうお客様に「セキュリティは・・・」って聞かれてもすかさず「閉域だから大丈夫です!」ってドヤ顔できるところです。しかもIoTデバイスにクレデンシャル(認証情報)をもたなくていいなんてまさに夢のようです。今回はこんな感じで遊んでみようと思います。
なにやら萌えキャラがいますね!可視化に利用するIoT.kyoto VISのイメージキャラクター「いおたん(IoTan)」です。いおたんはこんな経緯で誕生しました。そして巨大化して目からビームを出し喋るようになりました!(今回の記事には全く関係ありませんw)
前提条件
- AWSアカウントをもっていること
- SORACOM Airでインターネット接続できるRaspberryPiをもっていること(この部分の手順は説明しません)
- RaspberryPiに接続して計測できる、照度・温度・超音波等のセンサーをもっていること
RASPBIAN JESSIE 2016-05-27版で動作確認を行っています。これより古いバージョンだと期待通りに動作しない可能性があります。
[ToDo 1] RaspberryPiで何かをセンシングし、テキストファイルに保存できるようにする(RasPi)
RaspberryPiにセンサーを接続し、計測値を基にこんな感じのJSONがテキストファイルに保存できていればOKです。デバイスID・計測値・タイムスタンプが必須です。以後、/home/pi/shodo-logs/shodo-funnel.log
にログが保存されるものとして説明します。
{"brightness": 164, "ID": "id000", "time_sensor": "2016-03-28 15:16:48"}
{"brightness": 692, "ID": "id000", "time_sensor": "2016-03-28 15:16:51"}
{"brightness": 720, "ID": "id000", "time_sensor": "2016-03-28 15:16:54"}
{"brightness": 945, "ID": "id000", "time_sensor": "2016-03-28 15:16:57"}
{"brightness": 9, "ID": "id000", "time_sensor": "2016-03-28 15:17:00"}
実装例として、照度センサーを使った手順を以前のリレーブログで紹介しました。RaspberryPiでfluentdを使ってDynamoDBに書き込むの[Todo 0]と[ToDo 1]を参考にしてください。以下、注意点です。
$ sudo apt-get update
は必ず実行するようにしてください- 今回はSORACOM Airでインターネットに接続しますので、リンク先[ToDo 1]のWi-Fi接続確認で必ずしもインターネット接続できている必要はなく、SSH接続の確認ができればOKです。有線LAN接続やHDMI接続のモニタでも代用できます
- 今回の実装において、SORACOM Funnelは1秒間隔といった短い周期でメッセージを送信すると処理が間に合わないことがあります。3秒以上間隔を空けてメッセージを送信するようにしてください。リンク先[ToDo 1]のコードは1秒間隔になっていますので修正が必要です
[ToDo 2] SORACOM Airでインターネット接続する(RasPi)
SORACOM公式ハンズオンテキストが参考になると思います。接続が完了したらRaspberryPiにSSHで接続して下記コマンドを実行します。
$ curl ifconfig.io
54.250.xxx.xxx
#IPv6アドレスが表示される場合は次のコマンドを使って下さい
$ curl ifconfig.me
54.250.xxx.xxx
返り値(グローバルIPアドレス)が54.250.xxx.xxxとなっていればSORACOM Air経由でインターネットに接続できています。
[ToDo 3] APIアクセス用のKeyを作成する(AWS)
KinesisやDynamoDBにAPIでアクセスするために、AWSのIAM Access Keyを作成します。
今回作成するのは下記にある2つのユーザーおよびそれにひも付くKeyです。
手順はこちらを参照してください。(手抜きですいません汗)
ユーザー名 | アクセス権 | 使用箇所 |
---|---|---|
Kinesis-full | AmazonKinesisFullAccess AmazonKinesisFirehoseFullAccess |
FunnelからKinesisへの書き込み |
dynamo-read | AmazonDynamoDBReadOnlyAccess | IoT.kyoto VISからDynamoDBへの読み出し |
[ToDo 4] Kinesisの設定(AWS)
今回は動作確認用として、S3 / Redshift / Elasticsearch Serviceにログを配送してくれるKinesis Firehoseと、概要の図で登場しているKinesis Streamsを利用します。
Kinesis Streamsをざっくり説明する以下のような感じでしょうか。
- Streamにデータをに投げ込むと一定期間(デフォルトで24時間)キャッシュしてくれて、その間にデータを取り出すことができる
- データはクレデンシャルをつけてエンドポイントにHTTPSでPOSTする
- データの取り出しはキャッシュ期間中何回でもOK
- サイジングは「シャード」という単位で指定
- シャードをオートスケールする機能はない
- 1シャードで最大1000メッセージ/秒もさばいてくれるので、スパイクが来ても安心
- シャードを拡張してもエンドポイントは1つのまま。よろしく分散してくれる(分散処理のためのパーティションキーの指定が必要)
- マネージドサービスなのでインフラの運用は不要。SPOFも考慮しなくて良い
Kinesis Streamsはデータを取り出して処理をする部分(Kinesis App)をLambdaなどで開発する必要がありますが、Kinesis Firehoseを使えばS3 / Redshift / Elasticsearch Serviceのどれかにデータを配送してくれます。Kinesis Firehoseの仕様はKinesis Streamsとは若干異なりますので、詳細は上記リンク先を参照して下さい。
AWS IoTと何が違うの?
IoT向けの同じような機能を持つサービスとしてAWS IoTがあります。誤解を恐れずざっくりと違いを説明します。
- 全ノード合計で1メッセージ/秒を超えるような場合は、AWS IoTの方が割高
- Kinesisの対応プロトコルはHTTPSのみ、AWS IoTはMQTTなど様々なプロトコルに対応
- 1メッセージあたりの最大ペイロードは、Kinesis -> 1MB 、AWS IoT ->128KB
- Kinesisはデバイス-> クラウドの1方向のみ、AWS IoTはクラウド側からデバイスを制御できる
AWS IoTの方が高機能な分割高と言えそうですね。要件に合わせてうまく使い分けましょう。
Kinesis Firehoseの設定
- Kinesisコンソールを開く
- Kinesis Firehoseは2016年10月現在、東京リージョンでは提供されていないので、利用可能な北バージニア(N.Virginia)リージョンを選択する
- [Go to Firehose]をクリックする
- [Create Delivery Stream]をクリックする
- 配送先に[Amazon S3]を選択する
- Stream Nameを入力し、ログを格納するS3バケットを選択(ここではともに「iotan-f」としている)する。バケットはここで作成することもできる
- ログのファイル名はデフォルトでStream名+タイムスタンプとなるが、必要であればプレフィックスを指定する
- [Next]をクリックする
- IAM roleで[Firehose Delivery IAM role]を選択する。他はデフォルトでOK
- そのまま[Allow]をクリックする
- そのまま[Next]をクリックする
- [Create Delivery Stream]をクリックする
- S3配送Streamが作成された。[ACTIVE]になったら利用可能
Kinesis Streamsの設定
- Kinesisコンソールを開く
- とくに理由がなければ、パフォーマンス面で有利な東京リージョンを選択する
- [Go to Streams]をクリックする
- [Create Stream]をクリックする
- Stream Nameを入力(ここでは「iotan-s」としている)する
- シャード数を入力する。1で充分
- [Create]をクリックする
- Streamが作成された。[ACTIVE]になったら利用可能
[ToDo 5] SORACOM Funnelの設定(SORACOM)
- SORACOMユーザーコンソールを開く
- [セキュリティ]の画面を開く
- [認証情報ストア] -> [認証情報を登録]
- 任意の認証情報ID(ここでは「iotan-kinesis」としている)を入力する
- [Todo 2]で作成した「kinesis-full」の認証情報を入力する
- [登録]をクリックする
- 認証情報が登録された
- [グループ]の画面を開く
- [追加]をクリックする
- 任意のグループ名(ここでは「iotan-kinesis」としている)を入力して[グループ作成]をクリックする
- 登録済みのグループが一覧表示されるので、先ほど登録したグループをクリックする
- [SORACOM Funnel 設定]を展開する
- まずは動作確認でKinesis Firehoseにデータを流すので、リソースタイプで[Kinesis Firehose]を選択する
- リソースURLは次のように入力する
https://kfirehose.us-east-1.amazonaws.com/{stream_name}
(北バージニアリージョンの場合) - 認証情報は先ほど作成した認証情報名を選択する
- 送信するデータ形式は[JSON]を選択する
- [保存]をクリックする
- [SIM管理]の画面を開く
- RaspberryPiの通信に使用しているSIMを選択して右クリック -> [所属グループ変更]
- 先ほど作成したグループを選択し、[グループ変更]をクリックする
- 所属グループが変更された
データの配送先をKinesis Streamsに変更する
[ToDo 8]でKinesis FirehoseによるSORACOM Funnelの動作確認が終わったら下記の手順で配送先を変更します。
- [グループ]の画面を開く
- 登録済みのグループが一覧表示されるので、先ほど登録したグループをクリックする
- [SORACOM Funnel 設定]を展開する
- リソースタイプで[Kinesis Streams]を選択する
- リソースURLは次のように入力する
https://kinesis.ap-northeast-1.amazonaws.com/{stream_name}
(東京リージョンの場合) - [保存]をクリックする
[ToDo 6] fluentdのインストール(RasPi)
RaspberryPiからのデータ配送ミドルウェアとしてfluentdを使用します。動作概念についてはこちらを参照下さい。今回はoutputプラグインとして「out-httpプラグイン」を使用します。他にはリンク先にあるようにDynamoDBに配送したり、KinesisやS3に配送することなどもできます。
「out-httpプラグインは」単にHTTPでPOSTするだけのプラグインです。そう、暗号化もしません。セキュリティ大丈夫なの?と思ったあなたはSORACOM Funnelのサービス説明を熟読しましょう。モバイルネットワーク上の暗号化は3G網側にオフロードされ、AWSへの認証や暗号化はSORACOM Funnelがよろしくやってくれます。
そう、IoT開発者にとって面倒臭いところはSORACOMさんが頑張ってくれるのです。素敵(はぁと。さらにKinesisまで閉域網相当で接続できるのです。もう抱いてw。「相当」と書いたのは、Kinesisがユーザー毎の閉じた環境(AWSでいうVPC)内にあるわけじゃなくて、AWSユーザー共用のネットワーク空間に存在するからです。
以下、RaspberryPiへのfluentdインストール手順です。
fluentd本体のインストール
数分かかります。
$ sudo apt-get install ruby-dev libssl-dev
$ sudo gem install fluentd --no-ri --no-rdoc
※今回使うout-httpプラグインは最新バージョンである0.14系で動作することを確認していますが、他のプラグインも使用する場合は0.12系の最新版をインストールした方が良いかもしれません。その場合のコマンドは下記
$ sudo gem install fluentd -v "~> 0.12.0" --no-ri --no-rdoc
out-httpプラグインのインストール
数分かかります。
$ sudo fluent-gem install fluent-plugin-out-http --no-ri --no-rdoc
インストールの確認
バージョンは若干異なるかもしれません。
$ sudo gem list fluent
*** LOCAL GEMS ***
fluent-plugin-out-http (0.1.4)
fluentd (0.14.4, 0.10.61)
[ToDo 7] fluentdの設定(RasPi)
設定ファイルとログファイルの作成
センサーログファイル(.logと.pos)のパスは適宜書き換えて下さい。
$ sudo touch /home/pi/shodo-logs/shodo-funnel.pos
$ sudo mkdir /fluent
$ sudo touch /fluent/fluent.log
$ sudo vim /fluent/fluent.conf
↓fluent.conf↓
<source>
# inputにtailプラグインを指定
@type tail
# フォーマットを指定
format json
# ログファイルをフルパスで指定
path /home/pi/shodo-logs/shodo-funnel.log
# ファイル内のどの行までを読んだかを記録しておくファイルを指定
pos_file /home/pi/shodo-logs/shodo-funnel.pos
# 分かりやすいタグを指定
tag funnel.shodo
</source>
# 以下のタグにマッチしたものだけoutput処理される
<match funnel.**>
@type http
# SORACOM Funnelエンドポイントを指定
endpoint_url http://funnel.soracom.io/
#以下、各種設定
http_method post # default: post
serializer json # default: form
rate_limit_msec 100 # default: 0 = no rate limiting
raise_on_error true # default: true
</match>
テスト
[error]
の行がないか確認します。
$ sudo fluentd -c /fluent/fluent.conf --dry-run
2016-10-20 19:35:36 +0900 [info]: reading config file path="/fluent/fluent.conf"
2016-10-20 19:35:36 +0900 [info]: starting fluentd-0.14.4 as dry run mode
2016-10-20 19:35:36 +0900 [info]: gem 'fluent-plugin-out-http' version '0.1.4'
2016-10-20 19:35:36 +0900 [info]: gem 'fluentd' version '0.14.4'
2016-10-20 19:35:36 +0900 [info]: gem 'fluentd' version '0.10.61'
2016-10-20 19:35:36 +0900 [info]: adding match pattern="funnel.**" type="http"
2016-10-20 19:35:36 +0900 [info]: adding source type="tail"
2016-10-20 19:35:36 +0900 [info]: using configuration file: <ROOT>
<source>
@type tail
format json
path "/home/pi/shodo-logs/shodo-funnel.log"
pos_file "/home/pi/shodo-logs/shodo-funnel.pos"
tag "funnel.shodo"
</source>
<match funnel.**>
@type http
endpoint_url "http://funnel.soracom.io/"
http_method "post"
serializer "json"
rate_limit_msec 100
raise_on_error false
</match>
</ROOT>
fluentdを実行する
$ sudo fluentd -c /fluent/fluent.conf -v -o /fluent/fluent.log &
[1] 20284
正常に実行されているかどうかは/fluent/fluent.logを確認します。起動時に自動実行する場合は、/etc/rc.local
に記述してください。
※「20284」はPIDなので、値は実行の度に変わります
[ToDo 8] Kinesis Firehoseにデータを流してみる(主にAWS)
センシングを開始してログファイルにJSONが書き込まれていることを確認します。正しく設定されていれば、S3バケットにログファイルが書き込まれるはずです。ログは5分毎に書き込まれるので、S3バケットに保存されるまで数分のタイムラグがあります。
正しく動作していなければ/fluent/fluent.log
を確認しましょう。
- Kinesis Firehoseコンソールを開く
- Streamを選択 -> [Actions] -> [Monitor]
- 折れ線グラフが表示されていれば正常に動作している
- S3コンソールを開く
- Kinesis Firehoseの設定で指定したバケットを開く
- ログが書き込まれていることが確認できる(ファイル名のタイムスタンプはUTC)
- ログをダウンロードして開くとJSONが書き込まれていることが確認できる
[ToDo 9] Lambda Functionを作成する(AWS)
先ほどはKinesis FirehoseがS3バケットにデータを配送してくれましたが、DynamoDBにデータを格納するにはRaspberryPi -> Kinesis Streams -> Lambda -> DynamoDBというデータストリームを構築する必要があります。Kinesis Streamsにデータが書き込まれたことをトリガーにLambda Functionが起動して、DynamoDBにデータを書き込みます。では、Lambda Functionを作成しましょう。
asyncライブラリを取得する
Lambda Functionは、Java / NodeJS / Pythonで記述することができますが、今回はNodeJSを使用します。実行に必要なライブラリは、あらかじめ実行するコードとともにアップロードする必要があります。ライブラリはLinux環境でnpmコマンドを使って取得します。もちろん、RaspberryPiも利用できます。
$ cd ~
$ sudo apt-get install -y npm
$ mkdir node-lib
$ cd node-lib
$ npm install async
$ ls -l
total 4
drwxr-xr-x 3 pi pi 4096 Oct 20 23:03 node_modules
node-lib/node_modules/
にライブラリがインストールされていますので、node_modulesディレクトリごとSFTP等でPCローカルにダウンロードしておきます。
Lambda Function用のコードを作成する
- PCのテキストエディタで
index.js
という名前のファイルを作成する。コメント行の箇所は適宜修正する
var AWS = require('aws-sdk');
var async = require('async');
//DynamoDBのリージョンに合わせて変更する
var dynamodb = new AWS.DynamoDB({region: 'ap-northeast-1'});
var get_current_timestamp = function() {
var d = new Date();
var month = d.getMonth() + 1;
var day = d.getDate();
var hour = d.getHours();
var minute = d.getMinutes();
var second = d.getSeconds();
if (month < 10) {month = "0" + month;}
if (day < 10) {day = "0" + day;}
if (hour < 10) {hour = "0" + hour;}
if (minute < 10) {minute = "0" + minute;}
if (second < 10) {second = "0" + second;}
return d.getFullYear() + "-" + month + "-" + day + "T" + hour + ":" + minute + ":" + second + "Z";
}
exports.handler = function(event, context) {
async.eachSeries(event.Records, function(v, callback){
var encodedPayload = v.kinesis.data;
var payload = new Buffer(encodedPayload, 'base64').toString('ascii');
var data = JSON.parse(payload);
if(typeof data.operatorId != "undefined"){
var params = {
Item: {
//タイムスタンプのkey名に合わせて変更する
'time_sensor': { S: data.payloads.time_sensor},
//Lambdaの処理時刻をUTCで記録する
'time_lambda': { S: get_current_timestamp()},
//デバイスIDのkey名に合わせて変更する
'ID': { S: data.payloads.ID},
//センサー計測値のkey名に合わせて変更する
'brightness': { N: String(data.payloads.brightness)}
},
//DynamoDBのテーブル名に合わせて変更する
'TableName': 'iotan-funnel'
};
}else{
console.log('NOT SORACOM');
}
dynamodb.putItem(params, function(err) {
if (err) console.log(err, err.stack); callback();
});
}, function(err,result){
if (err) {
context.fail(err);
} else {
context.succeed('success');
}
});
}
index.js
と先ほどダウンロードしたnode_modules
をmyfunc.zip
というファイル名でZIP圧縮する
Lambda Functionを作成する
- Lambdaコンソールを開く
- リージョンが「Asia Pacific (Tokyo)」でない場合は変更する
- [Get Started Now]をクリックする
- [Blank Function]をクリックする
- 点線囲みをクリック -> [Kinesis] -> [Next]
- Kinesis streamで[ToDo 4]で作成したStreamを選択する
- Starting positionで[Trim horizon]を選択する
- [Enable trigger]にチェックを入れる
- [Next]をクリックする
- Nameに任意のFunction名(ここでは「iotan-funnel」としている)を入力する
- 先ほど作成したmyfunc.zipをアップロードする
- Roleで[Create a custom role]を選択する
- 新しいタブでロールの設定画面が開く
- Role Nameに任意のロール名(ここでは「lambda_DynamoDB」としている)を入力する
- [Edit]をクリックし、テキストボックスのJSONを削除して下記のJSONを貼り付ける
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt1428341300017",
"Action": [
"dynamodb:DeleteItem",
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:Query",
"dynamodb:Scan",
"dynamodb:UpdateItem"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Sid": "",
"Resource": "*",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow"
},
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"kinesis:*"
],
"Resource": "*"
}
]
}
- [Allow]をクリックするとタブが閉じる
- 元の画面に戻って、Timeoutを30秒に設定する
- [Next]をクリックする
- ロール名が表示されていることを確認して[Create function]をクリックする。もし、ロールが当たっていない場合は[Edit]をクリックして修正する
[ToDo 10] DynamoDBのテーブルを作成する(AWS)
key名は[ToDo 9]で設定したものと同じものを入力して下さい。
- DynamoDBコンソールを開く
- リージョンが「Asia Pacific (Tokyo)」でない場合は変更する
- [Create table]をクリックする
- Table nameに先ほどLambdaで指定したテーブル名(ここでは「iotan-funnel」としている)を入力する
- Partition keyにデバイスIDのKey(ここでは「ID」としている)を入力する
- [Add sort key]にチェックを入れ、タイムスタンプのKey(ここではは「time_sensor」としている)を入力する
- [Create]をクリックする
- テーブル作成が終わるのを待つ
[ToDo 11] Kinesis Streamsにデータを流してDynamoDBにデータが格納されることを確かめる(主にAWS)
SORACOM Funnelの配送先をKinesis Streamsに変更する
[ToDo 5]の最終項の手順で変更してください。
DynamoDBにデータが格納されることを確かめる
RaspberryPiからデータを送信し、下記手順でDynamoDBにデータが書き込まれていることを確かめます。
- [Tables] -> (作成したテーブル) -> [Items]の順にクリックする
- [Query]を選択する
- [Partition key]に現在送信中のデバイスIDを入力する
- [Descending]を選択する
- [Start search]をクリックする度に最新のレコードが追加されていくのがわかる
トラブルシューティング
Kinesis StreamsとLambdaのメトリックスやログを確認すると良いでしょう。
- Kinesis Streamsのメトリックス
- Lambdaの簡易メトリックス。この画面からログの画面に遷移できる
- Lambdaのログの画面。エラーメッセージも表示される
[ToDo 12] IoT.kyoto VISでグラフ表示する
ブラウザはChrome(Win/Mac)またはSafari(Mac)をお使い下さい。
最後にDynamoDBのデータをIoT.kyoto VISで可視化しましょう。
サインアップ
https://vis.iot.kyotoにアクセスし、「新規登録」をクリックします。
設定
IoT.kyoto VIS公式マニュアルの3~4章を参照。
AWSのKeyには、[ToDo 3]で作成した「dynamo-read」ユーザーのKeyを使用します。
いろいろ試してみる
[レンジ設定]を[手動]に変更して0~1000くらいに設定するとグラフが見やすくなります。下記のようなことを試してみましょう。
- フォトトランジスタにライトを当てたり手で覆ったりして、グラフの変化や描画までの遅延を確認する
- 閾値超えのアラートメールが届くか試してみる
- 日時指定で過去の履歴を表示してみる
- 別windowでターミナルを開き、デバイスIDを変えたPythonコードを実行(擬似的に複数デバイスが接続された状態となる)して、IoT.kyoto VISの画面で2個のグラフを同時に表示してみる
- データの送信間隔を変えてみる(「グラフ設定(歯車)」でグラフの更新間隔を変えることができます)
まとめ
今回は下記2通りのデータストリームを構築しました。
- デバイス -> SORACOM Funnel -> Kinesis Firehose -> S3
- デバイス -> SORACOM Funnel -> Kinesis Streams -> Lambda -> DynamoDB
前者は、とりあえず手軽にデータを集めたいというようなケースに最適です。また、RedshiftやElasticsearch Serviceにも流せるので夢が広がりングですね!
後者は、集めたデータを基に可視化や監視などのアプリケーションを構築したい場合に最適です。DynamoDBはちゃんと分かって使えばSQL系のDBMSでは実現できなかったような超強力なアプリケーションが作れちゃいます。また、監視系のビジネスロジックはLambdaでサクッと実装しちゃうのが良いですね。
SORACOMを単なるインターネット接続手段として使うなんてもったいない!!ぜひSORACOMの世界にDeep Diveしてみてください。