detect-gui/components/camera.py

864 lines
31 KiB
Python
Raw Normal View History

2024-11-21 11:39:52 +08:00
from dynaconf.base import Settings
from core.edge_component import EdgeComponent, device, service
from device.mv.ImageConvert import *
from device.mv.MVSDK import *
import struct
import time
import datetime
import logging
from typing import List
from core.logging import logger
from PyQt5.QtCore import QObject, pyqtSignal
g_cameraStatusUserInfo = b"statusInfo"
class BITMAPFILEHEADER(Structure):
_fields_ = [
('bfType', c_ushort),
('bfSize', c_uint),
('bfReserved1', c_ushort),
('bfReserved2', c_ushort),
('bfOffBits', c_uint),
]
class BITMAPINFOHEADER(Structure):
_fields_ = [
('biSize', c_uint),
('biWidth', c_int),
('biHeight', c_int),
('biPlanes', c_ushort),
('biBitCount', c_ushort),
('biCompression', c_uint),
('biSizeImage', c_uint),
('biXPelsPerMeter', c_int),
('biYPelsPerMeter', c_int),
('biClrUsed', c_uint),
('biClrImportant', c_uint),
]
# 调色板只有8bit及以下才需要
class RGBQUAD(Structure):
_fields_ = [
('rgbBlue', c_ubyte),
('rgbGreen', c_ubyte),
('rgbRed', c_ubyte),
('rgbReserved', c_ubyte),
]
# 相机设备类
class CameraDevice:
def __init__(self, camera_id, key, vendor_name, model_name, serial_number):
self.camera_id = camera_id
self.key = key
self.vendor_name = vendor_name
self.model_name = model_name
self.serial_number = serial_number
def __repr__(self):
return (f"CameraDevice(camera_id={self.camera_id}, key={self.key}, "
f"vendor_name={self.vendor_name}, model_name={self.model_name}, "
f"serial_number={self.serial_number})")
class CameraSignals(QObject):
# QT 信号
on_device_list = pyqtSignal(list)
on_device_link = pyqtSignal(int, str)
on_device_change = pyqtSignal()
on_device_error = pyqtSignal()
on_device_data = pyqtSignal(object)
# pyqt的相机管理类
@device("camera", auto_start=True)
class Camera(EdgeComponent):
signals = CameraSignals()
def __init__(self, context):
super().__init__(context)
global mv_camera
mv_camera = self
self.devices = None
self.current_index = 0
self.streamSource = None
def configure(self, setting: Settings) -> None:
self.logger.info(f"Camera configure done.")
@service()
def start(self):
if MVSDKdll is None:
raise RuntimeError("相机设备SDK未正确加载")
camera_count = self.start_discovery()
if camera_count <= 0:
self.logger.info('no cameras found!')
else:
self.open_camera()
super().start()
self.logger.info("Camera start!")
@service()
def stop(self):
self.stop_camera()
super().stop()
self.logger.info("Camera stop!")
def start_discovery(self):
cameraCnt, cameraList = enumCameras()
if cameraCnt is None:
return -1
self.devices = cameraList
camera_devices = []
# 显示相机信息
for index in range(0, cameraCnt):
camera = cameraList[index]
self.logger.info("Camera Id = " + str(index))
self.logger.info("Key = " + str(camera.getKey(camera)))
self.logger.info("vendor name = " + str(camera.getVendorName(camera)))
self.logger.info("Model name = " + str(camera.getModelName(camera)))
self.logger.info("Serial number = " + str(camera.getSerialNumber(camera)))
camera_devices.append(CameraDevice(index, str(camera.getKey(camera)), str(camera.getVendorName(camera)),
str(camera.getModelName(camera)), str(camera.getSerialNumber(camera))))
self.signals.on_device_list.emit(camera_devices)
return cameraCnt
def open_camera(self, index = 0):
# 打开相机
camera = self.devices[index]
nRet = openCamera(camera)
if nRet != 0:
self.logger.error("openCamera fail.")
return -1
# 创建流对象
streamSourceInfo = GENICAM_StreamSourceInfo()
streamSourceInfo.channelId = 0
streamSourceInfo.pCamera = pointer(camera)
self.streamSource = pointer(GENICAM_StreamSource())
nRet = GENICAM_createStreamSource(pointer(streamSourceInfo), byref(self.streamSource))
if nRet != 0:
self.logger.error("create StreamSource fail!")
return -1
# 注册相机连接状态回调
nRet = subscribeCameraStatus(camera)
if nRet != 0:
self.logger.error("subscribeCameraStatus fail!")
return -1
self.current_index = index
return 0
def stop_camera(self):
camera = self.devices[self.current_index]
# 关闭相机
nRet = closeCamera(camera)
if nRet != 0:
self.logger.error("closeCamera fail")
# 释放相关资源
if self.streamSource is not None:
self.streamSource.contents.release(self.streamSource)
return -1
# 反向注册相机连接状态回调
nRet = unsubscribeCameraStatus(camera)
if nRet != 0:
self.logger.error("unsubscribeCameraStatus fail!")
return -1
# 释放相关资源
if self.streamSource is not None:
self.streamSource.contents.release(self.streamSource)
def set_exposure_time(self, exposure_time: int):
# 设置曝光时间
camera = self.devices[self.current_index]
setExposureTime(camera, exposure_time)
def set_soft_trigger(self):
# 设置软触发
camera = self.devices[self.current_index]
setSoftTriggerConf(camera)
def set_line_trigger(self):
# 设置外触发
camera = self.devices[self.current_index]
setLineTriggerConf(camera)
def set_roi(self, offset_x, offset_y, width, height):
# 设置相机ROI
camera = self.devices[self.current_index]
setROI(camera, offset_x, offset_y, width, height)
def grab_one(self):
# 设置软触发
camera = self.devices[self.current_index]
nRet = setSoftTriggerConf(camera)
if nRet != 0:
self.logger.error("set SoftTriggerConf fail!")
# 释放相关资源
self.streamSource.contents.release(self.streamSource)
return -1
else:
self.logger.info("set SoftTriggerConf success!")
# 开始拉流
nRet = self.streamSource.contents.startGrabbing(self.streamSource, c_ulonglong(0),
c_int(GENICAM_EGrabStrategy.grabStrartegySequential))
if nRet != 0:
self.logger.error("startGrabbing fail!")
# 释放相关资源
self.streamSource.contents.release(self.streamSource)
return -1
time.sleep(1)
# 软触发取一张图
nRet = grabOne(camera)
if nRet != 0:
self.logger.error("grabOne fail!")
# 释放相关资源
self.streamSource.contents.release(self.streamSource)
return -1
else:
self.logger.info("trigger time: " + str(datetime.datetime.now()))
# 主动取图
frame = pointer(GENICAM_Frame())
nRet = self.streamSource.contents.getFrame(self.streamSource, byref(frame), c_uint(100000))
if nRet != 0:
self.logger.error("SoftTrigger getFrame fail! timeOut [100000]ms")
# 释放相关资源
self.streamSource.contents.release(self.streamSource)
return -1
else:
self.logger.info("SoftTrigger getFrame success BlockId = " + str(frame.contents.getBlockId(frame)))
self.logger.info("get frame time: " + str(datetime.datetime.now()))
nRet = frame.contents.valid(frame)
if nRet != 0:
self.logger.error("frame is invalid!")
# 释放驱动图像缓存资源
frame.contents.release(frame)
# 释放相关资源
self.streamSource.contents.release(self.streamSource)
return -1
# 将裸数据图像拷出
imageSize = frame.contents.getImageSize(frame)
self.logger.info(f"imageSize is {imageSize}!")
buffAddr = frame.contents.getImage(frame)
frameBuff = c_buffer(b'\0', imageSize)
memmove(frameBuff, c_char_p(buffAddr), imageSize)
# 给转码所需的参数赋值
convertParams = IMGCNV_SOpenParam()
convertParams.dataSize = imageSize
convertParams.height = frame.contents.getImageHeight(frame)
convertParams.width = frame.contents.getImageWidth(frame)
convertParams.paddingX = frame.contents.getImagePaddingX(frame)
convertParams.paddingY = frame.contents.getImagePaddingY(frame)
convertParams.pixelForamt = frame.contents.getImagePixelFormat(frame)
# 释放驱动图像缓存
frame.contents.release(frame)
# 保存bmp图片
bmpInfoHeader = BITMAPINFOHEADER()
bmpFileHeader = BITMAPFILEHEADER()
uRgbQuadLen = 0
rgbQuad = (RGBQUAD * 256)() # 调色板信息
rgbBuff = c_buffer(b'\0', convertParams.height * convertParams.width * 3)
# 如果图像格式是 Mono8 不需要转码
if convertParams.pixelForamt == EPixelType.gvspPixelMono8:
# 初始化调色板rgbQuad 实际应用中 rgbQuad 只需初始化一次
for i in range(0, 256):
rgbQuad[i].rgbBlue = rgbQuad[i].rgbGreen = rgbQuad[i].rgbRed = i
uRgbQuadLen = sizeof(RGBQUAD) * 256
bmpFileHeader.bfSize = sizeof(bmpFileHeader) + sizeof(bmpInfoHeader) + uRgbQuadLen + convertParams.dataSize
bmpInfoHeader.biBitCount = 8
else:
# 转码 => BGR24
rgbSize = c_int()
nRet = IMGCNV_ConvertToBGR24(cast(frameBuff, c_void_p), byref(convertParams),
cast(rgbBuff, c_void_p), byref(rgbSize))
if nRet != 0:
mv_camera.logger.error("image convert fail! errorCode = " + str(nRet))
# 释放相关资源
self.streamSource.contents.release(self.streamSource)
return -1
bmpFileHeader.bfSize = sizeof(bmpFileHeader) + sizeof(bmpInfoHeader) + rgbSize.value
bmpInfoHeader.biBitCount = 24
bmpFileHeader.bfType = 0x4D42 # 文件头类型 'BM'(42 4D)
bmpFileHeader.bfReserved1 = 0 # 保留字
bmpFileHeader.bfReserved2 = 0 # 保留字
bmpFileHeader.bfOffBits = 54 + uRgbQuadLen # 位图像素数据的起始位置
bmpInfoHeader.biSize = 40 # 信息头所占字节数
bmpInfoHeader.biWidth = convertParams.width
bmpInfoHeader.biHeight = -convertParams.height
bmpInfoHeader.biPlanes = 1 # 位图平面数
bmpInfoHeader.biCompression = 0 # 压缩类型0 即不压缩
bmpInfoHeader.biSizeImage = 0
bmpInfoHeader.biXPelsPerMeter = 0
bmpInfoHeader.biYPelsPerMeter = 0
bmpInfoHeader.biClrUsed = 0
bmpInfoHeader.biClrImportant = 0
fileName = './image.bmp'
imageFile = open(fileName, 'wb+')
imageFile.write(struct.pack('H', bmpFileHeader.bfType))
imageFile.write(struct.pack('I', bmpFileHeader.bfSize))
imageFile.write(struct.pack('H', bmpFileHeader.bfReserved1))
imageFile.write(struct.pack('H', bmpFileHeader.bfReserved2))
imageFile.write(struct.pack('I', bmpFileHeader.bfOffBits))
imageFile.write(struct.pack('I', bmpInfoHeader.biSize))
imageFile.write(struct.pack('i', bmpInfoHeader.biWidth))
imageFile.write(struct.pack('i', bmpInfoHeader.biHeight))
imageFile.write(struct.pack('H', bmpInfoHeader.biPlanes))
imageFile.write(struct.pack('H', bmpInfoHeader.biBitCount))
imageFile.write(struct.pack('I', bmpInfoHeader.biCompression))
imageFile.write(struct.pack('I', bmpInfoHeader.biSizeImage))
imageFile.write(struct.pack('i', bmpInfoHeader.biXPelsPerMeter))
imageFile.write(struct.pack('i', bmpInfoHeader.biYPelsPerMeter))
imageFile.write(struct.pack('I', bmpInfoHeader.biClrUsed))
imageFile.write(struct.pack('I', bmpInfoHeader.biClrImportant))
if convertParams.pixelForamt == EPixelType.gvspPixelMono8:
# 写入调色板信息
for i in range(0, 256):
imageFile.write(struct.pack('B', rgbQuad[i].rgbBlue))
imageFile.write(struct.pack('B', rgbQuad[i].rgbGreen))
imageFile.write(struct.pack('B', rgbQuad[i].rgbRed))
imageFile.write(struct.pack('B', rgbQuad[i].rgbReserved))
imageFile.writelines(frameBuff)
else:
imageFile.writelines(rgbBuff)
imageFile.close()
self.logger.info("save " + fileName + " success.")
self.logger.info("save bmp time: " + str(datetime.datetime.now()))
# 停止拉流
nRet = self.streamSource.contents.stopGrabbing(self.streamSource)
if nRet != 0:
self.logger.error("stopGrabbing fail!")
# 释放相关资源
self.streamSource.contents.release(self.streamSource)
return -1
# 相机管理类实例
mv_camera = None
# 取流回调函数
def onGetFrame(frame):
nRet = frame.contents.valid(frame)
if nRet != 0:
mv_camera.logger.error("frame is invalid!")
# 释放驱动图像缓存资源
frame.contents.release(frame)
return
mv_camera.logger.info("BlockId = %d" %(frame.contents.getBlockId(frame)))
#此处客户应用程序应将图像拷贝出使用
'''
'''
# 释放驱动图像缓存资源
frame.contents.release(frame)
# 取流回调函数Ex
def onGetFrameEx(frame, userInfo):
nRet = frame.contents.valid(frame)
if ( nRet != 0):
mv_camera.logger.error("frame is invalid!")
# 释放驱动图像缓存资源
frame.contents.release(frame)
return
mv_camera.logger.info("BlockId = %d userInfo = %s" %(frame.contents.getBlockId(frame), c_char_p(userInfo).value))
#此处客户应用程序应将图像拷贝出使用
'''
'''
# 释放驱动图像缓存资源
frame.contents.release(frame)
# 相机连接状态回调函数
def deviceLinkNotify(connectArg, linkInfo):
mv_camera.on_device_link.emit(connectArg.contents.m_event, c_char_p(linkInfo).value.decode('utf-8'))
if EVType.offLine == connectArg.contents.m_event:
mv_camera.logger.info("camera has off line, userInfo [%s]" %(c_char_p(linkInfo).value))
elif EVType.onLine == connectArg.contents.m_event:
mv_camera.logger.info("camera has on line, userInfo [%s]" %(c_char_p(linkInfo).value))
connectCallBackFuncEx = connectCallBackEx(deviceLinkNotify)
frameCallbackFunc = callbackFunc(onGetFrame)
frameCallbackFuncEx = callbackFuncEx(onGetFrameEx)
# 注册相机连接状态回调
def subscribeCameraStatus(camera):
# 注册上下线通知
eventSubscribe = pointer(GENICAM_EventSubscribe())
eventSubscribeInfo = GENICAM_EventSubscribeInfo()
eventSubscribeInfo.pCamera = pointer(camera)
nRet = GENICAM_createEventSubscribe(byref(eventSubscribeInfo), byref(eventSubscribe))
if nRet != 0:
mv_camera.logger.error("create eventSubscribe fail!")
return -1
nRet = eventSubscribe.contents.subscribeConnectArgsEx(eventSubscribe, connectCallBackFuncEx, g_cameraStatusUserInfo)
if nRet != 0:
mv_camera.logger.error("subscribeConnectArgsEx fail!")
# 释放相关资源
eventSubscribe.contents.release(eventSubscribe)
return -1
# 不再使用时,需释放相关资源
eventSubscribe.contents.release(eventSubscribe)
return 0
# 反注册相机连接状态回调
def unsubscribeCameraStatus(camera):
# 反注册上下线通知
eventSubscribe = pointer(GENICAM_EventSubscribe())
eventSubscribeInfo = GENICAM_EventSubscribeInfo()
eventSubscribeInfo.pCamera = pointer(camera)
nRet = GENICAM_createEventSubscribe(byref(eventSubscribeInfo), byref(eventSubscribe))
if nRet != 0:
mv_camera.logger.error("create eventSubscribe fail!")
return -1
nRet = eventSubscribe.contents.unsubscribeConnectArgsEx(eventSubscribe, connectCallBackFuncEx, g_cameraStatusUserInfo)
if nRet != 0:
mv_camera.logger.error("unsubscribeConnectArgsEx fail!")
# 释放相关资源
eventSubscribe.contents.release(eventSubscribe)
return -1
# 不再使用时,需释放相关资源
eventSubscribe.contents.release(eventSubscribe)
return 0
# 设置软触发
def setSoftTriggerConf(camera):
# 创建control节点
acqCtrlInfo = GENICAM_AcquisitionControlInfo()
acqCtrlInfo.pCamera = pointer(camera)
acqCtrl = pointer(GENICAM_AcquisitionControl())
nRet = GENICAM_createAcquisitionControl(pointer(acqCtrlInfo), byref(acqCtrl))
if nRet != 0:
mv_camera.logger.error("create AcquisitionControl fail!")
return -1
# 设置触发源为软触发
trigSourceEnumNode = acqCtrl.contents.triggerSource(acqCtrl)
nRet = trigSourceEnumNode.setValueBySymbol(byref(trigSourceEnumNode), b"Software")
if nRet != 0:
mv_camera.logger.error("set TriggerSource value [Software] fail!")
# 释放相关资源
trigSourceEnumNode.release(byref(trigSourceEnumNode))
acqCtrl.contents.release(acqCtrl)
return -1
# 需要释放Node资源
trigSourceEnumNode.release(byref(trigSourceEnumNode))
# 设置触发方式
trigSelectorEnumNode = acqCtrl.contents.triggerSelector(acqCtrl)
nRet = trigSelectorEnumNode.setValueBySymbol(byref(trigSelectorEnumNode), b"FrameStart")
if nRet != 0:
mv_camera.logger.error("set TriggerSelector value [FrameStart] fail!")
# 释放相关资源
trigSelectorEnumNode.release(byref(trigSelectorEnumNode))
acqCtrl.contents.release(acqCtrl)
return -1
# 需要释放Node资源
trigSelectorEnumNode.release(byref(trigSelectorEnumNode))
# 打开触发模式
trigModeEnumNode = acqCtrl.contents.triggerMode(acqCtrl)
nRet = trigModeEnumNode.setValueBySymbol(byref(trigModeEnumNode), b"On")
if nRet != 0:
mv_camera.logger.error("set TriggerMode value [On] fail!")
# 释放相关资源
trigModeEnumNode.release(byref(trigModeEnumNode))
acqCtrl.contents.release(acqCtrl)
return -1
# 需要释放相关资源
trigModeEnumNode.release(byref(trigModeEnumNode))
acqCtrl.contents.release(acqCtrl)
return 0
# 设置外触发
def setLineTriggerConf(camera):
# 创建control节点
acqCtrlInfo = GENICAM_AcquisitionControlInfo()
acqCtrlInfo.pCamera = pointer(camera)
acqCtrl = pointer(GENICAM_AcquisitionControl())
nRet = GENICAM_createAcquisitionControl(pointer(acqCtrlInfo), byref(acqCtrl))
if nRet != 0:
mv_camera.logger.error("create AcquisitionControl fail!")
return -1
# 设置触发源为软触发
trigSourceEnumNode = acqCtrl.contents.triggerSource(acqCtrl)
nRet = trigSourceEnumNode.setValueBySymbol(byref(trigSourceEnumNode), b"Line1")
if nRet != 0:
mv_camera.logger.error("set TriggerSource value [Line1] fail!")
# 释放相关资源
trigSourceEnumNode.release(byref(trigSourceEnumNode))
acqCtrl.contents.release(acqCtrl)
return -1
# 需要释放Node资源
trigSourceEnumNode.release(byref(trigSourceEnumNode))
# 设置触发方式
trigSelectorEnumNode = acqCtrl.contents.triggerSelector(acqCtrl)
nRet = trigSelectorEnumNode.setValueBySymbol(byref(trigSelectorEnumNode), b"FrameStart")
if nRet != 0:
mv_camera.logger.error("set TriggerSelector value [FrameStart] fail!")
# 释放相关资源
trigSelectorEnumNode.release(byref(trigSelectorEnumNode))
acqCtrl.contents.release(acqCtrl)
return -1
# 需要释放Node资源
trigSelectorEnumNode.release(byref(trigSelectorEnumNode))
# 打开触发模式
trigModeEnumNode = acqCtrl.contents.triggerMode(acqCtrl)
nRet = trigModeEnumNode.setValueBySymbol(byref(trigModeEnumNode), b"On")
if nRet != 0:
mv_camera.logger.error("set TriggerMode value [On] fail!")
# 释放相关资源
trigModeEnumNode.release(byref(trigModeEnumNode))
acqCtrl.contents.release(acqCtrl)
return -1
# 需要释放Node资源
trigModeEnumNode.release(byref(trigModeEnumNode))
# 设置触发沿
trigActivationEnumNode = acqCtrl.contents.triggerActivation(acqCtrl)
nRet = trigActivationEnumNode.setValueBySymbol(byref(trigActivationEnumNode), b"RisingEdge")
if nRet != 0:
mv_camera.logger.error("set TriggerActivation value [RisingEdge] fail!")
# 释放相关资源
trigActivationEnumNode.release(byref(trigActivationEnumNode))
acqCtrl.contents.release(acqCtrl)
return -1
# 需要释放Node资源
trigActivationEnumNode.release(byref(trigActivationEnumNode))
acqCtrl.contents.release(acqCtrl)
return 0
# 打开相机
def openCamera(camera):
# 连接相机
nRet = camera.connect(camera, c_int(GENICAM_ECameraAccessPermission.accessPermissionControl))
if nRet != 0:
mv_camera.logger.error("camera connect fail!")
return -1
else:
mv_camera.logger.info("camera connect success.")
# # 注册相机连接状态回调
# nRet = subscribeCameraStatus(camera)
# if nRet != 0:
# mv_camera.logger.error("subscribeCameraStatus fail!")
# return -1
return 0
# 关闭相机
def closeCamera(camera):
# 反注册相机连接状态回调
nRet = unsubscribeCameraStatus(camera)
if nRet != 0:
mv_camera.logger.error("unsubscribeCameraStatus fail!")
return -1
# 断开相机
# nRet = camera.disConnect(byref(camera))
# if nRet != 0:
# mv_camera.logger.error("disConnect camera fail!")
# return -1
return 0
# 设置曝光
def setExposureTime(camera, dVal):
# 通用属性设置:设置曝光 --根据属性类型,直接构造属性节点。如曝光是 double类型构造doubleNode节点
exposureTimeNode = pointer(GENICAM_DoubleNode())
exposureTimeNodeInfo = GENICAM_DoubleNodeInfo()
exposureTimeNodeInfo.pCamera = pointer(camera)
exposureTimeNodeInfo.attrName = b"ExposureTime"
nRet = GENICAM_createDoubleNode(byref(exposureTimeNodeInfo), byref(exposureTimeNode))
if nRet != 0:
mv_camera.logger.error("create ExposureTime Node fail!")
return -1
# 设置曝光时间
nRet = exposureTimeNode.contents.setValue(exposureTimeNode, c_double(dVal))
if nRet != 0:
mv_camera.logger.error("set ExposureTime value [%f]us fail!" % (dVal))
# 释放相关资源
exposureTimeNode.contents.release(exposureTimeNode)
return -1
else:
mv_camera.logger.info("set ExposureTime value [%f]us success." % (dVal))
# 释放节点资源
exposureTimeNode.contents.release(exposureTimeNode)
return 0
# 枚举相机
def enumCameras():
# 获取系统单例
system = pointer(GENICAM_System())
nRet = GENICAM_getSystemInstance(byref(system))
if nRet != 0:
mv_camera.logger.error("getSystemInstance fail!")
return None, None
# 发现相机
cameraList = pointer(GENICAM_Camera())
cameraCnt = c_uint()
nRet = system.contents.discovery(system, byref(cameraList), byref(cameraCnt), c_int(GENICAM_EProtocolType.typeAll));
if nRet != 0:
mv_camera.logger.error("discovery fail!")
return None, None
elif cameraCnt.value < 1:
mv_camera.logger.info("discovery no camera!")
return None, None
else:
mv_camera.logger.info("cameraCnt: " + str(cameraCnt.value))
return cameraCnt.value, cameraList
def grabOne(camera):
# 创建流对象
streamSourceInfo = GENICAM_StreamSourceInfo()
streamSourceInfo.channelId = 0
streamSourceInfo.pCamera = pointer(camera)
streamSource = pointer(GENICAM_StreamSource())
nRet = GENICAM_createStreamSource(pointer(streamSourceInfo), byref(streamSource))
if nRet != 0:
mv_camera.logger.error("create StreamSource fail!")
return -1
# 创建control节点
acqCtrlInfo = GENICAM_AcquisitionControlInfo()
acqCtrlInfo.pCamera = pointer(camera)
acqCtrl = pointer(GENICAM_AcquisitionControl())
nRet = GENICAM_createAcquisitionControl(pointer(acqCtrlInfo), byref(acqCtrl))
if nRet != 0:
mv_camera.logger.error("create AcquisitionControl fail!")
# 释放相关资源
streamSource.contents.release(streamSource)
return -1
# 执行一次软触发
trigSoftwareCmdNode = acqCtrl.contents.triggerSoftware(acqCtrl)
nRet = trigSoftwareCmdNode.execute(byref(trigSoftwareCmdNode))
if nRet != 0:
mv_camera.logger.error("Execute triggerSoftware fail!")
# 释放相关资源
trigSoftwareCmdNode.release(byref(trigSoftwareCmdNode))
acqCtrl.contents.release(acqCtrl)
streamSource.contents.release(streamSource)
return -1
# 释放相关资源
trigSoftwareCmdNode.release(byref(trigSoftwareCmdNode))
acqCtrl.contents.release(acqCtrl)
streamSource.contents.release(streamSource)
return 0
# 设置感兴趣区域 --- 感兴趣区域的宽高 和 xy方向的偏移量 入参值应符合对应相机的递增规则
def setROI(camera, OffsetX, OffsetY, nWidth, nHeight):
#获取原始的宽度
widthMaxNode = pointer(GENICAM_IntNode())
widthMaxNodeInfo = GENICAM_IntNodeInfo()
widthMaxNodeInfo.pCamera = pointer(camera)
widthMaxNodeInfo.attrName = b"WidthMax"
nRet = GENICAM_createIntNode(byref(widthMaxNodeInfo), byref(widthMaxNode))
if nRet != 0:
mv_camera.logger.error("create WidthMax Node fail!")
return -1
oriWidth = c_longlong()
nRet = widthMaxNode.contents.getValue(widthMaxNode, byref(oriWidth))
if nRet != 0:
mv_camera.logger.error("widthMaxNode getValue fail!")
# 释放相关资源
widthMaxNode.contents.release(widthMaxNode)
return -1
# 释放相关资源
widthMaxNode.contents.release(widthMaxNode)
# 获取原始的高度
heightMaxNode = pointer(GENICAM_IntNode())
heightMaxNodeInfo = GENICAM_IntNodeInfo()
heightMaxNodeInfo.pCamera = pointer(camera)
heightMaxNodeInfo.attrName = b"HeightMax"
nRet = GENICAM_createIntNode(byref(heightMaxNodeInfo), byref(heightMaxNode))
if nRet != 0:
mv_camera.logger.error("create HeightMax Node fail!")
return -1
oriHeight = c_longlong()
nRet = heightMaxNode.contents.getValue(heightMaxNode, byref(oriHeight))
if nRet != 0:
mv_camera.logger.error("heightMaxNode getValue fail!")
# 释放相关资源
heightMaxNode.contents.release(heightMaxNode)
return -1
# 释放相关资源
heightMaxNode.contents.release(heightMaxNode)
# 检验参数
if (oriWidth.value < (OffsetX + nWidth)) or (oriHeight.value < (OffsetY + nHeight)):
mv_camera.logger.error("please check input param!")
return -1
# 设置宽度
widthNode = pointer(GENICAM_IntNode())
widthNodeInfo = GENICAM_IntNodeInfo()
widthNodeInfo.pCamera = pointer(camera)
widthNodeInfo.attrName = b"Width"
nRet = GENICAM_createIntNode(byref(widthNodeInfo), byref(widthNode))
if nRet != 0:
mv_camera.logger.error("create Width Node fail!")
return -1
nRet = widthNode.contents.setValue(widthNode, c_longlong(nWidth))
if nRet != 0:
mv_camera.logger.error("widthNode setValue [%d] fail!" % (nWidth))
# 释放相关资源
widthNode.contents.release(widthNode)
return -1
# 释放相关资源
widthNode.contents.release(widthNode)
# 设置高度
heightNode = pointer(GENICAM_IntNode())
heightNodeInfo = GENICAM_IntNodeInfo()
heightNodeInfo.pCamera = pointer(camera)
heightNodeInfo.attrName = b"Height"
nRet = GENICAM_createIntNode(byref(heightNodeInfo), byref(heightNode))
if nRet != 0:
mv_camera.logger.error("create Height Node fail!")
return -1
nRet = heightNode.contents.setValue(heightNode, c_longlong(nHeight))
if nRet != 0:
mv_camera.logger.error("heightNode setValue [%d] fail!" % (nHeight))
# 释放相关资源
heightNode.contents.release(heightNode)
return -1
# 释放相关资源
heightNode.contents.release(heightNode)
# 设置OffsetX
OffsetXNode = pointer(GENICAM_IntNode())
OffsetXNodeInfo = GENICAM_IntNodeInfo()
OffsetXNodeInfo.pCamera = pointer(camera)
OffsetXNodeInfo.attrName = b"OffsetX"
nRet = GENICAM_createIntNode(byref(OffsetXNodeInfo), byref(OffsetXNode))
if nRet != 0:
mv_camera.logger.error("create OffsetX Node fail!")
return -1
nRet = OffsetXNode.contents.setValue(OffsetXNode, c_longlong(OffsetX))
if nRet != 0:
mv_camera.logger.error("OffsetX setValue [%d] fail!" % (OffsetX))
# 释放相关资源
OffsetXNode.contents.release(OffsetXNode)
return -1
# 释放相关资源
OffsetXNode.contents.release(OffsetXNode)
# 设置OffsetY
OffsetYNode = pointer(GENICAM_IntNode())
OffsetYNodeInfo = GENICAM_IntNodeInfo()
OffsetYNodeInfo.pCamera = pointer(camera)
OffsetYNodeInfo.attrName = b"OffsetY"
nRet = GENICAM_createIntNode(byref(OffsetYNodeInfo), byref(OffsetYNode))
if nRet != 0:
mv_camera.logger.error("create OffsetY Node fail!")
return -1
nRet = OffsetYNode.contents.setValue(OffsetYNode, c_longlong(OffsetY))
if nRet != 0:
mv_camera.logger.error("OffsetY setValue [%d] fail!" % (OffsetY))
# 释放相关资源
OffsetYNode.contents.release(OffsetYNode)
return -1
# 释放相关资源
OffsetYNode.contents.release(OffsetYNode)
return 0
def demo():
# 发现相机
cameraCnt, cameraList = enumCameras()
if cameraCnt is None:
return -1
# 显示相机信息
for index in range(0, cameraCnt):
camera = cameraList[index]
print("\nCamera Id = " + str(index))
print("Key = " + str(camera.getKey(camera)))
print("vendor name = " + str(camera.getVendorName(camera)))
print("Model name = " + str(camera.getModelName(camera)))
print("Serial number = " + str(camera.getSerialNumber(camera)))
camera = cameraList[0]
return 0
if __name__=="__main__":
pass
# camera_count = mv_camera.start_discovery()
# if camera_count <= 0:
# mv_camera.logger.info('no cameras found!')
# else:
# mv_camera.open()
# mv_camera.set_roi(0, 0, 3000, 3000)
# mv_camera.grab_one()
#
# time.sleep(2)
#
# mv_camera.set_exposure_time(20000)
# mv_camera.stop()
# # 3s exit
# time.sleep(3)