water8394 / flink-recommandsystem-demo

4.2K stars · 1.5K forks · 4.58 MB

:helicopter::rocket: A real-time product recommendation system built on Flink. Flink computes product popularity and puts it into a Redis cache, analyzes log data, and writes user-profile tags and real-time records into HBase. When a user requests recommendations, the popularity ranking is re-ordered according to the user's profile, and the collaborative-filtering and tag-based recommendation modules attach related products to every item on the newly generated list; finally the new list is returned to the user.

Java 99.35% HTML 0.61% Shell 0.04%
flink flink-examples flink-hbase flink-kafka flink-redis recommand recommander-system

flink-recommandsystem-demo's People

Contributors

vector4wang · water8394


flink-recommandsystem-demo's Issues

Hello

Hi, I ran into an error at the packaging step and can't build the package. How do I fix this?

Is the topic in the shell script set incorrectly?

All six scheduled tasks consume from the "con" topic, but I searched the entire project and couldn't find anywhere that produces messages to "con". The only place that seeds Kafka with data is generator.sh, and it produces messages to the "log" topic. Is that a typo? Should the correct topic be "con"?

SQL file error

The contact records start from id 3, while the product records start from 1.

-- ----------------------------
-- Records of contact
-- ----------------------------
INSERT INTO `contact` VALUES (3, 'http://img01.02d.com/Public/Upload/image/20190713/5d29b8512a04f.jpg', 'BarieCat“柚屿”系列', '舒适的非离子材质融合充满复古韵味的混血花纹;虚化的深色边缘与瞳孔的轮廓完美融合;搭配低明度高显色的基色将酷感混血进行到底。', 134.00, 'Bariecat');
INSERT INTO `contact` VALUES (4, 'http://img01.02d.com/Public/Upload/image/20190713/5d297c0fa4f48.jpg', '溪悦creekeye呦呦灰_副本', '进口MPC高保湿型非离子,轻薄无感!', 89.00, '溪悦Creek eye');

Because of that, this method throws an index exception when the recommendation starts:

	private List<ProductDto> fillProductDto(List<String> list, List<ContactEntity> contactEntities,
			List<ProductEntity> productEntities, int topSize) {
		List<ProductDto> ret = new ArrayList<>();
		for (int i = 0; i < topSize; i++) {
			String topId = list.get(i);
			ProductDto dto = new ProductDto();
			dto.setScore(TOP_SIZE + 1 - i);
			for (int j = 0; j < topSize; j++) {
				if (topId.equals(String.valueOf(contactEntities.get(j).getId()))) {
					dto.setContact(contactEntities.get(j));
				}
				if (topId.equals(String.valueOf(productEntities.get(j).getProductId()))) {
					dto.setProduct(productEntities.get(j));
				}
			}
			ret.add(dto);
		}
		return ret;
	}
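One possible fix (a sketch, not the author's code): index the entities by id first and look each top id up directly, so mismatched id ranges and entity lists shorter than topSize no longer cause an out-of-bounds access. The entity stubs below are hypothetical stand-ins for the project's classes, kept only to make the sketch self-contained.

```java
import java.util.*;

// Minimal stand-ins for the project's entities (hypothetical, for illustration only).
class ContactEntity { int id; ContactEntity(int id) { this.id = id; } int getId() { return id; } }
class ProductEntity { int productId; ProductEntity(int id) { this.productId = id; } int getProductId() { return productId; } }
class ProductDto {
    int score; ContactEntity contact; ProductEntity product;
    void setScore(int s) { score = s; }
    void setContact(ContactEntity c) { contact = c; }
    void setProduct(ProductEntity p) { product = p; }
}

public class FillProductDtoFix {
    // Build id -> entity maps once, then do O(1) lookups per top id.
    // This tolerates id ranges that do not start at the same value and
    // entity lists shorter than topSize, which is what the issue reports.
    static List<ProductDto> fillProductDto(List<String> topIds,
            List<ContactEntity> contacts, List<ProductEntity> products, int topSize) {
        Map<String, ContactEntity> contactById = new HashMap<>();
        for (ContactEntity c : contacts) contactById.put(String.valueOf(c.getId()), c);
        Map<String, ProductEntity> productById = new HashMap<>();
        for (ProductEntity p : products) productById.put(String.valueOf(p.getProductId()), p);

        List<ProductDto> ret = new ArrayList<>();
        int limit = Math.min(topSize, topIds.size());
        for (int i = 0; i < limit; i++) {
            String topId = topIds.get(i);
            ProductDto dto = new ProductDto();
            dto.setScore(topSize + 1 - i);
            dto.setContact(contactById.get(topId)); // null when no matching contact exists
            dto.setProduct(productById.get(topId)); // null when no matching product exists
            ret.add(dto);
        }
        return ret;
    }

    public static void main(String[] args) {
        // Contact ids start at 3, product ids at 1 -- the offset from the issue.
        List<ContactEntity> contacts = Arrays.asList(new ContactEntity(3), new ContactEntity(4));
        List<ProductEntity> products = Arrays.asList(new ProductEntity(1), new ProductEntity(2));
        List<ProductDto> out = fillProductDto(Arrays.asList("3", "1"), contacts, products, 2);
        System.out.println(out.size()); // 2 -- no index exception despite offset ids
    }
}
```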

Question about the Kafka queues

Hi, great project! I've basically set up the environment following your docs, but I have a question from reading the code.

flink-2-hbase has six tasks that consume from the con and history topics. In the web project, however, I only see data being produced to con (when clicking the favorite / add-to-cart buttons). I searched the entire project and found no code that produces to the history topic.

## UserHistoryTask.java
DataStreamSource<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("history", new SimpleStringSchema(), properties));

Where does the data in the history topic come from?

Do these tasks need to be packaged and run separately?

The flink-2-hbase project contains six data-cleaning tasks plus a scheduled collaborative-filtering job. Do they all need to be packaged separately and run on Flink, or packaged separately and run in Docker?

Docker deployment problem

When pulling the hbase image from docker-compose.yml, I get "ERROR: unauthorized: authentication required". How do I deal with this?

Isn't this still offline recommendation?

Your recommendation results are all computed offline. For truly real-time recommendation, shouldn't the collaborative filtering be done in real time on the Flink side?

Two questions: 1. shouldn't state be updated here? 2. should saveToHBase go inside the else branch?

public String map(LogEntity logEntity) throws Exception {
        Action actionLastTime = state.value();
        Action actionThisTime = new Action(logEntity.getAction(), logEntity.getTime().toString());
        int times = 1;
        // If the user has no previous action, create a value for state
        if (actionLastTime == null) {

            state.update(actionThisTime);
            //actionLastTime = actionThisTime

            saveToHBase(logEntity, 1);
        }else{
            times = getTimesByRule(actionLastTime, actionThisTime);

            saveToHBase(logEntity, times);
        }
        //saveToHBase(logEntity, times);

        if (actionThisTime.getType().equals("3")){
            state.clear();
        }
        return null;
    }
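For what it's worth, both questions point at the same fix: do a single saveToHBase call after the branches, and update the state on every event. Here is a sketch of that logic in plain Java, with a static variable standing in for Flink's ValueState and stubbed getTimesByRule/saveToHBase (all names besides those from the snippet above are hypothetical):

```java
// Sketch of the corrected branching. `lastAction` stands in for Flink's
// ValueState<Action>; getTimesByRule and saveToHBase are stubbed.
class Action {
    String type, time;
    Action(String type, String time) { this.type = type; this.time = time; }
    String getType() { return type; }
}

public class InterestStateSketch {
    static Action lastAction; // stands in for state.value() / state.update()
    static int saved;         // records what saveToHBase would have written

    static int getTimesByRule(Action last, Action now) { return 2; } // stub rule

    static void map(Action actionThisTime) {
        int times;
        if (lastAction == null) {
            times = 1;                           // first action for this key
        } else {
            times = getTimesByRule(lastAction, actionThisTime);
        }
        saved = times;                           // single "saveToHBase" call, outside the branches
        lastAction = actionThisTime;             // the missing state.update(actionThisTime)
        if ("3".equals(actionThisTime.getType())) {
            lastAction = null;                   // state.clear() after a purchase
        }
    }

    public static void main(String[] args) {
        map(new Action("1", "t1"));
        System.out.println(saved); // 1 -- first action for the key
        map(new Action("2", "t2"));
        System.out.println(saved); // 2 -- rule applied because state was updated
    }
}
```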

Found a logic bug

There is no state.update call anywhere in the UserHistoryWithInterestMapFunction class. Does this really work when you run it?

Configuration of several private-network IP addresses

Hello,
Thanks a lot for sharing this project. I have a few questions; I searched Stack Overflow for a long time without finding a solution.

flink-2-hbase/src/main/resources/config.properties contains the IP address 192.168.1.7:

kafka.bootstrap.servers=192.168.1.7:9092
kafka.zookeeper.connect=192.168.1.7:2181

But in docker-compose.yml, Kafka's OUTSIDE listener is 192.168.56.103:

KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://192.168.56.103:9092

When I tried to deploy all services on a single machine, I changed the Kafka OUTSIDE listener to localhost:9092:

  kafka:
    container_name: kafka
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_CREATE_TOPICS: "flink:1:1"

Then I wrote a new flink2kafka consumer program in Java:

public class KalfkaApp {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        System.out.println(properties);
        DataStreamSource<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("log", new SimpleStringSchema(), properties));
//        dataStream.map(new LogMapFunction());
        dataStream.rebalance().map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink1 says: " + value;
            }
        }).print();

        env.execute("stout");
    }
}

This Java program runs fine directly in the IDE, with output like:

4> Kafka and Flink1 says: 315,315,1583225504,3
9> Kafka and Flink1 says: 302,302,1583225491,2
10> Kafka and Flink1 says: 304,304,1583225493,1
1> Kafka and Flink1 says: 309,309,1583225498,3
7> Kafka and Flink1 says: 298,298,1583225487,1
12> Kafka and Flink1 says: 308,308,1583225497,2
2> Kafka and Flink1 says: 311,311,1583225500,2
3> Kafka and Flink1 says: 313,313,1583225502,1
6> Kafka and Flink1 says: 319,319,1583225508,1
7> Kafka and Flink1 says: 321,321,1583225510,3
10> Kafka and Flink1 says: 326,326,1583225515,2
9> Kafka and Flink1 says: 324,324,1583225513,3
11> Kafka and Flink1 says: 328,328,1583225517,1
12> Kafka and Flink1 says: 330,330,1583225519,3

Process finished with exit code -1

But after packaging it into a jar and copying it into the Flink container with docker cp, following your tutorial, it fails with the following error:

root@db0dba1666b2:/opt/flink# flink run /opt/flink-2-hbase-1.0-SNAPSHOT.jar
{bootstrap.servers=localhost:9092, group.id=test, zookeeper.connect=localhost:2181}
Job has been submitted with JobID 144b101ff9964cf7ba109cfd95e7f005

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 144b101ff9964cf7ba109cfd95e7f005)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 144b101ff9964cf7ba109cfd95e7f005)
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
        at com.demo.task.KalfkaApp.main(KalfkaApp.java:38)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
        ... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 144b101ff9964cf7ba109cfd95e7f005)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
        ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

I searched for this error online. My guess is that Flink cannot reach ZooKeeper and therefore cannot fetch the topic metadata? Some posts say KAFKA_LISTENER_SECURITY_PROTOCOL_MAP must not be SSL, but it is already PLAINTEXT here, so I'm stuck again...

Thanks!
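Not an authoritative diagnosis, but the TimeoutException above is the usual symptom of an advertised-listener mismatch: inside the Flink container, localhost:9092 refers to the Flink container itself, not the Kafka broker. A sketch of config.properties values a containerized job could use instead, assuming the Flink services join the same compose network as kafka (the hostnames come from the compose file above):

```properties
# Sketch: when the job runs inside a container on the same compose network,
# bootstrap via the INSIDE listener (kafka:9093), not localhost:9092.
bootstrap.servers=kafka:9093
zookeeper.connect=zookeeper:2181
```

When the same jar is also run from the host IDE, the host keeps using localhost:9092 via the OUTSIDE listener, which is exactly what the INSIDE/OUTSIDE split in the compose file is for.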

HBase connection failure

@CheckChe0803 @vector4wang
I can't connect to HBase with the HbaseClient from the code; it just keeps waiting. I noticed that the docker-compose file declares hbase as depending on an external zk container (3.4.6), while the zk client version shown in the HBase UI is 3.4.10. Could that mismatch be the problem?
zookeeper:
  container_name: zk
  image: wurstmeister/zookeeper:3.4.6
  ports:
    - "2181:2181"

hbase:
  container_name: hbase
  hostname: docker-linux
  image: xinze0803/hbase
  links:
    - zookeeper
  depends_on:
    - zookeeper
  logging:
    driver: "none"
  ports:
    - "16010:16010"
    - "8080:8080"
    - "9090:9090"
    - "16000:16000"
    - "16020:16020"
    - "16030:16030"
  command: ["/wait-for-it.sh", "zookeeper:2181", "-t", "10", "--", "/usr/bin/supervisord"]
  volumes:
    - /opt/recommend/hbase_ini.sql:/opt/hbase_ini.sql

Thanks

Computing interest from a user's repeated actions on the same product

public class UserInterestTask {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = Property.getKafkaProperties("interest");
        DataStreamSource<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("con", new SimpleStringSchema(), properties));
        dataStream.map(new GetLogFunction())
                // Interest of the same user in the same product: I think the key here should also include "productId" --> .keyBy("userId", "productId")
                // Otherwise the original .keyBy("userId") seems to mean that two actions by the same user on different products within the interval (e.g. purchase - view < 100s) count as one interest event, which doesn't look right
                .keyBy("userId", "productId")
                .map(new UserHistoryWithInterestMapFunction());

        env.execute("User Product History");
    }
}

Updating state in UserHistoryWithInterestMapFunction

public String map(LogEntity logEntity) throws Exception {
    Action actionLastTime = state.value();
    Action actionThisTime = new Action(logEntity.getAction(), logEntity.getTime().toString());
    int times = 1;
    // If the user has no previous action, create a value for state
    if (actionLastTime == null) {
        actionLastTime = actionThisTime;
        saveToHBase(logEntity, 1);
    } else {
        times = getTimesByRule(actionLastTime, actionThisTime);
    }
    saveToHBase(logEntity, times);

    // If the user's action is 3 (purchase), clear the state for this key
    if (actionThisTime.getType().equals("3")) {
        state.clear();
    }
    return null;
}

Is the state never updated here? Shouldn't there be a state.update(actionThisTime)?

Task fails; possibly a Kafka configuration problem?

This has bothered me for two days and I really can't find a solution, so I'm opening an issue.
The problem: after starting Kafka and then launching LogTask in IDEA, after a while Kafka first throws a NullPointerException:

2020-03-18 23:08:47 [WARN ](KafkaFetcher                  ) [TxId :  , SpanId : ] [ET:,AN:,SN:,CN:,CI:] Error while closing Kafka consumer
java.lang.NullPointerException at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:282)

The log then reports:

2020-03-18 23:08:47 [INFO ](ExecutionGraph                ) [TxId :  , SpanId : ] [ET:,AN:,SN:,CN:,CI:] Source: Custom Source -> Map (3/4) (63c794e9466b43ba2fb7c09f3114844b) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition con-0 could be determined

Finally, Flink throws a "JobManager is shutting down" exception:

org.apache.flink.util.FlinkException: JobManager is shutting down.
	at org.apache.flink.runtime.jobmaster.JobMaster.postStop(JobMaster.java:367)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.postStop(FencedAkkaRpcActor.java:40)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
	at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:374)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Environment: the OS is Linux; Docker is set up and deployed on the same machine, and the Task programs run in IDEA on that machine.
Kafka's OUTSIDE setting in config.properties and docker-compose.yml was left unchanged; the modified docker-compose.yml is as follows:

version: '2.1'
services:
  zookeeper:
    container_name: zk
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"

  kafka:
    container_name: kafka
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://192.168.56.103:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_CREATE_TOPICS: "flink:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /home/baith/project/idea/idea/git/flink-recommand-system/resources/docker/docker-compose/generator.sh:/opt/generator.sh

  hbase:
    container_name: hbase
    hostname: localhost
    image: xinze0803/hbase
    links:
      - zookeeper
    depends_on:
      - zookeeper 
    logging:
      driver: "none"
    ports:
      - "16010:16010"
      - "8080:8080"
      - "9090:9090"
      - "16000:16000"
      - "16020:16020"
      - "16030:16030"
    command: ["/wait-for-it.sh", "zookeeper:2181", "-t", "10", "--", "/usr/bin/supervisord"]
    volumes:
      - /home/baith/project/idea/idea/git/flink-recommand-system/resources/docker/docker-compose/hbase_ini.sql:/opt/hbase_ini.sql

  mysql:
    image: mysql
    command: --default-authentication-plugin=mysql_native_password
    restart: always
    container_name: mysql
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_ROOT_HOST=%
    volumes:
      - /home/baith/project/idea/idea/git/flink-recommand-system/resources/docker/docker-compose/contact.sql:/opt/contact.sql
      
  redis:
    image: redis
    container_name: redis
    ports:
      - "6379:6379"
    entrypoint: redis-server --appendonly yes
    restart: always

  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

A question about deployment

Quoting the instructions: "Configure the IPs and ports of the services deployed above in flink-2-hbase and the web service. Then run mvn clean install in the root directory of flink-2-hbase to package it and install it into the local Maven repository. Next, start each task under the task directory (right-clicking and running in IDEA is enough). Finally, start SchedulerJob, which periodically computes the scores needed by collaborative filtering and the user profile."

  • Which directory is this "task" directory? I couldn't find it at the top level.
    Does mvn clean install actually work? Why am I getting errors? Thanks.

Deployment problem

(screenshot)
Packaging in IDEA fails: the Hbaseclient connection to ZooKeeper times out and cannot connect. I've been stuck on this for a day and searched everywhere online with no luck; I hope the maintainer can help.

Can't connect to HBase

After bringing the services up with docker-compose, the test part can't connect to HBase and hangs here:

2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:java.library.path=/Users/hc360/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:java.io.tmpdir=/var/folders/2x/0v69mg453f5f1vssyxtptpf40000gn/T/
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:java.compiler=
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:os.name=Mac OS X
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:os.arch=x86_64
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:os.version=10.15.6
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:user.name=hc360
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:user.home=/Users/hc360
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:user.dir=/Users/hc360/flink-recommandSystem-demo/flink-2-hbase
2020-09-11 14:34:27 INFO ZooKeeper:438 - Initiating client connection, connectString=localhost:2181 sessionTimeout=90000 watcher=hconnection-0x65b104b90x0, quorum=localhost:2181, baseZNode=/hbase
2020-09-11 14:34:27 INFO ClientCnxn:975 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2020-09-11 14:34:27 INFO ClientCnxn:852 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2020-09-11 14:34:27 INFO ClientCnxn:1235 - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1747aab89a00020, negotiated timeout = 40000
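One common cause of a hang right after the ZK session is established (a guess, not confirmed by the repo): HBase registers its master and region server in ZooKeeper under the container's hostname, and the client must be able to resolve that name. With hostname: docker-linux in the compose file, a hosts-file workaround on the client machine would look like this sketch:

```
# /etc/hosts on the machine running the test client
# (sketch; hostname taken from the compose file, adjust the IP as needed)
127.0.0.1   docker-linux
```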

Following along and learning...

I've been looking at Flink recently too, hoping to build a simple real-time recommendation system. After a long search I found this project and plan to get it running locally!!!

Rooting for you; setup instructions would be much appreciated~~~

Discussion group

Could you create a WeChat group for discussing the related technologies?

Are the front-end recommendation page and the back-end data display the same URL, or separate?

Hi, sorry, I couldn't find another channel, so I'm opening an issue.
After deployment, the page defaults to xxx:8082.
When I open it there is no data: the hot list on the left, the log chart on the right, and nothing clickable.
Is it supposed to be several pages?
The controller doesn't seem to expose anything else either; am I misunderstanding?
(screenshot)
