SparkStreaming 是流式处理框架,是 Spark API 的扩展,支持可扩展、高吞吐、容错的实时数据流处理。

实时数据的来源可以是:Kafka,Flume,Twitter,ZeroMQ 或者 TCP sockets,并且可以使用高级功能的复杂算子来处理流数据。例如:map,reduce,join,window 。最终,处理后的数据可以存放在文件系统,数据库等,方便实时展现。

SparkStreaming 与 Storm 的区别

1、Storm 是纯实时的流式处理框架,SparkStreaming 是准实时的处理框架(微批处理)。因为微批处理,SparkStreaming 的吞吐量比 Storm 要高。

2、Storm 的事务机制比 SparkStreaming 的要完善。

3、Storm 支持动态资源调度。(spark1.2 开始和之后也支持)

4、SparkStreaming 擅长复杂的业务处理,Storm 不擅长复杂的业务处理,擅长简单的汇总型计算。

SparkStreaming 初始

1、SparkStreaming 初始理解

注意:

  • receiver task 是 7*24 小时一直在执行,一直接受数据,将一段时间内接收来的数据保存到 batch 中。假设 batchInterval 为 5s,那么会将接收来的数据每隔 5 秒封装到一个 batch 中,batch 没有分布式计算特性,这一个 batch 的数据又被封装到一个 RDD 中,RDD 最终封装到一个 DStream 中。

    例如:假设 batchInterval 为 5 秒,每隔 5 秒通过 SparkStreamin 将得到一个 DStream,在第 6 秒的时候计算这 5 秒的数据,假设执行任务的时间是 3 秒,那么第 6~9 秒一边在接收数据,一边在计算任务,9~10 秒只是在接收数据。然后在第 11 秒的时候重复上面的操作。

  • 如果 job 执行的时间大于 batchInterval 会有什么样的问题?

    如果接受过来的数据设置的级别是仅内存,接收来的数据会越堆积越多,最后可能会导致 OOM(如果设置 StorageLevel 包含 disk, 则内存存放不下的数据会溢写至 disk, 加大延迟 )。

2、SparkStreaming 代码

注意事项:

  • 启动 socket server 服务器:nc –lk 9999
  • eceiver 模式下接受数据,local 的模拟线程必须大于等于 2,一个线程用来 receiver 用来接受数据,另一个线程用来执行 job。
  • Durations 时间设置就是我们能接收的延迟度。这个需要根据集群的资源情况以及任务的执行情况来调节。
  • 创建 JavaStreamingContext 有两种方式(SparkConf,SparkContext)
  • 所有的代码逻辑完成后要有一个 output operation 类算子。
  • JavaStreamingContext.start() Streaming 框架启动后不能再次添加业务逻辑。
  • JavaStreamingContext.stop() 无参的 stop 方法将 SparkContext一同关闭,stop(false),不会关闭 SparkContext。
  • JavaStreamingContext.stop()停止之后不能再调用 start。
public class SparkStreamingTest {
public static void main(String[] args){

SparkConf conf = new SparkConf();
conf.setMaster("local[2]").setAppName("sparkStreaming");
// JavaSparkContext sparkContext = new JavaSparkContext(conf);
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(5));
JavaReceiverInputDStream<String> dStream = streamingContext.socketTextStream("sean01", 8888);

JavaDStream<String> wordDStream = dStream.flatMap(
new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 5302655187358849615L;

@Override
public Iterable<String> call(String s) throws Exception {
String[] split = s.split(" ");
return Arrays.asList(split);
}
});

JavaPairDStream<String, Integer> pairDStream = wordDStream.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1285374334768880064L;

@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<>(word, 1);
}
});

JavaPairDStream<String, Integer> resultDStream = pairDStream.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 5889114600370649292L;

@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// action类算子
resultDStream.print();

streamingContext.start();
streamingContext.awaitTermination();
streamingContext.stop();
}
}

SparkStreaming 算子操作

关于几种算子的详细API,请见Github:SparkStreamingAPI

1、foreachRDD、print

output operator 类算子,必须对抽取出来的 RDD 执行 action 类算子,代码才能执行。

2、transform

  • transformation 类算子
  • 可以通过 transform 算子,对Dstream做RDD到RDD的任意操作。

3、updateStateByKey

  • transformation 算子

  • updateStateByKey 作用:

    1)为 SparkStreaming 中每一个 Key 维护一份 state 状态,state 类型可以是任意类型的,可以是一个自定义的对象,更新函数也可以是自定义的。

    2)通过更新函数对该 key 的状态不断更新,对于每个新的 batch 而言,SparkStreaming 会在使用 updateStateByKey 的时候为已经存在的 key 进行 state 的状态更新。

  • 使用到 updateStateByKey 要开启 checkpoint 机制和功能。

  • 多久会将内存中的数据写入到磁盘一份?

    1)如果 batchInterval 设置的时间小于10秒,那么10秒写入磁盘一份。

    2)如果 batchInterval 设置的时间大于 10 秒,那么就会 batchInterval 时间间隔写入磁盘一份。

4、窗口操作

  • 窗口操作理解图:

假设每隔 5s 1 个 batch,上图中窗口长度为 15s,窗口滑动间隔 10s。

  • 窗口长度和滑动间隔必须是 batchInterval 的整数倍。如果不是整数倍会检测报错。

  • 优化后的 window 窗口操作示意图:

  • 优化后的 window 操作要保存状态所以要设置 checkpoint 路径,没有优化的 window 操作可以不设置 checkpoint 路径。

Driver HA(Standalone或Mesos)

因为 SparkStreaming 是 7*24 小时运行,Driver 只是一个简单的进程,有可能挂掉,所以实现 Driver 的 HA 就有必要(如果使用的 Client 模式就无法实现 Driver HA ,这里针对的是 cluster 模式)。

Yarn 平台 cluster 模式提交任务,AM(AplicationMaster)相当于 Driver,如果挂掉会自动启动 AM。这里所说的 DriverHA 针对的是 Spark standalone 和 Mesos 资源调度的情况下。

实现 Driver 的高可用有两个步骤:

第一:提交任务层面,在提交任务的时候加上选项 --supervise,当 Driver挂掉的时候会自动重启 Driver。

第二:代码层面,使用 JavaStreamingContext.getOrCreate(checkpoint 路径,JavaStreamingContextFactory)

Driver 中元数据包括:

1、创建应用程序的配置信息。

2、DStream 的操作逻辑。

3、job 中没有完成的批次数据,也就是 job 的执行进度。

SparkStreaming+Kafka

Receiver 模式

receiver模式原理图

receiver模式理解

在 SparkSteaming 程序运行起来后,Executor 中会有 receiver tasks 接收 kafka 推送过来的数据。数据会被持久化,默认级别为 MEMORY_AND_DISK_SER_2,这个级别也可以修改。receiver task 对接收过来的数据进行存储和备份,这个过程会有节点之间的传输。备份完成后去 Zookeeper 中更新消费偏移量,然后向 Driver 中的 receiver tracker 汇报数据的位置。最后 Driver 根据数据本地化将 task 分发到不同节点上执行。

receiver模式中存在的问题:

当 Driver 进程挂掉后,Driver 下的 Executor 都会被杀掉,当更新完 zookeeper 消费偏移量的时候,Driver 如果挂掉了,就会存在找不到数据的问题,相当于丢失数据。

如何解决这个问题?

开启WAL(write ahead log)预写日志机制,在接受过来数据备份到其他节点的时候,同时备份到 HDFS 上一份(我们需要将接收来的数据的持久化级别降级到 MEMORY_AND_DISK),这样就能保证数据的安全性。

不过,因为写 HDFS 比较消耗性能,要在备份完数据之后才能进行更新 zookeeper 以及汇报位置等,这样会增加 job 的执行时间,这样对于任务的执行提高了延迟度。

Receiver可能会造成重复消费

合理假设一个场景,假如当前zookeeper中记录的偏移量是50,本次接收的数据为51~100,当数据备份之后,同时也放到 HDFS 了,此时准备去zookeeper中更新偏移量时,服务器挂掉了,这时zookeeper中的偏移量没有更新还是50。重启之后会去HDFS中检查数据,发现51~100的数据未计算(一般计算的话需要更新完偏移量才计算),这时开始计算这部分数据。紧接着Kafka就会那这zookeeper中的50继续往下读,这样一来就造成了重复消费。这就是Receiver模式只能保证至少消费一次(at-least),但不能保证有且只会消费一次(exactly-once)。

receiver 的并行度设置

receiver 的并行度是由 spark.streaming.blockInterval 来决定的,默认为200ms。

假设 batchInterval 为 5s,那么每隔 blockInterval 就会产生一个 block,这里就对应每批次产生 RDD 的 partition,这样 5 秒产生的这个 Dstream 中的这个 RDD 的 partition 为 25 个,并行度就是25。

如果想提高并行度可以减少 blockInterval 的数值,但是最好不要低于 50ms。

receiver 模式代码

先准备一个MyProducer类用于产生数据:

public class MyProducer extends Thread {
// sparkstreaming storm flink 两三年后变成主流 流式处理,可能更复杂,数据处理性能要非常好
private String topic; //发送给Kafka的数据,topic
private Producer<Integer, String> producerForKafka;

public MyProducer(String topic) {
this.topic = topic;
Properties conf = new Properties();
conf.put("metadata.broker.list", "sean01:9092,sean02:9092,sean03:9092");
conf.put("serializer.class", StringEncoder.class.getName());
conf.put("acks",1);

producerForKafka = new Producer<Integer, String>(new ProducerConfig(conf));
}

@Override
public void run() {
int counter = 0;
while (true) {
counter++;
String value = "seanxia";
KeyedMessage<Integer, String> message = new KeyedMessage<>(topic, value);
producerForKafka.send(message);
System.out.println(value + " : " + counter + " --------------");

//hash partitioner 当有key时,则默认通过key 取hash后 ,对partition_number 取余数
// producerForKafka.send(new KeyedMessage<Integer, String>(topic,22,userLog));
// 每2条数据暂停1秒
if (0 == counter % 2) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
new MyProducer("sk1").start();
new MyProducer("sk2").start();
}
}

执行以下代码:

public class SparkStreamingOnKafkaReceiver {
public static void main(String[] args) {

SparkConf conf = new SparkConf().setMaster("local[2]").
setAppName("SparkStreamingOnKafkaReceiver");
//开启预写日志 WAL机制
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true");

JavaStreamingContext jsc =
new JavaStreamingContext(conf, Durations.seconds(10));
jsc.checkpoint("./checkpoint");

Map<String, Integer> topicConsumerConcurrency = new HashMap<String, Integer>();
/**
* 设置读取的topic和接受数据的线程数
*/
topicConsumerConcurrency.put("sk1", 1);
topicConsumerConcurrency.put("sk2", 1);

/**
* 第一个参数是StreamingContext
* 第二个参数是ZooKeeper集群信息(接受Kafka数据的时候会从Zookeeper中获得Offset等元数据信息)
* 第三个参数是Consumer Group 消费者组
* 第四个参数是消费的Topic以及并发读取Topic中Partition的线程数
* 注意:
* KafkaUtils.createStream 使用五个参数的方法,设置receiver的存储级别
*/
// JavaPairReceiverInputDStream<String,String> lines = KafkaUtils.createStream(
// jsc,
// "sean01:2181,sean02:2181,sean03:2181",
// "MyFirstConsumerGroup",
// topicConsumerConcurrency,
// StorageLevel.MEMORY_AND_DISK());

JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(
jsc,
"sean01:2181,sean02:2181,sean03:2181",
"MyFirstConsumerGroup",
topicConsumerConcurrency);

JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 1L;

public Iterable<String> call(Tuple2<String, String> tuple)
throws Exception {
return Arrays.asList(tuple._2.split("\t"));
}
});

JavaPairDStream<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;

public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});

JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;

//对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});

wordsCount.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}

注意:

Receiver模式:只能保证至少被消费一次(at-least),但是不能保证有且只会消费一次(exactly-once)。

direct模式:可以保证exactly-once,能保证任务失败重读数据,但是不能保证任务中的输出数据有且只有一次。

Direct 模式

direct 模式理解

SparkStreaming+kafka 的 Driect 模式就是将 kafka 看成存数据的一方,不是被动接收数据,而是主动去取数据。

消费者偏移量也不是用 zookeeper 来管理,而是 SparkStreaming 内部对消费者偏移量自动来维护。默认消费偏移量是在内存中,当然如果设置了checkpoint 目录,那么消费偏移量也会保存在 checkpoint 中。当然也可以实现用 zookeeper 来管理。

direct 模式并行度设置

Direct 模式的并行度是由读取的 kafka 中 topic 的 partition 数决定的

direct 模式代码

public class SparkStreamingOnKafkaDirected {
public static void main(String[] args) {

SparkConf conf = new SparkConf().setMaster("local[2]")
.setAppName("SparkStreamingOnKafkaDirected");
// conf.set("spark.streaming.backpressure.enabled", "false");
// conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
conf.set("spark.streaming.stopGracefullyOnShutdown","true");
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
/**
* 可以不设置checkpoint 不设置不保存offset,offset默认在内存中有一份,
* 如果设置checkpoint在checkpoint也有一份offset, 一般要设置。
*/
jsc.checkpoint("./checkpoint");
Map<String, String> kafkaParameters = new HashMap<String, String>();
kafkaParameters.put("metadata.broker.list", "sean01:9092,sean02:9092,sean03:9092");
// kafkaParameters.put("auto.offset.reset", "smallest");

HashSet<String> topics = new HashSet<String>();
topics.add("sk1");
topics.add("sk2");

JavaPairInputDStream<String,String> lines = KafkaUtils.createDirectStream(
jsc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParameters,
topics);

JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<Tuple2<String,String>, String>() {
private static final long serialVersionUID = 1L;

public Iterable<String> call(Tuple2<String,String> tuple)
throws Exception {
return Arrays.asList(tuple._2.split("\t"));
}
});

JavaPairDStream<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;

public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});

JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
//对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
private static final long serialVersionUID = 1L;

public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
},3);

wordsCount.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}

Web 监控页面

由于在本地运行,输入:localhost:4040,即可查看 SparkStreaming 在kafka集群中的运行状态

相关配置

预写日志:

用于优化 receiver 模式中,Driver进程挂掉,找不到数据的问题。

spark.streaming.receiver.writeAheadLog.enable	#默认false 没有开启

blockInterval:

spark.streaming.blockInterval	#默认200ms

反压机制:

用于解决由于job执行时间大于batchInterval,接收数据内存级别为仅内存时。引起的数据堆积问题

sparkStreaming在1.5版本之后引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力!

spark.streaming.backpressure.enabled 设置为true   #默认false

数据接收速率:

sparkStreaming在1.5版本之前通过控制接收数据的速率来解决数据堆积问题。

设置静态配置参数:

# Receiver模式
spark.streaming.receiver.maxRate #默认没有设置
# Direct模式
spark.streaming.kafka.maxRatePerPartition #默认没有设置

如何优雅的关闭SparkStreaming

spark.streaming.stopGracefullyOnShutdown 设置为true	#默认false
kill -15 进程号