Подключите PLCnext Control через MQTT к Apache Kafka
Техническая информация
Кафка
Apache Kafka — это платформа для приема, хранения, обработки и распространения данных. В настоящее время он широко используется в компаниях по всему миру. Официальный сайт Kafka предлагает больше информации о своей идее и о том, как ее развернуть. Одной из его ключевых особенностей является огромное количество уже существующих коннекторов для других приложений и протоколов связи, таких как MQTT.
MQTT
MQTT — это облегченный протокол обмена сообщениями на основе TCP, часто используемый для связи IoT из-за его надежности и небольшого размера. Подробности о стандарте OASIS MQTT можно найти на его веб-сайте.
Здесь вы можете найти статью в блоге Makers о том, как кросс-компилировать mosquitto для PLCnext, реализации MQTT от Eclipse. В качестве альтернативы магазин PLCnext Store предлагает готовые приложения MQTT.
Требования
- Клиент MQTT на PLCnext (подсказки по реализации см. в предыдущем разделе)
- контроллер подключен к ПК/ВМ
- Брокер MQTT на ПК/ВМ (например, mosquitto)
- Экземпляр Kafka на ПК/ВМ (см. краткое руководство по Kafka)
Настройка
На следующем рисунке показан обзор настройки, которую мы собираемся реализовать для приема данных из элемента управления PLCnext в Kafka. Хотя для их версии Kafka (2) можно использовать MQTT-прокси Confluent, мы сосредоточимся на более общем решении (1). Он состоит из брокера MQTT, к которому клиент подключается и публикует сообщения, и коннектора, который подписывается на тему в брокере, обрабатывает сообщения и перенаправляет их в Kafka.
Создание соединителя
В этом руководстве наш коннектор основан на репозитории evokly/kafka-connect-mqtt из GitHub под лицензией MIT License (подробная информация о лицензии). Сначала мы загружаем и извлекаем репозиторий. Поскольку последняя версия репозитория относится к концу 2016 года, мы обновляем build.gradle
файла, заменив старые зависимости их новыми версиями:
ext { kafkaVersion = '2.6.0' }
...
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.13'
compile "org.apache.kafka:connect-api:$kafkaVersion"
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
compile 'org.bouncycastle:bcprov-jdk15on:1.67'
compile 'org.bouncycastle:bcpkix-jdk15on:1.67'
compile 'org.bouncycastle:bcpg-jdk15on:1.67'
compile 'commons-io:commons-io:2.8.0'
compile 'org.slf4j:slf4j-api:1.7.30'
testCompile 'org.slf4j:slf4j-simple:1.7.30'
}
В этом примере мы будем отправлять в Kafka простые сообщения типа String. Поэтому нам нужно отредактировать класс Java DumbProcessor.java
в папке /kafka-connect-mqtt-master/src/main/java/com/evokly/kafka/connect/mqtt
, который является обработчиком сообщений по умолчанию:
@Override
public SourceRecord[] getRecords(String kafkaTopic) {
return new SourceRecord[]{new SourceRecord(null, //sourcePartition
null, //sourceOffset
kafkaTopic, //topic
null, //partition
null, //keySchema
mTopic, //key
null, //valueSchema
mMessage.toString(), //value
new Long(123L))}; //long timestamp
}
После этого мы создаем архивный файл Java (JAR), который содержит зависимости:./gradlew clean jar
. Мы копируем выходной JAR kafka-connect-mqtt-1.1-SNAPSHOT.jar
который можно найти в папке /kafka-connect-mqtt-master/build/libs
к libs
каталог Кафки.
Нам также понадобится копия архива org.eclipse.paho.client.mqttv3-1.2.5.jar в каталоге libs Kafka. Мы можем скачать его здесь.
Кроме того, мы должны создать файл конфигурации для коннектора mqtt.properties
в config
Кафки папка. Файл имеет следующее содержимое:
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
# converters for plain String messages without schemas
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
kafka.topic=test_in # Kafka destination topic for the MQTT messages
mqtt.client_id=mqtt-kafka-123
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
mqtt.server_uris=tcp://172.17.0.1:1883 # address of the MQTT broker
mqtt.topic=test/# # MQTT topic where the messages should be collected
#if we want to use our own processor class
#message_processor_class=com.evokly.kafka.connect.mqtt.sample.OwnProcessor
Локальный тест
Теперь мы можем протестировать наш коннектор локально. Перейдите в каталог Kafka и запустите экземпляр ZooKeeper и Broker:
# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# start Kafka:
bin/kafka-server-start.sh config/server.properties
# start an MQTT-Broker (here a mosquitto docker container)
sudo docker run -d --name mosquitto -p 1883:1883 eclipse-mosquitto
# start the MQTT-Kafka connector
bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties
# start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_in --from-beginning --property print.value=true --property print.timestamp=true
# publish an MQTT message
mosquitto_pub -h 172.17.0.1 -p 1883 -t test/1 -m test123
Сообщение отображается в потребителе консоли.
Промышленные технологии
- Цепи управления двигателем
- Цепи управления
- Eclipse Hono поддерживает Apache Kafka для обмена сообщениями
- 5 преимуществ дистанционного управления производством
- Управление импедансом переходных отверстий и его влияние на целостность сигнала при проектировании печатны…
- Управление устройством PLCnext Control через SNMP
- Управление кластером на PLCnext?
- Информационная панель PLCnext Tableau
- Отчеты PLCnext Power BI
- Java-приложение на PLCnext Control