Реализация клиента MQTT для реактивных систем
MQTT-Reactive - это клиент MQTT v3.1.1, производный от библиотеки LiamBindle MQTT-C. Цель MQTT-Reactive - предоставить переносимый и неблокирующий клиент MQTT, написанный на C, для использования во встроенных реактивных системах. Прежде всего, в этой статье объясняется, что такое реактивная система. Затем описывается, как разработать подходящую структуру программного обеспечения для такого типа системы. Наконец, в статье показано, как использовать библиотеку MQTT-Reactive в реактивной системе с помощью конечного автомата и парадигмы, управляемой событиями. Для этого в статье используется реальное устройство IoT в качестве наглядного примера, из которого в статье объясняется его программная структура и поведение на основе состояний с использованием диаграмм UML, таких как конечный автомат, взаимодействие и структура. В статье также приведены рекомендации по реализации MQTT-Reactive клиента IoT-устройства на языке C.
Многие встроенные системы являются реактивными, т. Е. Реагируют на внутренние или внешние события. Как только эти реакции завершены, программное обеспечение возвращается к ожиданию следующего события. Вот почему системы, управляемые событиями, также называют реактивными системами.
Программирование, управляемое событиями, или реактивное программирование, является одной из наиболее подходящих парадигм программирования для создания гибкого, предсказуемого и обслуживаемого программного обеспечения для реактивных систем. В этой парадигме ход программы определяется событиями. Часто структура реактивного программного обеспечения состоит из нескольких одновременно работающих модулей, так называемых активных объектов, которые ожидают и обрабатывают различные виды событий. Каждый активный объект владеет потоком управления и очередью событий, через которую он обрабатывает свои входящие события. В реактивных системах активные объекты обычно имеют поведение на основе состояний, определенное в диаграмме состояний.
Чтобы изучить, как использовать библиотеку MQTT-Reactive в реактивной системе с множеством одновременных задач и с использованием как конечного автомата, так и парадигмы, управляемой событиями, мы используем устройство IoT в качестве примера.
Идея использования протокола MQTT родилась во время разработки устройства IoT для железнодорожной компании. Это устройство представляло собой явную реактивную систему, которая могла:
- обнаруживать и сохранять изменения нескольких цифровых входов
- собирать, фильтровать и сохранять несколько аналоговых сигналов.
- периодически отправлять сохраненную информацию на удаленный сервер
- отправлять и получать информацию по протоколу MQTT по сети GSM
MQTT был выбран потому, что это легкий протокол обмена сообщениями на основе издателя и подписчика, который обычно используется в IoT и сетевых приложениях, где ожидаются каналы с высокой задержкой и низкой скоростью передачи данных, например в сетях GSM.
Возможность MQTT для упомянутого устройства IoT была реализована с помощью модифицированной версии MQTT-C LiamBindle. Поскольку программное обеспечение этого устройства было разработано как реактивное программное обеспечение, MQTT-C пришлось модифицировать, чтобы обмениваться данными с остальной системой посредством асинхронных событий. Эти события использовались для приема и отправки трафика по сети, а также для подключения и публикации конфиденциальной информации на сервере. Получившаяся программная библиотека получила название MQTT-Reactive.
Конечный автомат
MQTT-Reactive использовался через конечный автомат, как показано на рисунке 1, который моделирует базовое поведение клиента MQTT-Reactive. Это был активный объект под названием MqttMgr (MQTT Manager). Действия конечного автомата на рисунке 1 демонстрируют, как можно использовать библиотеку MQTT-Reactive из конечного автомата. Несмотря на то, что язык C использовался в качестве языка действий на рисунке 1, можно использовать любой компьютерный или формальный язык.
щелкните, чтобы увеличить изображение
Рис. 1. Конечный автомат MQTT-Reactive клиента (Источник:VortexMakes)
Конечный автомат на рисунке 1 запускается в состоянии WaitingForNetConnection. После того, как сетевое соединение установлено с сервером, WaitingForNetConnection получает событие Activate, а затем конечный автомат переходит в состояние WaitingForSync. Только в этом состоянии конечный автомат может передавать сообщения MQTT брокеру, такие как CONNECT или PUBLISH, через события Connect и Publish соответственно. Состояние Sync использует специальный механизм UML для отсрочки события Publish, который задается ключевым словом defer, включенным во внутреннюю часть состояния Sync. Если событие публикации происходит, когда синхронизация является текущим состоянием, оно будет сохранено (отложено) для будущей обработки до тех пор, пока SM не перейдет в состояние, в котором событие публикации не находится в его списке отложенных событий, таком как WaitingForSync или WaitingForNetConnection. При входе в такие состояния конечный автомат автоматически вызывает любое сохраненное событие публикации и затем принимает или отбрасывает это событие в соответствии с целевым состоянием перехода.
Каждые миллисекунды SyncTime конечный автомат переходит в составное состояние Sync, которое выполняет фактическую отправку и получение трафика из сети путем отправки событий получения и отправки диспетчеру сети. Это параллельная организация, которая занимается проблемами сети.
Несмотря на то, что представленный MqttMgr поддерживает только пакеты CONNECT и PUBLISH, он может поддерживать пакет SUBSCRIBE с довольно простыми изменениями.
Действия конечного автомата получают доступ к параметрам потребляемого события с помощью ключевого слова params. Например, в следующем переходе событие Connect несет два параметра, clientId и keepAlive, значения которых используются для обновления соответствующих атрибутов объекта MqttMgr:
Connect (clientId, keepAlive) / me-> clientId =params-> clientId; мне-> keepAlive =params-> keepAlive; me-> operRes =mqtt_connect (&me-> client, me-> clientId, NULL, NULL, 0, NULL, NULL, 0, me-> keepAlive);
В этом примере событие Connect (clientId, keepAlive) является триггером перехода, а вызов mqtt_connect () является частью действия, которое выполняется в результате. Другими словами, когда объект MqttMgr получает событие Connect (clientId, keepAlive) с параметрами 'publishing_client' и '400', Connect («publishing_client», 400), атрибуты clientId и keepAlive MqttMgr обновляются со значениями ' Publish_client 'и' 400 'соответственно.
Для создания и отправки событий в действиях конечного автомата используется макрос GEN (). Например, следующий оператор отправляет событие Receive объекту Collector, на который указывает указатель itsCollector как на атрибут объекта MqttMgr:
GEN (me-> itsCollector, Receive ());
Первый аргумент оператора GEN () - это объект, который получает событие, тогда как второй аргумент - это отправляемое событие, включая аргументы события (если они есть). Аргументы должны соответствовать параметрам события. Например, следующий оператор генерирует событие ConnRefused (code) и отправляет его объекту Collector, передавая код, возвращенный брокером, в качестве параметра события:
GEN (me-> itsCollector, ConRefused (код));
Идея использования ключевого слова params для доступа к параметрам потребляемого события и макроса GEN () для генерации событий из действий была заимствована из генератора кода Rational Rhapsody Developer для чисто иллюстративных целей.
Действие конечного автомата по умолчанию на рисунке 1 устанавливает обратный вызов, который вызывается MQTT-Reactive всякий раз, когда от брокера принимается подтверждение соединения. Этот обратный вызов должен быть реализован в коде MqttMgr. Этот обратный вызов должен генерировать события ConnAccepted или ConnRefused (код) для отправки объекту Collector, как показано ниже.
статический недействительно connack_response_callback ( перечисление MQTTConnackReturnCode return_code) {/*...*/ если (return_code ==MQTT_CONNACK_ACCEPTED) {GEN (me-> itsCollector, ConnAccepted ()); } еще {GEN (me-> itsCollector, ConnRefused (return_code)); }}
Реализация модели
Модель на рис. 1 может быть реализована на C или C ++ с помощью вашего любимого программного инструмента или только вашей собственной реализации конечного автомата. Для этого в Интернете доступны различные инструменты, такие как RKH framework, QP framework, Yakindu Statechart Tool или Rational Rhapsody Developer и другие. Все они поддерживают диаграммы состояний и языки C / C ++. Более того, некоторые из них включают инструмент для рисования диаграммы состояний и генерации из нее кода.
Этот конечный автомат был запущен из активного объекта MqttMgr (MQTT Manager), который обеспечивал строгую инкапсуляцию кода MQTT-Reactive и был единственным объектом, которому разрешалось вызывать любую функцию MQTT-Reactive или получать доступ к данным MQTT-Reactive. Другие параллельные сущности в системе, а также любые ISR могли использовать MQTT-Reactive только косвенно, обмениваясь событиями с MqttMgr. Использование этого механизма для синхронизации параллельных объектов и обмена данными между ними позволяет избежать опасностей традиционных механизмов блокировки, таких как семафоры, мьютекс, задержки или флаги событий. Эти механизмы могут вызвать неожиданные сбои, которые сложно и утомительно диагностировать и исправлять.
Активный объект MqttMgr инкапсулирует свои атрибуты как набор элементов данных. Элемент данных обозначает переменную с именем и типом, где тип фактически является типом данных. Элемент данных для объекта MqttMgr отображается на член структуры объекта. Имя и тип члена такие же, как и у данных объекта. Например, атрибут client типа объекта MqttMgr внедряется по значению как элемент данных внутри структуры MqttMgr:
структура MqttMgr { / * ... * / структура mqtt_client клиент ; / * атрибут клиента * / LocalRecvAll localRecv; / * атрибут localRecv * / };
Доступ к данным объекта MqttMgr осуществляется напрямую, без использования операций доступа или мутатора. Например, доступ к клиенту и localRecv осуществляется через указатель me, который указывает на экземпляр MqttMgr.
mqtt_recvMsgError (&me-> client, &me-> localRecv);
MqttMgr имеет список атрибутов, показанный в таблице 1.
Таблица 1. Атрибуты MqttMgr
Структура на рисунке 2 полезна, чтобы иметь в виду отношения между заинтересованными сторонами. Это:объект Collector, который хочет отправить информацию брокеру; объект NetMgr, имеющий дело с сетью; и объект MqttMgr.
Рисунок 2. Проект структуры системы Интернета вещей (Источник:VortexMakes)
Диаграмма последовательности на рисунке 3 показывает, как объект MqttMgr взаимодействует с остальной частью системы, когда требуется открыть сеанс с сервером MQTT. На этой диаграмме состояние MqttMgr и асинхронные сообщения, которыми обмениваются, показаны между участниками Collector, MqttMgr и NetMgr.
Рисунок 3. Подключение к брокеру MQTT (Источник:VortexMakes)
После того, как объект NetMgr устанавливает сетевое соединение с брокером, первый пакет, отправленный от MqttMgr на сервер MQTT, должен быть пакетом CONNECT. Таким образом, субъект Collector отправляет событие Connect (clientId, keepAlive) субъекту MqttMgr. Это событие должно содержать идентификатор клиента и интервал времени активности. Если сервер принимает запрос на подключение, субъект MqttMgr отправляет событие ConnAccepted субъекту Collector, чтобы уведомить об этой ситуации. С этого момента субъект-коллектор может публиковать информационные сообщения для этого брокера.
Если сервер отклоняет запрос на соединение, субъект MqttMgr отправляет событие ConnRefused субъекту Collector. Это событие несет в себе код, который уведомляет о причине отклонения, как показано на рисунке 4. См. Раздел 3.2.2.3 MQTT v3.1.1.
Рис. 4. Брокер отклоняет запрос на подключение (Источник:VortexMakes)
На рисунке 5 показан поток взаимодействия при публикации сообщения. Для этого актор Collector отправляет событие Publish (data, size, topic, qos), которое несет информацию, которая должна быть опубликована (данные), длину информации в байтах (размер), название темы, к которой информация будет опубликована (тема) и уровень уверенности в доставке этого сообщения (qos). В ранее упомянутом устройстве IoT опубликованная информация была отформатирована с использованием спецификации JSON. Это открытый стандартный формат, который содержит объекты данных с парами атрибут-значение в удобочитаемом тексте. Этот формат был реализован с помощью jWrite, простой и легкой библиотеки, написанной на C.
Рисунок 5. Публикация данных брокеру (Источник:VortexMakes)
На рисунке 6 показан сценарий, в котором не удается получить и отправить сообщения MQTT в сеть. Если сетевой менеджер не может получать трафик из сети, он отправит сообщение ReceiveFail субъекту MqttMgr. Точно так же, если сетевой менеджер не может отправлять данные в сеть, он отправит SendFail актору MqttMgr.
Рисунок 6. Сбои в сети (Источник:VortexMakes)
В таблице 2 приведены соответствующие события в показанных сценариях.
Таблица 2. События
Заключение
Избегая опасностей традиционных механизмов блокировки, таких как семафоры, мьютексы, задержки или флаги событий, библиотека MQTT-Reactive, конечный автомат и архитектура программного обеспечения, предложенные в этой статье, позволяют реактивным встроенным системам реализовывать клиент MQTT в новаторской форме. способ. Это достигается путем инкапсуляции кода MQTT-Reactive в блок параллелизма, называемый активный объект . , поведение которого на основе состояний определено в предлагаемом конечном автомате. Этот активный объект взаимодействует с остальной системой путем обмена используемыми асинхронными событиями:не только для приема и отправки трафика по сети, но также для подключения и публикации информации на сервере для приложений Интернета вещей.
Встроенный
- Таксономия для IIoT
- Создание гибких производственных систем для Industrie 4.0
- Краткое описание технологии ИС для микроконтроллеров и встроенных систем
- Würth Elektronik eiSos представляет новые компоненты для интеллектуальных систем
- Разработка элементов управления двигателями для роботизированных систем
- Syslogic:защищенные компьютеры и системы HMI для строительной техники
- Вычислительная платформа Kontron и SYSGO:SAFe-VX для критически важных для безопасности систем
- Конфигурация желаемого состояния для цепей
- Предприятия устанавливают сроки для интеллектуальных систем
- 10 лучших рабочих процессов для производителей