will-che / flink-recommandsystem-demo

:helicopter::rocket: A real-time product recommendation system built on Flink. Flink computes product popularity and caches it in Redis, analyzes the log data, and stores profile tags and real-time records in HBase. When a user requests recommendations, the popularity ranking is re-ordered according to the user's profile, and the collaborative-filtering and tag-based recommendation modules attach related products to every item on the new list; the resulting list is then returned to the user.

Java 99.35% HTML 0.61% Shell 0.04%
flink recommander-system recommand flink-kafka flink-examples flink-hbase flink-redis

flink-recommandsystem-demo's Introduction

Real-Time Product Recommendation System

1. System Architecture v2.0

  • 1.1 System architecture diagram

  • 1.2 Module descriptions

  • a. The log-processing module (flink-2-hbase) is split into six main Flink tasks:

    • User-product browsing history -> implements the collaborative-filtering recommendation logic

      Flink records which products in a category each user has viewed, preparing for the later item-based collaborative filtering, and writes the user's scores to HBase in real time for subsequent offline processing.

      Data is stored in the HBase table p_history.

    • User interest -> implements the context-based recommendation logic (see the ValueState sketch after this list)

      Interest is computed from a user's successive actions on the same product: if the interval between two actions satisfies a rule (e.g. purchase - view < 100 s), it counts as one interest event. This is implemented with Flink's ValueState: if the user's action is Action=3 (favorite), the state for that product is cleared; the state is also cleared if no Action=3 event arrives within 100 s.

      Data is stored in the HBase table u_interest.

    • User profile computation -> implements the tag-based recommendation logic

      v1.0 computes the user profile along three dimensions: the user's color, country-of-origin, and style preferences. The profile is continuously updated from the logs and recorded in HBase.

      Data is stored in the HBase table user.

    • Product profile recording -> implements the tag-based recommendation logic

      The product profile is recorded along two dimensions: the age group and the gender of the users who favor the product.

      Data is stored in the HBase table prod.

    • Real-time popularity ranking -> implements the popularity-based recommendation logic (see the windowed sketch after this section)

      Flink's time-window mechanism computes the current real-time popularity and caches it in Redis.

      The window computation keeps one snapshot of the ranking in ListState.

      Data is stored in Redis as a list keyed by timestamp.

    • Log ingestion

      Data received from Kafka is written directly into the HBase fact table, preserving the complete log. Each entry contains the user id, the product id the user acted on, the timestamp, and the action (e.g. purchase, click, recommend).

      The data is also aggregated over time windows into the metrics the dashboard needs and returned to the front end for display.

      Data is stored in the HBase table con.
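
A minimal sketch of the user-interest task's ValueState logic described above, assuming the project's LogEntity and Action classes and stubbing its getTimesByRule/saveToHBase helpers; modeling the 100 s expiry with Flink's state TTL is an assumption about the implementation, not a confirmed detail:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;

public class InterestMapFunction extends RichMapFunction<LogEntity, String> {

    private transient ValueState<Action> state;

    @Override
    public void open(Configuration parameters) {
        // Expire the state automatically if no new event arrives within 100 s
        // (an assumption: the project may instead clear it with a timer or on read).
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.seconds(100)).build();
        ValueStateDescriptor<Action> desc =
                new ValueStateDescriptor<>("lastAction", Action.class);
        desc.enableTimeToLive(ttl);
        state = getRuntimeContext().getState(desc);
    }

    @Override
    public String map(LogEntity log) throws Exception {
        Action last = state.value();
        Action current = new Action(log.getAction(), log.getTime().toString());

        // First event for this key scores 1; otherwise apply the interval rule
        // (e.g. purchase - view < 100 s counts as one interest event).
        int times = (last == null) ? 1 : getTimesByRule(last, current);

        state.update(current);          // remember the latest action for the next event
        saveToHBase(log, times);        // write the score to the u_interest table

        if ("3".equals(current.getType())) {
            state.clear();              // Action=3 ends this product's interest session
        }
        return null;
    }

    private int getTimesByRule(Action last, Action current) { /* project helper */ return 1; }

    private void saveToHBase(LogEntity log, int times) { /* project helper */ }
}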

  • b. Web module

    • Front-end user page

      Returns the list of recommended products to the user.

    • Back-end monitoring page

      Returns metric dashboards to the administrator.
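
A minimal sketch of the real-time popularity-ranking task referenced above, assuming log lines of the form "userId,productId,timestamp,action" on the "con" topic and a local Redis; the one-minute window and the Redis key layout are illustrative assumptions, not the project's exact values:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import redis.clients.jedis.Jedis;

public class HotRankTask {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "hot-rank");

        env.addSource(new FlinkKafkaConsumer<>("con", new SimpleStringSchema(), props))
           .map(line -> Tuple2.of(line.split(",")[1], 1))        // productId -> one hit
           .returns(Types.TUPLE(Types.STRING, Types.INT))
           .keyBy(t -> t.f0)
           .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
           .sum(1)                                               // hits per product per window
           .addSink(new RedisRankSink());

        env.execute("Hot Rank");
    }

    // Appends "productId:count" to a Redis list keyed by the current minute.
    static class RedisRankSink extends RichSinkFunction<Tuple2<String, Integer>> {
        private transient Jedis jedis;

        @Override
        public void open(Configuration parameters) {
            jedis = new Jedis("localhost", 6379);
        }

        @Override
        public void invoke(Tuple2<String, Integer> value, Context context) {
            long minute = System.currentTimeMillis() / 60_000;
            jedis.rpush("hot:" + minute, value.f0 + ":" + value.f1);
        }

        @Override
        public void close() {
            jedis.close();
        }
    }
}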

2. Recommendation Engine Logic

  • 2.1 Popularity-based recommendation logic

    Diagram: current recommendation logic

The popularity ranking is re-ordered according to the user's features; then, using the product-relevance scores produced by the two recommendation algorithms, a few related products are attached to each product on the ranking.

  • 2.2 Product-similarity computation based on product profiles

    Tag-based recommendation relies on two inputs: the product profiles and the popularity ranking. A product profile has three features (color, country, and style), and the user's score for products in a category is used to filter the products on the ranking.

    Given the product profiles, item-to-item relatedness is computed as the cosine similarity between each pair of items; products with higher relatedness to the already-selected items are then recommended.

Similarity   A     B     C
A            1     0.7   0.2
B            0.7   1     0.6
C            0.2   0.6   1
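
As a small illustration of how cosine similarity produces scores like those in the table, a sketch with made-up three-dimensional profile vectors (one weight each for color, country, and style; the numbers are invented):

public class CosineSimilarity {

    // Cosine similarity: dot(a, b) / (|a| * |b|); in [0, 1] for non-negative weights.
    static double cosine(double[] a, double[] b) {
        double dot = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    public static void main(String[] args) {
        double[] productA = {0.9, 0.1, 0.3};  // invented color/country/style weights
        double[] productB = {0.8, 0.4, 0.4};
        System.out.printf("sim(A, B) = %.2f%n", cosine(productA, productB));
    }
}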
  • 2.3 Product-similarity computation based on collaborative filtering

    Similarity scores are computed from the product-user table (HBase) using the similarity formula:
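
The formula itself appears as an image in the original README and is not reproduced here. A commonly used item-based co-occurrence similarity that takes exactly this product-user table as input is (an assumption, not necessarily the project's exact formula):

    sim(i, j) = |N(i) ∩ N(j)| / sqrt(|N(i)| * |N(j)|)

where N(i) is the set of users who have interacted with product i.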

3. Front-End Recommendation Page

The current recommendation results are shown in three columns: popularity-ranking recommendations, collaborative-filtering recommendations, and product-profile recommendations.

4. Back-End Data Dashboard

The dashboard shows the recommendation system's real-time data, sourced from the results of the other Flink computation modules. It currently contains two metrics: the popularity ranking and the one-hour log ingestion volume. The underlying data lives in resource/database.sql.

5. Deployment Notes

Everything below is deployed with Docker; for a system composed of this many services and middleware, Docker is a natural fit. A simple introductory Docker series is linked from the original README.

A detailed deployment walkthrough is available as a separate article; following it will get the project running without building any component yourself. (article link)
HBase deployment -> Building pseudo-distributed HBase with Docker (external ZooKeeper)
Kafka deployment -> Configuring the network when deploying Kafka with Docker

6. Q & A

  1. Is Docker required for the environment?
    No. When the project was first written, everything ran on independently installed components; the Docker setup was added later so people could try the project quickly without building each piece themselves. If you already have experience setting up Kafka or HBase, a self-built environment is actually recommended, as it is easier to troubleshoot.

  2. Problems caused by the Docker deployment
    Issues such as port conflicts and connection failures are mostly caused by differences between server environments. First check that your firewall and other infrastructure allow all the components to reach each other.

flink-recommandsystem-demo's People

Contributors

vector4wang, water8394


flink-recommandsystem-demo's Issues

Hello

Hello, I'd like to ask: I hit an error at the packaging step and can't build. How do I solve this?

A question about deployment

Configure the IPs and ports of the services deployed above in flink-2-hbase and the web service. Then run mvn clean install in the root of flink-2-hbase to package it into the local repository; next start each task under the task directory (right-clicking and running in IDEA is enough); then start SchedulerJob, which periodically computes the scores needed by collaborative filtering and the user profiles.

  • Which directory is this task directory? I can't find it at the top level.
    Does mvn clean install actually run for you? Why am I getting errors? Thanks.

Should 1. the state be updated, and 2. should saveToHBase go inside the else branch?

public String map(LogEntity logEntity) throws Exception {
        Action actionLastTime = state.value();
        Action actionThisTime = new Action(logEntity.getAction(), logEntity.getTime().toString());
        int times = 1;
        // If the user has no previous action, initialize a value for the state
        if (actionLastTime == null) {

            state.update(actionThisTime);
            //actionLastTime = actionThisTime

            saveToHBase(logEntity, 1);
        }else{
            times = getTimesByRule(actionLastTime, actionThisTime);

            saveToHBase(logEntity, times);
        }
        //saveToHBase(logEntity, times);

        if (actionThisTime.getType().equals("3")){
            state.clear();
        }
        return null;
    }
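
For reference, a sketch of the change this issue (and the later "State update in UserHistoryWithInterestMapFunction" issue) points toward: update the state on every event and perform the HBase write once, outside the branch. This is a reading of the commented-out lines above, not a confirmed fix from the maintainer.

public String map(LogEntity logEntity) throws Exception {
    Action actionLastTime = state.value();
    Action actionThisTime = new Action(logEntity.getAction(), logEntity.getTime().toString());

    // First event for this key scores 1; otherwise apply the interval rule.
    int times = (actionLastTime == null) ? 1 : getTimesByRule(actionLastTime, actionThisTime);

    state.update(actionThisTime);   // remember this event for the next comparison
    saveToHBase(logEntity, times);  // single write, regardless of branch

    // Action=3 closes the interest session for this key
    if (actionThisTime.getType().equals("3")) {
        state.clear();
    }
    return null;
}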

Are the front-end recommendation page and the back-end dashboard the same URL, or separate?

Hello, sorry, I couldn't find another channel, so I'm opening an issue.
After deployment, the page defaults to xxx:8082.
When I open it there is no data: the hot list on the left, the log chart on the right, and nothing clickable.
Is it supposed to be several pages?
Looking at the controller, nothing else seems to be exposed; am I misunderstanding?
[screenshot]

Task execution fails; possibly a Kafka configuration problem?

This has been bothering me for two days and I can't find a solution, so I'm opening an issue.
The problem: after starting Kafka, launching LogTask in IDEA, and waiting a while, Kafka first throws a NullPointerException:

2020-03-18 23:08:47 [WARN ](KafkaFetcher                  ) [TxId :  , SpanId : ] [ET:,AN:,SN:,CN:,CI:] Error while closing Kafka consumer
java.lang.NullPointerException at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:282)

Then the log reports:

2020-03-18 23:08:47 [INFO ](ExecutionGraph                ) [TxId :  , SpanId : ] [ET:,AN:,SN:,CN:,CI:] Source: Custom Source -> Map (3/4) (63c794e9466b43ba2fb7c09f3114844b) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition con-0 could be determined

Finally Flink throws a "JobManager is shutting down" exception:

org.apache.flink.util.FlinkException: JobManager is shutting down.
	at org.apache.flink.runtime.jobmaster.JobMaster.postStop(JobMaster.java:367)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.postStop(FencedAkkaRpcActor.java:40)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
	at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:374)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Environment: Linux; Docker is built and deployed on the same machine, and the Task programs run in IDEA on that machine.
The Kafka OUTSIDE setting in config.properties and docker-compose.yml is unchanged; the modified docker-compose.yml is as follows:

version: '2.1'
services:
  zookeeper:
    container_name: zk
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"

  kafka:
    container_name: kafka
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://192.168.56.103:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_CREATE_TOPICS: "flink:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /home/baith/project/idea/idea/git/flink-recommand-system/resources/docker/docker-compose/generator.sh:/opt/generator.sh

  hbase:
    container_name: hbase
    hostname: localhost
    image: xinze0803/hbase
    links:
      - zookeeper
    depends_on:
      - zookeeper 
    logging:
      driver: "none"
    ports:
      - "16010:16010"
      - "8080:8080"
      - "9090:9090"
      - "16000:16000"
      - "16020:16020"
      - "16030:16030"
    command: ["/wait-for-it.sh", "zookeeper:2181", "-t", "10", "--", "/usr/bin/supervisord"]
    volumes:
      - /home/baith/project/idea/idea/git/flink-recommand-system/resources/docker/docker-compose/hbase_ini.sql:/opt/hbase_ini.sql

  mysql:
    image: mysql
    command: --default-authentication-plugin=mysql_native_password
    restart: always
    container_name: mysql
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_ROOT_HOST=%
    volumes:
      - /home/baith/project/idea/idea/git/flink-recommand-system/resources/docker/docker-compose/contact.sql:/opt/contact.sql
      
  redis:
    image: redis
    container_name: redis
    ports:
      - "6379:6379"
    entrypoint: redis-server --appendonly yes
    restart: always

  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

Discussion group

Could you create a WeChat group to discuss the related technology?

Computing interest from a user's actions on the same product

public class UserInterestTask {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = Property.getKafkaProperties("interest");
        DataStreamSource<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("con", new SimpleStringSchema(), properties));
        dataStream.map(new GetLogFunction())
                // Interest of one user in one product: I think the key here should also include "productId" --> .keyBy("userId", "productId")
                // Otherwise, as I understand the original .keyBy("userId"), any two actions by the same user on different products within the interval (e.g. purchase - view < 100s) would count as one interest event, which seems wrong
                .keyBy("userId", "productId")
                .map(new UserHistoryWithInterestMapFunction());

        env.execute("User Product History");
    }
}

Isn't this still offline recommendation?

Your recommendation results are all produced offline. For real-time recommendation, shouldn't the collaborative filtering be computed in Flink in real time?

Do these tasks have to be packaged and run separately?

The base project contains six data-cleaning tasks plus a scheduled collaborative-filtering task. Do they all have to be packaged separately and run on Flink, or packaged separately and run in Docker?

Found a logic bug

There is no state.update call anywhere in UserHistoryWithInterestMapFunction. Does this really run correctly for you?

Following along and learning...

I've also been looking at Flink recently and want to build a simple real-time recommendation system. After a long search I found this project and plan to get it running locally!!!

Great work; setup steps would be much appreciated~~~

Cannot connect to HBase

After bringing the services up with docker-compose, the test part cannot connect to HBase and hangs here:

2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:java.library.path=/Users/hc360/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:java.io.tmpdir=/var/folders/2x/0v69mg453f5f1vssyxtptpf40000gn/T/
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:java.compiler=
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:os.name=Mac OS X
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:os.arch=x86_64
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:os.version=10.15.6
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:user.name=hc360
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:user.home=/Users/hc360
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:user.dir=/Users/hc360/flink-recommandSystem-demo/flink-2-hbase
2020-09-11 14:34:27 INFO ZooKeeper:438 - Initiating client connection, connectString=localhost:2181 sessionTimeout=90000 watcher=hconnection-0x65b104b90x0, quorum=localhost:2181, baseZNode=/hbase
2020-09-11 14:34:27 INFO ClientCnxn:975 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2020-09-11 14:34:27 INFO ClientCnxn:852 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2020-09-11 14:34:27 INFO ClientCnxn:1235 - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1747aab89a00020, negotiated timeout = 40000

SQL file error

The contact records start at id 3, while the product records start at id 1.

-- ----------------------------
-- Records of contact
-- ----------------------------
INSERT INTO `contact` VALUES (3, 'http://img01.02d.com/Public/Upload/image/20190713/5d29b8512a04f.jpg', 'BarieCat“柚屿”系列', '舒适的非离子材质融合充满复古韵味的混血花纹;虚化的深色边缘与瞳孔的轮廓完美融合;搭配低明度高显色的基色将酷感混血进行到底。', 134.00, 'Bariecat');
INSERT INTO `contact` VALUES (4, 'http://img01.02d.com/Public/Upload/image/20190713/5d297c0fa4f48.jpg', '溪悦creekeye呦呦灰_副本', '进口MPC高保湿型非离子,轻薄无感!', 89.00, '溪悦Creek eye');

Because of this, when recommendation starts, this method throws an index exception:

	private List<ProductDto> fillProductDto(List<String> list, List<ContactEntity> contactEntities,
			List<ProductEntity> productEntities, int topSize) {
		List<ProductDto> ret = new ArrayList<>();
		for (int i = 0; i < topSize; i++) {
			String topId = list.get(i);
			ProductDto dto = new ProductDto();
			dto.setScore(TOP_SIZE + 1 - i);
			for (int j = 0; j < topSize; j++) {
				if (topId.equals(String.valueOf(contactEntities.get(j).getId()))) {
					dto.setContact(contactEntities.get(j));
				}
				if (topId.equals(String.valueOf(productEntities.get(j).getProductId()))) {
					dto.setProduct(productEntities.get(j));
				}
			}
			ret.add(dto);
		}
		return ret;
	}

HBase access failure

@CheckChe0803 @vector4wang
I use the project's HbaseClient to connect to the HBase database, but it never succeeds; it just keeps waiting. I noticed that in the docker-compose file hbase depends on an external zk container (3.4.6), while the zk client version shown on the HBase web page is 3.4.10. Could the two versions be conflicting?

zookeeper:
  container_name: zk
  image: wurstmeister/zookeeper:3.4.6
  ports:
    - "2181:2181"

hbase:
  container_name: hbase
  hostname: docker-linux
  image: xinze0803/hbase
  links:
    - zookeeper
  depends_on:
    - zookeeper
  logging:
    driver: "none"
  ports:
    - "16010:16010"
    - "8080:8080"
    - "9090:9090"
    - "16000:16000"
    - "16020:16020"
    - "16030:16030"
  command: ["/wait-for-it.sh", "zookeeper:2181", "-t", "10", "--", "/usr/bin/supervisord"]
  volumes:
    - /opt/recommend/hbase_ini.sql:/opt/hbase_ini.sql

Thanks

Docker deployment problem

When pulling the hbase image from docker-compose.yml, I get "ERROR: unauthorized: authentication required". How do I deal with this?

A question about the Kafka topics

Hi~ Your project is great; following your docs I've basically got the environment set up~ but I have a question from reading the code.

flink-2-hbase has six tasks that consume from the con and history topics. In the web project, however, I only see data produced to the con topic (when clicking favorite, add-to-cart, and so on). Searching the whole project, I cannot find any code that produces to the history topic.

## UserHistoryTask.java
DataStreamSource<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("history", new SimpleStringSchema(), properties));

So where does the data in the history topic come from?

Is the topic in the shell script set incorrectly?

All six scheduled tasks consume from the "con" topic, but nowhere in the project produces messages to "con". The only place that seeds Kafka data is generator.sh, which produces messages to the "log" topic. Is that a typo? Should the topic be "con"?

State update in UserHistoryWithInterestMapFunction

public String map(LogEntity logEntity) throws Exception {
    Action actionLastTime = state.value();
    Action actionThisTime = new Action(logEntity.getAction(), logEntity.getTime().toString());
    int times = 1;
    // If the user has no previous action, initialize a value for the state
    if (actionLastTime == null) {
        actionLastTime = actionThisTime;
        saveToHBase(logEntity, 1);
    } else {
        times = getTimesByRule(actionLastTime, actionThisTime);
    }
    saveToHBase(logEntity, times);

    // If the user's action is 3 (purchase), clear the state for this key
    if (actionThisTime.getType().equals("3")) {
        state.clear();
    }
    return null;
}

Is the state never updated? Shouldn't there be a state.update(actionThisTime)?

Deployment problem

[screenshot]
Packaging in IDEA fails: HbaseClient times out connecting to ZooKeeper and cannot connect. I've been stuck on this for a day and searched everywhere without luck; hoping the maintainer can help.

Configuration of several internal IP addresses

Hello,
Thanks a lot for sharing this project. I have a few questions that I couldn't resolve even after a long search on Stack Overflow.

flink-2-hbase/src/main/resources/config.properties contains the IP address 192.168.1.7:

kafka.bootstrap.servers=192.168.1.7:9092
kafka.zookeeper.connect=192.168.1.7:2181

but in docker-compose.yml Kafka's OUTSIDE listener is 192.168.56.103:

KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://192.168.56.103:9092

When I tried to deploy everything on one machine, I changed the Kafka OUTSIDE listener to localhost:9092:

  kafka:
    container_name: kafka
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_CREATE_TOPICS: "flink:1:1"

Then I wrote a new Flink-to-Kafka consumer Java program:

public class KalfkaApp {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        System.out.println(properties);
        DataStreamSource<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("log", new SimpleStringSchema(), properties));
//        dataStream.map(new LogMapFunction());
        dataStream.rebalance().map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink1 says: " + value;
            }
        }).print();

        env.execute("stout");
    }
}

This Java program runs fine directly in the IDE, with output like:

4> Kafka and Flink1 says: 315,315,1583225504,3
9> Kafka and Flink1 says: 302,302,1583225491,2
10> Kafka and Flink1 says: 304,304,1583225493,1
1> Kafka and Flink1 says: 309,309,1583225498,3
7> Kafka and Flink1 says: 298,298,1583225487,1
12> Kafka and Flink1 says: 308,308,1583225497,2
2> Kafka and Flink1 says: 311,311,1583225500,2
3> Kafka and Flink1 says: 313,313,1583225502,1
6> Kafka and Flink1 says: 319,319,1583225508,1
7> Kafka and Flink1 says: 321,321,1583225510,3
10> Kafka and Flink1 says: 326,326,1583225515,2
9> Kafka and Flink1 says: 324,324,1583225513,3
11> Kafka and Flink1 says: 328,328,1583225517,1
12> Kafka and Flink1 says: 330,330,1583225519,3

Process finished with exit code -1

But after packaging it as a jar and copying it into the Flink container with docker cp per your tutorial, it reports the following error:

root@db0dba1666b2:/opt/flink# flink run /opt/flink-2-hbase-1.0-SNAPSHOT.jar
{bootstrap.servers=localhost:9092, group.id=test, zookeeper.connect=localhost:2181}
Job has been submitted with JobID 144b101ff9964cf7ba109cfd95e7f005

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 144b101ff9964cf7ba109cfd95e7f005)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 144b101ff9964cf7ba109cfd95e7f005)
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
        at com.demo.task.KalfkaApp.main(KalfkaApp.java:38)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
        ... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 144b101ff9964cf7ba109cfd95e7f005)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
        ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

网上查了一下这个错误, 我感觉是不是flink没法连到zookeeper所以取不到元数据?网上说KAFKA_LISTENER_SECURITY_PROTOCOL_MAP不能是SSL,但是因为楼主设置本身就是plaintext所以再次陷入僵局。。。

谢谢!
