# BEGIN OF LICENSE NOTE
# This file is part of Pyoints.
# Copyright (c) 2018, Sebastian Lamprecht, Trier University,
# lamprecht@uni-trier.de
#
# Pyoints is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyoints is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Pyoints. If not, see <https://www.gnu.org/licenses/>.
# END OF LICENSE NOTE
"""Handling of .las-files.
"""
import os
import sys
import numpy as np
import laspy
try:
# use liblas to provide full spatial reference support (workaround)
import liblas
except ImportError:
pass
from ..extent import Extent
from ..georecords import (
GeoRecords,
LasRecords,
)
from .. import (
transformation,
projection,
assertion,
nptools,
)
from .BaseGeoHandler import GeoFile
from .dtype_converters import numpy_to_laspy_dtype
SUPPORTED_FORMATS = [0, 1, 2, 3, 4, 5]
[docs]class LasReader(GeoFile):
"""Class to read .las-files.
Parameters
----------
infile : String
Las file to be read.
proj : optional, Proj
Spatial reference system. Usually provided only, if the spatial
reference of the file has not be set yet.
See Also
--------
GeoFile
"""
def __init__(self, infile, proj=None):
GeoFile.__init__(self, infile)
lasFile = laspy.file.File(self.file, mode='r')
# try to read projection from file
if proj is None:
if 'liblas' in sys.modules:
reader = liblas.file.File(self.file, mode='r')
wkt = reader.header.srs.get_wkt().decode('utf-8')
if not wkt == '':
proj = projection.Proj.from_wkt(wkt)
reader.close()
else:
for vlr in lasFile.header.vlrs:
if vlr.record_id == 2112:
# Spatial reference in well known text format
wkt = str(vlr.VLR_body.decode('utf-8'))
proj = projection.Proj.from_wkt(wkt)
elif vlr.record_id == 34735:
# GeoTIFF GeoKeyDirectoryTag
# not supported yet
pass
elif vlr.record_id == 34736:
# GeoTIFF GeoDoubleParamsTag
# not supported yet
pass
elif vlr.record_id == 34737:
# GeoTIFF GeoAsciiParamsTag
# not supported yet
pass
self.proj = proj
self.date = lasFile.header.date
self.t = transformation.t_matrix(lasFile.header.offset)
self._extent = Extent((lasFile.header.min, lasFile.header.max))
self._count = int(lasFile.header.point_records_count)
lasFile.close()
del lasFile
def __len__(self):
return self._count
@property
def extent(self):
return self._extent
@property
def corners(self):
return Extent(self.extent[[0, 1, 3, 4]]).corners
[docs] def load(self, extent=None):
lasFile = laspy.file.File(self.file, mode='r')
# check point formats
if lasFile.header.data_format_id not in SUPPORTED_FORMATS:
m = "Only point formats %s supported yet, got %"
raise ValueError(
m %
(SUPPORTED_FORMATS, lasFile.header.data_format_id))
date = lasFile.header.date
scale = np.array(lasFile.header.scale, dtype=np.float64)
offset = np.array(lasFile.header.offset, dtype=np.float64)
las_fields = [
str(dim.name.encode().decode()) for dim in lasFile.point_format
] # ugly workaround to get actual strings
points = lasFile.points['point'].copy().view(np.recarray)
# Close File
lasFile.close()
del lasFile
# filter by extent (before scaling)
if extent is not None:
ext = Extent(extent)
if ext.dim == 2:
ecoords = np.vstack([points.X, points.Y])
else:
ecoords = np.vstack([points.X, points.Y, points.Z])
iext = Extent([
(ext.min_corner - offset[:ext.dim]) / scale[:ext.dim],
(ext.max_corner - offset[:ext.dim]) / scale[:ext.dim]
])
sids = iext.intersection(ecoords.T)
points = points[sids]
# much faster than accessing lasFile.x
coords = np.empty((len(points), 3), dtype=np.float64)
coords[:, 0] = points.X * scale[0] + offset[0]
coords[:, 1] = points.Y * scale[1] + offset[1]
coords[:, 2] = points.Z * scale[2] + offset[2]
# grep data
omit = ['X', 'Y', 'Z']
dtypes = []
dataDict = {'coords': coords}
for name in las_fields:
if name == 'flag_byte':
values = points.flag_byte
if np.any(values):
dataDict['return_num'] = values % 8 # bits 0, 1, 2
values = values // 8
if np.any(values):
dataDict['num_returns'] = values % 8 # bits 3, 4, 5
values = values // 8
if np.any(values):
dataDict['scan_direction_flag'] = values % 2 # bit 6
values = values // 2
if np.any(values):
dataDict['edge_of_flight_line'] = values # bit 7
elif name == 'raw_classification':
values = points.raw_classification
if np.any(values):
dataDict['classification'] = values % 32 # bits 0 to 4
values = values // 32
if np.any(values):
dataDict['synthetic'] = values % 2 # bit 5
values = values // 2
if np.any(values):
dataDict['keypoint'] = values % 2 # bit 6
values = values // 2
if np.any(values):
dataDict['withheld'] = values # bit 7
elif name not in omit:
values = points[name]
if np.any(values):
dataDict[name] = values
# collect dtypes
available_dtypes = LasRecords.available_fields()
for name in dataDict.keys():
for descr in available_dtypes:
if descr[0] == name:
dtypes.append(descr)
# create recarray
data = nptools.recarray(dataDict, dtype=dtypes)
if len(points) == 0:
t = np.eye(4)
else:
t = transformation.t_matrix(offset)
return LasRecords(self.proj, data, T=t, date=date)
[docs]def writeLas(geoRecords, outfile, point_format=3):
""" Write a LAS file to disc.
Parameters
----------
geoRecords : GeoRecords
Points to store to disk.
outfile : String
Desired output file.
point_format : optional, positive int
Desired LAS point format. See LAS specification for details.
"""
# validate input
if not isinstance(geoRecords, GeoRecords):
raise TypeError("'geoRecords' needs to be of type 'GeoRecords'")
if not os.access(os.path.dirname(outfile), os.W_OK):
raise IOError('File %s is not writable' % outfile)
if point_format not in SUPPORTED_FORMATS:
raise ValueError("'point_format' %s not supported" % str(point_format))
records = geoRecords.records()
# Create file header
header = laspy.header.Header(file_version=1.3, point_format=point_format)
header.file_sig = 'LASF'
# Open file in write mode
lasFile = laspy.file.File(outfile, mode='w', header=header)
# create VLR records
vlrs = []
if 'liblas' in sys.modules:
# use liblas to create spatial reference
srs = liblas.srs.SRS()
srs.set_wkt(str.encode(geoRecords.proj.wkt))
for i in range(srs.vlr_count()):
vlr = laspy.header.VLR(
user_id="LASF_Projection",
record_id=srs.GetVLR(i).recordid,
VLR_body=srs.GetVLR(i).data,
description="OGC Coordinate System GeoTIFF"
)
vlr.parse_data()
vlrs.append(vlr)
else:
# create wkt record only
vlr = laspy.header.VLR(
user_id="LASF_Projection",
record_id=2112,
VLR_body=str.encode(geoRecords.proj.wkt),
description="OGC Coordinate System WKT"
)
vlrs.append(vlr)
# set VLRs
lasFile.header.set_vlrs(vlrs)
if point_format > 5:
lasFile.header.wkt = 1
dim = min(geoRecords.dim, 3)
# find optimal offset and scale scale to achieve highest precision
offset = np.zeros(3)
scale = np.ones(3)
offset[:dim] = geoRecords.t.origin
max_values = np.abs(records.extent().corners - offset[:dim]).max(0)
max_digits = 2**28 # long int
scale[:dim] = max_values / max_digits
scale[np.isclose(scale, 0)] = 1 / max_digits
if geoRecords.date is not None:
lasFile.header.date = geoRecords.date
lasFile.header.scale = scale.copy()
lasFile.header.offset = offset.copy()
# get fields
las_fields = [field.name for field in lasFile.point_format]
field_names = records.dtype.names
# Fields to omit
omit = ['X', 'Y', 'Z', 'flag_byte', 'raw_classification']
omit.extend(las_fields)
omit.extend(np.dtype(LasRecords.CUSTOM_FIELDS).names)
# create user defined fields
for name in field_names:
if name not in omit:
dtype = records.dtype[name]
type_id = numpy_to_laspy_dtype(dtype)
if type_id is None:
omit.append(name)
else:
lasFile.define_new_dimension(name, type_id, '')
# initialize
flag_byte = np.zeros(len(records), dtype=np.uint)
raw_classification = np.zeros(len(records), dtype=np.uint8)
# set fields
for name in field_names:
if name == 'coords':
lasFile.set_x_scaled(records.coords[:, 0])
lasFile.set_y_scaled(records.coords[:, 1])
if records.dim > 2:
lasFile.set_z_scaled(records.coords[:, 2])
elif name == 'classification': # bits 0, 1, 2, 3, 4
raw_classification += records.classification
elif name == 'synthetic': # bit 5
raw_classification += records.synthetic.astype(np.uint8) * 32
elif name == 'keypoint': # bit 6
raw_classification += records.keypoint.astype(np.uint8) * 64
elif name == 'withheld': # bit 7
raw_classification += records.withheld.astype(np.uint8) * 128
elif name == 'return_num': # bits 0, 1, 2
flag_byte += records.return_num
elif name == 'num_returns': # bits 3, 4, 5
flag_byte += records.num_returns * 8
elif name == 'scan_direction_flag': # bit 6
flag_byte += records.scan_direction_flag.astype(np.uint8) * 64
elif name == 'edge_of_flight_line': # bit 7
flag_byte += records.edge_of_flight_line.astype(np.uint8) * 128
elif name in las_fields or name not in omit:
if np.any(records[name]):
#lasFile[name] = records[name]
lasFile._writer.set_dimension(name, records[name])
if np.any(flag_byte):
lasFile.set_flag_byte(flag_byte)
if np.any(raw_classification):
lasFile.set_raw_classification(raw_classification)
# close file
lasFile.header.update_min_max()
lasFile.close()
del lasFile
def _updateLasHeader(las_file, offset=None, translate=None, precision=None):
# experimental
lasFile = laspy.file.File(las_file, mode='rw')
if precision is not None:
precision = assertion.ensure_numvector(precision)
if not len(precision) == 3:
raise ValueError('"precision" has to have a length of 3')
scale = np.repeat(10.0, 3)**-np.array(precision)
lasFile.header.scale = scale
if offset is not None:
offset = assertion.ensure_numvector(offset)
if not len(offset) == 3:
raise ValueError('"offset" has to have a length of 3')
lasFile.header.offset = offset
if translate is not None:
translate = assertion.ensure_numvector(translate)
if not len(translate) == 3:
raise ValueError('"translate" has to have a length of 3')
lasFile.header.offset = lasFile.header.offset + translate
lasFile.header.update_min_max()
lasFile.close()
del lasFile
def _createTypeTestLas(outfile):
# experimental
# Create file header
header = laspy.header.Header()
header.file_sig = 'LASF'
header.format = 1.2
header.data_format_id = 3
# Open file in write mode
lasFile = laspy.file.File(outfile, mode='w', header=header)
lasFile.header.scale = [1, 1, 1]
lasFile.header.offset = [0, 0, 0]
names = []
for type_id in range(1, 31):
name = 'field_%i' % type_id
if type_id not in [8, 18, 28]:
lasFile.define_new_dimension(name, type_id, '')
names.append(name)
k = 10
lasFile.x = np.random.rand(k)
lasFile.y = np.random.rand(k)
lasFile.z = np.random.rand(k)
lasFile.header.update_min_max()
lasFile.close()
del lasFile