Sentiment Analysis: Supervised Learning with SVM and Apache Spark

The objective is to discriminate between two classes (positive or negative opinion) in movie reviews, using data from the IMDB database (50,000 reviews).

Technical Environment

This project was built with Apache Spark. Spark is a popular open-source, fast, general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python, and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

The chosen programming language is Scala. Scala combines object-oriented and functional programming in one concise, high-level language. It runs on the Java virtual machine (JVM), which provides interoperability with Java (i.e., libraries written in either language may be referenced directly from Scala or Java code).

Finally, the chosen development environment is IntelliJ IDEA and the build program is sbt.

The following is the build.sbt file used for this project.

name := "SentimentAnalysisIMDB" version := "0.1" scalaVersion := "2.11.12" 
libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % "2.2.0", 
"org.apache.spark" %% 
"spark-mllib" % "2.2.0" )

Dataset

The dataset contains 50,000 movie reviews from the IMDB website (data available here).
These reviews (one text file per review) are organized in two main folders: one for learning (the "train" folder) and one for testing (the "test" folder). In each of these directories, reviews are organized in sub-directories according to whether they express a positive sentiment ("pos" directory) or a negative one ("neg" directory).
The database also includes 50,000 unlabeled reviews, which are not used in this project since this is a supervised learning task.
The reviews were collected with the restriction of a maximum of 30 reviews per movie, as reviews of the same movie tend to be correlated. In addition, the "test" and "train" sets do not contain the same films, to prevent the model from memorizing the sentiment associated with a particular title.
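
For reference, this is the on-disk layout of the aclImdb archive described above (12,500 labeled reviews per class and per split):

aclImdb/
├── train/
│   ├── pos/     12,500 positive reviews, one text file each
│   ├── neg/     12,500 negative reviews, one text file each
│   └── unsup/   50,000 unlabeled reviews, unused here
└── test/
    ├── pos/     12,500 positive reviews
    └── neg/     12,500 negative reviews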

Chosen approach

To complete our task, we must first build a vector representation of the text reviews. This is called word embedding. Word embedding is the collective name for a set of language modeling and feature learning techniques in natural language processing (NLP) where words or phrases from the vocabulary are mapped to vectors of real numbers.

To do this, we will use Word2vec. Word2vec is a group of related models that are used to produce word embeddings. These models are shallow, two-layer neural networks that are trained to reconstruct linguistic contexts of words. Word2vec takes as its input a large corpus of text and produces a vector space, typically of several hundred dimensions, with each unique word in the corpus being assigned a corresponding vector in the space. Word vectors are positioned in the vector space such that words that share common contexts in the corpus are located in close proximity to one another in the space.

Word2vec can utilize either of two model architectures to produce a distributed representation of words: continuous bag-of-words (CBOW) or continuous skip-gram. In the continuous bag-of-words architecture, the model predicts the current word from a window of surrounding context words. In the continuous skip-gram architecture, the model uses the current word to predict the surrounding window of context words. The skip-gram architecture weighs nearby context words more heavily than more distant context words.
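
The vectorizeReviews() function shown later loads a Word2Vec model from disk; the training step itself is not shown in this post. Below is a minimal sketch of how such a model could be trained and saved with Spark MLlib, assuming a SparkContext sc, the data directory dataPath, and the consolidated review files produced in the next section (the minCount value and the 100-dimension vector size, matching vectSize below, are assumptions):

import org.apache.spark.mllib.feature.Word2Vec

// Build a tokenized corpus from the consolidated review files
// (paths follow the prepareData() outputs shown below).
val corpus = sc.textFile(dataPath + "transform/trainNegData")
  .union(sc.textFile(dataPath + "transform/trainPosData"))
  .map(line => line.toLowerCase.split("\\W+").toSeq)

// Train a 100-dimensional Word2Vec model and save it for later use.
val w2vModel = new Word2Vec()
  .setVectorSize(100)
  .setMinCount(5)
  .fit(corpus)
w2vModel.save(sc, dataPath + "w2vModel")

// Sanity check: words sharing contexts should be close in the vector space.
w2vModel.findSynonyms("good", 5).foreach(println)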

The supervised learning algorithm chosen for this task is the support vector machine (SVM). SVMs are supervised learning models with associated learning algorithms that analyze data for classification and regression analysis. Given a set of training examples, each marked as belonging to one of two categories, an SVM training algorithm builds a model that assigns new examples to one category or the other, making it a non-probabilistic binary linear classifier. An SVM model is a representation of the examples as points in space, mapped so that the examples of the separate categories are divided by a clear gap that is as wide as possible. New examples are then mapped into that same space and predicted to belong to a category based on which side of the gap they fall. In addition to performing linear classification, SVMs can efficiently perform non-linear classification using what is called the kernel trick, implicitly mapping their inputs into high-dimensional feature spaces.

Preparing the dataset

Before generating a vector representation of the movie reviews, we create a function that writes 4 files in total containing the 50,000 reviews. The prepareData() function writes 2 files containing the negative reviews (one review per line), one for learning and one for testing, from the reviews in the "train/neg" and "test/neg" folders. The function also writes 2 files for the positive reviews (from the "train/pos" and "test/pos" folders).

def prepareData(dataPath: String, spark: SparkSession): Unit = {
  val sc = spark.sparkContext
  val rawImdbDataPath = dataPath + "aclImdb/"
  val rawTrainNegDataPath = rawImdbDataPath + "train/neg"
  val rawTrainPosDataPath = rawImdbDataPath + "train/pos"
  val rawTestNegDataPath = rawImdbDataPath + "test/neg"
  val rawTestPosDataPath = rawImdbDataPath + "test/pos"

  // read each set of reviews (one file per review) and write it back
  // as a single text file (one review per line)
  val trainNegData = sc.textFile(rawTrainNegDataPath)
  val trainNegDataPath = dataPath + "transform/trainNegData"
  trainNegData.repartition(1).saveAsTextFile(trainNegDataPath)

  val trainPosData = sc.textFile(rawTrainPosDataPath)
  val trainPosDataPath = dataPath + "transform/trainPosData"
  trainPosData.repartition(1).saveAsTextFile(trainPosDataPath)

  val testNegData = sc.textFile(rawTestNegDataPath)
  val testNegDataPath = dataPath + "transform/testNegData"
  testNegData.repartition(1).saveAsTextFile(testNegDataPath)

  val testPosData = sc.textFile(rawTestPosDataPath)
  val testPosDataPath = dataPath + "transform/testPosData"
  testPosData.repartition(1).saveAsTextFile(testPosDataPath)
}
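
A minimal driver for this step, assuming a local run (the appName, master, and dataPath values are illustrative):

import org.apache.spark.sql.SparkSession

// Create a local SparkSession and consolidate the raw review files.
val spark = SparkSession.builder()
  .appName("SentimentAnalysisIMDB")
  .master("local[*]") // illustrative: run locally on all available cores
  .getOrCreate()

val dataPath = "/path/to/data/" // illustrative
prepareData(dataPath, spark)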

Word2Vec representations

Now, the objective is to write a vectorizeReviews() function which returns a Map containing 2 RDDs: one holding the vector representations of the negative reviews and the other those of the positive reviews. This function is called twice by our program (once for the reviews intended for learning and once for those intended for testing).
The function first removes stop words before computing the Word2Vec representations. It is therefore necessary to broadcast the stop words, as well as the Word2Vec model, to the compute nodes so that these two operations can be performed there.

def vectorizeReviews(dataPath: String, negReviewsPath: String, posReviewsPath: String,
                     spark: SparkSession): Map[String, RDD[SparkVector]] = {
  val sc = spark.sparkContext
  val stopWords = Source.fromFile(dataPath + "stop_words").getLines.toSet
  // send the stop words to the compute nodes
  val bStopWords = sc.broadcast(stopWords)

  val w2vModel = Word2VecModel.load(sc, dataPath + "/w2vModel")
  val vectors = w2vModel.getVectors.mapValues(vv => Vectors.dense(vv.map(_.toDouble))).map(identity)
  // send the word vectors to the compute nodes
  val bVectors = sc.broadcast(vectors)
  val vectSize = 100

  val negReviews = sc.textFile(negReviewsPath)
  val posReviews = sc.textFile(posReviewsPath)

  // represent each review as the average of its word vectors,
  // ignoring stop words and words shorter than 2 characters
  val negReviews2vec = negReviews.filter(sentence => sentence.length >= 1)
    .map(sentence => sentence.toLowerCase.split("\\W+"))
    .map(wordSeq => {
      var vSum = Vectors.zeros(vectSize)
      var vNb = 0
      wordSeq.foreach { word =>
        if (!(bStopWords.value)(word) && (word.length >= 2)) {
          bVectors.value.get(word).foreach { v =>
            vSum = add(v, vSum)
            vNb += 1
          }
        }
      }
      if (vNb != 0) {
        vSum = scalarMultiply(1.0 / vNb, vSum)
      }
      vSum
    }).filter(vec => Vectors.norm(vec, 1.0) > 0.0).persist()

  val posReviews2vec = posReviews.filter(sentence => sentence.length >= 1)
    .map(sentence => sentence.toLowerCase.split("\\W+"))
    .map(wordSeq => {
      var vSum = Vectors.zeros(vectSize)
      var vNb = 0
      wordSeq.foreach { word =>
        if (!(bStopWords.value)(word) && (word.length >= 2)) {
          bVectors.value.get(word).foreach { v =>
            vSum = add(v, vSum)
            vNb += 1
          }
        }
      }
      if (vNb != 0) {
        vSum = scalarMultiply(1.0 / vNb, vSum)
      }
      vSum
    }).filter(vec => Vectors.norm(vec, 1.0) > 0.0).persist()

  val vectorizedReviewsMap = Map("NEG_REVIEWS" -> negReviews2vec, "POS_REVIEWS" -> posReviews2vec)
  vectorizedReviewsMap
}
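
The add() and scalarMultiply() helpers used above are not defined in the post. A minimal sketch of what they could look like, assuming SparkVector aliases Spark's Vector type (e.g. import org.apache.spark.ml.linalg.{Vector => SparkVector}); the implementation is an assumption:

import org.apache.spark.ml.linalg.{Vector => SparkVector, Vectors}

// Element-wise sum of two vectors (assumed implementation).
def add(v1: SparkVector, v2: SparkVector): SparkVector =
  Vectors.dense(v1.toArray.zip(v2.toArray).map { case (a, b) => a + b })

// Multiplication of each component of a vector by a scalar (assumed implementation).
def scalarMultiply(a: Double, v: SparkVector): SparkVector =
  Vectors.dense(v.toArray.map(_ * a))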

Learning

Before we can move on to the SVM learning stage, we need to label our data and transform our RDDs into Datasets. The goal is to obtain two Datasets (one for learning and one for testing), each containing the positive and negative vectorized reviews. The next steps explain the chosen method.

Creating Learning and Test Datasets

We create a class that represents our reviews. The Review case class has two attributes: label (0 or 1) and features (our vectorized review).

case class Review(label: Double, features: SparkVector)

The createReviewsDataSet() function takes the Map generated by the vectorizeReviews() function, extracts the RDDs, and transforms them into Datasets, using instances of the Review class to label them (0 for negative reviews and 1 for positive ones). Finally, the function returns a Dataset which is the union of the negative and positive review Datasets.

def createReviewsDataSet(vectorizedReviewsMap: Map[String, RDD[SparkVector]],
                         spark: SparkSession): Dataset[Review] = {
  import spark.implicits._
  // requires: import scala.collection.mutable.MutableList

  val negReviewsVecRDD = vectorizedReviewsMap("NEG_REVIEWS")
  val posReviewsVecRDD = vectorizedReviewsMap("POS_REVIEWS")

  val negReviewsList = MutableList[Review]()
  val posReviewsList = MutableList[Review]()

  // label the reviews: 0 for negative, 1 for positive
  negReviewsVecRDD.collect().foreach(v => negReviewsList += Review(0, v))
  posReviewsVecRDD.collect().foreach(v => posReviewsList += Review(1, v))

  val negReviewsDS = negReviewsList.toDS()
  val posReviewsDS = posReviewsList.toDS()

  negReviewsDS.union(posReviewsDS)
}
This function is called twice by our program: once to create the learning Dataset (which is kept in memory) and once for the test Dataset.
val trainVectReviewsMap = vectorizeReviews(dataPath, trainNegReviewsPath, trainPosReviewsPath, spark)
val trainReviewsDS = createReviewsDataSet(trainVectReviewsMap, spark).cache() // keep in memory

val testVectReviewsMap = vectorizeReviews(dataPath, testNegReviewsPath, testPosReviewsPath, spark)
val testReviewsDS = createReviewsDataSet(testVectReviewsMap, spark)

This is what our Datasets look like:

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[-0.0703578602740...|
|  0.0|[-0.0974961428484...|
|  0.0|[-0.0676019091621...|
+-----+--------------------+

SVM

We group our processing steps in a pipeline (here the only step is the linear SVM). Then we define a search grid for the hyperparameters using ParamGridBuilder; for each parameter, the values the grid must include are given with addGrid. We then define an instance of CrossValidator to explore the combinations of values in the grid. This instance is configured with the estimator to use (our pipeline), the parameter values (our grid), the type of cross-validation (5-fold here), and finally how to evaluate the results. The results are evaluated using an instance of BinaryClassificationEvaluator, and the metric used is the AUC.

val linSvc = new LinearSVC()
  .setMaxIter(10)
  .setFeaturesCol("features")
  .setLabelCol("label")

val pipeline = new Pipeline().setStages(Array(linSvc))

val paramGrid = new ParamGridBuilder()
  .addGrid(linSvc.regParam, Array(0.02, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)
  .setEvaluator(new BinaryClassificationEvaluator())

val cvSVCmodel = cv.fit(trainReviewsDS)
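
As a possible follow-up (not in the original post), the winning regularization parameter can be read back from the cross-validated model. A minimal sketch using the Spark ML API:

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LinearSVCModel

// Extract the best pipeline found by cross-validation and read back
// the regParam value that won the grid search.
val bestPipeline = cvSVCmodel.bestModel.asInstanceOf[PipelineModel]
val bestSvc = bestPipeline.stages(0).asInstanceOf[LinearSVCModel]
println("Best regParam: " + bestSvc.getRegParam)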

Results

Once learning is finished, we can save the resulting model and compute the predictions on the learning and test data.

cvSVCmodel.write.save(dataPath + "model/cvSVCmodel")
val resTrain = cvSVCmodel.transform(trainReviewsDS)
val resTest = cvSVCmodel.transform(testReviewsDS)

Now let's evaluate the performance of the resulting model by computing the AUC on the learning and test data.

val resTrainAUC = cvSVCmodel.getEvaluator.evaluate(resTrain)
val resTestAUC = cvSVCmodel.getEvaluator.evaluate(resTest)
println("AUC for training data : " + resTrainAUC)
println("AUC for test data : " + resTestAUC)

We obtain the following AUCs.

AUC for training data : 0.7470217151999967
AUC for test data : 0.6758921359789832
