Archimedes: a simple exercise with Keras and Scikit-Fuzzy

This is a simple exercise, not a real, complete implementation. However, I think it's a good starting point if you want to use Keras to learn time sequences and Scikit-Fuzzy to extract probabilistic rules (which describe their evolution) from them.

First of all, you need both packages; they can be installed from PyPI (keras and scikit-fuzzy).

I start by creating a sample sequence (in this case, very simple and reasonably predictable):

import numpy as np

class SampleGenerator(object):
    def __init__(self, num_samples, num_tests):
        self.num_samples = num_samples
        self.num_tests = num_tests

    def generate_discrete_sequence(self):
        symbols = (0, 1, 2, 3, 4, 5)
        data = []

        data.append(np.random.randint(0, high=len(symbols)))

        for i in range(1, self.num_samples + self.num_tests):
            # Apply "probabilistic-logic" rules: with 90% probability each
            # symbol is followed by a fixed successor (0->2, 2->5, 5->4,
            # 4->3, 3->1, 1->0), otherwise a random symbol is emitted
            if data[i-1] == symbols[0]:
                if self.uniform_threshold():
                    data.append(symbols[2])
                else:
                    data.append(np.random.randint(0, high=len(symbols)))

            if data[i-1] == symbols[1]:
                if self.uniform_threshold():
                    data.append(symbols[0])
                else:
                    data.append(np.random.randint(0, high=len(symbols)))

            if data[i-1] == symbols[2]:
                if self.uniform_threshold():
                    data.append(symbols[5])
                else:
                    data.append(np.random.randint(0, high=len(symbols)))

            if data[i-1] == symbols[3]:
                if self.uniform_threshold():
                    data.append(symbols[1])
                else:
                    data.append(np.random.randint(0, high=len(symbols)))

            if data[i-1] == symbols[4]:
                if self.uniform_threshold():
                    data.append(symbols[3])
                else:
                    data.append(np.random.randint(0, high=len(symbols)))

            if data[i-1] == symbols[5]:
                if self.uniform_threshold():
                    data.append(symbols[4])
                else:
                    data.append(np.random.randint(0, high=len(symbols)))

        return np.array(data[0:self.num_samples]).astype(np.int32), np.array(data[self.num_samples:]).astype(np.int32)

    def uniform_threshold(self):
        if np.random.uniform() < 0.9:
            return True

        return False

I generate a numeric sequence using 6 unique symbols (natural numbers here, but they could be labels, vectors, etc.), with pre-coded rules (even if the model doesn't know them at all) and a probability threshold (90%, just to avoid too many fluctuations). Such a sequence looks like this:

[0 2 5 4 3 1 0 2 5 4 3 1 0 3 1 0 2 5 4 1 4 3 1 0 2 1 0 2 5 2 5 4 3 1 0 2 5 4 3 1 0 2 5 2 5 4 3 1 0 2 … ]

Sample sequence
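Since the generator's probabilistic rules are worth sanity-checking, here is a small standalone sketch (Python 3; `SUCCESSOR` and `generate` are illustrative names, not part of the project code, and the transition table is inferred from the printed sample) that regenerates such a sequence and measures one transition frequency:

```python
import numpy as np

# Hypothetical reconstruction (inferred from the printed sample): each symbol
# has a fixed successor (0->2, 2->5, 5->4, 4->3, 3->1, 1->0) chosen with
# probability 0.9; otherwise a uniformly random symbol is emitted.
SUCCESSOR = {0: 2, 1: 0, 2: 5, 3: 1, 4: 3, 5: 4}

def generate(num, p=0.9, seed=0):
    rng = np.random.default_rng(seed)
    data = [int(rng.integers(0, 6))]
    for _ in range(num - 1):
        if rng.uniform() < p:
            data.append(SUCCESSOR[data[-1]])
        else:
            data.append(int(rng.integers(0, 6)))
    return np.array(data)

seq = generate(10000)

# Empirical check: the fraction of 0s followed by 2 should be about
# 0.9 + 0.1/6, i.e. roughly 0.92
follows = seq[1:][seq[:-1] == 0]
print(float((follows == 2).mean()))
```

This is why the trained network can never exceed roughly 92% validation accuracy: about 8% of the transitions are genuinely random.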

import numpy as np

from processor import SequenceProcessor
from samples import SampleGenerator

if __name__ == '__main__':
    print "Archimedes test"

    # Discrete sequence
    print "Generating sequence..."

    sample_generator = SampleGenerator(20000, 3000)
    train_set, test_set = sample_generator.generate_discrete_sequence()

In this case, we have 20000 training samples and 3000 test samples.
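Before looking at the model, it may help to isolate the preprocessing idea used later in prepare_sequence: the sequence is split into overlapping blocks of input_block_length symbols, and the target for each block is the one-hot encoding of the symbol that follows it. A minimal sketch (`make_windows` is a hypothetical helper, not part of the project code):

```python
import numpy as np

def make_windows(sequence, block_length, num_symbols):
    # X: overlapping windows of `block_length` consecutive symbols
    # y: one-hot encoding of the symbol that follows each window
    n = len(sequence) - block_length
    X = np.zeros((n, block_length, 1))
    y = np.zeros((n, num_symbols))
    for i in range(n):
        X[i, :, 0] = sequence[i:i + block_length]
        y[i, sequence[i + block_length]] = 1.0
    return X, y

X, y = make_windows([0, 2, 5, 4, 3, 1, 0], block_length=2, num_symbols=6)
print(X.shape, y.shape)  # (5, 2, 1) (5, 6)
```

For instance, the first window [0, 2] gets the one-hot target for symbol 5, which is exactly the next element of the sequence.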

Now, it's time to describe my neural model (before posting the whole code). As I said before, I preferred to use Keras (with the Theano backend) in order to speed up the training phase (I use my NVIDIA GPU with CUDA). After some tests, I decided to keep this structure:

# Create new neural model
self._model = Sequential()

self._model.add(LSTM(self._input_block_length * 100, return_sequences=True, input_shape=(self._input_block_length, 1)))
self._model.add(LSTM(self._input_block_length * 50, return_sequences=False))
self._model.add(Dense(self._input_block_length * 10))
self._model.add(Dropout(0.2))
self._model.add(Dense(self._input_block_length * 5))
self._model.add(Dropout(0.2))
self._model.add(Dense(self.symbol_count))
self._model.add(Activation('softmax'))

It uses two LSTM (Long Short-Term Memory) layers (if you don't know them and how they work, I suggest reading the original paper or a tutorial on the topic), followed by three dense layers, each shorter than the previous one and separated by a dropout, down to the last one: a softmax-activated output layer with one neuron per symbol.

Now it's time to show the whole code (a bit long). It also contains some references to AWS S3 (used to save and restore models; a detail you can completely ignore).

import boto3, boto3.session
import numpy as np
import os, uuid

from boto3.s3.transfer import S3Transfer
from keras.models import Sequential, model_from_json
from keras.layers.core import Dense, Activation, Dropout
from keras.layers.recurrent import LSTM
from keras.utils.np_utils import to_categorical
from rule_registry import RuleRegistry
from tempfile import gettempdir

class SequenceProcessor(object):
    _s3_archimedes_region = '<REGION>'
    _s3_archimedes_bucket = '<BUCKET_NAME>'

    def __init__(self, sequence, test_sequence, model_uid=None, input_block_length=5, resolution=10, verbose=True):
        """
        Process a sequence

        :param sequence: training sequence
        :param test_sequence: test sequence
        :param model_uid: optional UUID of a previously saved model
        :param input_block_length: length of each input block
        :param resolution: fuzzy resolution
        :param verbose: verbosity flag
        """

        if input_block_length < 1:
            raise AttributeError('Input block length must be greater than 0')

        if len(sequence) < input_block_length or len(test_sequence) < input_block_length:
            raise AttributeError('Sequences must be longer than input block')

        self._sequence = sequence
        self._test_sequence = test_sequence
        self._input_block_length = input_block_length
        self._resolution = resolution
        self._verbose = verbose
        self._rule_registry = None
        self._uuid = model_uid

        # Initialize AWS S3
        session = boto3.session.Session(region_name=self._s3_archimedes_region)
        s3_client = session.client('s3', config=boto3.session.Config(signature_version='s3v4'))
        self._s3_transfer = S3Transfer(s3_client)

        # Prepare sequences
        self.X, self.y, self.symbol_count = self.prepare_sequence(self.sequence)
        self.X_test, self.y_test, self.test_symbol_count = self.prepare_sequence(self._test_sequence, test=True)

        # Setup rules
        self.setup_rules()

        # Build neural model
        self._model = None

        if self._uuid is None:
            # Create model from scratch
            if self._verbose:
                print 'Creating new model...'

            self.init_model()
        else:
            # Load model
            self._model = self.load(self._uuid)

        # Compile model
        if self._verbose:
            print 'Compiling model...'

        self._model.compile(loss='categorical_crossentropy', optimizer='rmsprop', metrics=['accuracy'])

    def fit(self, batch_size=64, epochs=8):
        if self._verbose:
            print 'Training model...'

        self._model.fit(self.X, self.y, batch_size=batch_size, nb_epoch=epochs,
                        validation_data=(self.X_test, self.y_test), verbose=self._verbose)

        # Fit rules
        self._rule_registry.fit_rules(self._model)

        # Setup rule engine model
        self._rule_registry.setup_model()

    def predict_with_neural_network(self, x, batch_size=1):
        return self._model.predict(x, batch_size=batch_size)

    def predict_with_rule_engine(self, x):
        return self._rule_registry.predict(x)

    def load(self, model_uid):
        if self._verbose:
            print 'Loading model from S3. UUID: %s' % model_uid

        j_uuid = gettempdir() + os.sep + str(uuid.uuid4()) + '.json'
        h5_uuid = gettempdir() + os.sep + str(uuid.uuid4()) + '.h5'

        self._s3_transfer.download_file(self._s3_archimedes_bucket, model_uid + '.json', j_uuid)
        self._s3_transfer.download_file(self._s3_archimedes_bucket, model_uid + '.h5', h5_uuid)

        model = model_from_json(open(j_uuid).read())
        model.load_weights(h5_uuid)

        return model

    def save(self):
        if self._verbose:
            print 'Saving model to S3 (JSON and H5). UUID: %s' % self._uuid

        f_name = gettempdir() + os.sep + self._uuid
        t_name = f_name + str(uuid.uuid4())

        open(t_name + '.json', 'w').write(self._model.to_json())
        self._model.save_weights(t_name + '.h5')

        self._s3_transfer.upload_file(t_name + '.json', self._s3_archimedes_bucket, self._uuid + '.json',
                                      extra_args={'ContentType': "application/json"})
        self._s3_transfer.upload_file(t_name + '.h5', self._s3_archimedes_bucket, self._uuid + '.h5')

        return self._uuid

    def init_model(self):
        self._uuid = str(uuid.uuid4())

        # Create new neural model
        self._model = Sequential()

        self._model.add(LSTM(self._input_block_length * 100, return_sequences=True,
                             input_shape=(self._input_block_length, 1)))
        self._model.add(LSTM(self._input_block_length * 50, return_sequences=False))
        self._model.add(Dense(self._input_block_length * 10))
        self._model.add(Dropout(0.2))
        self._model.add(Dense(self._input_block_length * 5))
        self._model.add(Dropout(0.2))

        # Output layer: one neuron per symbol, softmax activation
        self._model.add(Dense(self.symbol_count))
        self._model.add(Activation('softmax'))

        if self._verbose:
            print 'New model created. UUID: %s' % self._uuid

    def prepare_sequence(self, sequence, test=False):
        if self._verbose:
            if test:
                dataset = 'Test'
            else:
                dataset = 'Training'
            print 'Preprocessing sequence (%s). Unique symbols: %d' % (dataset, self.unique_symbols_count(sequence))

        X = np.ndarray((len(sequence) - self._input_block_length, self._input_block_length, 1))
        y = np.ndarray((len(sequence) - self._input_block_length, self.unique_symbols_count(sequence)))

        for i in range(len(sequence)-self._input_block_length):
            for j in range(self._input_block_length):
                X[i, j, 0] = sequence[i+j]

        for i in range(self._input_block_length, len(sequence)):
            y[i - self._input_block_length] = to_categorical([sequence[i]], self.unique_symbols_count(sequence))

        return X, y, self.unique_symbols_count(sequence)

    def setup_rules(self):
        if self._verbose:
            print 'Setting up rules...'

        # Initialize Rule registry
        self._rule_registry = RuleRegistry(self.symbol_count, input_block_length=self._input_block_length,
                                           resolution=self._resolution, verbose=self._verbose)

        # Setup raw rules
        self._rule_registry.setup_rules(self.X, self.y)

    @staticmethod
    def unique_symbols(sequence):
        return np.unique(sequence)

    @staticmethod
    def unique_symbols_count(sequence):
        return len(SequenceProcessor.unique_symbols(sequence))

    @property
    def uuid(self):
        return self._uuid

    @property
    def sequence(self):
        return self._sequence

    @property
    def input_block_length(self):
        return self._input_block_length

    @property
    def model(self):
        return self._model

I know there are only a few comments, and I'm going to enrich the whole code with more (before publishing it on GitHub). However, it's quite easy to understand how it works by looking at the constructor:

  • Sequence preparation
  • Fuzzy rules setup (I’m going to show it in the next section)
  • Neural model setup

Indeed, in the main method, there are the following two lines:

# Create processor
processor = SequenceProcessor(sequence=train_set, test_sequence=test_set, input_block_length=2, resolution=5)
processor.fit(epochs=3)

The constructor (followed by the fit() call) is the starting point for the whole job. Before showing how the rules work, I want to show a sample output:

Archimedes test
Generating sequence...
Preprocessing sequence (Training). Unique symbols: 6
Preprocessing sequence (Test). Unique symbols: 6
Setting up rules...
Initializing Rule registry...
Membership function ranges:
L0) array([ 0.  ,  0.  ,  0.25])
L1) array([ 0.25,  0.5 ,  0.75])
L2) array([ 0.75,  1.  ,  1.  ])
Generating antecedents and consequents...
(Antecedent: Fa(0)) -> (Consequent: Fc(0))
(Antecedent: Fa(1)) -> (Consequent: Fc(1))
(Antecedent: Fa(2)) -> (Consequent: Fc(2))
(Antecedent: Fa(3)) -> (Consequent: Fc(3))
(Antecedent: Fa(4)) -> (Consequent: Fc(4))
(Antecedent: Fa(5)) -> (Consequent: Fc(5))
Generating rules...
Generation complete. 24 raw rules created, 19974 skipped
Creating new model...
New model created. UUID: 99d34450-064e-4829-8843-9f781196c463
Compiling model...
Training model...
Train on 19998 samples, validate on 2998 samples
Epoch 1/3
19998/19998 [==============================] - 4s - loss: 1.1323 - acc: 0.6700 - val_loss: 0.6843 - val_acc: 0.8329
Epoch 2/3
19998/19998 [==============================] - 4s - loss: 0.5284 - acc: 0.8510 - val_loss: 0.4008 - val_acc: 0.8943
Epoch 3/3
19998/19998 [==============================] - 4s - loss: 0.4186 - acc: 0.8868 - val_loss: 0.3909 - val_acc: 0.8943
Fitting rules with neural engine...
Fitting complete. KB is made up of 9 rules:
R0) IF Fa(3)[L1] AND Fa(4)[L1] THEN [Fc(0)[L0], Fc(1)[L0], Fc(2)[L0], Fc(3)[L0], Fc(4)[L1], Fc(5)[L0]]
R1) IF Fa(1)[L1] AND Fa(4)[L1] THEN [Fc(0)[L0], Fc(1)[L0], Fc(2)[L0], Fc(3)[L1], Fc(4)[L0], Fc(5)[L0]]
R2) IF Fa(1)[L1] AND Fa(2)[L1] THEN [Fc(0)[L1], Fc(1)[L1], Fc(2)[L0], Fc(3)[L0], Fc(4)[L0], Fc(5)[L0]]
R3) IF Fa(0)[L1] AND Fa(3)[L1] THEN [Fc(0)[L0], Fc(1)[L2], Fc(2)[L0], Fc(3)[L0], Fc(4)[L0], Fc(5)[L0]]
R4) IF Fa(2)[L1] AND Fa(5)[L1] THEN [Fc(0)[L0], Fc(1)[L0], Fc(2)[L0], Fc(3)[L0], Fc(4)[L2], Fc(5)[L0]]
R5) IF Fa(4)[L1] AND Fa(5)[L1] THEN [Fc(0)[L0], Fc(1)[L0], Fc(2)[L0], Fc(3)[L2], Fc(4)[L0], Fc(5)[L0]]
R6) IF Fa(0)[L1] AND Fa(1)[L1] THEN [Fc(0)[L0], Fc(1)[L0], Fc(2)[L0], Fc(3)[L0], Fc(4)[L0], Fc(5)[L2]]
R7) IF Fa(0)[L1] AND Fa(2)[L1] THEN [Fc(0)[L0], Fc(1)[L0], Fc(2)[L0], Fc(3)[L0], Fc(4)[L0], Fc(5)[L2]]
R8) IF Fa(1)[L1] AND Fa(3)[L1] THEN [Fc(0)[L0], Fc(1)[L1], Fc(2)[L0], Fc(3)[L0], Fc(4)[L0], Fc(5)[L0]]
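The membership ranges at the top of the log come straight from a uniform grid: with resolution=5, np.linspace(0, 1, 5) yields resolution − 2 = 3 triangular functions whose two outer levels are "shouldered". A standalone sketch of the same computation (`membership_ranges` here is a simplified Python 3 copy of the method of the same name in the RuleRegistry code below):

```python
import numpy as np

def membership_ranges(resolution):
    # Uniform grid on [0, 1]; every level is a triangle over three
    # consecutive grid points, with the two outer levels "shouldered"
    d = np.linspace(0, 1, resolution)
    ranges = []
    for i in range(resolution - 2):
        if i == 0:
            ranges.append((float(d[0]), float(d[0]), float(d[1])))
        elif i == resolution - 3:
            ranges.append((float(d[-2]), float(d[-1]), float(d[-1])))
        else:
            ranges.append((float(d[i]), float(d[i + 1]), float(d[i + 2])))
    return ranges

for i, r in enumerate(membership_ranges(5)):
    print('L%d) %r' % (i, r))
# L0) (0.0, 0.0, 0.25)
# L1) (0.25, 0.5, 0.75)
# L2) (0.75, 1.0, 1.0)
```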

After fitting the network, we generate a bunch of fuzzy rules, with a level of accuracy determined by the "resolution" parameter (in this case 5 − 2 = 3 levels). So each rule has the following structure:

Fa(x) is the antecedent related to symbol x
Fc(x) is the consequent related to symbol x

IF Fa(1)[L1] <LOGICAL CONNECTION> Fa(2)[L1] THEN [Fc(0)[L0], Fc(1)[L1], Fc(2)[L0], Fc(3)[L0], Fc(4)[L0], Fc(5)[L0]]

It simply means: if the frequencies of symbol 1 and symbol 2 are both low (L1), then the next symbol is likely to be 1 (even if its probability is rather low too, compared to the other ones it's the highest). Using the neural model, all the rules (filtered in order to avoid redundancy) are fitted until we reach a final knowledge base (in the example, it's made of 9 rules). Of course, it's possible to improve this algorithm (and I'm going to do it), but as a starting point it's good enough.
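To make the antecedent side concrete: for an input block like [1, 2] with input_block_length=2, each symbol has relative frequency 0.5, which falls into the L1 range. A minimal Python 3 sketch mirroring the term_frequencies / matching_level logic (the function names follow the class below, but these are simplified standalone copies; the ranges are the ones printed in the log):

```python
import numpy as np

def term_frequencies(x, block_length):
    # Unique symbols in the block and their relative frequencies
    terms, counts = np.unique(x, return_counts=True)
    return terms, counts / float(block_length)

def matching_level(freq, ranges):
    # Return the first level whose triangular support contains `freq`
    for label, (low, _, high) in ranges.items():
        if low <= freq <= high:
            return label
    return next(iter(ranges))

# Ranges copied from the log above (resolution=5)
ranges = {'L0': (0.0, 0.0, 0.25), 'L1': (0.25, 0.5, 0.75), 'L2': (0.75, 1.0, 1.0)}

terms, freqs = term_frequencies([1, 2], block_length=2)
print([int(t) for t in terms], [float(f) for f in freqs])  # [1, 2] [0.5, 0.5]
print([matching_level(f, ranges) for f in freqs])          # ['L1', 'L1']
```

This is exactly the antecedent of R2 above: Fa(1)[L1] AND Fa(2)[L1].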

Now it’s time to post the whole RuleRegistry code (rule generator):

import hashlib
import numpy as np
import skfuzzy as fuzz
import re

from collections import OrderedDict
from skfuzzy.control.antecedent_consequent import Antecedent, Consequent
from skfuzzy.control import ControlSystem, ControlSystemSimulation
from skfuzzy.control.rule import Rule

class RuleRegistry(object):
    _antecedent_symbol = 'Fa(%(Symbol)s)'
    _antecedent_regex = '(.*)([0-9]+)(.*)'
    _consequent_symbol = 'Fc(%(Symbol)s)'
    _consequent_regex = '(.*)([0-9]+)(.*)'
    _range_level_symbol = 'L'
    _range_level_regex = '(.*)([0-9]+)'
    _rule_prefix = 'R'

    def __init__(self, unique_symbols, input_block_length=5, resolution=10, max_iterations=500, verbose=True):
        """
        Rule registry

        :param unique_symbols: number of unique symbols
        :param input_block_length: length of each input block
        :param resolution: fuzzy resolution
        :param max_iterations: maximum number of sampling iterations per rule
        :param verbose: verbosity flag
        """

        if unique_symbols <= 0:
            raise AttributeError('The number of unique symbols must be greater than zero')

        if resolution < 5:
            raise AttributeError('Resolution must be equal or greater than 5')

        self._unique_symbols = unique_symbols
        self._input_block_length = input_block_length
        self._resolution = resolution
        self._max_iterations = max_iterations
        self._verbose = verbose

        if self._verbose:
            print 'Initializing Rule registry...'

        self._raw_rules = {}
        self._rules = {}
        self._antecedents = {}
        self._consequents = {}
        self._a_regex = re.compile(self._antecedent_regex)
        self._c_regex = re.compile(self._consequent_regex)
        self._l_regex = re.compile(self._range_level_regex)
        self._model = None
        self._controller = None

        # Setup membership functions
        self._range_levels = self.membership_ranges()

        # Generate antecedents
        if self._verbose:
            print 'Generating antecedents and consequents...'

        for i in range(self._unique_symbols):
            # Antecedents
            antecedent = Antecedent(np.linspace(0, 1, self._resolution-2), self.symbol(i))

            for label, values in self._range_levels.iteritems():
                antecedent[label] = fuzz.trimf(antecedent.universe, values)

            self._antecedents[self.symbol(i)] = antecedent

            # Consequents
            consequent = Consequent(np.linspace(0, 1, self._resolution-2), self.symbol(i, True))

            for label, values in self._range_levels.iteritems():
                consequent[label] = fuzz.trimf(consequent.universe, values)

            self._consequents[self.symbol(i, True)] = consequent

            print '(%s) -> (%s)' % (antecedent, consequent)

    def setup_rules(self, X, Y):
        skipped = 0

        if self._verbose:
            print 'Generating rules...'

        for i in range(len(X)):
            x = X[i, :, 0]
            y = self.from_category(Y[i])

            # Retrieve unique elements and relative frequencies
            antecedents, antecedent_frequency = self.term_frequencies(x)

            # Generate antecedent and consequent
            key = ''
            antecedent = []
            consequent = self._consequents[self.symbol(y, True)][self.matching_level(1)]

            for j in range(len(antecedents)):
                term = self._antecedents[self.symbol(int(antecedents[j]))][self.matching_level(antecedent_frequency[j])]
                antecedent.append(term)
                key += str(self.symbol(int(antecedents[j]))) + str(self.matching_level(antecedent_frequency[j]))

            key += str(self.symbol(y, True)) + str(self.matching_level(1))
            hashed_key = str(hashlib.md5(key).hexdigest())

            rule = Rule(self.concat_terms(antecedent, mode='AND'), consequent)

            if hashed_key not in self._raw_rules:
                self._raw_rules[hashed_key] = rule
            else:
                skipped += 1

        self._rules = self._raw_rules

        if self._verbose:
            print 'Generation complete. %d raw rules created, %d skipped' % (len(self._raw_rules), skipped)

    def fit_rules(self, model):
        if self._verbose:
            print 'Fitting rules with neural engine...'

        fitted_rules = OrderedDict()
        fitted_rules_raw = OrderedDict()
        rule_index = 0

        for key, rule in self._rules.iteritems():
            antecedents = [0 for i in range(self._unique_symbols)]

            for term in rule.antecedent_terms:
                symbol = int(self.symbol_from_label(term.parent_variable.label))
                level = int(self.level_from_label(term.label))
                antecedents[symbol] = level

            n_antecedents = np.array(antecedents)
            r_antecedents = n_antecedents / np.linalg.norm(n_antecedents)

            avg_prediction = np.array([0.0 for t in range(self._unique_symbols)])
            valid_sequences = 0

            for i in range(self._max_iterations):
                sequence = self.generate_sequence(r_antecedents)

                if len(sequence) != self._input_block_length:
                    # Skip this sequence
                    continue
                # Predict with neural network
                n_sequence = np.ndarray(shape=(1, self._input_block_length, 1))
                n_sequence[0, :, 0] = sequence

                prediction = model.predict(n_sequence, batch_size=1)

                # Sum up prediction
                avg_prediction += prediction[0, :]
                valid_sequences += 1

            avg_prediction /= float(valid_sequences)

            # Generate weighted consequents
            consequents = []

            for j in range(self._unique_symbols):
                consequents.append(self._consequents[self.symbol(j, True)][self.matching_level(avg_prediction[j])])

            rule.consequent = tuple(consequents)

            # Remove redundant rules
            if rule.__repr__() not in fitted_rules_raw:
                fitted_rules_raw[rule.__repr__()] = rule

        for key, rule in fitted_rules_raw.iteritems():
            fitted_rules[self._rule_prefix + str(rule_index)] = rule
            rule_index += 1

        self._rules = fitted_rules

        if self._verbose:
            print 'Fitting complete. KB is made up of %d rules:' % len(self._rules)

            for key, rule in self._rules.iteritems():
                print '%s) %r' % (key, rule)

    def setup_model(self, rule=None):
        if len(self._rules) == 0:
            raise RuntimeError('Before setting up the model it is necessary to populate the rule set')

        if self._verbose and rule is None:
            print 'Setting up rule engine...'

        if rule is not None:
            self._model = ControlSystem([rule])
        else:
            self._model = ControlSystem(tuple(self._rules.values()))

        self._controller = ControlSystemSimulation(self._model)

    def predict(self, x):
        if self._model is None or self._controller is None:
            raise RuntimeError('Before predicting it is necessary to set up the model')

        # Retrieve unique elements and relative frequencies
        antecedents, antecedent_frequency = self.term_frequencies(x)

        input_data = [0.0 for i in range(self._unique_symbols)]

        for i in range(len(antecedents)):
            input_data[antecedents[i]] = antecedent_frequency[i]

        for i in range(self._unique_symbols):
            self._controller.input[self.symbol(i)] = input_data[i]

        # Run the fuzzy inference engine
        try:
            self._controller.compute()
        except AssertionError:
            if self._verbose:
                print 'Inference error'
            return None

        return self._controller.output

    def symbol(self, s, consequent=False):
        if consequent:
            return self._consequent_symbol % {
                'Symbol': str(s)
            }

        return self._antecedent_symbol % {
            'Symbol': str(s)
        }

    def term_frequencies(self, x):
        terms, term_count = np.unique(x, return_counts=True)
        term_frequency = term_count / float(self._input_block_length)
        return terms, term_frequency

    def membership_ranges(self):
        ranges = OrderedDict()
        distribution = np.linspace(0, 1, self._resolution)

        for i in range(0, self._resolution-2):
            t = []

            if i == 0:
                t.append(distribution[0])
                t.append(distribution[0])
                t.append(distribution[1])
            elif i == self._resolution-3:
                t.append(distribution[self._resolution - 2])
                t.append(distribution[self._resolution - 1])
                t.append(distribution[self._resolution - 1])
            else:
                t.append(distribution[i])
                t.append(distribution[i + 1])
                t.append(distribution[i + 2])

            ranges[self._range_level_symbol + str(i)] = np.array(t)

        if self._verbose:
            print 'Membership function ranges:'

            for label, values in ranges.iteritems():
                print '%s) %r' % (label, values)

        return ranges

    def generate_sequence(self, antecedents):
        sequence = []
        weights = np.argsort(antecedents)

        for i in range(self._input_block_length):
            n = np.random.uniform(0, 1)

            # Pick the first symbol (in ascending weight order) whose
            # normalized level covers the sampled value
            for j in range(len(weights)):
                if n <= antecedents[weights[j]]:
                    sequence.append(weights[j])
                    break
        return np.array(sequence)

    def matching_level(self, x):
        for label, values in self._range_levels.iteritems():
            if np.amin(values) <= x <= np.amax(values):
                return label

        return self._range_levels.keys()[0]

    def from_category(self, y):
        for i in range(self._unique_symbols):
            if y[i] == 1:
                return i

    @staticmethod
    def concat_terms(terms, mode='OR'):
        clause = terms[0]

        for i in range(1, len(terms)):
            if mode == 'OR':
                clause |= terms[i]
            else:
                clause &= terms[i]

        return clause

    def symbol_from_label(self, label):
        match = self._a_regex.match(label)
        return match.group(2)

    def level_from_label(self, label):
        match = self._l_regex.match(label)
        return match.group(2)

    @property
    def rules(self):
        return self._rules

You don't have to reference it directly; however, it's responsible for creating a set of raw rules, pruning them, and fitting them using the previously trained neural model. If you want to test your final model, you can use this snippet:

a = [0, 2]
na = np.ndarray(shape=(1, 2, 1))
na[0, :, 0] = a

print processor.predict_with_neural_network(na, batch_size=1)
print processor.predict_with_rule_engine(a)

By predicting with both models (neural and fuzzy-based), we get the following results:

Neural:
[[  5.30292200e-05   8.82180184e-02   6.26192850e-05   5.24612609e-03 5.47356240e-06   9.06414807e-01]]

Fuzzy based:
OrderedDict([('Fc(0)', 0.0), ('Fc(1)', 0.0), ('Fc(2)', 0.0), ('Fc(3)', 0.0), ('Fc(4)', 0.0), ('Fc(5)', 1.0)])

Applying argmax to the neural result array, we get symbol 5 as the most probable one, which is the same symbol predicted by our fuzzy model.
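The agreement can also be checked mechanically; a tiny Python 3 sketch using the values printed above:

```python
import numpy as np

# Values copied from the two outputs above
neural = np.array([5.30292200e-05, 8.82180184e-02, 6.26192850e-05,
                   5.24612609e-03, 5.47356240e-06, 9.06414807e-01])
fuzzy = {'Fc(0)': 0.0, 'Fc(1)': 0.0, 'Fc(2)': 0.0,
         'Fc(3)': 0.0, 'Fc(4)': 0.0, 'Fc(5)': 1.0}

print(int(np.argmax(neural)))     # 5
print(max(fuzzy, key=fuzzy.get))  # Fc(5)
```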

That’s all! I’m going to continue my research and publish all useful updates.

Photo credit: 4 blu via photopin (license)
