Azure Databricks 情绪分析
Azure Databricks Sentiment Analysis
我正在学习有关 Databricks 的教程。在最后一节中,调用语言检测和情感分析 API 时,Sentiment 列总是返回 "Couldn't detect language"。我对 Scala 还不够熟悉,无法解决这个问题。
这是我运行的一段代码(和教程一样):
import java.io._
import java.net._
import java.util._
// Data shapes exchanged with the text-analytics endpoints. They are converted to and from JSON
// with Gson, so the field names presumably have to match the JSON keys of the API — verify
// against the API reference before renaming anything.

// Response of the language-detection call; SentimentDetector.getLanguage deserializes into this type.
case class Language(documents: Array[LanguageDocuments], errors: Array[Any]) extends Serializable
// One per-document entry of a Language response, holding the languages detected for that document.
case class LanguageDocuments(id: String, detectedLanguages: Array[DetectedLanguages]) extends Serializable
// A single detected language. getLanguage treats an iso6391Name of "(Unknown)" as "no language detected".
case class DetectedLanguages(name: String, iso6391Name: String, score: Double) extends Serializable
// Response of the sentiment call; SentimentDetector.getSentiment deserializes into this type.
case class Sentiment(documents: Array[SentimentDocuments], errors: Array[Any]) extends Serializable
// One per-document entry of a Sentiment response; score is the value reported in the Sentiment column.
case class SentimentDocuments(id: String, score: Double) extends Serializable
// Request body sent to both endpoints; serialized to JSON in SentimentDetector.processUsingApi.
case class RequestToTextApi(documents: Array[RequestToTextApiDocument]) extends Serializable
// One document of a request. language defaults to "" and is a var so that it can be filled in
// with the detected language before the sentiment request is made.
case class RequestToTextApiDocument(id: String, text: String, var language: String = "") extends Serializable
import javax.net.ssl.HttpsURLConnection
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import com.google.gson.JsonObject
import com.google.gson.JsonParser
import scala.util.parsing.json._
/**
 * Small client for the Text Analytics v2.1 language-detection and sentiment endpoints.
 *
 * Requests are built from [[RequestToTextApi]], serialized with Gson and POSTed over HTTPS.
 * `getLanguage` / `getSentiment` are best-effort: they return `None` on any failure, but the
 * cause is written to stderr so that a misconfigured endpoint or key can be diagnosed instead
 * of silently producing "Couldn't detect ..." results.
 */
object SentimentDetector extends Serializable {
  // Cognitive Services API connection settings.
  // `host` must be exactly the endpoint of your own Cognitive Services resource.
  val accessKey = "<PROVIDE ACCESS KEY HERE>"
  val host = "https://cognitive-docs.cognitiveservices.azure.com/"
  val languagesPath = "/text/analytics/v2.1/languages"
  val sentimentPath = "/text/analytics/v2.1/sentiment"

  // `host` ends with "/" and the paths start with "/"; drop the trailing slash so the
  // resulting URLs do not contain "//" between host and path.
  val languagesUrl = new URL(host.stripSuffix("/") + languagesPath)
  // NOTE: the name `sentimenUrl` (sic) is kept as-is because it is part of the public interface.
  val sentimenUrl = new URL(host.stripSuffix("/") + sentimentPath)

  val g = new Gson

  /**
   * Opens an HTTPS connection to `path`, configured for a JSON POST that carries the
   * subscription key header.
   *
   * @param path endpoint URL to call
   * @return the configured, not yet connected, connection
   */
  def getConnection(path: URL): HttpsURLConnection = {
    val connection = path.openConnection().asInstanceOf[HttpsURLConnection]
    connection.setRequestMethod("POST")
    connection.setRequestProperty("Content-Type", "text/json")
    connection.setRequestProperty("Ocp-Apim-Subscription-Key", accessKey)
    connection.setDoOutput(true)
    connection
  }

  /**
   * Re-formats a JSON object string with pretty printing.
   *
   * @param json_text JSON text whose top-level value is an object
   * @return the same JSON, pretty-printed
   */
  def prettify(json_text: String): String = {
    val json = new JsonParser().parse(json_text).getAsJsonObject()
    new GsonBuilder().setPrettyPrinting().create().toJson(json)
  }

  // Reads the whole stream as UTF-8 text, concatenating lines without separators, and always closes it.
  private def readAll(stream: InputStream): String = {
    val reader = new BufferedReader(new InputStreamReader(stream, "UTF-8"))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).mkString
    finally reader.close()
  }

  // Reports why a best-effort call returned None; stderr keeps this free of extra dependencies.
  private def logFailure(operation: String, e: Exception): Unit =
    System.err.println(s"SentimentDetector.$operation failed: $e")

  /**
   * Handles the call to Cognitive Services API.
   *
   * @param request documents to send
   * @param path    endpoint URL to POST to
   * @return the raw response body
   * @throws IOException if the request fails; for HTTP error statuses the message contains
   *                     the status code and the error body returned by the service
   */
  def processUsingApi(request: RequestToTextApi, path: URL): String = {
    val payload = g.toJson(request).getBytes("UTF-8")
    val connection = getConnection(path)

    val out = new DataOutputStream(connection.getOutputStream())
    try {
      out.write(payload, 0, payload.length)
      out.flush()
    } finally out.close()

    // getInputStream would throw a bare IOException on an error status and discard the
    // service's explanation, so check the status first and surface the error body.
    val status = connection.getResponseCode
    if (status >= 400) {
      val errorBody = Option(connection.getErrorStream).map(readAll).getOrElse("")
      throw new IOException(s"Request to $path failed with HTTP $status: $errorBody")
    }
    readAll(connection.getInputStream())
  }

  /**
   * Calls the language API for specified documents.
   *
   * @param inputDocs request containing the documents to analyse
   * @return `Some(response)` when a language was detected for the first document; `None` when
   *         the first detected language is "(Unknown)" or when the call or parsing failed
   */
  def getLanguage(inputDocs: RequestToTextApi): Option[Language] =
    try {
      val response = processUsingApi(inputDocs, languagesUrl)
      // In case we need to log the json response somewhere
      val niceResponse = prettify(response)
      // Deserializing the JSON response from the API into Scala types
      val language = g.fromJson(niceResponse, classOf[Language])
      // A response without a first document / first detected language throws here and is
      // reported by the catch below, like any other failure.
      if (language.documents(0).detectedLanguages(0).iso6391Name == "(Unknown)") None
      else Some(language)
    } catch {
      case e: Exception =>
        logFailure("getLanguage", e)
        None
    }

  /**
   * Calls the sentiment API for specified documents. Needs a language field to be set for
   * each of them.
   *
   * @param inputDocs request containing the documents to analyse
   * @return `Some(response)` when the call and parsing succeeded, `None` otherwise
   */
  def getSentiment(inputDocs: RequestToTextApi): Option[Sentiment] =
    try {
      val response = processUsingApi(inputDocs, sentimenUrl)
      val niceResponse = prettify(response)
      // Deserializing the JSON response from the API into Scala types
      Some(g.fromJson(niceResponse, classOf[Sentiment]))
    } catch {
      case e: Exception =>
        logFailure("getSentiment", e)
        None
    }
}
// User Defined Function for processing content of messages to return their sentiment.
// Returns the sentiment score as text, or a short message describing which step failed.
// All array accesses on API responses are guarded: an exception thrown inside a UDF would
// fail the whole streaming query, and Gson can leave arrays null or empty.
val toSentiment =
  udf((textContent: String) => {
    // First entry of an API `errors` array as text, tolerating a null or empty array.
    def firstError(errors: Array[Any]): String =
      Option(errors).flatMap(_.headOption).fold("(no error details returned)")(e => s"$e")

    // The message text doubles as the document id, as in the original tutorial code.
    val document = RequestToTextApiDocument(textContent, textContent)

    SentimentDetector.getLanguage(RequestToTextApi(Array(document))) match {
      case None => "Couldn't detect language"
      case Some(language) =>
        // ISO code of the first language detected for the first document, if there is one.
        val isoCode = for {
          docs <- Option(language.documents)
          doc <- docs.headOption
          langs <- Option(doc.detectedLanguages)
          lang <- langs.headOption
        } yield lang.iso6391Name

        isoCode match {
          case None => "Error happened when getting language" + firstError(language.errors)
          case Some(iso) =>
            // The sentiment endpoint needs the language set on the document.
            val sentimentRequest = RequestToTextApi(Array(document.copy(language = iso)))
            SentimentDetector.getSentiment(sentimentRequest) match {
              case None => "Couldn't detect sentiment"
              case Some(sentiment) =>
                Option(sentiment.documents).flatMap(_.headOption) match {
                  case Some(scored) => scored.score.toString
                  case None => "Error happened when getting sentiment: " + firstError(sentiment.errors)
                }
            }
        }
    }
  })
// Build the output frame: the event body cast to a string as "Content", plus a "Sentiment"
// column computed from it by the toSentiment UDF.
val streamingDataFrame = incomingStream
  .selectExpr("cast (body as string) AS Content")
  .withColumn("Sentiment", toSentiment($"Content"))

// Write the stream to the console in append mode without truncating values, and block
// until the query terminates.
streamingDataFrame.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()
有什么想法吗?
已编辑,2022 年 3 月 15 日
好的,发现错误。我在认知服务端点中遗漏了一个字符。
我正在学习有关 Databricks 的教程。在最后一节中,调用语言检测和情感分析 API 时,Sentiment 列总是返回 "Couldn't detect language"。我对 Scala 还不够熟悉,无法解决这个问题。
这是我运行的一段代码(和教程一样):
import java.io._
import java.net._
import java.util._
// Data shapes exchanged with the text-analytics endpoints. They are converted to and from JSON
// with Gson, so the field names presumably have to match the JSON keys of the API — verify
// against the API reference before renaming anything.

// Response of the language-detection call; SentimentDetector.getLanguage deserializes into this type.
case class Language(documents: Array[LanguageDocuments], errors: Array[Any]) extends Serializable
// One per-document entry of a Language response, holding the languages detected for that document.
case class LanguageDocuments(id: String, detectedLanguages: Array[DetectedLanguages]) extends Serializable
// A single detected language. getLanguage treats an iso6391Name of "(Unknown)" as "no language detected".
case class DetectedLanguages(name: String, iso6391Name: String, score: Double) extends Serializable
// Response of the sentiment call; SentimentDetector.getSentiment deserializes into this type.
case class Sentiment(documents: Array[SentimentDocuments], errors: Array[Any]) extends Serializable
// One per-document entry of a Sentiment response; score is the value reported in the Sentiment column.
case class SentimentDocuments(id: String, score: Double) extends Serializable
// Request body sent to both endpoints; serialized to JSON in SentimentDetector.processUsingApi.
case class RequestToTextApi(documents: Array[RequestToTextApiDocument]) extends Serializable
// One document of a request. language defaults to "" and is a var so that it can be filled in
// with the detected language before the sentiment request is made.
case class RequestToTextApiDocument(id: String, text: String, var language: String = "") extends Serializable
import javax.net.ssl.HttpsURLConnection
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import com.google.gson.JsonObject
import com.google.gson.JsonParser
import scala.util.parsing.json._
/**
 * Small client for the Text Analytics v2.1 language-detection and sentiment endpoints.
 *
 * Requests are built from [[RequestToTextApi]], serialized with Gson and POSTed over HTTPS.
 * `getLanguage` / `getSentiment` are best-effort: they return `None` on any failure, but the
 * cause is written to stderr so that a misconfigured endpoint or key can be diagnosed instead
 * of silently producing "Couldn't detect ..." results.
 */
object SentimentDetector extends Serializable {
  // Cognitive Services API connection settings.
  // `host` must be exactly the endpoint of your own Cognitive Services resource.
  val accessKey = "<PROVIDE ACCESS KEY HERE>"
  val host = "https://cognitive-docs.cognitiveservices.azure.com/"
  val languagesPath = "/text/analytics/v2.1/languages"
  val sentimentPath = "/text/analytics/v2.1/sentiment"

  // `host` ends with "/" and the paths start with "/"; drop the trailing slash so the
  // resulting URLs do not contain "//" between host and path.
  val languagesUrl = new URL(host.stripSuffix("/") + languagesPath)
  // NOTE: the name `sentimenUrl` (sic) is kept as-is because it is part of the public interface.
  val sentimenUrl = new URL(host.stripSuffix("/") + sentimentPath)

  val g = new Gson

  /**
   * Opens an HTTPS connection to `path`, configured for a JSON POST that carries the
   * subscription key header.
   *
   * @param path endpoint URL to call
   * @return the configured, not yet connected, connection
   */
  def getConnection(path: URL): HttpsURLConnection = {
    val connection = path.openConnection().asInstanceOf[HttpsURLConnection]
    connection.setRequestMethod("POST")
    connection.setRequestProperty("Content-Type", "text/json")
    connection.setRequestProperty("Ocp-Apim-Subscription-Key", accessKey)
    connection.setDoOutput(true)
    connection
  }

  /**
   * Re-formats a JSON object string with pretty printing.
   *
   * @param json_text JSON text whose top-level value is an object
   * @return the same JSON, pretty-printed
   */
  def prettify(json_text: String): String = {
    val json = new JsonParser().parse(json_text).getAsJsonObject()
    new GsonBuilder().setPrettyPrinting().create().toJson(json)
  }

  // Reads the whole stream as UTF-8 text, concatenating lines without separators, and always closes it.
  private def readAll(stream: InputStream): String = {
    val reader = new BufferedReader(new InputStreamReader(stream, "UTF-8"))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).mkString
    finally reader.close()
  }

  // Reports why a best-effort call returned None; stderr keeps this free of extra dependencies.
  private def logFailure(operation: String, e: Exception): Unit =
    System.err.println(s"SentimentDetector.$operation failed: $e")

  /**
   * Handles the call to Cognitive Services API.
   *
   * @param request documents to send
   * @param path    endpoint URL to POST to
   * @return the raw response body
   * @throws IOException if the request fails; for HTTP error statuses the message contains
   *                     the status code and the error body returned by the service
   */
  def processUsingApi(request: RequestToTextApi, path: URL): String = {
    val payload = g.toJson(request).getBytes("UTF-8")
    val connection = getConnection(path)

    val out = new DataOutputStream(connection.getOutputStream())
    try {
      out.write(payload, 0, payload.length)
      out.flush()
    } finally out.close()

    // getInputStream would throw a bare IOException on an error status and discard the
    // service's explanation, so check the status first and surface the error body.
    val status = connection.getResponseCode
    if (status >= 400) {
      val errorBody = Option(connection.getErrorStream).map(readAll).getOrElse("")
      throw new IOException(s"Request to $path failed with HTTP $status: $errorBody")
    }
    readAll(connection.getInputStream())
  }

  /**
   * Calls the language API for specified documents.
   *
   * @param inputDocs request containing the documents to analyse
   * @return `Some(response)` when a language was detected for the first document; `None` when
   *         the first detected language is "(Unknown)" or when the call or parsing failed
   */
  def getLanguage(inputDocs: RequestToTextApi): Option[Language] =
    try {
      val response = processUsingApi(inputDocs, languagesUrl)
      // In case we need to log the json response somewhere
      val niceResponse = prettify(response)
      // Deserializing the JSON response from the API into Scala types
      val language = g.fromJson(niceResponse, classOf[Language])
      // A response without a first document / first detected language throws here and is
      // reported by the catch below, like any other failure.
      if (language.documents(0).detectedLanguages(0).iso6391Name == "(Unknown)") None
      else Some(language)
    } catch {
      case e: Exception =>
        logFailure("getLanguage", e)
        None
    }

  /**
   * Calls the sentiment API for specified documents. Needs a language field to be set for
   * each of them.
   *
   * @param inputDocs request containing the documents to analyse
   * @return `Some(response)` when the call and parsing succeeded, `None` otherwise
   */
  def getSentiment(inputDocs: RequestToTextApi): Option[Sentiment] =
    try {
      val response = processUsingApi(inputDocs, sentimenUrl)
      val niceResponse = prettify(response)
      // Deserializing the JSON response from the API into Scala types
      Some(g.fromJson(niceResponse, classOf[Sentiment]))
    } catch {
      case e: Exception =>
        logFailure("getSentiment", e)
        None
    }
}
// User Defined Function for processing content of messages to return their sentiment.
// Returns the sentiment score as text, or a short message describing which step failed.
// All array accesses on API responses are guarded: an exception thrown inside a UDF would
// fail the whole streaming query, and Gson can leave arrays null or empty.
val toSentiment =
  udf((textContent: String) => {
    // First entry of an API `errors` array as text, tolerating a null or empty array.
    def firstError(errors: Array[Any]): String =
      Option(errors).flatMap(_.headOption).fold("(no error details returned)")(e => s"$e")

    // The message text doubles as the document id, as in the original tutorial code.
    val document = RequestToTextApiDocument(textContent, textContent)

    SentimentDetector.getLanguage(RequestToTextApi(Array(document))) match {
      case None => "Couldn't detect language"
      case Some(language) =>
        // ISO code of the first language detected for the first document, if there is one.
        val isoCode = for {
          docs <- Option(language.documents)
          doc <- docs.headOption
          langs <- Option(doc.detectedLanguages)
          lang <- langs.headOption
        } yield lang.iso6391Name

        isoCode match {
          case None => "Error happened when getting language" + firstError(language.errors)
          case Some(iso) =>
            // The sentiment endpoint needs the language set on the document.
            val sentimentRequest = RequestToTextApi(Array(document.copy(language = iso)))
            SentimentDetector.getSentiment(sentimentRequest) match {
              case None => "Couldn't detect sentiment"
              case Some(sentiment) =>
                Option(sentiment.documents).flatMap(_.headOption) match {
                  case Some(scored) => scored.score.toString
                  case None => "Error happened when getting sentiment: " + firstError(sentiment.errors)
                }
            }
        }
    }
  })
// Build the output frame: the event body cast to a string as "Content", plus a "Sentiment"
// column computed from it by the toSentiment UDF.
val streamingDataFrame = incomingStream
  .selectExpr("cast (body as string) AS Content")
  .withColumn("Sentiment", toSentiment($"Content"))

// Write the stream to the console in append mode without truncating values, and block
// until the query terminates.
streamingDataFrame.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()
有什么想法吗?
已编辑,2022 年 3 月 15 日
好的,发现错误。我在认知服务端点中遗漏了一个字符。