This commit is contained in:
Tiago Conceição 2026-03-02 22:35:39 +01:00 committed by GitHub
commit 3511ca763e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 1725 additions and 27 deletions

1
.gitignore vendored
View file

@ -1,6 +1,7 @@
out
*.so
*.pyc
.idea
.config
.config.old
klippy/.version

View file

@ -0,0 +1,15 @@
[output_pin msla_uvled]
pin: PA5
[msla_display]
model: TM089CFSP01
pixel_format: Mono
framebuffer_index: 0
resolution_x: 3840
resolution_y: 2400
pixel_width: 0.05
pixel_height: 0.05
uvled_output_pin_name: msla_uvled
[printer]
manufacturing_process: mSLA

View file

@ -4965,6 +4965,59 @@ information on menu attributes available during template rendering.
# mode start or end.
```
### [framebuffer_display]
Support for a HDMI and DSI display attached to the micro-controller accessible
via the framebuffer devices (/dev/fb*).
See the [command reference](G-Codes.md#framebuffer_display) for
more information.
```
[framebuffer_display my_display]
model: LS055R1SX04
# The display model, informative only.
pixel_format: RGB
# The display pixel format, possible values:
# Mono, RGB, BGR
# Default: RGB
framebuffer_index: 0
# The framebuffer index, this maps to /dev/fb<index>
resolution_x: 1440
resolution_y: 2560
# The display resolution, these values are compared and validated against the
# the framebuffer device information.
pixel_width: 0.01575
pixel_height: 0.04725
# The pixel pitch/size in millimeters, these values are used to calculate
# the display size accordingly.
```
### [msla_display]
Support for a mSLA display that will be used to display images and cure resin
by lit a UV LED under the display pixels.
For the complete configuration follow the
[framebuffer_display](Config_Reference.md#framebuffer_display)
See the [command reference](G-Codes.md#msla_display) for more information.
```
[msla_display]
# Complete with framebuffer_display config in addition to:
buffer_cache_count: 1
# Number of images to cache while printing, cached buffers cut the processing
# time and render the image faster into the display. Default: 1
uvled_output_pin_name: msla_uvled
# The configurated UV LED [output_pin msla_uvled] or [pwm_tool msla_uvled],
# either section must exist and be configured to attach a UV LED to this
# display. Default: msla_uvled
uvled_response_delay: 0
# The UV LED response delay in milliseconds, often this is the time the
# micro-controller takes to switch the UV LED pin. This offset will subtract
# to a defined exposure time when issue the M1400 Sx Px command. Default: 0
```
## Filament sensors
### [filament_switch_sensor]

View file

@ -336,6 +336,57 @@ Also provided is the following extended G-Code command:
setting the supplied `MSG` as the current display message. If
`MSG` is omitted the display will be cleared.
### [framebuffer_display]
The following commands are available when a
[framebuffer_display config section](Config_Reference.md#framebuffer_display)
is enabled.
- Clear the framebuffer (fill with zeros):
`FRAMEBUFFER_CLEAR DEVICE=<name> WAIT=[0/1]`
- Send image: `FRAMEBUFFER_DRAW_IMAGE DEVICE=<name> PATH=<path> CLEAR=[0/1/2]
OFFSET=[n] WAIT=[0/1]`
- `DEVICE`: Configured device name in [framebuffer_display name]
- `PATH`: Absolute image path
- `OFFSET`: Offset from buffer start to write the image
- `CLEAR`: Clear the remaining buffer, 0=No, 1=Yes, 2=Auto. (int) Default: 2
- `WAIT`: Wait for the render to complete
### [msla_display]
The following commands are available when a
[msla_display config section](Config_Reference.md#msla_display)
is enabled.
- Validate print resolution and pixel size against the display information:
`MSLA_DISPLAY_VALIDATE RESOLUTION=<x,y> PIXEL=<width,height> STRICT=[0/1]`
- `RESOLUTION:` Resolution size in pixels
- `PIXEL`: Pixel size in millimeters
- `STRICT`: 0 = Prints if same or lower resolutions and same pixel size
1 = Prints if same resolutions and pixel size
- Tests the display response time: `MSLA_DISPLAY_RESPONSE_TIME AVG=[1]`
- `AVG`: Number of samples to average the results
- Test the display by showing full white and grey shades: `MSLA_DISPLAY_TEST
DELAY=[ms]`
- `DELAY`: Time in milliseconds between tests
- Display clear: `M1450`
- Display image: `M1451 F<"image.png"> O[n] C[0/1] W[0/1]`
- `F`: Image file to display, when printing this file is relative to the
base path of the print gcode
- `O`: Positive offset from start position of the buffer. Default: 0
- `C`: Clear the remaining buffer, 0=No, 1=Yes, 2=Auto. Default: 2
- `W`: Wait for the render to complete. Default: 0
- Tests the UV LED response time:
`MSLA_UVLED_RESPONSE_TIME TIME=[ms] OFFSET=[ms] TYPE=[0/1]`
- `TIME`: Exposure time in milliseconds
- `OFFSET`: Offset time from exposure time. Default: 0
- `TYPE`: 0 = When using M1400 Sx and Px. (Faster response time). (Default)
1 = When using M1400 Sx, G4 Px, M1400 S0 (Slower response time)
- Set the UV LED power: `M1400 S[0-255] P[ms]`
- `S`: The LED Power (Non PWM LEDs will turn on from 1 to 255). Default: 255
- `P`: Time to wait in milliseconds when (S>0) before turn off. Default: 0
- Turn off the UV LED: `M1401`
### [dual_carriage]
The following command is available when the

View file

@ -9,7 +9,6 @@
######################################################################
# Helper macros for showing common screen values
######################################################################
[display_template _heater_temperature]
param_heater_name: "extruder"
text:
@ -65,11 +64,17 @@ text:
Ready
{% endif %}
[display_template _uvled]
text:
{% if 'msla_display' in printer %}
{% if printer.msla_display.is_exposure %}
~uvled~
{% endif %}
{% endif %}
######################################################################
# Default 16x4 display
######################################################################
[display_data _default_16x4 extruder]
position: 0, 0
text:
@ -105,11 +110,13 @@ text: { "%6s" % (render("_printing_time").strip(),) }
position: 3, 0
text: { render("_print_status") }
[display_data _default_16x4 uvled]
position: 1, 0
text: { render("_uvled") }
######################################################################
# Alternative 16x4 layout for multi-extruders
######################################################################
[display_data _multiextruder_16x4 extruder]
position: 0, 0
text: { render("_heater_temperature", param_heater_name="extruder") }
@ -145,7 +152,6 @@ text: { render("_print_status") }
######################################################################
# Default 20x4 display
######################################################################
[display_data _default_20x4 extruder]
position: 0, 0
text: { render("_heater_temperature", param_heater_name="extruder") }
@ -191,11 +197,13 @@ text:
position: 3, 0
text: { render("_print_status") }
[display_data _default_20x4 uvled]
position: 1, 0
text: { render("_uvled") }
######################################################################
# Default 16x4 glyphs
######################################################################
[display_glyph extruder]
data:
................
@ -329,6 +337,25 @@ data:
................
................
[display_glyph uvled]
data:
................
...*...*...*....
....*..*..*.....
.....*.*.*......
......***.......
......*.*.......
......***.......
.....*.*.*......
....*..*..*.....
...*...*...*....
................
.....******.....
...**********...
.**************.
.**************.
................
# In addition to the above glyphs, 16x4 displays also have the
# following hard-coded single character glyphs: right_arrow, degrees.
@ -336,7 +363,6 @@ data:
######################################################################
# Default 20x4 glyphs
######################################################################
[display_glyph extruder]
hd44780_slot: 0
hd44780_data:
@ -457,5 +483,17 @@ hd44780_data:
*****
.....
[display_glyph uvled]
hd44780_slot: 0
hd44780_data:
*****
.....
*.*.*
*.*.*
*.*.*
.....
.***.
*****
# In addition to the above glyphs, 20x4 displays also have the
# following hard-coded glyphs: right_arrow.

View file

@ -30,6 +30,8 @@
# + Steppers off
# + Fan: OFF
# + Fan speed: 000%
# + UVLED: OFF [mSLA only]
# + UVLED pwr: 000% [mSLA only]
# + Lights: OFF
# + Lights: 000%
# + Move 10mm
@ -75,6 +77,7 @@
# + Restart
# + Restart host
# + Restart FW
# + Display Test [mSLA only]
# + PID tuning
# + Tune Hotend PID
# + Tune Hotbed PID
@ -113,6 +116,7 @@ gcode:
[menu __main __tune __flow]
type: input
enable: {('extruder' in printer) and ('extruder' in printer.heaters.available_heaters)}
name: Flow: {'%3d' % (menu.input*100)}%
input: {printer.gcode_move.extrude_factor}
input_min: 0.01
@ -256,6 +260,7 @@ gcode: BED_MESH_CALIBRATE
[menu __main __control __disable]
type: command
enable: {not printer.idle_timeout.state == "Printing"}
name: Steppers off
gcode:
M84
@ -283,6 +288,28 @@ input_step: 0.01
gcode:
M106 S{'%d' % (menu.input*255)}
[menu __main __control __uvledoff]
type: input
enable: {'msla_display' in printer and not printer.idle_timeout.state == "Printing"}
name: UVLED: {'ON ' if menu.input else 'OFF'}
input: {printer.msla_display.uvled_value_raw}
input_min: 0
input_max: 1
input_step: 1
gcode:
M1400 S{255 if menu.input else 0}
[menu __main __control __uvled]
type: input
enable: {'msla_display' in printer and not printer.idle_timeout.state == "Printing" and printer.msla_display.uvled_is_pwm}
name: UVLED pwr: {'%3d' % (menu.input*100)}%
input: {printer.msla_display.uvled_value_raw}
input_min: 0
input_max: 1
input_step: 0.01
gcode:
M1400 S{'%d' % (menu.input*255)}
[menu __main __control __caselightonoff]
type: input
enable: {'output_pin caselight' in printer}
@ -353,7 +380,7 @@ gcode:
[menu __main __control __move_10mm __axis_e]
type: input
enable: {not printer.idle_timeout.state == "Printing"}
enable: {not printer.idle_timeout.state == "Printing" and ('extruder' in printer)}
name: Move E:{'%+06.1f' % menu.input}
input: 0
input_min: -{printer.configfile.config.extruder.max_extrude_only_distance|default(50)}
@ -413,7 +440,7 @@ gcode:
[menu __main __control __move_1mm __axis_e]
type: input
enable: {not printer.idle_timeout.state == "Printing"}
enable: {not printer.idle_timeout.state == "Printing" and ('extruder' in printer)}
name: Move E:{'%+06.1f' % menu.input}
input: 0
input_min: -{printer.configfile.config.extruder.max_extrude_only_distance|default(50)}
@ -473,7 +500,7 @@ gcode:
[menu __main __control __move_01mm __axis_e]
type: input
enable: {not printer.idle_timeout.state == "Printing"}
enable: {not printer.idle_timeout.state == "Printing" and ('extruder' in printer)}
name: Move E:{'%+06.1f' % menu.input}
input: 0
input_min: -{printer.configfile.config.extruder.max_extrude_only_distance|default(50)}
@ -489,6 +516,7 @@ gcode:
[menu __main __temp]
type: list
name: Temperature
enable: {('extruder' in printer) or ('heater_bed' in printer)}
[menu __main __temp __hotend0_target]
type: input
@ -597,6 +625,7 @@ gcode: M140 S0
[menu __main __filament]
type: list
name: Filament
enable: {'extruder' in printer}
[menu __main __filament __hotend0_target]
type: input
@ -682,9 +711,16 @@ enable: {not printer.idle_timeout.state == "Printing"}
name: Restart FW
gcode: FIRMWARE_RESTART
[menu __main __setup __display_test]
type: command
name: Display test
enable: {'msla_display' in printer and (not printer.idle_timeout.state == "Printing")}
gcode: MSLA_DISPLAY_TEST
[menu __main __setup __tuning]
type: list
name: PID tuning
enable: {('extruder' in printer) or ('heater_bed' in printer)}
[menu __main __setup __tuning __hotend_pid_tuning]
type: command

View file

@ -0,0 +1,504 @@
# Write to framebuffer devices (HDMI and DSI) display
#
# Copyright (C) 2024 Tiago Conceicao <tiago_caza@hotmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
#
import logging
import math
import re
import os
import mmap
import struct
from PIL import Image
import threading
CLEAR_NO_FLAG = 0
CLEAR_YES_FLAG = 1
CLEAR_AUTO_FLAG = 2
class FramebufferDisplay:
def __init__(self, config):
self.printer = config.get_printer()
self.gcode = self.printer.lookup_object('gcode')
self.model = config.get('model', None)
atypes = {'Mono': 'Mono',
'RGB': 'RGB',
'BGR': 'BGR',
}
self.pixel_format = config.getchoice('pixel_format', atypes,
default='RGB')
self.framebuffer_index = config.getint('framebuffer_index', minval=0)
if not os.path.exists(self.get_framebuffer_path()):
msg = ("The frame buffer device %s does not exists."
% self.get_framebuffer_path())
logging.exception(msg)
raise config.error(msg)
self.resolution_x = config.getint('resolution_x', minval=1) # In pixels
self.resolution_y = config.getint('resolution_y', minval=1) # In pixels
self.pixel_width = config.getfloat('pixel_width', 0.,
above=0.) # In millimeters
self.pixel_height = config.getfloat('pixel_height', 0.,
above=0.) # In millimeters
self.display_width = round(self.resolution_x * self.pixel_width, 4)
self.display_height = round(self.resolution_y * self.pixel_height, 4)
if self.pixel_format == 'Mono':
self.channels = 1
self.bit_depth = 8
if self.pixel_format == 'RGB' or self.pixel_format == 'BGR':
self.channels = 3
self.bit_depth = 24
else:
self.channels = len(self.pixel_format)
self.bit_depth = self.channels * 8
self.is_landscape = self.resolution_x >= self.resolution_y
self.is_portrait = not self.is_landscape
self.diagonal_inch = round(math.sqrt(
self.display_width ** 2 + self.display_height ** 2) / 25.4, 2)
# get modes width and height
with open("/sys/class/graphics/fb%d/modes" % self.framebuffer_index,
"r") as f:
modes = f.read()
match = re.search(r"(\d+)x(\d+)", modes)
if match:
self.fb_modes_width = int(match.group(1))
self.fb_modes_height = int(match.group(2))
else:
msg = ('Unable to extract "modes" information from famebuffer '
"device fb%d." % self.framebuffer_index)
logging.exception(msg)
raise config.error(msg)
# get virtual width and height
with open(
"/sys/class/graphics/fb%d/virtual_size"
% self.framebuffer_index, "r") as f:
virtual_size = f.read()
width_string, height_string = virtual_size.split(',')
self.fb_virtual_width = int(width_string) # width
self.fb_virtual_height = int(height_string) # height
# get stride
with open("/sys/class/graphics/fb%d/stride" % self.framebuffer_index,
"r") as f:
self.fb_stride = int(f.read())
# get bits per pixel
with open(
"/sys/class/graphics/fb%d/bits_per_pixel"
% self.framebuffer_index, "r") as f:
self.fb_bits_per_pixel = int(f.read())
# Check if configured resolutions match framebuffer information
if self.resolution_x not in (self.fb_stride, self.fb_modes_width):
msg = ("The configured resolution_x of %d does not match any of "
"the framebuffer stride of (%d) nor the modes of (%d)."
% (self.resolution_x, self.fb_stride, self.fb_modes_width))
logging.exception(msg)
raise config.error(msg)
if self.resolution_y != self.fb_modes_height:
msg = ("The configured resolution_y of %d does not match the "
"framebuffer modes of (%d)."
% (self.resolution_y, self.fb_modes_height))
logging.exception(msg)
raise config.error(msg)
self.fb_maxsize = self.fb_stride * self.fb_modes_height
# Defines the current thread that hold the display session to the fb
self._framebuffer_thread = None
self._framebuffer_lock = threading.Lock()
# Keeps device open to spare transfer times
self._fb_device = open(self.get_framebuffer_path(), mode='r+b')
self.fb_memory_map = mmap.mmap(self._fb_device.fileno(),
self.fb_stride * self.fb_modes_height,
mmap.MAP_SHARED,
mmap.PROT_WRITE | mmap.PROT_READ)
self.clear_buffer_threaded()
def is_busy(self):
"""
Checks if the device is busy.
@return: True if the device is busy, false otherwise.
"""
return (self._framebuffer_lock.locked() or
(self._framebuffer_thread is not None
and self._framebuffer_thread.is_alive()))
def wait_framebuffer_thread(self, timeout=None):
"""
Wait for the framebuffer thread to terminate (Finish writes).
@param timeout: Timeout time to wait for framebuffer thread to terminate
@return: True if waited for framebuffer thread, otherwise False.
"""
if (self._framebuffer_thread is not None
and self._framebuffer_thread.is_alive()):
# Wait for other render completion
self._framebuffer_thread.join(timeout)
return True
return False
def get_framebuffer_path(self):
"""
Gets the framebuffer device path associated with this object.
@return: /dev/fb[i]
"""
return "/dev/fb%d" % self.framebuffer_index
def get_max_offset(self, buffer_size, stride, height=-1):
"""
Gets the maximum possible offset of the framebuffer device given a
buffer and it stride.
@param buffer_size: The total buffer size
@param stride: The buffer stride, how many bytes per row
@param height: The buffer height
@return:
"""
if height < 0:
height = buffer_size // stride
return max(self.fb_maxsize - self.fb_stride * (height - 1) - stride, 0)
def get_position_from_xy(self, x, y):
"""
Gets the starting point in the buffer given an x and y coordinate
@param x: X coordinate
@param y: Y coordinate
@return: Buffer position
"""
if x < 0:
msg = "The x value can not be negative."
logging.exception(msg)
raise self.gcode.error(msg)
if x >= self.resolution_x:
msg = "The x value must be less than %d" % self.resolution_x
logging.exception(msg)
raise self.gcode.error(msg)
if y < 0:
msg = "The y value can not be negative."
logging.exception(msg)
raise self.gcode.error(msg)
if y >= self.resolution_y:
msg = "The y value must be less than %d" % self.resolution_y
logging.exception(msg)
raise self.gcode.error(msg)
return y * self.fb_stride + x * self.channels
def get_image_buffer(self, path, strip_alpha=True):
"""
Reads an image from disk and return it buffer, ready to send to fb.
Note that it does not do any file check,
it will always try to open an image file.
@param path: Image path
@param strip_alpha: Strip the alpha channel of the image if present
@return: dict with image data
"""
with Image.open(path) as img:
depth = 3
if img.mode in ("L", "P"):
depth = 1
elif img.mode in ("RGBA", "CMYK"):
depth = 4
if img.mode == "RGBA" and strip_alpha:
depth = 3
# Strip alpha
with Image.new('RGB', img.size) as img2:
img2.paste(img, (0, 0), img)
buffer = img2.tobytes()
width = img.width
height = img.height
else:
buffer = img.tobytes()
width = img.width
height = img.height
buffer_size = len(buffer)
#logging.exception(f"buffer: {buffer_size}, "
# f"stride: {width * depth}, "
# f"width: {width}, "
# f"height: {height}, "
# f"depth: {depth}, "
# f"bpp: {depth * 8}")
return {'buffer': buffer,
'stride': img.width * depth,
'width': width,
'height': height,
'depth': depth,
'bpp': depth * 8,
'size': buffer_size}
def clear_buffer(self):
"""
Clears the display with zeros (black background).
"""
self.fill_buffer(0)
def clear_buffer_threaded(self, wait_thread=False):
"""
Clears the display with zeros (black background).
@param wait_thread: Wait for the framebuffer thread to terminate.
"""
self.wait_framebuffer_thread()
self._framebuffer_thread = threading.Thread(
target=self.clear_buffer,
name="Clears the fb%d" % self.framebuffer_index)
self._framebuffer_thread.start()
if wait_thread:
self._framebuffer_thread.join()
def write_buffer(self, buffer, buffer_stride=0, offset=0,
clear_flag=CLEAR_NO_FLAG):
"""
Write a byte array into the framebuffer device.
@param buffer: A 1D byte array with data to send
@param offset: Sets a positive offset from start position of the buffer.
@param buffer_stride: The stride/row of the buffer, set if buffer is a
cropped region.
@param clear_flag: Clear the remaining buffer, 0=No, 1=Yes, 2=Auto
"""
if buffer is None or len(buffer) == 0:
return
if not isinstance(buffer, (list, bytes)):
msg = ("The buffer must be a 1D byte array. %s was passed"
% type(buffer))
logging.exception(msg)
raise self.gcode.error(msg)
if offset < 0:
msg = "The offset %d can not be negative value." % offset
logging.exception(msg)
raise self.gcode.error(msg)
if clear_flag < 0 or clear_flag > CLEAR_AUTO_FLAG:
msg = "The clear flag must be between 0 and %d." % CLEAR_AUTO_FLAG
raise self.gcode.error(msg)
clear = clear_flag
buffer_size = len(buffer)
if buffer_size + offset > self.fb_maxsize:
msg = ("The buffer size of %d + %d is greater "
"than actual framebuffer size of %d."
% (buffer_size, offset, self.fb_maxsize))
logging.exception(msg)
raise self.gcode.error(msg)
# Auto clear, clears if buffer is smaller than framebuffer device
if clear_flag == CLEAR_AUTO_FLAG and buffer_size != self.fb_maxsize:
clear = CLEAR_YES_FLAG
is_region = False
if buffer_stride > 0 and buffer_stride < self.fb_stride:
is_region = True
if buffer_size % buffer_stride != 0:
msg = ("The buffer stride of %d must be an exact"
" multiple of the buffer size %d."
% (buffer_stride, buffer_size))
logging.exception(msg)
raise self.gcode.error(msg)
max_offset = self.get_max_offset(buffer_size, buffer_stride)
if offset > max_offset:
msg = ("The offset of %d can not be greater than %d in order "
"to fit the a buffer of %d." %
(offset, max_offset, buffer_size))
logging.exception(msg)
raise self.gcode.error(msg)
with (self._framebuffer_lock):
if offset > 0:
if clear:
self.fb_memory_map.write(
struct.pack('<B', 0) * offset)
else:
self.fb_memory_map.seek(offset)
if is_region:
stride_offset = self.fb_stride - buffer_stride
if clear:
stride_cleaner = struct.pack('<B', 0) * stride_offset
for i in range(0, buffer_size, buffer_stride):
if i > 0:
if clear:
self.fb_memory_map.write(stride_cleaner)
else:
self.fb_memory_map.seek(stride_offset, os.SEEK_CUR)
self.fb_memory_map.write(buffer[i:i + buffer_stride])
else:
self.fb_memory_map.write(buffer)
pos = self.fb_memory_map.tell()
if clear and pos < self.fb_maxsize:
self.fb_memory_map.write(
struct.pack('<B', 0) * (self.fb_maxsize - pos))
self.fb_memory_map.seek(0)
def write_buffer_threaded(self, buffer, buffer_stride=0, offset=0,
clear_flag=CLEAR_NO_FLAG, wait_thread=False):
"""
Write a byte array into the framebuffer device in a separate thread.
@param buffer: A 1D byte array with data to send
@param buffer_stride: The stride/row of the buffer, set if buffer is a
cropped region.
@param offset: Sets a positive offset from start position of the buffer.
@param clear_flag: Clear the remaining buffer, 0=No, 1=Yes, 2=Auto
@param wait_thread: If true wait for the framebuffer thread to terminate
"""
self.wait_framebuffer_thread()
self._framebuffer_thread = threading.Thread(
target=lambda: self.write_buffer(buffer, buffer_stride, offset,
clear_flag),
name="Sends an buffer to fb%d" % self.framebuffer_index)
self._framebuffer_thread.start()
if wait_thread:
self._framebuffer_thread.join()
def fill_buffer(self, color):
"""
Fills the framebuffer with the given color.
@param color: From 0 to 255 or a tuple/list of RGB
"""
if isinstance(color, int):
if color < 0 or color > 255:
msg = "The color must be an int number from 0 to 255."
logging.exception(msg)
raise self.gcode.error(msg)
buffer = struct.pack('<B', color)
elif isinstance(color, (tuple, list)):
for x in color:
if x < 0 or x > 255:
msg = "The color must be an int number from 0 to 255."
logging.exception(msg)
raise self.gcode.error(msg)
buffer = bytes(color)
else:
msg = ("The color must be an int number from 0 to 255 "
"or a tuple/list of RGB.")
logging.exception(msg)
raise self.gcode.error(msg)
buffer_size = len(buffer)
if self.fb_maxsize % buffer_size != 0:
msg = (
"The buffer size of %d must be an exact"
" multiple of the framebuffer size %d."
% (buffer_size, self.fb_maxsize))
logging.exception(msg)
raise self.gcode.error(msg)
self.write_buffer(buffer * (self.fb_maxsize // buffer_size))
def fill_buffer_threaded(self, color, wait_thread=False):
"""
Fills the framebuffer with the given color.
@param color: From 0 to 255 or a tuple/list of RGB
@param wait_thread: Wait for the framebuffer thread to terminate.
"""
self.wait_framebuffer_thread()
self._framebuffer_thread = threading.Thread(
target=self.fill_buffer, args=(color,),
name="Fill the fb%d with %d" % (self.framebuffer_index, color))
self._framebuffer_thread.start()
if wait_thread:
self._framebuffer_thread.join()
def draw_image(self, path, offset=0, clear_flag=CLEAR_NO_FLAG):
"""
Reads an image from a path and draw the bitmap to the fb device.
@param path: Image file path to be read.
@param offset: Sets a positive offset from start position of the buffer.
@param clear_flag: Clear the remaining buffer, 0=No, 1=Yes, 2=Auto
"""
gcode = self.printer.lookup_object('gcode')
if offset < 0:
msg = "The offset %d can not be negative value." % offset
logging.exception(msg)
raise gcode.error(msg)
if not isinstance(path, str):
msg = "Path must be a string."
logging.exception(msg)
raise gcode.error(msg)
if not os.path.isfile(path):
msg = "The file '%s' does not exists." % path
logging.exception(msg)
raise gcode.error(msg)
di = self.get_image_buffer(path)
self.write_buffer(di['buffer'], di['stride'], offset, clear_flag)
def draw_image_threaded(self, path, offset=0, clear_flag=CLEAR_NO_FLAG,
wait_thread=False):
"""
Reads an image from a path and draw the bitmap to the fb device.
@param path: Image file path to be read.
@param offset: Sets a positive offset from start position of the buffer.
@param clear_flag: Clear the remaining buffer, 0=No, 1=Yes, 2=Auto
@param wait_thread: If true wait for the framebuffer thread to terminate
"""
self.wait_framebuffer_thread()
self._framebuffer_thread = threading.Thread(
target=lambda: self.draw_image(path, offset, clear_flag),
name="Render an image to fb%d" % self.framebuffer_index)
self._framebuffer_thread.start()
if wait_thread:
self._framebuffer_thread.join()
class FramebufferDisplayWrapper(FramebufferDisplay):
def __init__(self, config):
super(FramebufferDisplayWrapper, self).__init__(config)
device_name = config.get_name().split()[1]
self.gcode.register_mux_command("FRAMEBUFFER_CLEAR", "DEVICE",
device_name, self.cmd_FRAMEBUFFER_CLEAR,
desc=self.cmd_FRAMEBUFFER_CLEAR_help)
self.gcode.register_mux_command("FRAMEBUFFER_SEND_IMAGE", "DEVICE",
device_name, self.cmd_FRAMEBUFFER_DRAW_IMAGE,
desc=self.cmd_FRAMEBUFFER_DRAW_IMAGE_help)
def get_status(self, eventtime):
return {'is_busy': self.is_busy()}
cmd_FRAMEBUFFER_CLEAR_help = ("Clears the display with zeros "
"(black background)")
def cmd_FRAMEBUFFER_CLEAR(self, gcmd):
wait_thread = gcmd.get_int('WAIT', 0, 0, 1)
self.clear_buffer_threaded(wait_thread)
cmd_FRAMEBUFFER_DRAW_IMAGE_help = ("Reads a image from a path "
"and render it on the framebuffer.")
def cmd_FRAMEBUFFER_DRAW_IMAGE(self, gcmd):
path = gcmd.get('PATH')
offset = gcmd.get_int('OFFSET', 0, 0)
clear_flag = gcmd.get_int('CLEAR', CLEAR_AUTO_FLAG, 0, CLEAR_AUTO_FLAG)
wait_thread = gcmd.get_int('WAIT', 0, 0, 1)
if not os.path.isfile(path):
raise gcmd.error("The file '%s' does not exists." % path)
self.draw_image_threaded(path, offset, clear_flag, wait_thread)
def load_config_prefix(config):
return FramebufferDisplayWrapper(config)

View file

@ -0,0 +1,675 @@
# mSLA and DLP display properties
#
# Copyright (C) 2024 Tiago Conceicao <tiago_caza@hotmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
#
import logging
import os
import threading
import re
import time
import tempfile
from PIL import Image
from . import framebuffer_display, pwm_tool, pwm_cycle_time, output_pin
CACHE_MIN_RAM = 256 # 256 MB
class mSLADisplay(framebuffer_display.FramebufferDisplay):
"""
Represents an mSLA display with an associated UV LED
"""
def __init__(self, config):
super(mSLADisplay, self).__init__(config)
# CACHE CONFIG
self.buffer_cache_count = config.getint('cache', 0, 0, 100)
# RAM guard
if self.buffer_cache_count > 0 and os.path.isfile('/proc/meminfo'):
with open('/proc/meminfo', 'r') as mem:
for i in mem:
sline = i.split()
if str(sline[0]) == 'MemFree:':
free_memory_mb = int(sline[1]) / 1024.
buffer_max_size = (self.buffer_cache_count + 1
) * self.fb_maxsize / 1024. / 1024.
if free_memory_mb - buffer_max_size < CACHE_MIN_RAM:
msg = ("The current cache count of %d requires %.2f"
" MB of RAM, currently have %.2f MB of free "
"memory. It requires at least a margin of "
"%.2f MB of free RAM.\n"
"Please reduce the cache count.") % (
self.buffer_cache_count, buffer_max_size,
free_memory_mb, CACHE_MIN_RAM)
logging.exception(msg)
raise config.error(msg)
break
self._cache = []
self._cache_thread = None
# UVLED CONFIG
self.uvled_output_pin_name = config.get('uvled_output_pin_name',
'msla_uvled')
self.uvled_response_delay = config.getfloat('uvled_response_delay', 0,
minval=0., maxval=1000.)
self.uvled = self.printer.lookup_object("output_pin %s"
% self.uvled_output_pin_name,
None)
if self.uvled is None:
self.uvled = self.printer.lookup_object("pwm_tool %s" %
self.uvled_output_pin_name,
None)
if self.uvled is None:
msg = "The [output_pin %s] or [pwm_tool %s] was not found." % (
self.uvled_output_pin_name, self.uvled_output_pin_name)
logging.exception(msg)
raise config.error(msg)
# Variables
self._sd = self.printer.lookup_object("virtual_sdcard")
self._M1451_file_regex = re.compile(r'F(.+[.](png|jpg|jpeg|bmp|gif))')
# Events
self.printer.register_event_handler("virtual_sdcard:reset_file",
self.clear_cache)
# Register commands
self.gcode = self.printer.lookup_object('gcode')
self.gcode.register_command("MSLA_DISPLAY_VALIDATE",
self.cmd_MSLA_DISPLAY_VALIDATE,
desc=self.cmd_MSLA_DISPLAY_VALIDATE_help)
self.gcode.register_command("MSLA_DISPLAY_TEST",
self.cmd_MSLA_DISPLAY_TEST,
desc=self.cmd_MSLA_DISPLAY_TEST_help)
self.gcode.register_command("MSLA_DISPLAY_RESPONSE_TIME",
self.cmd_MSLA_DISPLAY_RESPONSE_TIME,
desc=self.cmd_MSLA_DISPLAY_RESPONSE_TIME_help)
self.gcode.register_command('M1450', self.cmd_M1450, # Display clear
desc=self.cmd_M1451_help)
self.gcode.register_command('M1451', self.cmd_M1451, # Display image
desc=self.cmd_M1451_help)
self.gcode.register_command("MSLA_UVLED_RESPONSE_TIME",
self.cmd_MSLA_UVLED_RESPONSE_TIME,
desc=self.cmd_MSLA_UVLED_RESPONSE_TIME_help)
self.gcode.register_command('M1400', self.cmd_M1400, # UVLED SET
desc=self.cmd_M1400_help)
self.gcode.register_command('M1401', self.cmd_M1401, # UVLED OFF
desc=self.cmd_M1401_help)
def get_status(self, eventtime):
return {'display_busy': self.is_busy(),
'is_exposure': self.is_exposure(),
'uvled_is_pwm': self.is_uvled_pwm(),
'uvled_value_raw': self.uvled.last_value,
'uvled_value': self.uvled.last_value * 255.}
def clear_cache(self):
"""
Clear all cached buffers
"""
self._cache = []
def _can_cache(self):
"""
Returns true if is ready to cache
@rtype: bool
"""
return (self.buffer_cache_count > 0
and (self._cache_thread is None
or not self._cache_thread.is_alive()))
def _wait_cache_thread(self):
"""
Waits for cache thread completion
@rtype: bool
@return: True if waited, otherwise false
"""
if self._cache_thread is not None and self._cache_thread.is_alive():
self._cache_thread.join()
return True
return False
def _process_cache(self, last_cache):
"""
Cache next items from last cache position.
@param last_cache:
"""
# Dequeue items
while len(self._cache) >= self.buffer_cache_count:
self._cache.pop(0)
cache_len = len(self._cache)
if cache_len:
last_cache = self._cache[-1]
# Cache new items
item_count = self.buffer_cache_count - cache_len
layer_index = last_cache.get_layer_index()
for i in range(item_count):
layer_index += 1
new_path = last_cache.new_path_layerindex(layer_index)
if os.path.isfile(new_path):
di = self.get_image_buffer(new_path)
self._cache.append(BufferCache(new_path, di))
def _get_cache_index(self, path):
"""
Gets the index of the cache position on the list given a path
@param path: Path to seek
@return: Index of the cache on the list, otherwise -1
"""
for i, cache in enumerate(self._cache):
if cache.path == path:
return i
return -1
def _get_image_buffercache(self, gcmd, path):
"""
Gets an image buffer from cache if available, otherwise it creates the
buffer from the path image
@param gcmd:
@param path: Image path
@return: Image buffer
"""
# If currently printing, use print directory to select the file
current_filepath = self._sd.file_path()
if current_filepath is not None and path[0] != '/':
path = os.path.join(os.path.dirname(current_filepath), path)
if not os.path.isfile(path):
raise (gcmd.error("%s: The file '%s' does not exists.") %
(gcmd.get_command(), path))
if self.buffer_cache_count > 0:
self._wait_cache_thread() # May be worth to wait if streaming
index = self._get_cache_index(path)
if index >= 0:
cache = self._cache[index]
# RTrim up to cache
self._cache = self._cache[index+1:]
if self._can_cache():
self._cache_thread = threading.Thread(
target=self._process_cache, args=(cache,))
self._cache_thread.start()
return cache.data
if not os.path.isfile(path):
raise (gcmd.error("%s: The file '%s' does not exists.") %
(gcmd.get_command(), path))
di = self.get_image_buffer(path)
if current_filepath and self._can_cache():
cache = BufferCache(path, di)
self._cache_thread = threading.Thread(target=self._process_cache,
args=(cache,))
self._cache_thread.start()
return di
cmd_MSLA_DISPLAY_VALIDATE_help = ("Validate the display against resolution "
"and pixel parameters. Throw error if out"
" of parameters.")
def cmd_MSLA_DISPLAY_VALIDATE(self, gcmd):
"""
Layer images are universal within same pixel pitch, but it also must fit
within the LCD area.
Other printers with same or higher resolution and same pixel pitch can
print same file.
This command ensure print only continue if requirements are meet.
Syntax: MSLA_DISPLAY_VALIDATE RESOLUTION=<x,y> PIXEL=<width,height>
STRICT=[0/1]
RESOLUTION: Machine LCD resolution
PIXEL: Machine LCD pixel pitch
STRICT: 0 = Prints if same or lower resolutions
1 = Prints if same resolutions
@param gcmd:
@return:
"""
resolution = gcmd.get('RESOLUTION')
pixel_size = gcmd.get('PIXEL')
strict = gcmd.get_int('STRICT', 0, 0, 1)
resolution_split = resolution.split(',', 1)
if len(resolution_split) < 2:
raise gcmd.error("The resolution of %d is malformed. "
"Format: RESOLUTION_X,RESOLUTION_Y." % resolution)
try:
resolution_x = int(resolution_split[0])
except:
raise gcmd.error("The resolution x must be an integer number.")
try:
resolution_y = int(resolution_split[1])
except:
raise gcmd.error("The resolution y must be an integer.")
if strict:
if resolution_x > self.resolution_x:
raise gcmd.error(
"The resolution X of %d is invalid. "
"Should be equal to %d."
% (resolution_x, self.resolution_x))
if resolution_y > self.resolution_y:
raise gcmd.error(
"The resolution Y of %d is invalid. "
"Should be equal to %d."
% (resolution_y, self.resolution_y))
else:
if resolution_x > self.resolution_x:
raise gcmd.error("The resolution X of %d is "
"invalid. Should be less or equal to %d."
% (resolution_x, self.resolution_x))
if resolution_y > self.resolution_y:
raise gcmd.error("The resolution Y of %d is "
"invalid. Should be less or equal to %d."
% (resolution_y, self.resolution_y))
pixel_size_split = pixel_size.split(',', 1)
if len(pixel_size_split) < 2:
raise gcmd.error("The pixel size of %f is malformed. "
"Format: PIXEL_WIDTH,PIXEL_HEIGHT." % pixel_size)
try:
pixel_width = float(pixel_size_split[0])
except:
raise gcmd.error("The pixel width must be an floating point "
"number.")
try:
pixel_height = float(pixel_size_split[1])
except:
raise gcmd.error("The pixel height must be an floating point "
"number.")
if pixel_width != self.pixel_width:
raise gcmd.error("The pixel width of %f is invalid. "
"Should be %f."
% (pixel_width, self.pixel_width))
if pixel_height != self.pixel_height:
raise gcmd.error("The pixel height of %f is invalid. "
"Should be %f."
% (pixel_height, self.pixel_height))
cmd_MSLA_DISPLAY_TEST_help = ("Test the display by showing full white image"
" and grey shades. Use a white paper on top "
"of display and confirm if the pixels are "
"healthy.")
def cmd_MSLA_DISPLAY_TEST(self, gcmd):
"""
Test the display by showing full white and grey shades.
Use a white paper on top of display and confirm if the pixels
are healthy.
Syntax: MSLA_DISPLAY_TEST DELAY=[3000]
DELAY: Time in milliseconds between tests (int)
@param gcmd:
@return:
"""
if self._sd.current_file is not None:
gcmd.respond_raw("MSLA_DISPLAY_TEST: Can not run this command while"
" printing.")
return
delay = gcmd.get_int('DELAY', 3000, 1000, 5000) / 1000.
toolhead = self.printer.lookup_object('toolhead')
color = 255
decrement = 45
self.set_uvled_on()
while color > 0:
gcmd.respond_raw("Fill color: %d" % color)
self.fill_buffer(color)
color -= decrement
toolhead.dwell(delay)
self.set_uvled_off()
self.clear_buffer()
gcmd.respond_raw("Test finished.")
cmd_MSLA_DISPLAY_RESPONSE_TIME_help = ("Sends a buffer to display and test "
"it response time to complete the "
"render.")
def cmd_MSLA_DISPLAY_RESPONSE_TIME(self, gcmd):
"""
Sends a buffer to display and test it response time
to complete the render.
Syntax: MSLA_DISPLAY_RESPONSE_TIME AVG=[x]
AVG: Number of samples to average (int)
@param gcmd:
@return:
"""
if self._sd.current_file is not None:
gcmd.respond_raw("MSLA_DISPLAY_RESPONSE_TIME: Can not run this "
"command while printing.")
avg = gcmd.get_int('AVG', 1, 1, 20)
fb_maxsize_mb = round(self.fb_maxsize / 1024.0 / 1024.0, 2)
gcmd.respond_raw('Buffer size: %d bytes (%.2f MB)'
% (self.fb_maxsize, fb_maxsize_mb))
time_sum = 0
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, 'buffer.png')
with Image.new('L',
(self.resolution_x, self.resolution_y),
color='black') as image:
image.save(path, "PNG")
for _ in range(avg):
timems = time.time()
self.draw_image(path)
time_sum += time.time() - timems
gcmd.respond_raw('Read image from disk + send time: %fms (%d samples)'
% (round(time_sum * 1000 / avg, 6), avg))
time_sum = 0
buffer = bytes(os.urandom(self.fb_maxsize))
for _ in range(avg):
timems = time.time()
self.write_buffer(buffer)
time_sum += time.time() - timems
gcmd.respond_raw('Send cached buffer: %fms (%d samples)'
% (round(time_sum * 1000 / avg, 6), avg))
time_sum = 0
for _ in range(avg):
timems = time.time()
self.clear_buffer()
time_sum += time.time() - timems
gcmd.respond_raw('Clear time: %fms (%d samples)' %
(round(time_sum * 1000 / avg, 6), avg))
cmd_M1450_help = ("Clears the display with all black pixels "
"(Fill with zeros)")
def cmd_M1450(self, gcmd):
"""
Clears the display with all black pixels (Fill with zeros)
Syntax: M1450 W[0/1]
W: Wait for buffer to complete the transfer. (int)
@param gcmd:
"""
wait_thread = gcmd.get_int('W', 0, 0, 1)
toolhead = self.printer.lookup_object('toolhead')
if self.is_exposure():
toolhead.wait_moves()
self.clear_buffer_threaded(True)
else:
self.clear_buffer_threaded(wait_thread)
cmd_M1451_help = ("Reads an image from a path and display it on "
"the display.")
def cmd_M1451(self, gcmd):
"""
Display an image from a path and display it on the main display.
Syntax: M1451 F<"file.png"> O[x] C[0/1/3] W[0/1]
F: File path eg: "1.png". quotes are optional. (str)
O: Sets a positive offset from start position of the buffer. (int)
C: Clear the remaining buffer, 0=No, 1=Yes, 2=Auto. (int) Default: 2
W: Wait for buffer to complete the transfer. (int)
@param gcmd:
@return:
"""
clear_flag = gcmd.get_int('C', framebuffer_display.CLEAR_AUTO_FLAG, 0,
framebuffer_display.CLEAR_AUTO_FLAG)
wait_thread = gcmd.get_int('W', 0, 0, 1)
params = gcmd.get_raw_command_parameters().strip()
if 'F' not in params:
raise gcmd.error("Error on '%s': missing F" % gcmd.get_command())
toolhead = self.printer.lookup_object('toolhead')
match = self._M1451_file_regex.search(params)
if not match:
raise gcmd.error("Error on '%s': The F parameter"
" is malformed. Use a proper image file."
% gcmd.get_command())
path = match.group(1).strip(' "\'')
path = os.path.normpath(os.path.expanduser(path))
di = self._get_image_buffercache(gcmd, path)
max_offset = self.get_max_offset(di['size'], di['stride'], di['height'])
offset = gcmd.get_int('O', 0, 0, max_offset)
if self.is_exposure():
# LED is ON, sync to prevent image change while exposure
toolhead.wait_moves()
self.write_buffer_threaded(di['buffer'], di['stride'], offset,
clear_flag, True)
else:
self.write_buffer_threaded(di['buffer'], di['stride'], offset,
clear_flag, wait_thread)
def is_exposure(self):
"""
True if the UV LED is on, exposure a layer, otherwise False
@return:
"""
return self.uvled.last_value > 0
def is_uvled_pwm(self):
"""
True if UV LED is configured as PWM pin, otherwise False
"""
return (isinstance(self.uvled, (pwm_tool.PrinterOutputPin,
pwm_cycle_time.PrinterOutputPWMCycle))
or (isinstance(self.uvled, output_pin.PrinterOutputPin)
and self.uvled.is_pwm))
def _set_uvled_callback(self, print_time, value, delay=0):
"""
The uvled setter callback for the lookahead function
"""
self.uvled._set_pin(print_time, value)
if value > 0 and delay > 0:
delay -= self.uvled_response_delay / 1000.
if delay < 0:
delay = 0
self.uvled._set_pin(print_time + delay, 0)
def set_uvled(self, value, delay=0):
"""
Turns the UV LED on or off.
@param value: PWM value from 0 to 255
@param delay: If set, it will switch off from on state after specified
delay. This also ensures that the UV LED is in sync with
the display.
@return:
"""
if self.is_uvled_pwm():
value /= 255.0
else:
value = 1 if value else 0
if value > 0:
# Sync display render
self.wait_framebuffer_thread()
# Do not continue if same value
if self.uvled.last_value == value:
return
toolhead = self.printer.lookup_object('toolhead')
#toolhead.register_lookahead_callback(
# lambda print_time: self._set_uvled_callback(print_time,
# value, delay))
#toolhead.wait_moves()
#return
toolhead.register_lookahead_callback(
lambda print_time: self.uvled._set_pin(print_time, value))
if value > 0 and delay > 0:
delay -= self.uvled_response_delay / 1000.
if delay > 0:
toolhead.dwell(delay)
toolhead.register_lookahead_callback(
lambda print_time: self.uvled._set_pin(print_time, 0))
toolhead.wait_moves()
def set_uvled_on(self, delay=0):
"""
Turns the UV LED on with max power
@param delay: If set, it will switch off from on state after specified
delay. This also ensures that the UV LED is in sync with
the display.
@return:
"""
self.set_uvled(255, delay)
def set_uvled_off(self):
"""
Turns the UV LED off
"""
self.set_uvled(0)
cmd_MSLA_UVLED_RESPONSE_TIME_help = ("Tests the response time for the "
"UV LED shutter.")
def cmd_MSLA_UVLED_RESPONSE_TIME(self, gcmd):
"""
Tests the response time for the UV LED to turn on and off.
Syntax: MSLA_UVLED_RESPONSE_TIME TIME=<ms> OFFSET=[ms]
TIME: Exposure time in milliseconds. (int)
OFFSET: Offset time from exposure time. (int)
TYPE: <0> when using M1400 Sx and Px. (Faster response time)
<1> when using M1400 Sx, G4 Px, M1400 S0 (Slower response time)
@param gcmd:
@return:
"""
if self._sd.current_file is not None:
gcmd.respond_raw("MSLA_UVLED_RESPONSE_TIME: Can not run this "
"command while printing.")
self.set_uvled(0)
self.clear_buffer()
delay = gcmd.get_int('TIME', 3000, minval=500, maxval=5000) / 1000.
offset = gcmd.get_int('OFFSET', 0, minval=-1000, maxval=0) / 1000.
etype = gcmd.get_int('TYPE', 0, minval=0, maxval=1)
if etype == 0:
offset -= 0.2
toolhead = self.printer.lookup_object('toolhead')
toolhead.wait_moves()
time_total = enable_time = time.time()
toolhead.register_lookahead_callback(
lambda print_time: self.uvled._set_pin(print_time, 1))
toolhead.wait_moves()
enable_time = round(time.time() - enable_time, 4)
exposure_time = time.time()
toolhead.dwell(delay + offset)
toolhead.register_lookahead_callback(
lambda print_time: self.uvled._set_pin(print_time, 0))
toolhead.wait_moves()
final_time = time.time()
exposure_time = round(final_time - exposure_time, 4)
time_total = round(final_time - time_total, 4)
if etype == 0:
offset += 0.2
calculated_offset = max(0,
round((exposure_time - delay + offset) * 1000))
exposure_factor = round(exposure_time / delay, 2)
gcmd.respond_raw("UV LED RESPONSE TIME TEST (%fs):\n"
"Switch time: %d ms\n"
"Exposure time: %f s\n"
"Total time: %f s\n"
"Exposure time factor: %f x\n"
"Calculated delay offset: %d ms\n"
% (delay, round(enable_time * 1000), exposure_time,
time_total, exposure_factor, calculated_offset))
cmd_M1400_help = "Turn the main UV LED to cure the pixels."
def cmd_M1400(self, gcmd):
"""
Turn the main UV LED to cure the pixels.
M1400 comes from the wavelength of UV radiation (UVR)
lies in the range of 100-400 nm, and is further subdivided into
UVA (315-400 nm), UVB (280-315 nm), and UVC (100-280 nm).
Syntax: M1400 S[0-255] P[ms]
S: LED Power (Non PWM LEDs will turn on from 1 to 255). (float)
P: Time to wait in milliseconds when (S>0) before turn off. (int)
@param gcmd:
"""
value = gcmd.get_float('S', 255., minval=0., maxval=255.)
delay = gcmd.get_float('P', 0, above=0.) / 1000.
self.set_uvled(value, delay)
cmd_M1401_help = "Turn the main UV LED off."
def cmd_M1401(self, gcmd):
"""
Turn the main UV LED off
Syntax: M1401
@param gcmd:
"""
self.set_uvled(0)
class BufferCache:
def __init__(self, path, data):
self.path = path
self.data = data
def new_path_layerindex(self, n):
"""
Return a new path with same base directory but new layer index
@param n:
@return:
"""
extension = self.path[self.path.rfind('.'):]
return re.sub(r"\d+%s$" % re.escape(extension),
"%d%s" % (n, extension),
self.path)
def get_layer_index(self):
"""
Gets the layer index from a path
@return: New layer index. -1 if not founded.
"""
extension = self.path[self.path.rfind('.'):]
match = re.search(r"(\d+)%s$" % re.escape(extension), self.path)
if match is None:
return -1
return int(match.group(1))
def load_config(config):
return mSLADisplay(config)

View file

@ -6,26 +6,44 @@
class PrintStats:
def __init__(self, config):
printer = config.get_printer()
self.gcode_move = printer.load_object(config, 'gcode_move')
self.reactor = printer.get_reactor()
self.printer = config.get_printer()
self.gcode_move = self.printer.load_object(config, 'gcode_move')
self.toolhead = None
self.reactor = self.printer.get_reactor()
self.material_type = ''
self.material_unit = ''
self.reset()
# Register events
self.printer.register_event_handler("klippy:mcu_identify",
self._init_delayed_stats)
# Register commands
self.gcode = printer.lookup_object('gcode')
self.gcode = self.printer.lookup_object('gcode')
self.gcode.register_command(
"SET_PRINT_STATS_INFO", self.cmd_SET_PRINT_STATS_INFO,
desc=self.cmd_SET_PRINT_STATS_INFO_help)
printer.register_event_handler("extruder:activate_extruder",
self._handle_activate_extruder)
def _init_delayed_stats(self):
if self.toolhead is None:
self.toolhead = self.printer.lookup_object('toolhead')
if self.toolhead:
if self.toolhead.manufacturing_process == 'FDM':
self.material_type = 'filament'
self.material_unit = 'mm'
elif self.toolhead.manufacturing_process in ('SLA', 'mSLA', 'DLP'):
self.material_type = 'resin'
self.material_unit = 'ml'
def _handle_activate_extruder(self):
gc_status = self.gcode_move.get_status()
self.last_epos = gc_status['position'].e
def _update_filament_usage(self, eventtime):
gc_status = self.gcode_move.get_status(eventtime)
cur_epos = gc_status['position'].e
self.filament_used += (cur_epos - self.last_epos) \
/ gc_status['extrude_factor']
self.last_epos = cur_epos
if self.toolhead.manufacturing_process == 'FDM':
gc_status = self.gcode_move.get_status(eventtime)
cur_epos = gc_status['position'].e
self.material_used += (cur_epos - self.last_epos) \
/ gc_status['extrude_factor']
self.last_epos = cur_epos
def set_current_file(self, filename):
self.reset()
self.filename = filename
@ -64,7 +82,7 @@ class PrintStats:
self.error_message = error_message
eventtime = self.reactor.monotonic()
self.total_duration = eventtime - self.print_start_time
if self.filament_used < 0.0000001:
if self.material_used < 0.0000001:
# No positive extusion detected during print
self.init_duration = self.total_duration - \
self.prev_pause_duration
@ -72,10 +90,35 @@ class PrintStats:
cmd_SET_PRINT_STATS_INFO_help = "Pass slicer info like layer act and " \
"total to klipper"
def cmd_SET_PRINT_STATS_INFO(self, gcmd):
"""
Sets print stats info
Syntax: SET_PRINT_STATS_INFO TOTAL_LAYER=[count]
CURRENT_LAYER=[number]
MATERIAL_NAME=["name"]
MATERIAL_UNIT=[unit]
MATERIAL_TOTAL=[total]
CONSUME_MATERIAL=[amount]
TOTAL_LAYER: Total layer count
CURRENT_LAYER: Current printing layer number
MATERIAL_NAME: Name of the material being used
MATERIAL_UNIT: Material unit
MATERIAL_TOTAL: Total material this print will consume
CONSUME_MATERIAL: Consume material and increment the used material
@param gcmd:
@return:
"""
total_layer = gcmd.get_int("TOTAL_LAYER", self.info_total_layer, \
minval=0)
current_layer = gcmd.get_int("CURRENT_LAYER", self.info_current_layer, \
minval=0)
material_name = gcmd.get("MATERIAL_NAME", None)
material_unit = gcmd.get("MATERIAL_UNIT", None)
material_total = gcmd.get_float("MATERIAL_TOTAL", -1., 0.)
consume_material = gcmd.get_float("CONSUME_MATERIAL", 0., 0.)
if total_layer == 0:
self.info_total_layer = None
self.info_current_layer = None
@ -87,15 +130,37 @@ class PrintStats:
current_layer is not None and \
current_layer != self.info_current_layer:
self.info_current_layer = min(current_layer, self.info_total_layer)
if material_name:
self.material_name = material_name
if material_unit:
self.material_unit = material_unit
if material_total >= 0:
self.material_total = material_total
if consume_material > 0:
self.last_material_used = consume_material
self.material_used += consume_material
if self.material_used > self.material_total:
self.material_total = self.material_used
def reset(self):
self.filename = self.error_message = ""
self.state = "standby"
self.prev_pause_duration = self.last_epos = 0.
self.filament_used = self.total_duration = 0.
self.material_used = self.total_duration = 0.
self.material_total = self.last_material_used = 0.
self.material_name = ''
self.print_start_time = self.last_pause_time = None
self.init_duration = 0.
self.info_total_layer = None
self.info_current_layer = None
if self.toolhead is not None:
self._init_delayed_stats()
def get_status(self, eventtime):
time_paused = self.prev_pause_duration
if self.print_start_time is not None:
@ -106,7 +171,7 @@ class PrintStats:
# Accumulate filament if not paused
self._update_filament_usage(eventtime)
self.total_duration = eventtime - self.print_start_time
if self.filament_used < 0.0000001:
if self.material_used < 0.0000001:
# Track duration prior to extrusion
self.init_duration = self.total_duration - time_paused
print_duration = self.total_duration - self.init_duration - time_paused
@ -114,11 +179,17 @@ class PrintStats:
'filename': self.filename,
'total_duration': self.total_duration,
'print_duration': print_duration,
'filament_used': self.filament_used,
'filament_used': self.material_used, # Deprecated to: material_used
'material_type': self.material_type, # Material type, eg: filament
'material_name': self.material_name, # Material name by user
'material_unit': self.material_unit, # Material measure unit
'material_total': self.material_total, # Total material in a print
'material_used': self.material_used, # Amount of used material
'state': self.state,
'message': self.error_message,
'info': {'total_layer': self.info_total_layer,
'current_layer': self.info_current_layer}
'current_layer': self.info_current_layer,
'last_material_used': self.last_material_used}
}
def load_config(config):

View file

@ -4,8 +4,12 @@
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import os, sys, logging, io
import hashlib, shutil
import zipfile, tarfile
import time
VALID_GCODE_EXTS = ['gcode', 'g', 'gco']
VALID_ARCHIVE_EXTS = ['zip', 'tar']
DEFAULT_ERROR_GCODE = """
{% if 'heaters' in printer %}
@ -13,12 +17,26 @@ DEFAULT_ERROR_GCODE = """
{% endif %}
"""
DEFAULT_ARCHIVE_HASH_FILENAME = 'hash.md5'
class VirtualSD:
def __init__(self, config):
self.printer = config.get_printer()
# sdcard state
sd = config.get('path')
self.sdcard_dirname = os.path.normpath(os.path.expanduser(sd))
archive_temp_path = os.path.join(os.sep,
'tmp',
'klipper-archive-print-contents')
if self.sdcard_dirname.count(os.sep) >= 3:
archive_temp_path = os.path.normpath(
os.path.expanduser(
os.path.join(self.sdcard_dirname, '..', 'tmparchive')))
archive_temp_path = config.get('archive_temp_path', archive_temp_path)
self.sdcard_archive_temp_dirname = os.path.normpath(
os.path.expanduser(archive_temp_path))
self.current_file = None
self.file_position = self.file_size = 0
# Print Stat Tracking
@ -64,6 +82,20 @@ class VirtualSD:
if self.work_timer is None:
return False, ""
return True, "sd_pos=%d" % (self.file_position,)
def get_archive_entries(self, path):
if path.endswith('.zip'):
try:
with zipfile.ZipFile(path, "r") as file:
return file.namelist()
except:
pass
elif path.endswith('.tar'):
try:
with tarfile.open(path, "r") as file:
return file.getnames()
except:
pass
return []
def get_file_list(self, check_subdirs=False):
if check_subdirs:
flist = []
@ -71,10 +103,27 @@ class VirtualSD:
self.sdcard_dirname, followlinks=True):
for name in files:
ext = name[name.rfind('.')+1:]
if ext not in VALID_GCODE_EXTS:
if ext not in VALID_GCODE_EXTS + VALID_ARCHIVE_EXTS:
continue
full_path = os.path.join(root, name)
r_path = full_path[len(self.sdcard_dirname) + 1:]
if ext in VALID_ARCHIVE_EXTS:
entries = self.get_archive_entries(full_path)
# Support only 1 gcode file as if there's more we are
# unable to guess which to print
count = 0
for entry in entries:
entry_ext = entry[entry.rfind('.') + 1:]
if entry_ext in VALID_GCODE_EXTS:
count += 1
if count != 1:
continue
size = os.path.getsize(full_path)
flist.append((r_path, size))
return sorted(flist, key=lambda f: f[0].lower())
@ -182,11 +231,77 @@ class VirtualSD:
try:
if fname not in flist:
fname = files_by_lower[fname.lower()]
fname = os.path.join(self.sdcard_dirname, fname)
ext = fname[fname.rfind('.') + 1:]
if ext in VALID_ARCHIVE_EXTS:
need_extract = True
hashfile = os.path.join(self.sdcard_archive_temp_dirname,
DEFAULT_ARCHIVE_HASH_FILENAME)
gcmd.respond_raw("Calculating %s hash" % fname)
hash = self._file_hash(os.path.join(self.sdcard_dirname, fname))
if os.path.isfile(hashfile):
with open(hashfile, 'r') as f:
found_hash = f.readline()
if len(found_hash) == 32:
if hash == found_hash:
need_extract = False
if ext == 'zip':
with zipfile.ZipFile(os.path.join(
self.sdcard_dirname, fname),"r") as zip_file:
if need_extract:
if os.path.isdir(self.sdcard_archive_temp_dirname):
shutil.rmtree(self.sdcard_archive_temp_dirname)
gcmd.respond_raw("Decompressing %s..." % fname)
timenow = time.time()
zip_file.extractall(
self.sdcard_archive_temp_dirname)
timenow = time.time() - timenow
gcmd.respond_raw("Decompress done in %.2f seconds"
% timenow)
with open(hashfile, 'w') as f:
f.write(hash)
entries = zip_file.namelist()
for entry in entries:
entry_ext = entry[entry.rfind('.') + 1:]
if entry_ext in VALID_GCODE_EXTS:
fname = os.path.join(os.path.join(
self.sdcard_archive_temp_dirname, entry))
break
elif ext == 'tar':
with tarfile.open(os.path.join(self.sdcard_dirname, fname),
"r") as tar_file:
if need_extract:
if os.path.isdir(self.sdcard_archive_temp_dirname):
shutil.rmtree(self.sdcard_archive_temp_dirname)
gcmd.respond_raw("Decompressing %s..." % fname)
timenow = time.time()
tar_file.extractall(
self.sdcard_archive_temp_dirname)
timenow = time.time() - timenow
gcmd.respond_raw("Decompress done in %.2f seconds"
% timenow)
with open(hashfile, 'w') as f:
f.write(hash)
entries = tar_file.getnames()
for entry in entries:
entry_ext = entry[entry.rfind('.') + 1:]
if entry_ext in VALID_GCODE_EXTS:
fname = os.path.join(os.path.join(
self.sdcard_archive_temp_dirname, entry))
break
else:
fname = os.path.join(self.sdcard_dirname, fname)
f = io.open(fname, 'r', newline='')
f.seek(0, os.SEEK_END)
fsize = f.tell()
f.seek(0)
except:
logging.exception("virtual_sdcard file open")
raise gcmd.error("Unable to open file")
@ -223,7 +338,8 @@ class VirtualSD:
return self.cmd_from_sd
# Background work timer
def work_handler(self, eventtime):
logging.info("Starting SD card print (position %d)", self.file_position)
logging.info("Starting SD card print (position %d)",
self.file_position)
self.reactor.unregister_timer(self.work_timer)
try:
self.current_file.seek(self.file_position)
@ -293,7 +409,8 @@ class VirtualSD:
return self.reactor.NEVER
lines = []
partial_input = ""
logging.info("Exiting SD card print (position %d)", self.file_position)
logging.info("Exiting SD card print (position %d)",
self.file_position)
self.work_timer = None
self.cmd_from_sd = False
if error_message is not None:
@ -303,6 +420,15 @@ class VirtualSD:
else:
self.print_stats.note_complete()
return self.reactor.NEVER
def _file_hash(self, filepath, block_size=2**20):
md5 = hashlib.md5()
with open(filepath, "rb") as f:
while True:
data = f.read(block_size)
if not data:
break
md5.update(data)
return md5.hexdigest()
def load_config(config):
return VirtualSD(config)

102
klippy/kinematics/zaxis.py Normal file
View file

@ -0,0 +1,102 @@
# Code for handling the kinematics of a single Z Axis
# This kinematic still need to fake XY presence
#
# Copyright (C) 2017-2020 Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import stepper
class ZAxisKinematics:
def __init__(self, toolhead, config):
self.printer = config.get_printer()
# Setup axis rails
self.rails = [stepper.LookupMultiRail(config.getsection('stepper_' + n))
for n in 'z']
for rail, axis in zip(self.rails, 'z'):
rail.setup_itersolve('cartesian_stepper_alloc', axis.encode())
ranges = [r.get_range() for r in self.rails]
self.axes_min = toolhead.Coord(0, 0, ranges[0][0], e=0.)
self.axes_max = toolhead.Coord(0, 0, ranges[0][1], e=0.)
for s in self.get_steppers():
s.set_trapq(toolhead.get_trapq())
toolhead.register_step_generator(s.generate_steps)
self.printer.register_event_handler("stepper_enable:motor_off",
self._motor_off)
# Setup boundary checks
max_velocity, max_accel = toolhead.get_max_velocity()
self.max_z_velocity = config.getfloat('max_z_velocity', max_velocity,
above=0., maxval=max_velocity)
self.max_z_accel = config.getfloat('max_z_accel', max_accel,
above=0., maxval=max_accel)
self.limits = [(1.0, 1.0),(1.0, 1.0),(1.0, -1.0)]
def get_steppers(self):
return [s for rail in self.rails for s in rail.get_steppers()]
def calc_position(self, stepper_positions):
pos = [stepper_positions[rail.get_name()] for rail in self.rails]
return [0, 0, pos[0]]
def update_limits(self, i, range):
l, h = self.limits[i]
# Only update limits if this axis was already homed,
# otherwise leave in un-homed state.
if l <= h:
self.limits[i] = range
def override_rail(self, i, rail):
self.rails[i] = rail
def set_position(self, newpos, homing_axes):
for i, rail in enumerate(self.rails):
rail.set_position(newpos)
if 2 in homing_axes:
self.limits[2] = rail.get_range()
def note_z_not_homed(self):
# Helper for Safe Z Home
self.limits[2] = (1.0, -1.0)
def home_axis(self, homing_state, axis, rail):
# Determine movement
position_min, position_max = rail.get_range()
hi = rail.get_homing_info()
homepos = [None, None, None, None]
homepos[axis] = hi.position_endstop
forcepos = list(homepos)
if hi.positive_dir:
forcepos[axis] -= 1.5 * (hi.position_endstop - position_min)
else:
forcepos[axis] += 1.5 * (position_max - hi.position_endstop)
# Perform homing
homing_state.home_rails([rail], forcepos, homepos)
def home(self, homing_state):
# Each axis is homed independently and in order
for axis in homing_state.get_axes():
if axis == 2:
self.home_axis(homing_state, axis, self.rails[0])
#self.home_axis(homing_state, axis, self.rails[axis])
def _motor_off(self, print_time):
self.limits = [(1.0, 1.0),(1.0, 1.0),(1.0, -1.0)]
def _check_endstops(self, move):
end_pos = move.end_pos
for i in (0, 1, 2,):
if (move.axes_d[i]
and (end_pos[i] < self.limits[i][0]
or end_pos[i] > self.limits[i][1])):
if self.limits[i][0] > self.limits[i][1]:
raise move.move_error("Must home axis first")
raise move.move_error()
def check_move(self, move):
if not move.axes_d[2]:
# Normal XY move - use defaults
return
# Move with Z - update velocity and accel for slower Z axis
self._check_endstops(move)
z_ratio = move.move_d / abs(move.axes_d[2])
move.limit_speed(
self.max_z_velocity * z_ratio, self.max_z_accel * z_ratio)
def get_status(self, eventtime):
axes = [a for a, (l, h) in zip("z", self.limits) if l <= h]
return {
'homed_axes': "".join(axes),
'axis_minimum': self.axes_min,
'axis_maximum': self.axes_max,
}
def load_kinematics(toolhead, config):
return ZAxisKinematics(toolhead, config)

View file

@ -205,6 +205,20 @@ class ToolHead:
self.lookahead = LookAheadQueue()
self.lookahead.set_flush_time(BUFFER_TIME_HIGH)
self.commanded_pos = [0., 0., 0., 0.]
# The manufacturing process type
atypes = {'FDM': 'FDM', 'SLA': 'SLA', 'mSLA': 'mSLA', 'DLP': 'DLP'}
self.manufacturing_process = config.getchoice('manufacturing_process',
atypes, default='FDM')
if self.manufacturing_process in ('mSLA', 'DLP'):
for s in ('msla_display',):
section = self.printer.lookup_object(s, None)
if section is None:
msg = ("Error: A section with [%s] is required "
"for mSLA/DLP printers.") % s
logging.exception(msg)
raise config.error(msg)
# Velocity and acceleration control
self.max_velocity = config.getfloat('max_velocity', above=0.)
self.max_accel = config.getfloat('max_accel', above=0.)
@ -562,6 +576,9 @@ class ToolHeadCommandHelper:
self.cmd_SET_VELOCITY_LIMIT,
desc=self.cmd_SET_VELOCITY_LIMIT_help)
gcode.register_command('M204', self.cmd_M204)
gcode.register_command('QUERY_MANUFACTORING_PROCESS',
self.cmd_QUERY_MANUFACTORING_PROCESS,
desc="Query manufacturing process")
def cmd_G4(self, gcmd):
# Dwell
delay = gcmd.get_float('P', 0., minval=0.) / 1000.
@ -601,6 +618,14 @@ class ToolHeadCommandHelper:
accel = min(p, t)
self.toolhead.set_max_velocities(None, accel, None, None)
def cmd_QUERY_MANUFACTORING_PROCESS(self, gcmd):
"""
Returns the manufacturing process
@param gcmd:
"""
gcmd.respond_raw(self.manufacturing_process)
def add_printer_objects(config):
printer = config.get_printer()
printer.add_object('toolhead', ToolHead(config))

View file

@ -19,3 +19,4 @@ python-can==3.3.4
setuptools==78.1.1 ; python_version >= '3.12' # Needed by python-can
# msgspec is an optional dependency of webhooks.py
msgspec==0.19.0 ; python_version >= '3.9'
pillow