# Copyright (C) 2017-2019 COAL Developers
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with this program; if not, write to the Free
# Software Foundation, Inc., 51 Franklin Street, Fifth
# Floor, Boston, MA 02110-1301, USA.
import sys
import os
import logging
import math
import numpy
from spectral.io.spyfile import SubImage
import pycoal
import spectral
import time
import fnmatch
import shutil
"""
Classifier callbacks functions must have at least the following args: library,
image_file_name, classified_file_name; which will always be passed by the calling
method (MineralClassification.classify_image). The remaining arguments, specific
of each classifier function, will also be passed by the calling function, but are
optionals and may vary from one classifier to another.
"""
def SAM(image_file_name, classified_file_name, library_file_name, scores_file_name=None, class_names=None,
threshold=0.0, in_memory=False, subset_rows=None, subset_cols=None):
"""
Parameter 'scores_file_name' optionally receives the path to where to save
an image that holds all the SAM scores yielded for each pixel of the
classified image. No score image is create if not provided.
The optional 'threshold' parameter defines a confidence value between zero
and one below which SAM classifications will be discarded, otherwise all
classifications will be included.
In order to improve performance on systems with sufficient memory,
enable the optional parameter to load entire images.
Args:
library_file_name (str): filename of the spectral library
image_file_name (str): filename of the image to be classified
classified_file_name (str): filename of the classified image
scores_file_name (str, optional): filename of the image to hold each pixel's classification score
class_names (str[], optional): list of classes' names to include
threshold (float, optional): classification threshold
in_memory (boolean, optional): enable loading entire image
subset_rows (2-tuple, optional): range of rows to read (empty to read the whole image)
subset_cols (2-tuple, optional): range of columns to read (empty to read the whole image)
Returns:
None
"""
# load and optionally subset the spectral library
library = spectral.open_image(library_file_name)
if class_names is not None:
library = pycoal.mineral.MineralClassification.subset_spectral_library(library, class_names)
# open the image
image = spectral.open_image(image_file_name)
if subset_rows is not None and subset_cols is not None:
subset_image = SubImage(image, subset_rows, subset_cols)
M = subset_rows[1]
N = subset_cols[1]
else:
if in_memory:
data = image.load()
else:
data = image.asarray()
M = image.shape[0]
N = image.shape[1]
logging.info("Classifying a %iX%i image" % (M, N))
# define a resampler
# TODO detect and scale units
# TODO band resampler should do this
resample = spectral.BandResampler([x / 1000 for x in image.bands.centers],
library.bands.centers)
# allocate a zero-initialized MxN array for the classified image
classified = numpy.zeros(shape=(M, N), dtype=numpy.uint16)
if scores_file_name is not None:
# allocate a zero-initialized MxN array for the scores image
scored = numpy.zeros(shape=(M, N), dtype=numpy.float64)
# for each pixel in the image
for x in range(M):
for y in range(N):
# read the pixel from the file
if subset_rows is not None and subset_cols is not None:
pixel = subset_image.read_pixel(x, y)
else:
pixel = data[x, y]
# if it is not a no data pixel
if not numpy.isclose(pixel[0], -0.005) and not pixel[0] == -50:
# resample the pixel ignoring NaNs from target bands that don't overlap
# TODO fix spectral library so that bands are in order
resampled_pixel = numpy.nan_to_num(resample(pixel))
# calculate spectral angles
angles = spectral.spectral_angles(resampled_pixel[numpy.newaxis,
numpy.newaxis,
...],
library.spectra)
# normalize confidence values from [pi,0] to [0,1]
for z in range(angles.shape[2]):
angles[0, 0, z] = 1 - angles[0, 0, z] / math.pi
# get index of class with largest confidence value
index_of_max = numpy.argmax(angles)
# get confidence value of the classified pixel
score = angles[0, 0, index_of_max]
# classify pixel if confidence above threshold
if score > threshold:
# index from one (after zero for no data)
classified[x, y] = index_of_max + 1
if scores_file_name is not None:
# store score value
scored[x, y] = score
# save the classified image to a file
spectral.io.envi.save_classification(
classified_file_name,
classified,
class_names=['No data'] + library.names,
metadata={
'data ignore value': 0,
'description': 'COAL ' + pycoal.version + ' mineral classified image.',
'map info': image.metadata.get('map info')
})
# remove unused classes from the image
pycoal.mineral.MineralClassification.filter_classes(classified_file_name)
if scores_file_name is not None:
# save the scored image to a file
spectral.io.envi.save_image(
scores_file_name,
scored,
dtype=numpy.float64,
metadata={
'data ignore value': -50,
'description': 'COAL ' + pycoal.version + ' mineral scored image.',
'map info': image.metadata.get('map info')
})
def avngDNN(image_file_name, classified_file_name, model_file_name, class_names=None, scores_file_name=None,
in_memory=False):
"""
This callback function takes a Keras model, trained to classify pixels from AVIRIS-NG
imagery, and assigns a class for every single pixel of the input image.
Parameter 'scores_file_name' optionally receives the path to where to save
an image that holds all the SAM scores yielded for each pixel of the
classified image. No score image is create if not provided.
In order to improve performance on systems with sufficient memory,
enable the optional parameter to load entire images.
Args:
image_file_name (str): filename of the image to be classified
classified_file_name (str): filename of the classified image
model_file_name (str): filename of the Keras model used to classify
class_names (str[], optional): list of names of classes handled by the model
scores_file_name (str): filename of the image to hold each pixel's classification score
in_memory (boolean, optional): enable loading entire image
Returns:
None
"""
from keras.models import load_model
# open the image
image = spectral.open_image(image_file_name)
if in_memory:
data = image.load()
else:
data = image.asarray()
M = image.shape[0]
N = image.shape[1]
# allocate a zero-initialized MxN array for the classified image
classified = numpy.zeros(shape=(M, N), dtype=numpy.uint16)
if scores_file_name is not None:
# allocate a zero-initialized MxN array for the scores image
scored = numpy.zeros(shape=(M, N), dtype=numpy.float64)
model = load_model(model_file_name)
# for each pixel in the image
for x in range(M):
for y in range(N):
# read the pixel from the file
pixel = numpy.array(data[x, y])
# adjust shape to comprise the AVIRIS-NG bands and comply with Keras model input format
pixel = numpy.reshape(pixel, (1, 432, 1))
# get the scores for each class considered
predict = model.predict(pixel)
# get the index of the class into the outputted list
index_of_max = numpy.argmax(predict).astype(numpy.uint16)
# store the class index (0 meaning nodata)
classified[x, y] = index_of_max + 1
# store the outcome score
if scores_file_name is not None:
scored[x, y] = predict[0][index_of_max]
# save the classified image to a file
spectral.io.envi.save_classification(
classified_file_name,
classified,
class_names=['No data'] + class_names,
metadata={
'data ignore value': 0,
'description': 'COAL ' + pycoal.version + ' mineral classified image.',
'map info': image.metadata.get('map info')
})
if scores_file_name is not None:
# save the scored image to a file
spectral.io.envi.save_image(
scores_file_name,
scored,
dtype=numpy.float64,
metadata={
'data ignore value': -50,
'description': 'COAL ' + pycoal.version + ' mineral scored image.',
'map info': image.metadata.get('map info')
})
[docs]class MineralClassification:
[docs] def __init__(self, algorithm=SAM, **kwargs):
"""
Construct a new ``MineralClassification`` object with a spectral library
in ENVI format such as the `USGS Digital Spectral Library 06
<https://speclab.cr.usgs.gov/spectral.lib06/>`_ or the `ASTER Spectral
Library Version 2.0 <https://asterweb.jpl.nasa.gov/>`_ converted with
``pycoal.mineral.AsterConversion.convert()``.
If provided, the optional class name parameter will initialize the
classifier with a subset of the spectral library, otherwise the full
spectral library will be used.
Args:
algorithm (function, optional): the classifier callback
**kwargs: arguments that will be passed to the chosen classifier
"""
# set the user's chosen classifier
self.algorithm = algorithm
# hold the remaining arguments that will be passed to self.algorithm
self.args = kwargs
logging.info("Instantiated Mineral Classifier with following specification: " \
"-classifier function '%s'"
% (self.algorithm.__name__,))
[docs] def classify_image(self, image_file_name, classified_file_name):
"""
Classify minerals in an AVIRIS image using chosen specified
classification algorithm and save the results to a file.
Args:
image_file_name (str): filename of the image to be classified
classified_file_name (str): filename of the classified image
Returns:
None
"""
start = time.time()
logging.info("Starting Mineral Classification for image '%s', saving classified image to '%s'"
% (image_file_name, classified_file_name))
# run the classifier callback expanding self.args to fulfill the specific args of the function
self.algorithm(image_file_name, classified_file_name, **self.args)
end = time.time()
seconds_elapsed = end - start
m, s = divmod(seconds_elapsed, 60)
h, m = divmod(m, 60)
logging.info("Completed Mineral Classification. Time elapsed: '%d:%02d:%02d'" % (h, m, s))
[docs] @staticmethod
def filter_classes(classified_file_name):
"""
Modify a classified image to remove unused classes.
Args:
classified_file_name (str): file of the classified image
Returns:
None
"""
# open the image
classified = spectral.open_image(classified_file_name)
data = classified.asarray()
M = classified.shape[0]
N = classified.shape[1]
# allocate a copy for reindexed pixels
copy = numpy.zeros(shape=(M, N), dtype=numpy.uint16)
# find classes actually present in the image
classes = sorted(set(classified.asarray().flatten().tolist()))
lookup = [classes.index(i) if i in classes else 0 for i in range(int(classified.metadata.get('classes')))]
# reindex each pixel
for x in range(M):
for y in range(N):
copy[x, y] = lookup[data[x, y, 0]]
# overwrite the file
spectral.io.envi.save_classification(
classified_file_name,
copy,
force=True,
class_names=[classified.metadata.get('class names')[i] for i in classes],
metadata=classified.metadata)
[docs] @staticmethod
def to_rgb(image_file_name, rgb_image_file_name, red=680.0, green=532.5, blue=472.5):
"""
Generate a three-band RGB image from an AVIRIS image and save it to a file.
Args:
image_file_name (str): filename of the source image
rgb_image_file_name (str): filename of the three-band RGB image
red (float, optional): wavelength in nanometers of the red band
green (float, optional): wavelength in nanometers of the green band
blue (float, optional): wavelength in nanometers of the blue band
Returns:
None
"""
# find the index of the first element in a list greater than the value
start = time.time()
logging.info(
"Starting generation of three-band RGB image from input file: '%s' with following RGB values R: '%s', G: '%s', B: '%s'" % (
image_file_name, red, green, blue))
def index_of_greater_than(elements, value):
for index, element in enumerate(elements):
if element > value:
return index
# open the image
image = spectral.open_image(image_file_name)
# load the list of wavelengths as floats
wavelength_strings = image.metadata.get('wavelength')
wavelength_floats = list(map(float, wavelength_strings))
# find the index of the red, green, and blue bands
red_index = index_of_greater_than(wavelength_floats, red)
green_index = index_of_greater_than(wavelength_floats, green)
blue_index = index_of_greater_than(wavelength_floats, blue)
# read the red, green, and blue bands from the image
red_band = image[:, :, red_index]
green_band = image[:, :, green_index]
blue_band = image[:, :, blue_index]
# remove no data pixels
for band in [red_band, green_band, blue_band]:
for x in range(band.shape[0]):
for y in range(band.shape[1]):
if numpy.isclose(band[x, y, 0], -0.005) or band[x, y, 0] == -50:
band[x, y] = 0
# combine the red, green, and blue bands into a three-band RGB image
rgb = numpy.concatenate([red_band, green_band, blue_band], axis=2)
# update the metadata
rgb_metadata = image.metadata
rgb_metadata['description'] = 'COAL ' + pycoal.version + ' three-band RGB image.'
rgb_metadata['data ignore value'] = 0
if wavelength_strings:
rgb_metadata['wavelength'] = [
wavelength_strings[red_index],
wavelength_strings[green_index],
wavelength_strings[blue_index]]
if image.metadata.get('correction factors'):
rgb_metadata['correction factors'] = [
image.metadata.get('correction factors')[red_index],
image.metadata.get('correction factors')[green_index],
image.metadata.get('correction factors')[blue_index]]
if image.metadata.get('fwhm'):
rgb_metadata['fwhm'] = [
image.metadata.get('fwhm')[red_index],
image.metadata.get('fwhm')[green_index],
image.metadata.get('fwhm')[blue_index]]
if image.metadata.get('bbl'):
rgb_metadata['bbl'] = [
image.metadata.get('bbl')[red_index],
image.metadata.get('bbl')[green_index],
image.metadata.get('bbl')[blue_index]]
if image.metadata.get('smoothing factors'):
rgb_metadata['smoothing factors'] = [
image.metadata.get('smoothing factors')[red_index],
image.metadata.get('smoothing factors')[green_index],
image.metadata.get('smoothing factors')[blue_index]]
# save the three-band RGB image to a file
logging.info("Saving RGB image as '%s'" % rgb_image_file_name)
spectral.envi.save_image(rgb_image_file_name, rgb, metadata=rgb_metadata)
end = time.time()
seconds_elapsed = end - start
m, s = divmod(seconds_elapsed, 60)
h, m = divmod(m, 60)
logging.info("Completed RGB image generation. Time elapsed: '%d:%02d:%02d'" % (h, m, s))
[docs] @staticmethod
def subset_spectral_library(spectral_library, class_names):
# adapted from https://git.io/v9ThM
"""
Create a copy of the spectral library containing only the named classes.
Args:
spectral_library (SpectralLibrary): ENVI spectral library
class_names (str[]): list of names of classes to include
Returns:
SpectralLibrary: subset of ENVI spectral library
"""
# empty array for spectra
spectra = numpy.empty((len(class_names), len(spectral_library.bands.centers)))
# empty list for names
names = []
# copy class spectra and names
for new_index, class_name in enumerate(class_names):
old_index = spectral_library.names.index(class_name)
spectra[new_index] = spectral_library.spectra[old_index]
names.append(class_name)
# copy metadata
metadata = {'wavelength units': spectral_library.metadata.get('wavelength units'),
'spectra names': names,
'wavelength': spectral_library.bands.centers}
# return new spectral library
return spectral.io.envi.SpectralLibrary(spectra, metadata, {})
class AsterConversion:
def __init__(self):
"""
This class provides a method for converting the `ASTER Spectral
Library Version 2.0 <https://asterweb.jpl.nasa.gov/>`_ into ENVI format.
Args:
None
"""
pass
@classmethod
def convert(cls, data_dir="", db_file="", hdr_file=""):
"""
This class method generates an ENVI format spectral library file.
``data_dir`` is optional as long as ``db_file`` is provided. Note that
generating an SQLite database takes upwards of 10 minutes and creating
an ENVI format file takes up to 5 minutes. Note: This feature is still
experimental.
Args:
data_dir (str, optional): path to directory containing ASCII data files
db_file (str): name of the SQLite file that either already exists if
``data_dir`` isn't provided, or will be generated if
``data_dir`` is provided
hdr_file (str): name of the ENVI spectral library to generate
(without the ``.hdr`` or ``.sli`` extension)
"""
if not hdr_file:
raise ValueError("Must provide path for generated ENVI header file.")
elif not db_file:
raise ValueError("Must provide path for sqlite file.")
if data_dir:
spectral.AsterDatabase.create(db_file, data_dir)
aster_database = spectral.AsterDatabase(db_file)
spectrum_ids = [x[0] for x in aster_database.query('SELECT SampleID FROM Samples').fetchall()]
band_min = 0.38315
band_max = 2.5082
band_num = 128
band_info = spectral.BandInfo()
band_info.centers = numpy.arange(band_min, band_max, (band_max - band_min) / band_num)
band_info.band_unit = 'micrometer'
library = aster_database.create_envi_spectral_library(spectrum_ids, band_info)
library.save(hdr_file)
class SpectalToAsterFileFormat:
def __init__(self):
"""
This class provides a method for converting `USGS Spectral Library Version 7
<https://speclab.cr.usgs.gov/spectral-lib.html>`_ .txt files into ASTER Spectral
Library Version 2.0 <https://asterweb.jpl.nasa.gov/> .txt files
Args:
none
"""
pass
@classmethod
def convert(cls, library_filename=""):
"""
This class method converts a `USGS Spectral Library Version 7
<https://speclab.cr.usgs.gov/spectral-lib.html>`_ .txt file into
an `ASTER Spectral Library Version 2.0 <https://asterweb.jpl.nasa.gov/>`_ .spectrum.txt file
ASTER Library Version 2.0 Spectral Library files are in .spectrum.txt file format
Spectral Library Version 7 can be downloaded `here <https://speclab.cr.usgs.gov/spectral-lib.html>`_
Args:
library_filename (str): path to Spectral File you wish to convert
"""
if not library_filename:
raise ValueError("Must provide path for Spectral File.")
line_count = 1
with open(library_filename, 'r') as input_file:
for line_count, l in enumerate(input_file):
pass
input_file = open(library_filename, 'r')
# Read Name of Spectra on first line of the file
spectra_line = input_file.readline()
spectra_name = spectra_line[23:]
k = 0
# Loop through file and store all wavelength values for the given Spectra
spectra_values_file = open('SpectraValues.txt', 'w')
spectra_wave_length = 0
while (k < line_count):
spectra_wave_length = float(input_file.readline()) * 100
spectra_wave_length = spectra_wave_length / 1000
spectra_wave_length = float("{0:.5f}".format(spectra_wave_length))
spectra_y_value = spectra_wave_length * 10
line = str(spectra_wave_length) + ' ' + str(spectra_y_value)
spectra_values_file.write(line)
spectra_values_file.write('\n')
k = k + 1
# Write new file in the form of an ASTER .spectrum.txt file while using stored
# Spectra Name and stored Spectra Wavelength values`
input_file = open(library_filename, 'w')
input_file.write('Name:')
input_file.write(spectra_name)
input_file.write('Type:\n')
input_file.write('Class:\n')
input_file.write('Subclass:\n')
input_file.write('Particle Size: Unknown\n')
input_file.write('Sample No.: 0000000000\n')
input_file.write('Owner:\n')
input_file.write('Wavelength Range: ALL\n')
input_file.write('Origin: Spectra obtained from the Noncoventional Exploitation Factors\n')
input_file.write('Data System of the National Photographic Interpretation Center.\n')
input_file.write('Description: Gray and black construction asphalt. The sample was\n')
input_file.write('soiled and weathered, with some limestone and quartz aggregate\n')
input_file.write('showing.\n')
input_file.write('\n')
input_file.write('\n')
input_file.write('\n')
input_file.write('Measurement: Unknown\n')
input_file.write('First Column: X\n')
input_file.write('Second Column: Y\n')
input_file.write('X Units: Wavelength (micrometers)\n')
input_file.write('Y Units: Reflectance (percent)\n')
input_file.write('First X Value:\n')
input_file.write('Last X Value:\n')
input_file.write('Number of X Values:\n')
input_file.write('Additional Information:\n')
input_file.write('\n')
j = 0
spectra_values_file.close()
# Read in values saved in SpectraValues.txt and output them to the library_filename
spectra_values_file = open('SpectraValues.txt', 'r')
while (j < line_count):
spectra_wave_length = spectra_values_file.readline()
input_file.write(spectra_wave_length)
j = j + 1
# Close all open files
input_file.close()
spectra_values_file.close()
# Rename library_filename to match ASTER .spectrum.txt file format
os.rename(library_filename, library_filename + '.spectrum.txt')
# Remove temporary file for storing wavelength data
os.remove('SpectraValues.txt')
class FullSpectralLibrary7Convert:
def __init__(self):
"""
This class method converts the entire `USGS Spectral Library Version 7
<https://speclab.cr.usgs.gov/spectral-lib.html>`_ library into
its convolved envi format
Args:
none
"""
pass
@classmethod
def convert(cls, library_filename=""):
"""
This class method converts the entire `USGS Spectral Library Version 7
<https://speclab.cr.usgs.gov/spectral-lib.html>`_ library into
its convolved envi format
Spectral Library Version 7 can be downloaded `here <https://speclab.cr.usgs.gov/spectral-lib.html>`_
Args:
library_filename (str): path to USGS Spectral Library Version 7 directory
"""
if not library_filename:
raise ValueError("Must provide path for USGS Spectral Library Version 7.")
# This will take all the necessary .txt files for spectra in USGS
# Spectral Library Version 7 and put them in a new directory called
# "usgs_splib07_modified" in the examples directory
directory = 'usgs_splib07_modified'
if not os.path.exists(directory):
os.makedirs(directory)
for root, dir, files in os.walk(library_filename + "/ASCIIdata"):
dir[:] = [d for d in dir]
for items in fnmatch.filter(files, "*.txt"):
if "Bandpass" not in items:
if "errorbars" not in items:
if "Wave" not in items:
if "SpectraValues" not in items:
shutil.copy2(os.path.join(root, items), directory)
# This will take the .txt files for Spectra in USGS Spectral Version 7 and
# convert their format to match that of ASTER .spectrum.txt files for spectra
# create a new mineral aster conversion instance
spectral_aster = SpectalToAsterFileFormat()
# List to check for duplicates
spectra_list = []
# Convert all files
files = os.listdir(directory + '/')
for x in range(0, len(files)):
name = directory + '/' + files[x]
# Get name
input_file = open(name, 'r')
spectra_line = input_file.readline()
spectra_cut = spectra_line[23:]
spectra_name = spectra_cut[:-14]
# Remove first and last char in case extra spaces are added
spectra_first_space = spectra_name[1:]
spectra_last_space = spectra_first_space[:-1]
# Check if Spectra is unique
set_spectra = set(spectra_list)
if not any(spectra_name in s for s in set_spectra):
if not any(spectra_last_space in a for a in set_spectra):
spectral_aster.convert(name)
spectra_list.append(spectra_name)
set_spectra = set(spectra_list)
print(set_spectra)
# This will generate three files s07AV95a_envi.hdr, s07AV95a_envi.hdr.sli,splib.db and dataSplib07.db
# For a library in `ASTER Spectral Library Version 2.0 <https://asterweb.jpl.nasa.gov/>`_ format
data_dir = "dataSplib07.db"
# Avoid overwrite during nosetests of full .hdr and .sli files with sample .hdr and .sli
if (os.path.isfile('s07_AV95_envi.hdr')):
header_name = "s07_AV95_envi_sample"
else:
header_name = "s07_AV95_envi"
# create a new mineral aster conversion instance
spectral_envi = AsterConversion()
# Generate .sli and .hdr
spectral_envi.convert(directory, data_dir, header_name)