What You Will Learn
- How to put everything together
- How to create a sentiment analysis model based on FlockModel
Step by step guide
In the previous sessions we built all the relevant support classes, such as the basic CNN model and the data preprocessing functions. Now we need to build our model for fine-tuning. So let's begin.
Create a file called SentimentAnalysis.py and import all the packages as follows:
import io
import json
import random

import numpy as np
import torch
from loguru import logger
from pandas import DataFrame

from flock_sdk import FlockSDK, FlockModel
from models.basic_cnn import CNNClassifier
from data_preprocessing import IndexesDataset, get_loader
Before getting into the real deal, let's first understand how this model works fundamentally. As we know, building a model requires four steps: data preprocessing, training, evaluation and aggregation. Let's start with the hyperparameters, why we need them and what they do:
Batch Size — number of training examples utilized in one iteration
- Training on batches strikes a balance between computational efficiency and optimization stability.
Epochs — An epoch refers to one cycle through the full training dataset
- Training a model involves iteratively adjusting its weights based on the data. Multiple passes help the model converge to a good solution.
Classes — a list of the possible classes/categories that the model will predict.
- Each output neuron of the model corresponds to the probability of a particular class.
Class to index — a dictionary that maps each class to a unique integer index
- Converting textual class labels into numeric values and vice versa
Learning Rate — The learning rate controls how much the model adjusts itself when updating the weights based on the estimated error.
- A higher learning rate can make training faster but may overshoot the optimal weights. A lower learning rate takes more careful steps but may take longer and risks getting stuck in local minima. Tuning the learning rate is therefore crucial.
Embedding Size — Represents the size of the embedding vectors
- Embeddings capture the meaning of words. The size of these vectors (or embeddings) can affect how much information they can store.
Vocabulary Size — Specifies the number of unique words/tokens in the vocabulary
- It can be costly and sometimes unnecessary to have embeddings for every single word in a language.
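To make the learning-rate trade-off described above concrete, here is a minimal sketch in plain Python rather than PyTorch. It minimizes the toy function f(w) = w**2 with gradient descent; the helper gradient_descent and the two rates are illustrative assumptions and not part of the FLock SDK.

```python
def gradient_descent(lr: float, steps: int = 10, start: float = 1.0) -> float:
    """Minimize f(w) = w**2 with plain gradient descent and return the final w."""
    w = start
    for _ in range(steps):
        grad = 2.0 * w   # derivative of w**2
        w -= lr * grad   # update rule: step against the gradient
    return w


# With lr=0.1 every step multiplies w by 0.8, so w shrinks toward the minimum at 0.
w_small = gradient_descent(lr=0.1)

# With lr=1.1 every step multiplies w by -1.2, so w overshoots and moves away from 0.
w_large = gradient_descent(lr=1.1)

print(abs(w_small) < 1.0, abs(w_large) > 1.0)
```

A real model has a far more complicated loss surface, but the same pattern holds: a rate that is too large overshoots, while a small rate converges in smaller steps.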
The hyperparameters described above are the initial parameters of our model, so they become the arguments of __init__:
class Example_Model(FlockModel):
    def __init__(
        self,
        classes,
        batch_size=256,
        epochs=1,
        lr=0.03,
        emb_size=100,
        vocab_size=30000,
        client_id=1,
    ):
        """
        Hyper parameters
        """
        self.batch_size = batch_size
        self.epochs = epochs
        self.classes = classes
        # map each class label to a unique integer index
        self.class_to_idx = {_class: idx for idx, _class in enumerate(self.classes)}
        self.lr = lr
        self.emb_size = emb_size
        self.vocab_size = vocab_size
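As a quick illustration of the class-to-index mapping built in __init__, the sketch below runs the same dictionary comprehension on the two labels used later in this tutorial. The inverse map idx_to_class and the sample labels are hypothetical additions for illustration only.

```python
classes = ["1", "2"]  # same labels as in the final step of this tutorial

# Same comprehension as in __init__: label -> integer index
class_to_idx = {_class: idx for idx, _class in enumerate(classes)}

# Hypothetical inverse map: integer index -> label
idx_to_class = {idx: _class for _class, idx in class_to_idx.items()}

labels = ["2", "1", "2"]                              # illustrative raw labels
encoded = [class_to_idx[label] for label in labels]   # numeric form for the model
decoded = [idx_to_class[i] for i in encoded]          # back to text labels

print(class_to_idx, encoded, decoded)
```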
We also need to define the device for training. Here we first try to use CUDA (that is, a GPU); if it is not available, we fall back to the CPU.
        """
        Device setting
        """
        if torch.cuda.is_available():
            device = "cuda"
        else:
            device = "cpu"
        self.device = torch.device(device)
The final appearance of the __init__ function should be as follows:
    def __init__(
        self,
        classes,
        batch_size=256,
        epochs=1,
        lr=0.03,
        emb_size=100,
        vocab_size=30000,
        client_id=1,
    ):
        """
        Hyper parameters
        """
        self.batch_size = batch_size
        self.epochs = epochs
        self.classes = classes
        self.class_to_idx = {_class: idx for idx, _class in enumerate(self.classes)}
        self.lr = lr
        self.emb_size = emb_size
        self.vocab_size = vocab_size

        """
        Device setting
        """
        if torch.cuda.is_available():
            device = "cuda"
        else:
            device = "cpu"
        self.device = torch.device(device)
init_dataset
In this section, we'll discuss the init_dataset function, which leverages utilities from our data_preprocessing.py script.
Purpose:
The function prepares raw data, making it suitable for training or evaluating a neural network model, especially in the context of Natural Language Processing (NLP).
Steps:
Convert Dataset to IndexesDataset:
- Instead of working directly with raw data, we utilize the IndexesDataset class.
- The IndexesDataset class handles tasks like tokenization, sequence padding, and vocabulary creation. By processing our dataset with this class, we ensure our data is in a format that our neural network can understand and learn from.
Get DataLoader using get_loader:
- After processing our data, we need a way to efficiently feed it into our model in batches. This is where get_loader comes in.
- It takes our processed dataset and returns a DataLoader from PyTorch. This utility provides an iterator that yields batches of data, making the training or evaluation process more efficient.
    def init_dataset(self, dataset_path: str) -> None:
        self.datasetpath = dataset_path
        # requires `import json` at the top of the file
        with open(dataset_path, "r") as f:
            dataset = json.load(f)
        # use the device and batch size stored on the instance in __init__
        dataset_df = IndexesDataset(
            dataset, max_samples_count=10000, device=self.device
        )
        logger.debug("Processing dataset")
        self.test_data_loader = get_loader(dataset_df, batch_size=self.batch_size)
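To show what "yielding batches" means in the steps above, here is a conceptual sketch in plain Python. It is not PyTorch's DataLoader and not the tutorial's get_loader; the helper iter_batches is a hypothetical stand-in that only demonstrates how a dataset is cut into fixed-size chunks.

```python
from typing import Iterator, Sequence


def iter_batches(samples: Sequence, batch_size: int) -> Iterator[list]:
    """Yield consecutive slices of `samples`, each with at most `batch_size` items."""
    for start in range(0, len(samples), batch_size):
        yield list(samples[start:start + batch_size])


samples = list(range(10))  # ten illustrative samples
batches = list(iter_batches(samples, batch_size=4))

for batch in batches:
    print(len(batch), batch)
```

Note that the last batch is smaller than the others when the dataset size is not a multiple of the batch size, which is why the training loop later weights each batch loss by its size.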
get_starting_model
The get_starting_model method of our model class has two jobs: it fixes the random seeds to ensure reproducibility, and then it initializes a CNNClassifier model.
- First, define the method: def get_starting_model(self):
- Setting the Seed:
        seed = 0
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
- Making CUDA Operations Deterministic
        torch.backends.cudnn.deterministic = True
- Return the CNN Classifier:
        return CNNClassifier(vocab_size=self.vocab_size, emb_size=self.emb_size)
The final appearance of the get_starting_model function should be as follows:
    def get_starting_model(self):
        # fix every random number generator so each run starts from the same weights
        seed = 0
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        return CNNClassifier(vocab_size=self.vocab_size, emb_size=self.emb_size)
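The effect of seeding can be checked with the standard library alone. The sketch below uses only Python's random module as a stand-in for the NumPy and PyTorch generators seeded above; the helper draw is a hypothetical name used for illustration.

```python
import random


def draw(seed: int, n: int = 3) -> list[float]:
    """Seed the generator, then draw n pseudo-random numbers."""
    random.seed(seed)
    return [random.random() for _ in range(n)]


first = draw(0)
second = draw(0)   # same seed, so the same sequence is produced
other = draw(1)    # different seed, so a different sequence is produced

print(first == second, first == other)
```

In the same way, seeding before constructing CNNClassifier means every participant starts from identical initial weights.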
Training
train function should:
- Take in the model weights as bytes and load them into your model
- If parameters passed are None, initialize them to match your untrained model’s parameters (i.e. clean slate)
- If needed pre-process the dataset which is passed as a list of rows parsed as dicts
- Output the model parameters retrained on the dataset AS BYTES
We will first set up all the variables we need. As inputs we require parameters and dataset. Following the model training process, we begin by processing the dataset, which means we will be using process_dataset from data_preprocessing.py. Then we initialize our model by calling get_starting_model to get the base model. Now we can come up with:
    def train(self, parameters: bytes | None, dataset: list[list]) -> bytes:
        data_loader = self.process_dataset(dataset)  # get the data loader
        model = self.get_starting_model()  # get the model
        if parameters is not None:  # load the received weights, if any
            model.load_state_dict(torch.load(io.BytesIO(parameters)))
        model.train()  # switch the model to training mode
        optimizer = torch.optim.Adam(
            model.parameters(),
            lr=self.lr,
        )  # define optimizer
        criterion = torch.nn.BCEWithLogitsLoss()  # measures how far predictions are from targets
        model.to(self.device)  # move the model to the right device
Now we need to run through our training. Here is a high level overview.
- We loop through each epoch
- At the start of each epoch, reset the metrics: training loss, number of correct predictions and sample count
        for epoch in range(self.epochs):
            logger.debug(f"Epoch {epoch}")
            train_loss = 0.0
            train_correct = 0
            train_total = 0
- Then iterate over batches
- Load a batch of data and move it to the appropriate computation device. Feed the input data through the model to get predictions.
            for batch_idx, (inputs, targets) in enumerate(data_loader):
                optimizer.zero_grad()
                inputs, targets = inputs.to(self.device), targets.to(self.device).unsqueeze(1)
                outputs = model(inputs)
- Measure the difference between the predictions and the actual targets. Calculate the contribution of each model parameter to the error.
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()
- With the parameters adjusted by optimizer.step() to reduce the error in the next iteration, accumulate metrics (like loss and accuracy) for evaluation and logging.
                train_loss += loss.item() * inputs.size(0)
                predicted = torch.round(outputs).squeeze()
                train_total += targets.size(0)
                train_correct += (predicted == targets.squeeze()).sum().item()
- Log the training progress for the initial batches to monitor the training process.
                if batch_idx < 2:
                    logger.debug(
                        f"Batch {batch_idx}, Acc: {round(100.0 * train_correct / train_total, 2)}, Loss: {round(train_loss / train_total, 4)}"
                    )
After completing the whole process we save the parameters into a buffer in byte format, which will be used in the model evaluation stage.
        buffer = io.BytesIO()
        torch.save(model.state_dict(), buffer)
        return buffer.getvalue()
The final appearance of the train function should be as follows:
    def train(self, parameters: bytes | None, dataset: list[list]) -> bytes:
        data_loader = self.process_dataset(dataset)  # get the data loader
        model = self.get_starting_model()  # get the model
        if parameters is not None:  # load the received weights, if any
            model.load_state_dict(torch.load(io.BytesIO(parameters)))
        model.train()  # switch the model to training mode
        optimizer = torch.optim.Adam(
            model.parameters(),
            lr=self.lr,
        )  # define optimizer
        criterion = torch.nn.BCEWithLogitsLoss()  # measures how far predictions are from targets
        model.to(self.device)  # move the model to the right device

        for epoch in range(self.epochs):
            logger.debug(f"Epoch {epoch}")
            train_loss = 0.0
            train_correct = 0
            train_total = 0
            for batch_idx, (inputs, targets) in enumerate(data_loader):
                optimizer.zero_grad()
                inputs, targets = inputs.to(self.device), targets.to(self.device).unsqueeze(1)
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()

                train_loss += loss.item() * inputs.size(0)
                predicted = torch.round(outputs).squeeze()
                train_total += targets.size(0)
                train_correct += (predicted == targets.squeeze()).sum().item()
                if batch_idx < 2:
                    logger.debug(
                        f"Batch {batch_idx}, Acc: {round(100.0 * train_correct / train_total, 2)}, Loss: {round(train_loss / train_total, 4)}"
                    )

        buffer = io.BytesIO()
        torch.save(model.state_dict(), buffer)
        return buffer.getvalue()
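One detail of the training loop worth spelling out is why the batch loss is multiplied by the batch size before it is accumulated. With the default mean reduction, loss.item() is the average loss of one batch, so weighting it by inputs.size(0) recovers a correct per-sample average even when the last batch is smaller. The sketch below checks this arithmetic in plain Python; the loss values and batch sizes are made-up illustrative numbers.

```python
# (mean loss of the batch, number of samples in the batch): illustrative values
batch_results = [(0.5, 4), (1.0, 4), (2.0, 2)]

train_loss = 0.0
train_total = 0
for mean_loss, batch_size in batch_results:
    train_loss += mean_loss * batch_size   # mirrors loss.item() * inputs.size(0)
    train_total += batch_size              # mirrors targets.size(0)

# Average loss per sample, as logged by the training loop
per_sample_loss = train_loss / train_total

# Naive alternative: averaging the batch means gives the small batch too much weight
mean_of_batch_means = sum(m for m, _ in batch_results) / len(batch_results)

print(per_sample_loss, mean_of_batch_means)
```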
Evaluation
evaluate function should:
- Take in the model weights as bytes and load them into your model
- If parameters passed are None, initialize them to match your untrained model’s parameters (i.e. clean slate)
- If needed pre-process the dataset which is passed as a list of rows parsed as dicts
- Output the accuracy of the model parameters on the dataset as a float
The primary purpose of evaluation is to assess the model’s performance on unseen data and determine if the training process is producing satisfactory results. The evaluation function often shares structural similarities with the training function, but with key differences:
- No Parameter Updates: During evaluation, the model’s parameters are not updated. We’re only interested in measuring its performance, not refining it further.
- Single Pass: Typically, evaluation is performed in a single pass over the validation or test dataset, without the multiple epochs often seen in training.
- Efficiency and Speed: Since we don’t update parameters or iterate multiple times, evaluation can be more efficient. Disabling gradient computation, as seen with torch.no_grad(), further optimizes the process for speed.
The initialization part of the code is much the same as in the training function: we define the model, get the criterion, and set up the metric variables.
    def evaluate(self, parameters: bytes | None, dataset: list[list]) -> float:
        data_loader = self.process_dataset(dataset)
        criterion = torch.nn.BCELoss()

        model = self.get_starting_model()
        if parameters is not None:
            model.load_state_dict(torch.load(io.BytesIO(parameters)))
        model.to(self.device)
        model.eval()  # eval() sets the model to evaluation mode

        test_correct = 0
        test_loss = 0.0
        test_total = 0
Then we use torch.no_grad(), which disables gradient tracking so that no gradients are computed and memory usage is reduced. For each batch in the DataLoader:
        with torch.no_grad():
            for batch_idx, (inputs, targets) in enumerate(data_loader):
- The inputs and targets are sent to the device, and the model predicts based on these inputs.
                inputs, targets = inputs.to(self.device), targets.to(self.device).unsqueeze(1)
                outputs = model(inputs)
                loss = criterion(outputs, targets)
- The loss between the predictions and actual targets is computed. Metrics (like accuracy and total loss) are updated based on the current batch's results.
                test_loss += loss.item() * inputs.size(0)
                predicted = torch.round(outputs).squeeze()
                test_total += targets.size(0)
                test_correct += (predicted == targets.squeeze()).sum().item()
Lastly, we compute the overall accuracy as test_correct / test_total and return it.
The final appearance of the evaluate function should be as follows:
    def evaluate(self, parameters: bytes | None, dataset: list[list]) -> float:
        data_loader = self.process_dataset(dataset)
        criterion = torch.nn.BCELoss()

        model = self.get_starting_model()
        if parameters is not None:
            model.load_state_dict(torch.load(io.BytesIO(parameters)))
        model.to(self.device)
        model.eval()  # evaluation mode

        test_correct = 0
        test_loss = 0.0
        test_total = 0
        with torch.no_grad():  # no gradients are needed for evaluation
            for batch_idx, (inputs, targets) in enumerate(data_loader):
                # move the batch to the device and run the forward pass first
                inputs, targets = inputs.to(self.device), targets.to(self.device).unsqueeze(1)
                outputs = model(inputs)
                loss = criterion(outputs, targets)

                # then accumulate the metrics for this batch
                test_loss += loss.item() * inputs.size(0)
                predicted = torch.round(outputs).squeeze()
                test_total += targets.size(0)
                test_correct += (predicted == targets.squeeze()).sum().item()

        accuracy = test_correct / test_total
        logger.info(
            f"Model test, Acc: {accuracy}, Loss: {round(test_loss / test_total, 4)}"
        )
        return accuracy
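The accuracy computation in evaluate can be traced by hand on a tiny example. The sketch below uses plain Python lists in place of tensors, with made-up probabilities and targets, and assumes the model outputs a probability between 0 and 1 so that rounding acts as a 0.5 threshold.

```python
outputs = [0.9, 0.2, 0.6, 0.4]   # illustrative predicted probabilities
targets = [1, 0, 0, 0]           # illustrative true labels

# Rounding turns each probability into a hard 0/1 prediction
predicted = [round(p) for p in outputs]

# Count matches, mirroring (predicted == targets).sum().item()
test_correct = sum(int(p == t) for p, t in zip(predicted, targets))
test_total = len(targets)

accuracy = test_correct / test_total
print(predicted, accuracy)
```

Here the third sample is predicted as 1 although its target is 0, so three of four predictions are correct.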
Aggregation
aggregate function should
- take in a list of model weights (bytes)
- aggregate them using avg and output the aggregated parameters as bytes
The main goal of the aggregate function is to calculate the average of the parameters from multiple models and provide the averaged parameters. This is often used in methods like Federated Learning where multiple local models are trained, and then their parameters are combined to get a global model.
First we need to load the model parameters. For this step, we want to convert the list of byte-represented parameters into actual PyTorch tensors or dictionaries of tensors.
        parameters_list = [
            torch.load(io.BytesIO(parameters)) for parameters in parameters_list
        ]
Next we average the parameters: for each parameter key, the corresponding values from all models are averaged.
        averaged_params_template = parameters_list[0]
        for k in averaged_params_template.keys():
            temp_w = []
            for local_w in parameters_list:
                temp_w.append(local_w[k])
            averaged_params_template[k] = sum(temp_w) / len(temp_w)
Lastly we want to save the parameters and convert them back to byte-represented averaged parameters.
        # Create a buffer
        buffer = io.BytesIO()

        # Save state dict to the buffer
        torch.save(averaged_params_template, buffer)

        # Get the byte representation
        aggregated_parameters = buffer.getvalue()

        return aggregated_parameters
The final appearance of the aggregate function should be as follows:
    def aggregate(self, parameters_list: list[bytes]) -> bytes:
        # Deserialize each client's state dict
        parameters_list = [
            torch.load(io.BytesIO(parameters)) for parameters in parameters_list
        ]

        # Average every parameter key across all clients
        averaged_params_template = parameters_list[0]
        for k in averaged_params_template.keys():
            temp_w = []
            for local_w in parameters_list:
                temp_w.append(local_w[k])
            averaged_params_template[k] = sum(temp_w) / len(temp_w)

        # Create a buffer
        buffer = io.BytesIO()

        # Save state dict to the buffer
        torch.save(averaged_params_template, buffer)

        # Get the byte representation
        aggregated_parameters = buffer.getvalue()

        return aggregated_parameters
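The averaging step is easiest to verify on small numbers. The following sketch applies the same key-by-key averaging to plain Python dictionaries of lists instead of PyTorch state dicts; the function average_parameters, the client names and the parameter keys are hypothetical and chosen only for illustration.

```python
def average_parameters(
    parameters_list: list[dict[str, list[float]]],
) -> dict[str, list[float]]:
    """Average each parameter element-wise across all clients."""
    n = len(parameters_list)
    averaged = {}
    for key in parameters_list[0]:
        # Group the i-th element of this parameter from every client
        columns = zip(*(params[key] for params in parameters_list))
        averaged[key] = [sum(col) / n for col in columns]
    return averaged


client_a = {"fc.weight": [1.0, 2.0], "fc.bias": [0.0]}
client_b = {"fc.weight": [3.0, 4.0], "fc.bias": [1.0]}

global_params = average_parameters([client_a, client_b])
print(global_params)
```

Unlike the tutorial's aggregate, this sketch builds a new dictionary rather than reusing the first client's dictionary as a template, so the inputs stay unchanged.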
Final step
Lastly we need to set up the run: define the hyperparameters and register the model with the FLock SDK. The hyperparameters for the training are set under a block commented as "Hyper parameters", which include device specification (CUDA), maximum sequence length, number of epochs, learning rate, embedding size, batch size, and class labels. An instance of Example_Model, our FlockModel subclass, is created with the specified hyperparameters and then passed to an instance of FlockSDK. The run() method of the FlockSDK instance is called to initiate the training process.
if __name__ == "__main__":
    """
    Hyper parameters
    """
    device = "cuda"
    max_seq_len = 64
    epochs = 3
    lr = 0.001
    emb_size = 100
    batch_size = 64
    classes = [
        "1",
        "2",
    ]

    # the class is defined as Example_Model (capital M)
    flock_model = Example_Model(
        classes,
        batch_size=batch_size,
        epochs=epochs,
        lr=lr,
        emb_size=emb_size,
    )

    sdk = FlockSDK(flock_model)
    sdk.run()
Conclusion
By now, you should be able to create your own sentiment model. As a next step, you can either use it locally or use it with our FLock Client to experience federated learning with blockchain. Great work!
Reach out to us by
Website: https://flock.io/
Twitter: https://twitter.com/flock_io
Telegram: https://t.me/flock_io_community
Discord: https://discord.gg/ay8MnJCg2W