spark的实时计算的简单测试记录

资源类型: 技术文章 语言类型: 编程与开发 发布时间: 2016-06-14 21:37:10

更新时间:2017-07-25 22:25:11 发布者:ykswz(ykswz) 550

之前工作需要,测试过spark的事实计算方面的内容,所以这里就有了这份笔记了,所以和大家来分享关于spark的事实计算方面的内容,如果您已经是这方面的高手,可以忽略这方面的内容。但是如果您是这方面感兴趣,想要大概的学习了解一下的话,可以看看我这篇文章吧。虽然不是非常深入的讲解关于事实计算的方面的内容,但是多多少少还是讲解的一个简单版本的事实计算方面的内容。

说了半天,我们今天的spark的事实计算的内容是spark streaming + kafka 实时统计测试,程序分为两块,一个是client,提供相关的数据生产工作,server,提供数据处理操作。都是通过sbt进行打包的。下面是client的simple.sbt的内容
我们直接先放上去代码,首先就说我们sbt编译client的sbt的编写的代码具体如下所示

name := "send data to kafka"
 
version := "1.0"
 
scalaVersion := "2.10.4"
 
libraryDependencies += "org.codehaus.jettison" % "jettison" % "1.3.3"
 
libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.8.1"


然后说真正的client的代码的内容,具体如下所示KafkaEventProducer.scala

import java.util.Properties
import scala.util.Properties
import org.codehaus.jettison.json.JSONObject
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import scala.util.Random
 
object KafkaEventProducer {
 
  private val users = Array(
      "4A4D769EB9679C054DE81B973ED5D768", "8dfeb5aaafc027d89349ac9a20b3930f",
      "011BBF43B89BFBF266C865DF0397AA71", "f2a8474bf7bd94f0aabbd4cdd2c06dcf",
      "068b746ed4620d25e26055a9f804385f", "97edfc08311c70143401745a03a50706",
      "d7f141563005d1b5d0d3dd30138f3f62", "c8ee90aade1671a21336c721512b817a",
      "6b67c8c700427dee7552f81f3228c927", "a95f22eabc4fd4b580c011a3161a9d9d")
 
  private val random = new Random()
 
  private var pointer = -1
 
  def getUserID() : String = {
       pointer = pointer + 1
    if(pointer >= users.length) {
      pointer = 0
      users(pointer)
    } else {
      users(pointer)
    }
  }
 
  def click() : Double = {
    random.nextInt(10)
  }
 
  def main(args: Array[String]): Unit = {
    val topic = "test"
    val brokers = "namenode:9092,datanode1:9092,datanode2:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
 
    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)
 
    while(true) {
      // prepare event data
      val event = new JSONObject()
      event
        .put("uid", getUserID)
        .put("event_time", System.currentTimeMillis.toString)
        .put("os_type", "Android")
        .put("click_count", click)
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      println("Message sent: " + event)
      Thread.sleep(2000)
    }
  }
}


说完了我们的client客户端的代码,我们来分享对应的处理Kafka消息的服务端的代码的内容,当然一样的,我们给出对应的编译服务端的代码的sbt的代码编写

name := "LogStash"
 
version := "1.0"
 
scalaVersion := "2.10.4"
 
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.5.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.5.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.5.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.1",
  "org.json4s" %% "json4s-native" % "3.2.10",
  "org.json4s" %% "json4s-jackson" % "3.2.10"
)
 
mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => MergeStrategy.discard
  case "log4j.properties"                                  => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf"                                    => MergeStrategy.concat
  case _                                                   => MergeStrategy.first
}



然后,我们这里需要安装一个sbt的插件,这个插件可以帮助我们下载相关的依赖的包。对应的server的sbt,这里需要用到sbt的依赖打包插件,project/assembly.sbt里面添加内容

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")


对应的服务端的测试的代码UserClickCountAnalytics.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s._
import kafka.serializer.StringDecoder
import org.json4s.jackson.JsonMethods._
 
object UserClickCountAnalytics {
 
  def main(args: Array[String]) {
 
    val masterUrl = "spark://namenode:7077"
 
    val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(5))
 
    val topics = Set("test")
    val brokers = "namenode:9092,datanode1:9092,datanode2:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
 
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val lines = messages.map(_._2)
    val linedata = lines.map{x => val ary = parse_json(x); ary(0) }
    val clickCount = linedata.map(x => (x._1, x._2)).reduceByKey(_ + _)
    clickCount.print()
 
    ssc.start()
    ssc.awaitTermination()
  }
 
  def parse_json(line:String) = {
        val json = parse(line)
        val data = for {
         JObject(child) <- json
         JField("uid", JString(uid)) <- child
         JField("click_count", JInt(click_count)) <- child
       } yield (uid, click_count)
        data
  }
}


最好我们就说说,如何进行运行这个程序吧。具体方式如下

/usr/local/spark-1.5.1-bin-hadoop2.6/bin/spark-submit --class UserClickCountAnalytics --master spark://namenode:7077  --executor-memory 500m /home/hadoop/spark_streaming/server/target/scala-2.10/LogStash-a  ssembly-1.0.jar



好了,今天关于spark的事实计算方面的内容,我们就暂时介绍道这里吧。不知道对应大家学习这方面的内容是否有帮助呢。

最新评论

封面图片

封面图片

简介

之前工作需要,测试过spark的事实计算方面的内容,所以这里就有了这份笔记了,所以和大家来分享关于spark的事实计算方面的内容,如果您已经是这方面的高手,可以忽略这方面的内容。但是如果您是这方面感兴趣,想要大概的学习了解一下的话,可以看看我这篇文章吧。虽然不是非常深入的讲解关于事实计算的方面的内容,但是多多少少还是讲解的一个简单版本的事实计算方面的内容。