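Graphing ESP32 Data with Synapse Spark
Getting Started with IoT
I sent temperature and pressure data measured with an ESP32 (M5Stick-C) to Azure IoT Hub, used Spark (PySpark) on Azure Synapse to convert the JSON into Delta Lake, and plotted the temperature in a Notebook.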
1. Purpose
- Send temperature and pressure data measured with an ESP32 to Azure and store it in Delta Lake.
- View a graph of the temperature.
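2. Overview
Temperature data measured with an ESP32 (M5Stick-C) is converted to Delta Lake with Spark and shown as a graph.
- Send data from the ESP32 (M5Stick-C) to Azure IoT Hub over Wi-Fi.
The data is sent as JSON over MQTT.
- Once data accumulates in IoT Hub, write it out to Azure Storage (Data Lake Storage Gen2).
An IoT Hub routing rule creates the JSON files.
(The JSON sent from the ESP32 is stored as a Base64-encoded string under the key "Body".)
- In Synapse Studio, create and run a Notebook that executes tasks on an Apache Spark Pool.
<Contents>
1. Load the JSON files into a DataFrame
2. Convert UTC to JST
3. Base64-decode the body and select only the required columns
4. Write the DataFrame to Delta Lake
The date converted to Japan time is used as the partition key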
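As a rough illustration of steps 2 to 4, here is a minimal plain-Python sketch (no Spark needed) of what happens to a single record. The sample record, its values, and the `decode_record` helper are made up for illustration, and the timestamp format of `EnqueuedTimeUtc` is an assumption. Only the `Body` and `EnqueuedTimeUtc` keys and the payload field names come from the code in this article.

```python
import base64
import json
from datetime import datetime, timedelta, timezone

# Japan Standard Time is a fixed UTC+9 offset (no daylight saving).
JST = timezone(timedelta(hours=9))


def decode_record(record: dict) -> dict:
    """Decode one routed record into the columns kept in the Delta table."""
    # The device payload is stored Base64-encoded under "Body".
    body = json.loads(base64.b64decode(record["Body"]).decode("utf-8"))
    # Assumption: the timestamp is ISO 8601 with a trailing "Z" for UTC.
    enqueued_utc = datetime.fromisoformat(
        record["EnqueuedTimeUtc"].replace("Z", "+00:00")
    )
    enqueued_jst = enqueued_utc.astimezone(JST)
    return {
        "outdateJST": enqueued_jst.date().isoformat(),  # partition key
        "EnqueuedTimeJST": enqueued_jst.isoformat(),
        "temperature": body["temperature"],
        "Pressure": body["Pressure"],
    }


# Hypothetical sample record, built the way the device payload is wrapped.
payload = {"msgCount": 1, "temperature": 28.5, "Pressure": 1008.12, "battery": 4.1}
sample = {
    "EnqueuedTimeUtc": "2022-08-02T15:30:00Z",
    "Body": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii"),
}
result = decode_record(sample)
print(result)
```

Note that 15:30 UTC falls on the next calendar day in JST, which is why the partition date is taken after the time zone conversion rather than before it.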

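3. Build Steps Overview
The M5Stick-C (ESP32) code needs the Azure IoT Hub connection information, so create the IoT Hub first.
3-1. Azure IoT Hub
- Create an IoT Hub
- Create a device ID
- Create a rule for saving to storage
3-2. M5Stick-C (ESP32)
●What is the M5Stick-C?
A device with an LCD and an RTC, built around the ESP32 module developed by Espressif.
It has been discontinued, and the M5StickC Plus is now sold instead (as of September 20, 2022).
<Product page> M5StickC Plus - Switch Science
For this project, a temperature sensor (BMP280) is connected to the Grove port.
- Download the Arduino source from the Azure blog
- Edit iot_configs.h
Set the Wi-Fi, IoT Hub, and device ID
- Change the parts that acquire the data to send and build the JSON
(I also made small changes to the log output and time acquisition while I was at it)
3-3. Synapse
- Create a Synapse workspace
- Create a Synapse Apache Spark Pool
- Create a Notebook in Synapse Studio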
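4. Pricing
※1 USD = 137.275 JPY (as of August 16, 2022)
4-1. Price of Each Service
■Azure IoT Hub
・Free, because the Free edition is used (with the restriction that only one device can be used)
■Azure Storage Account (Azure Data Lake Storage Gen2)
・About 150 yen
・Usage conditions are as follows
→Lifecycle policy
Move to the cool tier after 7 days
→Input files
Number of files: 6,956
File size: 179,850,550 bytes (about 171.5 MiB)
→Output files
Number of files: 259
File size: 5,319,007 bytes (about 5 MiB)
■Azure Synapse Apache Spark Pool
・Small (4 cores / 32 GiB) x 3 nodes
・Per hour: 207.73 yen
・About 15 minutes: about 52 yen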
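4-2. Storage Price Calculation and Estimate
Data is sent once every 12 seconds, and one file of about 32 KiB is written every 12 minutes.
That produces 120 files per day.
Price per month if everything has been moved to the cool tier:
120 files x 32 KiB x ¥1.51003/GB x 730 hours per month / 1048576 (KiB to GiB conversion) = about 4 yen
Price for the hot tier (1 week):
120 files x 32 KiB x ¥2.7456/GB x 168 hours per week / 1048576 (KiB to GiB conversion) = about 1.7 yen
For example, six months of storage (JSON) for data received from 1,000 devices would cost about 25,000 yen.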
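The arithmetic above can be rechecked with a few lines of Python. This is only a sketch that evaluates the formulas exactly as they are written in this section, including the hours factor. It does not attempt to validate them against Azure's actual billing units, and the helper name `storage_cost_yen` is made up for illustration.

```python
KIB_PER_GIB = 1024 * 1024  # 1,048,576 KiB in one GiB


def storage_cost_yen(files_per_day, file_kib, yen_per_gb, hours):
    """Evaluate the article's storage formula term by term, as written."""
    return files_per_day * file_kib * yen_per_gb * hours / KIB_PER_GIB


# One file every 12 minutes gives 24 * 60 / 12 files per day.
files_per_day = 24 * 60 // 12

cool = storage_cost_yen(files_per_day, 32, 1.51003, 730)  # cool tier, 1 month
hot = storage_cost_yen(files_per_day, 32, 2.7456, 168)    # hot tier, 1 week

# Spark pool: hourly price from section 4-1, prorated to 15 minutes.
spark_15min = 207.73 * 15 / 60

print(f"files per day: {files_per_day}")
print(f"cool tier: {cool:.2f} yen, hot tier: {hot:.2f} yen")
print(f"Spark pool for 15 minutes: {spark_15min:.2f} yen")
```

Changing the unit prices or the file interval in one place makes it easy to redo the estimate when the exchange rate or the send frequency changes.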
5. Code
5-1. Azure IoT Hub Library for the ESP32
Add the M5Stick-C board in the Arduino IDE, then add the azure sdk for c arduino library.
The samples are available from the File menu → ESP32 examples.
Reference URLs:
●Arduino library - azure sdk for c arduino
azure-sdk-for-c-arduino/readme.md at main · Azure/azure-sdk-for-c-arduino · GitHub
●Microsoft - Internet of Things Blog
Azure IoT TLS: Changes are coming! (…and why you should care)
(Connects the ESP32 to IoT Hub using TLS)
5-2. Arduino Source Code
AzIoTSasToken.cpp:
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#include "AzIoTSasToken.h"
#include "SerialLogger.h"
#include <az_result.h>
#include <mbedtls/base64.h>
#include <mbedtls/md.h>
#include <mbedtls/sha256.h>
#include <stdlib.h>
#include <time.h>
#define INDEFINITE_TIME ((time_t)-1)
#define az_span_is_content_equal(x, AZ_SPAN_EMPTY) (az_span_size(x) == az_span_size(AZ_SPAN_EMPTY) && az_span_ptr(x) == az_span_ptr(AZ_SPAN_EMPTY))
static uint32_t getSasTokenExpiration(const char* sasToken)
{
const char SE[] = { '&', 's', 'e', '=' };
uint32_t se_as_unix_time = 0;
int i, j;
for (i = 0, j = 0; sasToken[i] != '\0'; i++)
{
if (sasToken[i] == SE[j])
{
j++;
if (j == sizeof(SE))
{
// i is still at the '=' position. We must advance it by 1.
i++;
break;
}
}
else
{
j = 0;
}
}
if (j != sizeof(SE))
{
Logger.Error("Failed finding `se` field in SAS token");
}
else
{
int k = i;
while (sasToken[k] != '\0' && sasToken[k] != '&') { k++; }
if (az_result_failed(
az_span_atou32(az_span_create((uint8_t*)sasToken + i, k - i), &se_as_unix_time)))
{
Logger.Error("Failed parsing SAS token expiration timestamp");
}
}
return se_as_unix_time;
}
static void mbedtls_hmac_sha256(az_span key, az_span payload, az_span signed_payload)
{
mbedtls_md_context_t ctx;
mbedtls_md_type_t md_type = MBEDTLS_MD_SHA256;
mbedtls_md_init(&ctx);
mbedtls_md_setup(&ctx, mbedtls_md_info_from_type(md_type), 1);
mbedtls_md_hmac_starts(&ctx, (const unsigned char*)az_span_ptr(key), az_span_size(key));
mbedtls_md_hmac_update(&ctx, (const unsigned char*)az_span_ptr(payload), az_span_size(payload));
mbedtls_md_hmac_finish(&ctx, (byte*)az_span_ptr(signed_payload));
mbedtls_md_free(&ctx);
}
static void hmac_sha256_sign_signature(
az_span decoded_key,
az_span signature,
az_span signed_signature,
az_span* out_signed_signature)
{
mbedtls_hmac_sha256(decoded_key, signature, signed_signature);
*out_signed_signature = az_span_slice(signed_signature, 0, 32);
}
static void base64_encode_bytes(
az_span decoded_bytes,
az_span base64_encoded_bytes,
az_span* out_base64_encoded_bytes)
{
size_t len;
if (mbedtls_base64_encode(
az_span_ptr(base64_encoded_bytes),
(size_t)az_span_size(base64_encoded_bytes),
&len,
az_span_ptr(decoded_bytes),
(size_t)az_span_size(decoded_bytes))
!= 0)
{
Logger.Error("mbedtls_base64_encode fail");
}
*out_base64_encoded_bytes = az_span_create(az_span_ptr(base64_encoded_bytes), (int32_t)len);
}
static int decode_base64_bytes(
az_span base64_encoded_bytes,
az_span decoded_bytes,
az_span* out_decoded_bytes)
{
memset(az_span_ptr(decoded_bytes), 0, (size_t)az_span_size(decoded_bytes));
size_t len;
if (mbedtls_base64_decode(
az_span_ptr(decoded_bytes),
(size_t)az_span_size(decoded_bytes),
&len,
az_span_ptr(base64_encoded_bytes),
(size_t)az_span_size(base64_encoded_bytes))
!= 0)
{
Logger.Error("mbedtls_base64_decode fail");
return 1;
}
else
{
*out_decoded_bytes = az_span_create(az_span_ptr(decoded_bytes), (int32_t)len);
return 0;
}
}
static int iot_sample_generate_sas_base64_encoded_signed_signature(
az_span sas_base64_encoded_key,
az_span sas_signature,
az_span sas_base64_encoded_signed_signature,
az_span* out_sas_base64_encoded_signed_signature)
{
// Decode the sas base64 encoded key to use for HMAC signing.
char sas_decoded_key_buffer[32];
az_span sas_decoded_key = AZ_SPAN_FROM_BUFFER(sas_decoded_key_buffer);
if (decode_base64_bytes(sas_base64_encoded_key, sas_decoded_key, &sas_decoded_key) != 0)
{
Logger.Error("Failed generating encoded signed signature");
return 1;
}
// HMAC-SHA256 sign the signature with the decoded key.
char sas_hmac256_signed_signature_buffer[32];
az_span sas_hmac256_signed_signature = AZ_SPAN_FROM_BUFFER(sas_hmac256_signed_signature_buffer);
hmac_sha256_sign_signature(
sas_decoded_key, sas_signature, sas_hmac256_signed_signature, &sas_hmac256_signed_signature);
// Base64 encode the result of the HMAC signing.
base64_encode_bytes(
sas_hmac256_signed_signature,
sas_base64_encoded_signed_signature,
out_sas_base64_encoded_signed_signature);
return 0;
}
int64_t iot_sample_get_epoch_expiration_time_from_minutes(uint32_t minutes)
{
time_t now = time(NULL);
return (int64_t)(now + minutes * 60);
}
az_span generate_sas_token(
az_iot_hub_client* hub_client,
az_span device_key,
az_span sas_signature,
unsigned int expiryTimeInMinutes,
az_span sas_token)
{
az_result rc;
// Create the POSIX expiration time from input minutes.
uint64_t sas_duration = iot_sample_get_epoch_expiration_time_from_minutes(expiryTimeInMinutes);
// Get the signature that will later be signed with the decoded key.
// az_span sas_signature = AZ_SPAN_FROM_BUFFER(signature);
rc = az_iot_hub_client_sas_get_signature(hub_client, sas_duration, sas_signature, &sas_signature);
if (az_result_failed(rc))
{
// Convert rc with String(); adding an int to a string literal only offsets the pointer.
Logger.Error("Could not get the signature for SAS key: az_result return code " + String(rc));
return AZ_SPAN_EMPTY;
}
// Generate the encoded, signed signature (b64 encoded, HMAC-SHA256 signing).
char b64enc_hmacsha256_signature[64];
az_span sas_base64_encoded_signed_signature = AZ_SPAN_FROM_BUFFER(b64enc_hmacsha256_signature);
if (iot_sample_generate_sas_base64_encoded_signed_signature(
device_key,
sas_signature,
sas_base64_encoded_signed_signature,
&sas_base64_encoded_signed_signature) != 0)
{
Logger.Error("Failed generating SAS token signed signature");
return AZ_SPAN_EMPTY;
}
// Get the resulting MQTT password, passing the base64 encoded, HMAC signed bytes.
size_t mqtt_password_length;
rc = az_iot_hub_client_sas_get_password(
hub_client,
sas_duration,
sas_base64_encoded_signed_signature,
AZ_SPAN_EMPTY,
(char*)az_span_ptr(sas_token),
az_span_size(sas_token),
&mqtt_password_length);
if (az_result_failed(rc))
{
// Convert rc with String(); adding an int to a string literal only offsets the pointer.
Logger.Error("Could not get the password: az_result return code " + String(rc));
return AZ_SPAN_EMPTY;
}
else
{
return az_span_slice(sas_token, 0, mqtt_password_length);
}
}
AzIoTSasToken::AzIoTSasToken(
az_iot_hub_client* client,
az_span deviceKey,
az_span signatureBuffer,
az_span sasTokenBuffer)
{
this->client = client;
this->deviceKey = deviceKey;
this->signatureBuffer = signatureBuffer;
this->sasTokenBuffer = sasTokenBuffer;
this->expirationUnixTime = 0;
this->sasToken = AZ_SPAN_EMPTY;
}
int AzIoTSasToken::Generate(unsigned int expiryTimeInMinutes)
{
this->sasToken = generate_sas_token(
this->client,
this->deviceKey,
this->signatureBuffer,
expiryTimeInMinutes,
this->sasTokenBuffer);
if (az_span_is_content_equal(this->sasToken, AZ_SPAN_EMPTY))
{
Logger.Error("Failed generating SAS token");
return 1;
}
else
{
this->expirationUnixTime = getSasTokenExpiration((const char*)az_span_ptr(this->sasToken));
if (this->expirationUnixTime == 0)
{
Logger.Error("Failed getting the SAS token expiration time");
this->sasToken = AZ_SPAN_EMPTY;
return 1;
}
else
{
return 0;
}
}
}
bool AzIoTSasToken::IsExpired()
{
time_t now = time(NULL);
if (now == INDEFINITE_TIME)
{
Logger.Error("Failed getting current time");
return true;
}
else
{
return (now >= this->expirationUnixTime);
}
}
az_span AzIoTSasToken::Get() { return this->sasToken; }
AzIoTSasToken.h:
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#ifndef AZIOTSASTOKEN_H
#define AZIOTSASTOKEN_H
#include <Arduino.h>
#include <az_iot_hub_client.h>
#include <az_span.h>
class AzIoTSasToken
{
public:
AzIoTSasToken(
az_iot_hub_client* client,
az_span deviceKey,
az_span signatureBuffer,
az_span sasTokenBuffer);
int Generate(unsigned int expiryTimeInMinutes);
bool IsExpired();
az_span Get();
private:
az_iot_hub_client* client;
az_span deviceKey;
az_span signatureBuffer;
az_span sasTokenBuffer;
az_span sasToken;
uint32_t expirationUnixTime;
};
#endif // AZIOTSASTOKEN_H
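For readers who would rather not trace the C++ above, the following Python sketch walks through the same steps that AzIoTSasToken performs: Base64-decode the device key, HMAC-SHA256 sign the string made of the URL-encoded resource, a newline, and the expiry time, Base64-encode the result, and assemble the MQTT password. The hub name, device ID, and key are hypothetical, and the token layout reflects my understanding of the IoT Hub SAS format, so treat this as a sketch rather than a reference implementation.

```python
import base64
import hashlib
import hmac
import time
from urllib.parse import parse_qs, quote_plus


def generate_sas_token(resource_uri: str, device_key_b64: str, expiry_unix: int) -> str:
    """Sign "<url-encoded resource>" + newline + "<expiry>" with HMAC-SHA256."""
    encoded_uri = quote_plus(resource_uri)
    string_to_sign = f"{encoded_uri}\n{expiry_unix}".encode("utf-8")
    key = base64.b64decode(device_key_b64)  # the device key is stored Base64-encoded
    digest = hmac.new(key, string_to_sign, hashlib.sha256).digest()
    signature = quote_plus(base64.b64encode(digest).decode("ascii"))
    return f"SharedAccessSignature sr={encoded_uri}&sig={signature}&se={expiry_unix}"


def get_expiration(token: str) -> int:
    """Read the `se` field back, like getSasTokenExpiration() in the C++ code."""
    fields = parse_qs(token.split(" ", 1)[1])
    return int(fields["se"][0])


# Hypothetical values for illustration only.
resource = "example-hub.azure-devices.net/devices/example-device"
device_key = base64.b64encode(b"k" * 32).decode("ascii")
expiry = int(time.time()) + 60 * 60  # 60 minutes, as SAS_TOKEN_DURATION_IN_MINUTES

token = generate_sas_token(resource, device_key, expiry)
print(token)
print("expires at:", get_expiration(token))
```

The device compares the parsed `se` value with the current time in `IsExpired()`, which is why the clock has to be synchronized over NTP before the token is generated.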
iot_configs.h (settings for Wi-Fi, Azure IoT Hub, and so on):
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
// Wifi
#define IOT_CONFIG_WIFI_SSID "<Wi-Fi SSID>"
#define IOT_CONFIG_WIFI_PASSWORD "<Wi-Fi password>"
// Enable macro IOT_CONFIG_USE_X509_CERT to use an x509 certificate to authenticate the IoT device.
// The two main modes of authentication are through SAS tokens (automatically generated by the sample using the provided device symmetric key) or through x509 certificates.
// Please choose the appropriate option according to your device authentication mode.
// #define IOT_CONFIG_USE_X509_CERT
#ifdef IOT_CONFIG_USE_X509_CERT
/*
* Please set the define IOT_CONFIG_DEVICE_CERT below with
* the content of your device x509 certificate.
*
* Example:
* #define IOT_CONFIG_DEVICE_CERT "-----BEGIN CERTIFICATE-----\r\n" \
* "MIIBJDCBywIUfeHrebBVa2eZAbouBgACp9R3BncwCgYIKoZIzj0EAwIwETEPMA0G\r\n" \
* "A1UEAwwGRFBTIENBMB4XDTIyMDMyMjazMTAzN1oXDTIzMDMyMjIzMTAzN1owGTEX\r\n" \
* "MBUGA1UEAwwOY29udG9zby1kZXZpY2UwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNC\r\n" \
* .......
* "YmGzdaHTb6P1W+p+jmc+jJn1MAoGCXqGSM49BAMCA0gAMEUCIEnbEMsAdGFroMwl\r\n" \
* "vTfQahwsxN3xink9z1gtirrjQlqDAiEAyU+6TUJcG6d9JF+uJqsLFpsbbF3IzGAw\r\n" \
* "yC+koNRC0MU=\r\n" \
* "-----END CERTIFICATE-----"
*
*/
#define IOT_CONFIG_DEVICE_CERT "Device Certificate"
/*
* Please set the define IOT_CONFIG_DEVICE_CERT_PRIVATE_KEY below with
* the content of your device x509 private key.
*
* Example:
*
* #define IOT_CONFIG_DEVICE_CERT_PRIVATE_KEY "-----BEGIN EC PRIVATE KEY-----\r\n" \
* "MHcCAQEEIKGXkMMiO9D7jYpUjUGTBn7gGzeKPeNzCP83wbfQfLd9obAoGCCqGSM49\r\n" \
* "AwEHoUQDQgAEU6nQoYbjgJvBwaeD6MyAYmOSDg0QhEdyyV337qrlIbDEKvFsn1El\r\n" \
* "yRabc4dNp2Jhs3Xh02+j9Vvqfo5nPoyZ9Q==\r\n" \
* "-----END EC PRIVATE KEY-----"
*
* Note the type of key may different in your case. Such as BEGIN PRIVATE KEY
* or BEGIN RSA PRIVATE KEY.
*
*/
#define IOT_CONFIG_DEVICE_CERT_PRIVATE_KEY "Device Certificate Private Key"
#endif // IOT_CONFIG_USE_X509_CERT
// Azure IoT
#define IOT_CONFIG_IOTHUB_FQDN "<IoT Hub name>.azure-devices.net"
#define IOT_CONFIG_DEVICE_ID "<device ID>"
// Use device key if not using certificates
#ifndef IOT_CONFIG_USE_X509_CERT
#define IOT_CONFIG_DEVICE_KEY ""
#endif // IOT_CONFIG_USE_X509_CERT
// Publish 1 message every 12 seconds
#define TELEMETRY_FREQUENCY_MILLISECS (12 * 1000)
M5StickC_Azure-IoT-Hub_BMx280_I2C.ino:
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
/*
This is an Arduino-based Azure IoT Hub sample for ESPRESSIF ESP32 boards.
It uses our Azure Embedded SDK for C to help interact with Azure IoT.
For reference, please visit https://github.com/azure/azure-sdk-for-c.
To connect and work with Azure IoT Hub you need an MQTT client, connecting, subscribing
and publishing to specific topics to use the messaging features of the hub.
Our azure-sdk-for-c is an MQTT client support library, helping composing and parsing the
MQTT topic names and messages exchanged with the Azure IoT Hub.
This sample performs the following tasks:
- Synchronize the device clock with a NTP server;
- Initialize our "az_iot_hub_client" (struct for data, part of our azure-sdk-for-c);
- Initialize the MQTT client (here we use ESPRESSIF's esp_mqtt_client, which also handle the tcp connection and TLS);
- Connect the MQTT client (using server-certificate validation, SAS-tokens for client authentication);
- Periodically send telemetry data to the Azure IoT Hub.
To properly connect to your Azure IoT Hub, please fill the information in the `iot_configs.h` file.
*/
// C99 libraries
#include <cstdlib>
#include <string.h>
#include <ESPPerfectTime.h>
// Libraries for MQTT client and WiFi connection
#include <WiFi.h>
#include <mqtt_client.h>
// Azure IoT SDK for C includes
#include <az_core.h>
#include <az_iot.h>
#include <azure_ca.h>
// Additional sample headers
#include "AzIoTSasToken.h"
#include "SerialLogger.h"
#include "iot_configs.h"
// When developing for your own Arduino-based platform,
// please follow the format '(ard;<platform>)'.
#define AZURE_SDK_CLIENT_USER_AGENT "c/" AZ_SDK_VERSION_STRING "(ard;esp32)"
// Utility macros and defines
#define sizeofarray(a) (sizeof(a) / sizeof(a[0]))
const char* TIME_ZONE = "JST-9";
#define NTP_SERVERS "ntp.nict.jp", "pool.ntp.org"
#define MQTT_QOS1 1
#define DO_NOT_RETAIN_MSG 0
#define SAS_TOKEN_DURATION_IN_MINUTES 60
#define UNIX_TIME_NOV_13_2017 1510592825
//#define PST_TIME_ZONE -8
//#define PST_TIME_ZONE_DAYLIGHT_SAVINGS_DIFF 1
//#define GMT_OFFSET_SECS (PST_TIME_ZONE * 3600)
//#define GMT_OFFSET_SECS_DST ((PST_TIME_ZONE + PST_TIME_ZONE_DAYLIGHT_SAVINGS_DIFF) * 3600)
#define GMT_OFFSET_SECS 0
#define GMT_OFFSET_SECS_DST 0
// Translate iot_configs.h defines into variables used by the sample
static const char* ssid = IOT_CONFIG_WIFI_SSID;
static const char* password = IOT_CONFIG_WIFI_PASSWORD;
static const char* host = IOT_CONFIG_IOTHUB_FQDN;
static const char* mqtt_broker_uri = "mqtts://" IOT_CONFIG_IOTHUB_FQDN;
static const char* device_id = IOT_CONFIG_DEVICE_ID;
static const int mqtt_port = AZ_IOT_DEFAULT_MQTT_CONNECT_PORT;
// Memory allocated for the sample's variables and structures.
static esp_mqtt_client_handle_t mqtt_client;
static az_iot_hub_client client;
static char mqtt_client_id[128];
static char mqtt_username[128];
static char mqtt_password[200];
static uint8_t sas_signature_buffer[256];
static unsigned long next_telemetry_send_time_ms = 0;
static char telemetry_topic[128];
static uint8_t telemetry_payload[200];
static uint32_t telemetry_send_count = 0;
#define INCOMING_DATA_BUFFER_SIZE 256
static char incoming_data[INCOMING_DATA_BUFFER_SIZE];
// Auxiliary functions
#ifndef IOT_CONFIG_USE_X509_CERT
static AzIoTSasToken sasToken(
&client,
AZ_SPAN_FROM_STR(IOT_CONFIG_DEVICE_KEY),
AZ_SPAN_FROM_BUFFER(sas_signature_buffer),
AZ_SPAN_FROM_BUFFER(mqtt_password));
#endif // IOT_CONFIG_USE_X509_CERT
#include <Arduino.h>
#include <Wire.h>
#include <M5StickC.h>
#include <BMx280I2C.h>
#define I2C_ADDRESS 0x76
//create a BMx280I2C object using the I2C interface with I2C Address 0x76
BMx280I2C bmx280(I2C_ADDRESS);
const unsigned char width = 150;
static unsigned int temp_index = 0;
const unsigned long measure_msec = 12 * 1000;
float temp[24*3600*1000/measure_msec];
float lastPressure;
// view_step: number of temp array entries to skip per graph division
const unsigned char view_step[] = {1, 1 * 3600 * 1000 / measure_msec / width, 6 * 3600 * 1000 / measure_msec / width, 12 * 3600 * 1000 / measure_msec / width, 24 * 3600 * 1000 / measure_msec / width};
const String view_str[] = {"30m", " 1h", " 6h", "12h", "24h"};
unsigned char view_status = 4;
unsigned long last_msec;
unsigned char last_ntp_day = 0;
const unsigned char ntp_set_hour = 5;
unsigned long sync_rtc = 0;
const unsigned long sync_msec = 4 * 60 * 1000;
unsigned char lcd_day_mode_start_hour = 5;
unsigned char lcd_day_mode_end_hour = 21;
static void connectToWiFi()
{
Logger.Info("Connecting to WIFI SSID " + String(ssid));
M5.Lcd.println("Connecting to WIFI SSID " + String(ssid));
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED)
{
delay(500);
Serial.print(".");
M5.Lcd.print(".");
}
Serial.println("");
M5.Lcd.println("");
Logger.Info("WiFi connected, IP address: " + WiFi.localIP().toString());
M5.Lcd.println("WiFi connected, IP address: " + WiFi.localIP().toString());
}
void readRtc()
{
RTC_TimeTypeDef rtctime;
RTC_DateTypeDef rtcdate;
struct tm tm_now;
M5.Rtc.GetTime(&rtctime);
int first_seconds = rtctime.Seconds;
// Wait for the moment the RTC second changes
unsigned long local_now = millis();
while (first_seconds == rtctime.Seconds)
{
while (millis() < local_now + 10 && millis() >= local_now) asm("nop \n");
M5.Rtc.GetTime(&rtctime);
}
M5.Rtc.GetData(&rtcdate);
tm_now.tm_hour = rtctime.Hours;
tm_now.tm_min = rtctime.Minutes;
tm_now.tm_sec = rtctime.Seconds;
tm_now.tm_year = rtcdate.Year - 1900;
tm_now.tm_mon = rtcdate.Month - 1;
tm_now.tm_mday = rtcdate.Date;
tm_now.tm_wday = rtcdate.WeekDay;
struct timespec tv_now;
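// The time in use is from before the second changed, so advance the time being set by 1 second
// (plus a small adjustment for the delay caused by processing)
tv_now.tv_sec = mktime(&tm_now) + 1;
tv_now.tv_nsec = 50000000L; // 50 ms; a leading zero would make the literal octal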
clock_settime(CLOCK_REALTIME , &tv_now);
sync_rtc = millis();
}
void writeRtc()
{
RTC_TimeTypeDef rtctime;
RTC_DateTypeDef rtcdate;
suseconds_t us;
struct tm *now = pftime::localtime(nullptr, &us);
M5.Axp.LightSleep(SLEEP_MSEC(1000 - us / 1000));
now = pftime::localtime(nullptr);
rtctime.Hours = now->tm_hour;
rtctime.Minutes = now->tm_min;
rtctime.Seconds = now->tm_sec;
M5.Rtc.SetTime(&rtctime);
rtcdate.Year = now->tm_year + 1900;
rtcdate.Month = now->tm_mon + 1;
rtcdate.Date = now->tm_mday;
rtcdate.WeekDay = now->tm_wday;
M5.Rtc.SetData(&rtcdate);
sync_rtc = millis();
}
static void initializeTime()
{
Logger.Info("Setting time using SNTP");
M5.Lcd.println("Setting time using SNTP");
pftime::configTzTime(TIME_ZONE, NTP_SERVERS);
struct tm * now = pftime::localtime(nullptr);
unsigned long local_now = millis();
while (now->tm_year < 122)
{
while (millis() < local_now + 500 && millis() >= local_now) asm("nop \n");
local_now = millis(); // restart the 500 ms wait for the next retry
Serial.print(".");
now = pftime::localtime(nullptr);
Serial.println(now, "%F %T");
}
Serial.println("");
Logger.Info("Time initialized!");
M5.Lcd.println("Time initialized!");
writeRtc();
last_ntp_day = now->tm_mday;
}
void receivedCallback(char* topic, byte* payload, unsigned int length)
{
Logger.Info("Received [");
Logger.Info(topic);
Logger.Info("]: ");
for (int i = 0; i < length; i++)
{
Serial.print((char)payload[i]);
}
Serial.println("");
}
static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
{
switch (event->event_id)
{
int i, r;
case MQTT_EVENT_ERROR:
Logger.Info("MQTT event MQTT_EVENT_ERROR");
break;
case MQTT_EVENT_CONNECTED:
Logger.Info("MQTT event MQTT_EVENT_CONNECTED");
r = esp_mqtt_client_subscribe(mqtt_client, AZ_IOT_HUB_CLIENT_C2D_SUBSCRIBE_TOPIC, 1);
if (r == -1)
{
Logger.Error("Could not subscribe for cloud-to-device messages.");
}
else
{
Logger.Info("Subscribed for cloud-to-device messages; message id:" + String(r));
}
break;
case MQTT_EVENT_DISCONNECTED:
Logger.Info("MQTT event MQTT_EVENT_DISCONNECTED");
break;
case MQTT_EVENT_SUBSCRIBED:
Logger.Info("MQTT event MQTT_EVENT_SUBSCRIBED");
break;
case MQTT_EVENT_UNSUBSCRIBED:
Logger.Info("MQTT event MQTT_EVENT_UNSUBSCRIBED");
break;
case MQTT_EVENT_PUBLISHED:
Logger.Info("MQTT event MQTT_EVENT_PUBLISHED");
break;
case MQTT_EVENT_DATA:
Logger.Info("MQTT event MQTT_EVENT_DATA");
for (i = 0; i < (INCOMING_DATA_BUFFER_SIZE - 1) && i < event->topic_len; i++)
{
incoming_data[i] = event->topic[i];
}
incoming_data[i] = '\0';
Logger.Info("Topic: " + String(incoming_data));
for (i = 0; i < (INCOMING_DATA_BUFFER_SIZE - 1) && i < event->data_len; i++)
{
incoming_data[i] = event->data[i];
}
incoming_data[i] = '\0';
Logger.Info("Data: " + String(incoming_data));
break;
case MQTT_EVENT_BEFORE_CONNECT:
Logger.Info("MQTT event MQTT_EVENT_BEFORE_CONNECT");
break;
default:
Logger.Error("MQTT event UNKNOWN");
break;
}
return ESP_OK;
}
static void initializeIoTHubClient()
{
az_iot_hub_client_options options = az_iot_hub_client_options_default();
options.user_agent = AZ_SPAN_FROM_STR(AZURE_SDK_CLIENT_USER_AGENT);
if (az_result_failed(az_iot_hub_client_init(
&client,
az_span_create((uint8_t*)host, strlen(host)),
az_span_create((uint8_t*)device_id, strlen(device_id)),
&options)))
{
Logger.Error("Failed initializing Azure IoT Hub client");
return;
}
size_t client_id_length;
if (az_result_failed(az_iot_hub_client_get_client_id(
&client, mqtt_client_id, sizeof(mqtt_client_id) - 1, &client_id_length)))
{
Logger.Error("Failed getting client id");
return;
}
if (az_result_failed(az_iot_hub_client_get_user_name(
&client, mqtt_username, sizeofarray(mqtt_username), NULL)))
{
Logger.Error("Failed to get MQTT clientId, return code");
return;
}
Logger.Info("Client ID: " + String(mqtt_client_id));
Logger.Info("Username: " + String(mqtt_username));
}
static int initializeMqttClient()
{
#ifndef IOT_CONFIG_USE_X509_CERT
if (sasToken.Generate(SAS_TOKEN_DURATION_IN_MINUTES) != 0)
{
Logger.Error("Failed generating SAS token");
return 1;
}
#endif
esp_mqtt_client_config_t mqtt_config;
memset(&mqtt_config, 0, sizeof(mqtt_config));
mqtt_config.uri = mqtt_broker_uri;
mqtt_config.port = mqtt_port;
mqtt_config.client_id = mqtt_client_id;
mqtt_config.username = mqtt_username;
#ifdef IOT_CONFIG_USE_X509_CERT
Logger.Info("MQTT client using X509 Certificate authentication");
mqtt_config.client_cert_pem = IOT_CONFIG_DEVICE_CERT;
mqtt_config.client_key_pem = IOT_CONFIG_DEVICE_CERT_PRIVATE_KEY;
#else // Using SAS key
mqtt_config.password = (const char*)az_span_ptr(sasToken.Get());
#endif
mqtt_config.keepalive = 30;
mqtt_config.disable_clean_session = 0;
mqtt_config.disable_auto_reconnect = false;
mqtt_config.event_handle = mqtt_event_handler;
mqtt_config.user_context = NULL;
mqtt_config.cert_pem = (const char*)ca_pem;
mqtt_client = esp_mqtt_client_init(&mqtt_config);
if (mqtt_client == NULL)
{
Logger.Error("Failed creating mqtt client");
return 1;
}
esp_err_t start_result = esp_mqtt_client_start(mqtt_client);
if (start_result != ESP_OK)
{
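// Convert the error code with String(); adding an int to a string literal only offsets the pointer.
Logger.Error("Could not start mqtt client; error code:" + String(start_result));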
return 1;
}
else
{
Logger.Info("MQTT client started");
return 0;
}
}
/*
@brief Gets the number of seconds since UNIX epoch until now.
@return uint32_t Number of seconds.
*/
static uint32_t getEpochTimeInSecs()
{
// pftime::localtime() returns a struct tm pointer; pftime::time() returns the UNIX time.
return (uint32_t)pftime::time(NULL);
}
static void establishConnection()
{
connectToWiFi();
initializeTime();
initializeIoTHubClient();
(void)initializeMqttClient();
}
static void getTelemetryPayload(az_span payload, az_span* out_payload)
{
az_span original_payload = payload;
String str_payload;
str_payload
= "{\"msgCount\":" + String(telemetry_send_count++)
+ ",\"temperature\":" + String(temp[(temp_index + width - 1) % width], 2)
+ ",\"Pressure\":" + String(lastPressure, 2)
+ ",\"battery\":" + String(M5.Axp.GetBatVoltage(), 3) + "} ";
char buf[str_payload.length() + 1];
str_payload.toCharArray(buf, str_payload.length());
payload = az_span_copy(payload, az_span_create_from_str(buf));
payload = az_span_copy_u8(payload, '\0');
*out_payload = az_span_slice(original_payload, 0, az_span_size(original_payload) - az_span_size(payload) - 1);
}
static void printTemperature()
{
//start a measurement
if (!bmx280.measure())
{
Serial.println("could not start measurement, is a measurement already running?");
return;
}
//wait for the measurement to finish
unsigned long local_now = millis();
do
{
while (millis() < local_now + 100 && millis() >= local_now) asm("nop \n");
} while (!bmx280.hasValue());
Serial.println("temp_index: " + String(temp_index) + " width: " + String(width));
temp[temp_index] = bmx280.getTemperature();
lastPressure = bmx280.getPressure64() / 100;
Serial.print("Pressure (64 bit): "); Serial.println(lastPressure, 0);
Serial.print("Pressure : "); Serial.println(bmx280.getPressure() / 100, 0);
Serial.print("Temperature : "); Serial.println(temp[temp_index]);
float temp_min = 100;
float temp_max = -100;
float temp_ave = 0;
for (unsigned char i = 0; i < width; i++)
{
if ( temp_min > temp[i] ) temp_min = temp[i];
if ( temp_max < temp[i] ) temp_max = temp[i];
temp_ave += temp[i];
}
temp_ave = temp_ave / width;
M5.Lcd.fillScreen(TFT_BLACK);
int map_min;
int map_max;
if ( temp_max < temp_ave + 2.0f )
{
map_max = int((temp_ave + 2.0f) * 100);
} else {
map_max = int(temp_max * 100);
}
if ( temp_min > temp_ave - 2.0f )
{
map_min = int((temp_ave - 2.0f) * 100);
} else {
map_min = int(temp_min * 100);
}
for ( unsigned char i = 0; i < width; i++)
{
int y0 = map(int(temp[(i + temp_index + 1) % width] * 100), map_min, map_max, M5.Lcd.height() - 20, 2);
M5.Lcd.drawLine(i + 5, M5.Lcd.height() - 18, i + 5, y0, WHITE);
}
M5.Lcd.drawString("Temp: " + String(temp[temp_index], 2) + " " + String(temp_min, 2) + " " + String(temp_max, 2) + " ", 7, M5.Lcd.height() - 15, 1);
temp_index++;
if ( temp_index == width )
{
temp_index = 0;
}
if (!bmx280.writePowerMode(bmx280.BMx280_MODE_SLEEP)) {
Serial.print("fail for write power mode sleep.");
}
M5.Lcd.setCursor(0, 0);
struct tm * now = pftime::localtime(nullptr);
char strtimebuff[20];
strftime(strtimebuff, sizeof(strtimebuff), "%H:%M:%S", now);
M5.Lcd.setTextColor(GREEN);
M5.Lcd.printf(" %s", strtimebuff);
M5.Lcd.setTextColor(WHITE);
}
static void sendTelemetry()
{
az_span telemetry = AZ_SPAN_FROM_BUFFER(telemetry_payload);
Logger.Info("Sending telemetry ...");
// The topic could be obtained just once during setup,
// however if properties are used the topic need to be generated again to reflect the
// current values of the properties.
if (az_result_failed(az_iot_hub_client_telemetry_get_publish_topic(
&client, NULL, telemetry_topic, sizeof(telemetry_topic), NULL)))
{
Logger.Error("Failed az_iot_hub_client_telemetry_get_publish_topic");
return;
}
getTelemetryPayload(telemetry, &telemetry);
if (esp_mqtt_client_publish(
mqtt_client,
telemetry_topic,
(const char*)az_span_ptr(telemetry),
az_span_size(telemetry),
MQTT_QOS1,
DO_NOT_RETAIN_MSG)
== 0)
{
Logger.Error("Failed publishing");
}
else
{
Logger.Info("Message published successfully");
}
}
// Arduino setup and loop main functions.
void setup()
{
Serial.begin(115200);
M5.begin();
M5.Rtc.begin();
M5.Axp.begin(true, false, false, false, true, true);
M5.Axp.SetAdcState(true);
M5.Axp.ScreenBreath(8);
M5.Axp.SetChargeVoltage(VOLTAGE_4200MV);
M5.Axp.SetChargeCurrent(CURRENT_100MA);
M5.Axp.SetVOff(VOLTAGE_4360MV);
M5.Lcd.setRotation(1);
M5.Lcd.fillScreen(TFT_BLACK);
M5.Axp.SetLDO2(true);
establishConnection();
Wire.begin();
//begin() checks the Interface, reads the sensor ID (to differentiate between BMP280 and BME280)
//and reads compensation parameters.
if (!bmx280.begin())
{
Serial.println("begin() failed. check your BMx280 Interface and I2C Address.");
while (1);
}
Serial.println("sensor is a BMP280");
//reset sensor to default parameters.
bmx280.resetToDefaults();
//by default sensing is disabled and must be enabled by setting a non-zero
//oversampling setting.
//set an oversampling setting for pressure and temperature measurements.
bmx280.writeOversamplingPressure(BMx280MI::OSRS_P_x16);
bmx280.writeOversamplingTemperature(BMx280MI::OSRS_T_x16);
if (!bmx280.measure())
{
Serial.println("could not start measurement, is a measurement already running?");
M5.Lcd.println("could not start measurement, is a measurement already running?");
return;
}
//wait for the measurement to finish
unsigned long local_now = millis();
do
{
while (millis() < local_now + 100 && millis() >= local_now) asm("nop \n");
} while (!bmx280.hasValue());
float int_temp = bmx280.getTemperature();
// Iterate over elements, not bytes; an unsigned char counter would wrap and never terminate.
for (size_t i = 0; i < sizeofarray(temp); i++)
{
temp[i] = int_temp;
}
temp_index = 0;
}
void loop()
{
M5.update();
unsigned long now_millis = millis();
if ( sync_rtc + sync_msec < now_millis || sync_rtc > now_millis || M5.BtnB.isPressed())
{
readRtc();
struct tm * now = pftime::localtime(nullptr);
if (last_ntp_day != now->tm_mday && now->tm_hour == ntp_set_hour )
{
initializeTime();
}
if (lcd_day_mode_start_hour <= now->tm_hour && lcd_day_mode_end_hour >= now->tm_hour)
{
M5.Axp.ScreenBreath(8);
} else {
M5.Axp.ScreenBreath(7);
}
}
if (M5.BtnA.isPressed())
{
view_status++;
view_status = view_status % 5;
printTemperature();
}
if (WiFi.status() != WL_CONNECTED)
{
connectToWiFi();
}
#ifndef IOT_CONFIG_USE_X509_CERT
else if (sasToken.IsExpired())
{
Logger.Info("SAS token expired; reconnecting with a new one.");
(void)esp_mqtt_client_destroy(mqtt_client);
initializeMqttClient();
}
#endif
else if (millis() > next_telemetry_send_time_ms || millis() < last_msec)
{
printTemperature();
sendTelemetry();
last_msec = millis();
next_telemetry_send_time_ms = last_msec + TELEMETRY_FREQUENCY_MILLISECS;
}
delay(1);
}
SerialLogger.cpp:
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#include "SerialLogger.h"
#include <ESPPerfectTime.h>
#define UNIX_EPOCH_START_YEAR 1900
static void writeTime()
{
Serial.print(pftime::localtime(NULL), "%Y/%m/%d %T");
}
SerialLogger::SerialLogger() { Serial.begin(SERIAL_LOGGER_BAUD_RATE); }
void SerialLogger::Info(String message)
{
writeTime();
Serial.print(" [INFO] ");
Serial.println(message);
}
void SerialLogger::Error(String message)
{
writeTime();
Serial.print(" [ERROR] ");
Serial.println(message);
}
SerialLogger Logger;
SerialLogger.h:
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#ifndef SERIALLOGGER_H
#define SERIALLOGGER_H
#include <Arduino.h>
#ifndef SERIAL_LOGGER_BAUD_RATE
#define SERIAL_LOGGER_BAUD_RATE 115200
#endif
class SerialLogger
{
public:
SerialLogger();
void Info(String message);
void Error(String message);
};
extern SerialLogger Logger;
#endif // SERIALLOGGER_H
5-3. Synapse (PySpark) Source Code
Use an Apache Spark Pool from Synapse.
From the "Develop" menu, select "Notebooks" and create a new notebook.
Synapse Spark Pool Notebook:
#!/usr/bin/env python
# coding: utf-8
# # Notebook that converts IoT data to Delta Lake
#
# The input is the JSON files that IoT Hub wrote to Azure Data Lake Storage Gen2.
# ## Load the JSON data from Storage
#
# Directory layout: <container>/<table name>/<partition key>=<partition value>/<yyyyMMdd_HHmm_00>.json
# In[1]:
sc = spark.sparkContext
telemetoryDF = spark.read.json("abfss://<container name>@<storage account name>.dfs.core.windows.net/<table name>")
telemetoryDF.count()
telemetoryDF.printSchema()
# In[2]:
telemetoryDF.count()
# In[3]:
from pyspark.sql.functions import from_json
from pyspark.sql.functions import from_utc_timestamp
from pyspark.sql.functions import unbase64
from pyspark.sql.functions import date_trunc
curratedTelemetoryDF =\
telemetoryDF\
.withColumn('unbase64', unbase64("Body").cast("string"))\
.withColumn(\
'unbase64'\
, from_json(\
unbase64("Body").cast("string")\
, 'msgCount int, temperature float, Pressure float, battery float'\
)\
)\
.withColumn(\
"EnqueuedTimeJST"\
, from_utc_timestamp(\
"EnqueuedTimeUtc"\
, "JST"\
)\
)\
.withColumn(\
"outdateJST"\
, date_trunc("day",\
from_utc_timestamp(\
"EnqueuedTimeUtc"\
, "JST"\
)\
)\
)\
.select("outdateJST", "EnqueuedTimeJST", "unbase64.temperature", "unbase64.Pressure").orderBy("EnqueuedTimeJST")
curratedTelemetoryDF\
.write\
.partitionBy("outdateJST")\
.format("delta")\
.mode("overwrite")\
.option("mergeSchema", "true")\
.option("overwriteSchema", "true")\
.save("abfss://<container name>@<storage account name>.dfs.core.windows.net/<Delta table name>")
# ### Explicitly delete the DataFrames
# In[4]:
del curratedTelemetoryDF
del telemetoryDF
# ## Load from Delta Lake
# In[5]:
deltaDF = spark.read.format("delta").load("abfss://<container name>@<storage account name>.dfs.core.windows.net/<Delta table name>")
deltaDF.printSchema()
# In[6]:
display(deltaDF.selectExpr("count(*) as cnt", "min(EnqueuedTimeJST) as mindate", "max(EnqueuedTimeJST) as maxdate"))
# In[7]:
display( deltaDF.where("outdateJST = '2022-08-03'").orderBy("EnqueuedTimeJST"))
# In[ ]:
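6. Results
7. Discussion
- For just looking at a simple graph, you can manage without a BI tool.
- Compared with S3, moving data to the low-cost "cool tier" is easy with Azure Storage.
It was also cheaper than I expected.
- This is basically the same as Hadoop Hive: performance depends on how large the files are and how they are split, so it is important to think this through.
(With Hive, using staging tables looks like it would make Azure Data Lake Storage Gen2 much more advantageous than S3, but that seems irrelevant with Spark.)
- The room is hot. Very hot.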
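8. Closing
That is all for the procedure covered in this article.
Below are some ideas, other than security-related ones, that seem feasible or would be interesting to try.
I hope you find them useful.
- Wi-Fi is configured in "iot_configs.h", but it would be easier to use if the device started a web server as a Wi-Fi AP during initial setup so that the Wi-Fi and device ID could be configured there.
- The same thing should work as-is on Databricks.
- It could also be interesting to treat device IDs as tenants and use row-level security on the cloud side to control which records each user can see.
- With Delta Lake, the data can be referenced as a view (OPENROWSET) from the Synapse serverless SQL pool.
That makes the data easy to view from BI tools such as Power BI or from web apps (App Services), which looks promising.
- Using Event Grid to pick up the moment IoT Hub writes a file and trigger the conversion looks good, but with few devices the Delta Lake files would be split into small pieces, which seems inefficient.
It is probably inefficient unless there are at least several dozen to several hundred devices.
With fewer than that, options include rebuilding all the data and treating it as a vacuum-like task, or recreating it monthly or weekly.
- Considering directories for other telemetry data and mart tables, it may be better to add one or two more directory levels to classify and organize the layout.
Also, when adopting zones or the medallion architecture, it would be better to separate containers by zone, or to split off a separate account only for a zone that is expected to get heavy access.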
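To put the small-file concern in numbers, the sketch below computes the average file size from the file counts and byte totals listed in section 4-1. The helper name `average_kib` is made up for illustration; the figures themselves are the ones reported in this article for a single device.

```python
def average_kib(total_bytes: int, file_count: int) -> float:
    """Average file size in KiB."""
    return total_bytes / file_count / 1024


# Figures from section 4-1 (one device).
input_avg = average_kib(179_850_550, 6_956)   # JSON files written by IoT Hub
output_avg = average_kib(5_319_007, 259)      # files in the Delta Lake output

print(f"average input file:  {input_avg:.1f} KiB")
print(f"average output file: {output_avg:.1f} KiB")
```

With a single device both averages come out in the tens of KiB, which is the kind of fragmentation the bullet on Event Grid above is worried about.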
9. Appendix: Reference URLs
- Azure Synapse Analytics Apache Spark Pool
・Azure Synapse Analytics - Azure Synapse Analytics | Microsoft Learn
・Apache Spark pool configurations - Azure Synapse Analytics | Microsoft Learn
・Quickstart: Create a serverless Apache Spark pool using the Azure portal - Azure Synapse Analytics | Microsoft Learn
- Azure IoT Hub
・Azure IoT Hub documentation | Microsoft Learn
・Quickstart: Send telemetry from an IoT Plug and Play device to Azure IoT Hub | Microsoft Learn
- Azure IoT Explorer
・Azure IoT Explorer | GitHub
・Install and use Azure IoT explorer | Microsoft Learn
- Delta Lake
・Delta Lake on Databricks
・Delta Lake
- Device side
・M5StickC
・Azure esp32
・Azure Samples iot middleware FreeRTOS samples
・Azure SDK for Embedded C
・Azure IoT TLS: Changes are coming! (…and why you should care)
- Zones and the medallion architecture
・Data lake zones and containers - Cloud Adoption Framework | Microsoft Learn
・Medallion Architecture - Databricks
