gRPC Python — чтение и запись данных процесса
В этой статье описывается, как получить доступ и записать простые данные процесса с помощью Python с помощью AXC F 3152, использующего gRPC. (https://www.plcnext.help/te/Service_Components/gRPC_Introduction.htm)
Необходимое условие
Сначала мы должны подготовить необходимые файлы вне ПЛК, например, на компьютере с Windows.
- Установите Python 3.9 (версия 3.10 может вызвать ошибки)
- Установите необходимый пакет Python для создания кода из файлов .proto:
pip install grpcio-tools==1.36.1
- Загрузите и разархивируйте репозиторий, содержащий файлы .proto, с https://github.com/PLCnext/gRPC
Создать _pb2.py и _pb2_grpc.py из файлов .proto
Затем мы должны сгенерировать необходимые файлы Python из предоставленных файлов .proto. Последние находятся в следующей папке:gRPC-master/protobuf.
Используйте этот код для создания скрипта Python в папке gRPC-master, например, generate_grpc.py. Скрипт
- создает необходимые файлы и помещает их в gRPC-master/pxc_grpc
- изменить пути импорта
import glob
import os
from pathlib import Path
# create the output directory
Path('pxc_grpc').mkdir(parents=True, exist_ok=True)
grpc_command_base = 'python -m grpc_tools.protoc -I./protobuf --python_out=pxc_grpc --grpc_python_out=pxc_grpc '
import_paths = set()
# generate the *_pb2.py and *_pb2_grpc.py files
for filename in glob.iglob('./protobuf/**', recursive=True):
if filename.endswith('.proto'):
# store the import path
path_parts = filename.split(os.sep)
import_paths.add('.'.join(path_parts[1:-1]))
grpc_command = ''.join([grpc_command_base, os.path.join('.', os.path.relpath(filename))])
stream = os.popen(grpc_command)
output = stream.read()
if output != '':
print(''.join(['error/info for file ', os.path.relpath(filename), ' - ', output]))
# get the python files in the base directory
base_pys = set()
for (dirpath, dirnames, filenames) in os.walk('./pxc_grpc'):
for f in filenames:
base_pys.add(f.split('.py')[0])
break
# reformat the stored paths to adapt the import statements
try:
import_paths.remove('')
except:
pass
import_paths = list(import_paths)
import_paths.sort(key=len)
import_paths.reverse()
# adapt the imports
for filename in glob.iglob('./pxc_grpc/**', recursive=True):
if filename.endswith('.py'):
new_lines = []
with open(filename, 'r') as file:
lines = file.readlines()
for line in lines:
if line.startswith('from'):
for import_path in import_paths:
if import_path in line:
line = line.replace(import_path, ''.join(['pxc_grpc.', import_path]), 1)
break
elif line.startswith('import'):
parts = line.split()
if parts[1] in base_pys:
line = line.replace('import', 'from pxc_grpc import')
new_lines.append(line)
with open(filename, 'w') as file:
file.write(''.join(new_lines))
Откройте оболочку и выполните скрипт:pyton generate_grpc.py
Создание демонстрационного проекта PLCnext
Показанный проект должен только демонстрировать, как интерфейс gRPC взаимодействует с GDS. Вместо этого вы можете использовать существующий проект. Для отдельного проекта вам необходимо соответствующим образом отредактировать имена портов в следующем скрипте Python, например, Arp.Plc.Eclr/MainInstance.strInput
.

Подготовка ПЛК
Установите pip для управления пакетами Python:
- Подключите контроллер AXC F 3152 к Интернету.
- Введите команду curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py.
- Затем введите команду python3 get-pip.py.
Установите необходимые пакеты:pip install grpcio protobuf==3.20.0
Создайте папку grpc_test в каталоге проектов (/opt/plcnext/projects/).
Скопируйте папку «pxc_grpc», содержащую созданные файлы Python, в «grpc_test», например, с помощью WinSCP.
Создайте скрипт Python в папке «grpc_test» с именем «grpc_test.py» и вставьте следующий код:
import grpc
from pxc_grpc.Plc.Gds.IDataAccessService_pb2 import IDataAccessServiceReadSingleRequest, \
IDataAccessServiceReadRequest, IDataAccessServiceWriteSingleRequest, IDataAccessServiceWriteRequest
from pxc_grpc.Plc.Gds.IDataAccessService_pb2_grpc import IDataAccessServiceStub
from pxc_grpc.Plc.Gds.WriteItem_pb2 import WriteItem
def write_single_string(stub, port_name, value):
single_write_request = IDataAccessServiceWriteSingleRequest()
single_write_request.data.PortName = port_name
single_write_request.data.Value.TypeCode = 19
single_write_request.data.Value.StringValue = value
return stub.WriteSingle(single_write_request)
def write_single_int(stub, port_name, value):
single_write_request = IDataAccessServiceWriteSingleRequest()
single_write_request.data.PortName = port_name
single_write_request.data.Value.TypeCode = 6
single_write_request.data.Value.Int16Value = value
return stub.WriteSingle(single_write_request)
def write_multiple_values(stub):
write_request = IDataAccessServiceWriteRequest()
wi1 = WriteItem()
wi1.PortName = 'Arp.Plc.Eclr/MainInstance.strInput'
wi1.Value.StringValue = "test1"
wi1.Value.TypeCode = 19
wi2 = WriteItem()
wi2.PortName = 'Arp.Plc.Eclr/MainInstance.strInput2'
wi2.Value.StringValue = "test2"
wi2.Value.TypeCode = 19
# add multiple WriteItems at once
write_request.data.extend([wi1, wi2])
# add WriteItems separately
# response1.data.append(wi1)
# response1.data.append(wi2)
return stub.Write(write_request)
def read_single_value(stub, port_name):
single_read_request = IDataAccessServiceReadSingleRequest()
single_read_request.portName=port_name
return stub.ReadSingle(single_read_request)
def read_multiple_values(stub, port_names):
read_request = IDataAccessServiceReadRequest()
read_request.portNames.extend(port_names)
return stub.Read(read_request)
if __name__ == "__main__":
# create channel and stub
channel = grpc.insecure_channel('unix:/run/plcnext/grpc.sock')
stub = IDataAccessServiceStub(channel)
print(write_single_string(stub, 'Arp.Plc.Eclr/MainInstance.strInput', 'test123'))
print(write_single_int(stub, 'Arp.Plc.Eclr/MainInstance.iInput', 18))
print(write_multiple_values(stub))
r = read_single_value(stub, 'Arp.Plc.Eclr/MainInstance.strInput')
print(r)
print(r._ReturnValue.Value.TypeCode)
print(r._ReturnValue.Value.StringValue)
r = read_multiple_values(stub, ['Arp.Plc.Eclr/MainInstance.iInput', 'Arp.Plc.Eclr/MainInstance.strInput'])
for value in r._ReturnValue:
print(value, value.Value.TypeCode)
Подключите ваш ПЛК к PLCnext Engineer, загрузите проект и запустите просмотр в реальном времени.
Запустите пример
Теперь запустите пример. Войдите в ПЛК через ssh и перейдите к 'grpc_test', затем запустите скрипт Python:
cd projects/grpc_test/
python3 grpc_test.py
gRPC позволяет взаимодействовать с переменными GDS.
Типы данных
Для чтения и записи переменных требуется тип данных, например, wi1.Value.TypeCode = 19
. Типы описаны в сгенерированном файле gRPC-master/pxc_grpc/ArpTypes_pb2.py
начиная со строки 242:
CT_None = 0
CT_End = 0
CT_Void = 1
CT_Boolean = 2
CT_Char = 3
CT_Int8 = 4
CT_Uint8 = 5
CT_Int16 = 6
CT_Uint16 = 7
CT_Int32 = 8
CT_Uint32 = 9
CT_Int64 = 10
CT_Uint64 = 11
CT_Real32 = 12
CT_Real64 = 13
CT_Struct = 18
CT_String = 19
CT_Utf8String = 19
CT_Array = 20
CT_DateTime = 23
CT_Version = 24
CT_Guid = 25
CT_AnsiString = 26
CT_Object = 28
CT_Utf16String = 30
CT_Stream = 34
CT_Enumerator = 35
CT_SecureString = 36
CT_Enum = 37
CT_Dictionary = 38
CT_SecurityToken = 39
CT_Exception = 40
CT_IecTime = 41
CT_IecTime64 = 42
CT_IecDate = 43
CT_IecDate64 = 44
CT_IecDateTime = 45
CT_IecDateTime64 = 46
CT_IecTimeOfDay = 47
CT_IecTimeOfDay64 = 48
Соответствующие переменные значения, например, r._ReturnValue.Value.StringValue
, можно найти в том же файле, начиная со строки 365, например, BoolValue
, Int8Value
, StringValue
.
Промышленные технологии
- Apacer:во всем мире выпущены карты CV110-SD и CV110-MSD
- Apacer:серия SSD промышленного класса SV250 со скоростью чтения / записи 560 и 520 МБ / с
- Как разобраться в больших данных:RTU и приложения для управления процессами
- Что такое фрезерование? - Определение, процесс и операции
- Что такое сверление? - определение, процесс и советы
- Что такое порошковая металлургия? - Определение и процесс
- Что такое протяжка? - Процесс, работа и типы
- Что такое химическая обработка? - Работа и процесс
- Что такое ультразвуковая обработка? - Работа и процесс
- Что такое сварка распылением? - Процесс и методы